Pipeline

Process Pipeline

class forte.pipeline.Pipeline(resource=None, ontology_file=None, enforce_consistency=False, do_init_type_check=False)[source]

This controls the main inference flow of the system. A pipeline consists of a set of components (readers and processors). The data flows through the pipeline as data packs, and each component uses or adds information to the data packs.
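For example, a minimal sketch of building and running a pipeline, assuming the built-in StringReader and no processors (processors would be appended with add() before initialize()):

from forte.pipeline import Pipeline
from forte.data.readers import StringReader

# A reader-only pipeline: the raw string is converted into a DataPack.
pipeline = Pipeline()
pipeline.set_reader(StringReader())
pipeline.initialize()

pack = pipeline.process("Forte passes data packs between components.")
print(pack.text)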

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will check the content expectations specified in each pipeline component. For the check to take effect, this function must be called before initialize(). Each component will check whether the input pack contains the expected data by inspecting the meta-data, and raises an ExpectedEntryNotFound if the check fails. An example implementation is described in the docstring of __init__().

Parameters

enforce – A boolean indicating whether to enable consistency checking for the pipeline.
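A usage sketch (the processor here is a hypothetical placeholder for any real component):

pipeline = Pipeline()
pipeline.set_reader(StringReader())
pipeline.add(YourProcessor())  # hypothetical processor
# Checking must be switched on before initialize() so that each component
# can verify its expected input types against the incoming pack's records.
pipeline.enforce_consistency(True)
pipeline.initialize()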

init_from_config_path(config_path)[source]

Read the configurations from the given path config_path and build the pipeline with the config.

Parameters

config_path – A string of the configuration path, which is a YAML file that specifies the structure and parameters of the pipeline.

init_from_config(configs)[source]

Initialize the pipeline (ontology and processors) from the given configurations.

Parameters

configs – The configs used to initialize the pipeline. It should be a dictionary that contains forte_ir_version, components and states. forte_ir_version is a string used to validate the input format. components is a list of dictionaries, each containing type (the class of the pipeline component), configs (the corresponding component’s configs) and selector. states will be used to update the pipeline states based on the fields specified in states.attribute and states.resource.
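A shape sketch of such a dictionary, drawn only from the parameter description above; the version string, component class path and selector/states contents are placeholders, not values known to work:

configs = {
    "forte_ir_version": "<version-string>",  # validated against the expected IR version
    "components": [
        {
            "type": "forte.data.readers.StringReader",  # class of the pipeline component
            "configs": {},                              # that component's own configs
            "selector": None,                           # optional selector specification
        },
    ],
    "states": {},  # optional states.attribute / states.resource updates
}
pipeline = Pipeline()
pipeline.init_from_config(configs)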

save(path)[source]

Store the pipeline as an intermediate representation (IR) in YAML. The path can then be passed to init_from_config_path to initialize a pipeline. Note that calling init_from_config from a different Python environment may not work for some self-defined component classes because their module name is __main__.

Parameters

path – The file path to save configurations.
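For example, a pipeline can be round-tripped through its YAML IR (a sketch; the file path is arbitrary):

pipeline = Pipeline()
pipeline.set_reader(StringReader())
# Serialize the pipeline structure and configs to YAML ...
pipeline.save("pipeline.yml")

# ... and rebuild an equivalent pipeline from that file.
restored = Pipeline()
restored.init_from_config_path("pipeline.yml")
restored.initialize()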

serve(host='localhost', port=8008, service_name='', input_format='string')[source]

Start a service of the current pipeline at a specified host and port.

Parameters
  • host – Host name of the pipeline service.

  • port – Port number of the pipeline service.

  • service_name – Assign a name to the pipeline service for validation. This will appear in the service_name field on the default page and can be queried and validated against the expected service name set by the user. Defaults to ''.

  • input_format – Specify the format of the input for validation. It can be “string” or “DataPack”. This will appear in the input_format field on the default page and can be queried and validated against the expected input format set by the user. Defaults to “string”.
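A usage sketch (the host, port and service_name values are arbitrary):

pipeline = Pipeline()
pipeline.set_reader(StringReader())
pipeline.initialize()
# Expose the pipeline as a service; clients can validate the advertised
# service_name and input_format before sending requests.
pipeline.serve(host="localhost", port=8008,
               service_name="demo", input_format="string")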

set_profiling(enable_profiling=True)[source]

Set profiling option.

Parameters

enable_profiling – A boolean indicating whether to enable profiling for the pipeline (the default is True).

initialize()[source]

This function should be called before the pipeline is used to process the actual data. It will call the initialize() of all the components inside this pipeline.

initialize_components()[source]

This function will initialize all the components in this pipeline, except the reader. The components are initialized in a first-in-first-out manner based on the order of insertion.

During initialization, the component will be configured based on its corresponding configuration. However, if the component is already initialized (for example, being initialized manually or used twice in the same pipeline), the new configuration will be ignored.

The pipeline will check for type dependencies between the components inside this pipeline, see enforce_consistency() for more details.

initialize_selectors()[source]

This function will reset the states of the selectors.

set_reader(reader, config=None)[source]

Set the reader of the pipeline. A reader is the entry point of this pipeline: data flowing into the reader is converted to the data pack format and then passed on to the other components for processing.

Parameters
  • reader – The reader to be used by the pipeline.

  • config – The custom configuration to be passed to the reader. If the config is not provided, the default config defined by the reader class will be used.

Returns

The pipeline itself, which allows you to directly chain other pipeline construction code afterwards, i.e., you can do:

Pipeline().set_reader(your_reader()).add(your_processor())

property components

Return all the components in this pipeline, except the reader.

Returns: A list containing the components.

property component_configs

Return the configs related to the components, except the reader.

Returns: A list containing the component configs.

add(component, config=None, selector=None, selector_config=None)[source]

Adds a pipeline component to the pipeline. The pipeline components will form a chain based on the insertion order. The customized config and selector (Selector) will be associated with this particular component. If the config or the selector is not provided, the default ones will be used.

Note that the same component instance can be added multiple times to the pipeline. In such cases, the instance will only be set up at the first insertion (i.e. its initialize function will only be called once). Subsequent insertions of the same component instance will not change the behavior nor the states of the instance. Thus, a different config cannot be provided (it should be None) when the component is added a second time, otherwise a ProcessorConfigError will be thrown. If one wants the components to behave differently, a different instance should be used.

Parameters
  • component (PipelineComponent) – The component to be appended to the pipeline.

  • config (Union[Config, Dict[str, Any]]) – The custom configuration to be used for the added component. Defaults to None, which means the default_configs() of the component will be used.

  • selector (Selector) – The selector used to pick the corresponding data pack to be consumed by the component. Default None, which means the whole pack will be used.

Returns

The pipeline itself, which enables one to chain the creation of the pipeline, i.e., you can do:

Pipeline().set_reader(your_reader()).add(
    your_processor()).add(another_processor())

add_gold_packs(pack)[source]

Add gold packs to an internal dictionary used for evaluation. This dictionary is used by the evaluator when calling consume_next(…).

Parameters

pack (Dict) – A dictionary containing the job.id -> gold_pack mapping.
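A usage sketch (both names are placeholders for whatever the evaluation setup provides):

# gold_packs maps each reader job id to its gold-annotated pack.
pipeline.add_gold_packs({job_id: gold_pack})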

process(*args, **kwargs)[source]

Alias for process_one().

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.

run(*args, **kwargs)[source]

Run the whole pipeline and ignore all returned DataPacks. This is mostly used when you need to run the pipeline but do not require the output, relying only on its side effects, for example when the pipeline writes some data to disk.

Calling this function will automatically call the initialize() at the beginning, and call the finish() at the end.

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.
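For example, with a StringReader the positional argument is the raw text (any other reader would take its own source arguments):

pipeline = Pipeline()
pipeline.set_reader(StringReader())
# run() calls initialize() itself, streams every pack through the pipeline,
# ignores the returned DataPacks, and calls finish() at the end.
pipeline.run("Some text whose processing is only needed for its side effects.")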

process_one(*args, **kwargs)[source]

Process one single data pack. This is done by only reading and processing the first pack in the reader.

Parameters

kwargs – The information needed to load the data. For example, if _reader is StringReader, this should contain a single piece of text in the form of a string variable. If _reader is a file reader, this can point to the file path.

process_dataset(*args, **kwargs)[source]

Process the documents in the data source(s) and return an iterator or list of DataPacks. The arguments are directly passed to the reader to take data from the source.
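For example, with a file-based reader the arguments typically point at the data source (PlainTextReader and the directory path are assumptions here):

from forte.data.readers import PlainTextReader

pipeline = Pipeline()
pipeline.set_reader(PlainTextReader())
pipeline.initialize()

# The argument is forwarded to the reader; here, a directory of .txt files.
for pack in pipeline.process_dataset("path/to/text_dir"):
    print(pack.pack_name, len(pack.text))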

finish()[source]

Call the finish method of all pipeline components. This needs to be called explicitly to release all resources.

evaluate()[source]

Call the evaluators in the pipeline to collect their results.

Returns

Iterator of the evaluator results. Each element is a tuple, where the first one is the name of the evaluator, and the second one is the output of the evaluator (see get_result()).

Train Pipeline

class forte.train_pipeline.TrainPipeline(train_reader, trainer, dev_reader, configs, preprocessors=None, evaluator=None, predictor=None)[source]

Pipeline Component

class forte.pipeline_component.PipelineComponent[source]

The base class for all pipeline components. A pipeline component represents one node in the pipeline and performs certain actions on the data pack. All pipeline components should extend this class.

resources

The resources that can be used by this component. The resources object is shared across the whole pipeline.

configs

The configuration of this component, built by the pipeline based on the default_configs() and the configs provided by the user.

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will enforce the content expectations specified in each pipeline component. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and raises an ExpectedEntryNotFound if the check fails. When this function is called with enforce set to True, every pipeline component will check whether the input data pack’s records match the expected types and attributes, provided the expected_types_and_attributes function is implemented for the processor. For example, suppose processor A requires entries of type ft.onto.base_ontology.Sentence and processor B produces this type in its output data pack: the record function of processor B writes a record of this type into the data pack, and processor A implements expected_types_and_attributes to declare this type. When the pipeline runs with enforce_consistency, processor A then checks whether this type exists in the record of the previous pipeline component’s output, as sketched below.

Parameters

enforce – A boolean indicating whether to enable consistency checking for the pipeline.
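The mechanism above can be sketched with two toy processors; the bodies are illustrative only, and record / expected_types_and_attributes follow the pattern described in the paragraph above:

from typing import Dict, Set

from forte.data.data_pack import DataPack
from forte.processors.base import PackProcessor
from ft.onto.base_ontology import Sentence

class ProcessorB(PackProcessor):
    # Produces Sentence entries (real splitting logic omitted).
    def _process(self, input_pack: DataPack):
        Sentence(input_pack, 0, len(input_pack.text))  # toy: one sentence for the whole text

    def record(self, record_meta: Dict[str, Set[str]]):
        # Declare what this processor writes into the pack's records.
        record_meta["ft.onto.base_ontology.Sentence"] = set()

class ProcessorA(PackProcessor):
    # Consumes the Sentence entries produced upstream.
    def expected_types_and_attributes(self) -> Dict[str, Set[str]]:
        # Declare what this processor expects to find in its input pack.
        return {"ft.onto.base_ontology.Sentence": set()}

    def _process(self, input_pack: DataPack):
        for _ in input_pack.get(Sentence):
            pass  # use the sentences here

With consistency enforced, the pipeline checks ProcessorA’s expectation against the record written by ProcessorB before ProcessorA runs.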

initialize(resources, configs)[source]

The pipeline will call the initialize method at the start of processing. The processor and reader will be initialized with configs, and can register global resources into resources. The implementation should set up the states of the component.

Parameters
  • resources (Resources) – A global resource registry. Users can register shareable resources here, for example, the vocabulary.

  • configs (Config) – The configuration passed in to set up this component.
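A typical override, as a sketch (the state being set up and the config key are hypothetical):

from forte.processors.base import PackProcessor

class MyComponent(PackProcessor):
    def initialize(self, resources, configs):
        # Call the parent first so self.resources and self.configs are set,
        # then build any component-specific state from the configs.
        super().initialize(resources, configs)
        self.max_length = configs.max_length  # hypothetical config key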

reset_flags()[source]

Reset the flags related to this component. This will be called first when doing initialization.

add_entry(pack, entry)[source]

The component can manually call this function to add the entry into the data pack immediately. Otherwise, the system will add the entries automatically when this component finishes.

Parameters
  • pack (BasePack) – The pack to add the entry into.

  • entry (Entry) – The entry to be added.

flush()[source]

Indicate that there will be no more packs to be passed in, handle what’s remaining in the buffer.

finish(resource)[source]

The pipeline will call this function at the end of the pipeline to notify all the components. The user can implement this function to release resources used by this component. The component can also add objects to the resources.

Parameters

resource (Resources) – A global resource registry.

classmethod default_configs()[source]

Returns a dict of configurations of the component with default values. Used to replace the missing values of input configs during pipeline construction.
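For example, a component can extend the parent defaults (the "batch_size" key is made up for illustration):

from forte.processors.base import PackProcessor

class MyProcessor(PackProcessor):
    @classmethod
    def default_configs(cls):
        # Start from the parent's defaults and add this component's own keys.
        configs = super().default_configs()
        configs.update({"batch_size": 16})  # hypothetical key
        return configs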