Pipeline

Process Pipeline

class forte.pipeline.Pipeline(resource=None, ontology_file=None, enforce_consistency=False, do_init_type_check=False)[source]

This controls the main inference flow of the system. A pipeline consists of a set of Components (readers and processors). Data flows through the pipeline as data packs, and each component uses or adds information to the data packs.

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will check the content expectations specified in each pipeline component. It must be called before initialize() for the checks to take effect. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and throws an EntryNotFoundError if the check fails. An example implementation is described in the docstring of __init__().

Parameters

enforce (bool) – A boolean of whether to enable consistency checking for the pipeline or not.
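
A minimal usage sketch, assuming a reader and a processor instance are already constructed (the names below are placeholders):

pipeline = Pipeline()
pipeline.set_reader(my_reader)        # placeholder reader instance
pipeline.add(my_processor)            # placeholder processor instance
pipeline.enforce_consistency(True)    # must be called before initialize()
pipeline.initialize()                 # expectation checks are wired up here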

init_from_config_path(config_path)[source]

Read the configurations from the given path config_path and build the pipeline with the config.

Parameters

config_path – A string of the configuration path, which is a YAML file that specifies the structure and parameters of the pipeline.

init_from_config(configs)[source]

Initialize the pipeline (ontology and processors) from the given configurations.

Parameters

configs (Dict[str, Any]) – The configs used to initialize the pipeline. It should be a dictionary that contains forte_ir_version, components and states. forte_ir_version is a string used to validate the input format. components is a list of dictionaries, each containing type (the class of the pipeline component), configs (the corresponding component’s configs) and selector. states will be used to update the pipeline states based on the fields specified in states.attribute and states.resource.
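
A hedged sketch of the expected dictionary shape; the version string, component class, and config values below are placeholders rather than a complete real configuration:

configs = {
    "forte_ir_version": "0.0.1",  # placeholder; a version string used to validate the format
    "components": [
        {
            "type": "forte.data.readers.StringReader",  # class of the pipeline component
            "configs": {},                              # the component's own configs
            "selector": None,                           # optional selector specification
        },
    ],
    "states": {
        "attribute": {},   # fields used to update pipeline attributes
        "resource": {},    # fields used to update pipeline resources
    },
}

pipeline = Pipeline()
pipeline.init_from_config(configs)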

save(path)[source]

Store the pipeline as an IR (intermediate representation) in YAML. The path can then be passed to init_from_config_path() to initialize a pipeline. Note that calling init_from_config() from a different Python environment may not work for some self-defined component classes because their module name is __main__.

Parameters

path (str) – The file path to save configurations.
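
For example, a saved pipeline can later be restored from the same YAML file (the file name is illustrative):

pipeline.save("my_pipeline.yml")    # serialize the pipeline configuration to YAML

restored = Pipeline()
restored.init_from_config_path("my_pipeline.yml")
restored.initialize()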

export(name=None)[source]

Exports pipeline to FORTE_EXPORT_PATH.

FORTE_EXPORT_PATH is a directory where all serialized pipelines will be stored. Users can specify it through the environment variable FORTE_EXPORT_PATH.

This method will have the following behaviors:

  • FORTE_EXPORT_PATH will be created if assigned but not found.

  • If name is not provided, the default name pipeline will be used, suffixed with a UUID to prevent overwriting (e.g. pipeline-4ba29336-aa05-11ec-abec-309c23414763.yml).

  • If name is provided, then no suffix will be appended.

  • The pipeline is saved by save(), which exports the pipeline by _dump_to_config() and saves it to a YAML file.

Parameters

name (Optional[str]) – Export name of the pipeline. Default is None.

Returns

Export path of pipeline config YAML.

Return type

Optional[str]

Raises

ValueError – if export name is already taken.
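
A small usage sketch; the export directory and pipeline name are illustrative:

import os

os.environ["FORTE_EXPORT_PATH"] = "./exported_pipelines"   # created if it does not exist
export_path = pipeline.export(name="my_pipeline")          # raises ValueError if the name is taken
print(export_path)                                         # path of the exported YAML config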

serve(host='localhost', port=8008, service_name='', input_format='string')[source]

Start a service of the current pipeline at a specified host and port.

Parameters
  • host (str) – The host name of the pipeline service.

  • port (int) – The port number of the pipeline service.

  • service_name (str) – Assign a name to the pipeline service for validation. This will appear in the service_name field on the default page and can be queried and validated against the expected service name set by the user. Defaults to ''.

  • input_format (str) – Specify the format of the input for validation. It can be “string” or “DataPack”. This will appear in the input_format field on the default page and can be queried and validated against the expected input format set by the user. Defaults to “string”.

Raises

ImportError – An error occurred importing uvicorn module.
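
A minimal sketch of starting the service (requires the uvicorn module; the host, port, and service name are illustrative):

pipeline.serve(
    host="0.0.0.0",
    port=8008,
    service_name="my_service",   # reported in the service_name field of the default page
    input_format="string",       # or "DataPack"
)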

set_profiling(enable_profiling=True)[source]

Set profiling option.

Parameters

enable_profiling (bool) – A boolean of whether to enable profiling for the pipeline or not (the default is True).

initialize()[source]

This function should be called before the pipeline is used to process the actual data. It will call the initialize() of all the components inside this pipeline.

Return type

Pipeline

Returns

The pipeline itself.

initialize_components()[source]

This function will initialize all the components in this pipeline, except the reader. The components are initialized in a FIFO manner based on the order of insertion.

During initialization, the component will be configured based on its corresponding configuration. However, if the component is already initialized (for example, if it was initialized manually or is used twice in the same pipeline), the new configuration will be ignored.

The pipeline will check for type dependencies between the components inside this pipeline, see enforce_consistency() for more details.

initialize_selectors()[source]

This function will reset the states of the selectors.

set_reader(reader, config=None)[source]

Set the reader of the pipeline. A reader is the entry point of the pipeline: data flowing into the reader will be converted to the data pack format and passed on to the other components for processing.

Parameters
  • reader (BaseReader) – The reader to be used by the pipeline.

  • config (Union[HParams, Dict[str, Any], None]) – The custom configuration to be passed to the reader. If the config is not provided, the default config defined by the reader class will be used.

Return type

Pipeline

Returns

The pipeline itself, which allows you to directly chain other pipeline construction code afterwards, i.e., you can do:

Pipeline().set_reader(your_reader()).add(your_processor())

property components

Return all the components in this pipeline, except the reader.

Returns: A list containing the components.

Return type

List[PipelineComponent]

property ref_names

Return all the reference names in this pipeline, except the reader.

Returns: A dictionary containing the reference names.

Return type

Dict[str, int]

property component_configs

Return the configs related to the components, except the reader.

Returns: A list containing the components’ configs.

Return type

List[Optional[HParams]]

add(component, config=None, selector=None, selector_config=None, ref_name=None)[source]

Adds a pipeline component to the pipeline. The pipeline components will form a chain based on the insertion order. The customized config and selector (Selector) will be associated with this particular component. If the config or the selector is not provided, the default ones will be used.

Note that the same component instance can be added multiple times to the pipeline. In such cases, the instance will only be set up at the first insertion (i.e. its initialize function will only be called once). Subsequent insertions of the same component instance will not change the behavior or the states of the instance. Thus, a different config cannot be provided (it should be None) when the component is added a second time, otherwise a ProcessorConfigError will be thrown. If one wants the components to behave differently, a different instance should be used.

Parameters
  • component (PipelineComponent) – The component to be inserted next in the pipeline.

  • config (Union[HParams, Dict[str, Any], None]) – The custom configuration to be used for the added component. Default None, which means the default_configs() of the component will be used.

  • selector (Optional[Selector]) – The selector used to pick the corresponding data pack to be consumed by the component. Default None, which means the whole pack will be used.

Return type

Pipeline

Returns

The pipeline itself, which enables one to chain the creation of the pipeline, i.e., you can do:

Pipeline().set_reader(your_reader()).add(
    your_processor()).add(another_processor())
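
For instance, a custom config and a reference name can be attached when adding a component, and the component can later be retrieved with get_component(); the processor class and the config key below are placeholders:

pipe = Pipeline()
pipe.set_reader(my_reader)
pipe.add(
    MyProcessor(),                 # placeholder processor
    config={"batch_size": 16},     # overrides the processor's default_configs()
    ref_name="my_processor",
)
component = pipe.get_component("my_processor")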

add_gold_packs(pack)[source]

Add gold packs to an internal dictionary used for evaluation. This dictionary is used by the evaluator when calling consume_next(…).

Parameters

pack – A dictionary containing the job.id -> gold_pack mapping.

process(*args, **kwargs)[source]

Alias for process_one().

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.

Return type

~PackType

run(*args, **kwargs)[source]

Run the whole pipeline and ignore all returned DataPacks. This is mostly used when you need to run the pipeline but do not require the output and rely only on its side effects, for example, when the pipeline writes some data to disk.

Calling this function will automatically call the initialize() at the beginning, and call the finish() at the end.

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.
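
A typical use is a pipeline whose last component writes its output to disk; the reader, processor, and input path below are placeholders:

pipe = Pipeline()
pipe.set_reader(my_corpus_reader)    # placeholder reader
pipe.add(my_writer_processor)        # placeholder processor with side effects (e.g. writing files)
pipe.run("path/to/input_dir")        # initialize() and finish() are called automatically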

process_one(*args, **kwargs)[source]

Process a single data pack. This is done by reading and processing only the first pack from the reader.

Parameters

kwargs – The information needed to load the data. For example, if _reader is a StringReader, this should contain a single piece of text in the form of a string variable. If _reader is a file reader, this can point to the file path.

Return type

~PackType
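
For example, with a StringReader the argument is the text itself; a minimal sketch, where the processor is a placeholder:

from forte.data.readers import StringReader
from forte.pipeline import Pipeline

pipe = Pipeline()
pipe.set_reader(StringReader())
pipe.add(my_processor)    # placeholder processor
pipe.initialize()

pack = pipe.process_one("Forte pipelines pass data around as data packs.")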

process_dataset(*args, **kwargs)[source]

Process the documents in the data source(s) and return an iterator or list of DataPacks. The arguments are directly passed to the reader to take data from the source.

Return type

Iterator[~PackType]
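
A sketch of iterating over the returned packs, assuming the reader accepts a directory path and an upstream processor produces Sentence entries:

from ft.onto.base_ontology import Sentence

pipe.initialize()
for pack in pipe.process_dataset("path/to/data_dir"):
    # each pack has been processed by all components in the pipeline
    for sentence in pack.get(Sentence):
        print(sentence.text)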

finish()[source]

Call the finish method of all pipeline components. This needs to be called explicitly to release all resources.

evaluate()[source]

Call the evaluators in the pipeline to collect their results.

Return type

Iterator[Tuple[str, Any]]

Returns

Iterator of the evaluator results. Each element is a tuple, where the first one is the name of the evaluator, and the second one is the output of the evaluator (see get_result()).

get_component(ref_name)[source]

Get a component of the pipeline by its reference name.

Parameters

ref_name (str) – the reference name of a component

Return type

PipelineComponent[Any]

Train Pipeline

class forte.train_pipeline.TrainPipeline(train_reader, trainer, dev_reader, configs, preprocessors=None, evaluator=None, predictor=None)[source]

Pipeline Component

class forte.pipeline_component.PipelineComponent[source]

The base class for all pipeline components. A pipeline component represents one node in the pipeline and performs certain actions on the data pack. All pipeline components should extend this class.

resources

The resources that can be used by this component; the resources object is shared across the whole pipeline.

configs

The configuration of this component, built by the pipeline based on default_configs() and the configs provided by the user.

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will enforce the content expectations specified in each pipeline component. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and throws an EntryNotFoundError if the check fails. When this function is called with enforce set to True, every pipeline component will check whether the input data pack’s record matches the expected types and attributes, provided the expected_types_and_attributes function is implemented for the processor. For example, suppose processor A requires the entry type ft.onto.base_ontology.Sentence and processor B produces this type in its output data pack: the record function of processor B writes the record of this type into the data pack, and processor A implements expected_types_and_attributes to declare this type. Then, when the pipeline runs with enforce_consistency, processor A checks whether this type exists in the record of the output of the previous pipeline component.

Parameters

enforce (bool) – A boolean of whether to enable consistency checking for the pipeline or not.
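
A hedged sketch of the mechanism described above, with two placeholder processors; the method signatures follow the description and may differ slightly from the actual base classes:

from typing import Dict, Set
from forte.processors.base import PackProcessor

class ProcessorB(PackProcessor):
    """Placeholder processor that produces Sentence entries."""

    def record(self, record_meta: Dict[str, Set[str]]):
        # declare that this processor adds Sentence entries to the pack's record
        record_meta["ft.onto.base_ontology.Sentence"] = set()

    def _process(self, input_pack):
        ...  # create Sentence entries here

class ProcessorA(PackProcessor):
    """Placeholder processor that consumes Sentence entries."""

    def expected_types_and_attributes(self) -> Dict[str, Set[str]]:
        # with enforce_consistency enabled, the pipeline checks this against the
        # records written by upstream components and raises EntryNotFoundError on mismatch
        return {"ft.onto.base_ontology.Sentence": set()}

    def _process(self, input_pack):
        ...  # read Sentence entries here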

initialize(resources, configs)[source]

The pipeline will call the initialize method at the start of processing. The processor and reader will be initialized with configs and can register global resources in resources. The implementation should set up the states of the component.

Parameters
  • resources (Resources) – A global resource registry. Users can register shareable resources here, for example, the vocabulary.

  • configs (HParams) – The configuration passed in to set up this component.
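
A sketch of a component overriding initialize() to set up its own state from the merged configs; the processor class and config key are placeholders:

from forte.processors.base import PackProcessor

class MyProcessor(PackProcessor):
    def initialize(self, resources, configs):
        super().initialize(resources, configs)  # keep the base-class setup
        # read component state from the merged configs (key is a placeholder)
        self.batch_size = configs.batch_size
        # shareable objects (e.g. a vocabulary) could be registered in `resources` here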

reset_flags()[source]

Reset the flags related to this component. This will be called first when doing initialization.

add_entry(pack, entry)[source]

The component can manually call this function to add the entry into the data pack immediately. Otherwise, the system will add the entries automatically when this component finishes.

Parameters
  • pack (BasePack) – The pack to add the entry into.

  • entry (Entry) – The entry to be added.

Returns

None

flush()[source]

Indicate that there will be no more packs to be passed in; handle what’s remaining in the buffer.

finish(resource)[source]

The pipeline will call this function at the end of the pipeline to notify all the components. The user can implement this function to release resources used by this component. The component can also add objects to the resources.

Parameters

resource (Resources) – A global resource registry.

classmethod default_configs()[source]

Returns a dict of configurations of the component with default values. Used to fill in the missing values of the input configs during pipeline construction.
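
A sketch of how a component may declare its defaults; the keys and values are placeholders, and user-provided configs are merged on top of this dictionary:

from forte.pipeline_component import PipelineComponent

class MyComponent(PipelineComponent):
    @classmethod
    def default_configs(cls):
        configs = super().default_configs()
        configs.update({
            "batch_size": 32,    # placeholder default value
        })
        return configs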

Process

class forte.process_job.ProcessJob(pack, is_poison)[source]
alter_pack(pack)[source]

Alter the pack in this job. This should only be controlled by the system itself. One should not call this function without a proper understanding.

Parameters

pack (~PackType) – The pack to be used to replace into this job.

Returns

None

ProcessJobStatus

class forte.process_job.ProcessJobStatus(value)

An enumeration.

ProcessManager

class forte.process_manager.ProcessManager(pipeline_length)[source]

A pipeline level manager that manages global processing information, such as the currently running components. This is an internal class and should only be initialized by the system.

pipeline_length

The length of the current pipeline being executed

Type

int

_queues

A list of queues which hold the jobs for each processor. The size of this list is equal to the pipeline length.

Type

List[Deque[int]]

_current_queue_index

An index indicating which queue to read the data from. A value of -1 indicates read from the reader.

Type

int

_current_processor_index

An index indicating the processor that executes the job

Type

int

_unprocessed_queue_indices

Each element of this list is the index of the first UNPROCESSED element in the corresponding queue. Length of this list equals the “pipeline_length”.

If unprocessed_queue_indices = [0, 2]

  • This means for the 1st queue, the first UNPROCESSED job is at index-0. All elements from indices [0, len(queue[0])) are UNPROCESSED.

  • Similarly, for the 2nd queue, the first UNPROCESSED job is at index-2. All elements from indices [2, len(queue[1])) are UNPROCESSED.

Type

List[int]

_processed_queue_indices

Each element of this list is the index of the last PROCESSED element in the corresponding queue. Length of this list equals the “pipeline_length”.

If processed_queue_indices = [0, 2]

  • This means for the 1st queue, the last PROCESSED job is at index-0. Only the first element in queue[0] is PROCESSED

  • Similarly, for the 2nd queue, the last PROCESSED job is at index-2. All elements from indices [0, 2] are PROCESSED

Type

List [int]

Args

pipeline_length: The length of the current pipeline being executed

add_to_queue(queue_index, job)[source]

Add a job to a particular queue.

Parameters
  • queue_index (int) – The index of the queue to which the job is to be added.

  • job (ProcessJob) – The job to be added.

Returns

None

exhausted()[source]

Returns True only if the last element remaining in the last queue is a poison pack.

Return type

bool