Module gatenlp.corpora.base

Module that defines base classes for representing document collections.

Corpus classes represent collections with a fixed number of documents, where each document can be accessed and stored by its index number, much like lists/arrays of documents.

DocumentSource classes represent collections that can be iterated over, producing a sequence of Documents, one document at a time.

DocumentDestination classes represent collections that can receive Documents one document at a time.

Expand source code
"""
Module that defines base classes for representing document collections.

Corpus classes represent collections with a fixed number of documents, where each document can be
accessed and stored by its index number, much like lists/arrays of documents.

DocumentSource classes represent collections that can be iterated over, producing a sequence of Documents,
one document at a time.

DocumentDestination classes represent collections that can receive Documents one document at a time.
"""
import bisect
import random
from abc import ABC, abstractmethod
from typing import Iterable as TypingIterable
from typing import Iterator as TypingIterator
from typing import Sized, Tuple
from typing import Union
from itertools import accumulate, chain
from contextlib import AbstractContextManager
import numbers
from gatenlp.document import Document

# pdoc override table: every identifier listed here is mapped to True.
__pdoc__ = dict.fromkeys(
    (
        "Corpus.__getitem__",
        "Corpus.__setitem__",
        "Corpus.__len__",
        "DocumentSource.__iter__",
    ),
    True,
)


class CorpusSourceBase:
    """
    Mixin shared by corpus and document source classes.

    It supplies default values for `nparts` and `partnr`, so both can be queried even on objects
    that cannot be shared between several workers, and helpers for the relative-path feature.
    """
    @property
    def nparts(self):
        """
        The total number of parts. This default implementation always reports a single part (1);
        EveryNth wrappers report their own value.
        """
        return 1

    @property
    def partnr(self):
        """
        The number of the part this object represents. This default implementation always
        reports part 0; EveryNth wrappers report their own value.
        """
        return 0

    def relpathfeatname(self) -> str:
        """
        Return the name of the transient document feature that holds the relative path a
        document was loaded from.
        """
        return "_relpath"

    def setrelpathfeature(self, doc: Document, relpath: str):
        """
        Record the given relative path in the special transient feature of the document.
        Nothing happens if the document is None.

        Args:
            doc: the document (or None)
            relpath: the relative path the document was created from
        """
        if doc is None:
            return
        featname = self.relpathfeatname()
        doc.features[featname] = relpath


class MultiProcessingAble:
    """
    Marker class for a document source/destination/corpus of which duplicate instances can be
    used in parallel by several processes running on the same node.
    """


class DistributedProcessingAble(MultiProcessingAble):
    """
    Marker class for a document source/destination/corpus of which duplicate instances can be
    used in parallel from several nodes. As a subclass of MultiProcessingAble, every such object
    also counts as usable from several processes on one node.
    """


class Corpus(ABC, CorpusSourceBase, Sized):
    """
    A fixed-size collection of documents addressed by index number: documents are read with
    `doc = corpus[2]` and written with `corpus[2] = doc`. Each valid index holds either a document
    or, for a few corpus implementations, None (meaning no document is available at that index).

    Valid indices are the ints 0 to N-1, where N is the number of documents in the corpus.

    NOTE: most corpus implementations should not allow assigning None to an index, because this
    does not work with batching and with saving documents back through the `store` method.
    """

    @abstractmethod
    def __getitem__(self, idx: int) -> Document:
        """
        Fetch the document stored at the given index. Implementations usually also set a special
        transient document feature holding that index, so the document can later be written back
        to the same place with `store()`. Implementations that do not set the feature support
        neither batching nor `store()`.

        Args:
            idx: the index of the document

        Returns:
            a document or None

        Raises:
            exception if the index idx does not exist in the corpus
        """

    @abstractmethod
    def __setitem__(self, idx: int, doc: Document) -> None:
        """
        Save a document at the given index, as in `mycorpus[2] = doc`.
        The assigned value must be a document or (in rare cases) None.

        Args:
            idx: the index of the document
            doc: a document

        Raises:
            exception if the index idx does not exist in the corpus
        """

    @abstractmethod
    def __len__(self) -> int:
        """
        Return the number of entries in the corpus.
        """

    def idxfeatname(self) -> str:
        """
        Return the name of the transient feature that holds the index under which a document was
        fetched from this corpus. The name contains `id(self)`, so it differs between corpus objects.
        """
        return f"__idx_{id(self)}"

    def setidxfeature(self, doc: Document, idx: int):
        """
        Record the given index in the special transient feature of the document.
        Nothing happens if the document is None.

        Args:
            doc: the document (or None)
            idx: the index used to access the document in a corpus
        """
        if doc is None:
            return
        doc.features[self.idxfeatname()] = idx

    def store(self, doc: Document) -> None:
        """
        Write a document that was fetched from this corpus back to where it came from, without
        having to pass the index. This is handy when processing batches or streams of documents.
        It relies on `__getitem__` having recorded the index in the document feature
        `self.idxfeatname()`, which is why corpus implementations MUST do that.

        Args:
            doc: the document to write back; it should have been retrieved from the same corpus.
                 Passing None raises an exception here; subclasses which want to support
                 store(None) have to override this method.

        Raises:
            Exception: if doc is None or if the document has no feature `self.idxfeatname()`
        """
        if doc is None:
            raise Exception("Cannot store back None into a corpus")
        assert isinstance(doc, Document)
        featname = self.idxfeatname()
        idx = doc.features.get(featname)
        if idx is None:
            raise Exception("Cannot store back document, no __idx_ID feature")
        self.__setitem__(idx, doc)

    def append(self, document: Document) -> int:
        """
        Add a document to the corpus, i.e. use the corpus like a DocumentDestination. Only some
        corpus implementations support this; this default implementation always raises.

        Important: this will probably not work properly in situations where another
        corpus wraps a corpus that allows appending. Use with care!

        Args:
            document: the document to add to the corpus or (in rare cases and for specific Corpus
                implementations) None.

        Returns:
            the index where the document was stored

        Raises:
            RuntimeError: always, unless overridden by a subclass
        """
        raise RuntimeError("Corpus does not allow appending")


class DocumentSource(ABC, TypingIterable, CorpusSourceBase):
    """
    An iterable of documents; the number of documents it will produce is not known in advance.

    A document source can be used as a context manager; note that leaving the `with` block does
    nothing in this base class (in particular, `close()` is not called).
    """
    def __init__(self):
        # counter exposed through the read-only property `n`; this base class never changes it
        self._n = 0

    def __iter__(self) -> TypingIterator[Document]:
        """
        Placeholder to be overridden by subclasses so that it yields the documents of the source.
        This base implementation returns None.
        """
        return None

    @property
    def n(self):
        """
        The current value of the internal counter `_n` (initialized to 0).
        """
        return self._n

    def __enter__(self):
        """
        Enter the context: the source itself is the context object.
        """
        return self

    def __exit__(self, extype, value, traceback):
        """
        Leave the context: does nothing and does not suppress exceptions.
        """
        return None

    def close(self):
        """
        Close the source. This base implementation does nothing.
        """
        return None



