Pipeline

Process Pipeline

class forte.pipeline.Pipeline(resource=None, ontology_file=None, enforce_consistency=False)[source]

This controls the main inference flow of the system. A pipeline consists of a set of components (readers and processors). Data flows through the pipeline as data packs, and each component uses or adds information to the data packs.
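
For orientation, below is a minimal sketch of assembling and using a pipeline. It assumes Forte's built-in StringReader; the processor line is hypothetical and only illustrates where components plug in.

    from forte.pipeline import Pipeline
    from forte.data.readers import StringReader

    pipeline = Pipeline()
    pipeline.set_reader(StringReader())   # entry point: converts raw input into data packs
    # pipeline.add(YourProcessor())       # hypothetical: each component uses/adds pack entries
    pipeline.initialize()                 # must be called before processing data

    pack = pipeline.process("Forte pipelines pass data packs between components.")
    print(pack.text)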

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will check the content expectations specified in each pipeline component. It takes effect when initialize() is called after it. Each component will check whether the input pack contains the expected data by inspecting the meta-data, and throws an ExpectedEntryNotFound if the check fails. An example implementation is given in the docstring of __init__().

Parameters

enforce – A boolean of whether to enable consistency checking for the pipeline or not.
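
A minimal usage sketch, assuming the pipeline has already been assembled: the flag is set before initialize(), which is when the expectations take effect.

    pipeline.enforce_consistency(True)
    pipeline.initialize()
    # If a downstream component expects entries that no upstream component records,
    # an ExpectedEntryNotFound error is raised.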

init_from_config_path(config_path)[source]

Read the configurations from the given path config_path and build the pipeline with the config.

Parameters

config_path – A string of the configuration path, which is a YAML file that specifies the structure and parameters of the pipeline.

init_from_config(configs)[source]

Initialize the pipeline (ontology and processors) from the given configurations.

Parameters

configs – The configs used to initialize the pipeline.

set_profiling(enable_profiling=True)[source]

Set profiling option.

Parameters

enable_profiling – A boolean of whether to enable profiling for the pipeline or not (the default is True).

initialize()[source]

This function should be called before the pipeline is used to process actual data. It will call initialize() on all the components inside this pipeline.

initialize_components()[source]

This function will initialize all the components in this pipeline, except the reader. The components are initialized in a FIFO manner based on the order of insertion.

During initialization, the component will be configured based on its corresponding configuration. However, if the component is already initialized (for example, being initialized manually or used twice in the same pipeline), the new configuration will be ignored.

The pipeline will check for type dependencies between the components inside this pipeline, see enforce_consistency() for more details.

set_reader(reader, config=None)[source]

Set the reader of the pipeline. A reader is the entry point of this pipeline; data flowing into the reader will be converted to the data pack format and then passed on to the other components for processing.

Parameters
  • reader – The reader to be used by the pipeline.

  • config – The custom configuration to be passed to the reader. If the config is not provided, the default config defined by the reader class will be used.

Returns

The pipeline itself, which allows you to directly chain other pipeline construction code afterwards, i.e., you can do:

Pipeline().set_reader(your_reader()).add(your_processor())

property components

Return all the components in this pipeline, except the reader.

Returns: A list containing the components.

property component_configs

Return the configs related to the components, except the reader.

Returns: A list containing the component configs.

add(component, config=None, selector=None)[source]

Adds a pipeline component to the pipeline. The pipeline components will form a chain based on the insertion order. The customized config and selector (Selector) will be associated with this particular component. If the config or the selector is not provided, the default ones will be used.

Here, note that the same component instance can be added multiple times to the pipeline. In such cases, the instance will only be set up at the first insertion (i.e., its initialize function will only be called once). Subsequent insertions of the same component instance will not change the behavior nor the state of the instance. Thus, a different config cannot be provided (it should be None) when the component is added a second time, otherwise a ProcessorConfigError will be thrown. If one wants the components to behave differently, different instances should be used.

Parameters
  • component (PipelineComponent) – The component to be inserted next in the pipeline.

  • config (Union[Config, Dict[str, Any]]) – The custom configuration to be used for the added component. Default None, which means the default_configs() of the component will be used.

  • selector (Selector) – The selector used to pick the corresponding data pack to be consumed by the component. Default None, which means the whole pack will be used.

Returns

The pipeline itself, which enables one to chain the creation of the pipeline, i.e., you can do:

Pipeline().set_reader(your_reader()).add(
    your_processor()).add(another_processor())

add_gold_packs(pack)[source]

Add gold packs to an internal dictionary used for evaluation. This dictionary is used by the evaluator when calling consume_next(…).

Parameters

pack (Dict) – A dictionary containing the job.id -> gold_pack mapping.
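
A hedged sketch of the expected shape; the job ids are internal to the pipeline, so the keys and the gold_data source below are purely illustrative.

    gold_packs = {job_id: gold_pack for job_id, gold_pack in gold_data}  # gold_data is hypothetical
    pipeline.add_gold_packs(gold_packs)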

process(*args, **kwargs)[source]

Alias for process_one().

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.

run(*args, **kwargs)[source]

Run the whole pipeline and ignore all returned DataPacks. This is mostly used when you need to run the pipeline but do not require the output, relying instead on its side effects, for example, when the pipeline writes some data to disk.

Calling this function will automatically call the initialize() at the beginning, and call the finish() at the end.

Parameters
  • args – The positional arguments used to get the initial data.

  • kwargs – The keyword arguments used to get the initial data.
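
A minimal sketch, assuming a file-based reader and a hypothetical writer processor whose side effect (writing results to disk) is what we rely on.

    pipeline.run("path/to/input_dir")   # initialize() and finish() are called automatically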

process_one(*args, **kwargs)[source]

Process one single data pack. This is done by only reading and processing the first pack in the reader.

Parameters

kwargs – The information needed to load the data. For example, if _reader is StringReader, this should contain a single piece of text in the form of a string variable. If _reader is a file reader, this can point to the file path.
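
A minimal sketch mirroring the docstring: with StringReader as the pipeline's reader, the text itself is passed; with a file reader, a file path would be passed instead.

    pack = pipeline.process_one("A single piece of text to process.")
    print(pack.text)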

process_dataset(*args, **kwargs)[source]

Process the documents in the data source(s) and return an iterator or list of DataPacks. The arguments are directly passed to the reader to take data from the source.
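
A minimal sketch, assuming a file-based reader was set on the pipeline; the directory path is illustrative and is forwarded directly to the reader.

    for pack in pipeline.process_dataset("path/to/documents"):
        print(len(pack.text))
    pipeline.finish()   # release resources explicitly when done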

finish()[source]

Call the finish method of all pipeline components. This needs to be called explicitly to release all resources.

evaluate()[source]

Call the evaluators in the pipeline to collect their results.

Returns

Iterator of the evaluator results. Each element is a tuple, where the first one is the name of the evaluator, and the second one is the output of the evaluator (see get_result()).
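
A minimal sketch of consuming the results after the pipeline has been run with an evaluator attached.

    for evaluator_name, result in pipeline.evaluate():
        print(evaluator_name, result)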

Train Pipeline

class forte.train_pipeline.TrainPipeline(train_reader, trainer, dev_reader, configs, preprocessors=None, evaluator=None, predictor=None)[source]

Pipeline Component

class forte.pipeline_component.PipelineComponent[source]

The base class for all pipeline components. A pipeline component represents one node in the pipeline and performs certain actions on the data pack. All pipeline components should extend this class.

resources

The resources that can be used by this component; the resources object is shared across the whole pipeline.

configs

The configuration of this component, built by the pipeline based on the default_configs() and the configs provided by the user.

enforce_consistency(enforce=True)[source]

This function determines whether the pipeline will enforce the content expectations specified in each pipeline component. Each component checks whether the input pack contains the expected data by inspecting the meta-data, and throws an ExpectedEntryNotFound if the check fails. When this function is called with enforce set to True, every pipeline component checks whether the input datapack record matches the expected types and attributes, provided the processor implements expected_types_and_attributes. For example, suppose processor A requires the entry type ft.onto.base_ontology.Sentence and processor B produces this type in its output datapack: the record function of processor B writes this type into the datapack record, and processor A implements expected_types_and_attributes to declare that it needs this type. When the pipeline runs with consistency enforcement, processor A then checks that this type exists in the record of the previous component's output.

Parameters

enforce – A boolean of whether to enable consistency checking for the pipeline or not.
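
A rough sketch of the record/expectation handshake described above, assuming Forte's PackProcessor base class; the exact signatures of record and expected_types_and_attributes shown here are assumptions and may differ from the actual API.

    from forte.processors.base import PackProcessor

    class ProcessorB(PackProcessor):
        # Producer: declares that it adds Sentence entries to the pack's record.
        def record(self, record_meta):
            record_meta["ft.onto.base_ontology.Sentence"] = set()

        def _process(self, input_pack):
            ...  # create Sentence entries here

    class ProcessorA(PackProcessor):
        # Consumer: declares that it expects Sentence entries in its input.
        def expected_types_and_attributes(self):
            return {"ft.onto.base_ontology.Sentence": set()}

        def _process(self, input_pack):
            ...  # use Sentence entries here

    # With pipeline.enforce_consistency(True), processor A checks that the record
    # written by processor B contains the expected type before consuming each pack.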

initialize(resources, configs)[source]

The pipeline will call the initialize method at the start of processing. The processor and reader will be initialized with configs, and can register global resources into resources. The implementation should set up the states of the component.

Parameters
  • resources (Resources) – A global resource register. User can register shareable resources here, for example, the vocabulary.

  • configs (Config) – The configuration passed in to set up this component.
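
A hedged sketch of a component overriding initialize(): call the base implementation so the component's resources and configs are stored, then do component-specific setup. The config key used here is hypothetical.

    from forte.pipeline_component import PipelineComponent

    class MyComponent(PipelineComponent):
        def initialize(self, resources, configs):
            super().initialize(resources, configs)   # stores resources/configs on the component
            self.batch_size = configs.batch_size     # hypothetical config key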

reset_flags()[source]

Reset the flags related to this component. This will be called first when doing initialization.

add_entry(pack, entry)[source]

The component can manually call this function to add the entry into the data pack immediately. Otherwise, the system will add the entries automatically when this component finishes.

Parameters
  • pack (BasePack) – The pack to add the entry into.

  • entry (Entry) – The entry to be added.

flush()[source]

Indicate that there will be no more packs to be passed in, handle what’s remaining in the buffer.

finish(resource)[source]

The pipeline will call this function at the end of processing to notify all the components. The user can implement this function to release resources used by this component. The component can also add objects to the resources.

Parameters

resource (Resources) – A global resource registry.

classmethod make_configs(configs)[source]

Create the component configuration for this class, by merging the provided config with the default_configs().

The following config conventions are expected:
  • The top level key can be a special config_path.

  • config_path should point to a file system path, which should be a YAML file containing configurations.

  • Other key values in the configs will be considered as parameters.

Parameters

configs – The input config to be merged with the default config.

Returns

The merged configuration.
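
A hedged illustration of the convention: the top-level config_path key points to a YAML file, and the remaining keys are treated as ordinary parameters. MyComponent, the file name, and the parameter are all hypothetical.

    merged = MyComponent.make_configs({
        "config_path": "my_component_config.yml",  # hypothetical YAML file with configurations
        "batch_size": 16,                          # treated as an ordinary parameter
    })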

classmethod default_configs()[source]

Returns a dict of configurations of the component with default values. Used to replace the missing values of input configs during pipeline construction.
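
A minimal sketch of a subclass overriding default_configs(); the keys are hypothetical and would later be merged with user-provided configs by make_configs().

    class MyComponent(PipelineComponent):
        @classmethod
        def default_configs(cls):
            config = super().default_configs()
            config.update({"batch_size": 32, "lowercase": True})  # hypothetical defaults
            return config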