Module gatenlp.gate_interaction

Support for the interaction between a GATE (Java) process and a gatenlp (Python) process. This is used by the Java GATE Python plugin.

Expand source code
#!/usr/bin/env python
"""
Support for interacting between a GATE (java) process and a gatenlp (Python) process.
This is used by the Java GATE Python plugin.
"""

import sys
import os
import io
import json
import traceback
from argparse import ArgumentParser
import inspect
import logging
from gatenlp.changelog import ChangeLog
from gatenlp.document import Document
from gatenlp.offsetmapper import OFFSET_TYPE_JAVA, OFFSET_TYPE_PYTHON
from gatenlp.utils import init_logger
from gatenlp.version import __version__ as gatenlp_version

# NOTE: this is the global variable that holds the current function or class defined for interaction
# In order to avoid use of global, we use a list and just always use element 0
# The element is None until a processing resource wrapper has been registered.
gate_python_plugin_pr = [None]


# We cannot simply do this, because on some systems Python may guess the wrong encoding for stdin:
# instream = sys.stdin
# Instead use utf-8 explicitly:
# NOTE: we only do this if we get the environment variable "FROMGATEPLUGIN" set, since this
# can interfere with normal use of gatenlp, with pytest etc!
# NOTE(review): instream and ostream are only bound inside this if-block, so code that uses them
# without the environment variable being set will fail with a NameError.
if os.environ.get("FROMGATEPLUGIN"):
    # read requests from the raw stdin bytes, decoded as UTF-8
    instream = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
    # keep a handle to the real stdout for sending responses ...
    ostream = sys.stdout
    # ... and make everything else that is written to sys.stdout go to stderr instead
    sys.stdout = sys.stderr

# module-level logger, used by DefaultPr
logger = init_logger("gate_interaction")


class _PrWrapper:
    """
    Holder for the callables that make up a processing resource (PR).

    For each of the four steps (execute, start, finish, reduce) the wrapper stores the
    callable to invoke (or None) and a flag that tells if the callable accepts keyword
    arguments. The script parms received by start() are passed on as keyword arguments
    to every callable that accepts them.
    """

    def __init__(self):
        # callable for processing each doc / does it accept kwargs?
        self.func_execute, self.func_execute_allowkws = None, False
        # callable invoked when processing starts / does it accept kwargs?
        self.func_start, self.func_start_allowkws = None, False
        # callable invoked when processing finishes / does it accept kwargs?
        self.func_finish, self.func_finish_allowkws = None, False
        # callable for combining results / does it accept kwargs?
        self.func_reduce, self.func_reduce_allowkws = None, False
        # script parms to pass on to each callable that accepts kwargs
        self.script_parms = {}
        self.logger = None

    def _invoke(self, func, allowkws, *posargs):
        """
        Call func with the positional arguments and, if allowkws is set and there are
        script parms, with the script parms as keyword arguments. Returns what func returns.
        """
        if allowkws and self.script_parms:
            return func(*posargs, **self.script_parms)
        return func(*posargs)

    def execute(self, doc):
        """
        Invoke the wrapped function or the __call__ method of the wrapped callable on the document.

        Args:
          doc: the document to process

        Returns:
          whatever the wrapped callable returns, or if that is None, the changelog
          of the document if there is one, otherwise the document itself
        """
        result = self._invoke(self.func_execute, self.func_execute_allowkws, doc)
        if result is not None:
            return result
        return doc if doc.changelog is None else doc.changelog

    def start(self, script_params):
        """
        Remember the script parms (if any are given) and invoke any wrapped start method.

        Args:
          script_params: the parms to use from now on, ignored if empty or None
        """
        if script_params:
            self.script_parms = script_params
        # TODO: amend the script params with additional data from here?
        if self.func_start is None:
            return
        self._invoke(self.func_start, self.func_start_allowkws)

    def finish(self):
        """
        Invoke any wrapped finish method.

        Returns:
          what the finish method returns or None if there is no finish method
        """
        if self.func_finish is None:
            return None
        return self._invoke(self.func_finish, self.func_finish_allowkws)

    def reduce(self, resultslist):
        """
        Invoke any wrapped reduce method.

        Args:
          resultslist: the results to combine, passed on to the reduce method as is

        Returns:
          what the reduce method returns or None if there is no reduce method
        """
        if self.func_reduce is None:
            return None
        return self._invoke(self.func_reduce, self.func_reduce_allowkws, resultslist)