# NOTE: AbstractContextManager already inherits from ABC, so no need to list as base class here!
class DocumentDestination(AbstractContextManager):
    """
    Base class for objects that receive a number of documents, not known in advance, through
    the `append` method.

    A destination provides a `close()` method and has to be closed after use.

    It can also be used as a context manager, e.g.
    `with SomeDocumentDest(..) as dest: dest.append(doc)`, in which case the destination is
    closed automatically when the block is left.
    """

    def __init__(self):
        # counter exposed through the read-only property `n`; this base class never changes it
        self._n = 0

    @abstractmethod
    def append(self, doc: Document) -> None:
        """
        Add the given document to the destination.

        Args:
            doc: the document to add. By default, passing None should add nothing to the
                destination, but specific implementations may handle None differently.
        """

    def close(self) -> None:
        """
        Close the destination. This base implementation does nothing. The default `__exit__`
        calls this method every time, also when the `with` block raised an exception.
        """
        return None

    def __exit__(self, exctype, value, traceback) -> bool:
        """
        Leave the context: always calls close() and always returns False, so an exception
        raised inside the `with` block is never suppressed.
        """
        self.close()
        suppress = False
        return suppress

    def relpathfeatname(self) -> str:
        """
        Return the name of the transient document feature that holds the relative path a
        document was loaded from.
        """
        return "_relpath"

    @property
    def n(self):
        """
        The current value of the internal counter `_n` (initialized to 0).
        """
        return self._n


class StringIdCorpus:
    """
    A corpus in which documents can be read and written by a string id as well as by an
    integer index.
    """
    # NOTE: compared to a plain corpus, only the type signatures of these methods differ
    @abstractmethod
    def __getitem__(self, key: Union[int, str]) -> Document:
        """
        Fetch a document from the corpus, addressed either by numeric index or by string id.

        Args:
            key: the index of the document or the unique string id

        Returns:
            a document or None

        Raises:
            exception if the key does not exist in the corpus
        """

    @abstractmethod
    def __setitem__(self, idx: Union[int, str], doc: Document) -> None:
        """
        Save a document into the corpus, addressed either by numeric index or by string id.

        Args:
            idx: the index or the string id of the document
            doc: a document

        Raises:
            exception if the index idx does not exist in the corpus
        """


class EveryNthBase:
    """
    Base for a Source or Corpus that wraps another Source or Corpus and exposes only every nth
    document of it, beginning with some document k where 0 <= k < n.

    Classes of this kind must accept the initialization keyword parameters partnr and nparts,
    which may default to 0 and 1 for resources that consist of a single part.
    """
    def __init__(self, nparts=1, partnr=0):
        """
        Remember the number of parts and the part number; no validation is done here.

        Args:
            nparts: the total number of parts
            partnr: the number of the part represented by this object
        """
        self._nparts, self._partnr = nparts, partnr

    @property
    def nparts(self):
        """
        The total number of parts, as passed to the constructor.
        """
        return self._nparts

    @property
    def partnr(self):
        """
        The part number, as passed to the constructor.
        """
        return self._partnr


class EveryNthSource(EveryNthBase, DocumentSource):
    """
    A wrapper to make any DocumentSource that is multiprocessing or distributed processing-able
    viewable in parts.

    Wraps a document source to only return every nparts-th document, starting with the partnr-th document.
    For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the
    documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get
    documents 2,5,8,11,14 etc.
    """
    def __init__(self,
                 source: DocumentSource,
                 nparts: int = 1,
                 partnr: int = 0):
        """
        Create the wrapper.

        Args:
            source: the document source to wrap, must be a DocumentSource that is also
                MultiProcessingAble
            nparts: the total number of parts, must be an integer >= 2 (the default of 1 is
                rejected, so a value always has to be passed)
            partnr: the part to produce, must be an integer with 0 <= partnr < nparts

        Raises:
            Exception: if nparts or partnr are not integers or are out of range
        """
        assert isinstance(source, MultiProcessingAble)
        assert isinstance(source, DocumentSource)
        # this uses Integral so we can also support integral types from Numpy etc!
        if not (isinstance(nparts, numbers.Integral) and isinstance(partnr, numbers.Integral)):
            raise Exception("nparts and partnr must be integers.")
        # validate everything before any state is set on the instance
        if nparts < 2 or partnr < 0 or partnr >= nparts:
            raise Exception("nparts must be >= 2 and partnr must be >= 0 and < nparts")
        super().__init__(nparts=nparts, partnr=partnr)
        # EveryNthBase.__init__ does not chain to the next class in the MRO, so the
        # DocumentSource state (the counter behind the `n` property) must be set up explicitly,
        # otherwise accessing `n` raises an AttributeError.
        DocumentSource.__init__(self)
        self.source = source

    def __iter__(self) -> TypingIterator[Document]:
        """
        Iterate over the wrapped source and yield only the documents whose running position
        modulo nparts equals partnr. All documents of the wrapped source are still consumed.
        """
        for idx, doc in enumerate(self.source):
            if idx % self.nparts == self.partnr:
                yield doc


class EveryNthCorpus(EveryNthBase, Corpus):
    """
    A wrapper to make any corpus that is multiprocessing/distributedprocessing-able shardable.

    Wraps a corpus to only expose every nparts-th document, starting with the partnr-th document.
    For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the
    documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get
    documents 2,5,8,11,14 etc.

    This is useful to access a subset of documents from a corpus from different concurrent
    processes (the wrapped corpus must be MultiProcessingAble for that!).
    """
    def __init__(self, corpus: Corpus, nparts: int = 1, partnr: int = 0):
        """
        Create the wrapper.

        Args:
            corpus: the corpus to wrap, must be a Corpus that is also MultiProcessingAble
            nparts: the total number of parts, must be an integer >= 2 (the default of 1 is
                rejected, so a value always has to be passed)
            partnr: the part to expose, must be an integer with 0 <= partnr < nparts

        Raises:
            Exception: if nparts or partnr are not integers or are out of range
        """
        assert isinstance(corpus, MultiProcessingAble)
        assert isinstance(corpus, Corpus)
        # this uses Integral so we can also support integral types from Numpy etc!
        if not (isinstance(nparts, numbers.Integral) and isinstance(partnr, numbers.Integral)):
            raise Exception("nparts and partnr must be integers.")
        if nparts < 2 or partnr < 0 or partnr >= nparts:
            raise Exception("nparts must be >= 2 and partnr must be >= 0 and < nparts")
        super().__init__(nparts=nparts, partnr=partnr)
        self.corpus = corpus

    def __len__(self):
        """
        Return the number of documents of the wrapped corpus that belong to this part.
        """
        # Pure integer arithmetic: every part gets olen // nparts documents and the first
        # olen % nparts parts get one more. (Float division would lose precision for very
        # large corpora.)
        full, rest = divmod(len(self.corpus), self.nparts)
        return int(full) + (1 if rest > self.partnr else 0)

    def _orig_idx(self, idx: int) -> int:
        """
        Map an index of this wrapper to the corresponding index in the wrapped corpus.
        """
        return idx * self.nparts + self.partnr

    def __getitem__(self, idx: int) -> Document:
        """
        Return the idx-th document of this part, fetched from the wrapped corpus.
        """
        # NOTE: we do not store the index in a feature for this wrapper as the wrapped
        # corpus index is eventually the only one that matters
        return self.corpus[self._orig_idx(idx)]

    def __setitem__(self, idx: int, doc: Document) -> None:
        """
        Store the document as the idx-th document of this part in the wrapped corpus.
        """
        self.corpus[self._orig_idx(idx)] = doc

    def store(self, doc: Document) -> None:
        """
        Store the document back by delegating to the `store` method of the wrapped corpus.
        """
        # stored using the feature from the original index!
        self.corpus.store(doc)

    def append(self, document: Document) -> int:
        """
        Not supported for this wrapper: always raises an Exception.
        """
        raise Exception("Method append not supported for EveryNthCorpus")


