Module gatenlp.pam.pampac.pampac
Module for the Pampac class.
Expand source code
"""
Module for the Pampac class.
"""
import sys
from gatenlp.pam.pampac.data import Location, Context
from gatenlp.pam.pampac.rule import Rule
from gatenlp.annotation_set import AnnotationSet
from gatenlp.utils import init_logger
from gatenlp.processing.annotator import Annotator
class Pampac:
    """
    A class for applying a sequence of rules to a document.
    """

    # The accepted values for the `skip` and `select` settings.
    _SKIP_MODES = ("one", "longest", "next", "once")
    _SELECT_MODES = ("first", "highest", "all")

    def __init__(self, *rules, skip="longest", select="first"):
        """
        Initialize Pampac.

        Args:
            *rules: one or more rules
            skip: how to proceed after something has been matched at a position. One of: "longest" to proceed
                at the next text offset after the end of the longest match. "next" to use a location with the
                highest text and annotation index over all matches. "one" to increment the text offset by one
                and adjust the annotation index to point to the next annotation at or after the new text offset.
                "once": do not advance after the first location where a rule matches. NOTE: if skipping depends
                on the match(es), only those matches for which a rule fires are considered.
            select: which of those rules that match to actually apply, i.e. call the action part of the rule.
                One of: "first": try the rules in sequence and call only the first one that matches.
                "highest": try all rules and only call the matching rule which has the highest priority, if
                there is more than one, the first of those. "all": try all rules and call every matching rule
                in sequence.
        """
        assert len(rules) > 0
        assert skip in self._SKIP_MODES
        assert select in self._SELECT_MODES
        for rule_ in rules:
            assert isinstance(rule_, Rule)
        self.rules = rules
        self.priorities = [r.priority for r in self.rules]
        self.max_priority = max(self.priorities)
        # list.index returns the first rule with the maximum priority
        self.hp_rule_idx = self.priorities.index(self.max_priority)
        self.hp_rule = self.rules[self.hp_rule_idx]
        self.skip = skip
        self.select = select

    def set_skip(self, val):
        """
        Different way to set the skip parameter.

        Args:
            val: one of "one", "longest", "next", "once"

        Returns:
            this Pampac instance, so calls can be chained

        Raises:
            ValueError: if val is not a known skip mode
        """
        if val not in self._SKIP_MODES:
            raise ValueError(f"skip must be one of {self._SKIP_MODES}, not {val!r}")
        self.skip = val
        return self

    def set_select(self, val):
        """
        Different way to set the select parameter.

        Args:
            val: one of "first", "highest", "all"

        Returns:
            this Pampac instance, so calls can be chained

        Raises:
            ValueError: if val is not a known select mode
        """
        if val not in self._SELECT_MODES:
            raise ValueError(f"select must be one of {self._SELECT_MODES}, not {val!r}")
        self.select = val
        return self

    # pylint: disable=R0912, R0915
    def run(self,
            doc,
            annotations,
            outset=None, start=None, end=None, containing_anns=None, debug=False):
        """
        Run the rules from location start to location end (default: full document), using the annotation set or list.

        Args:
            doc: the document to run on
            annotations: the annotation set or iterable to use for matching.
            outset: the output annotation set. If this is a string, retrieves the set from doc
            start: the text offset where to start matching (not used when containing_anns is given)
            end: the text offset where to end matching (not used when containing_anns is given)
            containing_anns: if this is an AnnotationSet or iterable of annotations, the rules are applied to each
                span of each of the annotations in order, and only input annotations that are fully contained
                in that span are processed (default: None, use the whole document)
            debug: enable debug logging

        Returns:
            a list of tuples (offset, actionreturnvals) for each location where one or more matches occurred.
            NOTE: with skip="once" the plain list of action return values of the first matching location is
            returned (per span) instead of tuples.
        """
        logger = init_logger(debug=debug)
        if isinstance(outset, str):
            outset = doc.annset(outset)
        if containing_anns is None:
            ctx = Context(doc=doc, anns=annotations, outset=outset, start=start, end=end)
            return self._run4span(logger, ctx, Location(ctx.start, 0))
        # in order to be able to get the contained annotations via within(), the input
        # `annotations` (not the containing annotations) must be in a set
        if not isinstance(annotations, AnnotationSet):
            annotations = AnnotationSet.create_from(annotations)
        returntuples = []
        for ann in containing_anns:
            if ann.length == 0:
                continue
            span_anns = annotations.within(ann)
            ctx = Context(doc=doc, anns=span_anns, outset=outset, start=ann.start, end=ann.end)
            # every span gets its own start location: _run4span may modify the location
            # in place, so sharing one object would carry state over into the next span
            returntuples.extend(self._run4span(logger, ctx, Location(ctx.start, 0)))
        return returntuples

    def _fire(self, logger, ctx, location, rets):
        # Chooses among the successful results (dict rule index -> result, in rule order) according
        # to self.select, calls the actions and returns (action return values, fired results).
        if self.select == "first":
            fire_idxs = [next(iter(rets))]
        elif self.select == "all":
            fire_idxs = list(rets)
        elif self.select == "highest":
            # highest priority among the rules that actually matched here; max() returns the
            # first maximal item, so ties go to the earliest rule
            fire_idxs = [max(rets, key=lambda i: self.rules[i].priority)]
        else:
            fire_idxs = []
        frets = []
        fired_rets = []
        for idx in fire_idxs:
            logger.debug("Firing rule %s at %s", idx, location)
            frets.append(self.rules[idx].action(rets[idx], context=ctx, location=location))
            fired_rets.append(rets[idx])
        return frets, fired_rets

    def _advance(self, ctx, location, fired_rets):
        # Returns the location to continue from after rules have fired, according to self.skip.
        if self.skip == "one":
            # we need to advance to the offset AFTER the BEGINNING of the earliest match
            next_o = min(
                (res.span.start for ret in fired_rets for res in ret if res.span is not None),
                default=sys.maxsize,
            )
            return ctx.inc_location(location, to_offset=next_o + 1)
        if self.skip == "longest":
            longest = 0
            for ret in fired_rets:
                for res in ret:
                    if res.location.text_location > longest:
                        longest = res.location.text_location
            location.text_location = longest
            return ctx.update_location_byoffset(location)
        if self.skip == "next":
            for ret in fired_rets:
                for res in ret:
                    if res.location.text_location > location.text_location:
                        location.text_location = res.location.text_location
                        location.ann_location = res.location.ann_location
                    elif (
                        res.location.text_location == location.text_location
                        and res.location.ann_location > location.ann_location
                    ):
                        location.ann_location = res.location.ann_location
        return location

    def _run4span(self, logger, ctx, location):
        # Runs on a single span using the given context and start location and returns a list of tuples with
        # offset and actionreturnvals for each location where a match or matches occurred
        returntuples = []
        while True:
            # try the rules at the current position
            cur_offset = location.text_location
            rets = {}
            for idx, rule_ in enumerate(self.rules):
                logger.debug("Trying rule %s at location %s", idx, location)
                ret = rule_.parse(location, ctx)
                if ret.issuccess():
                    rets[idx] = ret
                    logger.debug("Success for rule %s, %s results", idx, len(ret))
                    if self.select == "first":
                        break
            if rets:
                frets, fired_rets = self._fire(logger, ctx, location, rets)
                if self.skip == "once":
                    # NOTE: returns the bare action return values, not (offset, values) tuples;
                    # kept as is because callers may depend on this shape
                    return frets
                location = self._advance(ctx, location, fired_rets)
                returntuples.append((cur_offset, frets))
            else:
                # we had no match, just continue from the next offset
                location = ctx.inc_location(location, by_offset=1)
            if ctx.at_endofanns(location) or ctx.at_endoftext(location):
                break
        return returntuples

    __call__ = run
