Building a Machine Translation System with Forte

Overview

This tutorial walks you through the steps to build a machine translation system with Forte. Forte allows users to break down complex problems into composable pipelines and enables interoperation across tasks through a unified data format. With Forte, it’s easy to compose a customized machine translation management system that can handle practical problems like new feature requests.

In this tutorial, you will learn:

  • How to read data from source

    • How to create a simple NLP pipeline

    • How to maintain and store the input data

  • How to process data in pipeline

    • How to perform sentence segmentation

    • How to annotate and query the data

    • How to translate the input text with a pre-trained model

    • How to manage multiple data objects

  • How to handle new practical requests

    • How to handle structures like HTML data

      • How to select a single data object for processing

    • How to replace the translation model with remote translation services

    • How to save and load the pipeline

Run the following command to install all the required dependencies for this tutorial:

[ ]:
!pip install forte==0.2.0 forte.nltk transformers==4.16.2 torch==1.7.0 requests sentencepiece

Start with the Reader

Overview

  • How to read data from source

    • How to create a simple pipeline

    • How to maintain and store the input data

  • How to process data in pipeline

  • How to handle new practical requests

In this section, you will learn:

  • What a reader is and why we need it

  • How to compose a simple pipeline with a pre-built reader

[ ]:
from forte import Pipeline
from forte.data.readers import TerminalReader
pipeline: Pipeline = Pipeline()

All pipelines need a reader to read and parse input data. To make our pipeline read queries from the user’s command-line terminal, use the TerminalReader class provided by Forte. TerminalReader transforms the user’s query into a DataPack object, which is a unified data format for NLP that makes it easy to connect different NLP tools together as Forte Processors.

[ ]:
pipeline.set_reader(TerminalReader())

To run the pipeline consisting of the single TerminalReader, call process_dataset which will return an iterator of DataPack objects. The second line in the following code snippet retrieves the first user query from the TerminalReader.

[ ]:
pipeline.initialize()
datapack = next(pipeline.process_dataset())
print(datapack.text)
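To make the iterator behaviour concrete, here is a minimal sketch in plain Python that does not use Forte at all. The `read_queries` function and the dict it yields are hypothetical stand-ins for a reader and a DataPack; the only point is that packs are produced lazily, so `next()` reads just the first input.

```python
from typing import Dict, Iterable, Iterator


def read_queries(queries: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Hypothetical reader: lazily wrap each raw query in a pack-like dict."""
    for query in queries:
        # A real DataPack also stores annotations; this sketch keeps only text.
        yield {"text": query}


packs = read_queries(["Hello world.", "How are you?"])
first_pack = next(packs)  # only the first query has been read at this point
print(first_pack["text"])
```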

DataPack

Overview

  • How to read data from source

    • How to create a simple pipeline

    • How to maintain and store the input data

  • How to process data in pipeline

  • How to handle new practical requests

In this section, you will learn:

  • What a DataPack object is and why we need it

Forte helps demystify data lineage and makes it easier to trace how data flows along the pipeline and how features are generated to connect data to models. Similar to a cargo ship that loads and transports goods from one port to another, a DataPack carries information as it passes through each module and updates its ontology state along the way.

[Figure: string_reader]

DataPack and Multi-Modality

DataPack supports not only text data but also audio and image data.

[Figure: multi_modal]

Add a pre-built Forte processor to the pipeline

Overview

  • How to read data from source

  • How to process data in pipeline

    • How to perform sentence segmentation

    • How to annotate and query the data

    • How to translate the input text with a pre-trained model

    • How to manage multiple data objects

  • How to handle new practical requests

In this section, you will learn:

  • What a processor is and why we need it

  • How to add a pre-built processor to the pipeline

A Forte Processor takes DataPacks as inputs, processes them, and stores its outputs in DataPacks. The processors we are going to use in this section are all PackProcessors, which expect exactly one DataPack as input and store their outputs back into the same DataPack. The following two lines of code show how a pre-built processor, NLTKSentenceSegmenter, is added to our pipeline.

[ ]:
from fortex.nltk.nltk_processors import NLTKSentenceSegmenter
pipeline.add(NLTKSentenceSegmenter())

When we run the pipeline, the NLTKSentenceSegmenter processor will split the user query into sentences and store them back into the DataPack created by TerminalReader. The code snippet below shows how to get all the sentences from the first query.

[Figure: sentence_seg]

[ ]:
from ft.onto.base_ontology import Sentence
[ ]:
pipeline.initialize()
for sent in next(pipeline.process_dataset()).get(Sentence):
    print(sent.text)
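Each sentence is stored as a span of character offsets over the pack text rather than as a copy of the text. The sketch below illustrates that idea with a naive regular-expression splitter; `sentence_spans` is a hypothetical helper, and it is not the algorithm NLTK uses.

```python
import re
from typing import List, Tuple


def sentence_spans(text: str) -> List[Tuple[int, int]]:
    """Return (begin, end) character offsets of naively split sentences."""
    spans: List[Tuple[int, int]] = []
    # A run of non-terminator characters followed by optional terminators.
    for match in re.finditer(r"[^.!?]+[.!?]*", text):
        chunk = match.group()
        stripped = chunk.strip()
        if not stripped:
            continue  # skip whitespace-only chunks
        # Shift the start past any leading whitespace in the chunk.
        begin = match.start() + (len(chunk) - len(chunk.lstrip()))
        spans.append((begin, begin + len(stripped)))
    return spans


text = "Hello world. How are you? Fine!"
spans = sentence_spans(text)
for begin, end in spans:
    print((begin, end), text[begin:end])
```

Because only offsets are stored, the sentence text is always recovered by slicing the original string.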

Ontology

Overview

  • How to read data from source

  • How to process data in pipeline

    • How to perform sentence segmentation

    • How to annotate and query the data

    • How to translate the input text with a pre-trained model

    • How to manage multiple data objects

  • How to handle new practical requests

In this section, you will learn:

  • What the ontology system is and why we need it

  • How to write a customized ontology and how to use it

Sentence is a pre-defined ontology provided by Forte, and it is used by NLTKSentenceSegmenter to annotate each sentence in the text. Forte is built on top of an ontology system, which defines the relations between NLP annotations, for example, the relation between words and documents, or between two words. This is the core of Forte. The ontology can be specified in a JSON format, and tools are provided to convert the ontology into production code (Python).

[Figure: onto_simple]

We can also define customized ontologies:

[ ]:
from dataclasses import dataclass
from forte.data.ontology.top import Annotation
from typing import Optional

@dataclass
class Article(Annotation):

    language: Optional[str]

    def __init__(self, pack, begin: int, end: int):
        super().__init__(pack, begin, end)
        self.language: Optional[str] = None

Below is a simple example showing how we can query sentences through the new ontology we just created:

[ ]:
from forte.data import DataPack

sentences = [
    "Do you want to get better at making delicious BBQ?",
    "You will have the opportunity, put this on your calendar now.",
    "Thursday, September 22nd join World Class BBQ Champion, Tony Balay from Lonestar Smoke Rangers."
]
datapack: DataPack = DataPack()

# Add sentences to the DataPack and annotate them
for sentence in sentences:
    datapack.set_text(datapack.text + sentence)
    datapack.add_entry(
        Sentence(datapack, len(datapack.text) - len(sentence), len(datapack.text))
    )

# Annotate the whole text with Article
article: Article = Article(datapack, 0, len(datapack.text))
article.language = "en"
datapack.add_entry(article)

for article in datapack.get(Article):
    print(f"Article (language - {article.language}):")
    for sentence in article.get(Sentence):
        print(sentence.text)

Our previous example uses the following ontology inheritance. Sentence and Article both inherit from Annotation, which is used to represent text data. Article has a language field to represent the language of the text.

[Figure: onto_annotation]

Forte supports not only text ontologies but also audio and image ontologies, as well as links, which represent relationships between two entries.

[Figure: onto_full]

  • Annotation is inherited by all text entries, which usually have a span used to retrieve part of the full text.

  • Article, as shown in our previous example, inherits from Annotation and contains a language field to differentiate English from German. In the single-DataPack example, the English article has a span over the English text in the DataPack. Likewise, the German article has a span over the German text in the DataPack.

  • Sentence in our example is used to break down an article, and we pass sentences into the MT pipeline.

  • AudioAnnotation is inherited by all audio entries, which usually have an audio span used to retrieve part of the full audio.

  • Recording is an example subclass of AudioAnnotation, and it has an extra recording_class field denoting the classes the audio belongs to.

  • ImageAnnotation is inherited by all image entries, which usually have a payload index pointing to a loaded image array.

  • BoundingBox is an example subclass of ImageAnnotation. As the figure shows, it has more inheritance relationships than other ontology classes due to the nature of CV objects. The advantage of the Forte ontology is that it supports complex inheritance, and users can inherit from an existing ontology and add new ontology features for their needs.

  • Link is inherited by all link-like entries, which have a parent and a child.

  • RelationLink is an example subclass of Link, and it has a class attribute specifying the relation type.
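The inheritance idea can be sketched with plain dataclasses. The class names `SpanEntry`, `SentenceEntry`, and `ArticleEntry` and the `get_entries` helper are hypothetical stand-ins, not Forte classes; the sketch assumes that querying by a parent type also returns instances of its subclasses.

```python
from dataclasses import dataclass
from typing import List, Optional, Type


@dataclass
class SpanEntry:
    """Stand-in for a text annotation: a span over the pack text."""
    begin: int
    end: int


@dataclass
class SentenceEntry(SpanEntry):
    """A sentence adds no fields beyond its span."""


@dataclass
class ArticleEntry(SpanEntry):
    """An article extends the span with a language field."""
    language: Optional[str] = None


def get_entries(entries: List[SpanEntry], entry_type: Type[SpanEntry]) -> List[SpanEntry]:
    """Return entries that are instances of `entry_type`, subclasses included."""
    return [entry for entry in entries if isinstance(entry, entry_type)]


entries = [ArticleEntry(0, 20, "en"), SentenceEntry(0, 9), SentenceEntry(10, 20)]
print(len(get_entries(entries, SpanEntry)))      # every entry is a SpanEntry
print(len(get_entries(entries, SentenceEntry)))  # only the sentences
```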

Create a Machine Translation Processor

Overview

  • How to read data from source

  • How to process data in pipeline

    • How to perform sentence segmentation

    • How to annotate and query the data

    • How to translate the input text with a pre-trained model

    • How to manage multiple data objects

  • How to handle new practical requests

In this section, you will learn:

  • The basics of the machine translation process

  • How to wrap a pre-trained machine translation model into a Forte processor

Translation converts a sequence of text from one language to another. In this tutorial we will use a Hugging Face Transformers model to translate the input data. The process consists of several steps, including subword tokenization, input embedding, model inference, and decoding.

[Figure: transformer_mt]

In Forte, the generic class PackProcessor wraps the model and the inference-related components and behaviors needed to process a DataPack. We therefore create a class that inherits from PackProcessor, which gives us the class definition class MachineTranslationProcessor(PackProcessor).

[ ]:
from forte.data import DataPack
from forte.data.readers import StringReader
from forte.processors.base import PackProcessor
from transformers import T5Tokenizer, T5ForConditionalGeneration

class MachineTranslationProcessor(PackProcessor):
    """
    Translate the input text and append the translation to the same DataPack.
    """
    def initialize(self, resources, configs):
        super().initialize(resources, configs)

        # Initialize the tokenizer and model
        model_name: str = self.configs.pretrained_model
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)
        self.task_prefix = "translate English to German: "
        self.tokenizer.padding_side = "left"
        self.tokenizer.pad_token = self.tokenizer.eos_token

    def _process(self, input_pack: DataPack):
        # en2de machine translation
        inputs = self.tokenizer([
            self.task_prefix + sentence.text
            for sentence in input_pack.get(Sentence)
        ], return_tensors="pt", padding=True)

        output_sequences = self.model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            do_sample=False,
        )

        output = ''.join(self.tokenizer.batch_decode(
            output_sequences, skip_special_tokens=True
        ))
        src_article: Article = Article(input_pack, 0, len(input_pack.text))
        src_article.language = "en"

        input_pack.set_text(input_pack.text + '\n\n' + output)
        tgt_article: Article = Article(input_pack, len(input_pack.text) - len(output), len(input_pack.text))
        tgt_article.language = "de"

    @classmethod
    def default_configs(cls):
        return {
            "pretrained_model": "t5-small"
        }
  • Initialization of needed components:

    • Users need to consider initializing all needed NLP components for the inference task such as tokenizer and model.

    • Users also need to specify all configuration in configs, a dictionary-like object that specifies configurations of all components such as model name.

  • MT operations on datapack

    • After the initialization, we already have the needed NLP components. We need to consider several MT behaviors based on Forte DataPack.

    • Pre-process text data

      • retrieve text data from datapack (given that it already reads data from the data source).

      • since T5 has a better performance given a task prompt, we also want to include the prompt in our data.

    • Tokenization that transforms input text into sequences of tokens and token ids.

    • Generate output sequences from model.

    • Decode output token ids into sentences using the tokenizer.

The generic method to process a DataPack is _process(self, input_pack: DataPack). It should tokenize the input text, use the model class to make an inference, decode the output token ids, and finally write the output back into the DataPack.
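The steps of `_process` can be sketched without loading a model. In this illustration `fake_translate` is a hypothetical stand-in for the tokenizer and model (it only strips the prompt and upper-cases the text), and `translate_into_text` is a hypothetical helper; the part worth studying is the offset arithmetic used to locate the source and target spans after the translation is appended. As in the processor above, the decoded sentences are joined without a separator.

```python
from typing import Callable, List, Tuple

Span = Tuple[int, int]


def translate_into_text(
    text: str,
    sentences: List[str],
    translate_batch: Callable[[List[str]], List[str]],
    task_prefix: str = "translate English to German: ",
) -> Tuple[str, Span, Span]:
    """Append the translation to `text`; return new text, source span, target span."""
    prompts = [task_prefix + sentence for sentence in sentences]  # pre-process
    output = "".join(translate_batch(prompts))  # inference and decoding (stubbed)
    source_span = (0, len(text))
    new_text = text + "\n\n" + output
    target_span = (len(new_text) - len(output), len(new_text))
    return new_text, source_span, target_span


def fake_translate(prompts: List[str]) -> List[str]:
    # Hypothetical model: drop the task prefix and upper-case the sentence.
    return [prompt.split(": ", 1)[1].upper() for prompt in prompts]


new_text, source_span, target_span = translate_into_text(
    "Hi there. Bye now.", ["Hi there.", "Bye now."], fake_translate
)
print(source_span, target_span)
print(new_text[target_span[0]:target_span[1]])
```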

Now we can add it into the pipeline and run the machine translation task.

[ ]:
input_string: str = ' '.join(sentences)
pipeline: Pipeline = Pipeline[DataPack]()
pipeline.set_reader(StringReader())
pipeline.add(NLTKSentenceSegmenter())
pipeline.add(MachineTranslationProcessor())
pipeline.initialize()
for datapack in pipeline.process_dataset([input_string]):
    for article in datapack.get(Article):
        print([f"\nArticle (language - {article.language}): {article.text}"])

Ontology in DataPack

Here we provide an illustration so that users can better understand the internal storage of DataPack. As we can see, text entries such as sentences and articles are stored as spans in Annotations. Their text can be retrieved easily and efficiently from those spans.

[Figure: onto_and_datapack]
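A small sketch of span-based storage in a single pack, using plain tuples rather than Forte entries. The text, the spans, and the `covered` helper are all made up for illustration; the containment check is an assumption about how a query such as "sentences inside this article" can be answered from offsets alone.

```python
from typing import List, Tuple

Span = Tuple[int, int]


def covered(outer: Span, inner_spans: List[Span]) -> List[Span]:
    """Return the spans that lie fully inside `outer`."""
    return [s for s in inner_spans if outer[0] <= s[0] and s[1] <= outer[1]]


# One text buffer holds both languages, separated by a blank line.
pack_text = "I like tea.\n\nIch mag Tee."
sentence_list: List[Span] = [(0, 11), (13, 25)]
english_article: Span = (0, 11)
german_article: Span = (13, 25)

for article in (english_article, german_article):
    print([pack_text[b:e] for b, e in covered(article, sentence_list)])
```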

A better way to store source and target text: MultiPack

Overview

  • How to read data from source

  • How to process data in pipeline

    • How to perform sentence segmentation

    • How to annotate and query the data

    • How to translate the input text with a pre-trained model

    • How to manage multiple data objects

  • How to handle new practical requests

In this section, you will learn:

  • What a MultiPack is and why we need it

  • How to use a MultiPack

The above step outputs a DataPack which is good for holding data about one specific piece of text. A complicated pipeline like the one we are building now may need multiple DataPacks to be passed along the pipeline and this is where MultiPack can help. MultiPack manages a set of DataPacks that can be indexed by their names.

MultiPackBoxer is a simple Forte processor that converts a DataPack into a MultiPack by making it the only DataPack in the MultiPack. A name can be specified via the config. We use it to wrap the DataPack that contains the source sentences.

[Figure: mp_boxer]

[ ]:
from forte.data import MultiPack
from forte.processors.base import MultiPackProcessor
from forte.data.caster import MultiPackBoxer

class MachineTranslationMPProcessor(MultiPackProcessor):
    """
    Translate the source DataPack and write the translation to a target DataPack.
    """
    def initialize(self, resources, configs):
        super().initialize(resources, configs)

        # Initialize the tokenizer and model
        model_name: str = self.configs.pretrained_model
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)
        self.task_prefix = "translate English to German: "
        self.tokenizer.padding_side = "left"
        self.tokenizer.pad_token = self.tokenizer.eos_token

    def _process(self, input_pack: MultiPack):
        source_pack: DataPack = input_pack.get_pack("source")
        target_pack: DataPack = input_pack.add_pack("target")

        # en2de machine translation
        inputs = self.tokenizer([
            self.task_prefix + sentence.text
            for sentence in source_pack.get(Sentence)
        ], return_tensors="pt", padding=True)

        output_sequences = self.model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            do_sample=False,
        )

        # Annotate the source article
        src_article: Article = Article(source_pack, 0, len(source_pack.text))
        src_article.language = "en"

        # Annotate each sentence
        for output in self.tokenizer.batch_decode(
            output_sequences, skip_special_tokens=True
        ):
            target_pack.set_text(target_pack.text + output)
            text_length: int = len(target_pack.text)
            Sentence(target_pack, text_length - len(output), text_length)

        # Annotate the target article
        tgt_article: Article = Article(target_pack, 0, len(target_pack.text))
        tgt_article.language = "de"

    @classmethod
    def default_configs(cls):
        return {
            "pretrained_model": "t5-small",
        }

Then MachineTranslationMPProcessor writes the output sentence into a target DataPack.

[Figure: mp_mt]

Now let’s try to create a new pipeline that utilizes MultiPack to manage text in different languages.

[ ]:
nlp: Pipeline = Pipeline[DataPack]()
nlp.set_reader(StringReader())
nlp.add(NLTKSentenceSegmenter())
nlp.add(MultiPackBoxer(), config={"pack_name": "source"})
nlp.add(MachineTranslationMPProcessor(), config={
    "pretrained_model": "t5-small"
})
nlp.initialize()
for multipack in nlp.process_dataset([input_string]):
    for pack_name in ("source", "target"):
        for article in multipack.get_pack(pack_name).get(Article):
            print(f"\nArticle (language - {article.language}): ")
            for sentence in article.get(Sentence):
                print(sentence.text)

Ontology in MultiPack

For comparison, here is an illustration of the internal storage of MultiPack. We can see that MultiPack wraps one source DataPack and one target DataPack. Each Article span is relative to the text of its own DataPack.

[Figure: onto_and_multipack]
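The same idea as a plain-Python sketch: a container of named packs, each with its own text and its own spans. `SimplePack` and `SimpleMultiPack` are hypothetical classes written for this illustration; only the method names `add_pack` and `get_pack` mirror the calls used in the tutorial code.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class SimplePack:
    """Stand-in for a DataPack: text plus sentence spans over that text."""
    text: str = ""
    sentences: List[Tuple[int, int]] = field(default_factory=list)

    def append_sentence(self, sentence: str) -> None:
        begin = len(self.text)
        self.text += sentence
        self.sentences.append((begin, len(self.text)))


class SimpleMultiPack:
    """Stand-in for a MultiPack: packs indexed by name."""

    def __init__(self) -> None:
        self._packs: Dict[str, SimplePack] = {}

    def add_pack(self, name: str) -> SimplePack:
        if name in self._packs:
            raise ValueError(f"pack {name!r} already exists")
        self._packs[name] = SimplePack()
        return self._packs[name]

    def get_pack(self, name: str) -> SimplePack:
        return self._packs[name]


multi = SimpleMultiPack()
source = multi.add_pack("source")
target = multi.add_pack("target")
source.append_sentence("I like tea.")
target.append_sentence("Ich mag Tee.")
for name in ("source", "target"):
    pack = multi.get_pack(name)
    print(name, [pack.text[b:e] for b, e in pack.sentences])
```

Both packs start their offsets at 0, which is the difference from the single-pack layout, where the target span had to be shifted past the source text.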

New Requirement: Handle HTML data

Overview

  • How to read data from source

  • How to process data in pipeline

  • How to handle new practical requests

    • How to handle structures like HTML data

      • How to select a single data object for processing

    • How to replace the translation model with remote translation services

    • How to save and load the pipeline

In this section, you will learn:

  • How to build a translation management system

  • How to preserve structure such as HTML in machine translation

  • How to select a specific DataPack from a MultiPack for processing

In the previous step, the input string was just a simple paragraph made up of several sentences. However, in many cases, we might need to handle data with structural information, such as HTML or XML. When the input is a string of raw HTML data, the machine translation pipeline above may not work as expected:

[ ]:
html_input: str = """
<!DOCTYPE html>
<html>
    <head><title>Beginners BBQ Class.</title></head>
    <body>
    <p>Do you want to get better at making delicious BBQ? You will have the opportunity, put this on your calendar now. Thursday, September 22nd join World Class BBQ Champion, Tony Balay from Lonestar Smoke Rangers.</p>
    </body>
</html>
"""
nlp.initialize()
for multipack in nlp.process_dataset([html_input]):
    print("Source Text: " + multipack.get_pack("source").text)
    print("\nTarget Text: " + multipack.get_pack("target").text)

We can see that the original HTML structure is broken in the translated output.

How to preserve HTML tags/structures

In order to handle structured data like HTML, we need to update the current design of our pipeline. Luckily, Forte pipelines are highly modular, so we can simply insert two new processors without changing the existing ones.

We first need an HTML cleaner to strip all the HTML tags from the input string. The figure below shows the effect of the tag remover.

[Figure: tag_remover]

After the translation is finished, we also need to recover the HTML structure from the unstructured translation output. The figure below shows how one source sentence is replaced with its target sentence once the target sentence is ready.

[Figure: tag_recover]

[ ]:
from forte.data import NameMatchSelector
from forte.data.readers.html_reader import ForteHTMLParser

class HTMLTagCleaner(MultiPackProcessor):

    def initialize(self, resources, configs):
        super().initialize(resources, configs)
        self._parser = ForteHTMLParser()

    def _process(self, input_pack: MultiPack):
        raw_pack: DataPack = input_pack.get_pack("raw")
        source_pack: DataPack = input_pack.add_pack("source")

        self._parser.feed(raw_pack.text)
        cleaned_text: str = raw_pack.text
        for span, _ in self._parser.spans:
            cleaned_text = cleaned_text.replace(
                raw_pack.text[span.begin:span.end], ''
            )
        source_pack.set_text(cleaned_text)

class HTMLTagRecovery(MultiPackProcessor):

    def _process(self, input_pack: MultiPack):
        raw_pack: DataPack = input_pack.get_pack("raw")
        source_pack: DataPack = input_pack.get_pack("source")
        target_pack: DataPack = input_pack.get_pack("target")
        result_pack: DataPack = input_pack.add_pack("result")
        result_text: str = raw_pack.text
        for sent_src, sent_tgt in zip(source_pack.get(Sentence), target_pack.get(Sentence)):
            result_text = result_text.replace(sent_src.text, sent_tgt.text)
        result_pack.set_text(result_text)
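The clean-then-recover idea can also be tried in isolation with the standard library. This is a minimal sketch, assuming Python's built-in `html.parser.HTMLParser` in place of ForteHTMLParser; `TextCollector` and `recover` are hypothetical names, and the German sentence is hard-coded rather than produced by a model.

```python
from html.parser import HTMLParser
from typing import List, Tuple


class TextCollector(HTMLParser):
    """Collect the text nodes of an HTML document, ignoring the tags."""

    def __init__(self) -> None:
        super().__init__()
        self.chunks: List[str] = []

    def handle_data(self, data: str) -> None:
        if data.strip():
            self.chunks.append(data.strip())


def recover(raw_html: str, pairs: List[Tuple[str, str]]) -> str:
    """Put each target sentence where its source sentence was."""
    for source_sentence, target_sentence in pairs:
        raw_html = raw_html.replace(source_sentence, target_sentence)
    return raw_html


raw = "<html><body><p>Good morning.</p></body></html>"
collector = TextCollector()
collector.feed(raw)
collector.close()
restored = recover(raw, [("Good morning.", "Guten Morgen.")])
print(collector.chunks)
print(restored)
```

Note that `str.replace` substitutes every occurrence, so a sentence that appears more than once, or that also appears inside a tag attribute, would be replaced everywhere.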

Now we are able to create a translation management system by inserting the two processors introduced above into our previous machine translation pipeline.

[ ]:
# Pipeline with HTML handling
pipeline: Pipeline = Pipeline[DataPack]()
pipeline.set_reader(StringReader())
pipeline.add(MultiPackBoxer(), config={"pack_name": "raw"})
pipeline.add(HTMLTagCleaner())
pipeline.add(
    NLTKSentenceSegmenter(),
    selector=NameMatchSelector(),
    selector_config={"select_name": "source"}
)
pipeline.add(MachineTranslationMPProcessor(), config={
    "pretrained_model": "t5-small"
})
pipeline.add(HTMLTagRecovery())

pipeline.initialize()
for multipack in pipeline.process_dataset([html_input]):
    print(multipack.get_pack("raw").text)
    print(multipack.get_pack("result").text)

Selector

In the code snippet above, we utilize a NameMatchSelector to select one specific DataPack from the MultiPack based on its reference name select_name. This allows NLTKSentenceSegmenter to process only the specified DataPack.
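Conceptually, a name-based selector is a filter that sits in front of a processor. The sketch below models a MultiPack as a plain dict from pack name to text; `apply_to_selected` is a hypothetical helper, not part of Forte, and `str.upper` stands in for a real processor.

```python
from typing import Callable, Dict


def apply_to_selected(
    packs: Dict[str, str],
    select_name: str,
    process: Callable[[str], str],
) -> Dict[str, str]:
    """Run `process` only on the pack named `select_name`; leave the rest as-is."""
    return {
        name: process(text) if name == select_name else text
        for name, text in packs.items()
    }


packs = {"raw": "<p>Hi.</p>", "source": "Hi."}
result = apply_to_selected(packs, "source", str.upper)
print(result)
```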

Replace our MT model with online translation API

Overview

  • How to read data from source

  • How to process data in pipeline

  • How to handle new practical requests

    • How to handle structures like HTML data

    • How to replace the translation model with remote translation services

    • How to save and load the pipeline

In this section, you will learn:

  • How to use a different translation service

Forte also allows us to update the translation model and integrate it seamlessly into the original pipeline. For example, if we want to offload the translation task to an online service, all we need to do is update the translation processor. There is no need to change the other components in the pipeline.

[ ]:
# You can get your own API key by following the instructions in https://docs.microsoft.com/en-us/azure/cognitive-services/translator/
api_key = input("Enter your API key here:")
[ ]:
import requests
import uuid

class OnlineMachineTranslationMPProcessor(MultiPackProcessor):
    """
    Translate the source DataPack with an online translation API and write the result to a target DataPack.
    """
    def initialize(self, resources, configs):
        super().initialize(resources, configs)
        # Avoid a double slash when the endpoint ends with '/' and the path starts with '/'.
        self.url = configs.endpoint.rstrip('/') + configs.path
        self.from_lang = configs.from_lang
        self.to_lang = configs.to_lang
        self.subscription_key = configs.subscription_key
        self.subscription_region = configs.subscription_region

    def _process(self, input_pack: MultiPack):
        source_pack: DataPack = input_pack.get_pack("source")
        target_pack: DataPack = input_pack.add_pack("target")

        # Use the configured languages instead of hard-coded values.
        params = {
            'api-version': '3.0',
            'from': self.from_lang,
            'to': [self.to_lang]
        }
        # Build request
        headers = {
            'Ocp-Apim-Subscription-Key': self.subscription_key,
            'Ocp-Apim-Subscription-Region': self.subscription_region,
            'Content-type': 'application/json',
            'X-ClientTraceId': str(uuid.uuid4())
        }
        # You can pass more than one object in body.
        body = [{
            'text': source_pack.text
        }]

        request = requests.post(self.url, params=params, headers=headers, json=body)

        result = request.json()
        target_pack.set_text("".join(
            [trans['text'] for trans in result[0]["translations"]]
             )
        )

    @classmethod
    def default_configs(cls):
        return {
            "from_lang" : 'en',
            "to_lang":  'de',
            "endpoint" : 'https://api.cognitive.microsofttranslator.com/',
            "path" : '/translate',
            "subscription_key": None,
            "subscription_region" : "westus2",
            'X-ClientTraceId': str(uuid.uuid4())
        }
[ ]:
nlp: Pipeline = Pipeline[DataPack]()
nlp.set_reader(StringReader())
nlp.add(NLTKSentenceSegmenter())
nlp.add(MultiPackBoxer(), config={"pack_name": "source"})
nlp.add(OnlineMachineTranslationMPProcessor(), config={
    "from_lang" : 'en',
    "to_lang":  'de',
    "endpoint" : 'https://api.cognitive.microsofttranslator.com/',
    "path" : '/translate',
    "subscription_key": api_key,
    "subscription_region" : "westus2",
    'X-ClientTraceId': str(uuid.uuid4())
})
nlp.initialize()
for multipack in nlp.process_dataset([input_string]):
    print("Source Text: " + multipack.get_pack("source").text)
    print("\nTarget Text: " + multipack.get_pack("target").text)
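The request and response handling can be exercised offline, without an API key. The sketch below builds the query parameters and a body with one object per sentence, then reads translations out of a response. `build_translate_request` and `extract_translations` are hypothetical helpers, and `sample_response` is made-up data whose shape is an assumption based on how the processor above reads `result[0]["translations"]`.

```python
import json
from typing import Dict, List, Tuple


def build_translate_request(
    sentences: List[str], from_lang: str, to_lang: str
) -> Tuple[Dict[str, object], List[Dict[str, str]]]:
    """Build query parameters and a JSON body with one object per sentence."""
    params = {"api-version": "3.0", "from": from_lang, "to": [to_lang]}
    body = [{"text": sentence} for sentence in sentences]
    return params, body


def extract_translations(response_json: List[dict]) -> List[str]:
    """Take the first translation of each item in the response."""
    return [item["translations"][0]["text"] for item in response_json]


params, body = build_translate_request(["Hello.", "Thank you."], "en", "de")
# Hypothetical response, assumed to contain one item per input object.
sample_response = [
    {"translations": [{"text": "Hallo.", "to": "de"}]},
    {"translations": [{"text": "Danke.", "to": "de"}]},
]
print(json.dumps(body))
print(extract_translations(sample_response))
```

Sending one object per sentence would keep the sentence alignment between source and target, which the HTML recovery step relies on.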

Save the whole pipeline with save()

Overview

  • How to read data from source

  • How to process data in pipeline

  • How to handle new practical requests

    • How to handle structures like HTML data

    • How to replace the translation model with remote translation services

    • How to save and load the pipeline

In this section, you will learn:

  • How to export and import a Forte pipeline

Forte also allows us to save the pipeline to disk. It serializes the whole pipeline into an intermediate representation, which can be loaded later, possibly on a different machine.

[ ]:
import os
save_path: str = os.path.join(os.path.dirname(os.path.abspath('')), "pipeline.yml")
nlp.save(save_path)

with open(save_path, 'r') as f:
    print(f.read())

Now that the pipeline is saved, we can reload it to check that it still functions as expected.

[ ]:
new_nlp: Pipeline = Pipeline()
new_nlp.init_from_config_path(save_path)
new_nlp.initialize()
for multipack in new_nlp.process_dataset([input_string]):
    print("Source Text: " + multipack.get_pack("source").text)
    print("\nTarget Text: " + multipack.get_pack("target").text)