class ShuffledCorpus(Corpus):
    """
    Wraps a corpus to reorder the documents in the corpus randomly.
    """

    def __init__(self, corpus, seed=None):
        """
        Create a ShuffledCorpus wrapper. The index order is shuffled once on creation by calling
        `shuffle(seed)`.

        Args:
            corpus: the corpus to wrap
            seed: controls how the module-level random generator is seeded before shuffling.
                If 0, None or not an integer, the generator is seeded from a system source of
                randomness. If -1, the generator is not re-seeded and its current state is used.
                Any other integer is used as the seed.
        """
        super().__init__()
        self.corpus = corpus
        self.seed = seed
        self.idxs = list(range(len(corpus)))
        self.shuffle(seed)

    def shuffle(self, seed=0):
        """
        Shuffle the order in which the documents of the wrapped corpus are exposed. The order is
        always shuffled, the seed only controls how the random generator is prepared.

        NOTE: this seeds and uses the module-level generator of the `random` module, so it
        affects other users of that generator in the same process.

        Args:
            seed: if 0, None or not an integer, seed the generator from a system source of
                randomness; if -1, do not re-seed the generator; any other integer is used as
                the seed.
        """
        if isinstance(seed, numbers.Integral):  # also allow for np.int8(n) and the like
            if seed == 0:
                random.seed()
            elif seed != -1:
                random.seed(seed)
            # seed == -1: deliberately keep the current state of the generator
        else:
            # None or some other non-integer type: same as seed 0
            random.seed()
        random.shuffle(self.idxs)

    def __getitem__(self, idx):
        """
        Return the document at position idx of the shuffled order. The index feature of this
        wrapper is set on the returned document so it can be saved back using `store()`.
        """
        doc = self.corpus[self.idxs[idx]]
        self.setidxfeature(doc, idx)
        # Return the very object the feature was set on: fetching it a second time could yield
        # a fresh object without the feature (and would load the document twice).
        return doc

    def __setitem__(self, idx, doc):
        """
        Store the document at position idx of the shuffled order.

        Raises:
            Exception: if idx is not an integer or not in the range 0 to len(self)-1
        """
        if not isinstance(idx, numbers.Integral):
            raise Exception("Item must be an integer")
        if idx >= len(self.idxs) or idx < 0:
            raise Exception("Index idx must be >= 0 and < {}".format(len(self)))
        # translate the shuffled position into the index of the wrapped corpus
        self.corpus[self.idxs[idx]] = doc

    def __len__(self):
        """
        Return the number of documents, which is the size the wrapped corpus had on creation.
        """
        return len(self.idxs)

    def append(self, document: Document) -> int:
        """
        Not supported for this wrapper: always raises an Exception.
        """
        raise Exception("Method append not supported for ShuffledCorpus")


class ConcatCorpus(Corpus):
    """
    Wraps a list of corpora to make them appear as a single corpus
    """
    def __init__(self, corpora: TypingIterable[Corpus]):
        """
        Create a ConcatCorpus from the iterable of corpus instances. The sizes of the corpora
        are determined once, on creation.

        Parameters:
            corpora: an iterable of corpus instances
        """
        self.corpora = list(corpora)
        # use the materialized list: the argument may be a one-shot iterator that is
        # already exhausted at this point
        self.sizes = [len(c) for c in self.corpora]
        # idxs[i] is the total number of documents in corpora 0..i, i.e. the first global index
        # that does NOT belong to corpus i any more
        self.idxs = list(accumulate(self.sizes))

    def __len__(self):
        """
        Return the total number of documents over all wrapped corpora.
        """
        return self.idxs[-1] if self.idxs else 0

    def idx2ci(self, idx: int) -> Tuple[int, int]:
        """
        For a given index idx, return a tuple with the index of the corpus in the corpus list and the index
        of the entry within that corpus

        Raises:
            IndexError: if idx is negative or not less than the total number of documents
        """
        total = self.idxs[-1] if self.idxs else 0
        if idx < 0 or idx >= total:
            raise IndexError(
                f"Index {idx} out of range, total number of documents over all corpora is {total}"
            )
        # the corpus containing idx is the first one whose accumulated size is greater than
        # idx; bisect_right also skips over empty corpora
        cidx = bisect.bisect_right(self.idxs, idx)
        # number of documents in all corpora before the one found
        offset = self.idxs[cidx - 1] if cidx > 0 else 0
        return cidx, idx - offset

    def __getitem__(self, idx):
        """
        Return the document with the given global index. The index feature of this wrapper is
        set on the returned document so it can be saved back using `store()`.
        """
        cidx, eidx = self.idx2ci(idx)
        doc = self.corpora[cidx][eidx]
        self.setidxfeature(doc, idx)
        return doc

    def __setitem__(self, idx, doc):
        """
        Store the document at the given global index, in the wrapped corpus it belongs to.
        """
        cidx, eidx = self.idx2ci(idx)
        self.corpora[cidx][eidx] = doc


class ConcatSource(DocumentSource):
    """
    Wraps a list of document source objects to make them appear as a single source.
    """
    def __init__(self, sources: TypingIterable[DocumentSource]):
        """
        Create a ConcatSource from an iterable of DocumentSource instances

        Args:
            sources: an iterable of document sources, iterated in the given order
        """
        # initialize the base class state, otherwise accessing the `n` property fails
        super().__init__()
        # keep a list so this source can be iterated more than once even when a one-shot
        # iterator was passed
        self.sources = list(sources)

    def __iter__(self):
        """
        Yield all documents of the first source, then all documents of the second, and so on.
        """
        yield from chain.from_iterable(self.sources)


class CachedCorpus(Corpus):
    """
    Wraps two other corpora: the base corpus which may be slow to access, may not be writable etc. and the
    cache corpus which is meant to be fast. The cache corpus may initially contain only None elements or no
    files. This wrapper caches documents when they are written to, but this can be changed to caching on read.
    """

    def __init__(self, basecorpus, cachecorpus, cacheonread=False):
        """
        TODO: this is still work in progress!

        Creates a cached corpus.
        Data is read from the cachecorpus; if an entry does not exist there (the entry is None),
        the base corpus is used as a fallback.

        This cached corpus can be set up to cache on read or cache on write.

        Args:
            basecorpus: any corpus
            cachecorpus: any corpus that can return None for non-existing elements, e.g. a NumberedDirFilesCorpus
              or just an in-memory list or array. Must have the same length as the base corpus.
            cacheonread: if True, writes to the cache as soon as an item has been read from the base dataset.
                Otherwise will only write to the cache dataset when an item is set. This allows to cache the result
                of processing efficiently.
        """
        assert len(cachecorpus) == len(basecorpus)
        self.basecorpus = basecorpus
        self.cachecorpus = cachecorpus
        self.cacheonread = cacheonread

    def __len__(self):
        """
        Return the size of the base corpus.
        """
        return len(self.basecorpus)

    def __getitem__(self, index):
        """
        Return the entry from the cache corpus or, if that is None, from the base corpus.
        The index feature of this wrapper is set on the returned document.
        """
        tmp = self.cachecorpus[index]
        if tmp is None:
            tmp = self.basecorpus[index]
            if self.cacheonread:
                # cache miss: remember what was read in the CACHE corpus (the base corpus is
                # the origin of the data and may not even be writable)
                self.cachecorpus[index] = tmp
        self.setidxfeature(tmp, index)
        return tmp

    def __setitem__(self, index, value):
        """
        Store the value in the cache corpus only; the base corpus is never written to.
        """
        self.cachecorpus[index] = value


