Module gatenlp.processing.runners.runner_dir2dir

Module that implements runners which can be used from the command line to run pipelines.
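
The runner loads a user-supplied module file (given via --modulefile) which must define a pipeline factory make_pipeline(args=None, nworkers=1, workernr=0) and, if --process_result is used, a result processing function, and which may also define add_args(argparser) to inject extra command line options. Below is a minimal, hypothetical sketch of such a module file (the file name mypipeline.py and the --anntype option are made up for illustration, and it assumes that gatenlp's Pipeline accepts callable annotators which take and return a Document):

# mypipeline.py -- hypothetical example of a module file passed via --modulefile
from gatenlp.processing.pipeline import Pipeline


def add_args(argparser):
    # optional hook: inject additional command line options for this pipeline
    argparser.add_argument("--anntype", default="Document",
                           help="Annotation type to create (hypothetical extra option)")


def make_pipeline(args=None, nworkers=1, workernr=0):
    # called once per worker; args is the parsed command line namespace,
    # including any options injected by add_args above
    anntype = getattr(args, "anntype", "Document") if args is not None else "Document"

    def annotate(doc):
        # trivial annotator: add one annotation spanning the whole document text
        doc.annset().add(0, len(doc.text), anntype)
        return doc

    return Pipeline(annotate)


def result_processor(result=None):
    # called with the (combined) result of pipeline.finish() if --process_result is given
    print("Pipeline finish() result:", result)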

Expand source code
"""
Module that implements runners which can be used from the command line to run pipelines.
"""
import argparse
import importlib.util
import os
import signal
from collections.abc import Iterable
import ray

from gatenlp.corpora import DirFilesCorpus, DirFilesSource, DirFilesDestination, NullDestination
from gatenlp.processing.pipeline import Pipeline
from gatenlp.utils import init_logger

# TODO: refactor get_pipeline_resultprocessor into separate functions to get the module, to
#    get the pipeline and to get the result processor from the module (passing the module)
#    Then add a function to also update the argparser if there is a "get_args()" method in the module.
# That way we can use arbitrary additional arguments to configure further processing
# This will come in handy for gatenlp-run where we also make the source/dest/corpus configurable
#
# TODO: refactor so that all processing that requires Ray is done in a different module which is only imported
#     when ray is actually used
#
# TODO: add native Python multiprocessing: use ray only if any of the ray-related options or "--ray"  is present

GLOBALS = dict(mod=None)


class LoggedException(Exception):
    """Exception that gets logged and causes aborting the process"""
    pass


def load_module(args):
    """
    Load a module according to the setting in the ArgumentParser namespace args.

    Args:
        args: an ArgumentParser namespace which must contain the "modulefile" attribute.

    Returns:
        the module
    """
    if GLOBALS["mod"] is not None:
        return GLOBALS["mod"]
    if args.modulefile is None:
        import gatenlp.processing.runners.module_noop
        mod = gatenlp.processing.runners.module_noop
        GLOBALS["mod"] = mod
        return mod
    if not os.path.exists(args.modulefile):
        raise Exception(f"Module file {args.modulefile} does not exist")
    spec = importlib.util.spec_from_file_location("gatenlp.tmprunner", args.modulefile)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    GLOBALS["mod"] = mod
    return mod


def get_add_args(args):
    """
    Return the "add_args" definition from the loaded module or None.

    Args:
        args: an ArgumentParser namespace that contains the "modulefile" attribute.

    Returns:
        the definition of "add_args" or None if the loaded module does not have it.

    """
    mod = load_module(args)
    if hasattr(mod, "add_args"):
        return getattr(mod, "add_args")
    else:
        return None


def get_pipeline_resultprocessor(args, nworkers=1, workernr=0):
    """
    Get the instantiated pipeline and the process_result function from the module specified in
    the argparse args as option --modulefile.

    Args:
        args: ArgumentParser namespace
        nworkers: total number of workers
        workernr: the worker number (0-based)

    Returns:
        A tuple with the pipeline as the first and the process_result function as the second element.

    """
    mod = load_module(args)
    if not hasattr(mod, args.make_pipeline):
        raise Exception(f"Module {args.modulefile} does not define function {args.make_pipeline}(args=None, nworkers=1, workernr=0)")
    pipeline_maker = getattr(mod, args.make_pipeline)
    # if not isinstance(pipeline_maker, Callable):  # NOTE: pylint chokes on this!
    if not callable(pipeline_maker):
            raise Exception(f"Module {args.modulefile} must contain a callable {args.make_pipeline}(args=None, nworkers=1, workernr=0)")

    pipeline = pipeline_maker(args=args, workernr=workernr, nworkers=nworkers)
    if not isinstance(pipeline, Pipeline):
        raise Exception("make_pipeline must return a gatenlp.processing.pipeline.Pipeline")
    result_processor = None
    if args.process_result is not None:
        if not hasattr(mod, args.process_result):
            raise Exception(f"Module does not define {args.process_result}")
        result_processor = getattr(mod, args.process_result)
        if not callable(result_processor):
            raise Exception(f"Result processor {args.process_result} is not callable")
    return pipeline, result_processor


class Dir2DirExecutor:
    """
    Executor class.
    """
    def __init__(self, args=None, workernr=0, nworkers=1):
        """
        Initialize the executor.

        Args:
            args: argparse namespace
            workernr: 0-based index of the worker
            nworkers: total number of workers
        """
        self.args = args
        self.workernr = workernr
        self.nworkers = nworkers
        self.n_in = 0
        self.n_out = 0
        self.n_none = 0
        self.logger = init_logger(name="Dir2DirExecutor")
        self.result_processor = None
        self.error = False

    def get_inout(self):
        """
        Return a list with either the corpus or the source and destination to use for processing
        """
        args = self.args
        if not os.path.exists(args.dir) or not os.path.isdir(args.dir):
            raise Exception(f"Does not exist or not a directory: {args.dir}")
        if args.outdir or args.outnone:
            if args.outnone:
                dest = NullDestination()
            else:
                if not os.path.exists(args.outdir) or not os.path.isdir(args.outdir):
                    raise Exception(f"Output directory must exist: {args.outdir}")
                outfmt = args.outfmt
                if outfmt is None:
                    outfmt = args.fmt
                outext = args.outext
                if outext is None:
                    outext = args.ext
                dest = DirFilesDestination(
                    args.outdir, "relpath", fmt=outfmt, ext=outext
                )
            src = DirFilesSource(
                args.dir, exts=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
                nparts=args.nworkers, partnr=self.workernr
            )
            return [src, dest]
        else:
            corpus = DirFilesCorpus(
                args.dir, ext=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
                nparts=args.nworkers, partnr=self.workernr
            )
            return [corpus]

    def run_pipe(self, pipeline, inout):
        """
        Run the given pipeline on the given input/output configuration.

        Args:
            pipeline: processing pipeline
            inout: list with input/output configuration

        Returns:

        """
        flags = dict(interrupted = False)
        logpref = f"Worker {self.workernr+1} of {self.nworkers}: "

        def siginthandler(sig, frame):
            self.error = True
            flags["interrupted"] = True
            self.logger.warning("%s received SIGINT signal", logpref)

        signal.signal(signal.SIGINT, siginthandler)

        if len(inout) == 2:   # src -> dest
            for ret in pipeline.pipe(inout[0]):
                if flags["interrupted"]:
                    self.logger.warning("%s interrupted by SIGINT", logpref)
                    break
                if ret is not None:
                    if isinstance(ret, Iterable):
                        for doc in ret:
                            inout[1].append(doc)
                    else:
                        inout[1].append(ret)
                else:
                    self.n_none += 1
                self.n_in = inout[0].n
                self.n_out = inout[1].n
                if self.n_out % self.args.log_every == 0:
                    self.logger.info("%s %i read, %i were None, %i returned",
                                     logpref, self.n_in, self.n_none, self.n_out)
            self.n_in = inout[0].n
            self.n_out = inout[1].n
        else:
            self.n_in = 0
            for ret in pipeline.pipe(inout[0]):
                if flags["interrupted"]:
                    self.logger.warning("%s interrupted by SIGINT", logpref)
                    break
                if ret is not None:
                    if isinstance(ret, list):
                        if len(ret) > 1:
                            raise Exception(f"%s Pipeline %s returned %i documents for corpus index %i",
                                            logpref, pipeline, len(ret), self.n_in)
                        for doc in ret:
                            inout[0].store(doc)
                            self.n_out += 1
                    else:
                        inout[0].store(ret)
                        self.n_out += 1
                else:
                    self.n_none += 1
                self.n_in += 1
                if self.n_out % self.args.log_every == 0:
                    self.logger.info("%s %i read, %i were None, %i returned",
                                     logpref, self.n_in, self.n_none, self.n_out)

    def run(self):
        """
        Run processing with the pipeline.

        Returns:
            The result returned by the pipeline finish() method
        """
        logpref = f"Worker {self.workernr+1} of {self.nworkers}: "
        pipeline, self.result_processor = get_pipeline_resultprocessor(self.args)
        self.logger.info("%s got pipeline %s", logpref, pipeline)
        inout = self.get_inout()
        self.logger.info(f"%s got In/Out %s", logpref, inout)
        have_error = False
        try:
            pipeline.start()
        except Exception as ex:
            self.logger.error(f"Pipeline start aborted", exc_info=ex)
            self.error = True
            raise LoggedException()
        self.logger.info("%s pipeline start() completed", logpref)
        self.logger.info("%s running pipeline", logpref)
        try:
            self.run_pipe(pipeline, inout)
            self.logger.info(
                f"%s pipeline running completed: %s read, %s were None, %s returned",
                logpref, self.n_in, self.n_none, self.n_out
            )
        except Exception as ex:
            self.logger.error("%s pipeline running aborted, %s read, %s were None, %s returned",
                              logpref, self.n_in, self.n_none, self.n_out,
                              exc_info=ex)
            # we continue to calculate any incomplete result, but remember that we had an error
            self.error = True
        try:
            ret = pipeline.finish()
            self.logger.info("%s pipeline finish() completed", logpref)
            # only return the result value if we have a result processor defined!
            if self.args.process_result:
                return ret
            else:
                return
        except Exception as ex:
            self.logger.error("%s pipeline finish aborted", logpref, exc_info=ex)
            self.error = True
            raise LoggedException()


@ray.remote
def ray_executor(args=None, workernr=0, nworkers=1):
    """Ray remote task: run one Dir2DirExecutor worker and return its result, error flag and counts."""
    executor = Dir2DirExecutor(args, workernr=workernr, nworkers=nworkers)
    ret = executor.run()
    return dict(result=ret, error=executor.error, n_in=executor.n_in, n_out=executor.n_out, n_none=executor.n_none)


def build_argparser():
    argparser = argparse.ArgumentParser(
        description="Run gatenlp pipeline on directory of documents",
        epilog="The module should define make_pipeline(args=None, nworkers=1, workernr=0) and result_processor(result=None)" +
               " and can optionally define add_args(argparser) to inject additional arguments into the argparser." +
               " If only one directory is specify, only format bdocjs is currently supported."
    )
    argparser.add_argument("dir", type=str,
                           help="Directory to process or input directory if --outdir is also specified"
                           )
    argparser.add_argument("--outdir", type=str,
                           help="If specified, read from dir, store result in outdir")
    argparser.add_argument("--outnone", action="store_true",
                           help="If specified, --outdir is ignored, if present and the output of the pipeline is ignored")
    argparser.add_argument("--fmt", choices=["bdocjs", "gatexml", "bdocym", "bdocmp"], default=None,
                           help="Format of documents in dir (none: determine from file extension)")
    argparser.add_argument("--outfmt", choices=["bdocjs", "text", "bdocym", "bdocmp"],
                           help="Format of documents in outdir (only used if --outdir is specified)")
    argparser.add_argument("--ext", choices=["bdocjs", "xml", "txt", "bdocym", "bdocmp", "html"], default="bdocjs",
                           help="File extension of documents in dir (bdocjs)")
    argparser.add_argument("--outext", choices=["bdocjs"],
                           help="File extension of documents in outdir (only used if --outdir is specified)")
    argparser.add_argument("--recursive", action="store_true",
                           help="If specified, process all documents in all subdirectories as well")
    argparser.add_argument("--modulefile",
                           help="Module file that contains the make_pipeline(args=None, workernr=0) definition.")
    argparser.add_argument("--nworkers", default=1, type=int,
                           help="Number of workers to run (1)")
    argparser.add_argument("--ray_address", type=str, default=None,
                           help="If specified, connect to ray cluster with that redis address, otherwise start own local cluster")
    argparser.add_argument("--log_every", default=1000, type=int,
                           help="Log progress message every n read documents (1000)")
    argparser.add_argument("--make_pipeline", type=str, default="make_pipeline",
                           help="Name of the pipeline factory function (make_pipeline)")
    argparser.add_argument("--process_result", type=str, default=None,
                           help="Name of result processing function, if None, results are ignored (None)")
    argparser.add_argument("--debug", action="store_true",
                           help="Show DEBUG logging messages")
    return argparser


def run_dir2dir():
    argparser = build_argparser()
    args, extra = argparser.parse_known_args()

    if args.outdir is None:
        if args.dir is not None and args.fmt is not None and args.fmt != "bdocjs":
            raise Exception("If no outdir specified, only supported format is bdocjs")
        if args.dir is not None and args.fmt is None:
            args.fmt = "bdocjs"
    # if we detect extra args, try to find the add_args function in the module:
    add_args_fn = get_add_args(args)
    if add_args_fn is not None:
        argparser = build_argparser()
        add_args_fn(argparser)
        args = argparser.parse_args()
    elif len(extra) > 0:
        raise Exception(f"Unknown args, but no add_args function in module: {extra}")

    logger = init_logger(name="run_dir2dir", debug=args.debug)
    if args.nworkers == 1:
        logger.info("Running SerialExecutor")
        executor = Dir2DirExecutor(args=args)
        try:
            result = executor.run()
            if args.process_result:
                logger.info("Processing result")
                executor.result_processor(result=result)
            if executor.error:
                logger.error("Processing ended with ERROR!!!")
            else:
                logger.info("Processing ended normally")
        except LoggedException:
            logger.error("Processing ended with ERROR!!!")
        except Exception as ex:
            logger.error("Processing ended with ERROR!!!", exc_info=ex)
    else:
        logger.info("Running RayExecutor")
        assert args.nworkers > 1
        if args.ray_address is None:
            logger.info("Starting Ray, using %s workers", args.nworkers)
            rayinfo = ray.init()
        else:
            rayinfo = ray.init(address=args.ray_address)
            logger.info("Connected to Ray cluster at %s using %s", args.ray_address, args.nworkers)
        logger.info("Ray available: %s", rayinfo)
        workers = []
        for k in range(args.nworkers):
            worker = ray_executor.remote(args, workernr=k, nworkers=args.nworkers)
            workers.append(worker)
            logger.info("Started worker %s: %s", k, worker)
        remaining = workers

        def siginthandler(sig, frame):
            for worker in workers:
                logger.warning("KILLING worker %s", worker)
                ray.cancel(worker)

        signal.signal(signal.SIGINT, siginthandler)
        while True:
            finished, remaining = ray.wait(remaining, num_returns=1, timeout=10.0)
            if len(finished) > 0:
                logger.info("Finished: %s (%s so far, %s remaining)",
                            finished, len(finished), len(remaining))
            if len(remaining) == 0:
                logger.info("All workers finished, processing results")
                break
        results_list = ray.get(workers)
        pipeline_results = [r["result"] for r in results_list]
        have_error = False
        total_in = 0
        total_none = 0
        total_out = 0
        for worker, ret in zip(workers, results_list):
            if ret["error"]:
                logger.error("Worker %s ABORTED, %s read, %s were None, %s returned",
                             worker, ret["n_in"], ret["n_none"], ret["n_out"])
                have_error = True
            else:
                logger.info("Worker %s finished, %s read, %s were None, %s returned",
                            worker, ret["n_in"], ret["n_none"], ret["n_out"])
            total_in += ret["n_in"]
            total_none += ret["n_none"]
            total_out += ret["n_out"]
        logger.info("Total processed:  %s read, %s were None, %s returned",
                    total_in, total_none, total_out)
        if args.process_result:
            logger.info("Processing any results")
            logger.info("Creating pipeline for workernr -1")
            pipeline, resultprocessor = get_pipeline_resultprocessor(args, workernr=-1, nworkers=1)
            logger.info("Combining results")
            result = pipeline.reduce(pipeline_results)
            logger.info("Processing results")
            try:
                resultprocessor(result=result)
            except Exception as ex:
                logger.error("Result processor error", exc_info=ex)
                have_error = True
        logger.info("Shutting down Ray ...")
        ray.shutdown()
        if have_error:
            logger.error("Processing ended with ERROR!!!")
        else:
            logger.info("Processing ended normally")

Functions

def build_argparser()
Expand source code
def build_argparser():
    argparser = argparse.ArgumentParser(
        description="Run gatenlp pipeline on directory of documents",
        epilog="The module should define make_pipeline(args=None, nworkers=1, workernr=0) and result_processor(result=None)" +
               " and can optionally define add_args(argparser) to inject additional arguments into the argparser." +
               " If only one directory is specify, only format bdocjs is currently supported."
    )
    argparser.add_argument("dir", type=str,
                           help="Directory to process or input directory if --outdir is also specified"
                           )
    argparser.add_argument("--outdir", type=str,
                           help="If specified, read from dir, store result in outdir")
    argparser.add_argument("--outnone", action="store_true",
                           help="If specified, --outdir is ignored, if present and the output of the pipeline is ignored")
    argparser.add_argument("--fmt", choices=["bdocjs", "gatexml", "bdocym", "bdocmp"], default=None,
                           help="Format of documents in dir (none: determine from file extension)")
    argparser.add_argument("--outfmt", choices=["bdocjs", "text", "bdocym", "bdocmp"],
                           help="Format of documents in outdir (only used if --outdir is specified)")
    argparser.add_argument("--ext", choices=["bdocjs", "xml", "txt", "bdocym", "bdocmp", "html"], default="bdocjs",
                           help="File extension of documents in dir (bdocjs)")
    argparser.add_argument("--outext", choices=["bdocjs"],
                           help="File extension of documents in outdir (only used if --outdir is specified)")
    argparser.add_argument("--recursive", action="store_true",
                           help="If specified, process all documents in all subdirectories as well")
    argparser.add_argument("--modulefile",
                           help="Module file that contains the make_pipeline(args=None, workernr=0) definition.")
    argparser.add_argument("--nworkers", default=1, type=int,
                           help="Number of workers to run (1)")
    argparser.add_argument("--ray_address", type=str, default=None,
                           help="If specified, connect to ray cluster with that redis address, otherwise start own local cluster")
    argparser.add_argument("--log_every", default=1000, type=int,
                           help="Log progress message every n read documents (1000)")
    argparser.add_argument("--make_pipeline", type=str, default="make_pipeline",
                           help="Name of the pipeline factory function (make_pipeline)")
    argparser.add_argument("--process_result", type=str, default=None,
                           help="Name of result processing function, if None, results are ignored (None)")
    argparser.add_argument("--debug", action="store_true",
                           help="Show DEBUG logging messages")
    return argparser
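
The command line entry point run_dir2dir (below) combines this parser with the optional add_args hook of the loaded module: unknown options are tolerated on a first pass with parse_known_args, and if the module defines add_args, the parser is rebuilt, extended, and the command line parsed again. A small sketch of that flow, using hypothetical argument values and the hypothetical mypipeline.py module shown at the top of this page:

argparser = build_argparser()
argv = ["indocs", "--modulefile", "mypipeline.py", "--anntype", "Token"]  # hypothetical
args, extra = argparser.parse_known_args(argv)   # --anntype ends up in extra for now

add_args_fn = get_add_args(args)                 # loads the module, returns its add_args or None
if add_args_fn is not None:
    argparser = build_argparser()
    add_args_fn(argparser)                       # module injects --anntype into the parser
    args = argparser.parse_args(argv)            # now the full command line parses cleanly
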
def get_add_args(args)

Return the "add_args" definition from the loaded module or None.

Args

args
an ArgumentParser namespace that contains the "modulefile" attribute.

Returns

the definition of "add_args" or None if the loaded module does not have it.

Expand source code
def get_add_args(args):
    """
    Return the "add_args" definition from the loaded module or None.

    Args:
        args: an ArgumentParser namespace that contains the "modulefile" attribute.

    Returns:
        the definition of "add_args" or None if the loaded module does not have it.

    """
    mod = load_module(args)
    if hasattr(mod, "add_args"):
        return getattr(mod, "add_args")
    else:
        return None
def get_pipeline_resultprocessor(args, nworkers=1, workernr=0)

Get the instantiated pipeline and the process_result function from the module specified in the argparse args as option --modulefile.

Args

args
ArgumentParser namespace
nworkers
total number of workers
workernr
the worker number (0-based)

Returns

A tuple with the pipeline as the first and the process_result function as the second element.

Expand source code
def get_pipeline_resultprocessor(args, nworkers=1, workernr=0):
    """
    Get the instantiated pipeline and the process_result function from the module specified in
    the argparse args as option --modulefile.

    Args:
        args: ArgumentParser namespace
        nworkers: total number of workers
        workernr: the worker number (0-based)

    Returns:
        A tuple with the pipeline as the first and the process_result function as the second element.

    """
    mod = load_module(args)
    if not hasattr(mod, args.make_pipeline):
        raise Exception(f"Module {args.modulefile} does not define function {args.make_pipeline}(args=None, nworkers=1, workernr=0)")
    pipeline_maker = getattr(mod, args.make_pipeline)
    # if not isinstance(pipeline_maker, Callable):  # NOTE: pylint chokes on this!
    if not callable(pipeline_maker):
            raise Exception(f"Module {args.modulefile} must contain a callable {args.make_pipeline}(args=None, nworkers=1, workernr=0)")

    pipeline = pipeline_maker(args=args, workernr=workernr, nworkers=nworkers)
    if not isinstance(pipeline, Pipeline):
        raise Exception("make_pipeline must return a gatenlp.processing.pipeline.Pipeline")
    result_processor = None
    if args.process_result is not None:
        if not hasattr(mod, args.process_result):
            raise Exception(f"Module does not define {args.process_result}")
        result_processor = getattr(mod, args.process_result)
        if not callable(result_processor):
            raise Exception(f"Result processor {args.process_result} is not callable")
    return pipeline, result_processor
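
A usage sketch with a minimal argparse namespace: only the attributes this function actually reads are set, and the module file name is the hypothetical one from the example at the top of this page:

from argparse import Namespace

args = Namespace(
    modulefile="mypipeline.py",     # hypothetical module file defining make_pipeline
    make_pipeline="make_pipeline",  # name of the factory function in that module
    process_result=None,            # no result processing function requested
)
pipeline, result_processor = get_pipeline_resultprocessor(args, nworkers=1, workernr=0)
# result_processor is None here because process_result was not set
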
def load_module(args)

Load a module according to the setting in the ArgumentParser namespace args.

Args

args
an ArgumentParser namespace which must contain the "modulefile" attribute.

Returns

the module

Expand source code
def load_module(args):
    """
    Load a module according to the setting in the ArgumentParser namespace args.

    Args:
        args: an ArgumentParser namespace which must contain the "modulefile" attribute.

    Returns:
        the module
    """
    if GLOBALS["mod"] is not None:
        return GLOBALS["mod"]
    if args.modulefile is None:
        import gatenlp.processing.runners.module_noop
        mod = gatenlp.processing.runners.module_noop
        GLOBALS["mod"] = mod
        return mod
    if not os.path.exists(args.modulefile):
        raise Exception(f"Module file {args.modulefile} does not exist")
    spec = importlib.util.spec_from_file_location("gatenlp.tmprunner", args.modulefile)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    GLOBALS["mod"] = mod
    return mod
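
Note that the loaded module is cached in the module-level GLOBALS dict, so repeated calls (for example from get_add_args and later from get_pipeline_resultprocessor) return the same module object without re-importing the file. A small sketch, again with a hypothetical module file name:

from argparse import Namespace

args = Namespace(modulefile="mypipeline.py")  # hypothetical module file
mod1 = load_module(args)    # imports the file and caches it in GLOBALS
mod2 = load_module(args)    # served from the cache, no second import
assert mod1 is mod2
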
def run_dir2dir()
Expand source code
def run_dir2dir():
    argparser = build_argparser()
    args, extra = argparser.parse_known_args()

    if args.outdir is None:
        if args.dir is not None and args.fmt is not None and args.fmt != "bdocjs":
            raise Exception("If no outdir specified, only supported format is bdocjs")
        if args.dir is not None and args.fmt is None:
            args.fmt = "bdocjs"
    # if we detect extra args, try to find the add_args function in the module:
    add_args_fn = get_add_args(args)
    if add_args_fn is not None:
        argparser = build_argparser()
        add_args_fn(argparser)
        args = argparser.parse_args()
    elif len(extra) > 0:
        raise Exception(f"Unknown args, but no add_args function in module: {extra}")

    logger = init_logger(name="run_dir2dir", debug=args.debug)
    if args.nworkers == 1:
        logger.info("Running SerialExecutor")
        executor = Dir2DirExecutor(args=args)
        try:
            result = executor.run()
            if args.process_result:
                logger.info("Processing result")
                executor.result_processor(result=result)
            if executor.error:
                logger.error("Processing ended with ERROR!!!")
            else:
                logger.info("Processing ended normally")
        except LoggedException:
            logger.error("Processing ended with ERROR!!!")
        except Exception as ex:
            logger.error("Processing ended with ERROR!!!", exc_info=ex)
    else:
        logger.info("Running RayExecutor")
        assert args.nworkers > 1
        if args.ray_address is None:
            logger.info("Starting Ray, using %s workers", args.nworkers)
            rayinfo = ray.init()
        else:
            rayinfo = ray.init(address=args.ray_address)
            logger.info("Connected to Ray cluster at %s using %s", args.ray_address, args.nworkers)
        logger.info("Ray available: %s", rayinfo)
        workers = []
        for k in range(args.nworkers):
            worker = ray_executor.remote(args, workernr=k, nworkers=args.nworkers)
            workers.append(worker)
            logger.info("Started worker %s: %s", k, worker)
        remaining = workers

        def siginthandler(sig, frame):
            for worker in workers:
                logger.warning("KILLING worker %s", worker)
                ray.cancel(worker)

        signal.signal(signal.SIGINT, siginthandler)
        while True:
            finished, remaining = ray.wait(remaining, num_returns=1, timeout=10.0)
            if len(finished) > 0:
                logger.info("Finished: %s (%s so far, %s remaining)",
                            finished, len(finished), len(remaining))
            if len(remaining) == 0:
                logger.info("All workers finished, processing results")
                break
        results_list = ray.get(workers)
        pipeline_results = [r["result"] for r in results_list]
        have_error = False
        total_in = 0
        total_none = 0
        total_out = 0
        for worker, ret in zip(workers, results_list):
            if ret["error"]:
                logger.error("Worker %s ABORTED, %s read, %s were None, %s returned",
                             worker, ret["n_in"], ret["n_none"], ret["n_out"])
                have_error = True
            else:
                logger.info("Worker %s finished, %s read, %s were None, %s returned",
                            worker, ret["n_in"], ret["n_none"], ret["n_out"])
            total_in += ret["n_in"]
            total_none += ret["n_none"]
            total_out += ret["n_out"]
        logger.info("Total processed:  %s read, %s were None, %s returned",
                    total_in, total_none, total_out)
        if args.process_result:
            logger.info("Processing any results")
            logger.info("Creating pipeline for workernr -1")
            pipeline, resultprocessor = get_pipeline_resultprocessor(args, workernr=-1, nworkers=1)
            logger.info("Combining results")
            result = pipeline.reduce(pipeline_results)
            logger.info("Processing results")
            try:
                resultprocessor(result=result)
            except Exception as ex:
                logger.error("Result processor error", exc_info=ex)
                have_error = True
        logger.info("Shutting down Ray ...")
        ray.shutdown()
        if have_error:
            logger.error("Processing ended with ERROR!!!")
        else:
            logger.info("Processing ended normally")

Classes

class Dir2DirExecutor (args=None, workernr=0, nworkers=1)

Executor class.

Initialize the executor.

Args

args
argparse namespace
workernr
0-based index of the worker
nworkers
total number of workers
Expand source code
class Dir2DirExecutor:
    """
    Executor class.
    """
    def __init__(self, args=None, workernr=0, nworkers=1):
        """
        Initialize the executor.

        Args:
            args: argparse namespace
            workernr: 0-based index of the worker
            nworkers: total number of workers
        """
        self.args = args
        self.workernr = workernr
        self.nworkers = nworkers
        self.n_in = 0
        self.n_out = 0
        self.n_none = 0
        self.logger = init_logger(name="Dir2DirExecutor")
        self.result_processor = None
        self.error = False

    def get_inout(self):
        """
        Return a list with either the corpus or the source and destination to use for processing
        """
        args = self.args
        if not os.path.exists(args.dir) or not os.path.isdir(args.dir):
            raise Exception(f"Does not exist or not a directory: {args.dir}")
        if args.outdir or args.outnone:
            if args.outnone:
                dest = NullDestination()
            else:
                if not os.path.exists(args.outdir) or not os.path.isdir(args.outdir):
                    raise Exception(f"Output directory must exist: {args.outdir}")
                outfmt = args.outfmt
                if outfmt is None:
                    outfmt = args.fmt
                outext = args.outext
                if outext is None:
                    outext = args.ext
                dest = DirFilesDestination(
                    args.outdir, "relpath", fmt=outfmt, ext=outext
                )
            src = DirFilesSource(
                args.dir, exts=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
                nparts=args.nworkers, partnr=self.workernr
            )
            return [src, dest]
        else:
            corpus = DirFilesCorpus(
                args.dir, ext=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
                nparts=args.nworkers, partnr=self.workernr
            )
            return [corpus]

    def run_pipe(self, pipeline, inout):
        """
        Run the given pipeline on the given input/output configuration.

        Args:
            pipeline: processing pipeline
            inout: list with input/output configuration

        Returns:

        """
        flags = dict(interrupted = False)
        logpref = f"Worker {self.workernr+1} of {self.nworkers}: "

        def siginthandler(sig, frame):
            self.error = True
            flags["interrupted"] = True
            self.logger.warning("%s received SIGINT signal", logpref)

        signal.signal(signal.SIGINT, siginthandler)

        if len(inout) == 2:   # src -> dest
            for ret in pipeline.pipe(inout[0]):
                if flags["interrupted"]:
                    self.logger.warning("%s interrupted by SIGINT", logpref)
                    break
                if ret is not None:
                    if isinstance(ret, Iterable):
                        for doc in ret:
                            inout[1].append(doc)
                    else:
                        inout[1].append(ret)
                else:
                    self.n_none += 1
                self.n_in = inout[0].n
                self.n_out = inout[1].n
                if self.n_out % self.args.log_every == 0:
                    self.logger.info("%s %i read, %i were None, %i returned",
                                     logpref, self.n_in, self.n_none, self.n_out)
            self.n_in = inout[0].n
            self.n_out = inout[1].n
        else:
            self.n_in = 0
            for ret in pipeline.pipe(inout[0]):
                if flags["interrupted"]:
                    self.logger.warning("%s interrupted by SIGINT", logpref)
                    break
                if ret is not None:
                    if isinstance(ret, list):
                        if len(ret) > 1:
                            raise Exception(f"%s Pipeline %s returned %i documents for corpus index %i",
                                            logpref, pipeline, len(ret), self.n_in)
                        for doc in ret:
                            inout[0].store(doc)
                            self.n_out += 1
                    else:
                        inout[0].store(ret)
                        self.n_out += 1
                else:
                    self.n_none += 1
                self.n_in += 1
                if self.n_out % self.args.log_every == 0:
                    self.logger.info("%s %i read, %i were None, %i returned",
                                     logpref, self.n_in, self.n_none, self.n_out)

    def run(self):
        """
        Run processing with the pipeline.

        Returns:
            The result returned by the pipeline finish() method
        """
        logpref = f"Worker {self.workernr+1} of {self.nworkers}: "
        pipeline, self.result_processor = get_pipeline_resultprocessor(self.args)
        self.logger.info("%s got pipeline %s", logpref, pipeline)
        inout = self.get_inout()
        self.logger.info(f"%s got In/Out %s", logpref, inout)
        have_error = False
        try:
            pipeline.start()
        except Exception as ex:
            self.logger.error(f"Pipeline start aborted", exc_info=ex)
            self.error = True
            raise LoggedException()
        self.logger.info("%s pipeline start() completed", logpref)
        self.logger.info("%s running pipeline", logpref)
        try:
            self.run_pipe(pipeline, inout)
            self.logger.info(
                f"%s pipeline running completed: %s read, %s were None, %s returned",
                logpref, self.n_in, self.n_none, self.n_out
            )
        except Exception as ex:
            self.logger.error("%s pipeline running aborted, %s read, %s were None, %s returned",
                              logpref, self.n_in, self.n_none, self.n_out,
                              exc_info=ex)
            # we continue to calculate any incomplete result, but remember that we had an error
            self.error = True
        try:
            ret = pipeline.finish()
            self.logger.info("%s pipeline finish() completed", logpref)
            # only return the result value if we have a result processor defined!
            if self.args.process_result:
                return ret
            else:
                return
        except Exception as ex:
            self.logger.error("%s pipeline finish aborted", logpref, exc_info=ex)
            self.error = True
            raise LoggedException()
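
Besides being used by the command line runner, the executor can be driven programmatically with an argparse-style namespace; the sketch below sets the attributes the executor and its helpers read (directory names and module file are hypothetical, and the directories must exist):

from argparse import Namespace

args = Namespace(
    dir="indocs", outdir="outdocs", outnone=False,         # hypothetical input/output dirs
    fmt="bdocjs", ext="bdocjs", outfmt=None, outext=None,  # output format/extension default to the input ones
    recursive=False, nworkers=1, log_every=1000,
    modulefile="mypipeline.py",                            # hypothetical module file
    make_pipeline="make_pipeline", process_result=None,
)
executor = Dir2DirExecutor(args=args, workernr=0, nworkers=1)
result = executor.run()   # result is only returned if process_result is set; check executor.error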

Methods

def get_inout(self)

Return a list with either the corpus or the source and destination to use for processing

Expand source code
def get_inout(self):
    """
    Return a list with either the corpus or the source and destination to use for processing
    """
    args = self.args
    if not os.path.exists(args.dir) or not os.path.isdir(args.dir):
        raise Exception(f"Does not exist or not a directory: {args.dir}")
    if args.outdir or args.outnone:
        if args.outnone:
            dest = NullDestination()
        else:
            if not os.path.exists(args.outdir) or not os.path.isdir(args.outdir):
                raise Exception(f"Output directory must exist: {args.outdir}")
            outfmt = args.outfmt
            if outfmt is None:
                outfmt = args.fmt
            outext = args.outext
            if outext is None:
                outext = args.ext
            dest = DirFilesDestination(
                args.outdir, "relpath", fmt=outfmt, ext=outext
            )
        src = DirFilesSource(
            args.dir, exts=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
            nparts=args.nworkers, partnr=self.workernr
        )
        return [src, dest]
    else:
        corpus = DirFilesCorpus(
            args.dir, ext=args.ext, fmt=args.fmt, recursive=args.recursive, sort=True,
            nparts=args.nworkers, partnr=self.workernr
        )
        return [corpus]
def run(self)

Run processing with the pipeline.

Returns

The result returned by the pipeline finish() method

Expand source code
def run(self):
    """
    Run processing with the pipeline.

    Returns:
        The result returned by the pipeline finish() method
    """
    logpref = f"Worker {self.workernr+1} of {self.nworkers}: "
    pipeline, self.result_processor = get_pipeline_resultprocessor(self.args)
    self.logger.info("%s got pipeline %s", logpref, pipeline)
    inout = self.get_inout()
    self.logger.info(f"%s got In/Out %s", logpref, inout)
    have_error = False
    try:
        pipeline.start()
    except Exception as ex:
        self.logger.error(f"Pipeline start aborted", exc_info=ex)
        self.error = True
        raise LoggedException()
    self.logger.info("%s pipeline start() completed", logpref)
    self.logger.info("%s running pipeline", logpref)
    try:
        self.run_pipe(pipeline, inout)
        self.logger.info(
            f"%s pipeline running completed: %s read, %s were None, %s returned",
            logpref, self.n_in, self.n_none, self.n_out
        )
    except Exception as ex:
        self.logger.error("%s pipeline running aborted, %s read, %s were None, %s returned",
                          logpref, self.n_in, self.n_none, self.n_out,
                          exc_info=ex)
        # we continue to calculate any incomplete result, but remember that we had an error
        self.error = True
    try:
        ret = pipeline.finish()
        self.logger.info("%s pipeline finish() completed", logpref)
        # only return the result value if we have a result processor defined!
        if self.args.process_result:
            return ret
        else:
            return
    except Exception as ex:
        self.logger.error("%s pipeline finish aborted", logpref, exc_info=ex)
        self.error = True
        raise LoggedException()
def run_pipe(self, pipeline, inout)

Run the given pipeline on the given input/output configuration.

Args

pipeline
processing pipeline
inout
list with input/output configuration

Returns:

Expand source code
def run_pipe(self, pipeline, inout):
    """
    Run the given pipeline on the given input/output configuration.

    Args:
        pipeline: processing pipeline
        inout: list with input/output configuration

    Returns:

    """
    flags = dict(interrupted = False)
    logpref = f"Worker {self.workernr+1} of {self.nworkers}: "

    def siginthandler(sig, frame):
        self.error = True
        flags["interrupted"] = True
        self.logger.warning("%s received SIGINT signal", logpref)

    signal.signal(signal.SIGINT, siginthandler)

    if len(inout) == 2:   # src -> dest
        for ret in pipeline.pipe(inout[0]):
            if flags["interrupted"]:
                self.logger.warning("%s interrupted by SIGINT", logpref)
                break
            if ret is not None:
                if isinstance(ret, Iterable):
                    for doc in ret:
                        inout[1].append(doc)
                else:
                    inout[1].append(ret)
            else:
                self.n_none += 1
            self.n_in = inout[0].n
            self.n_out = inout[1].n
            if self.n_out % self.args.log_every == 0:
                self.logger.info("%s %i read, %i were None, %i returned",
                                 logpref, self.n_in, self.n_none, self.n_out)
        self.n_in = inout[0].n
        self.n_out = inout[1].n
    else:
        self.n_in = 0
        for ret in pipeline.pipe(inout[0]):
            if flags["interrupted"]:
                self.logger.warning("%s interrupted by SIGINT", logpref)
                break
            if ret is not None:
                if isinstance(ret, list):
                    if len(ret) > 1:
                        raise Exception(f"%s Pipeline %s returned %i documents for corpus index %i",
                                        logpref, pipeline, len(ret), self.n_in)
                    for doc in ret:
                        inout[0].store(doc)
                        self.n_out += 1
                else:
                    inout[0].store(ret)
                    self.n_out += 1
            else:
                self.n_none += 1
            self.n_in += 1
            if self.n_out % self.args.log_every == 0:
                self.logger.info("%s %i read, %i were None, %i returned",
                                 logpref, self.n_in, self.n_none, self.n_out)
class LoggedException (*args, **kwargs)

Exception that gets logged and causes aborting the process

Expand source code
class LoggedException(Exception):
    """Exception that gets logged and causes aborting the process"""
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException