Source code for forte.data.data_store

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import Dict, List, Iterator, Tuple, Optional, Any, Type

import uuid
import logging
from heapq import heappush, heappop
from sortedcontainers import SortedList
from typing_inspect import get_origin

from forte.utils import get_class
from forte.utils.utils import get_full_module_name
from forte.data.ontology.code_generation_objects import EntryTree
from forte.data.ontology.ontology_code_generator import OntologyCodeGenerator
from forte.data.base_store import BaseStore
from forte.data.ontology.top import (
    Annotation,
    AudioAnnotation,
    Group,
    ImageAnnotation,
    Link,
    Generics,
    Payload,
    MultiPackGeneric,
    MultiPackGroup,
    MultiPackLink,
)
from forte.data.ontology.core import Entry, FList, FDict
from forte.common import constants


logger = logging.getLogger(__name__)

__all__ = ["DataStore"]


class DataStore(BaseStore):
    # TODO: temporarily disable this for development purposes.
    # pylint: disable=pointless-string-statement, protected-access
    # pylint: disable=attribute-defined-outside-init
    # pylint: disable=too-many-public-methods

    # Class-level registry of type information. It is shared by every
    # ``DataStore`` instance; see the comment in ``__init__`` for its layout.
    _type_attributes: dict = {}

    onto_gen = OntologyCodeGenerator()
    do_init = False

    def __init__(
        self, onto_file_path: Optional[str] = None, dynamically_add_type=True
    ):
        r"""An implementation of the data store object that mainly uses
        primitive types. This class will be used as the internal data
        representation behind data pack. The usage of primitive types provides
        a speed-up to the previous class-based solution.

        A DataStore object uses primitive types and simple python data
        structures to store a collection of Forte entries for certain types of
        unstructured data.
        Currently, DataStore supports storing data structures with linear span
        (e.g. Annotation), and relational data structures (e.g. Link and
        Group). Future extension of the class may support data structures with
        2-d range (e.g. bounding boxes).

        Internally, we store every entry in a variable ``__elements``, which is
        a nested list: a list of ``entry lists``.

        Every inner list, the ``entry list``, is a list storing entries for a
        single particular type, such as entries for
        :class:`~ft.onto.base_ontology.Sentence`. Different types are stored in
        different lists: [ <Document List>, <Sentence List>, ...]. We will
        discuss the sorting order later.

        The outer list, stores a list of ``entry lists``, and each
        ``entry list`` is indexed by the type of its element. Specifically,
        each type is associated with a unique ``type_id``, which is generated
        by the system. The mapping between ``type_name`` and ``type_id`` is
        defined by a dictionary ``self.__type_index_dict``.

        Entry information is stored as ``entry data`` in each ``entry list``.
        Each element in the ``entry list`` (an entry data) corresponds to one
        entry instance.

        Each ``entry data`` in the ``entry list`` is represented by a list of
        attributes.
        For example, an annotation type entry has the following format:
        [<begin>, <end>, <tid>, <type_name>, <attr_1>, <attr_2>, ...,
        <attr_n>].
        A group type entry has the following format:
        [<member_type>, <[members_tid_list]>, <tid>, <type_name>, <attr_1>,
        <attr_2>, ..., <attr_n>].
        A link type entry has the following format:
        [<parent_tid>, <child_tid>, <tid>, <type_name>, <attr_1>, <attr_2>,
        ..., <attr_n>].

        The first four fields are compulsory for every ``entry data``. The
        third and fourth fields are always ``tid`` and ``type_name``, but the
        first and second fields can change across different types of entries.
        For example, first four fields of Annotation-Like (e.g. subclasses of
        Annotation or AudioAnnotation) entries are always in the order of
        ``begin``, ``end``, ``tid`` and ``type_name``. ``begin`` and ``end``,
        which are compulsory for annotations entries, represent the begin and
        end character indices of entries in the payload.

        Here, ``type_name`` is the fully qualified name of this type
        represented by ``entry list``. It must be a valid ontology defined as
        a class.
        ``tid`` is a unique id of every entry, which is internally generated
        by uuid.uuid4().
        Each ``type_name`` corresponds to a pre-defined ordered list of
        attributes, the exact order is determined by the system through the
        ontology specifications.
        E.g. an annotation-type ``entry data`` with type
        :class:`~ft.onto.base_ontology.Document` has the following structure:
        [<begin>, <end>, <tid>, <type_name>, <document_class>, <sentiment>,
        <classifications>].
        Here, <document_class>, <sentiment>, <classifications> are the 3
        attributes of this type. This allows the ``entry list`` behaves like a
        table, we can find the value of an attribute through the correct
        ``index_id`` id (e.g. index of the outer list) and `attr_id`
        (e.g. index of the inner list).

        Note that, if the type of ``entry list`` is Annotation-Like (e.g.
        subclasses of Annotation or AudioAnnotation), these entries will be
        sorted by the first two attributes (``begin``, ``end``). However, the
        order of a list with types that are not Annotation-like, is currently
        based on the insertion order.

        ``onto_file_path`` is an optional argument, which allows one to pass in
        a user defined ontology file. This will enable the DataStore to
        understand and store ``entry_type`` defined in the provided file.

        Args:
            onto_file_path (str, optional): The path to the input ontology
                specification file, which should be a json file, and it should
                have all the entries inside with no import as key.
            dynamically_add_type (bool): Stored as
                ``self._dynamically_add_type``. It must not be False when
                ``onto_file_path`` is None, because the store would then have
                no usable type.

        Raises:
            RuntimeError: when ``onto_file_path`` is None and
                ``dynamically_add_type`` is False.
        """
        super().__init__()

        if onto_file_path is None and not dynamically_add_type:
            # The literals below are concatenated, so each one needs its own
            # trailing space to keep the message readable.
            raise RuntimeError(
                "DataStore is initialized with no existing types. Setting "
                "dynamically_add_type to False without providing "
                "onto_file_path will lead to no usable type in DataStore."
            )
        self._onto_file_path = onto_file_path
        self._dynamically_add_type = dynamically_add_type

        # The ``_type_attributes`` is a private dictionary that provides
        # ``type_name``, their parent entry, and the order of corresponding
        # attributes.
        # The keys are fully qualified names of every type; the value is a
        # dictionary with two keys. Key ``attribute`` provides an inner
        # dictionary with all valid attributes for this type and the indices
        # of attributes among these lists. Key ``parent_class`` is a string
        # representing the ancestors of this type.
        #
        # This structure is supposed to be built dynamically. When a user adds
        # new entries, `DataStore` will check unknown types and add them to
        # ``_type_attributes``.
        #
        # Example:
        #
        # DataStore._type_attributes is:
        # {
        #     "ft.onto.base_ontology.Token": {
        #         "attributes": {"pos": 4, "ud_xpos": 5,
        #             "lemma": 6, "chunk": 7, "ner": 8, "sense": 9,
        #             "is_root": 10, "ud_features": 11, "ud_misc": 12},
        #         "parent_class": set("forte.data.ontology.top.Annotation"), },
        #     "ft.onto.base_ontology.Document": {
        #         "attributes": {"document_class": 4,
        #             "sentiment": 5, "classifications": 6},
        #         "parent_class": set("forte.data.ontology.top.Annotation"), },
        #     "ft.onto.base_ontology.Sentence": {
        #         "attributes": {"speaker": 4,
        #             "part_id": 5, "sentiment": 6,
        #             "classification": 7, "classifications": 8},
        #         "parent_class": set(), }
        # }
        self._init_top_to_core_entries()
        if self._onto_file_path:
            self._parse_onto_file()

        # The `__elements` is an underlying storage structure for all the
        # entry data added by users in this DataStore class.
        # It is a dict of {str: list} pairs that stores sorted
        # ``entry lists`` by ``type_name``s.
        #
        # Example:
        # self.__elements = {
        #     "ft.onto.base_ontology.Token": Token SortedList(),
        #     "ft.onto.base_ontology.Document": Document SortedList(),
        #     "ft.onto.base_ontology.Sentence": Sentence SortedList(),
        #     ...
        # }
        self.__elements: dict = {}

        # A dictionary that keeps record of all annotation-like entries with
        # their `tid`s. Since annotation-like entries are stored in
        # sortedlists, we store references to entries directly.
        # It is a key-value map of {tid: entry data in list format}.
        # e.g., {1423543453: [begin, end, tid, type_name, attr_1, ..., attr_n]}
        self.__tid_ref_dict: dict = {}

        # A dictionary that keeps record of all non-annotation-like entries
        # with their `tid`s. Since non-annotation-like entries are stored in
        # lists, we store indices of entries.
        # It is a key-value map of {tid: [type_name, index_id]}.
        # The `index_id` records the location of the entry in the
        # ``type_name`` list in the `__elements`.
        # e.g., {4345314235: ['forte.data.ontology.top.Link', 0]}
        self.__tid_idx_dict: dict = {}

        # A dictionary that records how many non-annotation-like entries are
        # deleted for each type. Since we replace non-annotation-like deleted
        # entries with None placeholders to maintain the order and indices,
        # the length of the list is not meaningful. We need to know how many
        # entries are placeholders in order to know the exact count of
        # entries. During the serialization/deserialization, None placeholders
        # will be dropped and the `__deletion_count` should be reset.
        # This dictionary is a key-value map of
        # {type_name: number of None in the list}.
        # Example: {"forte.data.ontology.top.Group": 0,
        #           "forte.data.ontology.top.Link": 3}
        self.__deletion_count: dict = {}

    def __getstate__(self):
        r"""
        In serialization,
        1) will serialize the annotation sorted list as a normal list;
        2) will remove `tid_ref_dict`, `tid_idx_dict` and `deletion_count`
        to save space.

        The type registry is exported under the key ``fields`` without the
        parent-class information, and the entry lists under the key
        ``entries``.

        Returns:
            The state dictionary to be serialized.
        """
        state = super().__getstate__()

        serialized_elements = {}
        for type_name, entry_list in self.__elements.items():
            # build the full `_type_attributes`
            self._get_type_info(type_name)
            serialized_elements[type_name] = list(entry_list)

        # Replace the stored elements, then drop the lookup tables; they are
        # rebuilt from the entries in `__setstate__`.
        state["_DataStore__elements"] = serialized_elements
        state.pop("_DataStore__tid_ref_dict")
        state.pop("_DataStore__tid_idx_dict")
        state.pop("_DataStore__deletion_count")
        state["entries"] = state.pop("_DataStore__elements")

        # Export a filtered copy. `_type_attributes` is shared state, so
        # popping the parent-class key from it in place would wipe the
        # parent-class information of the live registry as a side effect of
        # serializing.
        state["fields"] = {
            type_name: {
                key: value
                for key, value in type_info.items()
                if key != constants.PARENT_CLASS_KEY
            }
            for type_name, type_info in self._type_attributes.items()
        }
        return state

    def __setstate__(self, state):
        r"""
        In deserialization, we
        1) transform the annotation list back to a sorted list;
        2) recreate the `tid_ref_dict` and `tid_idx_dict` from `__elements`;
        3) reset the `deletion_count`.

        Args:
            state: The state dictionary; it must contain the key ``entries``
                and may contain the key ``fields``.
        """
        self.__dict__.update(state)
        self.__elements = self.__dict__.pop("entries")
        self._type_attributes = self.__dict__.pop("fields", {})
        self.__tid_ref_dict = {}
        self.__tid_idx_dict = {}
        self.__deletion_count = {}

        # Running index of the last seen entry, per type name.
        reset_index = {}
        for k in self.__elements:
            is_annotation = self._is_annotation(k)
            if is_annotation:
                # convert list back to sorted list
                self.__elements[k] = SortedList(self.__elements[k])
            else:
                # remove None placeholders in non-annotation-like entry lists
                self.__elements[k][:] = [
                    x for x in self.__elements[k] if x is not None
                ]
            for e in self.__elements[k]:
                if is_annotation:
                    # annotation-like: recreate `tid_ref_dict`
                    self.__tid_ref_dict[e[constants.TID_INDEX]] = e
                else:
                    # non-annotation-like: recalculate indices.
                    # Count how many times each type occurs to know the
                    # number of existing entries of each type. Assign the
                    # count of `type_name` to the index of this entry.
                    type_name = e[constants.ENTRY_TYPE_INDEX]
                    reset_index[type_name] = (
                        reset_index.get(type_name, -1) + 1
                    )
                    # record tid and its corresponding type name and index
                    self.__tid_idx_dict[e[constants.TID_INDEX]] = [
                        type_name,
                        reset_index[type_name],
                    ]