class NullDestination(DocumentDestination):
    """
    A document destination that does not keep anything: each call to `append` only increments
    the internal counter.
    """
    def __init__(self):
        super().__init__()

    def append(self, doc: Document):
        """
        Discard the given value and increment the counter by one.

        Args:
            doc: ignored
        """
        self._n = self._n + 1

Classes

class CachedCorpus (basecorpus, cachecorpus, cacheonread=False)

Wraps two other corpora: the base corpus which may be slow to access, may not be writable etc. and the cache corpus which is meant to be fast. The cache corpus may initially contain only None elements or no files. This wrapper caches documents when they are written to, but this can be changed to caching on read.

TODO: this is still work in progress!

Creates a cached corpus. Data is read from the cachecorpus; if an entry does not exist there (the entry is None), the base corpus is used as a fallback.

This cached corpus can be set up to cache on read or cache on write.

Args

basecorpus
any corpus
cachecorpus
any corpus that can return None for non-existing elements, e.g. a NumberedDirFilesCorpus or just an in-memory list or array.
cacheonread
if True, writes to the cache as soon as an item has been read from the base dataset. Otherwise will only write to the cache dataset when an item is set. This allows to cache the result of processing efficiently.
Expand source code
class CachedCorpus(Corpus):
    """
    Wraps two other corpora: the base corpus which may be slow to access, may not be writable etc. and the
    cache corpus which is meant to be fast. The cache corpus may initially contain only None elements or no
    files. This wrapper caches documents when they are written to, but this can be changed to caching on read.
    """

    def __init__(self, basecorpus, cachecorpus, cacheonread=False):
        """
        TODO: this is still work in progress!

        Creates a cached corpus.
        Data is read from the cachecorpus; if an entry does not exist there (the entry is None),
        the base corpus is used as a fallback.

        This cached corpus can be set up to cache on read or cache on write.

        Args:
            basecorpus: any corpus
            cachecorpus: any corpus that can return None for non-existing elements, e.g. a NumberedDirFilesCorpus
              or just an in-memory list or array. Must have the same length as the base corpus.
            cacheonread: if True, writes to the cache as soon as an item has been read from the base dataset.
                Otherwise will only write to the cache dataset when an item is set. This allows to cache the result
                of processing efficiently.
        """
        assert len(cachecorpus) == len(basecorpus)
        self.basecorpus = basecorpus
        self.cachecorpus = cachecorpus
        self.cacheonread = cacheonread

    def __len__(self):
        """
        Return the size of the base corpus.
        """
        return len(self.basecorpus)

    def __getitem__(self, index):
        """
        Return the entry from the cache corpus or, if that is None, from the base corpus.
        The index feature of this wrapper is set on the returned document.
        """
        tmp = self.cachecorpus[index]
        if tmp is None:
            tmp = self.basecorpus[index]
            if self.cacheonread:
                # cache miss: remember what was read in the CACHE corpus (the base corpus is
                # the origin of the data and may not even be writable)
                self.cachecorpus[index] = tmp
        self.setidxfeature(tmp, index)
        return tmp

    def __setitem__(self, index, value):
        """
        Store the value in the cache corpus only; the base corpus is never written to.
        """
        self.cachecorpus[index] = value

Ancestors

Inherited members

class ConcatCorpus (corpora: Iterable[Corpus])

Wraps a list of corpora to make them appear as a single corpus

Create a ConcatCorpus from the iterable of corpus instances.

Parameters

corpora: an iterable of corpus instances

Expand source code
class ConcatCorpus(Corpus):
    """
    Wraps a list of corpora to make them appear as a single corpus
    """
    def __init__(self, corpora: TypingIterable[Corpus]):
        """
        Create a ConcatCorpus from the iterable of corpus instances. The sizes of the corpora
        are determined once, on creation.

        Parameters:
            corpora: an iterable of corpus instances
        """
        self.corpora = list(corpora)
        # use the materialized list: the argument may be a one-shot iterator that is
        # already exhausted at this point
        self.sizes = [len(c) for c in self.corpora]
        # idxs[i] is the total number of documents in corpora 0..i, i.e. the first global index
        # that does NOT belong to corpus i any more
        self.idxs = list(accumulate(self.sizes))

    def __len__(self):
        """
        Return the total number of documents over all wrapped corpora.
        """
        return self.idxs[-1] if self.idxs else 0

    def idx2ci(self, idx: int) -> Tuple[int, int]:
        """
        For a given index idx, return a tuple with the index of the corpus in the corpus list and the index
        of the entry within that corpus

        Raises:
            IndexError: if idx is negative or not less than the total number of documents
        """
        total = self.idxs[-1] if self.idxs else 0
        if idx < 0 or idx >= total:
            raise IndexError(
                f"Index {idx} out of range, total number of documents over all corpora is {total}"
            )
        # the corpus containing idx is the first one whose accumulated size is greater than
        # idx; bisect_right also skips over empty corpora
        cidx = bisect.bisect_right(self.idxs, idx)
        # number of documents in all corpora before the one found
        offset = self.idxs[cidx - 1] if cidx > 0 else 0
        return cidx, idx - offset

    def __getitem__(self, idx):
        """
        Return the document with the given global index. The index feature of this wrapper is
        set on the returned document so it can be saved back using `store()`.
        """
        cidx, eidx = self.idx2ci(idx)
        doc = self.corpora[cidx][eidx]
        self.setidxfeature(doc, idx)
        return doc

    def __setitem__(self, idx, doc):
        """
        Store the document at the given global index, in the wrapped corpus it belongs to.
        """
        cidx, eidx = self.idx2ci(idx)
        self.corpora[cidx][eidx] = doc

Ancestors

Methods

def idx2ci(self, idx: int) ‑> Tuple[int, int]

For a given index idx, return a tuple with the index of the corpus in the corpus list and the index of the entry within that corpus

Expand source code
def idx2ci(self, idx: int) -> Tuple[int, int]:
    """
    For a given index idx, return a tuple with the index of the corpus in the corpus list and the index
    of the entry within that corpus

    Raises:
        IndexError: if idx is negative or not less than the total number of documents
    """
    # self.idxs holds the accumulated corpus sizes, so its last entry is the total size
    total = self.idxs[-1] if self.idxs else 0
    if idx < 0 or idx >= total:
        raise IndexError(
            f"Index {idx} out of range, total number of documents over all corpora is {total}"
        )
    # the corpus containing idx is the first one whose accumulated size is greater than
    # idx; bisect_right also skips over empty corpora
    cidx = bisect.bisect_right(self.idxs, idx)
    # number of documents in all corpora before the one found
    offset = self.idxs[cidx - 1] if cidx > 0 else 0
    return cidx, idx - offset

Inherited members

class ConcatSource (sources: Iterable[DocumentSource])

Wraps a list of document source objects to make them appear as a single source.

Create a ConcatSource from an iterable of DocumentSource instances

Expand source code
class ConcatSource(DocumentSource):
    """
    Wraps a list of document source objects to make them appear as a single source.
    """
    def __init__(self, sources: TypingIterable[DocumentSource]):
        """
        Create a ConcatSource from an iterable of DocumentSource instances

        Args:
            sources: an iterable of document sources, iterated in the given order
        """
        # initialize the base class state, otherwise accessing the `n` property fails
        super().__init__()
        # keep a list so this source can be iterated more than once even when a one-shot
        # iterator was passed
        self.sources = list(sources)

    def __iter__(self):
        """
        Yield all documents of the first source, then all documents of the second, and so on.
        """
        yield from chain.from_iterable(self.sources)

