Pipeline

Basics

We want a uniform module that manages the workflow step by step from input to output. For instance, given a plain-text file as the data source for a machine translation task, we want to read the text from the file and use a model to generate the translation.

A working example

# set up
from forte import Pipeline
from forte.data.readers import PlainTextReader
from forte.processors.third_party.machine_translation_processor import MachineTranslationProcessor
import os


# The notebook should be run from the project root folder.
dir_path = os.path.abspath(
    os.path.join("data_samples", "machine_translation")
)

# pipeline code
pipeline: Pipeline = Pipeline()  # initialize a pipeline
pipeline.set_reader(PlainTextReader())
pipeline.add(MachineTranslationProcessor(), config={
    "pretrained_model": "t5-small"
})
pipeline.run(dir_path) # it will call `initialize()` internally

This code is derived from the machine translation task.

After creating the pipeline, we first set a reader, because the pipeline has to read data from the data source. We then add MachineTranslationProcessor to the pipeline. Finally, we run the pipeline, and the output should be available in the mt_test_output folder in the notebook directory. We can also pass a configuration dictionary when adding a PipelineComponent, as done here with "pretrained_model".
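
To make the configuration step concrete, here is a minimal sketch of how a user-supplied dictionary could override a component's defaults. It is plain Python, not Forte's implementation: ToyTranslator, resolve_config, and the "batch_size" key are hypothetical names introduced only for illustration.

```python
from typing import Any, Dict, Optional


class ToyTranslator:
    """Hypothetical stand-in for a processor; not a Forte class."""

    @staticmethod
    def default_configs() -> Dict[str, Any]:
        # Illustrative defaults; "batch_size" is an assumed key.
        return {"pretrained_model": "t5-small", "batch_size": 8}

    def __init__(self) -> None:
        self.configs: Dict[str, Any] = {}

    def initialize(self, configs: Dict[str, Any]) -> None:
        # Store the resolved configuration for later use.
        self.configs = configs


def resolve_config(
    component: ToyTranslator,
    user_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Start from the component defaults, then apply user overrides."""
    merged = dict(component.default_configs())
    merged.update(user_config or {})
    return merged


translator = ToyTranslator()
translator.initialize(
    resolve_config(translator, {"pretrained_model": "t5-base"})
)
print(translator.configs)
```

Only the keys given by the user change; every other key keeps its default value.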

A pipeline usually needs only one reader, which is set once and reads from the data source. However, we can add as many processors as needed. For example, when the data in one DataPack is a paragraph, we might add NLTKSentenceSegmenter to split the paragraph into sentences.

Readers and processors are both kinds of PipelineComponent. For now, let's treat every module that performs one of these tasks simply as a PipelineComponent and focus on Pipeline itself: how it holds its PipelineComponent objects and how it runs them through the task.

Life Cycle

Generally, the life cycle of a pipeline has four stages.

  1. Before initializing: we add PipelineComponent objects to the pipeline.

  2. After initializing: every PipelineComponent in the pipeline has been initialized.

  3. Running: the pipeline processes the data.

  4. Finishing: we release the resources used by the pipeline.
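
The four stages above can be sketched with a toy class that records which stage it is in. This is an illustrative sketch under simplifying assumptions, not Forte's Pipeline: ToyPipeline and its stage attribute are hypothetical, and components are modeled as plain functions from str to str.

```python
from typing import Callable, Iterable, List


class ToyPipeline:
    """Hypothetical pipeline that tracks its life-cycle stage."""

    def __init__(self) -> None:
        self.components: List[Callable[[str], str]] = []
        self.stage = "before_initialize"

    def add(self, component: Callable[[str], str]) -> "ToyPipeline":
        # Stage 1: components may only be added before initialization.
        if self.stage != "before_initialize":
            raise RuntimeError("add components before initialize()")
        self.components.append(component)
        return self

    def initialize(self) -> "ToyPipeline":
        # Stage 2: a real pipeline would initialize each component here.
        self.stage = "initialized"
        return self

    def run(self, items: Iterable[str]) -> List[str]:
        # Initialize on demand, mirroring run() calling initialize().
        if self.stage == "before_initialize":
            self.initialize()
        # Stage 3: push every item through all components.
        self.stage = "running"
        results = []
        for item in items:
            for component in self.components:
                item = component(item)
            results.append(item)
        self.finish()
        return results

    def finish(self) -> None:
        # Stage 4: a real pipeline would release resources here.
        self.stage = "finished"


toy = ToyPipeline()
toy.add(str.strip).add(str.upper)
stages = [toy.stage]
toy.initialize()
stages.append(toy.stage)
output = toy.run(["  hello ", "world"])
stages.append(toy.stage)
print(stages, output)
```

Calling add() after the pipeline has been initialized raises an error in this sketch, which is one simple way to keep stage 1 separate from the later stages.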

Pseudocode with PipelineComponent

Let’s check out pseudocode for setting up and running a pipeline.

pipeline: Pipeline = Pipeline()  # initialize a pipeline
pipeline.set_reader(SomePipelineComponent())  # the reader starts the workflow
pipeline.add(SomePipelineComponent())
pipeline.add(SomePipelineComponent())
# run() calls `initialize()` internally to initialize every PipelineComponent in the pipeline
pipeline.run(data_source)

As we can see, after creating a pipeline, we set one PipelineComponent as the reader, which is the start of the workflow, add further PipelineComponent objects to the workflow, and then call run() on the data source. The pipeline keeps its components in the order in which they were added, and that order is the workflow order. The whole setup is simple and clean because the workflow is managed and run in a modular way.
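
To illustrate why the order of adding matters, the following sketch runs the same two processing steps in both orders. It is a plain-Python illustration, not Forte code: read_lines, run_in_order, add_period, and shout are hypothetical helpers, with the reader modeled as a generator and each processor as a function.

```python
from typing import Callable, Iterator, List


def read_lines(text: str) -> Iterator[str]:
    """Toy reader: yield each non-empty line of the data source."""
    for line in text.splitlines():
        if line.strip():
            yield line.strip()


def run_in_order(
    source: str,
    reader: Callable[[str], Iterator[str]],
    processors: List[Callable[[str], str]],
) -> List[str]:
    """Apply the processors to every item in the order they are listed."""
    outputs = []
    for item in reader(source):
        for process in processors:
            item = process(item)
        outputs.append(item)
    return outputs


def add_period(sentence: str) -> str:
    return sentence + "."


def shout(sentence: str) -> str:
    return sentence.upper() + "!"


source = "hello\n\nworld\n"
first = run_in_order(source, read_lines, [add_period, shout])
second = run_in_order(source, read_lines, [shout, add_period])
print(first)   # ['HELLO.!', 'WORLD.!']
print(second)  # ['HELLO!.', 'WORLD!.']
```

The reader always runs first, and swapping the two processors changes the result, which is why the order of the add() calls defines the workflow.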

A PipelineComponent can be a reader, a processor, or a selector. We will take a closer look at each in the following sections.