Module gatenlp.processing.gazetteer.tokengazetteer
This module provides gazetteer classes which match the text or the tokens of documents against gazetteer lists (lists of interesting texts or token sequences) and annotate the matches with features from those lists.
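A minimal usage sketch (illustrative, not part of the module): it assumes a document whose default annotation set already contains "Token" annotations and matches a small in-memory "gazlist" against the token texts.

import re
from gatenlp import Document
from gatenlp.processing.gazetteer.tokengazetteer import TokenGazetteer

text = "We flew to New York via Frankfurt."
doc = Document(text)
# create simple word tokens in the default annotation set
for m in re.finditer(r"\w+", text):
    doc.annset().add(m.start(), m.end(), "Token")

# each "gazlist" element is a (token string list, feature dict) pair
entries = [(["New", "York"], {"kind": "city"}), (["Frankfurt"], {"kind": "city"})]
gaz = TokenGazetteer(source=entries, source_fmt="gazlist")
doc = gaz(doc)
for ann in doc.annset().with_type("Lookup"):
    print(ann.start, ann.end, doc[ann], ann.features)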
Expand source code
"""
This module provides gazetteer classes which match the text or the tokens of documents against
gazetteer lists (lists of interesting texts or token sequences) and annotate the matches with features
from those lists.
"""
import os
from typing import Union, Dict, Optional, Callable, List, Any
from collections import defaultdict
from dataclasses import dataclass
from gatenlp.document import Document, Annotation
from gatenlp.utils import init_logger
from gatenlp.processing.annotator import Annotator
from gatenlp.processing.gazetteer.base import GazetteerBase
# TODO: better handling/support for separator annotations: this would add complexity but allow that a sequence
# of annotations is only matched if there is a/several? separator annotation between each of those annotations.
# Could also require this only if there is a separator in the gazetteer sequence (e.g. indicated by a None element)
# NOTE: slots=True is supported from 3.10 only
@dataclass()
class TokenGazetteerMatch:
"""
Represent a token gazetteer match
"""
    start: int  # index of the first matched token (a token index, not a text offset)
    end: int  # index one past the last matched token (may include ignored tokens in between)
    match: list  # the list of matched token annotations
    data: Any  # None or a list of feature dicts for the matched entry
    listidx: Any  # None or a list of indices into the gazetteer's list features/types, parallel to data
class TokenGazetteerNode:
"""
    Represent a node in the token trie built from the entry token sequences.
    If is_match is True, the token sequence leading to this node is a complete entry and data
    (together with listidx) contains the entry data. The nodes attribute contains None or a dict
    mapping follow-on token strings to child TokenGazetteerNode instances for longer entries.
"""
__slots__ = ("is_match", "data", "nodes", "listidx")
def __init__(self, is_match=None, data=None, nodes=None, listidx=None):
"""
Args:
            is_match: True if the token sequence leading to this node is a complete entry
            data: data associated with the match, can be a list of data items
            nodes: None or a dict mapping follow-on token strings to child TokenGazetteerNode instances
            listidx: None or a list of indices into the gazetteer's list features/types
"""
self.is_match = is_match
self.data = data
self.listidx = listidx
self.nodes = nodes
@staticmethod
def dict_repr(nodes):
if nodes is not None:
return str([(t, n) for t, n in nodes.items()])
def __repr__(self):
nodes = TokenGazetteerNode.dict_repr(self.nodes)
return f"Node(is_match={self.is_match},data={self.data},listidx={self.listidx},nodes={nodes})"
def tokentext_getter(token, doc=None, feature=None):
if feature is not None:
txt = token.features.get(feature)
else:
if doc is None:
raise Exception("No feature given, need doc for gazetteer")
txt = doc[token]
return txt
# TODO: allow output annotation type to be set from the match or from the list!
class TokenGazetteer(GazetteerBase):
def __init__(
self,
source: Union[List, str, None] = None,
source_fmt: str = "gate-def",
source_sep="\t",
source_encoding="UTF-8",
# cache_source=None, # TODO
source_tokenizer: Union[None, Annotator, Callable] = None,
longest_only: bool = False,
skip_longest: bool = False,
outset_name: str = "",
ann_type: str = "Lookup",
annset_name: str = "",
token_type: str = "Token",
feature=None,
split_type: Optional[str] = None,
within_type: Optional[str] = None,
mapfunc: Optional[Callable] = None,
ignorefunc: Optional[Callable] = None,
getterfunc: Optional[Callable] = None,
list_features: Optional[Dict] = None,
list_type: Optional[str] = None,
):
"""
Args:
            source: where to load the gazetteer from. What is actually expected here depends on the fmt
                parameter. If None, nothing is loaded
source_fmt: defines what is expected as the format and/or content of the source parameter. One of:
* "gate-def" (default): the path to a GATE-style "def" file.
See https://gate.ac.uk/userguide/chap:gazetteers
* "gazlist": a list of tuples or lists where the first element of the tuple/list
is a list of strings and the second element is a dictionary containing the features to assign.
                All entries in the list belong to the first gazetteer list, which has list features as
                specified with the list_features parameter and a list type as specified with the list_type parameter.
source_sep: the field separator to use for some source formats (default: tab character)
source_encoding: the encoding to use for some source formats (default: UTF-8)
            source_tokenizer: if not None, an annotator that creates annotations of type "Token" in the default
annotation set. If this is None, then when loading string gazetteer entries, they are tokenized by
splitting on whitespace (as defined by Python str.split())
feature: the feature name to use to get the string for each token. If the corresponding feature
in the token does not exist, is None or is the empty string, the Token is completely ignored.
If the feature parameter is None, use the document string covered by the token.
longest_only: if True, only returns the longest match at each matching position, otherwise returns all
matches.
skip_longest: skip forward over longest match (do not return contained/overlapping matches)
annset_name: the set where the tokens to match should come from
outset_name: the set where the new annotations are added
ann_type: the annotation type of the annotations to create, unless a type is given for the gazetteer
entry or for the gazetteer list.
token_type: the annotation type of the token annotations
split_type: the annotation type of any split annotations which will end any ongoing match
within_type: only matches fully within annotations of this type will be made
mapfunc: a callable that maps the original string extracted for each token to the actual string to use.
ignorefunc: a callable which given the mapped token string decides if the token should be ignored
(not added to the gazetteer list, not considered in the document when matching)
getterfunc: a callable which, given a token annotation, retrieves the string. If there is mapfunc, the
retrieved string is then still run through the mapfunc. The getterfunc must accept the token and
an optional document as parameters.
            list_features: a dictionary of features common to the whole list loaded, or None.
                If what gets loaded specifies its own list features, this parameter is ignored.
list_type: the output annotation type to use for the list, ignored if the input format specifies this
on its own. If the input does not specify this on its own and this is not None, then it takes
precedence over outtype for the data loaded from source.
"""
self.nodes = defaultdict(TokenGazetteerNode)
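        # entries are stored as a token trie: this maps the first token string of each entry
        # to a TokenGazetteerNode, and longer entries continue via that node's own .nodes dict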
self.mapfunc = mapfunc
self.ignorefunc = ignorefunc
self.feature = feature
self.annset = annset_name
self.tokentype = token_type
self.splittype = split_type
self.withintype = within_type
self.outset = outset_name
self.outtype = ann_type
self.longest_only = longest_only
self.skip = skip_longest
if getterfunc:
self.getterfunc = getterfunc
else:
self.getterfunc = tokentext_getter
self.listfeatures = []
self.listtypes = []
self.logger = init_logger(__name__)
# self.logger.setLevel(logging.DEBUG)
self.size = 0
if source is not None:
self.append(source, source_fmt=source_fmt,
list_features=list_features, list_type=list_type, source_sep=source_sep,
source_encoding=source_encoding, source_tokenizer=source_tokenizer
)
def append(
self,
source: Union[None, str, List],
source_fmt: str = "gate-def",
source_sep: str = "\t",
source_encoding: str = "UTF-8",
source_tokenizer: Union[None, Annotator, Callable] = None,
source_splitter: Optional[Callable] = None,
list_features: Optional[Dict] = None,
list_type: Optional[str] = None,
):
"""
        This method appends more entries to the gazetteer.
Args:
source: where to load the gazetteer from. What is actually expected here depends on the fmt
parameter.
source_fmt: defines what is expected as the format and/or content of the source parameter. One of:
* "gate-def" (default): source must be a string, a pathlib Path or a parsed urllib url and
point to a GATE-style "def" file. See https://gate.ac.uk/userguide/chap:gazetteers
* "gazlist": a list of tuples or lists where the first element of the tuple/list
is a list of strings, the second element is a dictionary containing the features to assign and
the third element, if it exists, is the index of an element in the listfeatures array.
source_sep: the field separator to use for some source formats (default: tab character)
source_encoding: the encoding to use for some source formats (default: UTF-8)
            source_tokenizer: if not None, an annotator that creates annotations of type "Token" in the default
annotation set. If this is None, then when loading string gazetteer entries, they are tokenized by
splitting on whitespace (as defined by Python str.split())
source_splitter: if not None and source_tokenizer is None, a callable that takes a string and returns
the tokenstrings to use
            list_features: a dictionary of features to set for all matches which have this list's index set;
                it gets appended to the existing listfeatures. If what gets appended specifies
                its own list features, this is ignored.
list_type: the output annotation type to use for the list that gets appended. If what gets appended
specifies its own list type or list types, this is ignored.
"""
if source_fmt == "gazlist":
if list_features is not None:
self.listfeatures.append(list_features)
else:
self.listfeatures.append({})
if list_type is not None:
self.listtypes.append(list_type)
else:
self.listtypes.append(self.outtype)
listidx = len(self.listfeatures) - 1
for el in source:
entry = el[0]
data = el[1]
self.add(entry, data, listidx=listidx)
elif source_fmt == "gate-def":
if list_features is None:
list_features = {}
if list_type is None:
list_type = self.outtype
with open(source, "rt", encoding=source_encoding) as infp:
for line in infp:
line = line.rstrip("\n\r")
fields = line.split(":")
fields.extend(["", "", "", ""])
listFile = fields[0]
majorType = fields[1]
minorType = fields[2]
languages = fields[3]
anntype = fields[4]
this_listfeatures = list_features.copy()
this_outtype = list_type
if majorType:
this_listfeatures["majorType"] = majorType
if minorType:
this_listfeatures["minorType"] = minorType
if languages:
this_listfeatures["lang"] = languages
if anntype:
this_outtype = anntype
# read in the actual list
listfile = os.path.join(os.path.dirname(source), listFile)
self.logger.debug(f"Reading list file {listfile}")
with open(listfile, "rt", encoding=source_encoding) as inlistfile:
self.listtypes.append(this_outtype)
self.listfeatures.append(this_listfeatures)
linenr = 0
for listline in inlistfile:
linenr += 1
listline = listline.rstrip("\n\r")
fields = listline.split(source_sep)
entry = fields[0]
if source_tokenizer or source_splitter:
if source_tokenizer:
tmpdoc = Document(entry)
tmpdoc = source_tokenizer(tmpdoc) # we MUST reassign here to allow return of a new doc!
tokenanns = list(tmpdoc.annset().with_type("Token"))
if self.getterfunc:
tokenstrings = [
self.getterfunc(a, doc=tmpdoc)
for a in tokenanns
]
else:
tokenstrings = [tmpdoc[a] for a in tokenanns]
else:
tokenstrings = source_splitter(entry)
if self.mapfunc:
tokenstrings = [
self.mapfunc(s) for s in tokenstrings
]
if self.ignorefunc:
tokenstrings = [
s
for s in tokenstrings
if not self.ignorefunc(s)
]
else:
tokenstrings = entry.split() # just split on whitespace
if len(tokenstrings) == 0:
self.logger.warning(
f"File {listfile}, skipping line {linenr}, no tokens left: {listline}"
)
continue
                            if len(fields) > 1:  # remaining fields are name=value feature specs
                                feats = {}
                                for fspec in fields[1:]:
                                    fname, fval = fspec.split("=", 1)
                                    feats[fname] = fval
else:
feats = None
listidx = len(self.listfeatures) - 1
self.add(tokenstrings, feats, listidx=listidx)
else:
raise Exception(f"TokenGazetteer format {source_fmt} not known")
def add(self, entry, data=None, listidx=None):
"""
        Add a single gazetteer entry. A gazetteer entry can have no data associated with it at all if both
        data and listidx are None. Otherwise, data and listidx are stored as parallel lists with the same
        number of elements corresponding to each other, with missing data or listidx elements being None.
Args:
            entry: an iterable of strings, or a single string for a one-token entry; each element is the
                string that represents a token to be matched
data: dictionary of features to add
listidx: the index to list features and a list type to add
"""
if isinstance(entry, str):
entry = [entry]
node = None
i = 0
for token in entry:
if self.mapfunc is not None:
token = self.mapfunc(token)
if self.ignorefunc is not None and self.ignorefunc(token):
continue
if i == 0:
node = self.nodes[token]
else:
if node.nodes is None:
node.nodes = defaultdict(TokenGazetteerNode)
tmpnode = TokenGazetteerNode()
node.nodes[token] = tmpnode
node = tmpnode
else:
node = node.nodes[token]
i += 1
        if node is None:
            # all tokens of the entry were ignored (or the entry was empty): nothing to add
            return
        node.is_match = True
        self.size += 1
# For now: always store parallel lists of data and listidxs, with None elements if necessary.
if data is not None or listidx is not None:
if node.data is None:
node.data = [data]
node.listidx = [listidx]
else:
node.data.append(data)
node.listidx.append(listidx)
# TODO: code to test and correct: try to save space by only storing parallel lists if
# both data and listindices are actually both non-null and added:
#
# if data is None and listidx is None:
# # nothing to do, return what we have
# return node.data, node.listidx
# # if we have only data and no listidx and there is no listidx
# if data is not None and listidx is None and node.listidx is None:
# if node.data is None:
# node.data = [data]
# else:
# node.data.append(data)
# elif listidx is not None and data is None and node.data is None:
# if node.listidx is None:
# node.listidx = [listidx]
# else:
# node.listidx.append(listidx)
# else:
# # make sure we have parallel lists
# if node.data is None:
# node.data = []
# if node.listidx is None:
# node.listidx = []
# if len(node.data) > len(node.listidx):
# node.listidx.extend([None] * (len(node.data) - len(node.listidx)))
# elif len(node.listidx) > len(node.data):
# node.data.extend([None] * (len(node.listidx) - len(node.data)))
# if listidx:
# node.listidx.append(listidx)
# if data:
# node.data.append(data)
# else:
# node.data.append(None)
# else:
# node.listidx.append(None)
# node.listidx.append(listidx)
def match(self, tokens, doc=None, longest_only=None, idx=0, endidx=None, matchfunc=None):
"""
        Try to match at index location idx of the tokens sequence. Returns a list which is empty
        if no match is found, or contains one element per match. The element for each match is either a
        TokenGazetteerMatch instance if matchfunc is None, or whatever matchfunc returns for a match.
        Also returns the length of the longest match (0 if no match).
Args:
tokens: a list of tokens (must allow to fetch the ith token as tokens[i])
            doc: the document to which the tokens belong. Necessary if the underlying text is used
                for the tokens.
            longest_only: whether to return all matches or just the longest ones. If not None, overrides the
                setting from init.
idx: the index in tokens where the match must start
endidx: the index in tokens after which no match must end
matchfunc: a function to process each match.
The function is passed the TokenGazetteerMatch and the doc and should return something
that is then added to the result list of matches.
Returns:
A tuple, where the first element is a list of match elements, empty if no matches are found
and the second element is the length of the longest match, 0 if no match.
"""
if endidx is None:
endidx = len(tokens)
assert idx < endidx
if longest_only is None:
longest_only = self.longest_only
token = tokens[idx]
if token.type == self.splittype:
return [], 0
token_string = self.getterfunc(token, doc=doc, feature=self.feature)
if token_string is None:
return [], 0
if self.mapfunc:
token_string = self.mapfunc(token_string)
if self.ignorefunc:
if self.ignorefunc(token_string):
# no match possible here
return [], 0
# check if we can match the current token
if token_string in self.nodes:
# ok, we have the beginning of a possible match
longest = 0
node = self.nodes[token_string]
thismatches = []
thistokens = [token]
if node.is_match:
# the first token is already a complete match, so we need to add this to thismatches
longest = 1
# TODO: make this work with list data!
if matchfunc:
match = matchfunc(
idx, idx + 1, thistokens.copy(), node.data, node.listidx
)
else:
match = TokenGazetteerMatch(
idx, idx + 1, thistokens.copy(), node.data, node.listidx
)
thismatches.append(match)
j = idx + 1 # index into text tokens
nignored = 0
while j < endidx:
# print(f"!!! processing idx={j}/{endidx}")
if node.nodes:
token = tokens[j]
if token.type == self.splittype:
break
token_string = self.getterfunc(token, doc=doc, feature=self.feature)
if token_string is None:
j += 1
nignored += 1
continue
if self.mapfunc:
token_string = self.mapfunc(token_string)
if self.ignorefunc and self.ignorefunc(token_string):
j += 1
nignored += 1
continue
if token_string in node.nodes:
node = node.nodes[token_string]
thistokens.append(token)
if node.is_match:
if matchfunc:
match = matchfunc(
idx,
idx + len(thistokens) + nignored,
thistokens.copy(),
node.data,
node.listidx,
)
else:
match = TokenGazetteerMatch(
idx,
idx + len(thistokens) + nignored,
thistokens.copy(),
node.data,
node.listidx,
)
# debugtxt = " ".join(
# [doc[tokens[i]] for i in range(match.start, match.end)]
# )
# TODO: should LONGEST get calculated including ignored tokens or not?
if not longest_only:
thismatches.append(match)
if len(thistokens) > longest:
longest = len(thistokens)
else:
if len(thistokens) > longest:
thismatches = [match]
longest = len(thistokens)
j += 1
continue
else:
break
else:
break
return thismatches, longest
else:
# first token did not match, nothing to be found
return [], 0
def find(
self,
tokens: List[Annotation],
doc: Optional[Document] = None,
longest_only: Optional[bool] = None,
fromidx: Optional[int] = None,
toidx: Optional[int] = None,
endidx: Optional[int] = None,
matchfunc: Optional[Callable] = None,
):
"""
        Find the next match in the given index range and return a triple: the first element
        is the list of matches (empty if no match was found), the second is the length of the longest
        match (0 if none), and the third is the index where the matches were found, or None if no match
        was found.
Args:
tokens: list of tokens (must allow to fetch the ith token as tokens[i])
            doc: the document to which the tokens belong. Necessary if the underlying text is used
                for the tokens.
            longest_only: whether to return all matches or just the longest ones. If not None, overrides the
                setting from init
fromidx: first index where a match may start
toidx: last index where a match may start
endidx: the index in tokens after which no match must end
matchfunc: the function to use to process each match
Returns:
A triple with the list of matches as the first element, the max length of matches or 0 if no matches
as the second element and the index where the match occurs or None as the third element
"""
if longest_only is None:
longest_only = self.longest_only
idx = fromidx
if idx is None:
idx = 0
if toidx is None:
toidx = len(tokens) - 1
if endidx is None:
endidx = len(tokens)
while idx <= toidx:
matches, long = self.match(
tokens, idx=idx, doc=doc, longest_only=longest_only, endidx=endidx, matchfunc=matchfunc
)
if long == 0:
idx += 1
continue
return matches, long, idx
return [], 0, None
def find_all(
self,
tokens: List[Annotation],
doc: Optional[Document] = None,
longest_only: Optional[bool] = None,
skip_longest: Optional[bool] = None,
fromidx: Optional[int] = None,
toidx: Optional[int] = None,
endidx: Optional[int] = None,
matchfunc: Optional[Callable] = None,
# reverse=True,
):
"""
Find gazetteer entries in a sequence of tokens.
Note: if fromidx or toidx are bigger than the length of the tokens allows, this is silently
ignored.
Args:
tokens: iterable of tokens. The getter will be applied to each one and the doc to retrieve the initial
string.
doc: the document this should run on. Only necessary if the text to match is not retrieved from
the token annotation, but from the underlying document text.
longest_only: whether to return only the longest or all matches. If not None, overrides the init
setting
skip_longest: skip forward over longest match (do not return contained/overlapping matches). If not
None overrides the init setting.
fromidx: index where to start finding in tokens
toidx: index where to stop finding in tokens (this is the last index actually used)
endidx: index beyond which no matches should end
            matchfunc: a function which is called with (start, end, tokens, data, listidx) for each match
                and whose return value is added to the yielded matches (see the match method).
Yields:
list of matches
"""
if longest_only is None:
longest_only = self.longest_only
if skip_longest is None:
skip_longest = self.skip
matches = []
lentok = len(tokens)
if endidx is None:
endidx = lentok
if fromidx is None:
fromidx = 0
if toidx is None:
toidx = lentok - 1
if fromidx >= lentok:
yield matches
return
if toidx >= lentok:
toidx = lentok - 1
if fromidx > toidx:
yield matches
return
idx = fromidx
while idx <= toidx:
matches, maxlen, idx = self.find(
tokens,
doc=doc,
longest_only=longest_only,
fromidx=idx,
endidx=endidx,
toidx=toidx,
matchfunc=matchfunc,
)
if idx is None:
return
yield matches
if skip_longest:
idx += maxlen
else:
idx += 1
def __call__(self, doc: Document, **kwargs) -> Document:
"""
Apply the gazetteer to the document and annotate all matches.
Args:
doc: the document to annotate with matches.
Returns:
the annotated document
"""
        # create the token lists from the document: if withintype is None we only have one token list,
        # otherwise we have one list for each annotation of type withintype.
        # We create a list of segments which are identified by start and end offsets
if self.withintype is None:
segment_offs = [(0, len(doc.text))]
else:
withinanns = doc.annset(self.withintype)
segment_offs = []
for wann in withinanns:
segment_offs.append((wann.start, wann.end))
anntypes = [self.tokentype]
if self.splittype is not None:
anntypes.append(self.splittype)
anns = doc.annset(self.annset).with_type(anntypes)
# now do the annotation process for each segment
outset = doc.annset(self.outset)
for segment_start, segment_end in segment_offs:
tokens = list(anns.within(segment_start, segment_end))
for matches in self.find_all(tokens, doc=doc):
for match in matches:
starttoken = tokens[match.start]
endtoken = tokens[
match.end - 1
] # end is the index after the last match!!
startoffset = starttoken.start
endoffset = endtoken.end
if match.data: # TODO: for now data and listidx are either both None or lists with same len
for data, listidx in zip(match.data, match.listidx):
outtype = self.outtype
feats = {}
if listidx is not None:
feats.update(self.listfeatures[listidx])
outtype = self.listtypes[listidx]
if "_gatenlp.gazetteer.outtype" in feats:
outtype = feats["_gatenlp.gazetteer.outtype"]
del feats["_gatenlp.gazetteer.outtype"]
if data is not None:
feats.update(data)
outset.add(startoffset, endoffset, outtype, features=feats)
else:
outset.add(startoffset, endoffset, self.outtype)
return doc
    def get(self, tokenstrings, default=None):
        """
        Return the list of feature dicts stored for the given token string or token string sequence,
        or the default if there is no complete entry for these tokens.
        """
        if isinstance(tokenstrings, str):
            tokenstrings = [tokenstrings]
        node = None
        for idx, tokenstring in enumerate(tokenstrings):
            if idx == 0:
                node = self.nodes.get(tokenstring)  # top level: look up in the defaultdict
            else:
                if node.nodes is None:  # no continuations stored for this prefix
                    return default
                node = node.nodes.get(tokenstring)  # look up in the TokenGazetteerNode children
            if node is None:
                return default
        if node is None or not node.is_match:
            return default
        if node.data is None:  # entry was added without data and without a list index
            return []
        ret = []
        assert len(node.data) == len(node.listidx)
        for d, i in zip(node.data, node.listidx):
            # combine the list features (if any) with the entry data; entry data takes
            # precedence, mirroring what __call__ does when annotating
            feats = dict(self.listfeatures[i]) if i is not None else {}
            if d is not None:
                feats.update(d)
            ret.append(feats)
        return ret
def __getitem__(self, tokenstrings):
ret = self.get(tokenstrings)
if ret is None:
raise KeyError(tokenstrings)
return ret
def __contains__(self, tokenstrings):
ret = self.get(tokenstrings)
return ret is not None
def __len__(self):
return self.size
Functions
def tokentext_getter(token, doc=None, feature=None)
-
Retrieve the string for a token annotation: if a feature name is given, return the value of that feature from the token's features; otherwise return the document text covered by the token (in which case doc must not be None).
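Since mapfunc is applied both to the entry tokens when loading and to the strings produced by the getter when matching, case-insensitive matching needs no custom getter. A sketch (illustrative entries):

# lower-case both the gazetteer entries and the extracted token strings
gaz = TokenGazetteer(
    source=[(["new", "york"], {"kind": "city"})],
    source_fmt="gazlist",
    mapfunc=str.lower,
)

Passing feature="string" (or any other feature name) instead would make the getter use that token feature rather than the covered document text.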
Classes
class TokenGazetteer (source: Union[List[~T], str, None] = None, source_fmt: str = 'gate-def', source_sep='\t', source_encoding='UTF-8', source_tokenizer: Union[None, Annotator, Callable] = None, longest_only: bool = False, skip_longest: bool = False, outset_name: str = '', ann_type: str = 'Lookup', annset_name: str = '', token_type: str = 'Token', feature=None, split_type: Optional[str] = None, within_type: Optional[str] = None, mapfunc: Optional[Callable] = None, ignorefunc: Optional[Callable] = None, getterfunc: Optional[Callable] = None, list_features: Optional[Dict[~KT, ~VT]] = None, list_type: Optional[str] = None)
-
Gazetteer base class.
Args
source
- where to load the gazetteer from. What is actually expected here depends on the fmt parameter. If None, nothing is loaded
source_fmt
- defines what is expected as the format and/or content of the source parameter. One of:
  * "gate-def" (default): the path to a GATE-style "def" file. See https://gate.ac.uk/userguide/chap:gazetteers
  * "gazlist": a list of tuples or lists where the first element of the tuple/list is a list of strings and the second element is a dictionary containing the features to assign. All entries in the list belong to the first gazetteer list, which has list features as specified with the list_features parameter and a list type as specified with the list_type parameter.
source_sep
- the field separator to use for some source formats (default: tab character)
source_encoding
- the encoding to use for some source formats (default: UTF-8)
source_tokenizer
- if not None, an annotator that creates annotations of type "Token" in the default annotation set. If this is None, then when loading string gazetteer entries, they are tokenized by splitting on whitespace (as defined by Python str.split())
feature
- the feature name to use to get the string for each token. If the corresponding feature in the token does not exist, is None or is the empty string, the Token is completely ignored. If the feature parameter is None, use the document string covered by the token.
longest_only
- if True, only returns the longest match at each matching position, otherwise returns all matches.
skip_longest
- skip forward over longest match (do not return contained/overlapping matches)
annset_name
- the set where the tokens to match should come from
outset_name
- the set where the new annotations are added
ann_type
- the annotation type of the annotations to create, unless a type is given for the gazetteer entry or for the gazetteer list.
token_type
- the annotation type of the token annotations
split_type
- the annotation type of any split annotations which will end any ongoing match
within_type
- only matches fully within annotations of this type will be made
mapfunc
- a callable that maps the original string extracted for each token to the actual string to use.
ignorefunc
- a callable which given the mapped token string decides if the token should be ignored (not added to the gazetteer list, not considered in the document when matching)
getterfunc
- a callable which, given a token annotation, retrieves the string. If there is mapfunc, the retrieved string is then still run through the mapfunc. The getterfunc must accept the token and an optional document as parameters.
list_features
- a dictionary of features common to the whole list loaded, or None. If what gets loaded specifies its own list features, this parameter is ignored.
list_type
- the output annotation type to use for the list, ignored if the input format specifies this on its own. If the input does not specify this on its own and this is not None, then it takes precedence over outtype for the data loaded from source.
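For source_fmt="gate-def", the loader splits each line of the def file on ":" into listFile:majorType:minorType:languages:annotType (trailing fields are optional) and then reads each list file, taking the first separator-delimited field as the entry and any further fields as name=value feature specs. A hypothetical pair of files and how they would be loaded:

# cities.def
#   cities.lst:location:city
# cities.lst (entry, then optional tab-separated name=value feature specs;
# entries are whitespace-tokenized unless a source_tokenizer/source_splitter is given)
#   New York<TAB>population=8800000
#   Vienna

gaz = TokenGazetteer(source="cities.def", source_fmt="gate-def")
# matches would carry {"majorType": "location", "minorType": "city"} plus any per-entry features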
Ancestors
- GazetteerBase
- Annotator
- abc.ABC
Methods
def add(self, entry, data=None, listidx=None)
-
Add a single gazetteer entry. A gazetteer entry can have no data associated with it at all if both data and listidx are None. Otherwise, data and listidx are stored as parallel lists with the same number of elements corresponding to each other, with missing data or listidx elements being None.
Args
entry
- an iterable of strings, or a single string for a one-token entry; each element is the string that represents a token to be matched
data
- dictionary of features to add
listidx
- the index to list features and a list type to add
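A sketch of adding entries programmatically to an empty gazetteer (illustrative values):

gaz = TokenGazetteer()
gaz.add(["New", "York"], {"kind": "city"})
gaz.add("Vienna", {"kind": "city"})  # a plain string is treated as a single-token entry
assert len(gaz) == 2
assert ["New", "York"] in gaz  # membership is checked via get()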
def append(self, source: Union[List[~T], str, None], source_fmt: str = 'gate-def', source_sep: str = '\t', source_encoding: str = 'UTF-8', source_tokenizer: Union[None, Annotator, Callable] = None, source_splitter: Optional[Callable] = None, list_features: Optional[Dict[~KT, ~VT]] = None, list_type: Optional[str] = None)
-
This method appends more entries to gazetteer.
Args
source
- where to load the gazetteer from. What is actually expected here depends on the fmt parameter.
source_fmt
- defines what is expected as the format and/or content of the source parameter. One of:
  * "gate-def" (default): source must be a string, a pathlib Path or a parsed urllib url and point to a GATE-style "def" file. See https://gate.ac.uk/userguide/chap:gazetteers
  * "gazlist": a list of tuples or lists where the first element of the tuple/list is a list of strings, the second element is a dictionary containing the features to assign and the third element, if it exists, is the index of an element in the listfeatures array.
source_sep
- the field separator to use for some source formats (default: tab character)
source_encoding
- the encoding to use for some source formats (default: UTF-8)
source_tokenizer
- if not None, an annotator that creates annotations of type "Token" in the default annotation set. If this is None, then when loading string gazetteer entries, they are tokenized by splitting on whitespace (as defined by Python str.split())
source_splitter
- if not None and source_tokenizer is None, a callable that takes a string and returns the tokenstrings to use
list_features
- a dictionary of features to set for all matches which have this list's index set; it gets appended to the existing listfeatures. If what gets appended specifies its own list features, this is ignored.
list_type
- the output annotation type to use for the list that gets appended. If what gets appended specifies its own list type or list types, this is ignored.
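A sketch of appending an in-memory list together with shared list features and its own output type (illustrative values):

gaz = TokenGazetteer()
gaz.append(
    source=[(["Rhine"], {}), (["Danube"], {"length_km": "2850"})],
    source_fmt="gazlist",
    list_features={"majorType": "location", "minorType": "river"},
    list_type="River",
)
# matches from this list are annotated as "River" and carry the list features plus entry features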
def find(self, tokens: List[Annotation], doc: Optional[Document] = None, longest_only: Optional[bool] = None, fromidx: Optional[int] = None, toidx: Optional[int] = None, endidx: Optional[int] = None, matchfunc: Optional[Callable] = None)
-
Find the next match in the given index range and return a triple: the list of matches (empty if no match was found), the maximum length of the matches (0 if none), and the index where the matches were found (None if no match was found).
Args
tokens
- list of tokens (must allow to fetch the ith token as tokens[i])
doc
- the document to which the tokens belong. Necessary if the underlying text is used for the tokens.
longest_only
- whether to return all matches or just the longest ones. If not None, overrides the setting from init
fromidx
- first index where a match may start
toidx
- last index where a match may start
endidx
- the index in tokens after which no match must end
matchfunc
- the function to use to process each match
Returns
A triple with the list of matches as the first element (empty if no matches), the maximum match length as the second element (0 if no matches), and the index where the matches occur as the third element (None if no match)
Expand source code
def find(
    self,
    tokens: List[Annotation],
    doc: Optional[Document] = None,
    longest_only: Optional[bool] = None,
    fromidx: Optional[int] = None,
    toidx: Optional[int] = None,
    endidx: Optional[int] = None,
    matchfunc: Optional[Callable] = None,
):
    """
    Find the next match in the given index range and return a triple: the list of matches
    (empty if no match was found), the maximum length of the matches (0 if none), and the
    index where the matches were found (None if no match was found).

    Args:
        tokens: list of tokens (must allow to fetch the ith token as tokens[i])
        doc: the document to which the tokens belong. Necessary if the underlying text is
            used for the tokens.
        longest_only: whether to return all matches or just the longest ones. If not None,
            overrides the setting from init
        fromidx: first index where a match may start
        toidx: last index where a match may start
        endidx: the index in tokens after which no match must end
        matchfunc: the function to use to process each match

    Returns:
        A triple with the list of matches as the first element, the maximum match length
        (0 if no matches) as the second element, and the index where the matches occur
        (None if no match) as the third element
    """
    if longest_only is None:
        longest_only = self.longest_only
    idx = fromidx
    if idx is None:
        idx = 0
    if toidx is None:
        toidx = len(tokens) - 1
    if endidx is None:
        endidx = len(tokens)
    while idx <= toidx:
        matches, long = self.match(
            tokens, idx=idx, doc=doc, longest_only=longest_only, endidx=endidx, matchfunc=matchfunc
        )
        if long == 0:
            idx += 1
            continue
        return matches, long, idx
    return [], 0, None
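A usage sketch for find(): the document and the hand-built Token annotations are illustrative only (normally a tokenizer annotator would create them), and gaz is the gazetteer from the append example above:

from gatenlp import Document

doc = Document("He moved to New York last year")
annset = doc.annset()
off = 0
for word in doc.text.split():
    start = doc.text.index(word, off)
    annset.add(start, start + len(word), "Token")
    off = start + len(word)
tokens = list(annset.with_type("Token"))

# Find the first match anywhere in the token sequence; assumes the default
# getter fetches each token's text from the document.
matches, maxlen, idx = gaz.find(tokens, doc=doc)
if idx is not None:
    print(f"Match starts at token {idx}, longest covers {maxlen} token(s)")
    for m in matches:
        print(m.start, m.end, m.data)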
def find_all(self, tokens: List[Annotation], doc: Optional[Document] = None, longest_only: Optional[bool] = None, skip_longest: Optional[bool] = None, fromidx: Optional[int] = None, toidx: Optional[int] = None, endidx: Optional[int] = None, matchfunc: Optional[Callable] = None)
-
Find gazetteer entries in a sequence of tokens. Note: if fromidx or toidx are bigger than the length of the tokens allows, this is silently ignored.
Args
tokens
- iterable of tokens. The getter will be applied to each token (together with the doc) to retrieve the string to match.
doc
- the document this should run on. Only necessary if the text to match is not retrieved from the token annotation, but from the underlying document text.
longest_only
- whether to return only the longest or all matches. If not None, overrides the init setting
skip_longest
- skip forward over longest match (do not return contained/overlapping matches). If not None overrides the init setting.
fromidx
- index where to start finding in tokens
toidx
- index where to stop finding in tokens (this is the last index actually used)
endidx
- index beyond which no matches should end
matchfunc
- a function to process each match; it is passed through to match() (see there)
Yields
list of matches
Expand source code
def find_all(
    self,
    tokens: List[Annotation],
    doc: Optional[Document] = None,
    longest_only: Optional[bool] = None,
    skip_longest: Optional[bool] = None,
    fromidx: Optional[int] = None,
    toidx: Optional[int] = None,
    endidx: Optional[int] = None,
    matchfunc: Optional[Callable] = None,
):
    """
    Find gazetteer entries in a sequence of tokens.
    Note: if fromidx or toidx are bigger than the length of the tokens allows, this is
    silently ignored.

    Args:
        tokens: iterable of tokens. The getter will be applied to each token (together with
            the doc) to retrieve the string to match.
        doc: the document this should run on. Only necessary if the text to match is not
            retrieved from the token annotation, but from the underlying document text.
        longest_only: whether to return only the longest or all matches. If not None,
            overrides the init setting
        skip_longest: skip forward over longest match (do not return contained/overlapping
            matches). If not None overrides the init setting.
        fromidx: index where to start finding in tokens
        toidx: index where to stop finding in tokens (this is the last index actually used)
        endidx: index beyond which no matches should end
        matchfunc: a function to process each match; it is passed through to match()

    Yields:
        list of matches
    """
    if longest_only is None:
        longest_only = self.longest_only
    if skip_longest is None:
        skip_longest = self.skip
    matches = []
    lentok = len(tokens)
    if endidx is None:
        endidx = lentok
    if fromidx is None:
        fromidx = 0
    if toidx is None:
        toidx = lentok - 1
    if fromidx >= lentok:
        yield matches
        return
    if toidx >= lentok:
        toidx = lentok - 1
    if fromidx > toidx:
        yield matches
        return
    idx = fromidx
    while idx <= toidx:
        matches, maxlen, idx = self.find(
            tokens,
            doc=doc,
            longest_only=longest_only,
            fromidx=idx,
            endidx=endidx,
            toidx=toidx,
            matchfunc=matchfunc,
        )
        if idx is None:
            return
        yield matches
        if skip_longest:
            idx += maxlen
        else:
            idx += 1
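Correspondingly, a sketch of iterating over all matches with find_all(), reusing doc, tokens and gaz from the sketches above:

# find_all() is a generator: each yielded element is the list of matches
# found at one starting position.
for matches in gaz.find_all(tokens, doc=doc, longest_only=True, skip_longest=True):
    for m in matches:
        matched_text = " ".join(doc[t] for t in m.match)
        print(f"tokens[{m.start}:{m.end}] -> {matched_text} {m.data}")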
def get(self, tokenstrings, default=None)
-
Look up the data stored for the given token string or sequence of token strings. For a complete stored entry, return a list of feature dictionaries (each entry's data merged with the features of its list); otherwise return the default.
Expand source code
def get(self, tokenstrings, default=None):
    if isinstance(tokenstrings, str):
        tokenstrings = [tokenstrings]
    node = self.nodes
    for idx, tokenstring in enumerate(tokenstrings):
        if idx == 0:
            # top level: a dict mapping first-token strings to TokenGazetteerNode
            node = node.get(tokenstring)
        else:
            # deeper levels: follow the continuation nodes of the current node
            if node.nodes is None:
                return default
            node = node.nodes.get(tokenstring)
        if node is None:
            return default
    if node.is_match:
        # merge each stored entry datum with the features of its list
        ret = []
        assert len(node.data) == len(node.listidx)
        for d, i in zip(node.data, node.listidx):
            new = d.copy()
            new.update(self.listfeatures[i])
            ret.append(new)
        return ret
    else:
        return default
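A small sketch of get(), which looks up stored entry data directly without any document, again reusing the gaz from above (the expected outputs assume the lookup behaves as shown in the source):

# For a complete stored entry, the entry data merged with the list features:
print(gaz.get(["New", "York"]))       # e.g. [{'kind': 'city', 'origin': 'demo-list'}]
# For a prefix of an entry or an unknown token, the default is returned:
print(gaz.get(["New"], default=[]))   # []
print(gaz.get("Paris", default=[]))   # []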
def match(self, tokens, doc=None, longest_only=None, idx=0, endidx=None, matchfunc=None)
-
Try to match at index location idx of the tokens sequence. Returns a list which contains no elements if no match is found, or as many elements as matches are found. The element for each match is either a TokenGazetteerMatch instance if matchfunc is None or whatever matchfunc returns for a match. Also returns the length of the longest match (0 if no match).
Args
tokens
- a list of tokens (must allow to fetch the ith token as tokens[i])
doc
- the document to which the tokens belong. Necessary if the underlying text is used for the tokens.
longest_only
- whether to return all matches or just the longest ones. If not None, overrides the setting from init.
idx
- the index in tokens where the match must start
endidx
- the index in tokens after which no match must end
matchfunc
- a function to create each match representation. The function is called with the same arguments as TokenGazetteerMatch (start, end, matched tokens, data, listidx) and should return something that is then added to the result list of matches.
Returns
A tuple, where the first element is a list of match elements, empty if no matches are found and the second element is the length of the longest match, 0 if no match.
Expand source code
def match(self, tokens, doc=None, longest_only=None, idx=0, endidx=None, matchfunc=None):
    """
    Try to match at index location idx of the tokens sequence. Returns a list which contains
    no elements if no match is found, or as many elements as matches are found. The element
    for each match is either a TokenGazetteerMatch instance if matchfunc is None or whatever
    matchfunc returns for a match. Also returns the length of the longest match (0 if no match).

    Args:
        tokens: a list of tokens (must allow to fetch the ith token as tokens[i])
        doc: the document to which the tokens belong. Necessary if the underlying text is used
            for the tokens.
        longest_only: whether to return all matches or just the longest ones. If not None,
            overrides the setting from init.
        idx: the index in tokens where the match must start
        endidx: the index in tokens after which no match must end
        matchfunc: a function to create each match representation. The function is called with
            the same arguments as TokenGazetteerMatch (start, end, matched tokens, data,
            listidx) and should return something that is then added to the result list of
            matches.

    Returns:
        A tuple, where the first element is a list of match elements, empty if no matches are
        found, and the second element is the length of the longest match, 0 if no match.
    """
    if endidx is None:
        endidx = len(tokens)
    assert idx < endidx
    if longest_only is None:
        longest_only = self.longest_only
    token = tokens[idx]
    if token.type == self.splittype:
        return [], 0
    token_string = self.getterfunc(token, doc=doc, feature=self.feature)
    if token_string is None:
        return [], 0
    if self.mapfunc:
        token_string = self.mapfunc(token_string)
    if self.ignorefunc:
        if self.ignorefunc(token_string):
            # no match possible here
            return [], 0
    # check if we can match the current token
    if token_string in self.nodes:
        # ok, we have the beginning of a possible match
        longest = 0
        node = self.nodes[token_string]
        thismatches = []
        thistokens = [token]
        if node.is_match:
            # the first token is already a complete match, so we need to add this to thismatches
            longest = 1
            # TODO: make this work with list data!
            if matchfunc:
                match = matchfunc(idx, idx + 1, thistokens.copy(), node.data, node.listidx)
            else:
                match = TokenGazetteerMatch(idx, idx + 1, thistokens.copy(), node.data, node.listidx)
            thismatches.append(match)
        j = idx + 1  # index into text tokens
        nignored = 0
        while j < endidx:
            if node.nodes:
                token = tokens[j]
                if token.type == self.splittype:
                    break
                token_string = self.getterfunc(token, doc=doc, feature=self.feature)
                if token_string is None:
                    j += 1
                    nignored += 1
                    continue
                if self.mapfunc:
                    token_string = self.mapfunc(token_string)
                if self.ignorefunc and self.ignorefunc(token_string):
                    j += 1
                    nignored += 1
                    continue
                if token_string in node.nodes:
                    node = node.nodes[token_string]
                    thistokens.append(token)
                    if node.is_match:
                        if matchfunc:
                            match = matchfunc(
                                idx,
                                idx + len(thistokens) + nignored,
                                thistokens.copy(),
                                node.data,
                                node.listidx,
                            )
                        else:
                            match = TokenGazetteerMatch(
                                idx,
                                idx + len(thistokens) + nignored,
                                thistokens.copy(),
                                node.data,
                                node.listidx,
                            )
                        # TODO: should LONGEST get calculated including ignored tokens or not?
                        if not longest_only:
                            thismatches.append(match)
                            if len(thistokens) > longest:
                                longest = len(thistokens)
                        else:
                            if len(thistokens) > longest:
                                thismatches = [match]
                                longest = len(thistokens)
                    j += 1
                    continue
                else:
                    break
            else:
                break
        return thismatches, longest
    else:
        # first token did not match, nothing to be found
        return [], 0
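And a sketch of match() at one fixed position, with tokens, doc and gaz from the find() example above; token index 3 is "New", so the longest match ("New York") covers two tokens:

# Try to match only at token index 3; returns the matches found there and
# the length (in tokens) of the longest one.
matches, longest = gaz.match(tokens, doc=doc, idx=3)
print(longest)  # 2
for m in matches:
    print(m.start, m.end)  # 3 5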
Inherited members
class TokenGazetteerMatch (start: int, end: int, match: list, data: Any, listidx: int)
-
Represent a token gazetteer match
Expand source code
@dataclass()
class TokenGazetteerMatch:
    """
    Represent a token gazetteer match
    """

    start: int
    end: int
    match: list
    data: Any
    listidx: int
Class variables
var data : Any
-
the data stored for the matched entry
var end : int
-
the index of the token just after the last token of the match
var listidx : int
-
the index of the gazetteer list the matched entry comes from
var match : list
-
the list of matched tokens
var start : int
-
the index of the first token of the match
class TokenGazetteerNode (is_match=None, data=None, nodes=None, listidx=None)
-
Represent an entry in the hash map of entry first tokens. If is_match is True, that token is already a match and data contains the entry data. The nodes attribute contains None or a mapping from follow-up token strings to further nodes for multi-token entries that start with this token; the entry data is stored on the node where all tokens of an entry have matched.
Args
is_match
- this node is a match
data
- data associated with the match, can be a list of data items
nodes
- None or a mapping from follow-up token strings to continuation nodes
Expand source code
class TokenGazetteerNode:
    """
    Represent an entry in the hash map of entry first tokens.
    If is_match is True, that token is already a match and data contains the entry data.
    The nodes attribute contains None or a mapping from follow-up token strings to further
    nodes for multi-token entries that start with this token; the entry data is stored on
    the node where all tokens of an entry have matched.
    """

    __slots__ = ("is_match", "data", "nodes", "listidx")

    def __init__(self, is_match=None, data=None, nodes=None, listidx=None):
        """
        Args:
            is_match: this node is a match
            data: data associated with the match, can be a list of data items
            nodes: None or a mapping from follow-up token strings to continuation nodes
        """
        self.is_match = is_match
        self.data = data
        self.listidx = listidx
        self.nodes = nodes

    @staticmethod
    def dict_repr(nodes):
        if nodes is not None:
            return str([(t, n) for t, n in nodes.items()])

    def __repr__(self):
        nodes = TokenGazetteerNode.dict_repr(self.nodes)
        return f"Node(is_match={self.is_match},data={self.data},listidx={self.listidx},nodes={nodes})"
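To illustrate the trie layout, a hand-built sketch of the nodes for the two-token entry "New York" (in the real gazetteer these nodes are created internally when entries are added):

from gatenlp.processing.gazetteer.tokengazetteer import TokenGazetteerNode

# The first token "New" is not a match by itself, but has a continuation
# "York" that completes the entry and carries its data and list index.
leaf = TokenGazetteerNode(is_match=True, data=[{"kind": "city"}], listidx=[0])
root = TokenGazetteerNode(is_match=False, nodes={"York": leaf})
print(root)
# Node(is_match=False,data=None,listidx=None,nodes=[('York', Node(is_match=True,...))])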
Static methods
def dict_repr(nodes)
-
Return a string representation of the given nodes mapping as a list of (token, node) pairs, or None if nodes is None.
Expand source code
@staticmethod
def dict_repr(nodes):
    if nodes is not None:
        return str([(t, n) for t, n in nodes.items()])
Instance variables
var data
-
data associated with the match; can be a list of data items
var is_match
-
whether this node represents a complete match
var listidx
-
index or indices into the gazetteer's list features for the stored entry data
var nodes
-
None or a mapping from follow-up token strings to continuation nodes