Ancestors

Inherited members

class Corpus (*args, **kwds)

A corpus represents a collection of documents with a fixed number of elements which can be read and written using an index number, e.g. doc = corpus[2] and corpus[2] = doc. For each index in the allowed range, the element is either a document or (for a few corpus implementations) None (indicating that no document is available for this index).

The index is an int with range 0 to N-1 where N is the number of documents in the corpus.

NOTE: for most corpus implementations, setting an index to None should not be allowed as this would not work with batching and the use of the store method to save documents back into the corpus.

Expand source code
class Corpus(ABC, CorpusSourceBase, Sized):
    """
    A fixed-size collection of documents addressed by index number: documents are read with
    `doc = corpus[2]` and written with `corpus[2] = doc`. Each valid index holds either a document
    or, for a few corpus implementations, None (meaning no document is available at that index).

    Valid indices are the ints 0 to N-1, where N is the number of documents in the corpus.

    NOTE: most corpus implementations should not allow assigning None to an index, because this
    does not work with batching and with saving documents back through the `store` method.
    """

    @abstractmethod
    def __getitem__(self, idx: int) -> Document:
        """
        Fetch the document stored at the given index. Implementations usually also set a special
        transient document feature holding that index, so the document can later be written back
        to the same place with `store()`. Implementations that do not set the feature support
        neither batching nor `store()`.

        Args:
            idx: the index of the document

        Returns:
            a document or None

        Raises:
            exception if the index idx does not exist in the corpus
        """

    @abstractmethod
    def __setitem__(self, idx: int, doc: Document) -> None:
        """
        Save a document at the given index, as in `mycorpus[2] = doc`.
        The assigned value must be a document or (in rare cases) None.

        Args:
            idx: the index of the document
            doc: a document

        Raises:
            exception if the index idx does not exist in the corpus
        """

    @abstractmethod
    def __len__(self) -> int:
        """
        Return the number of entries in the corpus.
        """

    def idxfeatname(self) -> str:
        """
        Return the name of the transient feature that holds the index under which a document was
        fetched from this corpus. The name contains `id(self)`, so it differs between corpus objects.
        """
        return f"__idx_{id(self)}"

    def setidxfeature(self, doc: Document, idx: int):
        """
        Record the given index in the special transient feature of the document.
        Nothing happens if the document is None.

        Args:
            doc: the document (or None)
            idx: the index used to access the document in a corpus
        """
        if doc is None:
            return
        doc.features[self.idxfeatname()] = idx

    def store(self, doc: Document) -> None:
        """
        Write a document that was fetched from this corpus back to where it came from, without
        having to pass the index. This is handy when processing batches or streams of documents.
        It relies on `__getitem__` having recorded the index in the document feature
        `self.idxfeatname()`, which is why corpus implementations MUST do that.

        Args:
            doc: the document to write back; it should have been retrieved from the same corpus.
                 Passing None raises an exception here; subclasses which want to support
                 store(None) have to override this method.

        Raises:
            Exception: if doc is None or if the document has no feature `self.idxfeatname()`
        """
        if doc is None:
            raise Exception("Cannot store back None into a corpus")
        assert isinstance(doc, Document)
        featname = self.idxfeatname()
        idx = doc.features.get(featname)
        if idx is None:
            raise Exception("Cannot store back document, no __idx_ID feature")
        self.__setitem__(idx, doc)

    def append(self, document: Document) -> int:
        """
        Add a document to the corpus, i.e. use the corpus like a DocumentDestination. Only some
        corpus implementations support this; this default implementation always raises.

        Important: this will probably not work properly in situations where another
        corpus wraps a corpus that allows appending. Use with care!

        Args:
            document: the document to add to the corpus or (in rare cases and for specific Corpus
                implementations) None.

        Returns:
            the index where the document was stored

        Raises:
            RuntimeError: always, unless overridden by a subclass
        """
        raise RuntimeError("Corpus does not allow appending")

Ancestors

Subclasses

Methods

def __getitem__(self, idx: int) ‑> Document

Retrieve a document from the corpus. Note that fetching a document from the corpus will usually set a special transient document feature that contains the index of the document so it can be stored back at the same index using the method store() later. If a corpus implementation does not set that feature, batching and the use of the store() method to save back documents are not supported.

Args

idx
the index of the document

Returns

a document or None

Throws

exception if the index idx does not exist in the corpus

Expand source code
@abstractmethod
def __getitem__(self, idx: int) -> Document:
    """
    Fetch the document stored at the given index of the corpus.

    Fetching a document will usually also set a special transient document feature holding
    the index, so that the document can later be written back to the same index with `store()`.
    If a corpus implementation does not set that feature, batching and the use of `store()`
    to save back documents are not supported.

    Args:
        idx: the index of the document

    Returns:
        a document or None

    Raises:
        exception if the index idx does not exist in the corpus
    """
def __len__(self) ‑> int

Returns the size of the corpus.

Expand source code
@abstractmethod
def __len__(self) -> int:
    """
    Return the number of documents in the corpus.
    """
def __setitem__(self, idx: int, doc: Document) ‑> None

A corpus object must allow setting an item by its idx, e.g. mycorpus[2] = doc. The item assigned must be a document or (in rare cases) None.

Args

idx
the index of the document
doc
a document

Throws

exception if the index idx does not exist in the corpus

Expand source code
@abstractmethod
def __setitem__(self, idx: int, doc: Document) -> None:
    """
    Store a document at the given index, so that `mycorpus[2] = doc` works for every corpus.
    The item assigned must be a document or (in rare cases) None.

    Args:
        idx: the index of the document
        doc: a document

    Raises:
        exception if the index idx does not exist in the corpus
    """
def append(self, document: Document) ‑> int

Some corpus implementations may provide the append method to allow for adding documents (i.e. use the corpus like a DocumentDestination).

Important: this will probably not work properly in situations where another corpus wraps a corpus that allows appending. Use with care!

Args

document
the document to add to the corpus or (in rare cases and for specific Corpus implementations) None.

Returns

the index where the document was stored

Expand source code
def append(self, document: Document) -> int:
    """
    Optional operation: add a document to the corpus, so that the corpus can be used like
    a DocumentDestination. Only some corpus implementations provide this; this default
    implementation does not and always raises.

    Important: this will probably not work properly in situations where another
    corpus wraps a corpus that allows appending. Use with care!

    Args:
        document: the document to add to the corpus or (in rare cases and for specific Corpus
            implementations) None.

    Returns:
        the index where the document was stored

    Raises:
        RuntimeError: always, in this default implementation
    """
    raise RuntimeError("Corpus does not allow appending")
def idxfeatname(self) ‑> str

Return the name of the transient feature to receive the index used to access a document from a corpus.

Expand source code
def idxfeatname(self) -> str:
    """
    Return the name of the transient document feature that holds the index under which
    a document was accessed in this corpus. The name is "__idx_" followed by `id(self)`,
    so it differs between corpus instances.
    """
    return f"__idx_{id(self)}"
def setidxfeature(self, doc: Document, idx: int)

Sets the special transient feature of the document to the given index.

Args

