# Copyright 2019 The Forte Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=logging-fstring-interpolation
import logging
import os
from typing import Dict, List, Optional, Tuple
import numpy as np
from forte.common import ProcessorConfigError, ResourceError
from forte.utils import create_import_error_msg
from forte.common.configuration import Config
from forte.common.resources import Resources
from forte.data.data_pack import DataPack
from forte.data.ontology import Annotation
from forte.models.ner import utils
from forte.models.ner.model_factory import BiRecurrentConvCRF
from forte.processors.base.batch_processor import RequestPackingProcessor
from ft.onto.base_ontology import Token, EntityMention
try:
import torch
except ImportError as e:
raise ImportError(
create_import_error_msg("torch", "models", "ner predictor")
) from e
# Public API of this module: only the predictor class is exported by
# ``from <module> import *``.
__all__ = ["CoNLLNERPredictor"]

# Module-level logger named after this module, used for checkpoint messages.
logger = logging.getLogger(__name__)
class CoNLLNERPredictor(RequestPackingProcessor):
    """
    A Named Entity Recognizer trained according to `Ma, Xuezhe, and Eduard
    Hovy. "End-to-end sequence labeling via bi-directional lstm-cnns-crf."
    <https://arxiv.org/abs/1603.01354>`_.

    Note that to use :class:`CoNLLNERPredictor`, the :attr:`ontology` of
    :class:`~forte.pipeline.Pipeline` must be an ontology that includes
    :class:`ft.onto.base_ontology.Token` and
    :class:`ft.onto.base_ontology.Sentence`.
    """

    def __init__(self):
        super().__init__()
        # Everything below is populated by `initialize`; the `None` values
        # are what the guard clauses in the other methods test for.
        self.model = None
        self.word_alphabet, self.char_alphabet, self.ner_alphabet = (
            None,
            None,
            None,
        )
        self.resource = None
        self.config_model = None
        self.config_data = None
        self.normalize_func = None
        self.device = None

    def initialize(self, resources: Resources, configs: Config):
        """Set up alphabets, device, and model from resources and configs.

        Args:
            resources: The shared resources. Any of the alphabets, the word
                embedding table, or the model that is not already a key of
                ``resources`` is requested through ``resources.load`` with
                ``configs.config_model.resource_dir`` as the path.
            configs: The processor configuration; ``config_model`` and
                ``config_data`` are read from it.

        Raises:
            ResourceError: If the model has to be built but one of the
                alphabets obtained from ``resources`` is ``None``.
        """
        super().initialize(resources, configs)

        self.resource = resources
        self.config_model = configs.config_model
        self.config_data = configs.config_data

        resource_path = configs.config_model.resource_dir

        keys = {
            "word_alphabet",
            "char_alphabet",
            "ner_alphabet",
            "word_embedding_table",
        }

        # Only ask for the entries the resources object does not hold yet.
        missing_keys = list(keys.difference(self.resource.keys()))

        self.resource.load(keys=missing_keys, path=resource_path)

        self.word_alphabet = resources.get("word_alphabet")
        self.char_alphabet = resources.get("char_alphabet")
        self.ner_alphabet = resources.get("ner_alphabet")
        word_embedding_table = resources.get("word_embedding_table")

        # Prefer a device supplied through the resources; otherwise pick
        # CUDA when available and fall back to CPU.
        if resources.get("device"):
            self.device = resources.get("device")
        else:
            self.device = (
                torch.device("cuda")
                if torch.cuda.is_available()
                else torch.device("cpu")
            )

        self.normalize_func = utils.normalize_digit_word

        if "model" not in self.resource.keys():

            def load_model(path):
                # Loader handed to `resources.load`: builds the network and
                # restores its weights from `path` when that file exists.
                if (
                    self.word_alphabet is None
                    or self.char_alphabet is None
                    or self.ner_alphabet is None
                ):
                    raise ResourceError(
                        "Error when configuring the predictor, alphabets "
                        "loaded from the resources are not initialized."
                    )

                model = BiRecurrentConvCRF(
                    word_embedding_table,
                    self.char_alphabet.size(),
                    self.ner_alphabet.size(),
                    self.config_model,
                )

                if os.path.exists(path):
                    # NOTE(security): `torch.load` unpickles the file, so
                    # only point this at checkpoints from a trusted source.
                    with open(path, "rb") as f:
                        weights = torch.load(f, map_location=self.device)
                        model.load_state_dict(weights)
                else:
                    # Previously a silent no-op; make it visible that the
                    # returned model carries no restored weights.
                    logger.warning(
                        "NER model weights not found at %s; "
                        "no weights were loaded into the model.",
                        path,
                    )
                return model

            self.resource.load(keys={"model": load_model}, path=resource_path)

        self.model = resources.get("model")
        self.model.to(self.device)
        self.model.eval()

        utils.set_random_seed(self.config_model.random_seed)

    @torch.no_grad()
    def predict(
        self, data_batch: Dict[str, Dict[str, List[str]]]
    ) -> Dict[str, Dict[str, List[np.ndarray]]]:
        """Predict an NER tag for every token in the batch.

        Args:
            data_batch: The batch to tag. ``data_batch["Token"]["text"]`` is
                iterated as one sequence of words per instance and
                ``data_batch["Token"]["tid"]`` as the matching sequence of
                token ids.

        Returns:
            A dictionary ``{"Token": {"ner": [...], "tid": [...]}}`` holding,
            per instance, an array of predicted tags and an array of the
            corresponding token ids.

        Raises:
            ProcessorConfigError: If the data configuration, the model, the
                normalizing function, or any alphabet has not been set.
        """
        if self.config_data is None:
            raise ProcessorConfigError(
                "Data configuration for the predictor is not found."
            )

        if self.model is None:
            raise ProcessorConfigError("Model for the predictor is not set.")

        if self.normalize_func is None:
            raise ProcessorConfigError(
                "The normalizing function for the predictor is not set."
            )

        if (
            self.word_alphabet is None
            or self.ner_alphabet is None
            or self.char_alphabet is None
        ):
            raise ProcessorConfigError(
                "Error when configuring the predictor, alphabets are not initialized."
            )

        tokens = data_batch["Token"]
        max_char_length = self.config_data.max_char_length

        instances = []
        for words in tokens["text"]:
            char_id_seqs = []
            word_ids = []
            for word in words:
                # Character ids come from the raw word, capped at
                # `max_char_length`; the word id from the normalized word.
                char_ids = [self.char_alphabet.get_index(ch) for ch in word]
                if len(char_ids) > max_char_length:
                    char_ids = char_ids[:max_char_length]
                char_id_seqs.append(char_ids)

                normalized = self.normalize_func(word)
                word_ids.append(self.word_alphabet.get_index(normalized))
            instances.append((word_ids, char_id_seqs))

        self.model.eval()
        word_tensor, char_tensor, masks, unused_lengths = self.get_batch_tensor(
            instances, device=self.device
        )
        preds = self.model.decode(word_tensor, char_tensor, mask=masks)

        pred: Dict = {"Token": {"ner": [], "tid": []}}
        for i, tids in enumerate(tokens["tid"]):
            # Only the first `len(tids)` positions of a row are looked up.
            ner_tags = [
                self.ner_alphabet.get_instance(preds[i][j])
                for j in range(len(tids))
            ]
            pred["Token"]["ner"].append(np.array(ner_tags))
            pred["Token"]["tid"].append(np.array(tids))

        return pred

    def load_model_checkpoint(self, model_path=None):
        """Restore the model parameters from a checkpoint file.

        Args:
            model_path: Path of the checkpoint. When ``None``,
                ``config_model.model_path`` is used instead.

        Raises:
            ProcessorConfigError: If the model configuration or the model
                has not been set.
        """
        if self.config_model is None:
            raise ProcessorConfigError(
                "Model configuration for the predictor is not found."
            )
        if self.model is None:
            raise ProcessorConfigError("Model is not set for the predictor.")

        ckpt_path = (
            model_path
            if model_path is not None
            else self.config_model.model_path
        )

        # NOTE(security): `torch.load` unpickles the file, so only load
        # checkpoints from a trusted source.
        ckpt = torch.load(ckpt_path, map_location=self.device)
        # Report the path that was actually loaded, which differs from the
        # configured one when `model_path` is passed explicitly.
        logger.info(f"Restoring NER model from {ckpt_path}")
        self.model.load_state_dict(ckpt["model"])

    def pack(
        self,
        pack: DataPack,
        predict_results: Dict[str, Dict[str, List[str]]],
        _: Optional[Annotation] = None,
    ):
        """
        Write the prediction results back to the data pack by writing the
        predicted NER tags to the original tokens.

        Besides setting ``ner`` on each token, an ``EntityMention`` is
        created for every ``S`` tag and for every ``E`` tag whose type
        equals the type of the most recently recorded ``B`` or ``S`` tag.

        Args:
            pack: The data pack the token ids refer to.
            predict_results: The output of :meth:`predict`; nothing is done
                when it is ``None``.
            _: Unused.
        """
        if predict_results is None:
            return

        token_results = predict_results["Token"]

        # (begin offset, entity type) of the most recent "B"/"S" token. It
        # is not reset between instances or after a mention is emitted.
        current_entity_mention: Tuple[int, str] = (-1, "None")

        for i, tids in enumerate(token_results["tid"]):
            # an instance
            instance_tags = token_results["ner"][i]
            for j, tid in enumerate(tids):
                token: Token = pack.get_entry(tid)  # type: ignore
                token.ner = instance_tags[j]

                token_ner = token.ner
                assert isinstance(token_ner, str)

                # Tags are read as "<prefix>-<type>": the first character
                # is the prefix, the type starts at index 2.
                prefix = token_ner[0]
                entity_type = token_ner[2:]

                if prefix == "B":
                    current_entity_mention = (token.begin, entity_type)
                elif prefix == "E":
                    if entity_type != current_entity_mention[1]:
                        # The end tag does not match the open mention.
                        continue
                    entity = EntityMention(
                        pack, current_entity_mention[0], token.end
                    )
                    entity.ner_type = current_entity_mention[1]
                elif prefix == "S":
                    current_entity_mention = (token.begin, entity_type)
                    entity = EntityMention(
                        pack, current_entity_mention[0], token.end
                    )
                    entity.ner_type = current_entity_mention[1]
                # "I", "O" and any other prefix need no action.

    def get_batch_tensor(
        self,
        data: List[Tuple[List[int], List[List[int]]]],
        device: Optional[torch.device] = None,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Get the tensors to be fed into the model.

        An empty batch, or an instance without any token, produces
        zero-sized or fully padded entries instead of raising. Character
        sequences longer than the computed character length are truncated
        to it.

        Args:
            data: A list of tuple (word_ids, char_id_sequences)
            device: The device for the tensors.

        Returns:
            A tuple where

            - ``words``: A tensor of shape `[batch_size, batch_length]`
              representing the word ids in the batch
            - ``chars``: A tensor of shape
              `[batch_size, batch_length, char_length]` representing the char
              ids for each word in the batch
            - ``masks``: A tensor of shape `[batch_size, batch_length]`
              representing the indices to be masked in the batch. 1 indicates
              no masking.
            - ``lengths``: A tensor of shape `[batch_size]` representing the
              length of each sentence in the batch

        Raises:
            ProcessorConfigError: If the data configuration, the model
                configuration, or any alphabet has not been set.
        """
        if self.config_data is None:
            raise ProcessorConfigError(
                "Data configuration for the predictor is not found."
            )

        if self.config_model is None:
            raise ProcessorConfigError(
                "Model configuration for the predictor is not found."
            )

        if (
            self.word_alphabet is None
            or self.ner_alphabet is None
            or self.char_alphabet is None
        ):
            raise ProcessorConfigError(
                "Error when configuring the predictor, alphabets are not initialized."
            )

        batch_size = len(data)
        # `default=0` keeps an empty batch or a token-less instance from
        # raising ValueError in `max`.
        batch_length = max((len(wids) for wids, _cids in data), default=0)
        longest_word = max(
            (len(cids) for _wids, cid_seqs in data for cids in cid_seqs),
            default=0,
        )
        char_length = min(
            self.config_data.max_char_length,
            longest_word + self.config_data.num_char_pad,
        )

        word_pad = self.word_alphabet.pad_id
        char_pad = self.char_alphabet.pad_id

        wid_inputs = np.empty([batch_size, batch_length], dtype=np.int64)
        cid_inputs = np.empty(
            [batch_size, batch_length, char_length], dtype=np.int64
        )
        mask_array = np.zeros([batch_size, batch_length], dtype=np.float32)
        length_array = np.empty(batch_size, dtype=np.int64)

        for i, (wids, cid_seqs) in enumerate(data):
            inst_size = len(wids)
            length_array[i] = inst_size

            # word ids, padded on the right
            wid_inputs[i, :inst_size] = wids
            wid_inputs[i, inst_size:] = word_pad

            # char ids of each word, padded on the right
            for c, cids in enumerate(cid_seqs):
                # Guard against sequences longer than the char axis.
                kept = cids[:char_length]
                cid_inputs[i, c, : len(kept)] = kept
                cid_inputs[i, c, len(kept) :] = char_pad
            # positions past the end of the sentence are all padding
            cid_inputs[i, inst_size:, :] = char_pad

            mask_array[i, :inst_size] = 1.0

        words = torch.from_numpy(wid_inputs).to(device)
        chars = torch.from_numpy(cid_inputs).to(device)
        masks = torch.from_numpy(mask_array).to(device)
        lengths = torch.from_numpy(length_array).to(device)

        return words, chars, masks, lengths

    # TODO: change this to manageable size
    @classmethod
    def default_configs(cls):
        r"""Default config for NER Predictor"""
        return {
            "config_data": {
                "train_path": "",
                "val_path": "",
                "test_path": "",
                "num_epochs": 200,
                "batch_size_tokens": 512,
                "test_batch_size": 16,
                "max_char_length": 45,
                "num_char_pad": 2,
            },
            "config_model": {
                "output_hidden_size": 128,
                "dropout_rate": 0.3,
                "word_emb": {"dim": 100},
                "char_emb": {"dim": 30, "initializer": {"type": "normal_"}},
                "char_cnn_conv": {
                    "in_channels": 30,
                    "out_channels": 30,
                    "kernel_size": 3,
                    "padding": 2,
                },
                "bilstm_sentence_encoder": {
                    "rnn_cell_fw": {
                        "input_size": 130,
                        "type": "LSTMCell",
                        "kwargs": {"num_units": 128},
                    },
                    "rnn_cell_share_config": "yes",
                    "output_layer_fw": {"num_layers": 0},
                    "output_layer_share_config": "yes",
                },
                "learning_rate": 0.01,
                "momentum": 0.9,
                "decay_interval": 1,
                "decay_rate": 0.05,
                "random_seed": 1234,
                "initializer": {"type": "xavier_uniform_"},
                "model_path": "",
                "resource_dir": "",
            },
            "batcher": {
                "batch_size": 16,
                "context_type": "ft.onto.base_ontology.Sentence",
                "requests": {
                    "ft.onto.base_ontology.Token": [],
                    "ft.onto.base_ontology.Sentence": [],
                },
            },
        }