class PampacAnnotator(Annotator):
    """
    Annotator which applies a Pampac ruleset to every document it is called with.
    """

    def __init__(self,
                 pampac,
                 annspec,
                 outset_name=None,
                 containing_anns_desc=None):
        """
        Store the ruleset and the annotation specifications used on each call.

        Args:
            pampac: a Pampac instance
            annspec: annotation specification for annotations to use as input. This can be an annotation set
                name, or a list of either annotation set names or tuples, where the first element is an
                annotation set name and the second element is either a type name or a list of type names.
                E.g. `[("", "Token")]` to get all annotations with type Token from the default set or
                `[("", ["PER", "ORG"]), "Key"]` to get all annotations with type PER or ORG from the default
                set and all annotations from the Key set.
            outset_name: the name of the annotation set where to add output annotations
            containing_anns_desc: a specification of annotations to use for containing annotations. If
                specified, the Pampac instance will run pattern matching on each span that corresponds to a
                containing annotation. Containing annotations should not overlap. Default: do not use
                containing annotations and run for the whole document.
        """
        self.containing_anns_desc = containing_anns_desc
        self.outset_name = outset_name
        self.annspec = annspec
        self.pampac = pampac

    def __call__(self, doc, **kwargs):
        """
        Run the ruleset on the document and return the same document.

        Args:
            doc: the document to process
            **kwargs: accepted but not used by this method

        Returns:
            the document passed in; the value returned by the Pampac run is discarded
        """
        target_set = doc.annset(self.outset_name)
        input_anns = doc.anns(self.annspec)
        containers = None
        if self.containing_anns_desc is not None:
            containers = doc.anns(self.containing_anns_desc)
        self.pampac.run(doc, input_anns, outset=target_set, containing_anns=containers)
        return doc