@classmethod
def deserialize(
    cls,
    data_source: str,
    serialize_method: str = "json",
    check_attribute: bool = True,
    suppress_warning: bool = True,
    accept_unknown_attribute: bool = True,
) -> "DataStore":
    """
    Deserialize a `DataStore` from serialized data in `data_source`.

    Args:
        data_source: The path storing data source.
        serialize_method: The method used to serialize the data, this
            should be the same as how serialization is done. The current
            option is `json`.
        check_attribute: Boolean value indicating whether users want to
            check compatibility of attributes. Only applicable when the
            data being serialized is done with `save_attribute` set to
            True in BaseStore.serialize. If true, it will compare fields
            of the serialized object and the current `DataStore` class.
            If there are fields that have different orders in the current
            class and the serialized object, it switches the order of
            fields to match the current class.
            If there are fields that appear in the current class, but not
            in the serialized object, it handles those fields with
            `accept_unknown_attribute`.
            If there are fields that appear in the serialized object, but
            not in the current class, it drops those fields.
        suppress_warning: Boolean value indicating whether warnings about
            mismatched fields should be silenced. Only applicable when
            `check_attribute` is set to True. If False, it will log
            warnings when there are mismatched fields.
        accept_unknown_attribute: Boolean value indicating whether users
            want to fill fields that appear in the current class, but not
            in the serialized object with none. Only applicable when
            `check_attribute` is set to True. If false, it will raise an
            `ValueError` if there are any contradictions in fields.

    Raises:
        ValueError: raised when
            1. the serialized object has unknown fields, but
            `accept_unknown_attribute` is False.
            2. the serialized object does not store attributes, but
            `check_attribute` is True.
            3. the serialized object does not support json
            deserialization. We may change this error when we have other
            options for deserialization.

    Returns:
        A data store object deserialized from the string.
    """

    def check_fields(store):
        """
        A helper function that compares fields of the serialized object
        and the current `DataStore` class.

        Args:
            store: The serialized object we want to check and process.

        Returns:
            The data store object after we process its fields.
        """
        for t, v in cls._type_attributes.items():
            saved_info = store._type_attributes.get(t)
            # A type the current class knows but the serialized object
            # never recorded has no saved rows to reconcile.
            if (
                saved_info is None
                or constants.TYPE_ATTR_KEY not in saved_info
                or constants.TYPE_ATTR_KEY not in v
            ):
                continue
            current_attrs = v[constants.TYPE_ATTR_KEY]
            saved_attrs = saved_info[constants.TYPE_ATTR_KEY]
            # A type may be registered without any stored entry.
            entries = store._DataStore__elements.get(t, ())

            change_map = {}
            contradict_loc = []
            # attribute.items() contains attribute names and attribute
            # indices. The difference in two sets could find fields that
            # appear in the current class, but not in the serialized
            # objects, or fields that have different orders in the current
            # class and the serialized objects.
            # If a field only occurs in the serialized object but not in
            # the current class, it will not be detected.
            # Instead, it will be dropped later.
            diff = set(current_attrs.items()) - set(saved_attrs.items())
            for attr_name, attr_idx in diff:
                if attr_name in saved_attrs:
                    # The field exists on both sides at different indices:
                    # map the current index to the saved index.
                    change_map[attr_idx] = saved_attrs[attr_name]
                else:
                    # The field only exists in the current class. We want
                    # to fill it with None.
                    contradict_loc.append(attr_idx)

            if len(change_map) > 0:
                if not suppress_warning:
                    logger.warning(
                        "Saved %s objects have %s different orders of "
                        "attribute fields to the current datastore. "
                        "They are reordered to match the fields in "
                        "the current datastore class.",
                        t,
                        len(change_map),
                    )
                # Switch the order of fields for the serialized objects.
                # Fields that are redundant/only appear in the serialized
                # object are thrown away by the truncation to `width`.
                width = max(current_attrs.values()) + 1
                for d in entries:
                    reordered = []
                    for i in range(width):
                        src = change_map.get(i, i)
                        # A slot beyond the saved row has no saved value;
                        # it is left as None instead of raising IndexError.
                        reordered.append(d[src] if src < len(d) else None)
                    d[:] = reordered

            if len(contradict_loc) > 0:
                if not suppress_warning:
                    logger.warning(
                        "Saved %s objects have %s attribute fields "
                        "that could not be identified. "
                        "These fields are filled with `None`. This may "
                        "due to user's modifications of fields.",
                        t,
                        len(contradict_loc),
                    )
                if accept_unknown_attribute:
                    # fill fields that only appear in the current class
                    # but not in the serialized objects with None.
                    for d in entries:
                        for idx in contradict_loc:
                            if idx >= len(d):
                                # Grow the row so the new slot exists.
                                d.extend([None] * (idx + 1 - len(d)))
                            d[idx] = None
                else:
                    raise ValueError(
                        f"Saved {t} objects have unidentified fields "
                        "at indices "
                        f"{', '.join(str(loc) for loc in contradict_loc)}, "
                        "which raise an error."
                    )
        return store

    state = cls._deserialize(data_source, serialize_method)
    # The following part is for customized json method only.
    if isinstance(state, dict):
        obj = DataStore()
        obj.__setstate__(state)
    else:
        raise ValueError(
            "The serialized object that you want to"
            " deserialize does not support json deserialization."
        )
    if check_attribute:
        if len(obj._type_attributes) == 0:
            raise ValueError(
                "The serialized object that you want to deserialize does"
                " not support check_attribute."
            )
        if cls._type_attributes != obj._type_attributes:
            obj = check_fields(obj)
    # Remove the instance-level copy of the saved fields so that attribute
    # lookup falls back to the class-level registry.
    delattr(obj, "_type_attributes")
    return obj
