Module gatenlp.serialization.default_msgpack

Module that implements saving and loading of documents in MsgPack format (change logs are not supported yet).

Expand source code
"""
Module that implements saving and loading of documents in MsgPack format (change logs are not supported yet).
"""
import io
from gatenlp.document import Document
from gatenlp.annotation_set import AnnotationSet
from gatenlp.annotation import Annotation
from gatenlp.changelog import ChangeLog
from gatenlp.features import Features
from gatenlp.urlfileutils import is_url, get_bytes_from_url


MSGPACK_VERSION_HDR = "sm2"


class MsgPackSerializer:
    """
    Serializer that saves and loads documents as a flat sequence of MsgPack values.

    The sequence starts with the version header `MSGPACK_VERSION_HDR`, followed by the
    document fields and then, for every annotation set, its name, next annotation id,
    number of annotations and the annotations themselves. Change logs are not
    supported yet.
    """

    @staticmethod
    def document2stream(doc: Document, stream):
        """
        Write a document to a stream as consecutive MsgPack values.

        Args:
          doc: the document to serialize
          stream: writable binary stream that receives the packed values

        Returns:
          None
        """
        from msgpack import pack

        # Header and document-level fields; stream2document reads them in this order.
        pack(MSGPACK_VERSION_HDR, stream)
        pack(doc.offset_type, stream)
        pack(doc.text, stream)
        pack(doc.name, stream)
        pack(doc._features.to_dict(), stream)
        # The number of sets is written first so the reader knows how many to expect.
        pack(len(doc._annotation_sets), stream)
        for name, annset in doc._annotation_sets.items():
            pack(name, stream)
            pack(annset._next_annid, stream)
            pack(len(annset), stream)
            # Each annotation is written as five values: type, start, end, id, features.
            for ann in annset.fast_iter():
                pack(ann.type, stream)
                pack(ann.start, stream)
                pack(ann.end, stream)
                pack(ann.id, stream)
                pack(ann.features.to_dict(), stream)

    @staticmethod
    def stream2document(stream):
        """
        Read a document from a stream that was written by `document2stream`.

        Args:
          stream: readable binary stream containing the MsgPack values

        Returns:
          the reconstructed Document

        Raises:
          Exception: if the first value in the stream is not `MSGPACK_VERSION_HDR`
        """
        from msgpack import Unpacker

        u = Unpacker(stream)
        version = u.unpack()
        if version != MSGPACK_VERSION_HDR:
            raise Exception("MsgPack data starts with wrong version")
        doc = Document()
        doc.offset_type = u.unpack()
        doc._text = u.unpack()
        doc.name = u.unpack()
        doc._features = Features(u.unpack())
        nsets = u.unpack()
        setsdict = dict()
        # NOTE(review): this attribute is assigned in addition to `_annotation_sets`
        # below; kept as in the original - confirm whether it is still needed.
        doc.annotation_sets = setsdict
        for _iset in range(nsets):
            sname = u.unpack()
            # A set name of None is stored under the empty string.
            if sname is None:
                sname = ""
            annset = AnnotationSet(name=sname, owner_doc=doc)
            annset._next_annid = u.unpack()
            nanns = u.unpack()
            for _iann in range(nanns):
                # Same order as written by document2stream.
                atype = u.unpack()
                astart = u.unpack()
                aend = u.unpack()
                aid = u.unpack()
                afeatures = u.unpack()
                ann = Annotation(astart, aend, atype, annid=aid, features=afeatures)
                annset._annotations[aid] = ann
            annset._annset.update(annset._annotations.values())
            setsdict[sname] = annset
        doc._annotation_sets = setsdict
        return doc

    @staticmethod
    def save(
        clazz,
        inst,
        to_ext=None,
        to_mem=None,
        offset_type=None,
        offset_mapper=None,
        **kwargs,
    ):
        """
        Save a document in MsgPack format to a file or to memory.

        Args:
          clazz: not used by this implementation
          inst: the object to save; only Document instances are supported
          to_ext: path of the file to write when `to_mem` is not set (Default value = None)
          to_mem: if truthy, serialize to memory and return the bytes (Default value = None)
          offset_type: not used by this implementation (Default value = None)
          offset_mapper: not used by this implementation (Default value = None)
          **kwargs: ignored

        Returns:
          the serialized bytes if `to_mem` is truthy, otherwise None

        Raises:
          Exception: if `inst` is a ChangeLog (not implemented yet) or any other
            unsupported object
        """
        if isinstance(inst, Document):
            writer = MsgPackSerializer.document2stream
        elif isinstance(inst, ChangeLog):
            raise Exception("Not implemented yet")
        else:
            raise Exception("Object not supported")
        if to_mem:
            buf = io.BytesIO()
            writer(inst, buf)
            return buf.getvalue()
        # The context manager closes the file even if the writer raises.
        with open(to_ext, "wb") as outfp:
            writer(inst, outfp)

    @staticmethod
    def load(clazz, from_ext=None, from_mem=None, offset_mapper=None, **kwargs):
        """
        Load a document in MsgPack format from a file, a URL or memory.

        Args:
          clazz: the class to load; only Document is supported
          from_ext: file path or URL to read from (Default value = None)
          from_mem: bytes to read from; replaced by the downloaded bytes if
            `from_ext` is a URL (Default value = None)
          offset_mapper: not used by this implementation (Default value = None)
          **kwargs: ignored

        Returns:
          the loaded Document

        Raises:
          Exception: if `clazz` is ChangeLog (not implemented yet) or any other
            unsupported class
        """
        if clazz == Document:
            reader = MsgPackSerializer.stream2document
        elif clazz == ChangeLog:
            raise Exception("Not implemented yet")
        else:
            raise Exception("Object not supported")

        isurl, extstr = is_url(from_ext)
        if from_ext is not None:
            if isurl:
                from_mem = get_bytes_from_url(extstr)
        if from_mem:
            return reader(io.BytesIO(from_mem))
        # The reader consumes everything it needs before returning, so the file
        # can be closed as soon as the document has been built.
        with open(extstr, "rb") as infp:
            return reader(infp)