Classes
class Pampac (*rules, skip='longest', select='first')
-
A class for applying a sequence of rules to a document.
Initialize Pampac.
Args
*rules
- one or more rules
skip
- how to proceed after something has been matched at a position. One of: "longest" to proceed at the next text offset after the end of the longest match. "next" to use a location with the highest text and annotation index over all matches. "one" to increment the text offset by one and adjust the annotation index to point to the next annotation at or after the new text offset. "once": do not advance after the first location where a rule matches. NOTE: if skipping depends on the match(es), only those matches for which a rule fires are considered.
select
- which of those rules that match to actually apply, i.e. call the action part of the rule. One of: "first": try the rules in sequence and call only the first one that matches. "highest": try all rules and only call the rule which has the highest priority; if there is more than one, the first of those. "all": try all rules and call every rule that matches, in sequence.
Expand source code
class Pampac:
    """
    A class for applying a sequence of rules to a document.
    """

    # The accepted values for the `skip` and `select` settings.
    _SKIP_MODES = ("one", "longest", "next", "once")
    _SELECT_MODES = ("first", "highest", "all")

    def __init__(self, *rules, skip="longest", select="first"):
        """
        Initialize Pampac.

        Args:
            *rules: one or more rules
            skip: how to proceed after something has been matched at a position. One of: "longest" to proceed
                at the next text offset after the end of the longest match. "next" to use a location with the
                highest text and annotation index over all matches. "one" to increment the text offset by one
                and adjust the annotation index to point to the next annotation at or after the new text offset.
                "once": do not advance after the first location where a rule matches. NOTE: if skipping depends
                on the match(es), only those matches for which a rule fires are considered.
            select: which of those rules that match to actually apply, i.e. call the action part of the rule.
                One of: "first": try the rules in sequence and call only the first one that matches.
                "highest": try all rules and only call the matching rule which has the highest priority, if
                there is more than one, the first of those. "all": try all rules and call every matching rule
                in sequence.
        """
        assert len(rules) > 0
        assert skip in self._SKIP_MODES
        assert select in self._SELECT_MODES
        for rule_ in rules:
            assert isinstance(rule_, Rule)
        self.rules = rules
        self.priorities = [r.priority for r in self.rules]
        self.max_priority = max(self.priorities)
        # list.index returns the first rule with the maximum priority
        self.hp_rule_idx = self.priorities.index(self.max_priority)
        self.hp_rule = self.rules[self.hp_rule_idx]
        self.skip = skip
        self.select = select

    def set_skip(self, val):
        """
        Different way to set the skip parameter.

        Args:
            val: one of "one", "longest", "next", "once"

        Returns:
            this Pampac instance, so calls can be chained

        Raises:
            ValueError: if val is not a known skip mode
        """
        if val not in self._SKIP_MODES:
            raise ValueError(f"skip must be one of {self._SKIP_MODES}, not {val!r}")
        self.skip = val
        return self

    def set_select(self, val):
        """
        Different way to set the select parameter.

        Args:
            val: one of "first", "highest", "all"

        Returns:
            this Pampac instance, so calls can be chained

        Raises:
            ValueError: if val is not a known select mode
        """
        if val not in self._SELECT_MODES:
            raise ValueError(f"select must be one of {self._SELECT_MODES}, not {val!r}")
        self.select = val
        return self

    # pylint: disable=R0912, R0915
    def run(self,
            doc,
            annotations,
            outset=None, start=None, end=None, containing_anns=None, debug=False):
        """
        Run the rules from location start to location end (default: full document), using the annotation set or list.

        Args:
            doc: the document to run on
            annotations: the annotation set or iterable to use for matching.
            outset: the output annotation set. If this is a string, retrieves the set from doc
            start: the text offset where to start matching (not used when containing_anns is given)
            end: the text offset where to end matching (not used when containing_anns is given)
            containing_anns: if this is an AnnotationSet or iterable of annotations, the rules are applied to each
                span of each of the annotations in order, and only input annotations that are fully contained
                in that span are processed (default: None, use the whole document)
            debug: enable debug logging

        Returns:
            a list of tuples (offset, actionreturnvals) for each location where one or more matches occurred.
            NOTE: with skip="once" the plain list of action return values of the first matching location is
            returned (per span) instead of tuples.
        """
        logger = init_logger(debug=debug)
        if isinstance(outset, str):
            outset = doc.annset(outset)
        if containing_anns is None:
            ctx = Context(doc=doc, anns=annotations, outset=outset, start=start, end=end)
            return self._run4span(logger, ctx, Location(ctx.start, 0))
        # in order to be able to get the contained annotations via within(), the input
        # `annotations` (not the containing annotations) must be in a set
        if not isinstance(annotations, AnnotationSet):
            annotations = AnnotationSet.create_from(annotations)
        returntuples = []
        for ann in containing_anns:
            if ann.length == 0:
                continue
            span_anns = annotations.within(ann)
            ctx = Context(doc=doc, anns=span_anns, outset=outset, start=ann.start, end=ann.end)
            # every span gets its own start location: _run4span may modify the location
            # in place, so sharing one object would carry state over into the next span
            returntuples.extend(self._run4span(logger, ctx, Location(ctx.start, 0)))
        return returntuples

    def _fire(self, logger, ctx, location, rets):
        # Chooses among the successful results (dict rule index -> result, in rule order) according
        # to self.select, calls the actions and returns (action return values, fired results).
        if self.select == "first":
            fire_idxs = [next(iter(rets))]
        elif self.select == "all":
            fire_idxs = list(rets)
        elif self.select == "highest":
            # highest priority among the rules that actually matched here; max() returns the
            # first maximal item, so ties go to the earliest rule
            fire_idxs = [max(rets, key=lambda i: self.rules[i].priority)]
        else:
            fire_idxs = []
        frets = []
        fired_rets = []
        for idx in fire_idxs:
            logger.debug("Firing rule %s at %s", idx, location)
            frets.append(self.rules[idx].action(rets[idx], context=ctx, location=location))
            fired_rets.append(rets[idx])
        return frets, fired_rets

    def _advance(self, ctx, location, fired_rets):
        # Returns the location to continue from after rules have fired, according to self.skip.
        if self.skip == "one":
            # we need to advance to the offset AFTER the BEGINNING of the earliest match
            next_o = min(
                (res.span.start for ret in fired_rets for res in ret if res.span is not None),
                default=sys.maxsize,
            )
            return ctx.inc_location(location, to_offset=next_o + 1)
        if self.skip == "longest":
            longest = 0
            for ret in fired_rets:
                for res in ret:
                    if res.location.text_location > longest:
                        longest = res.location.text_location
            location.text_location = longest
            return ctx.update_location_byoffset(location)
        if self.skip == "next":
            for ret in fired_rets:
                for res in ret:
                    if res.location.text_location > location.text_location:
                        location.text_location = res.location.text_location
                        location.ann_location = res.location.ann_location
                    elif (
                        res.location.text_location == location.text_location
                        and res.location.ann_location > location.ann_location
                    ):
                        location.ann_location = res.location.ann_location
        return location

    def _run4span(self, logger, ctx, location):
        # Runs on a single span using the given context and start location and returns a list of tuples with
        # offset and actionreturnvals for each location where a match or matches occurred
        returntuples = []
        while True:
            # try the rules at the current position
            cur_offset = location.text_location
            rets = {}
            for idx, rule_ in enumerate(self.rules):
                logger.debug("Trying rule %s at location %s", idx, location)
                ret = rule_.parse(location, ctx)
                if ret.issuccess():
                    rets[idx] = ret
                    logger.debug("Success for rule %s, %s results", idx, len(ret))
                    if self.select == "first":
                        break
            if rets:
                frets, fired_rets = self._fire(logger, ctx, location, rets)
                if self.skip == "once":
                    # NOTE: returns the bare action return values, not (offset, values) tuples;
                    # kept as is because callers may depend on this shape
                    return frets
                location = self._advance(ctx, location, fired_rets)
                returntuples.append((cur_offset, frets))
            else:
                # we had no match, just continue from the next offset
                location = ctx.inc_location(location, by_offset=1)
            if ctx.at_endofanns(location) or ctx.at_endoftext(location):
                break
        return returntuples

    __call__ = run