doc
the document
idx
the index used to access the document in a corpus
Expand source code
def setidxfeature(self, doc: Document, idx: int):
    """
    Record the given index in the special transient feature of the document.
    Nothing happens if the document is None.

    Args:
        doc: the document
        idx: the index used to access the document in a corpus
    """
    if doc is None:
        return
    featname = self.idxfeatname()
    doc.features[featname] = idx
def store(self, doc: Document) ‑> None

This method allows to store a document that comes from the same corpus back without the need to specify the index. This is useful for processing documents in batches or in streams. For this to work, all corpus implementations MUST make sure to store the index as part of returning a document with __getitem__. The index is stored in document feature self.idxfeatname().

Args

doc
the document to store back into the corpus, should be a document that was retrieved from the same corpus or (in very rare cases and with specific corpus implementations only) None. The default behaviour for None is to throw an exception; this must be overridden by subclasses where store(None) should be supported.

Raises

Exception
if the index is not stored in a document feature self.idxfeatname()
Expand source code
def store(self, doc: Document) -> None:
    """
    Write a previously retrieved document back to the position it came from, without the
    caller having to pass the index explicitly. This is convenient when documents are
    processed in batches or in streams.

    The position is read from the document feature named `self.idxfeatname()` and the
    document is then handed to `__setitem__` with that position. Corpus implementations
    therefore MUST record that feature when handing out a document via `__getitem__`.

    Args:
        doc: the document to write back, should be a document that was retrieved from the same
             corpus. None is rejected by this default implementation; subclasses where
             store(None) should be supported must override this method.

    Raises:
        Exception: if `doc` is None, or if the index is not stored in the document
            feature `self.idxfeatname()`
    """
    if doc is None:
        raise Exception("Cannot store back None into a corpus")
    assert isinstance(doc, Document)
    position = doc.features.get(self.idxfeatname())
    if position is not None:
        self.__setitem__(position, doc)
        return
    raise Exception("Cannot store back document, no __idx_ID feature")

Inherited members

class CorpusSourceBase

Common base trait for Corpus and Source classes. So far just provides methods to get nparts and partnr even for objects which do not support to be shared between multiple workers.

Expand source code
class CorpusSourceBase:
    """
    Shared base trait of Corpus and Source classes. At the moment it supplies default values
    for nparts and partnr, so these can be queried even on objects that do not support being
    shared between multiple workers, plus helpers for the relative-path document feature.
    """

    @property
    def nparts(self):
        """
        The total number of parts for an EveryNth corpus or document source.
        All other corpus/source instances report 1.
        """
        return 1

    @property
    def partnr(self):
        """
        The part number for an EveryNth corpus or document source.
        All other corpus/source instances report 0.
        """
        return 0

    def relpathfeatname(self) -> str:
        """
        Return the name of the transient feature that receives the relative path a document
        was loaded from.
        """
        return "_relpath"

    def setrelpathfeature(self, doc: Document, relpath: str):
        """
        Record the given relative path in the special transient feature of the document.
        Nothing happens if the document is None.

        Args:
            doc: the document
            relpath: the relative path the document was created from
        """
        if doc is None:
            return
        featname = self.relpathfeatname()
        doc.features[featname] = relpath

Subclasses

Instance variables

var nparts

Return the total number of parts for an EveryNth corpus or document source. This is 1 for all other corpus/source instances.

Expand source code
@property
def nparts(self):
    """
    The total number of parts for an EveryNth corpus or document source.
    All other corpus/source instances report 1.
    """
    return 1
var partnr

Return the part number for an EveryNth corpus or document source. This is 0 for all other corpus/source instances.

Expand source code
@property
def partnr(self):
    """
    The part number for an EveryNth corpus or document source.
    All other corpus/source instances report 0.
    """
    return 0

Methods

def relpathfeatname(self) ‑> str

Return the name of the transient feature to receive the relative path a document was loaded from.

Expand source code
def relpathfeatname(self) -> str:
    """
    Return the name of the transient feature that receives the relative path a document
    was loaded from.
    """
    return "_relpath"
def setrelpathfeature(self, doc: Document, relpath: str)

Sets the special transient feature of the document to the given relative path.

Args

doc
the document
relpath
the relative path the document was created from
Expand source code
def setrelpathfeature(self, doc: Document, relpath: str):
    """
    Record the given relative path in the special transient feature of the document.
    Nothing happens if the document is None.

    Args:
        doc: the document
        relpath: the relative path the document was created from
    """
    if doc is None:
        return
    featname = self.relpathfeatname()
    doc.features[featname] = relpath
class DistributedProcessingAble

A document source/destination/corpus class where duplicate instances can be used from several nodes in parallel.

Expand source code
class DistributedProcessingAble(MultiProcessingAble):
    """
    Marker for a document source/destination/corpus class of which duplicate instances can be
    used from several nodes in parallel.
    """

Ancestors

class DocumentDestination

A document destination is something that accepts an a priori unknown number of documents via the append method.

Document destinations all provide a close() method and must be closed after use.

Document destinations can be used as context managers i.e. one can do with SomeDocumentDest(..) as dest: dest.append(doc) which will take care of closing the destination automatically.

Expand source code
class DocumentDestination(AbstractContextManager):
    """
    Something that accepts an a priori unknown number of documents, one at a time, via
    the append method.

    Document destinations all provide a `close()` method and must be closed after use.

    Document destinations can be used as context managers i.e. one can do
    `with SomeDocumentDest(..) as dest: dest.append(doc)` which will take care of closing the
    destination automatically.
    """

    def __init__(self):
        # counter exposed read-only through the property `n`; this base class only
        # initialises it, updating it is left to subclasses
        self._n = 0

    @property
    def n(self):
        """
        The current value of the internal counter `_n`, which starts at 0.
        """
        return self._n

    @abstractmethod
    def append(self, doc: Document) -> None:
        """
        Append the given document to the destination.

        Args:
            doc: the document to add, if this is None, by default nothing is actually added to the destination,
                but specific implementations may change this behaviour.
        """

    def close(self) -> None:
        """
        Close the document destination. This default implementation does nothing. The default
        context manager implementation always calls close(), even when an exception is raised.
        """

    def __exit__(self, exctype, value, traceback) -> bool:
        """
        Leave the context: always invoke close() and return False, so that an exception
        raised inside the `with` block is never suppressed.
        """
        self.close()
        return False

    def relpathfeatname(self) -> str:
        """
        Return the name of the transient feature that receives the relative path a document
        was loaded from.
        """
        return "_relpath"

Ancestors

  • contextlib.AbstractContextManager
  • abc.ABC

Subclasses

Instance variables

var n
Expand source code
@property
def n(self):
    """
    The current value of the internal counter `_n`.
    """
    return self._n

Methods

def append(self, doc: Document) ‑> None

Append the given document to the destination.

Args

doc
the document to add, if this is None, by default nothing is actually added to the destination, but specific implementations may change this behaviour.
Expand source code
@abstractmethod
def append(self, doc: Document) -> None:
    """
    Add the given document to the destination.

    Args:
        doc: the document to add, if this is None, by default nothing is actually added to the destination,
            but specific implementations may change this behaviour.
    """
def close(self) ‑> None

Close the document destination. The default context manager implementation always calls close(), even when an exception is raised.

Expand source code
def close(self) -> None:
    """
    Close the document destination. This default implementation does nothing. The default
    context manager implementation always calls close(), even when an exception is raised.
    """
def relpathfeatname(self) ‑> str

Return the name of the transient feature to receive the relative path a document was loaded from.

Expand source code
def relpathfeatname(self) -> str:
    """
    Return the name of the transient feature that receives the relative path a document
    was loaded from.
    """
    return "_relpath"