Classes

class MsgPackSerializer
Expand source code
class MsgPackSerializer:
    """
    Serializer that saves and loads documents as a flat sequence of MsgPack values.

    The sequence starts with the version header `MSGPACK_VERSION_HDR`, followed by the
    document fields and then, for every annotation set, its name, next annotation id,
    number of annotations and the annotations themselves. Change logs are not
    supported yet.
    """

    @staticmethod
    def document2stream(doc: Document, stream):
        """
        Write a document to a stream as consecutive MsgPack values.

        Args:
          doc: the document to serialize
          stream: writable binary stream that receives the packed values

        Returns:
          None
        """
        from msgpack import pack

        # Header and document-level fields; stream2document reads them in this order.
        pack(MSGPACK_VERSION_HDR, stream)
        pack(doc.offset_type, stream)
        pack(doc.text, stream)
        pack(doc.name, stream)
        pack(doc._features.to_dict(), stream)
        # The number of sets is written first so the reader knows how many to expect.
        pack(len(doc._annotation_sets), stream)
        for name, annset in doc._annotation_sets.items():
            pack(name, stream)
            pack(annset._next_annid, stream)
            pack(len(annset), stream)
            # Each annotation is written as five values: type, start, end, id, features.
            for ann in annset.fast_iter():
                pack(ann.type, stream)
                pack(ann.start, stream)
                pack(ann.end, stream)
                pack(ann.id, stream)
                pack(ann.features.to_dict(), stream)

    @staticmethod
    def stream2document(stream):
        """
        Read a document from a stream that was written by `document2stream`.

        Args:
          stream: readable binary stream containing the MsgPack values

        Returns:
          the reconstructed Document

        Raises:
          Exception: if the first value in the stream is not `MSGPACK_VERSION_HDR`
        """
        from msgpack import Unpacker

        u = Unpacker(stream)
        version = u.unpack()
        if version != MSGPACK_VERSION_HDR:
            raise Exception("MsgPack data starts with wrong version")
        doc = Document()
        doc.offset_type = u.unpack()
        doc._text = u.unpack()
        doc.name = u.unpack()
        doc._features = Features(u.unpack())
        nsets = u.unpack()
        setsdict = dict()
        # NOTE(review): this attribute is assigned in addition to `_annotation_sets`
        # below; kept as in the original - confirm whether it is still needed.
        doc.annotation_sets = setsdict
        for _iset in range(nsets):
            sname = u.unpack()
            # A set name of None is stored under the empty string.
            if sname is None:
                sname = ""
            annset = AnnotationSet(name=sname, owner_doc=doc)
            annset._next_annid = u.unpack()
            nanns = u.unpack()
            for _iann in range(nanns):
                # Same order as written by document2stream.
                atype = u.unpack()
                astart = u.unpack()
                aend = u.unpack()
                aid = u.unpack()
                afeatures = u.unpack()
                ann = Annotation(astart, aend, atype, annid=aid, features=afeatures)
                annset._annotations[aid] = ann
            annset._annset.update(annset._annotations.values())
            setsdict[sname] = annset
        doc._annotation_sets = setsdict
        return doc

    @staticmethod
    def save(
        clazz,
        inst,
        to_ext=None,
        to_mem=None,
        offset_type=None,
        offset_mapper=None,
        **kwargs,
    ):
        """
        Save a document in MsgPack format to a file or to memory.

        Args:
          clazz: not used by this implementation
          inst: the object to save; only Document instances are supported
          to_ext: path of the file to write when `to_mem` is not set (Default value = None)
          to_mem: if truthy, serialize to memory and return the bytes (Default value = None)
          offset_type: not used by this implementation (Default value = None)
          offset_mapper: not used by this implementation (Default value = None)
          **kwargs: ignored

        Returns:
          the serialized bytes if `to_mem` is truthy, otherwise None

        Raises:
          Exception: if `inst` is a ChangeLog (not implemented yet) or any other
            unsupported object
        """
        if isinstance(inst, Document):
            writer = MsgPackSerializer.document2stream
        elif isinstance(inst, ChangeLog):
            raise Exception("Not implemented yet")
        else:
            raise Exception("Object not supported")
        if to_mem:
            buf = io.BytesIO()
            writer(inst, buf)
            return buf.getvalue()
        # The context manager closes the file even if the writer raises.
        with open(to_ext, "wb") as outfp:
            writer(inst, outfp)

    @staticmethod
    def load(clazz, from_ext=None, from_mem=None, offset_mapper=None, **kwargs):
        """
        Load a document in MsgPack format from a file, a URL or memory.

        Args:
          clazz: the class to load; only Document is supported
          from_ext: file path or URL to read from (Default value = None)
          from_mem: bytes to read from; replaced by the downloaded bytes if
            `from_ext` is a URL (Default value = None)
          offset_mapper: not used by this implementation (Default value = None)
          **kwargs: ignored

        Returns:
          the loaded Document

        Raises:
          Exception: if `clazz` is ChangeLog (not implemented yet) or any other
            unsupported class
        """
        if clazz == Document:
            reader = MsgPackSerializer.stream2document
        elif clazz == ChangeLog:
            raise Exception("Not implemented yet")
        else:
            raise Exception("Object not supported")

        isurl, extstr = is_url(from_ext)
        if from_ext is not None:
            if isurl:
                from_mem = get_bytes_from_url(extstr)
        if from_mem:
            return reader(io.BytesIO(from_mem))
        # The reader consumes everything it needs before returning, so the file
        # can be closed as soon as the document has been built.
        with open(extstr, "rb") as infp:
            return reader(infp)