def _check_exec(func):
    """
    Check that the signature of func is usable as an execute function and
    report if it accepts keyword arguments.

    The signature is accepted if it has exactly one named positional parameter,
    or exactly two of which the first one is called "self", or if it has
    a *args parameter.

    Args:
      func: the function to check

    Returns:
      True if the function accepts **kwargs, False otherwise

    Raises:
      Exception: if the signature is not accepted
    """
    spec = inspect.getfullargspec(func)
    positional = spec.args
    single_arg = len(positional) == 1
    self_plus_arg = len(positional) == 2 and positional[0] == "self"
    any_number = spec.varargs is not None
    if not (single_arg or self_plus_arg or any_number):
        raise Exception(
            "Processing resource execution function does not accept exactly one or any number of arguments"
        )
    return spec.varkw is not None


def _has_method(theobj, name):
    """
    Look up the attribute with the given name on the object and return
    it if it is callable.

    Args:
      theobj: the object that contains the method
      name: the name of the method

    Returns:
      the method, or None if there is no such attribute or it is not callable

    """
    candidate = getattr(theobj, name, None)
    # a missing attribute gives None, which is not callable either
    return candidate if callable(candidate) else None


def _pr_decorator(what):
    """
    This is the decorator to identify a class or function as a processing
    resource.

    This creates an instance of _PrWrapper and registers the decorated function,
    or the relevant methods (__call__, start, finish, reduce) of the decorated class or
    callable instance, in the wrapper. The wrapper is also stored as the
    current processing resource in gate_python_plugin_pr[0].

    Args:
      what: the function, class or callable instance to decorate. A class is
        instantiated without arguments.

    Returns:
      the _PrWrapper instance created for what

    Raises:
      Exception: if what is neither a function, a class nor something with a
        callable __call__, or if the execute signature is rejected by _check_exec

    """
    wrapper = _PrWrapper()
    if inspect.isfunction(what):
        wrapper.func_execute_allowkws = _check_exec(what)
        wrapper.func_execute = what
    elif inspect.isclass(what) or _has_method(what, "__call__"):
        # NOTE: functions also have a "__call__" attribute! This is why we need to check for function first.
        # if it is a class, create an instance, otherwise assume it is already an instance
        instance = what() if inspect.isclass(what) else what
        # TODO: instead of this we could just as well store the instance and
        # directly call the instance methods from the wrapper!
        call_method = _has_method(instance, "__call__")
        if not call_method:
            raise Exception("PR does not have a __call__(doc) method.")
        wrapper.func_execute_allowkws = _check_exec(call_method)
        wrapper.func_execute = call_method
        # the optional methods are all registered the same way
        for hook in ("start", "finish", "reduce"):
            method = _has_method(instance, hook)
            if not method:
                continue
            setattr(wrapper, "func_" + hook, method)
            if inspect.getfullargspec(method).varkw:
                setattr(wrapper, "func_" + hook + "_allowkws", True)
    else:
        raise Exception(
            f"Decorator applied to something that is not a function or class: {what}"
        )
    gate_python_plugin_pr[0] = wrapper
    return wrapper


