Pipeline¶
Process Pipeline¶
-
class forte.pipeline.Pipeline(resource=None, ontology_file=None, enforce_consistency=False, do_init_type_check=False)[source]¶
This controls the main inference flow of the system. A pipeline consists of a set of Components (readers and processors). The data flows through the pipeline as data packs, and each component uses or adds information to the data packs.
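A minimal usage sketch (StringReader is assumed to be importable from forte.data.readers; PassThrough is a hypothetical no-op processor defined here only for illustration):

from forte.data.data_pack import DataPack
from forte.data.readers import StringReader
from forte.pipeline import Pipeline
from forte.processors.base import PackProcessor

class PassThrough(PackProcessor):
    def _process(self, input_pack: DataPack):
        pass  # a real processor would add annotations to the pack

pipe = Pipeline[DataPack]()
pipe.set_reader(StringReader())   # entry point: turns raw strings into DataPacks
pipe.add(PassThrough())           # components consume or augment the DataPacks
pipe.initialize()                 # must be called before processing

pack = pipe.process("Forte passes DataPacks through the pipeline.")
print(pack.text)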
-
enforce_consistency(enforce=True)[source]¶
This function determines whether the pipeline will check the content expectations specified in each pipeline component. This function works with initialize() called after it. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and throws an EntryNotFoundError if the check fails. An example implementation is described in the docstring of __init__().
- Parameters
enforce (bool) – A boolean of whether to enable consistency checking for the pipeline or not.
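Continuing the earlier sketch, the check is switched on before initialization, roughly as follows:

pipe.enforce_consistency(True)
pipe.initialize()  # components now validate expected types/attributes on their input packs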
-
init_from_config_path(config_path)[source]¶
Read the configurations from the given path config_path and build the pipeline with the config.
- Parameters
config_path – A string of the configuration path, which is a YAML file that specifies the structure and parameters of the pipeline.
-
init_from_config(configs)[source]¶
Initialize the pipeline (ontology and processors) from the given configurations.
- Parameters
configs (Dict[str, Any]) – The configs used to initialize the pipeline. It should be a dictionary that contains forte_ir_version, components and states. forte_ir_version is a string used to validate the input format. components is a list of dictionaries, each containing type (the class of the pipeline component), configs (the corresponding component’s configs) and selector. states will be used to update the pipeline states based on the fields specified in states.attribute and states.resource.
-
save(path)[source]¶
Store the pipeline as an IR (intermediate representation) in YAML. The path can then be passed to init_from_config_path() to initialize a pipeline. Note that calling init_from_config() from a different Python environment may not work for some self-defined component classes because their module name is __main__.
- Parameters
path (str) – The file path to save configurations.
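A sketch of the save / restore round trip, continuing the earlier example (the file name is arbitrary):

pipe.save("pipeline_ir.yml")

restored = Pipeline[DataPack]()
restored.init_from_config_path("pipeline_ir.yml")
restored.initialize()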
-
export(name=None)[source]¶
Exports the pipeline to FORTE_EXPORT_PATH.
FORTE_EXPORT_PATH is a directory where all serialized pipelines will be stored. Users can specify it through the environment variable FORTE_EXPORT_PATH.
This method has the following behaviors:
- FORTE_EXPORT_PATH will be created if assigned but not found.
- If name is not provided, a default name pipeline will be used, suffixed by a UUID to prevent overwriting (e.g. pipeline-4ba29336-aa05-11ec-abec-309c23414763.yml).
- If name is provided, then no suffix will be appended.
- The pipeline is saved by save(), which exports the pipeline via _dump_to_config() and saves it to a YAML file.
- Parameters
name (Optional[str]) – Export name of the pipeline. Default is None.
- Returns
Export path of the pipeline config YAML.
- Return type
Optional[str]
- Raises
ValueError – if the export name is already taken.
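For instance, exporting under a custom directory might look like this (the directory and pipeline names are arbitrary):

import os

os.environ["FORTE_EXPORT_PATH"] = "exported_pipelines"
export_path = pipe.export(name="demo_pipeline")  # raises ValueError if the name is taken
print(export_path)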
-
serve(host='localhost', port=8008, service_name='', input_format='string')[source]¶
Start a service of the current pipeline at the specified host and port.
- Parameters
host (str) – Host name of the pipeline service.
port (int) – Port number of the pipeline service.
service_name (str) – Assign a name to the pipeline service for validation. This will appear in the service_name field on the default page and can be queried and validated against the expected service name set by the user. Defaults to ''.
input_format (str) – Specify the format of the input for validation. It can be “string” or “DataPack”. This will appear in the input_format field on the default page and can be queried and validated against the expected input format set by the user. Defaults to “string”.
- Raises
ImportError – An error occurred importing the uvicorn module.
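A sketch of serving the pipeline over HTTP, continuing the earlier example (this assumes the optional uvicorn dependency is installed):

pipe.serve(
    host="localhost",
    port=8008,
    service_name="demo_service",
    input_format="string",
)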
-
set_profiling(enable_profiling=True)[source]¶
Set the profiling option.
- Parameters
enable_profiling (bool) – A boolean of whether to enable profiling for the pipeline or not (the default is True).
-
initialize()[source]¶
This function should be called before the pipeline can be used to process the actual data. This function will call the initialize() of all the components inside this pipeline.
- Return type
- Returns
None
-
initialize_components()[source]¶
This function will initialize all the components in this pipeline, except the reader. The components are initialized in a FIFO manner based on the order of insertion.
During initialization, each component will be configured based on its corresponding configuration. However, if the component is already initialized (for example, being initialized manually or used twice in the same pipeline), the new configuration will be ignored.
The pipeline will check for type dependencies between the components inside this pipeline; see enforce_consistency() for more details.
-
set_reader(reader, config=None)[source]¶
Set the reader of the pipeline. A reader is the entry point of this pipeline: data flowing into the reader will be converted to the data pack format and passed on to the other components for processing.
- Parameters
- Return type
- Returns
The pipeline itself, which allows you to directly chain other pipeline construction code afterwards, i.e., you can do:
Pipeline().set_reader(your_reader()).add(your_processor())
-
property components¶
Return all the components in this pipeline, except the reader.
Returns: A list containing the components.
- Return type
-
property ref_names¶
Return all the reference names in this pipeline, except the reader.
Returns: A dictionary containing the reference names.
-
property component_configs¶
Return the configs related to the components, except the reader.
Returns: A list containing the component configs.
-
add(component, config=None, selector=None, selector_config=None, ref_name=None)[source]¶
Adds a pipeline component to the pipeline. The pipeline components will form a chain based on the insertion order. The customized config and selector (Selector) will be associated with this particular component. If the config or the selector is not provided, the default ones will be used.
Note that the same component instance can be added multiple times to the pipeline. In such cases, the instance will only be set up at the first insertion (i.e. its initialize function will only be called once). Subsequent insertions of the same component instance will not change the behavior nor the states of the instance. Thus, a different config cannot be provided (it should be None) when the instance is added a second time, otherwise a ProcessorConfigError will be thrown. If you want the components to behave differently, use different instances.
- Parameters
component (PipelineComponent) – The component to be inserted next in the pipeline.
config (Union[HParams, Dict[str, Any], None]) – The custom configuration to be used for the added component. Default None, which means the default_configs() of the component will be used.
selector (Optional[Selector]) – The selector used to pick the corresponding data pack to be consumed by the component. Default None, which means the whole pack will be used.
- Return type
- Returns
The pipeline itself, which enables one to chain the creation of the pipeline, i.e., you can do:
Pipeline().set_reader(your_reader()).add(your_processor()).add(another_processor())
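A sketch of chained construction with an explicit ref_name, reusing PassThrough and StringReader from the earlier sketch (the selector is left as the default, so each component receives the whole pack):

pipe = (
    Pipeline[DataPack]()
    .set_reader(StringReader())
    .add(PassThrough(), ref_name="first_pass")
    .add(PassThrough())  # a separate instance; re-adding the same instance requires config=None
)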
-
add_gold_packs(pack)[source]¶
Add gold packs to an internal dictionary used for evaluation. This dictionary is used by the evaluator when calling consume_next(…).
- Parameters
pack – A key, value pair containing the job.id -> gold_pack mapping.
-
process(*args, **kwargs)[source]¶
Alias for process_one().
- Parameters
args – The positional arguments used to get the initial data.
kwargs – The keyword arguments used to get the initial data.
- Return type
~PackType
-
run(*args, **kwargs)[source]¶
Run the whole pipeline and ignore all returned DataPacks. This is mostly used when you need to run the pipeline but do not require the output, relying instead on its side effects; for example, when the pipeline writes some data to disk.
Calling this function will automatically call initialize() at the beginning and finish() at the end.
- Parameters
args – The positional arguments used to get the initial data.
kwargs – The keyword arguments used to get the initial data.
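A sketch of run() for a side-effect-only pipeline, continuing the earlier example (with a StringReader the argument is the raw text; a file-based reader would take a path instead):

pipe.run("Process this text purely for the pipeline's side effects.")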
-
process_one(*args, **kwargs)[source]¶
Process one single data pack. This is done by only reading and processing the first pack from the reader.
- Parameters
kwargs – the information needed to load the data. For example, if _reader is StringReader, this should contain a single piece of text in the form of a string variable. If _reader is a file reader, this can point to the file path.
- Return type
~PackType
-
process_dataset(*args, **kwargs)[source]¶
Process the documents in the data source(s) and return an iterator or list of DataPacks. The arguments are directly passed to the reader to take data from the source.
- Return type
Iterator[~PackType]
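A sketch of streaming over a data source, continuing the earlier example (this assumes the StringReader treats each positional string as one document; a file-based reader would take a path instead):

pipe.initialize()
for pack in pipe.process_dataset("First document.", "Second document."):
    print(pack.text)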
-
finish()[source]¶
Call the finish method of all pipeline components. This needs to be called explicitly to release all resources.
-
evaluate()[source]¶
Call the evaluators in the pipeline to collect their results.
- Return type
- Returns
Iterator of the evaluator results. Each element is a tuple, where the first one is the name of the evaluator, and the second one is the output of the evaluator (see get_result()).
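A sketch of collecting evaluator outputs, assuming an Evaluator component was previously added to the pipeline via add():

for evaluator_name, result in pipe.evaluate():
    print(evaluator_name, result)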
Train Pipeline¶
Pipeline Component¶
-
class forte.pipeline_component.PipelineComponent[source]¶
The base class for all pipeline components. A pipeline component represents one node in the pipeline and performs a certain action on the data pack. All pipeline components should extend this class.
-
resources¶
The resources that can be used by this component. The resources object is a shared object across the whole pipeline.
-
configs¶
The configuration of this component, which will be built by the pipeline based on default_configs() and the configs provided by the users.
-
enforce_consistency(enforce=True)[source]¶
This function determines whether the pipeline will enforce the content expectations specified in each pipeline component. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and throws an EntryNotFoundError if the check fails. When this function is called with enforce set to True, every pipeline component will check whether the input datapack record matches the expected types and attributes, provided the function expected_types_and_attributes is implemented for the processor. For example, suppose processor A requires the entry type ft.onto.base_ontology.Sentence and processor B produces this type in the output datapack: the record function of processor B writes the record of this type into the datapack, and processor A implements expected_types_and_attributes to declare this type. When the pipeline runs with enforce_consistency, processor A will then check whether this type exists in the record of the output of the previous pipeline component. A sketch of this pairing is shown after the parameter list below.
- Parameters
enforce (bool) – A boolean of whether to enable consistency checking for the pipeline or not.
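A sketch of the record / expected_types_and_attributes pairing described above (the exact method signatures are assumptions and should be checked against the installed Forte version):

from typing import Dict, Set

from forte.data.data_pack import DataPack
from forte.processors.base import PackProcessor

class ProducerB(PackProcessor):
    def record(self, record_meta: Dict[str, Set[str]]):
        # Declare what this processor adds to the pack's records.
        record_meta["ft.onto.base_ontology.Sentence"] = set()

    def _process(self, input_pack: DataPack):
        ...  # create Sentence entries here

class ConsumerA(PackProcessor):
    def expected_types_and_attributes(self) -> Dict[str, Set[str]]:
        # Declare what this processor requires from upstream components.
        return {"ft.onto.base_ontology.Sentence": set()}

    def _process(self, input_pack: DataPack):
        ...  # consume Sentence entries here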
-
initialize(resources, configs)[source]¶
The pipeline will call the initialize method at the start of processing. The processor and reader will be initialized with configs, and register global resources into resource. The implementation should set up the states of the component.
-
reset_flags()[source]¶
Reset the flags related to this component. This will be called first when doing initialization.
-
add_entry(pack, entry)[source]¶
The component can manually call this function to add the entry into the data pack immediately. Otherwise, the system will add the entries automatically when this component finishes.
-
flush()[source]¶
Indicate that there will be no more packs to be passed in; handle what’s remaining in the buffer.
-
finish(resource)[source]¶
The pipeline will call this function at the end of the pipeline to notify all the components. The user can implement this function to release resources used by this component. The component can also add objects to the resources.
- Parameters
resource (Resources) – A global resource registry.
Process¶
ProcessManager¶
-
class forte.process_manager.ProcessManager(pipeline_length)[source]¶
A pipeline-level manager that manages global processing information, such as the currently running components. This is an internal class and should only be initialized by the system.
-
_queues¶
A list of queues which hold the jobs for each processor. The size of this list is equal to the pipeline length.
- Type
List[Deque[int]]
-
_current_queue_index¶
An index indicating which queue to read the data from. A value of -1 indicates reading from the reader.
- Type
-
_unprocessed_queue_indices¶
Each element of this list is the index of the first UNPROCESSED element in the corresponding queue. The length of this list equals the “pipeline_length”.
If unprocessed_queue_indices = [0, 2], this means that for the 1st queue, the first UNPROCESSED job is at index 0, and all elements at indices [0, len(queue[0])) are UNPROCESSED. Similarly, for the 2nd queue, the first UNPROCESSED job is at index 2, and all elements at indices [2, len(queue[1])) are UNPROCESSED.
- Type
List[int]
-
_processed_queue_indices¶
Each element of this list is the index of the last PROCESSED element in the corresponding queue. The length of this list equals the “pipeline_length”.
If processed_queue_indices = [0, 2], this means that for the 1st queue, the last PROCESSED job is at index 0, so only the first element in queue[0] is PROCESSED. Similarly, for the 2nd queue, the last PROCESSED job is at index 2, so all elements at indices [0, 2] are PROCESSED.
- Type
List[int]
-
Args¶
pipeline_length: The length of the current pipeline being executed.
-
add_to_queue(queue_index, job)[source]¶
Add a job to a particular queue.
- Parameters
queue_index (int) – The queue that the job is to be added to.
job (ProcessJob) – The job to be added.
- Returns
None