Module gatenlp.utils

Various utilities that could be useful in several modules.

Expand source code
"""
Various utilities that could be useful in several modules.
"""
import numbers
import sys
import os
import logging
import logging.config
import datetime
import time
from functools import wraps


def identity(x):
    """
    Return the argument unchanged.

    Args:
      x: any object

    Returns:
      the very same object that was passed in
    """
    return x


def isequal(x, y):
    """
    Compare two objects with the == operator.

    Args:
      x: first object
      y: second object

    Returns:
      the result of x == y
    """
    outcome = x == y
    return outcome


def match_substrings(text, items, getstr=None, cmp=None, unmatched=False):
    """
    Match each item from the items sequence with some substring of the text
    in a greedy fashion. An item is either already a string or getstr is used
    to retrieve a string from it. Substrings of the text and item strings are
    compared with string equality unless cmp is given, in which case that
    two-argument function does the comparison instead.
    This function expects that all items are present in the text, in their order
    and without overlapping! If this is not the case, an exception is raised.

    If items is empty, nothing is matched: the list of matches is empty and, if
    unmatched is true, the whole (non-empty) text is reported as unmatched.

    Args:
      text: the text to use for matching
      items: sequence of items that are or contain the substrings to match
      getstr: a function that retrieves the string from an item; if None the
        item itself is used (Default value = None)
      cmp: a function that compares two strings and returns a boolean
        that indicates if they should be considered to be equal; if None,
        == is used (Default value = None)
      unmatched: if true returns two lists of tuples, where the second list
        contains the (start, end) offsets of text not matched by the
        items (Default value = False)

    Returns:
      a list of tuples (start, end, item) where start and end are the
      start and end offsets of a substring in the text and item is the item for
      that substring. If unmatched is true, a pair consisting of that list and a
      list of (start, end) offsets of the text stretches not covered by any item.

    Raises:
      Exception: if the remaining text is too short for the next item or if not
        all items could be matched.
    """
    if getstr is None:
        getstr = identity
    if cmp is None:
        cmp = isequal
    ltxt = len(text)
    nitems = len(items)
    ret = []
    ret2 = []
    item_idx = 0
    start = 0
    # end offset of the most recent match, i.e. where the next unmatched stretch would begin
    lastunmatched = 0
    while start < ltxt and item_idx < nitems:
        itemorig = items[item_idx]
        item = getstr(itemorig)
        end = start + len(item)
        if end > ltxt:
            raise Exception("Text too short to match next item: {}".format(item))
        if cmp(text[start:end], item):
            if unmatched and start > lastunmatched:
                ret2.append((lastunmatched, start))
            # always advance past the match, also when the match directly follows
            # the previous one or starts at offset 0, otherwise the next gap would
            # wrongly include already matched text
            lastunmatched = end
            ret.append((start, end, itemorig))
            start = end
            item_idx += 1
        else:
            start += 1
    if item_idx != nitems:
        raise Exception(
            "Not all items matched but {} of {}".format(item_idx, nitems)
        )
    if unmatched and lastunmatched != ltxt:
        ret2.append((lastunmatched, ltxt))
    if unmatched:
        return ret, ret2
    return ret


# Time (as returned by time.time()) recorded by run_start(); stays 0 until run_start()
# has been called, which run_stop() treats as "timing not set up".
start = 0
# Default log record layout used by init_logger() when no format is given:
# timestamp|level name|logger name|message
LOGGING_FORMAT = "%(asctime)s|%(levelname)s|%(name)s|%(message)s"


