Multiprocessing - Running PythonPr in parallel
PythonPr can be run in parallel using duplication, e.g. by running a pipeline that contains the PythonPr processing resource via gcp or gcp-direct (see https://github.com/GateNLP/gcp)
When running under gcp, the pipeline gets duplicated as many times as needed so each parallel process has its own copy. Each duplication has its own duplicate of the PythonPr resource and each PythonPr resource starts its own Python process. So the code run for each duplicate is completely separate and variables cannot be shared between them.
Each duplicate gets passed the following keyword arguments to
the start
, _call__
and finish
methods:
_duplicateId
: the duplicate number of the process_nrDuplicates
: the total number of duplicates
When each of the process finishes, the finish
method is invoked.
If the finish
method returns a map/dictionary, processing is a bit different if there are duplicates: since there are several duplicates, the maps returned by each of them are collected in a list and then passed on method reduce
or the first duplicate, if that method is defined in the class.
The reduce
method is expected to use all the results from all the processes to calculate and return a map of the overall result for the
whole processing which is then treated as a result returned from finish
in the single process case.
The following code uses the reduce
method to calculate, print
and return the overall results of counting words (as in PythonPrResult) when running multiple duplicates:
import sys
import re
from collections import Counter
from gatenlp import interact, GateNlpPr, Document
@GateNlpPr
class MyProcessor:
def __init__(self):
self.words_total = 0
self.nr_docs = 0
def start(self, **kwargs):
self.words_total = 0
self.nr_docs = 0
def finish(self, **kwargs):
return {"nrdocs": self.nr_docs, "nrwords": self.words_total}
def reduce(self, resultlist):
totals = Counter()
for r in resultlist:
totals.update(r)
print("Total number of documents: ", totals["nrdocs"])
print("Total number of words: ", totals["nrwords"])
return totals
def __call__(self, doc, **kwargs):
text = doc.text
whitespaces = [m for m in re.finditer(r"[\s,.!?]+|^[\s,.!?]*|[\s,.!?]*$",text)]
nrwords = len(whitespaces)-1
self.words_total += nrwords
self.nr_docs += 1
if __name__ == '__main__':
interact()