Methods
def run(self, doc, annotations, outset=None, start=None, end=None, containing_anns=None, debug=False)
-
Run the rules from location start to location end (default: full document), using the annotation set or list.
Args
doc
- the document to run on
annotations
- the annotation set or iterable to use for matching.
outset
- the output annotation set. If this is a string, retrieves the set from doc
start
- the text offset where to start matching
end
- the text offset where to end matching
containing_anns
- if this is an AnnotationSet or iterable of annotations, the rules are applied to each span of each of the annotations in order, and only input annotations that are fully contained in that span are processed (default: None, use the whole document)
debug
- enable debug logging
Returns
a list of tuples (offset, actionreturnvals) for each location where one or more matches occurred
Expand source code
def run(self,
        doc,
        annotations,
        outset=None, start=None, end=None, containing_anns=None, debug=False):
    """
    Run the rules from location start to location end (default: full document), using the annotation set or list.

    Args:
        doc: the document to run on
        annotations: the annotation set or iterable to use for matching.
        outset: the output annotation set. If this is a string, retrieves the set from doc
        start: the text offset where to start matching (not used when containing_anns is given)
        end: the text offset where to end matching (not used when containing_anns is given)
        containing_anns: if this is an AnnotationSet or iterable of annotations, the rules are applied to each
            span of each of the annotations in order, and only input annotations that are fully contained
            in that span are processed (default: None, use the whole document)
        debug: enable debug logging

    Returns:
        a list of tuples (offset, actionreturnvals) for each location where one or more matches occurred
    """
    logger = init_logger(debug=debug)
    if isinstance(outset, str):
        outset = doc.annset(outset)
    if containing_anns is None:
        ctx = Context(doc=doc, anns=annotations, outset=outset, start=start, end=end)
        return self._run4span(logger, ctx, Location(ctx.start, 0))
    # in order to be able to get the contained annotations via within(), the input
    # `annotations` (not the containing annotations) must be in a set
    if not isinstance(annotations, AnnotationSet):
        annotations = AnnotationSet.create_from(annotations)
    returntuples = []
    for ann in containing_anns:
        if ann.length == 0:
            continue
        span_anns = annotations.within(ann)
        ctx = Context(doc=doc, anns=span_anns, outset=outset, start=ann.start, end=ann.end)
        # every span gets its own start location: _run4span may modify the location
        # in place, so sharing one object would carry state over into the next span
        returntuples.extend(self._run4span(logger, ctx, Location(ctx.start, 0)))
    return returntuples