class DocumentSource

A document source is an iterable of documents which will generate an unknown number of documents.

Expand source code
class DocumentSource(ABC, TypingIterable, CorpusSourceBase):
    """
    A document source is an iterable of documents which will generate an unknown number of documents.

    Document sources can be used as context managers, i.e. one can do
    `with SomeDocumentSource(..) as src: ...`, which invokes `close()` when the block is left.
    """
    def __init__(self):
        # counter exposed read-only through the property `n`; this base class only
        # initialises it, updating it is left to subclasses
        self._n = 0

    def __iter__(self) -> TypingIterator[Document]:
        """
        Iterate over the documents of the source. This base implementation is a placeholder
        that returns None; subclasses have to override it.
        """
        pass

    @property
    def n(self):
        """
        The current value of the internal counter `_n`, which starts at 0.
        """
        return self._n

    def __enter__(self):
        """
        Enter the context; the source itself is the context object.
        """
        return self

    def __exit__(self, extype, value, traceback):
        """
        Leave the context: always invoke close() and return False, so that an exception
        raised inside the `with` block is never suppressed.
        """
        # Without this call a `with` block would never release what a subclass acquires.
        self.close()
        return False

    def close(self):
        """
        Close the document source. This default implementation does nothing.
        """
        pass

Ancestors

Subclasses

Instance variables

var n
Expand source code
@property
def n(self):
    """
    The current value of the internal counter `_n`.
    """
    return self._n

Methods

def __iter__(self) ‑> Iterator[Document]
Expand source code
def __iter__(self) -> TypingIterator[Document]:
    """
    Iterate over the documents of the source. This base implementation is a placeholder
    that returns None; subclasses have to override it.
    """
    return None
def close(self)
Expand source code
def close(self):
    """
    Close the document source. This default implementation does nothing.
    """

Inherited members

class EveryNthBase (nparts=1, partnr=0)

A Source or Corpus that wraps another Source or Corpus so that only every nth document, starting with some document 0 <= k < n is included.

Such classes must provide the initialization keyword parameters partnr and nparts which may have default values of 0 and 1 for single part resources.

Expand source code
class EveryNthBase:
    """
    Base for a Source or Corpus that wraps another Source or Corpus so that only every nth
    document, starting with some document 0 <= k < n is included.

    Such classes must provide the initialization keyword parameters partnr and nparts which may
    have default values of 0 and 1 for single part resources.
    """

    def __init__(self, nparts=1, partnr=0):
        """
        Remember the partitioning parameters; no validation is done here.

        Args:
            nparts: the total number of parts
            partnr: the number of the part this instance represents
        """
        self._nparts, self._partnr = nparts, partnr

    @property
    def nparts(self):
        """The total number of parts, as passed at initialization."""
        return self._nparts

    @property
    def partnr(self):
        """The part number of this instance, as passed at initialization."""
        return self._partnr

Subclasses

Instance variables

var nparts
Expand source code
@property
def nparts(self):
    """The total number of parts, as stored in `_nparts`."""
    return self._nparts
var partnr
Expand source code
@property
def partnr(self):
    """The part number of this instance, as stored in `_partnr`."""
    return self._partnr
class EveryNthCorpus (corpus: Corpus, nparts: int = 1, partnr: int = 0)

A wrapper to make any corpus that is multiprocessing/distributedprocessing-able shardable.

Wraps a corpus to only every nparts-th document, starting with the partnr-th document. For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get documents 2,5,8,11,14 etc.

This is useful to access a subset of documents from a corpus from different concurrent processes (the wrapped corpus must be MultiProcessingAble for that!).

Expand source code
class EveryNthCorpus(EveryNthBase, Corpus):
    """
    A wrapper to make any corpus that is multiprocessing/distributedprocessing-able shardable.

    Wraps a corpus to only every nparts-th document, starting with the partnr-th document.
    For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the
    documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get
    documents 2,5,8,11,14 etc.

    This is useful to access a subset of documents from a corpus from different concurrent
    processes (the wrapped corpus must be MultiProcessingAble for that!).
    """
    def __init__(self, corpus: Corpus, nparts: int = 1, partnr: int = 0):
        """
        Create the wrapper.

        Args:
            corpus: the corpus to wrap, must be a Corpus and MultiProcessingAble
            nparts: the total number of parts, must be an integer >= 2 (so the default of 1
                is rejected and a value has to be passed explicitly)
            partnr: the number of the part this instance represents, 0 <= partnr < nparts

        Raises:
            Exception: if nparts or partnr are not integers or are out of range
        """
        assert isinstance(corpus, MultiProcessingAble)
        assert isinstance(corpus, Corpus)
        # this uses Integral so we can also support integral types from Numpy etc!
        if (not isinstance(nparts, numbers.Integral)) or (
            not isinstance(partnr, numbers.Integral)
        ):
            raise Exception("nparts and partnr must be integers.")
        if nparts < 2 or partnr < 0 or partnr >= nparts:
            raise Exception("nparts must be >= 2 and partnr must be >= 0 and < nparts")
        super().__init__(nparts=nparts, partnr=partnr)
        self.corpus = corpus

    def __len__(self):
        """
        Return the number of documents of the wrapped corpus that belong to this part.
        """
        olen = len(self.corpus)
        # Every part gets olen // nparts documents; the first (olen % nparts) parts get one
        # more. Integer division keeps this exact for any corpus size, whereas float
        # division can lose precision for very large lengths.
        return olen // self.nparts + (1 if (olen % self.nparts) > self.partnr else 0)

    def _orig_idx(self, idx: int) -> int:
        """
        Map an index of this part to the corresponding index of the wrapped corpus.
        """
        return idx * self.nparts + self.partnr

    def __getitem__(self, idx: int) -> Document:
        """
        Return the idx-th document of this part, fetched from the wrapped corpus.
        """
        # NOTE: we do not store the index in a feature for this wrapper as the wrapped
        # corpus index is eventually the only one that matters
        return self.corpus[self._orig_idx(idx)]

    def __setitem__(self, idx: int, doc: Document) -> None:
        """
        Store the document as the idx-th document of this part in the wrapped corpus.
        """
        self.corpus[self._orig_idx(idx)] = doc

    def store(self, doc: Document) -> None:
        """
        Store the document back by delegating to the wrapped corpus' `store` method.
        """
        # stored using the feature from the original index!
        self.corpus.store(doc)

    def append(self, document: Document) -> int:
        """
        Not supported for this wrapper.

        Raises:
            Exception: always
        """
        raise Exception("Method append not supported for EveryNthCorpus")

Ancestors

Inherited members

class EveryNthSource (source: DocumentSource, nparts: int = 1, partnr: int = 0)

A wrapper to make any DocumentSource that is multiprocessing or distributed processing-able viewable in parts.

Wraps a document source to only return every nparts-th document, starting with the partnr-th document. For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get documents 2,5,8,11,14 etc.

