Machine Translation Inference Pipeline

Packages

[ ]:
import os
import shutil
from typing import Dict
from transformers import T5Tokenizer, T5ForConditionalGeneration
from forte import Pipeline
from forte.data import DataPack
from forte.common import Resources, Config
from forte.processors.base import PackProcessor
from forte.data.readers import PlainTextReader

Background

After a Data Scientist is satisfied with the results of a training model, they will have their notebook over to an MLE who has to convert their model into an inference model.

Inference Workflow

Pipeline

We consider t5-small as a trained MT model to simplify the example. We should always consider pipeline first when it comes to an inference workflow. As the glossary suggests, it’s an inference system that contains a set of processing components.

Therefore, we initialize a pipeline below.

[ ]:
pipeline: Pipeline = Pipeline[DataPack]()

Reader

After observing the dataset, it’s a plain txt file. Therefore, we can use PlainTextReader directly.

[ ]:
pipeline.set_reader(PlainTextReader())

However, it’s still beneficial to take a deeper look at how to design this class so that users can customize a reader when needed.

Processor

We already have an inference model, t5-small, and we need a component to make an inference. Therefore, besides the model itself, there are several behaviors needed. 1. tokenization that transforms input text into sequences of tokens. 2. since T5 has a better performance given a task prompt, we also want to include the prompt in our data.

In forte, we have a generic class PackProcessor that wraps model and inference-related components and behaviors to process DataPack. We need to create a class that inherits the generic method and customizes the behaviors.

The generic method to process DataPack is _process(self, input_pack: DataPack). It should tokenize the input text, use the model class to make an inference, decode the output token ids, and finally writes the output to a target file.

Given what we discussed, we have a processor class below, and we need to add it to the pipeline after defining it.

[ ]:

class MachineTranslationProcessor(PackProcessor): """ Translate the input text and output to a file. """ def initialize(self, resources: Resources, configs: Config): super().initialize(resources, configs) # Initialize the tokenizer and model model_name: str = self.configs.pretrained_model self.tokenizer = T5Tokenizer.from_pretrained(model_name) self.model = T5ForConditionalGeneration.from_pretrained(model_name) self.task_prefix = "translate English to German: " self.tokenizer.padding_side = "left" self.tokenizer.pad_token = self.tokenizer.eos_token if not os.path.isdir(self.configs.output_folder): os.mkdir(self.configs.output_folder) def _process(self, input_pack: DataPack): file_name: str = os.path.join( self.configs.output_folder, os.path.basename(input_pack.pack_name) ) # en2de machine translation inputs = self.tokenizer([ self.task_prefix + sentence for sentence in input_pack.text.split('\n') ], return_tensors="pt", padding=True) output_sequences = self.model.generate( input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"], do_sample=False, ) outputs = self.tokenizer.batch_decode( output_sequences, skip_special_tokens=True ) # Write output to the specified file with open(file=file_name, mode='w') as f: f.write('\n'.join(outputs)) @classmethod def default_configs(cls) -> Dict: return { "pretrained_model": "t5-small", "output_folder": "mt_test_output" } pipeline.add(MachineTranslationProcessor(), config={ "pretrained_model": "t5-small" })

Examples

We have a working MT translation pipeline example.

There are several basic functions of the processor and internal functions defined in this example.

  • initialize(): Pipeline will call it at the start of processing. The processor will be initialized with configs, and register global resources into resource. The implementation should set up the states of the component.

  • initialize a pre-trained model

  • initialize tokenizer

  • initialize model-specific attributes such as task prefix

  • process(): using the loaded model to make predictions and write the prediction results out.

  • we first tokenize the input text

  • then, we use model to generate output sequence ids

  • then, we decode output sequence ids into tokens and write the output into a file

After setting up the pipeline’s components, we can run the pipeline on the input directory as below.

[ ]:
dir_path = os.path.abspath(
            os.path.join("data_samples", "machine_translation")
        ) # notebook should be running from project root folder

pipeline.run(dir_path)
print("Done successfully")

One can investigate the machine translation output in folder mt_test_output located under the script’s directory. Then we remove the output folder below.

[ ]:
shutil.rmtree(MachineTranslationProcessor.default_configs()["output_folder"])