def set_select(self, val)
-
Different way to set the select parameter.
Expand source code
def set_select(self, val):
    """
    Different way to set the select parameter.

    Args:
        val: one of "first", "highest", "all"

    Returns:
        this instance, so calls can be chained

    Raises:
        ValueError: if val is not a known select mode
    """
    allowed = ("first", "highest", "all")
    if val not in allowed:
        raise ValueError(f"select must be one of {allowed}, not {val!r}")
    self.select = val
    return self
def set_skip(self, val)
-
Different way to set the skip parameter.
Expand source code
def set_skip(self, val):
    """
    Different way to set the skip parameter.

    Args:
        val: one of "one", "longest", "next", "once"

    Returns:
        this instance, so calls can be chained

    Raises:
        ValueError: if val is not a known skip mode
    """
    allowed = ("one", "longest", "next", "once")
    if val not in allowed:
        raise ValueError(f"skip must be one of {allowed}, not {val!r}")
    self.skip = val
    return self
class PampacAnnotator (pampac, annspec, outset_name=None, containing_anns_desc=None)
-
Class for running a Pampac ruleset.
Args
pampac
- a Pampac instance
annspec
- annotation specification for annotations to use as input. This can be an annotation set name,
or a list of either annotation set names or tuples, where the first element is an annotation set
name and the second element is either a type name or a list of type names. E.g.
[("", "Token")]
to get all annotations with type Token from the default set or [("", ["PER", "ORG"]), "Key"]
to get all annotations with type PER or ORG from the default set and all annotations from the Key set.
outset_name
- the name of the annotation set where to add output annotations
containing_anns_desc
- a specification of annotations to use for containing annotations. If specified, the Pampac instance will run pattern matching on each span that corresponds to a containing annotation. Containing annotations should not overlap. The outputs for each containing annotation are aggregated and returned. Default: do not use containing annotations and run for the whole document.
Expand source code
class PampacAnnotator(Annotator):
    """
    Annotator which applies a Pampac ruleset to every document it is called with.
    """

    def __init__(self,
                 pampac,
                 annspec,
                 outset_name=None,
                 containing_anns_desc=None):
        """
        Store the ruleset and the annotation specifications used on each call.

        Args:
            pampac: a Pampac instance
            annspec: annotation specification for annotations to use as input. This can be an annotation set
                name, or a list of either annotation set names or tuples, where the first element is an
                annotation set name and the second element is either a type name or a list of type names.
                E.g. `[("", "Token")]` to get all annotations with type Token from the default set or
                `[("", ["PER", "ORG"]), "Key"]` to get all annotations with type PER or ORG from the default
                set and all annotations from the Key set.
            outset_name: the name of the annotation set where to add output annotations
            containing_anns_desc: a specification of annotations to use for containing annotations. If
                specified, the Pampac instance will run pattern matching on each span that corresponds to a
                containing annotation. Containing annotations should not overlap. Default: do not use
                containing annotations and run for the whole document.
        """
        self.containing_anns_desc = containing_anns_desc
        self.outset_name = outset_name
        self.annspec = annspec
        self.pampac = pampac

    def __call__(self, doc, **kwargs):
        """
        Run the ruleset on the document and return the same document.

        Args:
            doc: the document to process
            **kwargs: accepted but not used by this method

        Returns:
            the document passed in; the value returned by the Pampac run is discarded
        """
        target_set = doc.annset(self.outset_name)
        input_anns = doc.anns(self.annspec)
        containers = None
        if self.containing_anns_desc is not None:
            containers = doc.anns(self.containing_anns_desc)
        self.pampac.run(doc, input_anns, outset=target_set, containing_anns=containers)
        return doc
Ancestors
- Annotator
- abc.ABC
Inherited members