Expand source code
class EveryNthSource(EveryNthBase, DocumentSource):
    """
    A wrapper to make any DocumentSource that is multiprocessing or distributed processing-able
    viewable in parts.

    Wraps a document source to only return every nparts-th document, starting with the partnr-th document.
    For example with nparts=3 and partnr=0, the documents 0,1,2,3,4 correspond to the
    documents 0,3,6,9,12 of the wrapped dataset, with nparts=3 and partnr=2, we get
    documents 2,5,8,11,14 etc.
    """
    def __init__(self,
                 source: DocumentSource,
                 nparts: int = 1,
                 partnr: int = 0):
        """
        Create the wrapper.

        Args:
            source: the document source to wrap, must be a DocumentSource and MultiProcessingAble
            nparts: the total number of parts, must be an integer >= 2 (so the default of 1
                is rejected and a value has to be passed explicitly)
            partnr: the number of the part this instance represents, 0 <= partnr < nparts

        Raises:
            Exception: if nparts or partnr are not integers or are out of range
        """
        assert isinstance(source, MultiProcessingAble)
        assert isinstance(source, DocumentSource)
        # this uses Integral so we can also support integral types from Numpy etc!
        if (not isinstance(nparts, numbers.Integral)) or (
            not isinstance(partnr, numbers.Integral)
        ):
            raise Exception("nparts and partnr must be integers.")
        if nparts < 2 or partnr < 0 or partnr >= nparts:
            raise Exception("nparts must be >= 2 and partnr must be >= 0 and < nparts")
        # EveryNthBase.__init__ does not chain to the next class in the MRO, so the
        # DocumentSource part is initialised explicitly; otherwise `self._n` would not exist
        # and the inherited `n` property would raise AttributeError.
        DocumentSource.__init__(self)
        super().__init__(nparts=nparts, partnr=partnr)
        self.source = source

    def __iter__(self) -> TypingIterator[Document]:
        """
        Yield those documents of the wrapped source whose position modulo nparts equals
        partnr. The counter behind the `n` property is incremented for each document yielded.
        """
        for idx, doc in enumerate(self.source):
            if idx % self.nparts == self.partnr:
                self._n += 1
                yield doc

Ancestors

Inherited members

class MultiProcessingAble

A document source/destination/corpus class where duplicate instances can be used by several processes on the same node in parallel.

Expand source code
class MultiProcessingAble:
    """
    Marker for a document source/destination/corpus class of which duplicate instances can be
    used by several processes on the same node in parallel.
    """

Subclasses

class NullDestination

A document destination is something that accepts an a priori unknown number of documents via the append method.

Document destinations all provide a close() method and must be closed after use.

Document destinations can be used as context managers i.e. one can do with SomeDocumentDest(..) as dest: dest.append(doc) which will take care of closing the destination automatically.

Expand source code
class NullDestination(DocumentDestination):
    """
    A document destination that does not keep the documents it is given: `append` only
    increments the counter `_n`.
    """

    def __init__(self):
        super().__init__()

    def append(self, doc: Document):
        """
        Count the call; the document itself is ignored.

        Args:
            doc: the document to "add", not used
        """
        self._n = self._n + 1

Ancestors

Inherited members

class ShuffledCorpus (corpus, seed=None)

Wraps a corpus to reorder the documents in the corpus randomly.

Create a ShuffledCorpus wrapper.

Args

seed
the random seed used for shuffling the documents. If the seed is 0, the RNG is seeded with a fresh random seed; if the seed is -1, the seed is not set at all and whatever the current state of the random generator is gets used; any other integer is used as the seed itself. If the seed is None or not an integer, this is the same as 0. The documents are shuffled in all of these cases.
Expand source code
class ShuffledCorpus(Corpus):
    """
    Wraps a corpus to reorder the documents in the corpus randomly.
    """

    def __init__(self, corpus, seed=None):
        """
        Create a ShuffledCorpus wrapper. The documents are shuffled once on creation.

        Args:
            corpus: the corpus to wrap
            seed: the random seed used for shuffling. If the seed is 0, the RNG is seeded with a
                fresh random seed; if the seed is -1, the seed is not set at all and whatever the
                current state of the random generator is gets used; any other integer is used as
                the seed itself. If this is None or not an integer, same as 0.
        """
        super().__init__()
        self.corpus = corpus
        self.seed = seed
        # idxs[i] is the index in the wrapped corpus of the i-th document of this corpus
        self.idxs = list(range(len(corpus)))
        self.shuffle(seed)

    def shuffle(self, seed=0):
        """
        Shuffle the order of the documents in place. This uses (and, unless seed is -1, reseeds)
        the global random generator of the `random` module.

        Args:
            seed: random seed to set; if seed is 0, a random random seed is used, if -1, the seed
                is not set. If seed is None or not an integer, it is treated like 0.
        """
        # numbers.Integral also allows for np.int8(n) and the like
        if not isinstance(seed, numbers.Integral) or seed == 0:
            random.seed()
        elif seed != -1:
            random.seed(seed)
        random.shuffle(self.idxs)

    def __getitem__(self, idx):
        """
        Return the idx-th document in shuffled order, with the index feature of this
        corpus set to idx so that the document can be written back with `store()`.
        """
        doc = self.corpus[self.idxs[idx]]
        self.setidxfeature(doc, idx)
        # Return the very object the feature was set on: fetching it again from the wrapped
        # corpus may yield a fresh instance that lacks the feature, and costs a second access.
        return doc

    def __setitem__(self, idx, doc):
        """
        Store the document at position idx of the shuffled order.

        Raises:
            Exception: if idx is not an integer or not in the range 0 <= idx < len(self)
        """
        if not isinstance(idx, numbers.Integral):
            raise Exception("Item must be an integer")
        if idx >= len(self.idxs) or idx < 0:
            raise Exception("Index idx must be >= 0 and < {}".format(len(self)))
        # translate the shuffled position into the index of the wrapped corpus
        self.corpus[self.idxs[idx]] = doc

    def __len__(self):
        """
        Return the number of documents, which is the length the wrapped corpus had on creation.
        """
        return len(self.idxs)

    def append(self, document: Document) -> int:
        """
        Not supported for this wrapper.

        Raises:
            Exception: always
        """
        raise Exception("Method append not supported for ShuffledCorpus")

Ancestors

Methods

def shuffle(self, seed=0)

Shuffle instance list order. :param seed: random seed to set; if seed is 0, a random random seed is used, if -1, the seed is not set. If seed is None or not an integer, it is treated like 0. :return:

Expand source code
def shuffle(self, seed=0):
    """
    Shuffle the instance list order in place, using the global random generator of the
    `random` module.

    :param seed: random seed to set; if seed is 0, a random random seed is used, if -1, the seed
        is not set. If seed is None or not an integer, it is treated like 0.
    :return:
    """
    # numbers.Integral also allows for np.int8(n) and the like
    if not isinstance(seed, numbers.Integral) or seed == 0:
        random.seed()
    elif seed != -1:
        random.seed(seed)
    random.shuffle(self.idxs)

Inherited members

class StringIdCorpus

A corpus which allows to use string ids in addition to integer indices for setting and getting documents.

Expand source code
class StringIdCorpus:
    """
    A corpus which allows to use string ids in addition to integer indices for setting and getting documents.
    """
    # NOTE: the only thing that is really different is the type signature for some of the methods
    @abstractmethod
    def __getitem__(self, key: Union[int, str]) -> Document:
        """
        Retrieve a document from the corpus by either its numeric index or its string id.

        Args:
            key: the index of the document or the unique string id

        Returns:
            a document or None

        Raises:
            exception if the index or string id given as key does not exist in the corpus
        """
        pass

    @abstractmethod
    def __setitem__(self, idx: Union[int, str], doc: Document) -> None:
        """
        Store a document into the corpus by either its numeric index or its string id.

        Args:
            idx: the index or the string id of the document
            doc: a document

        Raises:
            exception if the index or string id idx does not exist in the corpus
        """
        pass