Static methods

def document2stream(doc: Document, stream)

Args

doc
Document:
stream:

Returns:

Expand source code
@staticmethod
def document2stream(doc: Document, stream):
    """
    Serialize a document to a stream as a flat sequence of MsgPack values.

    Args:
      doc: the document to serialize
      stream: writable binary stream that receives the packed values

    Returns:
      None
    """
    from msgpack import pack

    def emit(value):
        # Every value is packed individually onto the same stream.
        pack(value, stream)

    # Version header followed by the document-level fields.
    emit(MSGPACK_VERSION_HDR)
    emit(doc.offset_type)
    emit(doc.text)
    emit(doc.name)
    emit(doc._features.to_dict())

    # Number of annotation sets, then each set in turn.
    emit(len(doc._annotation_sets))
    for set_name, aset in doc._annotation_sets.items():
        emit(set_name)
        emit(aset._next_annid)
        emit(len(aset))
        # Five values per annotation: type, start, end, id, features.
        for annotation in aset.fast_iter():
            emit(annotation.type)
            emit(annotation.start)
            emit(annotation.end)
            emit(annotation.id)
            emit(annotation.features.to_dict())
def load(clazz, from_ext=None, from_mem=None, offset_mapper=None, **kwargs)

Args

clazz:
from_ext
(Default value = None)
from_mem
(Default value = None)
offset_mapper
(Default value = None)

**kwargs:

Returns:

Expand source code
@staticmethod
def load(clazz, from_ext=None, from_mem=None, offset_mapper=None, **kwargs):
    """
    Load a document in MsgPack format from a file, a URL or memory.

    Args:
      clazz: the class to load; only Document is supported
      from_ext: file path or URL to read from (Default value = None)
      from_mem: bytes to read from; replaced by the downloaded bytes if
        `from_ext` is a URL (Default value = None)
      offset_mapper: not used by this implementation (Default value = None)
      **kwargs: ignored

    Returns:
      the loaded Document

    Raises:
      Exception: if `clazz` is ChangeLog (not implemented yet) or any other
        unsupported class
    """
    if clazz == Document:
        reader = MsgPackSerializer.stream2document
    elif clazz == ChangeLog:
        raise Exception("Not implemented yet")
    else:
        raise Exception("Object not supported")

    isurl, extstr = is_url(from_ext)
    if from_ext is not None:
        if isurl:
            from_mem = get_bytes_from_url(extstr)
    if from_mem:
        return reader(io.BytesIO(from_mem))
    # The reader consumes everything it needs before returning, so the file
    # can be closed as soon as the document has been built.
    with open(extstr, "rb") as infp:
        return reader(infp)
def save(clazz, inst, to_ext=None, to_mem=None, offset_type=None, offset_mapper=None, **kwargs)

Args

clazz:
inst:
to_ext
(Default value = None)
to_mem
(Default value = None)
offset_type
(Default value = None)
offset_mapper
(Default value = None)

**kwargs:

Returns:

Expand source code
@staticmethod
def save(
    clazz,
    inst,
    to_ext=None,
    to_mem=None,
    offset_type=None,
    offset_mapper=None,
    **kwargs,
):
    """
    Save a document in MsgPack format to a file or to memory.

    Args:
      clazz: not used by this implementation
      inst: the object to save; only Document instances are supported
      to_ext: path of the file to write when `to_mem` is not set (Default value = None)
      to_mem: if truthy, serialize to memory and return the bytes (Default value = None)
      offset_type: not used by this implementation (Default value = None)
      offset_mapper: not used by this implementation (Default value = None)
      **kwargs: ignored

    Returns:
      the serialized bytes if `to_mem` is truthy, otherwise None

    Raises:
      Exception: if `inst` is a ChangeLog (not implemented yet) or any other
        unsupported object
    """
    if isinstance(inst, Document):
        writer = MsgPackSerializer.document2stream
    elif isinstance(inst, ChangeLog):
        raise Exception("Not implemented yet")
    else:
        raise Exception("Object not supported")
    if to_mem:
        buf = io.BytesIO()
        writer(inst, buf)
        return buf.getvalue()
    # The context manager closes the file even if the writer raises.
    with open(to_ext, "wb") as outfp:
        writer(inst, outfp)
def stream2document(stream)

Args

stream:

Returns:

Expand source code
@staticmethod
def stream2document(stream):
    """
    Rebuild a document from a stream of MsgPack values.

    The values are read in a fixed order: version header, offset type, text, name,
    features, number of annotation sets and then the data of each set.

    Args:
      stream: readable binary stream containing the MsgPack values

    Returns:
      the reconstructed Document

    Raises:
      Exception: if the first value in the stream is not `MSGPACK_VERSION_HDR`
    """
    from msgpack import Unpacker

    unpacker = Unpacker(stream)
    read = unpacker.unpack

    if read() != MSGPACK_VERSION_HDR:
        raise Exception("MsgPack data starts with wrong version")

    document = Document()
    document.offset_type = read()
    document._text = read()
    document.name = read()
    document._features = Features(read())

    n_sets = read()
    annsets_by_name = {}
    # Assigned here as well as to `_annotation_sets` at the end.
    document.annotation_sets = annsets_by_name
    for _iset in range(n_sets):
        set_name = read()
        # A set name of None is stored under the empty string.
        set_name = "" if set_name is None else set_name
        annset = AnnotationSet(name=set_name, owner_doc=document)
        annset._next_annid = read()
        n_anns = read()
        for _iann in range(n_anns):
            # Five values per annotation: type, start, end, id, features.
            ann_type = read()
            ann_start = read()
            ann_end = read()
            ann_id = read()
            ann_features = read()
            annset._annotations[ann_id] = Annotation(
                ann_start, ann_end, ann_type, annid=ann_id, features=ann_features
            )
        annset._annset.update(annset._annotations.values())
        annsets_by_name[set_name] = annset

    document._annotation_sets = annsets_by_name
    return document