def init_logger(name=None, file=None, lvl=None, config=None, debug=False, args=None, fmt=None):
    """
    Set up the root logger and return the logger for the given name.

    The root logger is set up through logging.basicConfig, which only has an effect
    while the root logger has no handlers yet, so only the first invocation
    configures it. It then gets a handler writing to stderr and, if file is given,
    a handler writing to that file, both using the requested format.

    The logger for the given name (or for the invoking command, sys.argv[0], if name
    is None) is then retrieved, set to the requested level and returned.

    TODO: If file is not given but args is given and has "outpref" parameter, log to
    file "outpref.DATETIME.log" as well.

    Args:
        name: name of the logger to return, if None, sys.argv[0] is used
        file: if given, log to this destination in addition to stderr (only used
            while the root logger has no handlers yet)
        lvl: logging level to set, if None the level is derived from debug
        config: if specified, set logger config from this file
        debug: if true and lvl is None, set the level to DEBUG, otherwise to INFO
        args: not used yet
        fmt: logging format to use, if None uses a default format

    Returns:
        A logger instance for name (always the same instance for the same name)
    """
    logger_name = sys.argv[0] if name is None else name
    format_string = LOGGING_FORMAT if fmt is None else fmt
    if lvl is None:
        lvl = logging.DEBUG if debug else logging.INFO
    if config:
        # NOTE we could also configure from a yaml file or a dictionary, see
        # http://zetcode.com/python/logging/
        # see doc on logging.config
        logging.config.fileConfig(fname=config)
    root_logger = logging.getLogger()
    formatter = logging.Formatter(format_string)
    # NOTE: basicConfig does nothing if the root logger already has a handler, so the
    # handlers prepared here are only installed by the first effective invocation.
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(formatter)
    handlers = [console_handler]
    # open the log file only if it can actually get installed on the root logger
    if file and len(root_logger.handlers) == 0:
        file_handler = logging.FileHandler(file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)
    logging.basicConfig(
        level=lvl, handlers=handlers,
        # force=True # not supported in Python 3.7
    )
    named_logger = logging.getLogger(logger_name)
    named_logger.setLevel(lvl)
    return named_logger


