# Copyright 2019 The Forte Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The reader that reads CoNLL-U Standard Universal Dependency Format -
https://universaldependencies.org/docs/format.html
into data_pack format
"""
from typing import Iterator, Dict, Tuple, Any
from ft.onto.base_ontology import (
Document,
Sentence,
Token,
Dependency,
EnhancedDependency,
)
from forte.data.data_utils_io import dataset_path_iterator
from forte.data.data_pack import DataPack
from forte.data.base_reader import PackReader
__all__ = ["ConllUDReader"]


class ConllUDReader(PackReader):
r""":class:`~forte.data.readers.conllu_ud_reader.ConllUDReader` is designed
to read in the Universal Dependencies
2.4 dataset.
"""

    def _cache_key_function(self, data_pack: Any) -> str:
        r"""Uses the pack name, i.e. the document id, as the cache key."""
        if data_pack.pack_name is None:
            raise ValueError("data_pack does not have a document id")
        return data_pack.pack_name

    def _collect(self, *args, **kwargs) -> Iterator[Any]:
        # pylint: disable = unused-argument
        r"""Iterates over the conllu files in the data source.

        Args:
            args: args[0] is the path to the directory containing the
                conllu files.
            kwargs: unused.

        Returns: an iterator over documents, where each document is the
            list of raw lines belonging to it. A new document starts at
            each ``# newdoc`` line.
        """
conll_dir_path = args[0]
file_paths = dataset_path_iterator(conll_dir_path, "conllu")
for file_path in file_paths:
with open(file_path, "r", encoding="utf8") as file:
lines = file.readlines()
doc_lines = []
for i, line in enumerate(lines):
                    doc_lines.append(line)
                    # The current document ends here if this is the last
                    # line of the file, or if the next line starts a new
                    # document.
                    if i == len(lines) - 1 or lines[i + 1].strip().startswith(
                        "# newdoc"
                    ):
yield doc_lines
doc_lines = []

    def _parse_pack(self, doc_lines) -> Iterator[DataPack]:
        r"""Parses the lines of a single document into a
        :class:`~forte.data.data_pack.DataPack`, adding ``Token``,
        ``Dependency``, ``EnhancedDependency``, ``Sentence`` and
        ``Document`` entries.
        """
        # The ten CoNLL-U columns, in order.
token_comp_fields = [
"id",
"form",
"lemma",
"pos",
"ud_xpos",
"ud_features",
"head",
"label",
"enhanced_dependency_relations",
"ud_misc",
]
token_multi_fields = [
"ud_features",
"ud_misc",
"enhanced_dependency_relations",
]
token_feature_fields = ["ud_features", "ud_misc"]
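        # For illustration (an assumed value, not from any dataset): a FEATS
        # or MISC column such as "Case=Nom|Number=Sing" is first split on
        # "|" and then on the first "=", yielding
        # {"Case": "Nom", "Number": "Sing"}; the placeholder "_" yields an
        # empty list or dict.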
data_pack: DataPack = DataPack()
doc_sent_begin: int = 0
doc_num_sent: int = 0
doc_text: str = ""
doc_offset: int = 0
doc_id: str
sent_text: str
        # Maps a token id (as a string) to its parsed fields and its Token
        # entry, for the sentence currently being read.
        sent_tokens: Dict[str, Tuple[Dict[str, Any], Token]] = {}
for line in doc_lines:
line = line.strip()
line_comps = line.split()
if line.startswith("# newdoc"):
doc_id = line.split("=")[1].strip()
elif line.startswith("# sent"):
sent_text = ""
            elif len(line_comps) > 0 and line_comps[0].strip().isdigit():
                # A word line. Only tokens with plain integer ids are kept;
                # multiword token ranges (e.g. "1-2") and empty nodes
                # (e.g. "5.1") fail isdigit() and are skipped.
token_comps: Dict[str, Any] = {}
for index, key in enumerate(token_comp_fields):
token_comps[key] = str(line_comps[index])
if key in token_multi_fields:
values = (
str(token_comps[key]).split("|")
if token_comps[key] != "_"
else []
)
if key not in token_feature_fields:
token_comps[key] = values
else:
feature_lst = [
elem.split("=", 1) for elem in values
]
feature_dict = {
elem[0]: elem[1] for elem in feature_lst
}
token_comps[key] = feature_dict
word: str = token_comps["form"]
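                # Tokens are laid out with a single space between them, so,
                # for example, a first word "Dogs" spans [0, 4) and a
                # following "bark" spans [5, 9).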
word_begin = doc_offset
word_end = doc_offset + len(word)
# add token
token: Token = Token(data_pack, word_begin, word_end)
token.lemma = token_comps["lemma"]
token.pos = token_comps["pos"]
token.ud_xpos = token_comps["ud_xpos"]
token.ud_features = token_comps["ud_features"]
token.ud_misc = token_comps["ud_misc"]
sent_tokens[str(token_comps["id"])] = (token_comps, token)
sent_text += word + " "
doc_offset = word_end + 1
elif line == "":
# sentence ends
sent_text = sent_text.strip()
doc_text += " " + sent_text
                # Add dependencies for the sentence once all of its tokens
                # have been added.
for _, (token_comps, token) in sent_tokens.items():
# add primary dependency
label = token_comps["label"]
if label == "root":
token.is_root = True
else:
token.is_root = False
head = sent_tokens[token_comps["head"]][1]
dependency = Dependency(data_pack, head, token)
dependency.dep_label = label
# add enhanced dependencies
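                    # For illustration, a DEPS value such as "4:nsubj|6:conj"
                    # (an assumed example) yields two entries, each split
                    # into a head id and a label.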
for dep in token_comps["enhanced_dependency_relations"]:
head_id, label = dep.split(":", 1)
if label != "root":
head = sent_tokens[head_id][1]
enhanced_dependency = EnhancedDependency(
data_pack, head, token
)
enhanced_dependency.dep_label = label
                # Add the sentence span; doc_offset - 1 drops the trailing
                # space after the last token.
                Sentence(data_pack, doc_sent_begin, doc_offset - 1)
                doc_sent_begin = doc_offset
                doc_num_sent += 1
                # Reset the token map so that tokens left over from a longer
                # previous sentence do not receive duplicate dependencies.
                sent_tokens = {}
doc_text = doc_text.strip()
data_pack.set_text(doc_text)
# add doc to data_pack
Document(data_pack, 0, len(doc_text))
data_pack.pack_name = doc_id
yield data_pack