TruncateIndex.java

  1. /*
  2.  *  TruncateIndex.java
  3.  *
  4.  *  Copyright (c) 2007-2016, The University of Sheffield.
  5.  *
  6.  *  This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
  7.  *  and is free software, licenced under the GNU Lesser General Public License,
  8.  *  Version 3, June 2007 (also included with this distribution as file
  9.  *  LICENCE-LGPL3.html).
  10.  *
  11.  *  Ian Roberts, 1st September 2016
  12.  *
  13.  *  $Id: TruncateIndex.java 19708 2016-10-29 16:23:28Z ian_roberts $
  14.  */
  15. package gate.mimir.util;

  16. import gate.Gate;
  17. import gate.creole.Plugin;
  18. import gate.mimir.IndexConfig;
  19. import gate.mimir.IndexConfig.SemanticIndexerConfig;
  20. import gate.mimir.IndexConfig.TokenIndexerConfig;
  21. import gate.mimir.MimirIndex;
  22. import gate.mimir.index.AtomicIndex;
  23. import gate.mimir.index.DocumentCollection;
  24. import gate.util.maven.Utils;
  25. import it.unimi.di.big.mg4j.index.CompressionFlags;
  26. import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
  27. import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
  28. import it.unimi.di.big.mg4j.index.DiskBasedIndex;
  29. import it.unimi.di.big.mg4j.index.Index;
  30. import it.unimi.di.big.mg4j.index.IndexIterator;
  31. import it.unimi.di.big.mg4j.index.IndexReader;
  32. import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
  33. import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
  34. import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
  35. import it.unimi.di.big.mg4j.io.IOFactory;
  36. import it.unimi.di.big.mg4j.tool.Scan;
  37. import it.unimi.dsi.big.io.FileLinesCollection;
  38. import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
  39. import it.unimi.dsi.bits.Fast;
  40. import it.unimi.dsi.fastutil.ints.IntArrayList;
  41. import it.unimi.dsi.fastutil.ints.IntList;
  42. import it.unimi.dsi.fastutil.io.BinIO;
  43. import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
  44. import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
  45. import it.unimi.dsi.fastutil.longs.LongArrayList;
  46. import it.unimi.dsi.fastutil.longs.LongList;
  47. import it.unimi.dsi.io.InputBitStream;
  48. import it.unimi.dsi.io.OutputBitStream;
  49. import it.unimi.dsi.lang.MutableString;
  50. import it.unimi.dsi.util.BloomFilter;
  51. import it.unimi.dsi.util.Properties;

  52. import java.io.EOFException;
  53. import java.io.File;
  54. import java.io.FileInputStream;
  55. import java.io.FileOutputStream;
  56. import java.io.FilenameFilter;
  57. import java.io.IOException;
  58. import java.io.OutputStreamWriter;
  59. import java.io.PrintStream;
  60. import java.io.PrintWriter;
  61. import java.net.MalformedURLException;
  62. import java.net.URL;
  63. import java.nio.ByteOrder;
  64. import java.util.ArrayList;
  65. import java.util.Comparator;
  66. import java.util.HashMap;
  67. import java.util.Iterator;
  68. import java.util.List;
  69. import java.util.Map;
  70. import java.util.regex.Matcher;
  71. import java.util.regex.Pattern;
  72. import java.util.zip.ZipEntry;
  73. import java.util.zip.ZipFile;
  74. import java.util.zip.ZipInputStream;
  75. import java.util.zip.ZipOutputStream;

  76. import org.apache.commons.configuration.ConfigurationException;
  77. import org.apache.commons.io.FileUtils;
  78. import org.apache.commons.io.IOUtils;
  79. import org.apache.commons.io.output.ByteArrayOutputStream;
  80. import org.slf4j.Logger;
  81. import org.slf4j.LoggerFactory;

  82. /**
  83.  * Utility class to fix up a Mimir index that has been corrupted, e.g.
  84.  * by an unclean shutdown or out-of-memory condition. The index must be
  85.  * closed to use this tool, which either means the Mimir webapp is not
  86.  * running, or the index has been deleted from the running Mimir. It is
  87.  * very very strongly recommended to back up an index before attempting
  88.  * this procedure. The clean up process will unavoidably remove some
  89.  * number of documents from the tail of the index, but will attempt to
  90.  * keep the number of lost documents to a minimum.
  91.  *
  92.  * @author ian
  93.  *
  94.  */
  95. public class TruncateIndex {

  /** Shared SLF4J logger for all the truncation utility methods in this class. */
  private static final Logger log = LoggerFactory.getLogger(TruncateIndex.class);

  97.   /**
  98.    * Comparator that orders mimir zip collection files by number (e.g.
  99.    * mimir-collection-16.zip comes after mimir-collection-12-15.zip but
  100.    * before mimir-collection-100-120.zip)
  101.    */
  102.   public static final Comparator<File> ZIP_COLLECTION_COMPARATOR =
  103.           new Comparator<File>() {
  104.             public int compare(File a, File b) {
  105.               int numA =
  106.                       Integer.parseInt(a.getName().substring(
  107.                               a.getName().lastIndexOf('-') + 1,
  108.                               a.getName().length() - 4));
  109.               int numB =
  110.                       Integer.parseInt(b.getName().substring(
  111.                               b.getName().lastIndexOf('-') + 1,
  112.                               b.getName().length() - 4));
  113.               return numA - numB;
  114.             }
  115.           };

  116.   public static final Comparator<String> BATCH_COMPARATOR =
  117.           new Comparator<String>() {
  118.             public int compare(String a, String b) {
  119.               if(a.equals("head")) {
  120.                 if(b.equals("head")) {
  121.                   // both heads
  122.                   return 0;
  123.                 } else {
  124.                   // head before tail
  125.                   return -1;
  126.                 }
  127.               } else {
  128.                 if(b.equals("head")) {
  129.                   // tail after head
  130.                   return 1;
  131.                 } else {
  132.                   // both tails, compare by number
  133.                   int numA =
  134.                           Integer.parseInt(a.substring(a.lastIndexOf('-') + 1));
  135.                   int numB =
  136.                           Integer.parseInt(b.substring(b.lastIndexOf('-') + 1));
  137.                   return numA - numB;
  138.                 }
  139.               }
  140.             }
  141.           };

  142.   public static final FilenameFilter INDEX_NAME_FILTER = new FilenameFilter() {
  143.     private Pattern pat = Pattern.compile("(?:token|mention)-\\d+");

  144.     @Override
  145.     public boolean accept(File dir, String name) {
  146.       return pat.matcher(name).matches();
  147.     }
  148.   };

  149.   public static final FilenameFilter BATCH_NAME_FILTER = new FilenameFilter() {
  150.     private Pattern pat = Pattern.compile("head|tail-\\d+");

  151.     @Override
  152.     public boolean accept(File dir, String name) {
  153.       return pat.matcher(name).matches();
  154.     }
  155.   };

  156.   public static void main(String... args) throws Exception {
  157.     Gate.runInSandbox(true);
  158.     Gate.init();
  159.     int i = 0;
  160.     Pattern mavenPluginPattern = Pattern.compile("([^/]+?):([^/]+?):([^/]+)");
  161.     while(i < args.length) {
  162.       if("-d".equals(args[i])) {
  163.         // Maven cache directory
  164.         Utils.addCacheDirectory(new File(args[++i]));
  165.         i++;
  166.       } else if("-p".equals(args[i])) {
  167.         // plugin - either URL or file path to a directory plugin, or group:artifact:version for a Maven one
  168.         String plugin = args[++i];
  169.         plugin = plugin.trim();
  170.         try {
  171.           Matcher m = mavenPluginPattern.matcher(plugin);
  172.           if(m.matches()) {
  173.             // this looks like a Maven plugin
  174.             Gate.getCreoleRegister().registerPlugin(new Plugin.Maven(m.group(1), m.group(2), m.group(3)));
  175.           } else {
  176.             try {
  177.               URL u = new URL(plugin);
  178.               // succeeded in parsing as a URL, load that
  179.               Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(u));
  180.             } catch(MalformedURLException e) {
  181.               // not a URL, treat as a file
  182.               File pluginFile = new File(plugin);
  183.               Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(pluginFile.toURI().toURL()));
  184.             }
  185.           }
  186.         } catch(Exception e) {
  187.           log.error("Failed to load plugin " + plugin, e);
  188.           System.exit(1);
  189.         }
  190.         i++;
  191.       } else {
  192.         break;
  193.       }
  194.     }
  195.     truncateIndex(new File(args[i]));
  196.   }

  /**
   * Attempt to fix up a corrupted Mimir index by truncating some number
   * of documents off the end. There will be a certain number of
   * documents in complete index batches, and a (possibly different)
   * number of documents successfully persisted to disk in the zip files
   * of the DocumentCollection, the index will be truncated to the
   * smaller of those two numbers.
   *
   * @param indexDirectory the top-level directory of the Mimir index
   *          (containing config.xml)
   * @throws RuntimeException if every batch is corrupt or the zip
   *           collection is empty - the index cannot be salvaged
   */
  public static void truncateIndex(File indexDirectory) throws Exception {
    // 1. Repair the last zip file in the DocumentCollection
    repairLastZip(indexDirectory);

    // 2. Determine the last "good" batch (the greatest numbered head or
    // tail that is fully written to disk in every AtomicIndex) and
    // stash the bad ones
    String lastGoodBatch = determineLastGoodBatch(indexDirectory);

    if(lastGoodBatch == null) {
      throw new RuntimeException(
              "All batches are corrupt, sorry, this index is a write-off");
    }

    // 3. If the zip collection is at least as long as the sum of the
    // good batches, truncate it to match the batches and we're done.
    BatchDetails batches = batchEndPoints(indexDirectory);
    // endPoints is cumulative, so the last entry is the grand total
    long totalDocsInBatches = batches.endPoints[batches.endPoints.length - 1];
    long totalDocsInZips = totalDocumentsInZipCollection(indexDirectory);

    if(totalDocsInBatches == totalDocsInZips) {
      log.info("We're in luck, the batches and zips line up exactly");
      return;
    } else if(totalDocsInZips > totalDocsInBatches) {
      truncateZipCollectionTo(indexDirectory, totalDocsInBatches);
      return;
    } else if(totalDocsInZips == 0) {
      throw new RuntimeException("Zip collection is empty");
    }

    // 4. Otherwise, the zip collection stops in the middle of a batch B:
    // find the first batch whose end point reaches the zip total
    int endBatch = -1;
    for(int i = 0; i < batches.names.length; i++) {
      if(batches.endPoints[i] >= totalDocsInZips) {
        endBatch = i;
        break;
      }
    }
    log.info("Zip collection ends within " + batches.names[endBatch]);
    if(batches.endPoints[endBatch] == totalDocsInZips) {
      // special case - zip collection ends exactly at the end of a
      // batch. Stash subsequent batches and we're done
      log.info("Zip collection ends exactly at the end of batch "
              + batches.names[endBatch]);
      log.info("Stashing subsequent batches");
      stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
              .subList(endBatch + 1, batches.endPoints.length));
      log.info("Done");
      return;
    }
    // 4.1. Stash B (for every AtomicIndex) and any batches beyond it.
    stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
            .subList(endBatch, batches.endPoints.length));

    // 4.2. Read each stashed B and re-write it but with documents
    // beyond the end of the zip collection omitted. The new length of B
    // is the zip total minus the documents in all earlier batches.
    long endOfPreviousBatch = 0L;
    if(endBatch > 0) {
      endOfPreviousBatch = batches.endPoints[endBatch - 1];
    }
    trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips
            - endOfPreviousBatch);

    // 4.3. Truncate the direct indexes for those AtomicIndexes that
    // require it (direct indexes are term-major, so they must be
    // rewritten separately from the inverted batches handled above)
    IndexConfig indexConfig =
            IndexConfig.readConfigFromFile(new File(indexDirectory,
                    MimirIndex.INDEX_CONFIG_FILENAME));
    TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers();
    for(int i = 0; i < tokenIndexes.length; i++) {
      if(tokenIndexes[i].isDirectIndexEnabled()) {
        truncateDirectIndex(indexDirectory, "token-" + i,
                batches.names[endBatch], totalDocsInZips - 1);
      }
    }
    SemanticIndexerConfig[] semanticIndexes = indexConfig.getSemanticIndexers();
    for(int i = 0; i < semanticIndexes.length; i++) {
      if(semanticIndexes[i].isDirectIndexEnabled()) {
        truncateDirectIndex(indexDirectory, "mention-" + i,
                batches.names[endBatch], totalDocsInZips - 1);
      }
    }
  }

  /**
   * Ensures the last (highest-numbered) zip file of the document
   * collection is a well-formed zip: the original is moved to
   * broken-batches and copied back entry-by-entry, dropping a trailing
   * entry that was truncated by an unclean shutdown.
   *
   * @param indexDirectory the top-level index directory containing the
   *          mimir-collection-*.zip files
   * @throws IOException if the copy fails
   * @throws RuntimeException if the original file cannot be moved aside
   */
  public static void repairLastZip(File indexDirectory) throws IOException {
    log.info("Ensuring last zip file in " + indexDirectory.getAbsolutePath()
            + " is complete");
    File[] zipCollectionFiles =
            indexDirectory
                    .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
    if(zipCollectionFiles.length > 0) {
      // sort by trailing document number so the last array element is
      // the most recently written file
      java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
      File lastZip = zipCollectionFiles[zipCollectionFiles.length - 1];
      log.info("Last zip is " + lastZip.getName());
      File brokenBatches = new File(indexDirectory, "broken-batches");
      brokenBatches.mkdirs();
      File movedLastZip = new File(brokenBatches, lastZip.getName());
      if(movedLastZip.exists()) {
        // remove the leftover of a previous repair attempt
        movedLastZip.delete();
      }
      if(!lastZip.renameTo(movedLastZip)) {
        throw new RuntimeException("Could not stash " + lastZip.getName()
                + " in broken-batches");
      }
      log.debug("Moved " + lastZip.getName() + " to broken-batches");
      String lastGoodDoc = null;
      try(FileInputStream oldIn = new FileInputStream(movedLastZip);
              ZipInputStream zipIn = new ZipInputStream(oldIn);
              FileOutputStream newOut = new FileOutputStream(lastZip);
              ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
        ZipEntry entry = null;
        try {
          while((entry = zipIn.getNextEntry()) != null) {
            // buffer the whole entry first: copying can only hit EOF
            // *before* we start writing, so the output never contains a
            // half-copied entry
            ByteArrayOutputStream data = new ByteArrayOutputStream();
            IOUtils.copy(zipIn, data);
            // if we get here the input zip was not truncated mid-entry,
            // so it's safe to write this entry
            zipOut.putNextEntry(entry);
            IOUtils.write(data.toByteArray(), zipOut);
            zipOut.closeEntry();
            // entry names are document IDs
            lastGoodDoc = entry.getName();
          }
        } catch(EOFException eof) {
          // this is expected, if the zip was not properly closed
        }
      }
      log.info("Last good document ID was " + lastGoodDoc);
    } else {
      log.warn("No files in zip collection");
    }
  }

  331.   /**
  332.    * Determines the last "good" batch name (head or tail-N) for the
  333.    * given index, and stashes any bad batches in the broken-batches
  334.    * directory.
  335.    *
  336.    * @param indexDirectory
  337.    * @return
  338.    * @throws IOException
  339.    */
  340.   public static String determineLastGoodBatch(File indexDirectory)
  341.           throws IOException {
  342.     String lastGood = null;

  343.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
  344.     if(subIndexes.length == 0) {
  345.       throw new RuntimeException("Index has no AtomicIndexes!");
  346.     }
  347.     String[] batches = subIndexes[0].list(BATCH_NAME_FILTER);
  348.     java.util.Arrays.sort(batches, BATCH_COMPARATOR);
  349.     BATCH: for(String batch : batches) {
  350.       for(File subIndex : subIndexes) {
  351.         if(!new File(new File(subIndex, batch), subIndex.getName()
  352.                 + ".properties").exists()) {
  353.           break BATCH;
  354.         }
  355.       }
  356.       // if we get to here we know this batch exists in all sub-indexes
  357.       lastGood = batch;
  358.     }

  359.     if(lastGood != null) {
  360.       File brokenBatches = new File(indexDirectory, "broken-batches");
  361.       // stash bad batches
  362.       for(File subIndex : subIndexes) {
  363.         File[] thisIndexBatches = subIndex.listFiles(BATCH_NAME_FILTER);
  364.         for(File b : thisIndexBatches) {
  365.           if(BATCH_COMPARATOR.compare(lastGood, b.getName()) < 0) {
  366.             // this is a bad batch, stash it
  367.             File movedB =
  368.                     new File(brokenBatches, subIndex.getName() + "-"
  369.                             + b.getName());
  370.             if(movedB.exists()) {
  371.               FileUtils.deleteDirectory(movedB);
  372.             }
  373.             if(!b.renameTo(movedB)) {
  374.               throw new RuntimeException("Could not stash " + movedB.getName());
  375.             }
  376.           }
  377.         }
  378.       }
  379.     }

  380.     return lastGood;
  381.   }

  /**
   * Simple holder pairing an index's ordered batch names with the
   * cumulative document counts at the end of each batch.
   */
  public static class BatchDetails {
    // batch names (head, tail-N, ...) sorted in BATCH_COMPARATOR order
    String[] names;

    // endPoints[i] is the total number of documents in batches 0..i
    // inclusive, i.e. the exclusive end document index of batch i
    long[] endPoints;
  }

  386.   public static BatchDetails batchEndPoints(File indexDirectory)
  387.           throws IOException, ConfigurationException {
  388.     BatchDetails details = new BatchDetails();
  389.     long totalDocs = 0;
  390.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
  391.     if(subIndexes.length == 0) {
  392.       throw new RuntimeException("Index has no AtomicIndexes!");
  393.     }
  394.     details.names = subIndexes[0].list(BATCH_NAME_FILTER);
  395.     java.util.Arrays.sort(details.names, BATCH_COMPARATOR);

  396.     details.endPoints = new long[details.names.length];
  397.     for(int i = 0; i < details.names.length; i++) {
  398.       Properties batchProps = new Properties();
  399.       try(FileInputStream propsIn =
  400.               new FileInputStream(new File(new File(subIndexes[0],
  401.                       details.names[i]), subIndexes[0].getName()
  402.                       + ".properties"))) {
  403.         batchProps.load(propsIn);
  404.       }
  405.       totalDocs += batchProps.getLong("documents");
  406.       details.endPoints[i] = totalDocs;
  407.     }

  408.     return details;
  409.   }

  410.   public static long totalDocumentsInZipCollection(File indexDirectory)
  411.           throws IOException {
  412.     long totalDocs = 0;
  413.     File[] zipCollectionFiles =
  414.             indexDirectory
  415.                     .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
  416.     for(File zip : zipCollectionFiles) {
  417.       try(ZipFile zf = new ZipFile(zip)) {
  418.         totalDocs += zf.size();
  419.       }
  420.     }

  421.     return totalDocs;
  422.   }

  /**
   * Truncates the zip document collection so that it contains exactly
   * numDocs documents: the file containing the cut point is moved to
   * broken-batches and rewritten up to and including document
   * (numDocs - 1); any later collection files are left in place for the
   * caller to deal with.
   *
   * @param indexDirectory the top-level index directory
   * @param numDocs the number of documents to keep
   * @throws IOException if reading or rewriting a zip fails
   * @throws RuntimeException if no file contains the cut point or the
   *           target file cannot be moved aside
   */
  public static void truncateZipCollectionTo(File indexDirectory, long numDocs)
          throws IOException {
    File[] zipCollectionFiles =
            indexDirectory
                    .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
    java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
    // the truncation point is somewhere within the last zip file whose
    // first entry is less than numDocs (document IDs are zero based, so
    // the document named numDocs is actually the (numDocs+1)th one).
    int targetFile = -1;
    for(int i = 0; i < zipCollectionFiles.length; i++) {
      try(FileInputStream fis = new FileInputStream(zipCollectionFiles[i]);
              ZipInputStream zipIn = new ZipInputStream(fis)) {
        // only the first entry is needed to know where this file starts
        ZipEntry firstEntry = zipIn.getNextEntry();
        if(firstEntry != null) {
          long documentId = Long.parseLong(firstEntry.getName());
          if(documentId >= numDocs) {
            break;
          } else {
            targetFile = i;
          }
        }
      }
    }

    if(targetFile < 0) {
      throw new RuntimeException(
              "Zip collection broken beyond repair - there is no zip file containing the cut point");
    }

    // we know that document (numDocs-1) is somewhere in
    // zipCollectionFiles[targetFile]. Move that file out of the way and
    // rewrite it, truncated appropriately.
    File origFile = zipCollectionFiles[targetFile];
    File brokenBatches = new File(indexDirectory, "broken-batches");
    brokenBatches.mkdirs();
    File movedFile =
            new File(brokenBatches, "to-truncate-" + origFile.getName());
    if(movedFile.exists()) {
      // remove the leftover of a previous repair attempt
      movedFile.delete();
    }
    if(!origFile.renameTo(movedFile)) {
      throw new RuntimeException("Could not stash " + origFile.getName()
              + " in broken-batches");
    }
    // entry names are document IDs, so this is the last entry to keep
    String lastEntryName = String.valueOf(numDocs - 1);
    try(FileInputStream oldIn = new FileInputStream(movedFile);
            ZipInputStream zipIn = new ZipInputStream(oldIn);
            FileOutputStream newOut = new FileOutputStream(origFile);
            ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
      ZipEntry entry = null;
      try {
        while((entry = zipIn.getNextEntry()) != null) {
          // buffer each entry fully before writing so a mid-entry EOF
          // in the input can never leave a half-written output entry
          ByteArrayOutputStream data = new ByteArrayOutputStream();
          IOUtils.copy(zipIn, data);
          // if we get here the input zip was not truncated mid-entry,
          // so it's safe to write this entry
          zipOut.putNextEntry(entry);
          IOUtils.write(data.toByteArray(), zipOut);
          zipOut.closeEntry();
          if(lastEntryName.equals(entry.getName())) {
            // reached the cut point, stop copying
            break;
          }
        }
      } catch(EOFException eof) {
        // this is expected, if the zip was not properly closed
      }
    }
    log.info("Truncated zip collection file " + origFile + " to document "
            + lastEntryName);
  }

  493.   public static void stashBatches(File indexDirectory, List<String> batches)
  494.           throws IOException {
  495.     File brokenBatches = new File(indexDirectory, "broken-batches");
  496.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);

  497.     for(File subIndex : subIndexes) {
  498.       for(String batchName : batches) {
  499.         File b = new File(subIndex, batchName);
  500.         File movedB =
  501.                 new File(brokenBatches, subIndex.getName() + "-" + batchName);
  502.         if(movedB.exists()) {
  503.           FileUtils.deleteDirectory(movedB);
  504.         }
  505.         if(!b.renameTo(movedB)) {
  506.           throw new RuntimeException("Could not stash " + movedB.getName());
  507.         }
  508.       }
  509.     }
  510.   }

  /**
   * Trim the given batch in all sub-indexes to the given length in
   * documents. Assumes the batch has already been stashed as
   * broken-batches/subindex-batchName. For each sub-index the stashed
   * inverted index is read term by term and re-written, dropping all
   * postings for documents at or beyond numDocs; terms whose postings
   * all fall beyond the cut are dropped entirely, and the terms file,
   * term map, Bloom filter, sizes file and properties are regenerated
   * to match.
   *
   * @param indexDirectory top level index directory
   * @param batchName name of the batch to trim
   * @param numDocs number of documents to which the batch should be
   *          trimmed.
   * @throws RuntimeException if a stashed batch is missing
   */
  public static void trimBatch(File indexDirectory, String batchName,
          long numDocs) throws Exception {
    File brokenBatches = new File(indexDirectory, "broken-batches");
    File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);

    for(File subIndex : subIndexes) {
      File stashedBatch =
              new File(brokenBatches, subIndex.getName() + "-" + batchName);
      if(!stashedBatch.exists()) {
        throw new RuntimeException("Stashed batch " + stashedBatch
                + " not found");
      }
      File batchDir = new File(subIndex, batchName);
      batchDir.mkdirs();
      log.info("Trimming batch " + batchDir);
      // MG4J index files share a common basename plus per-file extensions
      String stashedIndexBasename =
              new File(stashedBatch, subIndex.getName()).getAbsolutePath();
      String outputIndexBasename =
              new File(batchDir, subIndex.getName()).getAbsolutePath();

      Index stashedIndex = Index.getInstance(stashedIndexBasename, true, true);

      // when you read through an index sequentially, the IndexIterators
      // don't tell you what term they were for, so we need to read the
      // .terms file from the stashed batch in step with the index
      // reader.
      File stashedTermsFile =
              new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
      FileLinesCollection termsColl =
              new FileLinesCollection(stashedTermsFile.getAbsolutePath(),
                      "UTF-8");
      long numTerms = termsColl.size64();
      Iterator<MutableString> termsIter = termsColl.iterator();
      File newTermsFile =
              new File(outputIndexBasename + DiskBasedIndex.TERMS_EXTENSION);

      // there will certainly be no *more* than numTerms terms in the
      // final index, there may be fewer
      BloomFilter<Void> termFilter = BloomFilter.create(Math.max(numTerms, 1));

      // statistics accumulated while copying, needed for the new
      // .properties file at the end
      Properties writerProperties = null;
      long writtenBits = 0;
      int maxDocSize = 0;
      int maxCount = 0;
      long totalOccurrences = 0;
      try(IndexReader indexReader = stashedIndex.getReader();
              FileOutputStream termsOS = new FileOutputStream(newTermsFile);
              OutputStreamWriter termsOSW =
                      new OutputStreamWriter(termsOS, "UTF-8");
              PrintWriter termsWriter = new PrintWriter(termsOSW)) {
        QuasiSuccinctIndexWriter indexWriter =
                new QuasiSuccinctIndexWriter(
                        IOFactory.FILESYSTEM_FACTORY,
                        outputIndexBasename,
                        numDocs,
                        Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
                        QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
                        CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
                        ByteOrder.nativeOrder());

        IndexIterator iter;
        while((iter = indexReader.nextIterator()) != null) {
          // the terms file has one line per term, in iterator order
          MutableString term = termsIter.next();
          // we can't stream the inverted list, because we need to know
          // up front how many documents the term is found in so we can
          // write that number before writing the positions.
          LongList docPointers = new LongArrayList();
          IntList counts = new IntArrayList();
          List<IntArrayList> positions = new ArrayList<>();
          long frequency = 0;
          long curPointer;
          long occurrences = 0;
          long sumMaxPos = 0;
          while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
            if(curPointer < numDocs) {
              // document survives the cut - buffer its posting
              frequency++;
              docPointers.add(curPointer);
              counts.add(iter.count());
              IntArrayList thisDocPositions = new IntArrayList(iter.count());
              positions.add(thisDocPositions);
              occurrences += iter.count();
              totalOccurrences += iter.count();
              if(iter.count() > maxCount) {
                maxCount = iter.count();
              }
              int pos;
              int lastPos = 0;
              while((pos = iter.nextPosition()) != IndexIterator.END_OF_POSITIONS) {
                thisDocPositions.add(pos);
                lastPos = pos;
              }
              sumMaxPos += lastPos;
              if(lastPos > maxDocSize) {
                maxDocSize = lastPos;
              }
            } else {
              // pointers are ascending, so everything from here on is
              // beyond the cut point
              break;
            }
          }

          if(frequency > 0) {
            // this term occurred in at least one document that we're
            // not truncating, so now we know it's safe to write the
            // (truncated) inverted list to the new index and the term
            // to the terms file.

            term.println(termsWriter);
            termFilter.add(term);

            indexWriter.newInvertedList(frequency, occurrences, sumMaxPos);
            indexWriter.writeFrequency(frequency);
            for(int i = 0; i < frequency; i++) {
              OutputBitStream obs = indexWriter.newDocumentRecord();
              indexWriter.writeDocumentPointer(obs, docPointers.get(i));
              indexWriter.writePositionCount(obs, counts.get(i));
              indexWriter.writeDocumentPositions(obs, positions.get(i)
                      .elements(), 0, positions.get(i).size(), -1);
            }
          }
        }

        indexWriter.close();
        writerProperties = indexWriter.properties();
        // write stats file
        try(PrintStream statsPs =
                new PrintStream(new File(outputIndexBasename
                        + DiskBasedIndex.STATS_EXTENSION))) {
          indexWriter.printStats(statsPs);
        }
        writtenBits = indexWriter.writtenBits();
      }

      // regenerate the term map from the (possibly shorter) terms file
      AtomicIndex.generateTermMap(new File(outputIndexBasename
              + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
              + DiskBasedIndex.TERMMAP_EXTENSION), null);

      // write the bloom filter
      BinIO.storeObject(termFilter, new File(outputIndexBasename
              + DocumentalCluster.BLOOM_EXTENSION));

      // write the truncated sizes file: one gamma-coded size per
      // surviving document, copied from the stashed sizes file
      File stashedSizesFile =
              new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
      File sizesFile =
              new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
      try(InputBitStream stashedSizesStream =
              new InputBitStream(stashedSizesFile);
              OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
        for(long i = 0; i < numDocs; i++) {
          sizesStream.writeGamma(stashedSizesStream.readGamma());
        }
      }

      // generate the index properties: keep the original term processor
      // but substitute the statistics gathered during the rewrite
      Properties stashedProps = new Properties();
      try(FileInputStream stashedPropsStream =
              new FileInputStream(stashedIndexBasename
                      + DiskBasedIndex.PROPERTIES_EXTENSION)) {
        stashedProps.load(stashedPropsStream);
      }
      Properties newProps = new Properties();
      newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
              stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
      newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
      // -1 means unknown
      newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize);
      newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
      newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
      writerProperties.addAll(newProps);
      Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
              outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
    }
  }

  681.   /**
  682.    * Truncate the given direct index to remove documents beyond the
  683.    * given lastDocId. The original version of the batch is assumed to
  684.    * have been stashed as broken-batches/subIndexName-batchName
  685.    *
  686.    * @param indexDirectory the top-level index directory
  687.    * @param subIndexName the name of the sub-index (token-N or
  688.    *          mention-N)
  689.    * @param batchName the name of the batch (head or tail-N)
  690.    * @param lastDocId the last valid document ID
  691.    */
  692.   public static void truncateDirectIndex(File indexDirectory,
  693.           String subIndexName, String batchName, long lastDocId)
  694.           throws Exception {
  695.     File brokenBatches = new File(indexDirectory, "broken-batches");
  696.     File stashedBatch = new File(brokenBatches, subIndexName + "-" + batchName);
  697.     if(!stashedBatch.exists()) {
  698.       throw new RuntimeException("Stashed batch " + stashedBatch + " not found");
  699.     }
  700.     File batchDir = new File(new File(indexDirectory, subIndexName), batchName);
  701.     batchDir.mkdirs();
  702.     log.info("Trimming direct index for batch " + batchDir);

  703.     String stashedIndexBasename =
  704.             new File(stashedBatch, subIndexName
  705.                     + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
  706.     String outputIndexBasename =
  707.             new File(batchDir, subIndexName
  708.                     + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();

  709.     // A direct index is modelled in Mimir as an inverted index where
  710.     // the terms are documents and vice versa. The "term string" is a
  711.     // zero-padded hex representation of the document ID, so we simply
  712.     // need to stream "inverted" lists from the stashed index to the new
  713.     // one until we reach the term string that is the hex representation
  714.     // of lastDocId.

  715.     MutableString lastDocIdAsHex =
  716.             new MutableString(AtomicIndex.longToTerm(lastDocId));

  717.     // determine the number of documents in this direct index (i.e. the
  718.     // number of entries in the .terms file that are lexicographically
  719.     // <= lastDocIdAsHex)
  720.     long numDocsInIndex = 0;

  721.     File stashedTermsFile =
  722.             new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
  723.     FileLinesCollection termsColl =
  724.             new FileLinesCollection(stashedTermsFile.getAbsolutePath(), "UTF-8");
  725.     try(FileLinesIterator docIdsIter = termsColl.iterator()) {
  726.       while(docIdsIter.hasNext()
  727.               && docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) {
  728.         numDocsInIndex++;
  729.       }
  730.     }
  731.     log.info("Trimmed index will contain " + numDocsInIndex + " documents");

  732.     // write the truncated "terms" file, term map and bloom filter
  733.     BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex);

  734.     try(FileLinesIterator docIdsIter = termsColl.iterator();
  735.             PrintWriter pw =
  736.                     new PrintWriter(new OutputStreamWriter(
  737.                             new FastBufferedOutputStream(new FileOutputStream(
  738.                                     outputIndexBasename
  739.                                             + DiskBasedIndex.TERMS_EXTENSION),
  740.                                     64 * 1024), "UTF-8"))) {
  741.       for(long i = 0; i < numDocsInIndex; i++) {
  742.         MutableString t = docIdsIter.next();
  743.         t.println(pw);
  744.         docBloomFilter.add(t);
  745.       }
  746.     }
  747.     AtomicIndex.generateTermMap(new File(outputIndexBasename
  748.             + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
  749.             + DiskBasedIndex.TERMMAP_EXTENSION), null);
  750.     BinIO.storeObject(docBloomFilter, new File(outputIndexBasename
  751.             + DocumentalCluster.BLOOM_EXTENSION));

  752.     // stream "inverted lists" (i.e. documents) from the stashed to the
  753.     // new index, and build up a cache of "document sizes" (i.e. the
  754.     // number of documents that contain each term referenced in this
  755.     // index). We can't simply use the sizes from the stashed index
  756.     // because they will include the counts from the inverted lists
  757.     // we're trimming off.
  758.     Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap();
  759.     termSizes.defaultReturnValue(0);

  760.     // we need the total potential number of direct terms to create the
  761.     // index writer
  762.     File directTermsFile =
  763.             new File(new File(indexDirectory, subIndexName),
  764.                     AtomicIndex.DIRECT_TERMS_FILENAME);
  765.     FileLinesCollection directTerms =
  766.             new FileLinesCollection(directTermsFile.getAbsolutePath(), "UTF-8");

  767.     Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false);

  768.     int maxCount = 0;
  769.     long totalOccurrences = 0;
  770.     long writtenBits = 0;
  771.     int maxTermSize = -1; // -1 means unknown
  772.     Properties writerProperties;
  773.    
  774.     try(IndexReader indexReader = stashedIndex.getReader()) {
  775.       // copy the default compression flags, and remove positions
  776.       Map<Component, Coding> flags =
  777.               new HashMap<Component, Coding>(
  778.                       CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
  779.       flags.remove(Component.POSITIONS);
  780.       QuasiSuccinctIndexWriter directIndexWriter =
  781.               new QuasiSuccinctIndexWriter(
  782.                       IOFactory.FILESYSTEM_FACTORY,
  783.                       outputIndexBasename,
  784.                       directTerms.size64(),
  785.                       Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
  786.                       QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags,
  787.                       ByteOrder.nativeOrder());
  788.       IndexIterator iter;
  789.       int docCounter = 0;
  790.       long occurrences = 0;
  791.       while((iter = indexReader.nextIterator()) != null
  792.               && ++docCounter <= numDocsInIndex) {
  793.         // annoyingly we can't stream straight from the old inverted
  794.         // list to the new one, as we need to know up front the total
  795.         // occurrences value which is not exposed through any public
  796.         // API.
  797.         LongList docPointers = new LongArrayList();
  798.         IntList counts = new IntArrayList();
  799.         long frequency = iter.frequency();
  800.         long curPointer;
  801.         while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
  802.           docPointers.add(curPointer);
  803.           counts.add(iter.count());
  804.           termSizes.put(curPointer, termSizes.get(curPointer) + iter.count());
  805.           occurrences += iter.count();
  806.           totalOccurrences += iter.count();
  807.           if(iter.count() > maxCount) {
  808.             maxCount = iter.count();
  809.           }
  810.         }
  811.         directIndexWriter.newInvertedList(frequency, occurrences, 0);
  812.         directIndexWriter.writeFrequency(frequency);
  813.         for(int i = 0; i < frequency; i++) {
  814.           OutputBitStream obs = directIndexWriter.newDocumentRecord();
  815.           directIndexWriter.writeDocumentPointer(obs, docPointers.get(i));
  816.           directIndexWriter.writePositionCount(obs, counts.get(i));
  817.           // no positions in a direct index
  818.         }
  819.       }
  820.       directIndexWriter.close();
  821.       writtenBits = directIndexWriter.writtenBits();
  822.      
  823.       // write the new sizes file
  824.       File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
  825.       try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
  826.         for(long i = 0; i < directTerms.size64(); i++) {
  827.           int termSize = termSizes.get(i);
  828.           sizesStream.writeGamma(termSize);
  829.           if(termSize > maxTermSize) {
  830.             maxTermSize = termSize;
  831.           }
  832.         }
  833.       }
  834.       writerProperties = directIndexWriter.properties();
  835.       // write stats file
  836.       try(PrintStream statsPs =
  837.               new PrintStream(new File(outputIndexBasename
  838.                       + DiskBasedIndex.STATS_EXTENSION))) {
  839.         directIndexWriter.printStats(statsPs);
  840.       }
  841.     }
  842.    
  843.     // generate the index properties
  844.     Properties stashedProps = new Properties();
  845.     try(FileInputStream stashedPropsStream =
  846.             new FileInputStream(stashedIndexBasename
  847.                     + DiskBasedIndex.PROPERTIES_EXTENSION)) {
  848.       stashedProps.load(stashedPropsStream);
  849.     }
  850.     Properties newProps = new Properties();
  851.     newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
  852.             stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
  853.     newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
  854.     // -1 means unknown
  855.     newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize);
  856.     newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
  857.     newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
  858.     writerProperties.addAll(newProps);
  859.     Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
  860.             outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
  861.   }
  862. }