Module gatenlp.lib_stanza
Support for using Stanford Stanza (see https://stanfordnlp.github.io/stanza/): convert Stanza output to gatenlp documents and annotations.
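The quickest way in is the AnnStanza annotator, which wraps a Stanza pipeline as a gatenlp Annotator. A minimal sketch, assuming the English models have already been downloaded with stanza.download("en"):

from gatenlp import Document
from gatenlp.lib_stanza import AnnStanza

# keyword arguments are passed through to stanza.Pipeline()
ann_stanza = AnnStanza(lang="en", processors="tokenize,pos,lemma,ner")
doc = ann_stanza(Document("Barack Obama was born in Hawaii."))
# Token, Sentence and entity annotations are now in the default annotation set
for ann in doc.annset().with_type("Token"):
    print(doc[ann], ann.features.get("upos"))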
Source code:
"""
Support for using Stanford Stanza (see https://stanfordnlp.github.io/stanza/):
convert Stanza output to gatenlp documents and annotations.
"""
from gatenlp import Span
from gatenlp import Document
from gatenlp import logger
from gatenlp.processing.annotator import Annotator
import stanza
from stanza.models.common.doc import Document as StanzaDocument
# NOTES
# Install stanza models:
# stanza.download('de')
# stanza.download('de', package="gsd")
# stanza.download('de', processors={'ner': 'CoNLL03'})
# Create a pipeline:
# nlp = stanza.Pipeline('en', package='partut')
# nlp = stanza.Pipeline('it', processors='tokenize,mwt', package='twittiro')
# nlp = stanza.Pipeline('de', processors={ 'tokenize': 'gsd',
# 'pos': 'hdt', 'ner': 'conll03', 'lemma': 'default' })
# Create your own processor:
# from stanza.pipeline.processor import Processor,
# register_processor, register_processor_variant
# @register_processor("myprocessor")
# class LowercaseProcessor(Processor):
# _requires = set(['tokenize'])
# _provides = set(['myprocessor'])
# def __init__(self, config, pipeline, use_gpu):
# pass
# def _set_up_model(self, *args):
# pass
# def process(self, doc):
# # process doc
# return doc
# etc
class AnnStanza(Annotator):
""" """
def __init__(
self,
pipeline=None,
outsetname="",
token_type="Token",
mwt_type="MWT",
space_token_type=None,
sentence_type="Sentence",
add_entities=True,
ent_prefix=None,
batchsize=1000,
**kwargs,
):
"""
Create a processing resource for running a stanza pipeline on documents.
Args:
pipeline: if this is specified, use a pre-configured pipeline, otherwise create a pipeline
passing on the kwargs
outsetname: the annotation set name where to put the annotations
token_type: the annotation type for the token annotations
mwt_type: annotation type for multi-word token annotations
space_token_type: annotation type for space tokens. If not None, adds space tokens of this type
for all characters in the document not covered by tokens.
sentence_type: the annotation type for the sentence annotations
add_entities: if true, add entity annotations
ent_prefix: the prefix to add to all entity annotation types
batchsize: for the pipe() method, batches of documents are created from the input
generator to speed up processing with Stanza; this defines the number of documents
per batch (default: 1000). Note that Stanza internally re-batches those batches
again, depending on the size of the documents in the sequence.
kwargs: if no preconfigured pipeline is specified, pass these arguments to the
stanza.Pipeline() constructor (see https://stanfordnlp.github.io/stanza/pipeline.html#pipeline).
Use lang= to specify the default pipeline for a language.
"""
self.outsetname = outsetname
self.token_type = token_type
self.sentence_type = sentence_type
self.add_entities = add_entities
self.ent_prefix = ent_prefix
self.mwt_type = mwt_type
self.batchsize = batchsize
self.space_token_type = space_token_type
# drop our own arguments so that only stanza.Pipeline() arguments remain in kwargs
for a in ["token_type", "sentence_type", "add_entities",
"ent_prefix", "mwt_type", "space_token_type"]:
kwargs.pop(a, None)
if pipeline:
self.pipeline = pipeline
else:
self.pipeline = stanza.Pipeline(**kwargs)
def __call__(self, doc, **kwargs):
stanza_doc = self.pipeline(doc.text)
stanza2gatenlp(
stanza_doc,
doc,
setname=self.outsetname,
token_type=self.token_type,
mwt_type=self.mwt_type,
space_token_type=self.space_token_type,
sentence_type=self.sentence_type,
add_entities=self.add_entities,
ent_prefix=self.ent_prefix,
)
return doc
def _pipe_batch(self, gatenlp_docs, stanza_docs):
stanza_out = self.pipeline(stanza_docs)
assert len(stanza_out) == len(gatenlp_docs)
for doc_stanza, doc in zip(stanza_out, gatenlp_docs):
try:
stanza2gatenlp(doc_stanza, doc,
setname=self.outsetname,
token_type=self.token_type,
mwt_type=self.mwt_type,
space_token_type=self.space_token_type,
sentence_type=self.sentence_type,
add_entities=self.add_entities,
ent_prefix=self.ent_prefix)
yield doc
except Exception:
# TODO: this should be configurable: should we terminate, log, silently return None, silently
# return the unprocessed document?
yield None
def pipe(self, documents, **kwargs):
docs = []
stanza_in = []
idx = 0
for doc in documents:
docs.append(doc)
stanza_in.append(StanzaDocument([], text=doc.text))
idx += 1
if idx >= self.batchsize:
yield from self._pipe_batch(docs, stanza_in)
idx = 0
docs = []
stanza_in = []
if len(docs) > 0:
yield from self._pipe_batch(docs, stanza_in)
def apply_stanza(nlp, gatenlpdoc, setname=""):
"""Run the stanford stanza pipeline on the gatenlp document and transfer the annotations.
This modifies the gatenlp document in place.
Args:
nlp: StanfordNLP pipeline
gatenlpdoc: gatenlp document
setname: the annotation set name to use (Default value = "")
Returns:
the modified gatenlp document
"""
doc = nlp(gatenlpdoc.text)
return stanza2gatenlp(doc, gatenlpdoc=gatenlpdoc, setname=setname)
def tok2tok(tok):
"""
Create a copy of a Stanza token, prepared for creating an annotation: this is a dict that has
start, end and id keys and everything else in a nested dict "fm".
Args:
tok: original stanza token
Returns:
what we use to create a Token annotation
"""
newtok = {}
fm = {}
newtok["fm"] = fm
for k, v in tok.items():
if k == "start_char":
newtok["start"] = v
elif k == "end_char":
newtok["end"] = v
elif k == "feats":
for feat in v.split("|"):
k, v = feat.split("=")
fm[k] = v
elif k == "id":
newtok[k] = v
elif k == "misc":
msettings = v.split("|")
ostart = None
oend = None
othersettings = []
for ms in msettings:
k, v = ms.split("=")
if k == "start_char":
ostart = int(v)
elif k == "end_char":
oend = int(v)
else:
othersettings.append(ms)
if ostart is not None:
newtok["start"] = ostart
if oend is not None:
newtok["end"] = oend
if othersettings:
for os in othersettings:
k, v = os.split("=")
fm[k] = v
else:
fm[k] = v
return newtok
def stanza2gatenlp(
stanzadoc,
gatenlpdoc=None,
setname="",
token_type="Token",
mwt_type="MWT",
space_token_type=None,
sentence_type="Sentence",
add_entities=True,
ent_prefix=None,
):
"""
Convert a Stanford Stanza document to a gatenlp document. If a gatenlp document is already
provided, add the annotations from the Stanford Stanza document to it. In this case the
original gatenlpdoc is used and gets modified.
Args:
stanzadoc: a Stanford Stanza document
gatenlpdoc: if None, a new gatenlp document is created otherwise this
document is added to. (Default value = None)
setname: the annotation set name to which the annotations get added, empty string
for the default annotation set.
token_type: the annotation type to use for tokens, if needed (Default value = "Token")
mwt_type: annotation type for multi-word token annotations
space_token_type: annotation type for space tokens. If not None, adds space tokens of this type
for all characters in the document not covered by tokens.
sentence_type: the annotation type to use for sentence annotations (Default value = "Sentence")
add_entities: if True, add any entities as well (Default value = True)
ent_prefix: if None, use the original entity type as annotation type, otherwise add the given string
to the annotation type as a prefix. (Default value = None)
Returns:
the new or modified gatenlp document
"""
if gatenlpdoc is None:
retdoc = Document(stanzadoc.text)
else:
retdoc = gatenlpdoc
annset = retdoc.annset(setname)
# stanza processes text in sentence chunks, so we do everything per sentence
prev_end = 0
for sent in stanzadoc.sentences:
# go through the tokens: in stanza, each token is a list of dicts, normally there is one dict
# which also has the offset information in "misc", but for multiword tokens, there seems to be
# one "header" dict for the range of words which has the offset info and NER label and then
# one additional element per word which has all the rest.
# For our purposes we create a list of dicts where for normal tokens we just copy the element, but for
# multiword tokens we copy over something that has fake offsets and all the features
newtokens = []
mwtokens = []
for t in sent.tokens:
t = t.to_dict()
if len(t) == 1:
# normal token
newtokens.append(tok2tok(t[0]))
else:
# multiword token
tokinfo = tok2tok(t[0]) # a dict with field "id" that contains a list of indices of words
words = t[1:] # the rest of the list is words
fm = tokinfo.get("fm")
ner = fm.get("ner")
text = fm.get("text")
start = tokinfo["start"]
end = tokinfo["end"]
mwtokens.append(dict(start=start, end=end, ids=t[0]["id"]))
# create the spans for the annotations
spans = Span.squeeze(start, end, len(words))
for i, w in enumerate(words):
tok = tok2tok(w)
tok["fm"]["ner"] = ner
tok["fm"]["token_text"] = text
span = spans[i]
tok["start"] = span.start
tok["end"] = span.end
newtokens.append(tok)
# print(f"\n!!!!!!DEBUG: newtokens={newtokens}")
# now go through the new token list and create annotations
idx2annid = {} # map stanza word id to annotation id
starts = []
ends = []
# offset of any previous token ann, used to insert space tokens between token annotations
for t in newtokens:
start = t["start"]
end = t["end"]
stanzaid = t["id"]
starts.append(start)
ends.append(end)
if space_token_type is not None and prev_end < start:
annset.add(prev_end, start, space_token_type)
annid = annset.add(start, end, token_type, features=t["fm"]).id
prev_end = end
idx2annid[str(stanzaid)] = annid
for mwtinfo in mwtokens:
annids = [idx2annid[str(sid)] for sid in mwtinfo["ids"]]
annset.add(mwtinfo["start"], mwtinfo["end"], mwt_type, dict(word_ids=annids))
# print(f"\n!!!!!!DEBUG: idx2annid={idx2annid}")
# create a sentence annotation from beginning of first word to end of last
sentid = annset.add(starts[0], ends[-1], sentence_type).id
# now replace the head index with the corresponding annid, the head index "0" is
# mapped to the sentence annotation
idx2annid["0"] = sentid
for annid in list(idx2annid.values()):
ann = annset.get(annid)
hd = ann.features.get("head")
if hd is not None:
head_id = idx2annid.get(str(hd))
if head_id is None:
logger.error(f"Could not find head id: {hd} for {ann} in document {gatenlpdoc.name}")
else:
ann.features["head"] = head_id
# if necessary add a final space token
if space_token_type is not None and prev_end < len(retdoc.text):
annset.add(prev_end, len(retdoc.text), space_token_type)
# add the entities
if add_entities:
for e in stanzadoc.entities:
if ent_prefix:
anntype = ent_prefix + e.type
else:
anntype = e.type
annset.add(e.start_char, e.end_char, anntype)
return retdoc
Functions
def apply_stanza(nlp, gatenlpdoc, setname='')
-
Run the Stanford Stanza pipeline on the gatenlp document and transfer the annotations. This modifies the gatenlp document in place.
Args
nlp
- StanfordNLP pipeline
gatenlpdoc
- gatenlp document
setname
- the annotation set name to use (Default value = "")
Returns
the modified gatenlp document
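A usage sketch (assuming downloaded English models; the set name "stanza" is just an example): build the Stanza pipeline yourself and let apply_stanza transfer its output into an existing document.

import stanza
from gatenlp import Document
from gatenlp.lib_stanza import apply_stanza

nlp = stanza.Pipeline("en", processors="tokenize,pos")
gdoc = Document("This is a sentence. This is another one.")
apply_stanza(nlp, gdoc, setname="stanza")  # modifies gdoc in place
print(len(gdoc.annset("stanza").with_type("Sentence")))  # expected: 2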
def stanza2gatenlp(stanzadoc, gatenlpdoc=None, setname='', token_type='Token', mwt_type='MWT', space_token_type=None, sentence_type='Sentence', add_entities=True, ent_prefix=None)
-
Convert a Stanford Stanza document to a gatenlp document. If a gatenlp document is already provided, add the annotations from the Stanford Stanza document to it. In this case the original gatenlpdoc is used and gets modified.
Args
stanzadoc
- a Stanford Stanza document
gatenlpdoc
- if None, a new gatenlp document is created otherwise this document is added to. (Default value = None)
setname
- the annotation set name to which the annotations get added, empty string for the default annotation set.
token_type
- the annotation type to use for tokens, if needed (Default value = "Token")
mwt_type
- annotation type for multi-word token annotations
space_token_type
- annotation type for space tokens. If not None, adds space tokens of this type for all characters in the document not covered by tokens.
sentence_type
- the annotation type to use for sentence annotations (Default value = "Sentence")
add_entities
- if True, add any entities as well (Default value = True)
ent_prefix
- if None, use the original entity type as annotation type, otherwise add the given string to the annotation type as a prefix. (Default value = None)
Returns
the new or modified gatenlp document
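For illustration, a sketch that converts a Stanza document into a fresh gatenlp document, prefixing entity annotation types (the prefix "Stanza_" is an arbitrary choice):

import stanza
from gatenlp.lib_stanza import stanza2gatenlp

nlp = stanza.Pipeline("en", processors="tokenize,ner")
sdoc = nlp("Angela Merkel visited Paris.")
gdoc = stanza2gatenlp(sdoc, ent_prefix="Stanza_")
# entity annotations now get types such as "Stanza_PERSON"
for ann in gdoc.annset():
    print(ann.type, gdoc[ann])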
def tok2tok(tok)
-
Create a copy of a Stanza token, prepared for creating an annotation: this is a dict that has start, end and id keys and everything else in a nested dict "fm".
Args
tok
- original stanza token
Returns
what we use to create a Token annotation
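To make the mapping concrete, a hypothetical input/output pair (the field values are illustrative, not real pipeline output): offsets found in "misc" are promoted to top-level start/end keys, "feats" is unpacked into individual features, and all remaining fields land in the nested "fm" dict.

from gatenlp.lib_stanza import tok2tok

word = {
    "id": 1,
    "text": "Obama",
    "upos": "PROPN",
    "feats": "Number=Sing",
    "misc": "start_char=7|end_char=12",
}
tok2tok(word)
# -> {"fm": {"text": "Obama", "upos": "PROPN", "Number": "Sing"},
#     "id": 1, "start": 7, "end": 12}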
Classes
class AnnStanza (pipeline=None, outsetname='', token_type='Token', mwt_type='MWT', space_token_type=None, sentence_type='Sentence', add_entities=True, ent_prefix=None, batchsize=1000, **kwargs)
-
Create a processing resource for running a stanza pipeline on documents.
Args
pipeline
- if this is specified, use a pre-configured pipeline, otherwise create a pipeline passing on the kwargs
outsetname
- the annotation set name where to put the annotations
token_type
- the annotation type for the token annotations
mwt_type
- annotation type for multi-word token annotations
space_token_type
- annotation type for space tokens. If not None, adds space tokens of this type for all characters in the document not covered by tokens.
sentence_type
- the annotation type for the sentence annotations
add_entities
- if true, add entity annotations
ent_prefix
- the prefix to add to all entity annotation types
batchsize
- for the pipe() method, batches of documents are created from the input generator to speed up processing with Stanza; this defines the number of documents per batch (default: 1000). Note that Stanza internally re-batches those batches again, depending on the size of the documents in the sequence.
kwargs
- if no preconfigured pipeline is specified, pass these arguments to the stanza.Pipeline() constructor (see https://stanfordnlp.github.io/stanza/pipeline.html#pipeline). Use lang= to specify the default pipeline for a language.
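A batch-processing sketch for the pipe() method (the document texts and the batchsize value are illustrative):

from gatenlp import Document
from gatenlp.lib_stanza import AnnStanza

ann_stanza = AnnStanza(lang="en", processors="tokenize", batchsize=50)
corpus = [Document("First text."), Document("Second text.")]
for processed in ann_stanza.pipe(corpus):
    if processed is not None:  # pipe() yields None for documents that failed
        print(len(processed.annset().with_type("Token")))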
Ancestors
- Annotator
- abc.ABC