Module gatenlp.processing.pipeline
Module that provides the Pipeline class. A Pipeline is an annotator which is configured to contain several annotators which get executed in sequence. The result of each annotator is passed on to the next anotator. Each annotator can return a single document, None, or list of documents. If no document is returned, subsequent annotators are not called and None is returned from the pipeline. If several documents are areturned, subsequent annotators are invoked for each of those documents and the list of final return documents is returned by the pipeline.
Whenever a single document is returned it is returned as the document and NOT as a list with a single document as the only element.
Expand source code
"""
Module that provides the Pipeline class. A Pipeline is an annotator which is configured to contain several annotators
which get executed in sequence. The result of each annotator is passed on to the next anotator.
Each annotator can return a single document, None, or list of documents. If no document is returned, subsequent
annotators are not called and None is returned from the pipeline. If several documents are areturned, subsequent
annotators are invoked for each of those documents and the list of final return documents is returned by the pipeline.
Whenever a single document is returned it is returned as the document and NOT as a list with a single document as
the only element.
"""
from collections.abc import Iterable
import inspect
from gatenlp.processing.annotator import Annotator
from gatenlp.utils import init_logger
def _check_and_ret_callable(a, **kwargs):
"""
Make sure a is either a callable or a class that can be instatiated to a callable.
Args:
a: a class or instantiated callable
kwargs: arguments to pass on to the initializer
**kwargs:
Returns:
an instantiated callable or throws an exception if not a callable
"""
if inspect.isclass(a):
a = a(**kwargs)
if not callable(a):
raise Exception(f"Not a callable: {a}")
return a
def _has_method(obj, name):
"""
Check if the object has a method with that name
Args:
obj: the object
name: the name of the method
Returns:
True if the object has a callable method with that name, otherwise False
"""
mth = getattr(obj, name, None)
if mth is None:
return False
elif callable(mth):
return True
else:
return False
class Pipeline(Annotator):
"""
A pipeline is an annotator which runs several other annotators in sequence on a document
and returns the result. Since annotators can return no or more than one result document
in a list, the pipeline can return no or more than one document for each input document
as well.
When the start/finish/reduce method of the pipeline is invoked, all start/finish/reduce methods of
all annotators are invoked in sequence. The finish method returns the list of all return values of
all the finish methods of the annotators (if a finish method returns None, this is added to the list).
The reduce method expects a list with as many return value lists as there are annotators and returns
the overall result for each annotator (again, including None if there is none).
"""
def __init__(self, *annotators, **kwargs):
"""
Creates a pipeline annotator. Individual annotators can be added at a later time to the front or back
using the add method.
Note: each annotator can be assigned a name in a pipeline, either when using the add method or
by passing a tuple (annotator, name) instead of just the annotator.
Args:
annotators: each parameter can be an annotator, a callable, a tuple where the first item is
an annotator or callable and the second a string(name), or a list of these things.
An annotator can be given as an instance or class, if it is a class, the kwargs are used
to construct an instance. If no annotators are specified at construction, they can still
be added later and incrementally using the `add` method.
**kwargs: these arguments are passed to the constructor of any class in the annotators list
"""
self.annotators = []
self.names = []
self.names2annotators = dict()
self.logger = init_logger("Pipeline")
for ann in annotators:
if not isinstance(ann, list):
anns = [ann]
for a in anns:
if isinstance(a, tuple) and len(a) == 2:
a, name = a
else:
name = f"{len(self.annotators)}"
a = _check_and_ret_callable(a)
if name in self.names2annotators:
raise Exception(f"Duplicate name: {name}")
self.names2annotators[name] = a
self.annotators.append(a)
self.names.append(name)
# if len(self.annotators) == 0:
# self.logger.warning("Pipeline is a do-nothing pipeline: no annotators")
def copy(self):
"""
Return a shallow copy of the pipeline: the annotators and the annotator data is identical, but
the pipeline data itself is copied, so that an existing pipeline can be modified or extendet.
Returns:
a shallow copy of this pipeline
"""
new = Pipeline([(name, ann) for name, ann in self.names2annotators.items()])
return new
def add(self, annotator, name=None, tofront=False):
"""
Add an annotator to list of annotators for this pipeline. The annotator must be an initialized instance,
not a class.
Args:
annotator: the annotator to add
name: an optional name of the annotator, if None, uses a string representation of the
number of the annotator as added (not the index in the pipeline!)
tofront: if True adds to the front of the list instead of appending to the end
"""
a = _check_and_ret_callable(annotator)
if name is None:
name = f"{len(self.annotators)}"
if name in self.names2annotators:
raise Exception(f"Duplicate name: {name}")
if tofront:
self.annotators.insert(0, a)
self.names.insert(0, name)
else:
self.annotators.append(a)
self.names.append(name)
self.names2annotators[name] = a
def __getitem__(self, item):
if isinstance(item, str):
return self.names2annotators[item]
else:
return self.annotators[item]
def __call__(self, doc, **kwargs):
"""
Calls each annotator in sequence and passes the result or results to the next.
Args:
doc: the document to process
**kwargs: any kwargs will be passed to all annotators
Returns:
a document or a list of documents
"""
toprocess = [doc]
results = []
for annotator in self.annotators:
results = []
for d in toprocess:
ret = annotator(doc, **kwargs)
if isinstance(ret, list):
results.extend(ret)
else:
if ret is not None:
results.append(ret)
toprocess = results
if len(results) == 1:
return results[0]
else:
return results
def pipe(self, documents, **kwargs):
"""
Iterate over each of the documents process them by all the annotators in the pipeline
and yield all the final non-None result documents. Documents are processed by in turn
invoking their `pipe` method on the generator created by the previous step.
Args:
documents: an iterable of documents or None (None values are ignored)
**kwargs: arguments to be passed to each of the annotators
Yields:
documents for which processing did not return None
"""
gen = documents
for antr in self.annotators:
gen = antr.pipe(gen, **kwargs)
return gen
def start(self):
"""
Invokes start on all annotators.
"""
for annotator in self.annotators:
if _has_method(annotator, "start"):
annotator.start()
def finish(self):
"""
Invokes finish on all annotators and return their results as a list with as many
elements as there are annotators (annotators which did not return anything have None).
Returns:
list of annotator results
"""
results = []
for annotator in self.annotators:
if _has_method(annotator, "finish"):
results.append(annotator.finish())
else:
results.append(None)
return results
def reduce(self, results):
"""
Invokes reduce on all annotators using the list of result lists. `results` is a list with
as many elements as there are annotators. Each element is a list of results from different
processes or different batches.
Returns a list with as many elements as there are annotators, each element the combined result.
Args:
results: a list of result lists
Returns:
a list of combined results
"""
results = []
assert len(results) == len(self.annotators)
for reslist, annotator in zip(results, self.annotators):
if _has_method(annotator, "reduce"):
results.append(annotator.reduce(reslist))
else:
results.append(reslist)
return results
def __repr__(self):
reprs = []
for name, ann in zip(self.names, self.annotators):
if hasattr(ann, "__name__"):
arepr = ann.__name__
else:
arepr = ann.__class__.__name__
repr = name + ":" + arepr
reprs.append(repr)
reprs = ",".join(reprs)
return f"Pipeline({reprs})"
Classes
class Pipeline (*annotators, **kwargs)
-
A pipeline is an annotator which runs several other annotators in sequence on a document and returns the result. Since annotators can return no or more than one result document in a list, the pipeline can return no or more than one document for each input document as well.
When the start/finish/reduce method of the pipeline is invoked, all start/finish/reduce methods of all annotators are invoked in sequence. The finish method returns the list of all return values of all the finish methods of the annotators (if a finish method returns None, this is added to the list).
The reduce method expects a list with as many return value lists as there are annotators and returns the overall result for each annotator (again, including None if there is none).
Creates a pipeline annotator. Individual annotators can be added at a later time to the front or back using the add method.
Note: each annotator can be assigned a name in a pipeline, either when using the add method or by passing a tuple (annotator, name) instead of just the annotator.
Args
annotators
- each parameter can be an annotator, a callable, a tuple where the first item is
an annotator or callable and the second a string(name), or a list of these things.
An annotator can be given as an instance or class, if it is a class, the kwargs are used
to construct an instance. If no annotators are specified at construction, they can still
be added later and incrementally using the
add
method. **kwargs
- these arguments are passed to the constructor of any class in the annotators list
Expand source code
class Pipeline(Annotator): """ A pipeline is an annotator which runs several other annotators in sequence on a document and returns the result. Since annotators can return no or more than one result document in a list, the pipeline can return no or more than one document for each input document as well. When the start/finish/reduce method of the pipeline is invoked, all start/finish/reduce methods of all annotators are invoked in sequence. The finish method returns the list of all return values of all the finish methods of the annotators (if a finish method returns None, this is added to the list). The reduce method expects a list with as many return value lists as there are annotators and returns the overall result for each annotator (again, including None if there is none). """ def __init__(self, *annotators, **kwargs): """ Creates a pipeline annotator. Individual annotators can be added at a later time to the front or back using the add method. Note: each annotator can be assigned a name in a pipeline, either when using the add method or by passing a tuple (annotator, name) instead of just the annotator. Args: annotators: each parameter can be an annotator, a callable, a tuple where the first item is an annotator or callable and the second a string(name), or a list of these things. An annotator can be given as an instance or class, if it is a class, the kwargs are used to construct an instance. If no annotators are specified at construction, they can still be added later and incrementally using the `add` method. **kwargs: these arguments are passed to the constructor of any class in the annotators list """ self.annotators = [] self.names = [] self.names2annotators = dict() self.logger = init_logger("Pipeline") for ann in annotators: if not isinstance(ann, list): anns = [ann] for a in anns: if isinstance(a, tuple) and len(a) == 2: a, name = a else: name = f"{len(self.annotators)}" a = _check_and_ret_callable(a) if name in self.names2annotators: raise Exception(f"Duplicate name: {name}") self.names2annotators[name] = a self.annotators.append(a) self.names.append(name) # if len(self.annotators) == 0: # self.logger.warning("Pipeline is a do-nothing pipeline: no annotators") def copy(self): """ Return a shallow copy of the pipeline: the annotators and the annotator data is identical, but the pipeline data itself is copied, so that an existing pipeline can be modified or extendet. Returns: a shallow copy of this pipeline """ new = Pipeline([(name, ann) for name, ann in self.names2annotators.items()]) return new def add(self, annotator, name=None, tofront=False): """ Add an annotator to list of annotators for this pipeline. The annotator must be an initialized instance, not a class. Args: annotator: the annotator to add name: an optional name of the annotator, if None, uses a string representation of the number of the annotator as added (not the index in the pipeline!) tofront: if True adds to the front of the list instead of appending to the end """ a = _check_and_ret_callable(annotator) if name is None: name = f"{len(self.annotators)}" if name in self.names2annotators: raise Exception(f"Duplicate name: {name}") if tofront: self.annotators.insert(0, a) self.names.insert(0, name) else: self.annotators.append(a) self.names.append(name) self.names2annotators[name] = a def __getitem__(self, item): if isinstance(item, str): return self.names2annotators[item] else: return self.annotators[item] def __call__(self, doc, **kwargs): """ Calls each annotator in sequence and passes the result or results to the next. Args: doc: the document to process **kwargs: any kwargs will be passed to all annotators Returns: a document or a list of documents """ toprocess = [doc] results = [] for annotator in self.annotators: results = [] for d in toprocess: ret = annotator(doc, **kwargs) if isinstance(ret, list): results.extend(ret) else: if ret is not None: results.append(ret) toprocess = results if len(results) == 1: return results[0] else: return results def pipe(self, documents, **kwargs): """ Iterate over each of the documents process them by all the annotators in the pipeline and yield all the final non-None result documents. Documents are processed by in turn invoking their `pipe` method on the generator created by the previous step. Args: documents: an iterable of documents or None (None values are ignored) **kwargs: arguments to be passed to each of the annotators Yields: documents for which processing did not return None """ gen = documents for antr in self.annotators: gen = antr.pipe(gen, **kwargs) return gen def start(self): """ Invokes start on all annotators. """ for annotator in self.annotators: if _has_method(annotator, "start"): annotator.start() def finish(self): """ Invokes finish on all annotators and return their results as a list with as many elements as there are annotators (annotators which did not return anything have None). Returns: list of annotator results """ results = [] for annotator in self.annotators: if _has_method(annotator, "finish"): results.append(annotator.finish()) else: results.append(None) return results def reduce(self, results): """ Invokes reduce on all annotators using the list of result lists. `results` is a list with as many elements as there are annotators. Each element is a list of results from different processes or different batches. Returns a list with as many elements as there are annotators, each element the combined result. Args: results: a list of result lists Returns: a list of combined results """ results = [] assert len(results) == len(self.annotators) for reslist, annotator in zip(results, self.annotators): if _has_method(annotator, "reduce"): results.append(annotator.reduce(reslist)) else: results.append(reslist) return results def __repr__(self): reprs = [] for name, ann in zip(self.names, self.annotators): if hasattr(ann, "__name__"): arepr = ann.__name__ else: arepr = ann.__class__.__name__ repr = name + ":" + arepr reprs.append(repr) reprs = ",".join(reprs) return f"Pipeline({reprs})"
Ancestors
- Annotator
- abc.ABC
Methods
def add(self, annotator, name=None, tofront=False)
-
Add an annotator to list of annotators for this pipeline. The annotator must be an initialized instance, not a class.
Args
annotator
- the annotator to add
name
- an optional name of the annotator, if None, uses a string representation of the number of the annotator as added (not the index in the pipeline!)
tofront
- if True adds to the front of the list instead of appending to the end
Expand source code
def add(self, annotator, name=None, tofront=False): """ Add an annotator to list of annotators for this pipeline. The annotator must be an initialized instance, not a class. Args: annotator: the annotator to add name: an optional name of the annotator, if None, uses a string representation of the number of the annotator as added (not the index in the pipeline!) tofront: if True adds to the front of the list instead of appending to the end """ a = _check_and_ret_callable(annotator) if name is None: name = f"{len(self.annotators)}" if name in self.names2annotators: raise Exception(f"Duplicate name: {name}") if tofront: self.annotators.insert(0, a) self.names.insert(0, name) else: self.annotators.append(a) self.names.append(name) self.names2annotators[name] = a
def copy(self)
-
Return a shallow copy of the pipeline: the annotators and the annotator data is identical, but the pipeline data itself is copied, so that an existing pipeline can be modified or extendet.
Returns
a shallow copy of this pipeline
Expand source code
def copy(self): """ Return a shallow copy of the pipeline: the annotators and the annotator data is identical, but the pipeline data itself is copied, so that an existing pipeline can be modified or extendet. Returns: a shallow copy of this pipeline """ new = Pipeline([(name, ann) for name, ann in self.names2annotators.items()]) return new
def finish(self)
-
Invokes finish on all annotators and return their results as a list with as many elements as there are annotators (annotators which did not return anything have None).
Returns
list of annotator results
Expand source code
def finish(self): """ Invokes finish on all annotators and return their results as a list with as many elements as there are annotators (annotators which did not return anything have None). Returns: list of annotator results """ results = [] for annotator in self.annotators: if _has_method(annotator, "finish"): results.append(annotator.finish()) else: results.append(None) return results
def pipe(self, documents, **kwargs)
-
Iterate over each of the documents process them by all the annotators in the pipeline and yield all the final non-None result documents. Documents are processed by in turn invoking their
pipe
method on the generator created by the previous step.Args
documents
- an iterable of documents or None (None values are ignored)
**kwargs
- arguments to be passed to each of the annotators
Yields
documents for which processing did not return None
Expand source code
def pipe(self, documents, **kwargs): """ Iterate over each of the documents process them by all the annotators in the pipeline and yield all the final non-None result documents. Documents are processed by in turn invoking their `pipe` method on the generator created by the previous step. Args: documents: an iterable of documents or None (None values are ignored) **kwargs: arguments to be passed to each of the annotators Yields: documents for which processing did not return None """ gen = documents for antr in self.annotators: gen = antr.pipe(gen, **kwargs) return gen
def reduce(self, results)
-
Invokes reduce on all annotators using the list of result lists.
results
is a list with as many elements as there are annotators. Each element is a list of results from different processes or different batches.Returns a list with as many elements as there are annotators, each element the combined result.
Args
results
- a list of result lists
Returns
a list of combined results
Expand source code
def reduce(self, results): """ Invokes reduce on all annotators using the list of result lists. `results` is a list with as many elements as there are annotators. Each element is a list of results from different processes or different batches. Returns a list with as many elements as there are annotators, each element the combined result. Args: results: a list of result lists Returns: a list of combined results """ results = [] assert len(results) == len(self.annotators) for reslist, annotator in zip(results, self.annotators): if _has_method(annotator, "reduce"): results.append(annotator.reduce(reslist)) else: results.append(reslist) return results
def start(self)
-
Invokes start on all annotators.
Expand source code
def start(self): """ Invokes start on all annotators. """ for annotator in self.annotators: if _has_method(annotator, "start"): annotator.start()
Inherited members