def _new_tid(self) -> int: r"""This function generates a new ``tid`` for an entry.""" return uuid.uuid4().int def _get_type_info(self, type_name: str) -> Dict[str, Any]: """ Get the dictionary containing type information from ``DataStore._type_attributes``. If the ``type_name`` does not currently exists and dynamic import is enabled, this function will add a new key-value pair into ``DataStore._type_attributes``. The value consists of a full attribute-to-index dictionary and an empty parent set. This function returns a dictionary containing an attribute dict and a set of parent entries of the given type. For example: .. code-block:: python "ft.onto.base_ontology.Sentence": { "attributes": { "speaker": 4, "part_id": 5, "sentiment": 6, "classification": 7, "classifications": 8, }, "parent_class": set(), } Args: type_name (str): The fully qualified type name of a type. Returns: attr_dict (dict): The dictionary containing an attribute dict and a set of parent entries of the given type. Raises: RuntimeError: When the type is not provided by ontology file and dynamic import is disabled. """ # check if type is in dictionary if ( type_name in DataStore._type_attributes and constants.TYPE_ATTR_KEY in DataStore._type_attributes[type_name] ): return DataStore._type_attributes[type_name] if not self._dynamically_add_type: raise ValueError( f"{type_name} is not an existing type in current data store. " f"Dynamically add type is disabled. " f"Set dynamically_add_type=True if you need to use types other " f"than types specified in the ontology file." 
) # get attribute dictionary attributes = DataStore._get_entry_attributes_by_class(type_name) attr_dict = {} attr_idx = constants.ENTRY_TYPE_INDEX + 1 for attr_name in attributes: attr_dict[attr_name] = attr_idx attr_idx += 1 new_entry_info = { constants.TYPE_ATTR_KEY: attr_dict, constants.PARENT_CLASS_KEY: set(), } DataStore._type_attributes[type_name] = new_entry_info return new_entry_info def _get_type_attribute_dict(self, type_name: str) -> Dict[str, int]: """Get the attribute dict of an entry type. The attribute dict maps attribute names to a list of consecutive integers as indices. For example: .. code-block:: python "attributes": { "speaker": 4, "part_id": 5, "sentiment": 6, "classification": 7, "classifications": 8, }, Args: type_name (str): The fully qualified type name of a type. Returns: attr_dict (dict): The attribute-to-index dictionary of an entry. """ return self._get_type_info(type_name)[constants.TYPE_ATTR_KEY] def _get_type_parent(self, type_name: str) -> str: """Get a set of parent names of an entry type. The set is a subset of all ancestors of the given type. Args: type_name (str): The fully qualified type name of a type. Returns: parent_class (str): The parent entry name of an entry. """ return self._get_type_info(type_name)[constants.PARENT_CLASS_KEY] def _default_attributes_for_type(self, type_name: str) -> List: """Get a list of attributes of an entry type with their default values. If an attribute is annotated with `FList` or `List`, then the default value is an empty list `[]`. When an attribute is annotated with `FDict` or `Dict` then the default value will be an empty dictionary `{}`. For all other cases (including primitive types, Union, NoneType, etc.) the default value will be set to `None`. Args: type_name (str): The fully qualified type name of the new entry. Returns: attr_dict (list): A list of attributes with default values. 
""" attr_dict: Dict = self._get_type_attribute_dict(type_name) attr_fields: Dict = self._get_entry_attributes_by_class(type_name) attr_list: List = [None] * len(attr_dict) for attr_name, attr_id in attr_dict.items(): # TODO: We should keep a record of the attribute class instead of # inspecting the class on the fly. attr_class = get_origin(attr_fields[attr_name].type) if attr_class in (FList, list, List): attr_list[attr_id - constants.ATTR_BEGIN_INDEX] = [] elif attr_class in (FDict, dict, Dict): attr_list[attr_id - constants.ATTR_BEGIN_INDEX] = {} return attr_list def _is_subclass( self, type_name: str, cls, no_dynamic_subclass: bool = False ) -> bool: r"""This function takes a fully qualified ``type_name`` class name, ``cls`` class and returns whether ``type_name`` class is the``cls`` subclass or not. This function accepts two types of class: the class defined in forte, or the classes in user provided ontology file. Args: type_name: A fully qualified name of an entry class. cls: An entry class. no_dynamic_subclass: A boolean value controlling where to look for subclasses. If True, this function will not check the subclass relations via `issubclass` but rely on pre-populated states only. Returns: A boolean value whether ``type_name`` class is the``cls`` subclass or not. 
""" if type_name not in DataStore._type_attributes: self._get_type_info(type_name=type_name) if ( constants.PARENT_CLASS_KEY not in DataStore._type_attributes[type_name] ): DataStore._type_attributes[type_name][ constants.PARENT_CLASS_KEY ] = set() cls_qualified_name = get_full_module_name(cls) type_name_parent_class = DataStore._type_attributes[type_name][ constants.PARENT_CLASS_KEY ] if no_dynamic_subclass: return cls_qualified_name in type_name_parent_class else: if cls_qualified_name in type_name_parent_class: return True else: entry_class = get_class(type_name) if issubclass(entry_class, cls): type_name_parent_class.add(cls_qualified_name) return True else: return False def _get_all_subclass(self, entry_type_name: str, inclusive: bool = False): """ Get all subclasses of ``entry_type_name``. Args: entry_type_name (str): subclasses of entry of ``entry_type_name`` will be yielded if it's in ``DataStore`` storage. inclusive: if it's True, then ``entry_type_name`` itself will be yielded. False otherwise. Yields: subclass entry type name of ``entry_type_name`` """ for entry_type_key in sorted(self.__elements.keys()): if ( entry_type_key == entry_type_name and inclusive ) or self._is_subclass( entry_type_key, get_class(entry_type_name), ): yield entry_type_key def _is_annotation(self, type_name: str) -> bool: r"""This function takes a type_name and returns whether a type is an annotation type or not. Args: type_name: The name of type in `self.__elements`. Returns: A boolean value whether this type_name belongs to an annotation type or not. """ return any( self._is_subclass(type_name, entry_class) for entry_class in (Annotation, AudioAnnotation) )
def all_entries(self, entry_type_name: str) -> Iterator[List]:
    """
    Iterate over the raw data of every entry whose type is
    ``entry_type_name`` or one of its subclasses.

    Args:
        entry_type_name (str): the type name of entries that the User wants
            to retrieve.

    Yields:
        Iterator of raw entry data in list format.
    """
    matching_types = self._get_all_subclass(entry_type_name, True)
    if not self._is_annotation(type_name=entry_type_name):
        # Non-annotation-like types are walked one type after another.
        for matching_type in matching_types:
            yield from self.iter(matching_type)
        return

    # Annotation-like types go through `co_iterator_annotation_like` to
    # maintain the correct order across the different types.
    yield from self.co_iterator_annotation_like(
        type_names=list(matching_types)
    )
def num_entries(self, entry_type_name: str) -> int:
    """
    Count the entries whose type is ``entry_type_name`` or one of its
    subclasses.

    Args:
        entry_type_name (str): the type name of entries that the User wants
            to get its count.

    Returns:
        The number of entries of given ``entry_type_name``.
    """
    total = 0
    for matching_type in self._get_all_subclass(entry_type_name, True):
        stored = len(self.__elements[matching_type])
        # Only types with a recorded deletion count have placeholders that
        # must be subtracted from the list length.
        placeholders = self.__deletion_count.get(matching_type, 0)
        total += stored - placeholders
    return total
def _add_entry_raw(
    self,
    entry_type: Type[Entry],
    type_name: str,
    entry: List[Any],
):
    """
    Store a raw entry in the entry list of ``type_name`` and register its
    ``tid`` in the matching lookup dictionary.

    Args:
        entry_type: entry's type which decides the sorting of entry.
        type_name: The name of type in `self.__elements`.
        entry: raw entry data in the list format.

    Raises:
        NotImplementedError: raised when the entry type being added is not
            one of the base types handled below.

    Returns:
        ``tid`` of the entry.
    """
    unsorted_types = (
        Link,
        Group,
        Generics,
        ImageAnnotation,
        Payload,
        MultiPackLink,
        MultiPackGroup,
        MultiPackGeneric,
    )
    if entry_type in (Annotation, AudioAnnotation):
        # Annotation-like entries live in a list sorted by (begin, end);
        # the list is created the first time the type is seen.
        if type_name not in self.__elements:
            self.__elements[type_name] = SortedList(
                key=lambda s: (
                    s[constants.BEGIN_INDEX],
                    s[constants.END_INDEX],
                )
            )
        self.__elements[type_name].add(entry)
    elif entry_type in unsorted_types:
        # All other supported entries keep their insertion order.
        if type_name not in self.__elements:
            self.__elements[type_name] = []
        self.__elements[type_name].append(entry)
    else:
        raise NotImplementedError(
            "_add_entry_raw() is not implemented "
            f"for entry type {entry_type}."
        )

    new_tid = entry[constants.TID_INDEX]
    if self._is_annotation(type_name):
        self.__tid_ref_dict[new_tid] = entry
    else:
        # The entry was appended, so it sits at the last position.
        last_position = len(self.__elements[type_name]) - 1
        self.__tid_idx_dict[new_tid] = [type_name, last_position]
    return new_tid


def _is_annotation_tid(self, tid: int) -> bool:
    r"""Tell whether the entry registered under ``tid`` is annotation-like.

    Args:
        tid: Unique Id of the entry.

    Returns:
        True when ``tid`` is found among the annotation-like entries, False
        when it is found among the other entries.

    Raises:
        KeyError: when ``tid`` is not registered at all.
    """
    if tid in self.__tid_ref_dict:
        return True
    if tid in self.__tid_idx_dict:
        return False
    raise KeyError(f"Entry with tid {tid} not found.")


def _create_new_entry(
    self, type_name: str, attribute_data: List, tid: Optional[int] = None
) -> List:
    r"""Build the raw list representation of a new entry with default
    attribute values. The result is in the format used by the data store.

    Args:
        type_name: The fully qualified type name of the new entry.
        attribute_data: It is a list that stores attributes relevant to
            the entry being added. Its items become the leading fields of
            the new entry.
        tid: ``tid`` of the entry; a new one is generated when it is None.

    Returns:
        The list that represents the newly created entry.
    """
    entry_tid: int = self._new_tid() if tid is None else tid
    # Layout: leading fields, tid, type name, then the default attributes.
    return [
        *attribute_data,
        entry_tid,
        type_name,
        *self._default_attributes_for_type(type_name),
    ]
def add_entry_raw(
    self,
    type_name: str,
    attribute_data: List,
    base_class: Type[Entry],
    tid: Optional[int] = None,
    allow_duplicate: bool = True,
) -> int:
    r"""
    General entry point to add an entry of any supported kind to the data
    store. Returns the ``tid`` for the inserted entry.

    Args:
        type_name: The fully qualified type name of the new Entry.
        attribute_data: It is a list that stores attributes relevant to
            the entry being added. Its items become the leading fields of
            the new entry.
        base_class: The type of entry to add to the Data Store. This is
            a reference to the class of the entry that needs to be added
            to the DataStore. It selects how the entry is stored.
        tid: ``tid`` of the Entry that is being added.
            It's optional, and it will be auto-assigned if not given.
        allow_duplicate: Whether we allow duplicate in the DataStore. When
            it's set to False, the function will return the ``tid`` of
            existing entry if a duplicate is found. Default value is True.
            It only takes effect for annotation-like types.

    Returns:
        ``tid`` of the entry.
    """
    candidate = self._create_new_entry(type_name, attribute_data, tid)

    # Duplicates can only be detected for annotation-like types, so every
    # other type is always inserted.
    is_annotation_like = self._is_annotation(type_name)
    if is_annotation_like and not allow_duplicate:
        existing_tid = self._get_existing_ann_entry_tid(candidate)
        if existing_tid != -1:
            # An existing entry was found; nothing is inserted.
            return existing_tid

    return self._add_entry_raw(base_class, type_name, candidate)
def _get_existing_ann_entry_tid(self, entry: List[Any]):
    r"""
    This function searches for the tid of an existing annotation-like entry
    that has the same ``begin`` and ``end`` as ``entry``. It returns the tid
    if such an entry is found. Otherwise, it returns -1.

    Args:
        entry (list): raw data of the annotation-like entry to search for.

    Raises:
        ValueError: raised when the type of the entry is stored in the data
            store but is not annotation-like.

    Returns:
        tid of the matching stored entry if one is found. Otherwise -1.
    """
    type_name = entry[constants.ENTRY_TYPE_INDEX]
    begin = entry[constants.BEGIN_INDEX]
    end = entry[constants.END_INDEX]

    if type_name not in self.__elements:
        return -1
    if not self._is_annotation(type_name):
        raise ValueError(
            f"Get existing entry id for {type_name}"
            " is not supported. This function only supports "
            "getting entry id for annotation-like entry."
        )

    entry_list = self.__elements[type_name]
    index = entry_list.bisect_left(entry)
    # `bisect_left` returns the insertion point, which equals the length of
    # the list when `entry` sorts after every stored entry. There is no
    # candidate to compare with in that case.
    if index >= len(entry_list):
        return -1

    target_entry = entry_list[index]
    if (
        target_entry[constants.BEGIN_INDEX] == begin
        and target_entry[constants.END_INDEX] == end
    ):
        return target_entry[constants.TID_INDEX]
    return -1
def set_attribute(self, tid: int, attr_name: str, attr_value: Any):
    r"""Set the attribute ``attr_name`` of the entry with ``tid`` to
    ``attr_value``. The position of the attribute inside the entry data is
    looked up in the attribute dictionary of the entry's type.

    Args:
        tid: Unique Id of the entry.
        attr_name: Name of the attribute.
        attr_value: Value of the attribute.

    Raises:
        KeyError: when ``tid`` or ``attr_name`` is not found.
    """
    entry_data, type_name = self.get_entry(tid)

    try:
        attr_position = self._get_type_attribute_dict(type_name)[attr_name]
    except KeyError as err:
        raise KeyError(f"{type_name} has no {attr_name} attribute.") from err
    else:
        entry_data[attr_position] = attr_value
def _set_attr(self, tid: int, attr_id: int, attr_value: Any): r"""This function locates the entry data with ``tid`` and sets its attribute ``attr_id`` with value `attr_value`. Called by `set_attribute()`. Args: tid: The unique id of the entry. attr_id: The id of the attribute. attr_value: The value of the attribute. """ entry, _ = self.get_entry(tid) entry[attr_id] = attr_value
def get_attribute(self, tid: int, attr_name: str) -> Any:
    r"""Read the attribute ``attr_name`` of the entry with ``tid``. The
    position of the attribute inside the entry data is looked up in the
    attribute dictionary of the entry's type.

    Args:
        tid: Unique id of the entry.
        attr_name: Name of the attribute.

    Returns:
        The value of ``attr_name`` for the entry with ``tid``.

    Raises:
        KeyError: when ``tid`` or ``attr_name`` is not found.
    """
    entry_data, type_name = self.get_entry(tid)

    try:
        attr_position = self._get_type_attribute_dict(type_name)[attr_name]
    except KeyError as err:
        raise KeyError(f"{type_name} has no {attr_name} attribute.") from err
    else:
        return entry_data[attr_position]
def _get_attr(self, tid: int, attr_id: int) -> Any: r"""This function locates the entry data with ``tid`` and gets the value of ``attr_id`` of this entry. Called by `get_attribute()`. Args: tid: Unique id of the entry. attr_id: The id of the attribute. Returns: The value of ``attr_id`` for the entry with ``tid``. """ entry, _ = self.get_entry(tid) return entry[attr_id]
def delete_entry(self, tid: int):
    r"""Remove the entry identified by ``tid`` from the data store.

    The ``tid`` is first popped from the lookup dictionary that holds it:
    ``tid_ref_dict`` for annotation-like entries (the popped value is the
    entry data itself) or ``tid_idx_dict`` for all other entries (the
    popped value records the type name and the position of the entry in
    its entry list). The position of the entry inside its entry list is
    then determined and the removal itself is delegated to
    ``_delete_entry_by_loc()``.

    Args:
        tid: Unique id of the entry.

    Raises:
        KeyError: when entry with ``tid`` is not found.
        RuntimeError: when internal storage is inconsistent, i.e. the
            entry list of the type or the entry inside that list cannot
            be located.
    """
    if self._is_annotation_tid(tid):
        # Annotation-like entries: the lookup value is the entry data; the
        # third and fourth fields are read back as tid and type name.
        entry_data = self.__tid_ref_dict.pop(tid)
        _, _, tid, type_name = entry_data[:4]
    else:
        # Non-annotation-like entries: the lookup value only tells the
        # type name and where the entry sits in its entry list.
        entry_data = self.__tid_idx_dict.pop(tid)
        type_name = entry_data[constants.ENTRY_DICT_TYPE_INDEX]

    try:
        target_list = self.__elements[type_name]
    except KeyError as e:
        raise RuntimeError(
            f"When deleting entry [{tid}], its type [{type_name}]"
            f"does not exist in current entry lists."
        ) from e

    if self._is_annotation(type_name):
        # Annotation lists are sorted, so the position has to be searched.
        try:
            entry_index = target_list.index(entry_data)
        except ValueError as e:
            raise RuntimeError(
                f"When deleting entry [{tid}], entry data is not found in"
                f"the target list of [{type_name}]."
            ) from e
        located = (
            entry_index < len(target_list)
            and target_list[entry_index] == entry_data
        )
    else:
        # Group/link lists: the recorded position is used directly.
        entry_index = entry_data[constants.ENTRY_DICT_ENTRY_INDEX]
        located = entry_index < len(target_list)

    if not located:
        raise RuntimeError(
            f"When deleting entry [{tid}], entry data is not found in"
            f"the target list of [{type_name}]."
        )

    self._delete_entry_by_loc(type_name, entry_index)
def _delete_entry_by_loc(self, type_name: str, index_id: int): r"""It removes an entry of `index_id` by taking both the `type_id` and `index_id`. Called by `delete_entry()`. Args: type_id: The index of the list in ``self.__elements``. index_id: The index of the entry in the list. Raises: KeyError: when ``type_name`` is not found. IndexError: when ``index_id`` is not found. """ try: target_list = self.__elements[type_name] except KeyError as e: raise KeyError( f"The specified type [{type_name}] " f"does not exist in current entry lists." ) from e if index_id < 0 or index_id >= len(target_list): raise IndexError( f"The specified index_id [{index_id}] of type [{type_name}]" f"is out of boundary for entry list of length {len(target_list)}." ) if self._is_annotation(type_name): target_list.pop(index_id) if not target_list: self.__elements.pop(type_name) else: target_list[index_id] = None if type_name in self.__deletion_count: self.__deletion_count[type_name] += 1 else: self.__deletion_count[type_name] = 1 if len(target_list) - self.__deletion_count[type_name] == 0: self.__elements.pop(type_name)
def get_entry(self, tid: int) -> Tuple[List, str]:
    r"""This function finds the entry with ``tid``. It returns the entry
    and its ``type_name``.

    Annotation-like entries are read directly from ``tid_ref_dict``. For
    all other entries ``tid_idx_dict`` provides the type name and the
    position of the entry in the entry list of that type.

    Args:
        tid: Unique id of the entry.

    Returns:
        The entry which ``tid`` corresponds to and its ``type_name``.

    Raises:
        KeyError: when ``tid`` is not present in the lookup dictionary
            (raised by the lookup itself), or when no entry list exists
            for the type of the entry.
    """
    if self._is_annotation_tid(tid):
        # annotation-like entries: the lookup value is the entry itself
        entry = self.__tid_ref_dict[tid]
        entry_type = entry[constants.ENTRY_TYPE_INDEX]
        idx = None
    else:
        # non-annotation-like entries: the lookup value is a locator
        locator = self.__tid_idx_dict[tid]
        entry_type = locator[constants.ENTRY_DICT_TYPE_INDEX]
        idx = locator[constants.ENTRY_DICT_ENTRY_INDEX]

    # Validate the type before touching its entry list, so that a missing
    # list is reported with the descriptive message in both branches.
    if entry_type not in self.__elements:
        raise KeyError(f"Entry of type {entry_type} is not found.")

    if idx is not None:
        entry = self.__elements[entry_type][idx]
    return entry, entry_type
def get_entry_index(self, tid: int) -> int:
    """Return the position of the entry identified by ``tid`` inside the
    entry list of its type.

    For annotation-like entries the position is searched in the sorted
    entry list. For all other entries it is read from ``tid_idx_dict``.

    Args:
        tid: Unique id of the entry.

    Returns:
        Index of the entry which ``tid`` corresponds to in the
        ``entry_type`` list.

    Raises:
        ValueError: An error occurred when no corresponding entry is
            found.
    """
    entry, entry_type = self.get_entry(tid=tid)

    if not self._is_annotation_tid(tid):
        return self.__tid_idx_dict[tid][constants.ENTRY_DICT_ENTRY_INDEX]

    entry_list = self.__elements[entry_type]
    try:
        index_id = entry_list.index(entry)
    except ValueError as e:
        raise ValueError(f"Entry {entry} not found in entry list.") from e

    # Double check that the located slot really holds the same tid.
    in_bounds = 0 <= index_id < len(entry_list)
    if not in_bounds or (
        entry_list[index_id][constants.TID_INDEX]
        != entry[constants.TID_INDEX]
    ):
        raise ValueError(f"Entry {entry} not found in entry list.")
    return index_id
def get_length(self, type_name: str) -> int:
    r"""Return the number of live entries stored for ``type_name``.

    Entry lists of non-annotation-like types may contain ``None``
    placeholders for deleted entries; those are excluded by subtracting
    the deletion counter of the type.

    Args:
        type_name (str): The fully qualified type name of a type.

    Returns:
        The count of not None entries.
    """
    if self._is_annotation(type_name):
        placeholders = 0
    else:
        placeholders = self.__deletion_count.get(type_name, 0)
    return len(self.__elements[type_name]) - placeholders
def _get_bisect_range(
    self, search_list: SortedList, range_span: Tuple[int, int]
) -> Optional[List]:
    """
    Perform binary search on the specified list for target entry class.
    Entry class can be a subtype of
    :class:`~forte.data.ontology.top.Annotation`
    or :class:`~forte.data.ontology.top.AudioAnnotation`.

    This function finds the elements in the `Annotation` or
    `AudioAnnotation` sorted list whose begin and end index
    fall within `range_span`.

    Args:
        search_list: A `SortedList` object on which the binary search
            will be carried out.
        range_span: a tuple that indicates the start and end index
            of the range in which we want to get required entries

    Returns:
        List of entries to fetch, or ``None`` when ``search_list`` is
        empty or no entry lies within ``range_span``.
    """
    # An empty list has no first/last element to compare against.
    if not search_list:
        return None

    # Check if there are any entries within the given range
    if (
        search_list[0][constants.BEGIN_INDEX] > range_span[1]
        or search_list[-1][constants.END_INDEX] < range_span[0]
    ):
        return None

    result_list = []

    # Probe with a minimal pseudo entry ``[begin, begin]`` to find the
    # first position whose entry does not sort before the range start.
    begin_index = search_list.bisect_left([range_span[0], range_span[0]])

    for idx in range(begin_index, len(search_list)):
        candidate = search_list[idx]
        # Entries are visited in sorted order, so the scan can stop at the
        # first one that begins after the range.
        if candidate[constants.BEGIN_INDEX] > range_span[1]:
            break
        if candidate[constants.END_INDEX] <= range_span[1]:
            result_list.append(candidate)

    return result_list or None
def co_iterator_annotation_like(
    self,
    type_names: List[str],
    range_span: Optional[Tuple[int, int]] = None,
) -> Iterator[List]:
    r"""
    Given two or more type names, iterate their entry lists from beginning
    to end together.

    The entry list of every single type is sorted by the ``begin`` and
    ``end`` fields. This function merges those sorted lists lazily and
    yields each entry in sorted order, much like merging several sorted
    lists into one. A min-heap decides which entry is yielded next; the
    heap ordering is determined by, in order of precedence:

    - start index of the entry.
    - end index of the entry.
    - the index of the entry type name in input parameter
      ``type_names`` (among the types that have entries to yield).

    Hence, if two entries have the same begin and end, the one whose type
    appears first in ``type_names`` is yielded first. For entries that
    have the exact same `begin`, `end` and `type_name`, the order is
    determined arbitrarily.

    The ``range_span`` argument restricts the iteration to entries whose
    span lies within the given range; types without any entry in the
    range are skipped.

    For example, say there are two entry types,
    :class:`~ft.onto.base_ontology.Sentence` with entries spanning
    ``(0, 5)`` and ``(6, 10)``, and
    :class:`~ft.onto.base_ontology.EntityMention` with entries spanning
    ``(0, 3)`` and ``(15, 20)``.

    .. code-block:: python

        entries = list(
            co_iterator_annotation_like(
                type_names = [
                    "ft.onto.base_ontology.Sentence",
                    "ft.onto.base_ontology.EntityMention"
                ],
                range_span = (0, 12)
            )
        )

        # type and span of the yielded entries, in order:
        # EntityMention (0, 3), Sentence (0, 5), Sentence (6, 10)

    ``EntityMention (15, 20)`` is left out because it is outside of
    ``range_span``, and ``EntityMention (0, 3)`` precedes
    ``Sentence (0, 5)`` because it ends earlier.

    Args:
        type_names: a list of string type names
        range_span: a tuple that indicates the start and end index
            of the range in which we want to get required entries

    Returns:
        An iterator of entry elements.

    Raises:
        ValueError: when ``range_span`` is None and a type name is not
            stored in this DataStore, or when an entry list to iterate is
            empty.
    """
    # Entries to iterate for every type: the whole entry list when no
    # range is requested, otherwise only the entries within the range.
    all_entries_range = {}
    # Types that have at least one entry to yield, in the same relative
    # order as in ``type_names``.
    valid_type_names = []

    if range_span is None:
        try:
            for tn in type_names:
                all_entries_range[tn] = self.__elements[tn]
        except KeyError as e:
            raise ValueError(
                f"Input argument `type_names` to the function contains"
                f" a type name [{tn}], which is not recognized."
                f" Please input available ones in this DataStore"
                f" object: {list(self.__elements.keys())}"
            ) from e
        valid_type_names = type_names
    else:
        # NOTE(review): an unknown type name surfaces here as a plain
        # KeyError, not as the ValueError raised in the branch above.
        for tn in type_names:
            possible_entries = self._get_bisect_range(
                self.__elements[tn], range_span
            )
            if possible_entries is not None:
                all_entries_range[tn] = possible_entries
                valid_type_names.append(tn)

    # The first entry of every list seeds the heap; an empty list is
    # reported instead of being silently skipped.
    first_entries = []
    for tn in valid_type_names:
        try:
            first_entries.append(all_entries_range[tn][0])
        except IndexError as e:
            raise ValueError(
                f"Entry list of type name, {tn} which is"
                " one list item of input argument `type_names`,"
                " is empty. Please check data in this DataStore"
                " to see if empty lists are expected"
                f" or remove {tn} from input parameter type_names"
            ) from e

    # pointers[tn] is the position of the entry of type tn that is
    # currently represented in the heap.
    pointers = dict.fromkeys(all_entries_range, 0)

    # Heap items are ((begin, end, order of the type), type name). The
    # entry itself is not stored; it is recovered from the type name and
    # the pointer of that type.
    h: List[Tuple[Tuple[int, int, int], str]] = []
    for p_idx, entry in enumerate(first_entries):
        heappush(
            h,
            (
                (
                    entry[constants.BEGIN_INDEX],
                    entry[constants.END_INDEX],
                    p_idx,
                ),
                entry[constants.ENTRY_TYPE_INDEX],
            ),
        )

    while h:
        # The smallest heap item identifies the entry to yield next.
        (_, _, p_idx), type_name = heappop(h)
        entries = all_entries_range[type_name]
        pointer = pointers[type_name]
        entry = entries[pointer]

        # Put the successor from the same entry list (if any) into the
        # heap before handing out the current entry.
        next_pointer = pointer + 1
        if next_pointer < len(entries):
            pointers[type_name] = next_pointer
            new_entry = entries[next_pointer]
            heappush(
                h,
                (
                    (
                        new_entry[constants.BEGIN_INDEX],
                        new_entry[constants.END_INDEX],
                        p_idx,
                    ),
                    new_entry[constants.ENTRY_TYPE_INDEX],
                ),
            )
        yield entry
def get(
    self,
    type_name: str,
    include_sub_type: bool = True,
    range_span: Optional[Tuple[int, int]] = None,
) -> Iterator[List]:
    r"""This function fetches entries from the data store of
    type ``type_name``. If `include_sub_type` is set to True and
    ``type_name`` is in [Annotation, Group, Link], this function also
    fetches entries of subtype of ``type_name``. Otherwise, it only
    fetches entries of type ``type_name``.

    When ``range_span`` is given, annotation-like entries are restricted
    to those within the span. A link is yielded only when both its parent
    and its child are annotation-like entries within the span. A group is
    yielded only when its member type is annotation-like and all of its
    members are within the span.

    Args:
        type_name: The fully qualified name of the entry.
        include_sub_type: A boolean to indicate whether get its subclass.
        range_span: A tuple that contains the begin and end indices
            of the searching range of entries.

    Returns:
        An iterator of the entries matching the provided arguments.

    Raises:
        ValueError: when ``type_name`` is neither annotation-like nor a
            Link or Group type and no entry of it is stored.
    """

    def within_range(entry: List[Any], span: Tuple[int, int]) -> bool:
        """
        A helper function for deciding whether an annotation entry is
        inside the `span`. Entries that are not annotation-like are never
        considered to be inside.
        """
        if not self._is_annotation(entry[constants.ENTRY_TYPE_INDEX]):
            return False
        return (
            entry[constants.BEGIN_INDEX] >= span[0]
            and entry[constants.END_INDEX] <= span[1]
        )

    entry_class = get_class(type_name)
    if include_sub_type:
        matching_types = {
            stored_type
            for stored_type in self.__elements
            if issubclass(get_class(stored_type), entry_class)
        }
    else:
        matching_types = {type_name}
    # Sort for a deterministic iteration order over the types.
    all_types = sorted(matching_types)

    if self._is_annotation(type_name):
        if range_span is None:
            yield from self.co_iterator_annotation_like(all_types)
        else:
            yield from self.co_iterator_annotation_like(
                all_types, range_span=range_span
            )
    elif issubclass(entry_class, Link):
        for link_type in all_types:
            if range_span is None:
                yield from self.iter(link_type)
                continue
            # ``iter`` skips the None placeholders that deleted entries
            # leave behind in non-annotation-like entry lists.
            for entry in self.iter(link_type):
                parent_tid = entry[constants.PARENT_TID_INDEX]
                child_tid = entry[constants.CHILD_TID_INDEX]
                if (
                    parent_tid not in self.__tid_ref_dict
                    or child_tid not in self.__tid_ref_dict
                ):
                    continue
                if within_range(
                    self.__tid_ref_dict[parent_tid], range_span
                ) and within_range(
                    self.__tid_ref_dict[child_tid], range_span
                ):
                    yield entry
    elif issubclass(entry_class, Group):
        for group_type in all_types:
            if range_span is None:
                yield from self.iter(group_type)
                continue
            # ``iter`` skips the None placeholders that deleted entries
            # leave behind in non-annotation-like entry lists.
            for entry in self.iter(group_type):
                member_type = entry[constants.MEMBER_TYPE_INDEX]
                if not self._is_annotation(member_type):
                    continue
                members = entry[constants.MEMBER_TID_INDEX]
                # NOTE(review): a member tid missing from the tid lookup
                # raises KeyError here, while the Link branch above skips
                # such entries - confirm which behaviour is intended.
                if all(
                    within_range(self.__tid_ref_dict[m], range_span)
                    for m in members
                ):
                    yield entry
    else:
        # Only fetches entries of type ``type_name`` when it's not in
        # [Annotation, Group, Link].
        if type_name not in self.__elements:
            raise ValueError(f"type {type_name} does not exist")
        yield from self.iter(type_name)
def iter(self, type_name: str) -> Iterator[List]:
    r"""Iterate over every stored entry of type ``type_name``.

    ``None`` placeholders, which appear in non-annotation-like entry
    lists, are skipped.

    Args:
        type_name (str): The fully qualified type name of a type.

    Returns:
        An iterator of the entries.
    """
    yield from (
        slot for slot in self.__elements[type_name] if slot is not None
    )
def next_entry(self, tid: int) -> Optional[List]:
    r"""Get the next entry of the same type as the ``tid`` entry.

    The position of the ``tid`` entry in the entry list of its type is
    looked up, and the list is scanned forward from there for the first
    slot that is not a ``None`` placeholder. For a non-annotation type
    the list is in insertion order, which means the next inserted entry
    is returned.

    Args:
        tid: Unique id of the entry.

    Returns:
        A list of attributes representing the next entry of the same type
        as the ``tid`` entry. Return `None` when accessing the next entry
        of the last element in entry list.

    Raises:
        IndexError: An error occurred accessing index out out of entry
            list.
    """
    _, entry_type = self.get_entry(tid=tid)
    index_id: int = self.get_entry_index(tid=tid)
    entry_list = self.__elements[entry_type]

    if index_id < 0 or index_id >= len(entry_list):
        raise IndexError(
            f"Index id ({index_id})) is out of bounds of the entry list."
        )

    for position in range(index_id + 1, len(entry_list)):
        candidate = entry_list[position]
        # None marks a deleted slot in group/link lists
        if candidate is not None:
            return candidate
    return None
def prev_entry(self, tid: int) -> Optional[List]:
    r"""Get the previous entry of the same type as the ``tid`` entry.

    The position of the ``tid`` entry in the entry list of its type is
    looked up, and the list is scanned backward from there for the first
    slot that is not a ``None`` placeholder. For a non-annotation type
    the list is in insertion order, which means the previously inserted
    entry is returned.

    Args:
        tid: Unique id of the entry.

    Returns:
        A list of attributes representing the previous entry of the same
        type as the ``tid`` entry. Return `None` when accessing the
        previous entry of the first element in entry list.

    Raises:
        IndexError: An error occurred accessing index out out of entry
            list.
    """
    _, entry_type = self.get_entry(tid=tid)
    index_id: int = self.get_entry_index(tid=tid)
    entry_list = self.__elements[entry_type]

    if index_id < 0 or index_id >= len(entry_list):
        raise IndexError(
            f"Index id ({index_id})) is out of bounds of the entry list."
        )

    for position in range(index_id - 1, -1, -1):
        candidate = entry_list[position]
        # None marks a deleted slot in group/link lists
        if candidate is not None:
            return candidate
    return None
def _parse_onto_file(self):
    r"""Populate the types and attributes used in `DataStore` from an
    ontology specification file.

    Nothing happens when no ontology file path was configured. Otherwise
    the JSON file is loaded and parsed into an entry tree by the ontology
    code generator. The tree is then walked breadth-first and, for every
    entry type not yet known, its parent type name and a mapping from
    attribute name to attribute position are recorded in
    ``DataStore._type_attributes``. Attribute positions are assigned in
    sorted attribute-name order, starting at
    ``constants.ATTR_BEGIN_INDEX``.

    The ontology specification file should contain all the entry
    definitions users wanted to use, either manually or through the `-m`
    option of `generate_ontology create` command. Only the types
    specified inside this one file are imported.
    """
    if self._onto_file_path is None:
        return

    entry_tree = EntryTree()
    with open(self._onto_file_path, "r", encoding="utf8") as f:
        onto_dict = json.load(f)
    DataStore.onto_gen.parse_schema_for_no_import_onto_specs_file(
        self._onto_file_path, onto_dict, merged_entry_tree=entry_tree
    )

    # Breadth-first walk; the children list of the root doubles as queue.
    pending = entry_tree.root.children
    while pending:
        entry_node = pending.pop(0)
        pending.extend(entry_node.children)

        entry_name = entry_node.name
        if entry_name in DataStore._type_attributes:
            continue

        # Attribute positions follow the sorted attribute names.
        attr_dict = {
            attr_name: position
            for position, attr_name in enumerate(
                sorted(entry_node.attributes), constants.ATTR_BEGIN_INDEX
            )
        }
        DataStore._type_attributes[entry_name] = {
            constants.PARENT_CLASS_KEY: {entry_node.parent.name},
            constants.TYPE_ATTR_KEY: attr_dict,
        }

def _init_top_to_core_entries(self):
    r"""Register the basic user extendable entry types of the Top and
    Core modules in ``DataStore._type_attributes``.

    Every entry type listed in ``DataStore.onto_gen.top_to_core_entries``
    is stored together with the set of its parent classes. The work is
    done only once: ``DataStore.do_init`` is set afterwards and later
    calls return immediately.
    """
    if DataStore.do_init is True:
        return

    top_to_core = DataStore.onto_gen.top_to_core_entries
    for top_entry, parents in top_to_core.items():
        DataStore._type_attributes[top_entry] = {
            constants.PARENT_CLASS_KEY: set(parents)
        }

    DataStore.do_init = True

@staticmethod
def _get_entry_attributes_by_class(input_entry_class_name: str) -> Dict:
    """Get type attributes by class name. `input_entry_class_name` should
    be a fully qualified name of an entry class.

    The `dataclass` module
    <https://docs.python.org/3/library/dataclasses.html> adds a
    `__dataclass_fields__` attribute to the classes it decorates; this
    attribute maps every field name of the class to its field
    information.

    .. note::

        This function is only applicable to classes decorated as Python
        `dataclass` since it relies on the `__dataclass_fields__` to find
        out the attributes. For any other class an empty dictionary is
        returned.

    Args:
        input_entry_class_name: A fully qualified name of an entry class.

    Returns:
        A dictionary of attributes with their field information
        corresponding to the input class.

        For example, for an entry ``ft.onto.base_ontology.Sentence`` we
        want to get a list of ["speaker", "part_id", "sentiment",
        "classification", "classifications"]. The solution looks like the
        following:

        .. code-block:: python

            # input can be a string
            entry_name = "ft.onto.base_ontology.Sentence"

            # function signature
            list(get_entry_attributes_by_class(entry_name))

            # return
            # ["speaker", "part_id", "sentiment", "classification",
            #  "classifications"]
    """
    return getattr(
        get_class(input_entry_class_name), "__dataclass_fields__", {}
    )