class DefaultPr:
    """
    Fallback annotator that leaves the document alone and only logs: every method writes
    a debug message, and processing a document additionally writes a warning.
    """

    def __call__(self, doc, **kwargs):
        """
        Log a debug message with the document and kwargs, log a warning and
        return the document as it was passed in.
        """
        logger.debug("DefaultPr: called __call__() with doc=%s, kwargs=%s", doc, kwargs)
        logger.warning("Finished DefaultPr: did you define a @GateNlpPr class or function?")
        return doc

    def start(self, **kwargs):
        """
        Log a debug message with the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called start() with kwargs=%s", kwargs)

    def finish(self, **kwargs):
        """
        Log a debug message with the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called finish() with kwargs=%s", kwargs)

    def reduce(self, resultlist, **kwargs):
        """
        Log a debug message with the result list and the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called reduce() with results %s and kwargs=%s", resultlist, kwargs)


def get_arguments(from_main=False):
    """
    Set up the command line parser for the interaction and return the parsed arguments.

    Args:
      from_main: if True, the additional positional argument "pythonfile" is expected

    Returns:
      the namespace object created by the argument parser
    """
    # (option names, settings) in the order in which they get added to the parser
    options = [
        (
            ("--mode",),
            {
                "default": "check",
                "help": "Interaction mode: pipe|http|websockets|file|dir|check (default: check)",
            },
        ),
        (
            ("--format",),
            {"default": "json", "help": "Exchange format: json|json.gz|cjson"},
        ),
        (("--path",), {"help": "File/directory path for modes file/dir"}),
        (("--out",), {"help": "Output file/directory path for modes file/dir"}),
        (
            ("-d",),
            {"action": "store_true", "help": "Enable debugging: log to stderr"},
        ),
        (
            ("--log_lvl",),
            {
                "type": str,
                "help": "Log level to use: DEBUG|INFO|WARNING|ERROR|CRITICAL",
            },
        ),
        (
            ("--config_file",),
            {
                "type": str,
                "help": "Config file path to pass on to the annotator (default: not set)",
            },
        ),
        (
            ("--parms_file",),
            {"type": str, "help": "The parms file to use for setting parameters"},
        ),
    ]
    parser = ArgumentParser()
    for names, settings in options:
        parser.add_argument(*names, **settings)
    if from_main:
        parser.add_argument("pythonfile")
    return parser.parse_args()


def interact(args=None, annotator=None):
    """Starts and handles the interaction with a GATE python plugin process.
    This will get started by the GATE plugin if the interaction uses
    pipes, but can also be started separately for http/websockets.

    This MUST be called in the user's python file!
    The python file should also have one class or function decorated
    with the @gatenlp.PR  decorator to identify it as the
    processing resource to the system.

    What happens depends on args.mode:

    * "check": return right after the processing resource and the log level got set up
    * "pipe": read one JSON request per line from instream, dispatch on its "command"
      field ("execute", "start", "finish", "reduce", "stop") and write one JSON response
      line to ostream per request; the loop ends after a "stop" command or at end of input
    * "http", "websockets": not implemented, an exception is raised
    * "file", "dir": call start() with the parms, load the document(s) from args.path, run
      the processing resource on each and save each to args.out or back to where it was
      loaded from; finish() is called once

    Args:
      args: the parsed command line arguments, if None they are obtained
        from get_arguments() (Default value = None)
      annotator: a function, class or callable instance to use as the processing
        resource. If None, the one registered in gate_python_plugin_pr is used, and if
        there is none, DefaultPr. (Default value = None)

    Returns:
      None

    Raises:
      Exception: for an invalid log level or mode, a mode which is not implemented, a
        format other than "json" in mode pipe, or a missing/mismatching path in mode
        file/dir. In mode pipe, the error from parsing an invalid JSON request line is
        re-raised as well.

    """
    # NOTE: this is a function-local logger which shadows the module-level one
    logger = init_logger(__name__)
    loglvls = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    # before we do anything we need to check if a PR has actually
    # been defined. If not, use our own default debugging PR
    if gate_python_plugin_pr[0] is None and annotator is None:
        logger.warning(
            "No processing resource defined with @GateNlpPr decorator or passed to interact, using default do-nothing"
        )
        # this registers the wrapper for DefaultPr in gate_python_plugin_pr[0]
        _pr_decorator(DefaultPr)
    # an explicitly passed annotator takes precedence over whatever is registered already
    if annotator is not None:
        pr = _pr_decorator(annotator)
    else:
        pr = gate_python_plugin_pr[0]

    if args is None:
        args = get_arguments()
    if args.d:
        logger.setLevel(logging.DEBUG)
    # an explicit log level is applied after -d and therefore wins over it
    if args.log_lvl:
        if args.log_lvl not in loglvls:
            raise Exception("Not a valid log level: {}".format(args.log_lvl))
        logger.setLevel(loglvls[args.log_lvl])

    # mode check: nothing to do beyond the setup above
    if args.mode == "check":
        return

    logger.info("Using gatenlp version {}\n".format(gatenlp_version))

    logger.debug("Starting interaction args={}".format(args))
    if args.mode == "pipe":
        if args.format != "json":
            raise Exception("For interaction mode pipe, only format=json is supported")
        # NOTE(review): instream and ostream are module-level names which are only assigned
        # when the environment variable FROMGATEPLUGIN is set - confirm that mode pipe is never
        # used without it.
        # Each input line is one JSON request, each request gets exactly one JSON response line.
        for line in instream:
            try:
                request = json.loads(line)
            except Exception as ex:
                # a request which cannot be parsed ends the interaction: log it and re-raise
                logger.error("Unable to load from JSON:\n{}".format(line))
                raise ex
            logger.debug("Got request object: {}".format(request))
            cmd = request.get("command", None)
            stop_requested = False
            ret = None
            # Any exception raised while handling the command is caught below and reported
            # back as a response with status "error" instead of ending the loop.
            try:
                if cmd == "execute":
                    doc = Document.from_dict(request.get("data"))
                    om = doc.to_offset_type(OFFSET_TYPE_PYTHON)
                    # fresh changelog so that the changes made by the PR can be sent back
                    doc.changelog = ChangeLog()
                    pr.execute(doc)
                    # NOTE: for now we just discard what the method returns and always return
                    # the changelog instead!
                    chlog = doc.changelog
                    # if we got an offset mapper earlier, we had to convert, so we convert back to JAVA
                    if om:
                        # replace True is faster, and we do not need the ChangeLog any more!
                        chlog.fixup_changes(
                            offset_mapper=om, offset_type=OFFSET_TYPE_JAVA, replace=True
                        )
                    ret = doc.changelog.to_dict()
                    logger.debug("Returning CHANGELOG: {}".format(ret))
                elif cmd == "start":
                    # the request data are the script parms, start() itself returns nothing
                    parms = request.get("data")
                    pr.start(parms)
                elif cmd == "finish":
                    ret = pr.finish()
                elif cmd == "reduce":
                    results = request.get("data")
                    ret = pr.reduce(results)
                elif cmd == "stop":
                    # still send the ok response below, then leave the loop
                    stop_requested = True
                else:
                    raise Exception("Odd command received: {}".format(cmd))
                response = {
                    "data": ret,
                    "status": "ok",
                }
            except Exception as ex:
                error = repr(ex)
                tb_str = traceback.format_exception(
                    type(ex), ex, ex.__traceback__
                )
                print("ERROR when running python code:", file=sys.stderr)
                # NOTE(review): this reuses the name "line" of the enclosing loop; the request
                # line is not read again in this iteration, so this is harmless but confusing.
                for line in tb_str:
                    print(
                        line, file=sys.stderr, end=""
                    )  # what we get from traceback already has new lines
                info = "".join(tb_str)
                # in case we want the actual stacktrace data as well:
                st = [
                    (f.filename, f.lineno, f.name, f.line)
                    for f in traceback.extract_tb(ex.__traceback__)
                ]
                response = {
                    "data": None,
                    "status": "error",
                    "error": error,
                    "info": info,
                    "stacktrace": st,
                }
            logger.debug("Sending back response: {}".format(response))
            # NOTE(review): json.dumps runs outside of the try block above, so a return value
            # of finish/reduce which cannot be serialized to JSON raises here - confirm intended.
            print(json.dumps(response), file=ostream)

            # flush so that the response line is written out right away
            ostream.flush()
            if stop_requested:
                break
        # TODO: do any cleanup/restoring needed
        logger.debug("Finishing interaction")
    elif args.mode == "http":
        raise Exception("Mode http not implemented yet")
    elif args.mode == "websockets":
        raise Exception("Mode websockets not implemented yet")
    elif args.mode in ["file", "dir"]:
        if not args.path:
            raise Exception("Mode file or dir but no --path specified")
        # fileext is only used for the file name pattern in mode dir
        # NOTE(review): for the default format "json" this gives ".bdocjson" - confirm that this
        # is the extension of the files which should get processed.
        fileext = ".bdoc" + args.format
        if args.mode == "file" and not os.path.isfile(args.path):
            raise Exception("Mode file but path is not a file: {}".format(args.path))
        elif args.mode == "dir" and not os.path.isdir(args.path):
            raise Exception(
                "Mode dir but path is not a directory: {}".format(args.path)
            )
        # we need to do this for mode file and dir: get the parms and run pr.start(parms):
        parms = {}
        # check if there is a parms file:
        if args.parms_file:
            with open(args.parms_file, "rt", encoding="utf-8") as infp:
                parms.update(json.load(infp))
        # the config file path is passed on to the PR as the parm "_config_file"
        if args.config_file:
            parms["_config_file"] = args.config_file
        pr.start(parms)
        if args.mode == "file":
            logger.info(f"Loading file {args.path}")
            doc = Document.load(args.path)
            # the return values of execute and finish are not used here
            pr.execute(doc)
            pr.finish()
            # without --out the input file gets overwritten
            if args.out:
                logger.info(f"Saving file to {args.out}")
                doc.save(args.out)
            else:
                logger.info(f"Saving file to {args.path}")
                doc.save(args.path)
        else:
            import glob
            # all files directly inside the directory with a name that ends with fileext
            files = glob.glob(args.path + os.path.sep + "*" + fileext)
            for file in files:
                logger.info("Loading file {}".format(file))
                doc = Document.load(file)
                pr.execute(doc)
                # with --out, save under the same file name in that directory,
                # otherwise overwrite the input file
                if args.out:
                    tofile = os.path.join(args.out, os.path.basename(file))
                    logger.info("Saving to {}".format(tofile))
                    doc.save(tofile)
                else:
                    logger.info("Saving to {}".format(file))
                    doc.save(file)
            # finish is called once after all files have been processed
            pr.finish()
    else:
        raise Exception("Not a valid mode: {}".format(args.mode))


def main():
    """
    Command line entry point: load the PR code from the given python file, then
    start the interaction.

    The command line is parsed with the additional positional argument "pythonfile".
    That file is executed as a module with the name "gateapp" before interact() gets
    called with the parsed arguments.

    Raises:
      Exception: if the python file cannot be set up for importing, e.g. because
        its file name does not have a suffix known to the import system
    """
    # we run this from the command line so we need to also first load the PR code from the python file
    arguments = get_arguments(from_main=True)
    init_logger(__name__)
    import importlib.util

    spec = importlib.util.spec_from_file_location("gateapp", arguments.pythonfile)
    # spec_from_file_location returns None if no loader can be determined for the file:
    # fail with a clear message instead of an AttributeError further down
    if spec is None or spec.loader is None:
        raise Exception(
            "Cannot load the Python file, not an importable Python source file: {}".format(
                arguments.pythonfile
            )
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    interact(args=arguments)


# Only run the command line entry point if this module is executed as a script
if __name__ == "__main__":
    main()

Functions

def get_arguments(from_main=False)

Parse the command line arguments and return them.

Expand source code
def get_arguments(from_main=False):
    """
    Set up the command line parser for the interaction and return the parsed arguments.

    Args:
      from_main: if True, the additional positional argument "pythonfile" is expected

    Returns:
      the namespace object created by the argument parser
    """
    # (option names, settings) in the order in which they get added to the parser
    options = [
        (
            ("--mode",),
            {
                "default": "check",
                "help": "Interaction mode: pipe|http|websockets|file|dir|check (default: check)",
            },
        ),
        (
            ("--format",),
            {"default": "json", "help": "Exchange format: json|json.gz|cjson"},
        ),
        (("--path",), {"help": "File/directory path for modes file/dir"}),
        (("--out",), {"help": "Output file/directory path for modes file/dir"}),
        (
            ("-d",),
            {"action": "store_true", "help": "Enable debugging: log to stderr"},
        ),
        (
            ("--log_lvl",),
            {
                "type": str,
                "help": "Log level to use: DEBUG|INFO|WARNING|ERROR|CRITICAL",
            },
        ),
        (
            ("--config_file",),
            {
                "type": str,
                "help": "Config file path to pass on to the annotator (default: not set)",
            },
        ),
        (
            ("--parms_file",),
            {"type": str, "help": "The parms file to use for setting parameters"},
        ),
    ]
    parser = ArgumentParser()
    for names, settings in options:
        parser.add_argument(*names, **settings)
    if from_main:
        parser.add_argument("pythonfile")
    return parser.parse_args()
def interact(args=None, annotator=None)

Starts and handles the interaction with a GATE python plugin process. This will get started by the GATE plugin if the interaction uses pipes, but can also be started separately for http/websockets.

This MUST be called in the user's python file! The python file should also have one class or function decorated with the @gatenlp.PR decorator to identify it as the processing resource to the system.

:return:

Args

args
(Default value = None)

Returns:

Expand source code
def interact(args=None, annotator=None):
    """Starts and handles the interaction with a GATE python plugin process.
    This will get started by the GATE plugin if the interaction uses
    pipes, but can also be started separately for http/websockets.

    This MUST be called in the user's python file!
    The python file should also have one class or function decorated
    with the @gatenlp.PR  decorator to identify it as the
    processing resource to the system.

    What happens depends on args.mode:

    * "check": return right after the processing resource and the log level got set up
    * "pipe": read one JSON request per line from instream, dispatch on its "command"
      field ("execute", "start", "finish", "reduce", "stop") and write one JSON response
      line to ostream per request; the loop ends after a "stop" command or at end of input
    * "http", "websockets": not implemented, an exception is raised
    * "file", "dir": call start() with the parms, load the document(s) from args.path, run
      the processing resource on each and save each to args.out or back to where it was
      loaded from; finish() is called once

    Args:
      args: the parsed command line arguments, if None they are obtained
        from get_arguments() (Default value = None)
      annotator: a function, class or callable instance to use as the processing
        resource. If None, the one registered in gate_python_plugin_pr is used, and if
        there is none, DefaultPr. (Default value = None)

    Returns:
      None

    Raises:
      Exception: for an invalid log level or mode, a mode which is not implemented, a
        format other than "json" in mode pipe, or a missing/mismatching path in mode
        file/dir. In mode pipe, the error from parsing an invalid JSON request line is
        re-raised as well.

    """
    # NOTE: this is a function-local logger which shadows the module-level one
    logger = init_logger(__name__)
    loglvls = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    # before we do anything we need to check if a PR has actually
    # been defined. If not, use our own default debugging PR
    if gate_python_plugin_pr[0] is None and annotator is None:
        logger.warning(
            "No processing resource defined with @GateNlpPr decorator or passed to interact, using default do-nothing"
        )
        # this registers the wrapper for DefaultPr in gate_python_plugin_pr[0]
        _pr_decorator(DefaultPr)
    # an explicitly passed annotator takes precedence over whatever is registered already
    if annotator is not None:
        pr = _pr_decorator(annotator)
    else:
        pr = gate_python_plugin_pr[0]

    if args is None:
        args = get_arguments()
    if args.d:
        logger.setLevel(logging.DEBUG)
    # an explicit log level is applied after -d and therefore wins over it
    if args.log_lvl:
        if args.log_lvl not in loglvls:
            raise Exception("Not a valid log level: {}".format(args.log_lvl))
        logger.setLevel(loglvls[args.log_lvl])

    # mode check: nothing to do beyond the setup above
    if args.mode == "check":
        return

    logger.info("Using gatenlp version {}\n".format(gatenlp_version))

    logger.debug("Starting interaction args={}".format(args))
    if args.mode == "pipe":
        if args.format != "json":
            raise Exception("For interaction mode pipe, only format=json is supported")
        # NOTE(review): instream and ostream are module-level names which are only assigned
        # when the environment variable FROMGATEPLUGIN is set - confirm that mode pipe is never
        # used without it.
        # Each input line is one JSON request, each request gets exactly one JSON response line.
        for line in instream:
            try:
                request = json.loads(line)
            except Exception as ex:
                # a request which cannot be parsed ends the interaction: log it and re-raise
                logger.error("Unable to load from JSON:\n{}".format(line))
                raise ex
            logger.debug("Got request object: {}".format(request))
            cmd = request.get("command", None)
            stop_requested = False
            ret = None
            # Any exception raised while handling the command is caught below and reported
            # back as a response with status "error" instead of ending the loop.
            try:
                if cmd == "execute":
                    doc = Document.from_dict(request.get("data"))
                    om = doc.to_offset_type(OFFSET_TYPE_PYTHON)
                    # fresh changelog so that the changes made by the PR can be sent back
                    doc.changelog = ChangeLog()
                    pr.execute(doc)
                    # NOTE: for now we just discard what the method returns and always return
                    # the changelog instead!
                    chlog = doc.changelog
                    # if we got an offset mapper earlier, we had to convert, so we convert back to JAVA
                    if om:
                        # replace True is faster, and we do not need the ChangeLog any more!
                        chlog.fixup_changes(
                            offset_mapper=om, offset_type=OFFSET_TYPE_JAVA, replace=True
                        )
                    ret = doc.changelog.to_dict()
                    logger.debug("Returning CHANGELOG: {}".format(ret))
                elif cmd == "start":
                    # the request data are the script parms, start() itself returns nothing
                    parms = request.get("data")
                    pr.start(parms)
                elif cmd == "finish":
                    ret = pr.finish()
                elif cmd == "reduce":
                    results = request.get("data")
                    ret = pr.reduce(results)
                elif cmd == "stop":
                    # still send the ok response below, then leave the loop
                    stop_requested = True
                else:
                    raise Exception("Odd command received: {}".format(cmd))
                response = {
                    "data": ret,
                    "status": "ok",
                }
            except Exception as ex:
                error = repr(ex)
                tb_str = traceback.format_exception(
                    type(ex), ex, ex.__traceback__
                )
                print("ERROR when running python code:", file=sys.stderr)
                # NOTE(review): this reuses the name "line" of the enclosing loop; the request
                # line is not read again in this iteration, so this is harmless but confusing.
                for line in tb_str:
                    print(
                        line, file=sys.stderr, end=""
                    )  # what we get from traceback already has new lines
                info = "".join(tb_str)
                # in case we want the actual stacktrace data as well:
                st = [
                    (f.filename, f.lineno, f.name, f.line)
                    for f in traceback.extract_tb(ex.__traceback__)
                ]
                response = {
                    "data": None,
                    "status": "error",
                    "error": error,
                    "info": info,
                    "stacktrace": st,
                }
            logger.debug("Sending back response: {}".format(response))
            # NOTE(review): json.dumps runs outside of the try block above, so a return value
            # of finish/reduce which cannot be serialized to JSON raises here - confirm intended.
            print(json.dumps(response), file=ostream)

            # flush so that the response line is written out right away
            ostream.flush()
            if stop_requested:
                break
        # TODO: do any cleanup/restoring needed
        logger.debug("Finishing interaction")
    elif args.mode == "http":
        raise Exception("Mode http not implemented yet")
    elif args.mode == "websockets":
        raise Exception("Mode websockets not implemented yet")
    elif args.mode in ["file", "dir"]:
        if not args.path:
            raise Exception("Mode file or dir but no --path specified")
        # fileext is only used for the file name pattern in mode dir
        # NOTE(review): for the default format "json" this gives ".bdocjson" - confirm that this
        # is the extension of the files which should get processed.
        fileext = ".bdoc" + args.format
        if args.mode == "file" and not os.path.isfile(args.path):
            raise Exception("Mode file but path is not a file: {}".format(args.path))
        elif args.mode == "dir" and not os.path.isdir(args.path):
            raise Exception(
                "Mode dir but path is not a directory: {}".format(args.path)
            )
        # we need to do this for mode file and dir: get the parms and run pr.start(parms):
        parms = {}
        # check if there is a parms file:
        if args.parms_file:
            with open(args.parms_file, "rt", encoding="utf-8") as infp:
                parms.update(json.load(infp))
        # the config file path is passed on to the PR as the parm "_config_file"
        if args.config_file:
            parms["_config_file"] = args.config_file
        pr.start(parms)
        if args.mode == "file":
            logger.info(f"Loading file {args.path}")
            doc = Document.load(args.path)
            # the return values of execute and finish are not used here
            pr.execute(doc)
            pr.finish()
            # without --out the input file gets overwritten
            if args.out:
                logger.info(f"Saving file to {args.out}")
                doc.save(args.out)
            else:
                logger.info(f"Saving file to {args.path}")
                doc.save(args.path)
        else:
            import glob
            # all files directly inside the directory with a name that ends with fileext
            files = glob.glob(args.path + os.path.sep + "*" + fileext)
            for file in files:
                logger.info("Loading file {}".format(file))
                doc = Document.load(file)
                pr.execute(doc)
                # with --out, save under the same file name in that directory,
                # otherwise overwrite the input file
                if args.out:
                    tofile = os.path.join(args.out, os.path.basename(file))
                    logger.info("Saving to {}".format(tofile))
                    doc.save(tofile)
                else:
                    logger.info("Saving to {}".format(file))
                    doc.save(file)
            # finish is called once after all files have been processed
            pr.finish()
    else:
        raise Exception("Not a valid mode: {}".format(args.mode))
def main()
Expand source code
def main():
    """
    Command line entry point: load the PR code from the given python file, then
    start the interaction.

    The command line is parsed with the additional positional argument "pythonfile".
    That file is executed as a module with the name "gateapp" before interact() gets
    called with the parsed arguments.

    Raises:
      Exception: if the python file cannot be set up for importing, e.g. because
        its file name does not have a suffix known to the import system
    """
    # we run this from the command line so we need to also first load the PR code from the python file
    arguments = get_arguments(from_main=True)
    init_logger(__name__)
    import importlib.util

    spec = importlib.util.spec_from_file_location("gateapp", arguments.pythonfile)
    # spec_from_file_location returns None if no loader can be determined for the file:
    # fail with a clear message instead of an AttributeError further down
    if spec is None or spec.loader is None:
        raise Exception(
            "Cannot load the Python file, not an importable Python source file: {}".format(
                arguments.pythonfile
            )
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    interact(args=arguments)

Classes

class DefaultPr

A default annotator which does nothing but shows a warning. All the implemented methods also log debug messages.

Expand source code
class DefaultPr:
    """
    Fallback annotator that leaves the document alone and only logs: every method writes
    a debug message, and processing a document additionally writes a warning.
    """

    def __call__(self, doc, **kwargs):
        """
        Log a debug message with the document and kwargs, log a warning and
        return the document as it was passed in.
        """
        logger.debug("DefaultPr: called __call__() with doc=%s, kwargs=%s", doc, kwargs)
        logger.warning("Finished DefaultPr: did you define a @GateNlpPr class or function?")
        return doc

    def start(self, **kwargs):
        """
        Log a debug message with the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called start() with kwargs=%s", kwargs)

    def finish(self, **kwargs):
        """
        Log a debug message with the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called finish() with kwargs=%s", kwargs)

    def reduce(self, resultlist, **kwargs):
        """
        Log a debug message with the result list and the kwargs, nothing else. Returns None.
        """
        logger.debug("DefaultPr: called reduce() with results %s and kwargs=%s", resultlist, kwargs)

Methods

def finish(self, **kwargs)

Does nothing except log a debug message showing the kwargs.

Expand source code
def finish(self, **kwargs):
    """
    Log a debug message with the kwargs, nothing else. Returns None.
    """
    logger.debug(
        "DefaultPr: called finish() with kwargs=%s",
        kwargs,
    )
def reduce(self, resultlist, **kwargs)

Does nothing except log a debug message showing the result list and the kwargs.

Expand source code
def reduce(self, resultlist, **kwargs):
    """
    Log a debug message with the result list and the kwargs, nothing else. Returns None.
    """
    logger.debug("DefaultPr: called reduce() with results %s and kwargs=%s", resultlist, kwargs)
def start(self, **kwargs)

Does nothing except log a debug message showing the kwargs.

Expand source code
def start(self, **kwargs):
    """
    Log a debug message with the kwargs, nothing else. Returns None.
    """
    logger.debug(
        "DefaultPr: called start() with kwargs=%s",
        kwargs,
    )