def run_start(logger=None, name=None, lvl=None):
    """
    Log the current date and time as the start of a run and remember the start time
    in the module-level variable start, so that run_stop can calculate the run time.

    Args:
        logger: the logger to use, if None a logger is created with init_logger
        name: the name passed to init_logger if no logger is given
        lvl: the logging level passed to init_logger if no logger is given

    Returns:
        system time in seconds
    """
    global start
    if logger is None:
        logger = init_logger(name=name, lvl=lvl)
    logger.info(
        # hours, minutes and seconds are all separated by colons
        "Started: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )
    start = time.time()
    return start


def run_stop(logger=None, name=None):
    """
    Log the current date and time as the end of a run, then log and return the
    elapsed run time since run_start was called.

    Args:
        logger: the logger to use, if None a logger is created with init_logger
        name: the name passed to init_logger if no logger is given

    Returns:
        tuple of formatted run time, run time in seconds. If run_start has not been
        called before (the module-level start is still 0), a warning is logged and
        the tuple ("", 0) is returned.
    """
    if logger is None:
        logger = init_logger(name=name)
    logger.info(
        # hours, minutes and seconds are all separated by colons
        "Stopped: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )
    if start == 0:
        logger.warning("Run timing not set up properly, no time!")
        return "", 0
    stop = time.time()
    delta = stop - start
    deltastr = str(datetime.timedelta(seconds=delta))
    logger.info(f"Runtime: {deltastr}")
    return deltastr, delta


def file4logger(thelogger, noext=False):
    """
    Find the log file of the first file handler attached to the given logger.

    Only the handlers attached directly to the logger are inspected.

    Args:
        thelogger: logger
        noext: if True, remove the file extension from the returned path

    Returns:
        file path (string) or None if the logger has no file handler
    """
    for handler in thelogger.handlers:
        if not isinstance(handler, logging.FileHandler):
            continue
        path = handler.baseFilename
        return os.path.splitext(path)[0] if noext else path
    return None


def support_annotation_or_set(method):
    """
    Decorator to allow a method that normally takes a start and end
    offset to take an annotation or annotation set, or any other object that has
    "start" and "end" attributes, or a pair of offsets instead.

    It also allows to take a single offset instead in which case the end offset will
    get passed on as None: this is to support those methods which can take a span or a single
    offset.

    Any object other than an annotation that has "start" and "end" attributes
    (e.g. an annotation set) contributes the values of those attributes.

    If an annotation is passed, the annotation start and end offsets are used, and if the called method
    has a parameter "ann" the annotation itself is also passed on as keyword argument "ann"
    (for all other kinds of arguments, "ann" is passed as None).

    Args:
      method: the method that gets converted by this decorator.

    Returns:
        the adapted method which now takes an annotation or annotation set as well as start/end offsets.

    Raises:
        Exception: when the adapted method is called with a single positional argument
          that is none of the supported kinds.
    """
    # co_varnames lists the declared parameters first, followed by the local variables,
    # so only the leading part is inspected: otherwise a method which merely has a local
    # variable called "ann" would get called with an unexpected keyword argument.
    code = method.__code__
    accepts_ann = "ann" in code.co_varnames[: code.co_argcount + code.co_kwonlyargcount]

    @wraps(method)
    def _support_annotation_or_set(self, *args, **kwargs):
        # imported here rather than at module level
        from gatenlp.annotation import Annotation

        ann = None
        if len(args) == 1:
            obj = args[0]
            if isinstance(obj, Annotation):
                left, right = obj.start, obj.end
                ann = obj
            elif hasattr(obj, "start") and hasattr(obj, "end"):
                left, right = obj.start, obj.end
            elif isinstance(obj, (tuple, list)) and len(obj) == 2:
                left, right = obj
            elif isinstance(obj, numbers.Integral):
                left, right = obj, None
            else:
                raise Exception(
                    "Not an annotation or an annotation set or pair: {}".format(args[0])
                )
        else:
            assert len(args) == 2
            left, right = args
        # if the called method does have an "ann" parameter, pass it, otherwise omit
        if accepts_ann:
            return method(self, left, right, ann=ann, **kwargs)
        return method(self, left, right, **kwargs)

    return _support_annotation_or_set


class _CheckHtml:
    """
    Probe object with different HTML and plain text representations.

    in_notebook() hands an instance of this class to the IPython display formatter
    and checks whether something gets produced for the "text/html" format.
    """

    def _repr_html_(self):
        """Return the marker string used as the HTML representation."""
        return "yes"

    def __repr__(self):
        """Return the marker string used as the plain text representation."""
        return "no"


# Shared probe instance which in_notebook() passes to the IPython display formatter.
_checkhtml = _CheckHtml()

# One-element list holding the cached result of in_notebook(): None as long as the
# check has not been carried out, afterwards True or False.
_in_notebook = [None]


def in_notebook():
    """
    Check if we are running from within a jupyter-like notebook.

    The check is carried out only once, the result is cached in the module-level
    list _in_notebook and returned by all subsequent invocations.

    Returns: True if likely within a notebook.
    """
    cached = _in_notebook[0]
    if cached is not None:
        return cached
    try:
        from IPython import get_ipython
        from IPython.core import display  # we do not need this here, but if this fails we also return False

        if get_ipython() is None:
            # we have IPython installed but not running from IPython
            result = False
        else:
            from IPython.core.interactiveshell import InteractiveShell

            formatter = InteractiveShell.instance().display_formatter
            # ask for the HTML rendering of the probe object and check whether
            # anything came back
            rendered = formatter.format(_checkhtml, include="text/html")[0]
            result = len(rendered) > 0
    except Exception:
        # IPython is not installed or the check itself failed
        result = False
    _in_notebook[0] = result
    return result


def in_colab():
    """
    Check if we are running within Google Colab.

    Returns: True if the string representation of the current IPython shell contains
        'google.colab', False otherwise or if IPython cannot be imported.
    """
    try:
        from IPython.core import getipython
        from IPython.core import display  # we do not need this here, but if this fails we also return False
    except Exception:
        # deliberately not a bare except: KeyboardInterrupt and SystemExit must
        # not get swallowed
        return False
    return 'google.colab' in str(getipython.get_ipython())


def allowspan(method):
    """
    Decorator to allow a method that takes start and end offsets as its first two
    arguments to take a span-like object instead, i.e. any object that has "start"
    and "end" attributes.

    Args:
        method: the method that gets converted by this decorator.

    Returns:
        the adapted method. When it is called without positional arguments, both
        "start" and "end" must be given as keyword arguments, otherwise an Exception
        is raised.
    """

    @wraps(method)
    def _allowspan(self, *args, **kwargs):
        if not args:
            # maybe the start and end parameters are given as kwargs?
            if not ("start" in kwargs and "end" in kwargs):
                raise Exception("Need a span, or start and end parameters!")
            return method(self, **kwargs)
        first, rest = args[0], args[1:]
        if hasattr(first, "start") and hasattr(first, "end"):
            # unpack the span into the two offsets, keep all other arguments
            return method(self, first.start, first.end, *rest, **kwargs)
        return method(self, *args, **kwargs)

    return _allowspan


def get_nested(adict, name, default=None, silent=False):
    """
    Get a field from a nested map or return the default if the field does not exist.

    Args:
        adict: a dictionary with possibly nested dictionaries
        name:  the key to access where dots are used to separate keys for nested maps, e.g.
            "key1.key2.key3" would access the value of key3 in the map stored under key2 in
            the map stored under key1 in adict. If key1 returns something that is not a map
            (this includes the case that key1 does not exist), an exception is raised
            unless silent is True
        default: the default value to return if a field with the given name cannot be accessed
            or if its value is None
        silent: if True, return the default instead of raising an exception when something
            that should be a nested dictionary is not a dictionary

    Returns:
        The value for the field or the default if not found

    Raises:
        Exception if an expected nested dictionary is not a dictionary and silent is False
    """
    origname = name
    current = adict
    for key in name.split("."):
        if not isinstance(current, dict):
            if silent:
                # the field cannot be accessed, so the caller gets the default
                # (not unconditionally None)
                return default
            raise Exception(
                f"Not a dictionary for {key}, original name was {origname}, got {type(current)}"
            )
        current = current.get(key)
    return default if current is None else current

Functions

def allowspan(method)
Expand source code
def allowspan(method):
    """
    Decorator to allow a method that takes start and end offsets as its first two
    arguments to take a span-like object instead, i.e. any object that has "start"
    and "end" attributes.

    Args:
        method: the method that gets converted by this decorator.

    Returns:
        the adapted method. When it is called without positional arguments, both
        "start" and "end" must be given as keyword arguments, otherwise an Exception
        is raised.
    """

    @wraps(method)
    def _allowspan(self, *args, **kwargs):
        if not args:
            # maybe the start and end parameters are given as kwargs?
            if not ("start" in kwargs and "end" in kwargs):
                raise Exception("Need a span, or start and end parameters!")
            return method(self, **kwargs)
        first, rest = args[0], args[1:]
        if hasattr(first, "start") and hasattr(first, "end"):
            # unpack the span into the two offsets, keep all other arguments
            return method(self, first.start, first.end, *rest, **kwargs)
        return method(self, *args, **kwargs)

    return _allowspan
def file4logger(thelogger, noext=False)

Return the first logging file found for this logger or None if there is no file handler.

Args

thelogger
logger

Returns

file path (string)

Expand source code
def file4logger(thelogger, noext=False):
    """
    Find the log file of the first file handler attached to the given logger.

    Only the handlers attached directly to the logger are inspected.

    Args:
        thelogger: logger
        noext: if True, remove the file extension from the returned path

    Returns:
        file path (string) or None if the logger has no file handler
    """
    for handler in thelogger.handlers:
        if not isinstance(handler, logging.FileHandler):
            continue
        path = handler.baseFilename
        return os.path.splitext(path)[0] if noext else path
    return None
def get_nested(adict, name, default=None, silent=False)

Get a field from a nested map or return the default if the submap/field does not exist.

Args

adict
a dictionary with possibly nested dictionaries
name
the key to access, where dots are used to separate keys for nested maps, e.g. "key1.key2.key3" would access the value of key3 in the map stored under key2 in the map stored under key1 in adict. If key1 returns something that is not a map, an exception is raised unless silent is True
default
the default value to return if a field with the given name cannot be accessed

Returns

The value for the field, or the default (None unless specified otherwise) if not found

Raises

Exception if an expected nested dictionary is not a dictionary and silent is False

Expand source code
def get_nested(adict, name, default=None, silent=False):
    """
    Get a field from a nested map or return the default if the field does not exist.

    Args:
        adict: a dictionary with possibly nested dictionaries
        name:  the key to access where dots are used to separate keys for nested maps, e.g.
            "key1.key2.key3" would access the value of key3 in the map stored under key2 in
            the map stored under key1 in adict. If key1 returns something that is not a map
            (this includes the case that key1 does not exist), an exception is raised
            unless silent is True
        default: the default value to return if a field with the given name cannot be accessed
            or if its value is None
        silent: if True, return the default instead of raising an exception when something
            that should be a nested dictionary is not a dictionary

    Returns:
        The value for the field or the default if not found

    Raises:
        Exception if an expected nested dictionary is not a dictionary and silent is False
    """
    origname = name
    current = adict
    for key in name.split("."):
        if not isinstance(current, dict):
            if silent:
                # the field cannot be accessed, so the caller gets the default
                # (not unconditionally None)
                return default
            raise Exception(
                f"Not a dictionary for {key}, original name was {origname}, got {type(current)}"
            )
        current = current.get(key)
    return default if current is None else current
def identity(x)
Expand source code
def identity(x):
    """
    Return the argument unchanged.

    Args:
      x: any object

    Returns:
      the very same object that was passed in
    """
    return x
def in_colab()
Expand source code
def in_colab():
    """
    Check if we are running within Google Colab.

    Returns: True if the string representation of the current IPython shell contains
        'google.colab', False otherwise or if IPython cannot be imported.
    """
    try:
        from IPython.core import getipython
        from IPython.core import display  # we do not need this here, but if this fails we also return False
    except Exception:
        # deliberately not a bare except: KeyboardInterrupt and SystemExit must
        # not get swallowed
        return False
    return 'google.colab' in str(getipython.get_ipython())
def in_notebook()

Check if we are running from within a jupyter-like notebook.

Returns: True if likely within a notebook.

Expand source code
def in_notebook():
    """
    Check if we are running from within a jupyter-like notebook.

    The check is carried out only once, the result is cached in the module-level
    list _in_notebook and returned by all subsequent invocations.

    Returns: True if likely within a notebook.
    """
    cached = _in_notebook[0]
    if cached is not None:
        return cached
    try:
        from IPython import get_ipython
        from IPython.core import display  # we do not need this here, but if this fails we also return False

        if get_ipython() is None:
            # we have IPython installed but not running from IPython
            result = False
        else:
            from IPython.core.interactiveshell import InteractiveShell

            formatter = InteractiveShell.instance().display_formatter
            # ask for the HTML rendering of the probe object and check whether
            # anything came back
            rendered = formatter.format(_checkhtml, include="text/html")[0]
            result = len(rendered) > 0
    except Exception:
        # IPython is not installed or the check itself failed
        result = False
    _in_notebook[0] = result
    return result
def init_logger(name=None, file=None, lvl=None, config=None, debug=False, args=None, fmt=None)

Configure the root logger (this only works the very first time; all subsequent invocations will not modify the root logger). The root logger is initialized with a standard format, the given log level and, if specified, an additional output to the given file.

Then the logger for the given name is retrieved, using the name of the invoking command if the name is None. It is also set to the given logging level and returned.

TODO: If file is not given but args is given and has "outpref" parameter, log to file "outpref.DATETIME.log" as well.

Args

name
name to use in the log; if None, the name of the invoking command (sys.argv[0]) is used
file
if given, log to this destination in addition to stderr
lvl
set logging level
config
if specified, set logger config from this file
debug
if true, set the level to DEBUG
args
not used yet
fmt
logging format to use, if None uses a default format

Returns

A logger instance for name (always the same instance for the same name)

Expand source code
def init_logger(name=None, file=None, lvl=None, config=None, debug=False, args=None, fmt=None):
    """
    Set up the root logger and return the logger for the given name.

    The root logger is set up through logging.basicConfig, which only has an effect
    while the root logger has no handlers yet, so only the first invocation
    configures it. It then gets a handler writing to stderr and, if file is given,
    a handler writing to that file, both using the requested format.

    The logger for the given name (or for the invoking command, sys.argv[0], if name
    is None) is then retrieved, set to the requested level and returned.

    TODO: If file is not given but args is given and has "outpref" parameter, log to
    file "outpref.DATETIME.log" as well.

    Args:
        name: name of the logger to return, if None, sys.argv[0] is used
        file: if given, log to this destination in addition to stderr (only used
            while the root logger has no handlers yet)
        lvl: logging level to set, if None the level is derived from debug
        config: if specified, set logger config from this file
        debug: if true and lvl is None, set the level to DEBUG, otherwise to INFO
        args: not used yet
        fmt: logging format to use, if None uses a default format

    Returns:
        A logger instance for name (always the same instance for the same name)
    """
    logger_name = sys.argv[0] if name is None else name
    format_string = LOGGING_FORMAT if fmt is None else fmt
    if lvl is None:
        lvl = logging.DEBUG if debug else logging.INFO
    if config:
        # NOTE we could also configure from a yaml file or a dictionary, see
        # http://zetcode.com/python/logging/
        # see doc on logging.config
        logging.config.fileConfig(fname=config)
    root_logger = logging.getLogger()
    formatter = logging.Formatter(format_string)
    # NOTE: basicConfig does nothing if the root logger already has a handler, so the
    # handlers prepared here are only installed by the first effective invocation.
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(formatter)
    handlers = [console_handler]
    # open the log file only if it can actually get installed on the root logger
    if file and len(root_logger.handlers) == 0:
        file_handler = logging.FileHandler(file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)
    logging.basicConfig(
        level=lvl, handlers=handlers,
        # force=True # not supported in Python 3.7
    )
    named_logger = logging.getLogger(logger_name)
    named_logger.setLevel(lvl)
    return named_logger
def isequal(x, y)
Expand source code
def isequal(x, y):
    """
    Compare two objects with the == operator.

    Args:
      x: first object
      y: second object

    Returns:
      the result of x == y
    """
    outcome = x == y
    return outcome
def match_substrings(text, items, getstr=None, cmp=None, unmatched=False)

Matches each item from the items sequence with some substring of the text in a greedy fashion. An item is either already a string or getstr is used to retrieve a string from it. The text and substrings are normally compared with normal string equality but cmp can be replaced with a two-argument function that does the comparison instead. This function expects that all items are present in the text, in their order and without overlapping! If this is not the case, an exception is raised.

Args

text
the text to use for matching
items
items that are or contain the substrings to match
getstr
a function that retrieves the text from an item (Default value = None)
cmp
a function that compares two strings and returns a boolean that indicates if they should be considered to be equal. (Default value = None)
unmatched
if true returns two lists of tuples, where the second list contains the offsets of text not matched by the items (Default value = False)

Returns

a list of tuples (start, end, item) where start and end are the start and end offsets of a substring in the text and item is the item for that substring.

Expand source code
def match_substrings(text, items, getstr=None, cmp=None, unmatched=False):
    """
    Match each item from the items sequence with some substring of the text
    in a greedy fashion. An item is either already a string or getstr is used
    to retrieve a string from it. Substrings of the text and item strings are
    compared with string equality unless cmp is given, in which case that
    two-argument function does the comparison instead.
    This function expects that all items are present in the text, in their order
    and without overlapping! If this is not the case, an exception is raised.

    If items is empty, nothing is matched: the list of matches is empty and, if
    unmatched is true, the whole (non-empty) text is reported as unmatched.

    Args:
      text: the text to use for matching
      items: sequence of items that are or contain the substrings to match
      getstr: a function that retrieves the string from an item; if None the
        item itself is used (Default value = None)
      cmp: a function that compares two strings and returns a boolean
        that indicates if they should be considered to be equal; if None,
        == is used (Default value = None)
      unmatched: if true returns two lists of tuples, where the second list
        contains the (start, end) offsets of text not matched by the
        items (Default value = False)

    Returns:
      a list of tuples (start, end, item) where start and end are the
      start and end offsets of a substring in the text and item is the item for
      that substring. If unmatched is true, a pair consisting of that list and a
      list of (start, end) offsets of the text stretches not covered by any item.

    Raises:
      Exception: if the remaining text is too short for the next item or if not
        all items could be matched.
    """
    if getstr is None:
        getstr = identity
    if cmp is None:
        cmp = isequal
    ltxt = len(text)
    nitems = len(items)
    ret = []
    ret2 = []
    item_idx = 0
    start = 0
    # end offset of the most recent match, i.e. where the next unmatched stretch would begin
    lastunmatched = 0
    while start < ltxt and item_idx < nitems:
        itemorig = items[item_idx]
        item = getstr(itemorig)
        end = start + len(item)
        if end > ltxt:
            raise Exception("Text too short to match next item: {}".format(item))
        if cmp(text[start:end], item):
            if unmatched and start > lastunmatched:
                ret2.append((lastunmatched, start))
            # always advance past the match, also when the match directly follows
            # the previous one or starts at offset 0, otherwise the next gap would
            # wrongly include already matched text
            lastunmatched = end
            ret.append((start, end, itemorig))
            start = end
            item_idx += 1
        else:
            start += 1
    if item_idx != nitems:
        raise Exception(
            "Not all items matched but {} of {}".format(item_idx, nitems)
        )
    if unmatched and lastunmatched != ltxt:
        ret2.append((lastunmatched, ltxt))
    if unmatched:
        return ret, ret2
    return ret
def run_start(logger=None, name=None, lvl=None)

Define time when running starts.

Returns

system time in seconds

Expand source code
def run_start(logger=None, name=None, lvl=None):
    """
    Log the current date and time as the start of a run and remember the start time
    in the module-level variable start, so that run_stop can calculate the run time.

    Args:
        logger: the logger to use, if None a logger is created with init_logger
        name: the name passed to init_logger if no logger is given
        lvl: the logging level passed to init_logger if no logger is given

    Returns:
        system time in seconds
    """
    global start
    if logger is None:
        logger = init_logger(name=name, lvl=lvl)
    logger.info(
        # hours, minutes and seconds are all separated by colons
        "Started: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )
    start = time.time()
    return start
def run_stop(logger=None, name=None)

Log and return formatted elapsed run time.

Returns

tuple of formatted run time, run time in seconds

Expand source code
def run_stop(logger=None, name=None):
    """
    Log the current date and time as the end of a run, then log and return the
    elapsed run time since run_start was called.

    Args:
        logger: the logger to use, if None a logger is created with init_logger
        name: the name passed to init_logger if no logger is given

    Returns:
        tuple of formatted run time, run time in seconds. If run_start has not been
        called before (the module-level start is still 0), a warning is logged and
        the tuple ("", 0) is returned.
    """
    if logger is None:
        logger = init_logger(name=name)
    logger.info(
        # hours, minutes and seconds are all separated by colons
        "Stopped: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )
    if start == 0:
        logger.warning("Run timing not set up properly, no time!")
        return "", 0
    stop = time.time()
    delta = stop - start
    deltastr = str(datetime.timedelta(seconds=delta))
    logger.info(f"Runtime: {deltastr}")
    return deltastr, delta
def support_annotation_or_set(method)

Decorator to allow a method that normally takes a start and end offset to take an annotation or annotation set, or any other object that has "start" and "end" attributes, or a pair of offsets instead.

It also allows to take a single offset instead in which case the end offset will get passed on as None: this is to support those methods which can take a span or a single offset.

If a set is passed, the minimum start offset and the maximum end offset of all annotations in the set are used.

If an annotation is passed, the annotation start and end offsets are used, and if the called method has a keyword parameter "ann" the annotation itself is also passed on.

Args

method
the method that gets converted by this decorator.

Returns

the adapted method which now takes an annotation or annotation set as well as start/end offsets.

Expand source code
def support_annotation_or_set(method):
    """
    Decorator to allow a method that normally takes a start and end
    offset to take an annotation or annotation set, or any other object that has
    "start" and "end" attributes, or a pair of offsets instead.

    It also allows to take a single offset instead in which case the end offset will
    get passed on as None: this is to support those methods which can take a span or a single
    offset.

    Any object other than an annotation that has "start" and "end" attributes
    (e.g. an annotation set) contributes the values of those attributes.

    If an annotation is passed, the annotation start and end offsets are used, and if the called method
    has a parameter "ann" the annotation itself is also passed on as keyword argument "ann"
    (for all other kinds of arguments, "ann" is passed as None).

    Args:
      method: the method that gets converted by this decorator.

    Returns:
        the adapted method which now takes an annotation or annotation set as well as start/end offsets.

    Raises:
        Exception: when the adapted method is called with a single positional argument
          that is none of the supported kinds.
    """
    # co_varnames lists the declared parameters first, followed by the local variables,
    # so only the leading part is inspected: otherwise a method which merely has a local
    # variable called "ann" would get called with an unexpected keyword argument.
    code = method.__code__
    accepts_ann = "ann" in code.co_varnames[: code.co_argcount + code.co_kwonlyargcount]

    @wraps(method)
    def _support_annotation_or_set(self, *args, **kwargs):
        # imported here rather than at module level
        from gatenlp.annotation import Annotation

        ann = None
        if len(args) == 1:
            obj = args[0]
            if isinstance(obj, Annotation):
                left, right = obj.start, obj.end
                ann = obj
            elif hasattr(obj, "start") and hasattr(obj, "end"):
                left, right = obj.start, obj.end
            elif isinstance(obj, (tuple, list)) and len(obj) == 2:
                left, right = obj
            elif isinstance(obj, numbers.Integral):
                left, right = obj, None
            else:
                raise Exception(
                    "Not an annotation or an annotation set or pair: {}".format(args[0])
                )
        else:
            assert len(args) == 2
            left, right = args
        # if the called method does have an "ann" parameter, pass it, otherwise omit
        if accepts_ann:
            return method(self, left, right, ann=ann, **kwargs)
        return method(self, left, right, **kwargs)

    return _support_annotation_or_set