AtomicIndex.java
/*
* AtomicIndex.java
*
* Copyright (c) 2007-2013, The University of Sheffield.
*
* This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Valentin Tablan, 1 Nov 2013
*
* $Id: AtomicIndex.java 20154 2017-02-20 19:19:38Z ian_roberts $
*/
package gate.mimir.index;
import gate.Annotation;
import gate.mimir.MimirIndex;
import gate.mimir.search.IndexReaderPool.IndexDictionary;
import gate.util.GateRuntimeException;
import it.unimi.di.big.mg4j.index.BitStreamIndex;
import it.unimi.di.big.mg4j.index.CompressionFlags;
import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
import it.unimi.di.big.mg4j.index.DiskBasedIndex;
import it.unimi.di.big.mg4j.index.Index;
import it.unimi.di.big.mg4j.index.Index.UriKeys;
import it.unimi.di.big.mg4j.index.IndexIterator;
import it.unimi.di.big.mg4j.index.IndexReader;
import it.unimi.di.big.mg4j.index.IndexWriter;
import it.unimi.di.big.mg4j.index.NullTermProcessor;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
import it.unimi.di.big.mg4j.index.SkipBitStreamIndexWriter;
import it.unimi.di.big.mg4j.index.TermProcessor;
import it.unimi.di.big.mg4j.index.cluster.ContiguousDocumentalStrategy;
import it.unimi.di.big.mg4j.index.cluster.ContiguousLexicalStrategy;
import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
import it.unimi.di.big.mg4j.index.cluster.DocumentalConcatenatedCluster;
import it.unimi.di.big.mg4j.index.cluster.LexicalCluster;
import it.unimi.di.big.mg4j.io.IOFactory;
import it.unimi.di.big.mg4j.tool.Combine;
import it.unimi.di.big.mg4j.tool.Combine.IndexType;
import it.unimi.di.big.mg4j.tool.Concatenate;
import it.unimi.di.big.mg4j.tool.Scan;
import it.unimi.dsi.big.io.FileLinesCollection;
import it.unimi.dsi.big.util.ShiftAddXorSignedStringMap;
import it.unimi.dsi.big.util.StringMap;
import it.unimi.dsi.bits.Fast;
import it.unimi.dsi.bits.TransformationStrategies;
import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.Hash;
import it.unimi.dsi.fastutil.Swapper;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntBigArrayBigList;
import it.unimi.dsi.fastutil.ints.IntBigList;
import it.unimi.dsi.fastutil.ints.IntComparator;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.fastutil.objects.Object2LongAVLTreeMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectBigArrayBigList;
import it.unimi.dsi.fastutil.objects.ObjectBigList;
import it.unimi.dsi.io.OutputBitStream;
import it.unimi.dsi.lang.MutableString;
import it.unimi.dsi.lang.ObjectParser;
import it.unimi.dsi.logging.ProgressLogger;
import it.unimi.dsi.sux4j.mph.LcpMonotoneMinimalPerfectHashFunction;
import it.unimi.dsi.util.BloomFilter;
import it.unimi.dsi.util.Properties;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import com.google.common.io.PatternFilenameFilter;
import org.slf4j.LoggerFactory;
/**
* <p>
* An inverted index associating terms with documents. Terms can be either token
* feature values, or annotations. Optionally, a direct index may also be
* present.
* </p>
* <p>
* An atomic index manages a head index (the principal data) and a set of tail
* indexes (batches containing updates). Additionally, the data representing
* all the new documents that have been queued for indexing since the last tail
* was written are stored in RAM.
* </p>
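* <p>
* On disk this typically looks like (an illustrative layout, using the file
* names defined below): a <tt>head</tt> directory holding the principal batch,
* <tt>tail-0</tt>, <tt>tail-1</tt>, &hellip; directories holding subsequent
* batches, and a <tt>queued-documents</tt> directory for documents queued but
* not yet indexed.
* </p>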
* <p>
* When direct indexing is enabled, the term IDs in the direct index are
* different from the term IDs in the inverted index. In the inverted index
* the term IDs are their position in the lexicographically sorted list of all
* terms. In the direct index, the term IDs are their position in the list
* sorted by the time they were first seen during indexing.
* </p>
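* <p>
* For example (an illustration, not data from any particular index): if the
* terms "zebra" and then "apple" are the first two terms encountered during
* indexing, the inverted index assigns IDs by lexicographic order
* ("apple"=0, "zebra"=1), while the direct index assigns IDs by first
* appearance ("zebra"=0, "apple"=1). {@link #getDirectTerm(long)} resolves
* direct term IDs back to their strings.
* </p>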
* <p>
* The head and tail batches can be combined into a new head by a
* <em>compact</em> operation.
* </p>
*/
public abstract class AtomicIndex implements Runnable {
/**
* A callable that does nothing. This is used to produce instances of
* {@link CustomFuture} which are only used to wait for the completion of an
* operation that returns nothing.
*/
private static final Callable<Void> noOpVoid = new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
};
/**
* An in-RAM representation of a postings list
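* <p>
* A minimal usage sketch (based on how this class is used elsewhere in this
* file; the index writer is assumed to be already open):
* </p>
* <pre>
* PostingsList pl = new PostingsList(true); // store positions
* pl.newDocumentPointer(0);                 // first document
* pl.addPosition(3);
* pl.addPosition(7);
* pl.newDocumentPointer(2);                 // next document (pointer delta of 2)
* pl.addPosition(1);
* pl.write(indexWriter);                    // flushes and writes the whole list
* </pre>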
*/
protected static class PostingsList {
/**
* The first document pointer added to this postings list.
*/
private long firstDocumentPointer = -1;
/**
* The last seen document pointer.
*/
private long lastDocumentPointer = -1;
/**
* The list of document pointer differentials (differences from the previous
* document pointer). The first document pointer value is stored in
* {@link #firstDocumentPointer}, and a <tt>0</tt> is stored in the first
* position of this list. The actual value of a document pointer for a given
* position is:
* <dl>
* <dt>pos 0</dt>
* <dd>{@link #firstDocumentPointer}</dd>
* <dt>pos i</dt>
* <dd>pointer at position (i-1) + documentPointersDifferential[i]</dd>
* </dl>
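* <p>
* A small worked example: with {@link #firstDocumentPointer} = 5 and
* differentials [0, 3, 2], the decoded document pointers are 5, 8 and 10.
* </p>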
*/
private IntList documentPointersDifferential;
/**
* The count (number of terms) for each document. This list is aligned with
* {@link #documentPointersDifferential}.
*/
private IntList counts;
/**
* The list of positions in this postings list. For each document at
* position <tt>i</tt>, there will be counts[i] positions stored in this
* list. This value is <code>non-null</code> only if positions are stored,
* which is configured through a construction-time parameter.
*/
private IntArrayList positions;
/**
* The last seen position in the current document.
*/
private int lastPosition = -1;
/**
* The number of positions in the current document
*/
private int count = 0;
/**
* The maximum term count of all the stored documents
*/
private int maxCount = 0;
/**
* The number of document pointers contained
*/
private long frequency = 0;
/**
* The total number of term occurrences in all stored documents.
*/
private long occurrences = 0;
/**
* The sum of the maximum positions for each document.
*/
private long sumMaxPos = 0;
public PostingsList(boolean storePositions) {
firstDocumentPointer = -1;
documentPointersDifferential = new IntArrayList();
counts = new IntArrayList();
if(storePositions) {
positions = new IntArrayList();
}
}
/**
* Start storing the data for a new document
* @param pointer
*/
public void newDocumentPointer(long pointer) {
// is this really a new document?
if(pointer != lastDocumentPointer) {
if(firstDocumentPointer < 0) firstDocumentPointer = pointer;
if(lastDocumentPointer == -1) {
// this is the first document
documentPointersDifferential.add(0);
} else {
// close previous document
flush();
// add the new document
documentPointersDifferential.add((int)(pointer - lastDocumentPointer));
}
lastDocumentPointer = pointer;
// reset the lastPosition when moving to a new document
lastPosition = -1;
frequency++;
}
}
public void addPosition(int pos) {
// ignore if the position hasn't changed: we don't store two identical
// records
if(pos != lastPosition) {
positions.add(pos);
count++;
//and update lastPosition
lastPosition = pos;
}
}
/**
* When storing positions, the count is automatically calculated. When not
* storing positions, it needs to be explicitly set by calling this method.
* @param count
*/
public void setCount(int count) {
this.count = count;
}
/**
* Checks whether the given position is valid (i.e. greater than the last
* seen position). If the position is invalid, this means that a call to
* {@link #addPosition(int)} with the same value would actually be a
* no-operation.
* @param pos
* @return
*/
public boolean checkPosition(int pos){
return pos > lastPosition;
}
/**
* Notifies this postings list that it has received all the data
*/
public void flush() {
if(count > 0) {
// we have some new positions for the last document: they were already
// added to positions, but we now need to store their count
counts.add(count);
if(count > maxCount) maxCount = count;
sumMaxPos += lastPosition;
occurrences += count;
}
count = 0;
}
/**
* Empties all the data from this postings list making it ready to be reused.
*/
public void clear() {
documentPointersDifferential.clear();
count = 0;
counts.clear();
maxCount = 0;
occurrences = 0;
if(positions != null){
positions.clear();
lastPosition = -1;
sumMaxPos = 0;
}
firstDocumentPointer = -1;
lastDocumentPointer = -1;
frequency = 0;
}
/**
* Writes the data contained in this postings list to an index writer
* @param indexWriter
* @throws IOException
*/
public void write(IndexWriter indexWriter) throws IOException {
flush();
if(indexWriter instanceof QuasiSuccinctIndexWriter) {
((QuasiSuccinctIndexWriter)indexWriter).newInvertedList(
frequency,
occurrences,
positions!= null ? sumMaxPos : 0);
} else {
indexWriter.newInvertedList();
}
indexWriter.writeFrequency(frequency);
long currDocumentPointer = firstDocumentPointer;
int positionsStart = 0;
for(int docId = 0; docId < documentPointersDifferential.size(); docId++) {
currDocumentPointer += documentPointersDifferential.get(docId);
int currCount = counts.get(docId);
OutputBitStream obs = indexWriter.newDocumentRecord();
indexWriter.writeDocumentPointer(obs, currDocumentPointer);
indexWriter.writePositionCount(obs, currCount);
if(positions != null){
indexWriter.writeDocumentPositions(obs, positions.elements(),
positionsStart, currCount, -1);
positionsStart += currCount;
}
}
}
@Override
public String toString() {
StringBuilder str = new StringBuilder();
long docPointer = firstDocumentPointer;
int positionsPointer = 0;
boolean firstDoc = true;
for(int i = 0; i < documentPointersDifferential.size(); i++) {
docPointer += documentPointersDifferential.get(i);
int count = counts.getInt(i);
if(firstDoc) {
firstDoc = false;
} else {
str.append("; ");
}
str.append(docPointer).append("(");
boolean firstPos = true;
for(int j = positionsPointer; j < positionsPointer + count; j++) {
if(firstPos) {
firstPos = false;
} else {
str.append(", ");
}
str.append(positions.getInt(j));
}
str.append(") ");
}
return str.toString();
}
}
/**
* Class representing an MG4J index batch, such as the head or any of the
* tails.
*/
protected static class MG4JIndex {
protected File indexDir;
protected Index invertedIndex;
protected Index directIndex;
protected BloomFilter<Void> invertedTermFilter;
protected BloomFilter<Void> directTermFilter;
protected String indexName;
public MG4JIndex(
File indexDir,
String indexName,
Index invertedIndex,
BloomFilter<Void> invertedTermFilter,
Index directIndex,
BloomFilter<Void> directTermFilter) {
this.indexDir = indexDir;
this.indexName = indexName;
this.invertedIndex = invertedIndex;
this.invertedTermFilter = invertedTermFilter;
this.directIndex = directIndex;
this.directTermFilter = directTermFilter;
}
}
/**
* Given a terms file (text file with one term per line) this method generates
* the corresponding termmap file (binary representation of a StringMap).
* Optionally, a {@link BloomFilter} can also be generated, if the suitable
* target file is provided.
*
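* <p>
* A minimal call sketch (the file names below are hypothetical):
* </p>
* <pre>
* AtomicIndex.generateTermMap(
*     new File("batch/token-0.terms"),    // input: one term per line
*     new File("batch/token-0.termmap"),  // output: binary StringMap
*     null);                              // no Bloom filter required
* </pre>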
* @param termsFile the input file
* @param termmapFile the output termmap file, or <code>null</code> if a
* termmap is not required.
* @param bloomFilterFile the file to be used for writing the
* {@link BloomFilter} for the index, or <code>null</code> if a Bloom filter
* is not required.
* @throws IOException
*/
public static void generateTermMap(File termsFile, File termmapFile,
File bloomFilterFile) throws IOException {
FileLinesCollection fileLinesCollection =
new FileLinesCollection(termsFile.getAbsolutePath(), "UTF-8");
if(termmapFile != null) {
StringMap<CharSequence> terms =
new ShiftAddXorSignedStringMap(
fileLinesCollection.iterator(),
new LcpMonotoneMinimalPerfectHashFunction.Builder<CharSequence>()
.keys(fileLinesCollection)
.transform(TransformationStrategies.prefixFreeUtf16())
.build());
//new LcpMonotoneMinimalPerfectHashFunction<CharSequence>(
// fileLinesCollection, TransformationStrategies.prefixFreeUtf16()));
BinIO.storeObject(terms, termmapFile);
}
if(bloomFilterFile != null) {
BloomFilter<Void> bloomFilter = BloomFilter.create(fileLinesCollection.size64());
for(MutableString term : fileLinesCollection) {
bloomFilter.add(term);
}
BinIO.storeObject(bloomFilter, bloomFilterFile);
}
}
/**
* Creates a documental cluster from a list of {@link MG4JIndex} values.
*
* @param batches the indexes to be combined into a cluster
* @param termProcessor the term processor to be used (can be null)
* @return a documental cluster view of the list of indexes provided.
*/
protected final static Index openInvertedIndexCluster(
List<MG4JIndex> batches, TermProcessor termProcessor){
if(batches == null || batches.size() == 0) return null;
if(batches.size() == 1) return batches.get(0).invertedIndex;
// prepare the documental cluster
Index[] indexes = new Index[batches.size()];
// cut points between the batches - there are numBatches+1 cutpoints,
// cutPoints[0] is always zero, and cutPoints[i] is the sum of the
// sizes of batches 0 to i-1 inclusive
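// e.g. batch sizes of 10, 5 and 7 documents give cutPoints {0, 10, 15, 22}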
long[] cutPoints = new long[indexes.length + 1];
cutPoints[0] = 0;
int numberOfTerms = -1;
int numberOfDocuments = -1;
long numberOfPostings = -1;
long numberOfOccurences =-1;
int maxCount =-1;
int indexIdx = 0;
IntBigList sizes = new IntBigArrayBigList();
@SuppressWarnings("unchecked")
BloomFilter<Void> bloomFilters[] = new BloomFilter[indexes.length];
for(MG4JIndex aSubIndex : batches) {
indexes[indexIdx] = aSubIndex.invertedIndex;
cutPoints[indexIdx + 1] = cutPoints[indexIdx] +
aSubIndex.invertedIndex.numberOfDocuments;
numberOfTerms += aSubIndex.invertedIndex.numberOfTerms;
numberOfDocuments += aSubIndex.invertedIndex.numberOfDocuments;
numberOfPostings += aSubIndex.invertedIndex.numberOfPostings;
numberOfOccurences += aSubIndex.invertedIndex.numberOfOccurrences;
if(maxCount < aSubIndex.invertedIndex.maxCount){
maxCount = aSubIndex.invertedIndex.maxCount;
}
bloomFilters[indexIdx] = aSubIndex.invertedTermFilter;
sizes.addAll(aSubIndex.invertedIndex.sizes);
indexIdx++;
}
return new DocumentalConcatenatedCluster(indexes,
new ContiguousDocumentalStrategy(cutPoints),
false, // flat = all component indexes have the same term list
bloomFilters, // Bloom Filters
numberOfDocuments == -1 ? -1 : numberOfDocuments + 1,
numberOfTerms == -1 ? -1 : numberOfTerms + 1,
numberOfPostings == -1 ? -1 : numberOfPostings + 1,
numberOfOccurences == -1 ? -1 : numberOfOccurences + 1,
maxCount,
null, // payload
true, // hasCounts
true, // hasPositions,
termProcessor,
null, // field
sizes, // sizes
null // properties
);
}
/**
* Opens the direct index files from all the batches and combines them into
* a {@link LexicalCluster}.
* @param batches the batches to be opened.
* @return
*/
protected final static Index openDirectIndexCluster(List<MG4JIndex> batches){
if(batches == null || batches.size() == 0) return null;
if(batches.size() == 1) return batches.get(0).directIndex;
// prepare the lexical cluster
Index[] indexes = new Index[batches.size()];
int[] cutPoints = new int[indexes.length + 1];
cutPoints[0] = 0;
String[] cutPointTerms = new String[indexes.length + 1];
cutPointTerms[0] = longToTerm(0);
int numberOfTerms = -1;
int numberOfDocuments = -1;
long numberOfPostings = -1;
long numberOfOccurences =-1;
int maxCount =-1;
int indexIdx = 0;
@SuppressWarnings("unchecked")
BloomFilter<Void> bloomFilters[] = new BloomFilter[indexes.length];
for(MG4JIndex aSubIndex : batches) {
indexes[indexIdx] = aSubIndex.directIndex;
// we build this based on the inverted index, as the cut-points for the
// lexical partitioning are based on document IDs
if(indexIdx < cutPoints.length - 1) {
cutPoints[indexIdx + 1] = cutPoints[indexIdx] +
(int)aSubIndex.invertedIndex.numberOfDocuments;
cutPointTerms[indexIdx + 1] = longToTerm(cutPoints[indexIdx + 1]);
}
numberOfTerms += aSubIndex.directIndex.numberOfTerms;
numberOfDocuments += aSubIndex.directIndex.numberOfDocuments;
numberOfPostings += aSubIndex.directIndex.numberOfPostings;
numberOfOccurences += aSubIndex.directIndex.numberOfOccurrences;
if(maxCount < aSubIndex.directIndex.maxCount){
maxCount = aSubIndex.directIndex.maxCount;
}
bloomFilters[indexIdx] = aSubIndex.directTermFilter;
indexIdx++;
}
cutPointTerms[cutPointTerms.length - 1] = null;
return new LexicalCluster(indexes,
new ContiguousLexicalStrategy(cutPoints, cutPointTerms),
bloomFilters, // Bloom Filters
numberOfDocuments == -1 ? -1 : numberOfDocuments + 1,
numberOfTerms == -1 ? -1 : numberOfTerms + 1,
numberOfPostings == -1 ? -1 : numberOfPostings + 1,
numberOfOccurences == -1 ? -1 : numberOfOccurences + 1,
maxCount,
null, // payload
true, // hasCounts
false, // hasPositions,
NullTermProcessor.getInstance(),
null, // field
null, // sizes
null // properties
);
}
/**
* Converts a long value into a String containing a zero-padded Hex
* representation of the input value. The lexicographic ordering of the
* generated strings is the same as the natural order of the corresponding
* long values.
*
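* <p>
* For example, {@code longToTerm(255)} returns {@code "00000000000000ff"}.
* </p>
*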
* @param value the value to convert.
* @return the string representation.
*/
public static final String longToTerm(long value) {
String valueStr = Long.toHexString(value);
return "0000000000000000".substring(valueStr.length()) + valueStr;
}
/**
* The file name (under the current directory for this atomic index) which
* stores the principal index.
*/
public static final String HEAD_FILE_NAME = "head";
/**
* The file extension used for the temporary directory where the updated head
* is being built.
*/
public static final String HEAD_NEW_EXT = ".new";
/**
* The file extension used for the temporary directory where the old head
* index is being stored while the newly updated one is being installed.
*/
public static final String HEAD_OLD_EXT = ".old";
/**
* The prefix used for file names (under the current directory for this
* atomic index) for updates to the head index.
*/
public static final String TAIL_FILE_NAME_PREFIX = "tail-";
public static final String DIRECT_TERMS_FILENAME = "direct.terms";
/**
* Files belonging to the direct index get this suffix added to their
* basename.
*/
public static final String DIRECT_INDEX_NAME_SUFFIX = "-dir";
/**
* The file name (under the current directory for this atomic index) for the
* directory containing the documents that have been queued for indexing, but
* not yet indexed.
*/
public static final String DOCUMENTS_QUEUE_FILE_NAME = "queued-documents";
/** The initial size of the term map. */
private static final int INITIAL_TERM_MAP_SIZE = 1024;
/**
* A marker value that gets queued to indicate a request to
* write the in-RAM data to a new index batch.
*/
private static final GATEDocument DUMP_BATCH = new GATEDocument(){};
/**
* A marker value that gets queued to indicate a request to combine all the
* on-disk batches into a new head.
*/
private static final GATEDocument COMPACT_INDEX = new GATEDocument(){};
private static Logger logger = LoggerFactory.getLogger(AtomicIndex.class);
protected static final PatternFilenameFilter TAILS_FILENAME_FILTER =
new PatternFilenameFilter("\\Q" + TAIL_FILE_NAME_PREFIX + "\\E\\d+");
/**
* The name of this atomic index.
*/
protected String name;
/**
* The directory where this atomic index stores its files.
*/
protected File indexDirectory;
/**
* The term processor used to process the feature values being indexed.
*/
protected TermProcessor termProcessor = null;
/**
* The size (number of terms) for the longest document indexed but not yet
* saved.
*/
protected int maxDocSizeInRAM = -1;
/**
* The number of occurrences represented in RAM and not yet written to disk.
*/
protected long occurrencesInRAM = 0;
/**
* The {@link MimirIndex} that this atomic index is a member of.
*/
protected MimirIndex parent;
/**
* A list containing the head and tails of this index.
*/
protected List<MG4JIndex> batches;
/**
* The cluster-view of all the MG4J indexes that are part of this index (i.e.
* the head and all the tails).
*/
protected Index invertedIndex;
/**
* The direct index for this atomic index. If
* <code>{@link #hasDirectIndex()}</code> is false, then this index will be
* <code>null</code>.
*/
protected Index directIndex;
/**
* A set of properties added to the ones obtained from the index writer when
* writing out batches.
*/
protected Properties additionalProperties;
/**
* A set of properties added to the ones obtained from the direct index writer
* when writing out batches.
*/
protected Properties additionalDirectProperties;
/**
* Is the direct indexing enabled? Direct indexes are used to find terms
* occurring in given documents. This is the reverse operation to the typical
* search, which finds documents containing a given set of terms.
*/
protected boolean hasDirectIndex;
/**
* This map associates direct index terms with their IDs. See the note at the
* top-level javadocs for this class for a discussion on direct and inverted
* term IDs.
*/
protected Object2LongMap<String> directTermIds;
/**
* The terms in the direct index, in the order they were first seen during
* indexing.
*/
protected ObjectBigList<String> directTerms;
/**
* The single thread used to index documents. All writes to the index files
* are done from this thread.
*/
protected Thread indexingThread;
/**
* Documents to be indexed are queued in this queue.
*/
protected BlockingQueue<GATEDocument> inputQueue;
/**
* Documents that have been indexed are passed on to this queue.
*/
protected BlockingQueue<GATEDocument> outputQueue;
/**
* The position of the current (or most-recently used) token in the current
* document.
*/
protected int tokenPosition;
/**
* A mutable string used to create instances of MutableString on the cheap.
*/
protected MutableString currentTerm;
/**
* The number of documents currently stored in RAM.
*/
protected int documentsInRAM;
/**
* An in-memory inverted index that gets dumped to files for each batch.
*/
protected Object2ReferenceOpenHashMap<MutableString, PostingsList> termMap;
/**
* The sizes (numbers of terms) for all the documents indexed in RAM.
*/
protected IntArrayList documentSizesInRAM;
/**
* If a request was made to compact the index (combine all sub-indexes
* into a new head) this value will be non-null. The operation will be
* performed on the indexing thread at the first opportunity. At that point
* this future will complete, and the value will be set back to null.
*/
protected RunnableFuture<Void> compactIndexTask;
/**
* If a request was made to write the in-RAM index data to disk this value
* will be non-null. The operation will be performed on the indexing
* thread at the first opportunity. At that point the Future will complete,
* and the value will be set back to null.
*/
protected RunnableFuture<Long> batchWriteTask;
/**
* Creates a new AtomicIndex
*
* @param parent the {@link MimirIndex} containing this atomic index.
* @param name the name of the sub-index, e.g. <em>token-i</em> or
* <em>mentions-j</em>
* @param hasDirectIndex should a direct index be used?
* @param termProcessor the term processor used to process the feature values
* being indexed.
* @param inputQueue the input queue for documents to be indexed.
* @param outputQueue the output queue for documents that have been indexed.
* @throws IndexException
* @throws IOException
*/
protected AtomicIndex(MimirIndex parent, String name,
boolean hasDirectIndex, TermProcessor termProcessor,
BlockingQueue<GATEDocument> inputQueue,
BlockingQueue<GATEDocument> outputQueue) throws IOException, IndexException {
this.parent = parent;
this.name = name;
this.indexDirectory = new File(parent.getIndexDirectory(), name);
this.hasDirectIndex = hasDirectIndex;
this.termProcessor = termProcessor;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
this.currentTerm = new MutableString();
this.additionalProperties = new Properties();
// save the term processor
additionalProperties.setProperty(Index.PropertyKeys.TERMPROCESSOR,
ObjectParser.toSpec(termProcessor));
if(hasDirectIndex) {
additionalDirectProperties = new Properties();
additionalDirectProperties.setProperty(Index.PropertyKeys.TERMPROCESSOR,
ObjectParser.toSpec(NullTermProcessor.getInstance()));
}
initIndex();
}
/**
* Opens the index and prepares it for indexing and searching.
* @throws IndexException
* @throws IOException
*/
protected void initIndex() throws IOException, IndexException {
// open the index
batches = new ArrayList<AtomicIndex.MG4JIndex>();
if(indexDirectory.exists()) {
// opening an existing index
List<String> batchNames = new ArrayList<String>();
File headDir = new File(indexDirectory, HEAD_FILE_NAME);
if(headDir.exists()) {
batchNames.add(HEAD_FILE_NAME);
}
Map<Integer, String> tails = new TreeMap<Integer, String>();
for(String aTail : indexDirectory.list(TAILS_FILENAME_FILTER)) {
tails.put(
Integer.parseInt(aTail.substring(TAIL_FILE_NAME_PREFIX.length())),
aTail);
}
// add the tails in order
batchNames.addAll(tails.values());
// modify internal state
synchronized(this) {
// load all batches, in order
for(String batchName : batchNames) {
batches.add(openSubIndex(batchName));
}
}
} else {
// new index creation
indexDirectory.mkdirs();
}
synchronized(this) {
invertedIndex = openInvertedIndexCluster(batches, termProcessor);
}
// open direct index
if(hasDirectIndex) {
directTerms = new ObjectBigArrayBigList<String>();
directTermIds = new Object2LongAVLTreeMap<String>();
directTermIds.defaultReturnValue(-1);
File directTermsFile = new File(indexDirectory, DIRECT_TERMS_FILENAME);
if(directTermsFile.exists()) {
FileLinesCollection fileLines = new FileLinesCollection(
directTermsFile.getAbsolutePath(), "UTF-8");
Iterator<MutableString> termsIter = fileLines.iterator();
long termID = 0;
while(termsIter.hasNext()) {
String term = termsIter.next().toString();
directTerms.add(term);
directTermIds.put(term, termID++);
}
}
synchronized(this) {
directIndex = openDirectIndexCluster(batches);
}
}
}
/**
* Gets the name of this atomic index. This is used as the file name for the
* directory storing the index files.
* @return
*/
public String getName() {
return name;
}
/**
* Is a direct index configured for this atomic index.
* @return
*/
public boolean hasDirectIndex(){
return hasDirectIndex;
}
/**
* Starts a new MG4J batch. First time around this will be the head,
* subsequent calls will start a new tail.
*/
protected void newBatch() {
occurrencesInRAM = 0;
maxDocSizeInRAM = -1;
documentsInRAM = 0;
if(termMap == null) {
termMap = new Object2ReferenceOpenHashMap<MutableString,
PostingsList>(INITIAL_TERM_MAP_SIZE, Hash.FAST_LOAD_FACTOR );
} else {
termMap.clear();
termMap.trim( INITIAL_TERM_MAP_SIZE );
}
if(documentSizesInRAM == null) {
documentSizesInRAM = new IntArrayList();
} else {
documentSizesInRAM.clear();
}
}
/**
* Writes all the data currently stored in RAM to a new index batch. The first
* batch is the head index, all other batches are tail indexes.
* @throws IOException
* @throws IndexException
* @return the number of occurrences written to disk
*/
protected long writeCurrentBatch() throws IOException, IndexException {
if(documentsInRAM == 0) return 0;
// find the name for the new tail
int tailNo = -1;
File headDir = new File(indexDirectory, HEAD_FILE_NAME);
if(headDir.exists()) {
// we have a head, calculate the tail number for this new tail
String[] existingTails = indexDirectory.list(TAILS_FILENAME_FILTER);
for(String aTail : existingTails) {
int aTailNo = Integer.parseInt(aTail.substring(TAIL_FILE_NAME_PREFIX.length()));
if(aTailNo > tailNo) tailNo = aTailNo;
}
tailNo++;
}
// Open an index writer for the new tail
String newTailName = tailNo == -1 ? HEAD_FILE_NAME :
(TAIL_FILE_NAME_PREFIX + Integer.toString(tailNo));
File newTailDir = new File(indexDirectory, newTailName);
newTailDir.mkdir();
String mg4jBasename = new File(newTailDir, name).getAbsolutePath();
QuasiSuccinctIndexWriter indexWriter = new QuasiSuccinctIndexWriter(
IOFactory.FILESYSTEM_FACTORY,
mg4jBasename,
documentsInRAM,
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
ByteOrder.nativeOrder());
// write the data from RAM
int numTermsInRAM = termMap.size();
logger.info( "Generating index for batch " + newTailName +
"; documents: " + documentsInRAM + "; terms:" + numTermsInRAM +
"; occurrences: " + occurrencesInRAM +
" / " + parent.getOccurrencesInRam());
// Copy all the in-RAM terms into termArray.
final MutableString[] termArray = termMap.keySet().toArray(new MutableString[ numTermsInRAM ]);
// We sort the terms appearing in the batch and write them on disk.
Arrays.quickSort(0, termArray.length,
new IntComparator() {
@Override
public int compare(Integer one, Integer other) {
return compare(one.intValue(), other.intValue());
}
@Override
public int compare(int one, int other) {
return termArray[one].compareTo(termArray[other]);
}
},
new Swapper() {
@Override
public void swap(int one, int other) {
MutableString temp = termArray[one];
termArray[one] = termArray[other];
termArray[other] = temp;
}
});
// write the terms, termmap, and bloom filter files
// make sure we can't create a Bloom filter of expected size 0
BloomFilter<Void> termFilter = BloomFilter.create(Math.max(numTermsInRAM, 1));
PrintWriter pw = new PrintWriter(
new OutputStreamWriter(new FastBufferedOutputStream(
new FileOutputStream(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
64 * 1024),
"UTF-8" ));
for (MutableString t : termArray ) {
t.println( pw );
termFilter.add(t);
}
pw.close();
generateTermMap(new File(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
// write the bloom filter
BinIO.storeObject(termFilter,
new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION));
// write the sizes file
File sizesFile = new File(mg4jBasename + DiskBasedIndex.SIZES_EXTENSION);
OutputBitStream sizesStream = new OutputBitStream(sizesFile);
for(int docSize : documentSizesInRAM.elements()) {
sizesStream.writeGamma(docSize);
}
sizesStream.close();
// write the actual index
int maxCount = 0;
for ( int i = 0; i < numTermsInRAM; i++ ) {
PostingsList postingsList = termMap.get( termArray[ i ] );
if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
postingsList.write(indexWriter);
}
indexWriter.close();
// write the index properties
try {
Properties properties = indexWriter.properties();
additionalProperties.setProperty( Index.PropertyKeys.SIZE,
indexWriter.writtenBits());
// -1 means unknown
additionalProperties.setProperty( Index.PropertyKeys.MAXDOCSIZE,
maxDocSizeInRAM);
additionalProperties.setProperty( Index.PropertyKeys.MAXCOUNT, maxCount );
additionalProperties.setProperty( Index.PropertyKeys.OCCURRENCES,
occurrencesInRAM );
properties.addAll(additionalProperties);
Scan.saveProperties( IOFactory.FILESYSTEM_FACTORY, properties,
mg4jBasename + DiskBasedIndex.PROPERTIES_EXTENSION );
// write stats
PrintStream statsPs = new PrintStream(new File(mg4jBasename +
DiskBasedIndex.STATS_EXTENSION));
indexWriter.printStats(statsPs);
statsPs.close();
} catch(ConfigurationException e) {
// this should never happen
throw new IndexException("Error while saving tail properties", e);
}
if(hasDirectIndex) {
writeDirectIndex(newTailDir);
}
// update parent
long res = occurrencesInRAM;
// clear out internal state, in preparation for the next tail
newBatch();
// merge new tail into index cluster
try {
// modify internal state
synchronized(this) {
batches.add(openSubIndex(newTailName));
invertedIndex = openInvertedIndexCluster(batches, termProcessor);
if(hasDirectIndex) {
directIndex = openDirectIndexCluster(batches);
}
}
} catch(Exception e) {
throw new IndexException("Could not open the index just written to " +
mg4jBasename , e);
}
return res;
}
/**
* Writes the in-RAM data to a new direct index batch.
* @param batchDir
*/
protected void writeDirectIndex(File batchDir)
throws IOException, IndexException {
// The index we are writing is a direct index, so we give it new terms
// which are actually document IDs, and they have posting lists containing
// document IDs, which are actually termIDs.
// The document pointers in RAM are zero-based, so we need to add all the
// documents on disk to this.
long docsOnDisk = 0;
for(MG4JIndex index : batches) {
docsOnDisk += index.invertedIndex.numberOfDocuments;
}
//1. invert index data in RAM
Object2ReferenceOpenHashMap<MutableString, PostingsList> docMap =
new Object2ReferenceOpenHashMap<MutableString,
PostingsList>(INITIAL_TERM_MAP_SIZE, Hash.FAST_LOAD_FACTOR );
MutableString docIdStr = new MutableString();
// make sure all the terms about to be indexed have direct ID
for(MutableString termMS : termMap.keySet()) {
String termString = termMS.toString();
long directTermId = directTermIds.getLong(termString);
if(directTermId == directTermIds.defaultReturnValue()) {
// term not seen before
directTerms.add(termString);
directTermId = directTerms.size64() -1;
directTermIds.put(termString, directTermId);
}
}
// we now read the posting lists for all the terms, in ascending term order
MutableString termMS = new MutableString();
for(long directTermId = 0; directTermId < directTerms.size64(); directTermId++){
String termString = directTerms.get(directTermId);
termMS.replace(termString);
PostingsList termPostings = termMap.get(termMS);
if(termPostings != null) {
long docPointer = docsOnDisk + termPostings.firstDocumentPointer;
for(int i = 0; i < termPostings.documentPointersDifferential.size(); i++) {
docPointer += termPostings.documentPointersDifferential.get(i);
int count = termPostings.counts.getInt(i);
// convert data to the correct type
docIdStr.replace(longToTerm(docPointer));
// at this point we have term, document, counts so we can write the data
// to the in-RAM direct index
PostingsList docPostings = docMap.get(docIdStr);
if(docPostings == null) {
docPostings = new PostingsList(false);
docMap.put(docIdStr.copy(), docPostings);
}
docPostings.newDocumentPointer(directTermId);
docPostings.setCount(count);
docPostings.flush();
}
}
}
// 2. write the data from RAM
String mg4jBasename = new File(batchDir, name +
DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
// copy the default compression flags, and remove positions
Map<Component, Coding> flags = new HashMap<Component, Coding>(
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
flags.remove(Component.POSITIONS);
QuasiSuccinctIndexWriter directIndexWriter =
new QuasiSuccinctIndexWriter(
IOFactory.FILESYSTEM_FACTORY,
mg4jBasename,
directTerms.size64(),
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
flags,
ByteOrder.nativeOrder());
// sort all the docIds
final MutableString[] docArray = docMap.keySet().toArray(new MutableString[ docMap.size() ]);
// We sort the terms appearing in the batch and write them on disk.
Arrays.quickSort(0, docArray.length,
new IntComparator() {
@Override
public int compare(Integer one, Integer other) {
return compare(one.intValue(), other.intValue());
}
@Override
public int compare(int one, int other) {
return docArray[one].compareTo(docArray[other]);
}
},
new Swapper() {
@Override
public void swap(int one, int other) {
MutableString temp = docArray[one];
docArray[one] = docArray[other];
docArray[other] = temp;
}
});
BloomFilter<Void> docBloomFilter = BloomFilter.create(docArray.length);
PrintWriter pw = new PrintWriter(
new OutputStreamWriter(new FastBufferedOutputStream(
new FileOutputStream(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
64 * 1024),
"UTF-8" ));
for (MutableString t : docArray ) {
t.println( pw );
docBloomFilter.add(t);
}
pw.close();
generateTermMap(new File(mg4jBasename + DiskBasedIndex.TERMS_EXTENSION),
new File(mg4jBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
// write the bloom filter
BinIO.storeObject(docBloomFilter,
new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION));
// write the sizes file
// this is a list of document sizes (directTerms in our case)
File sizesFile = new File(mg4jBasename + DiskBasedIndex.SIZES_EXTENSION);
OutputBitStream sizesStream = new OutputBitStream(sizesFile);
int maxTermSize = -1; // -1 means unknown
//for(MutableString term : termArray) {
for(long directTermId = 0; directTermId < directTerms.size64(); directTermId++){
String termString = directTerms.get(directTermId);
termMS.replace(termString);
PostingsList termPostings = termMap.get(termMS);
int termSize = termPostings != null ?
(int)termPostings.frequency : 0;
sizesStream.writeGamma(termSize);
if(termSize > maxTermSize) maxTermSize = termSize;
}
sizesStream.close();
// write the actual index
int maxCount = 0;
long occurrences = 0;
for ( int i = 0; i < docArray.length; i++ ) {
PostingsList postingsList = docMap.get( docArray[ i ] );
if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
postingsList.write(directIndexWriter);
occurrences += postingsList.occurrences;
}
directIndexWriter.close();
// write the index properties
try {
Properties properties = directIndexWriter.properties();
additionalDirectProperties.setProperty( Index.PropertyKeys.SIZE,
directIndexWriter.writtenBits());
// -1 means unknown
additionalDirectProperties.setProperty( Index.PropertyKeys.MAXDOCSIZE,
maxTermSize);
additionalDirectProperties.setProperty( Index.PropertyKeys.MAXCOUNT, maxCount );
additionalDirectProperties.setProperty( Index.PropertyKeys.OCCURRENCES,
occurrences);
properties.addAll(additionalDirectProperties);
Scan.saveProperties( IOFactory.FILESYSTEM_FACTORY, properties,
mg4jBasename + DiskBasedIndex.PROPERTIES_EXTENSION );
// write stats
PrintStream statsPs = new PrintStream(new File(mg4jBasename +
DiskBasedIndex.STATS_EXTENSION));
directIndexWriter.printStats(statsPs);
statsPs.close();
} catch(ConfigurationException e) {
// this should never happen
throw new IndexException("Error while saving tail properties", e);
}
//update the index-wide direct terms file
File newDirectTermsFile = new File(indexDirectory, DIRECT_TERMS_FILENAME + HEAD_NEW_EXT);
pw = new PrintWriter(new OutputStreamWriter(new FastBufferedOutputStream(
new FileOutputStream(newDirectTermsFile), 64 * 1024), "UTF-8" ));
for (String t : directTerms ) {
pw.println(t);
}
pw.close();
File directTermsFile = new File(indexDirectory, DIRECT_TERMS_FILENAME);
File oldDirectTermsFile = new File(indexDirectory, DIRECT_TERMS_FILENAME + HEAD_OLD_EXT);
if(!directTermsFile.exists() || directTermsFile.renameTo(oldDirectTermsFile)) {
if(newDirectTermsFile.renameTo(directTermsFile)) {
oldDirectTermsFile.delete();
} else {
throw new IndexException("Unable to save direct terms file");
}
}
}
/**
* Combines all the currently existing batches, generating a new head index.
* @throws IndexException
* @throws IOException
* @throws ConfigurationException
*/
protected void compactIndex() throws IndexException, IOException, ConfigurationException {
File headDirNew = new File(indexDirectory, HEAD_FILE_NAME + HEAD_NEW_EXT);
// make a local copy of the sub-indexes
List<MG4JIndex> indexesToMerge =
new ArrayList<AtomicIndex.MG4JIndex>(batches);
if(!headDirNew.mkdir()) {
throw new IndexException("Could not create new head directory at " +
headDirNew.getAbsolutePath() + "!");
}
Map<Component,Coding> codingFlags =
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX;
String outputBaseName = new File(headDirNew, name).getAbsolutePath();
String[] inputBaseNames = new String[indexesToMerge.size()];
for(int i = 0; i < inputBaseNames.length; i++) {
inputBaseNames[i] = new File(indexesToMerge.get(i).indexDir, name)
.getAbsolutePath();
}
try {
new Concatenate(
IOFactory.FILESYSTEM_FACTORY,
outputBaseName,
inputBaseNames,
false, // metadataOnly
Combine.DEFAULT_BUFFER_SIZE,
codingFlags,
IndexType.QUASI_SUCCINCT,
true, // skips
// BitStreamIndex.DEFAULT_QUANTUM,
// replaced with optimised automatic calculation
-5,
BitStreamIndex.DEFAULT_HEIGHT,
SkipBitStreamIndexWriter.DEFAULT_TEMP_BUFFER_SIZE,
ProgressLogger.DEFAULT_LOG_INTERVAL).run();
// generate term map
generateTermMap(new File(outputBaseName + DiskBasedIndex.TERMS_EXTENSION),
new File(outputBaseName + DiskBasedIndex.TERMMAP_EXTENSION),
new File(outputBaseName + DocumentalCluster.BLOOM_EXTENSION));
} catch(Exception e) {
throw new IndexException("Exception while combining sub-indexes", e);
}
if(hasDirectIndex()) {
combineDirectIndexes(indexesToMerge, new File(headDirNew, name +
DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath());
}
// update the internal state
synchronized(this) {
// remove the indexes that were merged
batches.removeAll(indexesToMerge);
// insert the new head at the front of the list
File headDir = new File(indexDirectory, HEAD_FILE_NAME);
File headDirOld = new File(indexDirectory, HEAD_FILE_NAME + HEAD_OLD_EXT);
if(headDir.exists() && headDir.renameTo(headDirOld)){
if(headDirNew.renameTo(headDir)) {
batches.add(0, openSubIndex(HEAD_FILE_NAME));
invertedIndex = openInvertedIndexCluster(batches, termProcessor);
if(hasDirectIndex) {
directIndex =openDirectIndexCluster(batches);
}
// clean-up: delete old head, used-up tails
if(!gate.util.Files.rmdir(headDirOld)) {
throw new IndexException(
"Could not fully delete old sub-index at: " + headDirOld);
}
for(MG4JIndex aSubIndex : indexesToMerge) {
if(!aSubIndex.indexDir.equals(headDir)) {
if(!gate.util.Files.rmdir(aSubIndex.indexDir)){
throw new IndexException(
"Could not fully delete old sub-index at: " +
aSubIndex.indexDir);
}
}
}
} else {
throw new IndexException("Cold not rename new head at " +
headDirNew.getAbsolutePath() + " to " + headDir);
}
} else {
throw new IndexException("Cold not rename head at " +
headDir.getAbsolutePath() + " to " + headDirOld);
}
}
}
/**
* Given a set of direct indexes (MG4J indexes, with counts, but no positions,
* that form a lexical cluster) this method produces one single output index
* containing the data from all the input indexes.
* @param inputIndexes
* @param outputBasename
* @throws IOException
* @throws ConfigurationException
*/
protected static void combineDirectIndexes (List<MG4JIndex> inputIndexes,
String outputBasename) throws IOException, ConfigurationException {
long noOfDocuments = 0;
long noOfTerms = 0;
for(MG4JIndex index : inputIndexes) {
noOfDocuments += index.directIndex.numberOfDocuments;
noOfTerms += index.directIndex.numberOfTerms;
}
// open the output writer
// copy the default compression flags, and remove positions
Map<Component, Coding> flags = new HashMap<Component, Coding>(
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
flags.remove(Component.POSITIONS);
QuasiSuccinctIndexWriter outputIndexWriter =
new QuasiSuccinctIndexWriter(
IOFactory.FILESYSTEM_FACTORY,
outputBasename,
noOfDocuments,
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
flags,
ByteOrder.nativeOrder());
BloomFilter<Void> bloomFilter = BloomFilter.create(noOfTerms);
PrintWriter termsPw = new PrintWriter(
new OutputStreamWriter(new FastBufferedOutputStream(
new FileOutputStream(outputBasename + DiskBasedIndex.TERMS_EXTENSION),
64 * 1024),
"UTF-8" ));
// write the index
long occurrences = 0;
int maxCount = 0;
PostingsList postingsList = new PostingsList(false);
for(MG4JIndex inputIndex : inputIndexes) {
IndexReader inputReader = inputIndex.directIndex.getReader();
File directTermsFile = new File(inputIndex.indexDir,
inputIndex.indexName + DIRECT_INDEX_NAME_SUFFIX +
DiskBasedIndex.TERMS_EXTENSION);
FileLinesCollection.FileLinesIterator termsIter =
new FileLinesCollection(directTermsFile.getAbsolutePath(),
"UTF-8").iterator();
MutableString termMS = null;
IndexIterator inputIterator = inputReader.nextIterator();
while(inputIterator != null && termsIter.hasNext()) {
termMS = termsIter.next();
bloomFilter.add(termMS);
termMS.println(termsPw);
long docPointer = inputIterator.nextDocument();
while(docPointer != IndexIterator.END_OF_LIST) {
postingsList.newDocumentPointer(docPointer);
postingsList.setCount(inputIterator.count());
docPointer = inputIterator.nextDocument();
}
postingsList.flush();
occurrences += postingsList.occurrences;
if ( maxCount < postingsList.maxCount ) maxCount = postingsList.maxCount;
postingsList.write(outputIndexWriter);
postingsList.clear();
inputIterator = inputReader.nextIterator();
}
inputReader.close();
}
outputIndexWriter.close();
termsPw.close();
generateTermMap(new File(outputBasename + DiskBasedIndex.TERMS_EXTENSION),
new File(outputBasename + DiskBasedIndex.TERMMAP_EXTENSION), null);
// write the bloom filter
BinIO.storeObject(bloomFilter,
new File(outputBasename + DocumentalCluster.BLOOM_EXTENSION));
// direct indexes don't store positions, so sizes are not needed
// write the index properties
Properties properties = outputIndexWriter.properties();
properties.setProperty(Index.PropertyKeys.TERMPROCESSOR,
ObjectParser.toSpec(NullTermProcessor.getInstance()));
properties.setProperty( Index.PropertyKeys.SIZE,
outputIndexWriter.writtenBits());
// -1 means unknown
properties.setProperty( Index.PropertyKeys.MAXDOCSIZE, -1);
properties.setProperty( Index.PropertyKeys.MAXCOUNT, maxCount );
properties.setProperty( Index.PropertyKeys.OCCURRENCES, occurrences);
Scan.saveProperties( IOFactory.FILESYSTEM_FACTORY, properties,
outputBasename + DiskBasedIndex.PROPERTIES_EXTENSION );
// write stats
PrintStream statsPs = new PrintStream(new File(outputBasename +
DiskBasedIndex.STATS_EXTENSION));
outputIndexWriter.printStats(statsPs);
statsPs.close();
}
/**
* Instructs this index to dump to disk all the in-RAM index data at the first
* opportunity.
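* <p>
* A usage sketch (the variable names are hypothetical; {@code get()} may throw
* the usual checked exceptions of {@link Future}):
* </p>
* <pre>
* Future&lt;Long&gt; dump = atomicIndex.requestSyncToDisk();
* long occurrencesWritten = dump.get(); // blocks until the batch is on disk
* </pre>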
* @return a {@link Future} value that, upon completion, will return the
* number of occurrences written to disk.
* @throws InterruptedException if this thread is interrupted while trying to
* queue the dump request.
*/
public Future<Long> requestSyncToDisk() throws InterruptedException {
if(batchWriteTask == null) {
batchWriteTask = new FutureTask<Long>(new Callable<Long>() {
@Override
public Long call() throws Exception {
return writeCurrentBatch();
}
});
inputQueue.put(DUMP_BATCH);
}
return batchWriteTask;
}
/**
* Requests this atomic index to compact its on-disk batches into a single
* batch.
*
* @return a {@link Future} which can be used to find out when the compaction
* operation has completed.
* @throws InterruptedException if this thread is interrupted while trying to
* queue the compaction request.
*/
public Future<Void> requestCompactIndex() throws InterruptedException {
if(compactIndexTask == null) {
compactIndexTask = new FutureTask<Void>(new Callable<Void>(){
@Override
public Void call() throws Exception {
compactIndex();
return null;
}
});
inputQueue.put(COMPACT_INDEX);
}
return compactIndexTask;
}
/**
* Opens one sub-index, specified as a directory inside this Atomic Index's
* index directory.
* @param subIndexDirname
* @return
* @throws IOException
* @throws IndexException
*/
protected MG4JIndex openSubIndex(String subIndexDirname) throws IOException, IndexException {
Index invertedIndex = null;
File subIndexDir = new File(indexDirectory, subIndexDirname);
String mg4jBasename = new File(subIndexDir, name).getAbsolutePath();
try {
try{
invertedIndex = Index.getInstance(
mg4jBasename + "?" + UriKeys.MAPPED.name().toLowerCase() + "=1;",
true, true);
} catch(IOException e) {
// memory mapping failed
logger.info("Memory mapping failed for index " + mg4jBasename
+ ". Loading as file index instead");
// now try to open it as a plain on-disk index
invertedIndex = Index.getInstance(mg4jBasename, true, true);
}
} catch(Exception e) {
throw new IndexException("Could not open the sub-index at" + mg4jBasename , e);
}
//read the Bloom filter
File bloomFile = new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION);
BloomFilter<Void> invertedTermFilter = null;
try {
if(bloomFile.exists()) {
invertedTermFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
}
} catch(ClassNotFoundException e) {
// this should never happen. If it does, it's not fatal
logger.warn("Exception wile loading stre Bloom Filter", e);
}
Index directIndex = null;
BloomFilter<Void> directTermFilter = null;
if(hasDirectIndex) {
// open direct index
mg4jBasename = new File(subIndexDir, name +
DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
try {
try{
directIndex = Index.getInstance(
mg4jBasename + "?" + UriKeys.MAPPED.name().toLowerCase() + "=1;",
true, false);
} catch(IOException e) {
// memory mapping failed
logger.info("Memory mapping failed for index " + mg4jBasename
+ ". Loading as file index instead");
// now try to open it as a plain on-disk index
directIndex = Index.getInstance(mg4jBasename, true, false);
}
} catch(Exception e) {
throw new IndexException("Could not open the sub-index at" + mg4jBasename , e);
}
//read the Bloom filter
bloomFile = new File(mg4jBasename + DocumentalCluster.BLOOM_EXTENSION);
try {
if(bloomFile.exists()) {
directTermFilter = (BloomFilter<Void>) BinIO.loadObject(bloomFile);
}
} catch(ClassNotFoundException e) {
// this should never happen. If it does, it's not fatal
logger.warn("Exception wile loading stre Bloom Filter", e);
}
}
MG4JIndex newIndexData = new MG4JIndex(subIndexDir, name,
invertedIndex, invertedTermFilter,
directIndex, directTermFilter);
return newIndexData;
}
/**
* Runnable implementation: the logic of this run method is simply indexing
* documents queued to the input queue. To stop it, send a
* {@link GATEDocument#END_OF_QUEUE} value to the input queue.
*/
public void run() {
indexingThread = Thread.currentThread();
GATEDocument aDocument;
try{
// start in-RAM indexing
newBatch();
if(inputQueue != null) {
do{
aDocument = inputQueue.take();
if(aDocument != GATEDocument.END_OF_QUEUE) {
if(aDocument == DUMP_BATCH) {
//dump batch was requested
if(batchWriteTask != null){
batchWriteTask.run();
}
batchWriteTask = null;
} else if(aDocument == COMPACT_INDEX) {
// compress index was requested
if(compactIndexTask != null) {
compactIndexTask.run();
}
compactIndexTask = null;
} else {
try {
long occurencesBefore = occurrencesInRAM;
processDocument(aDocument);
aDocument.addOccurrences(occurrencesInRAM - occurencesBefore);
} catch(Throwable e) {
logger.error("Problem while indexing document!", e);
}
}
} else {
// close down
writeCurrentBatch();
flush();
}
if(aDocument != DUMP_BATCH && aDocument != COMPACT_INDEX) {
outputQueue.put(aDocument);
}
} while(aDocument != GATEDocument.END_OF_QUEUE);
}
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
} catch(Exception e) {
logger.error("Exception during indexing!", e);
throw new GateRuntimeException("Exception during indexing!", e);
} finally {
indexingThread = null;
}
}
/**
* Closes all file-based resources.
* @throws IOException
*/
abstract protected void flush() throws IOException;
/**
* Notifies this index to stop its indexing operations, and waits for all data
* to be written.
* @throws InterruptedException if the waiting thread is interrupted before
* the indexing thread has finished writing all the data.
*/
public void close() throws InterruptedException {
inputQueue.put(GATEDocument.END_OF_QUEUE);
if(indexingThread != null) {
indexingThread.join();
}
}
/**
* Hook for subclasses, called before processing the annotations
* for this document. The default implementation is a no-op.
*/
protected void documentStarting(GATEDocument gateDocument) throws IndexException {
}
/**
* Hook for subclasses, called after annotations for this document
* have been processed. The default implementation is a no-op.
*/
protected void documentEnding(GATEDocument gateDocument) throws IndexException {
}
/**
* Get the annotations that are to be processed for a document,
* in increasing order of offset.
*/
protected abstract Annotation[] getAnnotsToProcess(
GATEDocument gateDocument) throws IndexException;
/**
* Calculate the starting position for the given annotation, storing
* it in {@link #tokenPosition}. The starting position is the
* index of the token within the document where the annotation starts,
* and <em>must</em> be &gt;= the previous value of tokenPosition.
* @param ann
* @param gateDocument
*/
protected abstract void calculateStartPositionForAnnotation(Annotation ann,
GATEDocument gateDocument) throws IndexException;
/**
* Determine the string (or strings, if there are alternatives) that should
* be stored in the index for the given annotation.
*
* If a single string value should be returned, it is more efficient to store
* the value in {@link #currentTerm}, in which case <code>null</code> should
* be returned instead.
*
* If the current term should not be indexed (e.g. it's a stop word), then
* the implementation should return an empty String array.
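*
* <p>
* For example (a sketch of one possible implementation, not prescribed by this
* class): an implementation indexing a single feature value could do
* {@code currentTerm.replace(value); return null;}, while one skipping the
* annotation entirely could {@code return new String[0];}.
* </p>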
*
* @param ann
* @param gateDocument
*/
protected abstract String[] calculateTermStringForAnnotation(Annotation ann,
GATEDocument gateDocument) throws IndexException;
/**
* Adds the supplied document to the in-RAM index.
* @param gateDocument the document to index
* @throws IndexException
*/
protected void processDocument(GATEDocument gateDocument) throws IndexException{
//zero document related counters
tokenPosition = 0;
documentStarting(gateDocument);
//get the annotations to be processed
Annotation[] annotsToProcess = getAnnotsToProcess(gateDocument);
logger.debug("Starting document "
+ gateDocument.getDocument().getName() + ". "
+ annotsToProcess.length + " annotations to process");
try {
//process the annotations one by one.
for(Annotation ann : annotsToProcess){
processAnnotation(ann, gateDocument);
}
// the current document is finished
int docLength = tokenPosition + 1;
if(docLength > maxDocSizeInRAM) maxDocSizeInRAM = docLength;
documentSizesInRAM.add(docLength);
} finally {
documentEnding(gateDocument);
documentsInRAM++;
}
}
/**
* Indexes one annotation (either a Token or a semantic annotation).
* @param ann the annotation to be indexed
* @param gateDocument the GATEDocument containing the annotation
* @throws IndexException
* @throws IOException
*/
protected void processAnnotation(Annotation ann,
GATEDocument gateDocument) throws IndexException {
// calculate the position and string for this annotation
calculateStartPositionForAnnotation(ann, gateDocument);
String[] terms = calculateTermStringForAnnotation(ann, gateDocument);
if(terms == null){
//the value was already stored in #currentTerm by the implementation.
indexCurrentTerm();
}else if(terms.length == 0){
//we received an empty array -> we should NOT index the current term
}else{
//we have received multiple values from the implementation
for(String aTerm : terms){
currentTerm.replace(aTerm == null ? "" : aTerm);
indexCurrentTerm();
}
}
}
/**
* Adds the value in {@link #currentTerm} to the index.
* @throws IOException
*/
protected void indexCurrentTerm() {
//check if we have seen this mention before
PostingsList termPostings = termMap.get(currentTerm);
if(termPostings == null){
//new term -> create a new postings list.
termMap.put( currentTerm.copy(), termPostings = new PostingsList(true));
}
//add the current posting to the current postings list
// In a documental cluster, each sub-index is zero-based. This is why we use
// the local document pointer here.
termPostings.newDocumentPointer(documentsInRAM);
//this is needed so that we don't increment the number of occurrences
//for duplicate values.
if(termPostings.checkPosition(tokenPosition)){
termPostings.addPosition(tokenPosition);
occurrencesInRAM++;
} else {
logger.debug("Duplicate position");
}
}
/**
* Gets the top level directory for this atomic index. This will be a
* directory contained in the top level directory of the {@link MimirIndex}
* which includes this atomic index.
* @return
*/
public File getIndexDirectory() {
return indexDirectory;
}
/**
* Gets the top level {@link MimirIndex} to which this atomic index belongs.
* @return
*/
public MimirIndex getParent() {
return parent;
}
/**
* Gets the input queue used by this atomic index. This queue is used to
* submit documents for indexing.
* @return
*/
public BlockingQueue<GATEDocument> getInputQueue() {
return inputQueue;
}
/**
* Gets the output queue used by this atomic index. This is used to
* "return" documents that have finished indexing. Notably, values
* in this queue will have their occurrences value (see
* {@link GATEDocument#getOccurrences()}) increased by the number of
* occurrences generated by indexing the document in this atomic index.
*
* @return
*/
public BlockingQueue<GATEDocument> getOutputQueue() {
return outputQueue;
}
/**
* Gets the inverted index (an {@link Index} value) that can be used to
* search this atomic index. This will normally be a
* {@link DocumentalCluster} view over all the batches contained.
* @return
*/
public Index getIndex() {
return invertedIndex;
}
/**
* Gets the direct index for this atomic index. The returned value is
* <code>non-null</code> only if the atomic index was configured to have a
* direct index upon its construction (see
* {@link #AtomicIndex(MimirIndex, String, boolean, TermProcessor, BlockingQueue, BlockingQueue)}).
* You can check if a direct index has been configured by calling
* {@link #hasDirectIndex()}.
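* <p>
* A minimal query sketch (the document ID {@code 42} is hypothetical; the
* calls below may throw {@link IOException}):
* </p>
* <pre>
* IndexIterator it = atomicIndex.getDirectIndex().documents(
*     AtomicIndex.longToTerm(42));
* long termId = it.nextDocument();
* while(termId != IndexIterator.END_OF_LIST) {
*   CharSequence term = atomicIndex.getDirectTerm(termId);
*   // ... use term ...
*   termId = it.nextDocument();
* }
* </pre>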
* @return an Index in which terms and documents are reversed. When querying
* the returned index, the "terms" provided should be String
* representations of document IDs (as produced by {@link #longToTerm(long)}).
* The search results is a set of "document IDs", which are actually
* term IDs. The actual term string corresponding to the returned term IDs can
* be obtained by calling {@link #getDirectTerm(long)}.
*/
public Index getDirectIndex() {
return directIndex;
}
/**
* Gets the term string for a given direct term ID. The term ID must have been
* obtained from the direct index of this index.
* @param termId the ID for the term being sought.
* @return the string for the given term.
*/
public CharSequence getDirectTerm(long termId) {
return directTerms.get(termId);
}
/**
* Gets the list of direct terms for this index. The terms are sorted by the
* order in which they were first seen, and <strong>not</strong> lexicographically.
* @return
*/
public ObjectBigList<? extends CharSequence> getDirectTerms() {
return directTerms;
}
/**
* Gets the occurrence count in the whole index for a given direct term,
* specified by a direct term ID (which must have been obtained from the
* direct index of this index).
*
* @param directTermId
* @return
* @throws IOException
*/
public long getDirectTermOccurenceCount(long directTermId) throws IOException {
String termStr = directTerms.get(directTermId);
// we need to sum up all the counts for this term in the inverted index
long count = 0;
IndexIterator idxItr = invertedIndex.documents(termStr);
long docId = idxItr.nextDocument();
while(docId != IndexIterator.END_OF_LIST) {
count += idxItr.count();
docId = idxItr.nextDocument();
}
return count;
}
/**
* Returns the number of batches in this atomic index.
* @return
*/
public int getBatchCount() {
return batches.size();
}
}