TruncateIndex.java

  1. /*
  2.  *  TruncateIndex.java
  3.  *
  4.  *  Copyright (c) 2007-2016, The University of Sheffield.
  5.  *
  6.  *  This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
  7.  *  and is free software, licenced under the GNU Lesser General Public License,
  8.  *  Version 3, June 2007 (also included with this distribution as file
  9.  *  LICENCE-LGPL3.html).
  10.  *
  11.  *  Ian Roberts, 1st September 2016
  12.  *
  13.  *  $Id: TruncateIndex.java 19708 2016-10-29 16:23:28Z ian_roberts $
  14.  */
  15. package gate.mimir.util;

  16. import gate.Gate;
  17. import gate.creole.Plugin;
  18. import gate.mimir.IndexConfig;
  19. import gate.mimir.IndexConfig.SemanticIndexerConfig;
  20. import gate.mimir.IndexConfig.TokenIndexerConfig;
  21. import gate.mimir.MimirIndex;
  22. import gate.mimir.index.AtomicIndex;
  23. import gate.mimir.index.DocumentCollection;
  24. import gate.util.maven.Utils;
  25. import it.unimi.di.big.mg4j.index.CompressionFlags;
  26. import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
  27. import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
  28. import it.unimi.di.big.mg4j.index.DiskBasedIndex;
  29. import it.unimi.di.big.mg4j.index.Index;
  30. import it.unimi.di.big.mg4j.index.IndexIterator;
  31. import it.unimi.di.big.mg4j.index.IndexReader;
  32. import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
  33. import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
  34. import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
  35. import it.unimi.di.big.mg4j.io.IOFactory;
  36. import it.unimi.di.big.mg4j.tool.Scan;
  37. import it.unimi.dsi.big.io.FileLinesCollection;
  38. import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
  39. import it.unimi.dsi.bits.Fast;
  40. import it.unimi.dsi.fastutil.ints.IntArrayList;
  41. import it.unimi.dsi.fastutil.ints.IntList;
  42. import it.unimi.dsi.fastutil.io.BinIO;
  43. import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
  44. import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
  45. import it.unimi.dsi.fastutil.longs.LongArrayList;
  46. import it.unimi.dsi.fastutil.longs.LongList;
  47. import it.unimi.dsi.io.InputBitStream;
  48. import it.unimi.dsi.io.OutputBitStream;
  49. import it.unimi.dsi.lang.MutableString;
  50. import it.unimi.dsi.util.BloomFilter;
  51. import it.unimi.dsi.util.Properties;

  52. import java.io.EOFException;
  53. import java.io.File;
  54. import java.io.FileInputStream;
  55. import java.io.FileOutputStream;
  56. import java.io.FilenameFilter;
  57. import java.io.IOException;
  58. import java.io.OutputStreamWriter;
  59. import java.io.PrintStream;
  60. import java.io.PrintWriter;
  61. import java.net.MalformedURLException;
  62. import java.net.URL;
  63. import java.nio.ByteOrder;
  64. import java.util.ArrayList;
  65. import java.util.Comparator;
  66. import java.util.HashMap;
  67. import java.util.Iterator;
  68. import java.util.List;
  69. import java.util.Map;
  70. import java.util.regex.Matcher;
  71. import java.util.regex.Pattern;
  72. import java.util.zip.ZipEntry;
  73. import java.util.zip.ZipFile;
  74. import java.util.zip.ZipInputStream;
  75. import java.util.zip.ZipOutputStream;

  76. import org.apache.commons.configuration.ConfigurationException;
  77. import org.apache.commons.io.FileUtils;
  78. import org.apache.commons.io.IOUtils;
  79. import org.apache.commons.io.output.ByteArrayOutputStream;
  80. import org.slf4j.Logger;
  81. import org.slf4j.LoggerFactory;

  82. /**
  83.  * Utility class to fix up a Mimir index that has been corrupted, e.g.
  84.  * by an unclean shutdown or out-of-memory condition. The index must be
  85.  * closed to use this tool, which either means the Mimir webapp is not
  86.  * running, or the index has been deleted from the running Mimir. It is
  87.  * very very strongly recommended to back up an index before attempting
  88.  * this procedure. The clean up process will unavoidably remove some
  89.  * number of documents from the tail of the index, but will attempt to
  90.  * keep the number of lost documents to a minimum.
  91.  *
  92.  * @author ian
  93.  *
  94.  */
  95. public class TruncateIndex {

  /** Shared SLF4J logger for all the truncation utility methods in this class. */
  private static final Logger log = LoggerFactory.getLogger(TruncateIndex.class);

  97.   /**
  98.    * Comparator that orders mimir zip collection files by number (e.g.
  99.    * mimir-collection-16.zip comes after mimir-collection-12-15.zip but
  100.    * before mimir-collection-100-120.zip)
  101.    */
  102.   public static final Comparator<File> ZIP_COLLECTION_COMPARATOR =
  103.           new Comparator<File>() {
  104.             public int compare(File a, File b) {
  105.               int numA =
  106.                       Integer.parseInt(a.getName().substring(
  107.                               a.getName().lastIndexOf('-') + 1,
  108.                               a.getName().length() - 4));
  109.               int numB =
  110.                       Integer.parseInt(b.getName().substring(
  111.                               b.getName().lastIndexOf('-') + 1,
  112.                               b.getName().length() - 4));
  113.               return numA - numB;
  114.             }
  115.           };

  116.   public static final Comparator<String> BATCH_COMPARATOR =
  117.           new Comparator<String>() {
  118.             public int compare(String a, String b) {
  119.               if(a.equals("head")) {
  120.                 if(b.equals("head")) {
  121.                   // both heads
  122.                   return 0;
  123.                 } else {
  124.                   // head before tail
  125.                   return -1;
  126.                 }
  127.               } else {
  128.                 if(b.equals("head")) {
  129.                   // tail after head
  130.                   return 1;
  131.                 } else {
  132.                   // both tails, compare by number
  133.                   int numA =
  134.                           Integer.parseInt(a.substring(a.lastIndexOf('-') + 1));
  135.                   int numB =
  136.                           Integer.parseInt(b.substring(b.lastIndexOf('-') + 1));
  137.                   return numA - numB;
  138.                 }
  139.               }
  140.             }
  141.           };

  142.   public static final FilenameFilter INDEX_NAME_FILTER = new FilenameFilter() {
  143.     private Pattern pat = Pattern.compile("(?:token|mention)-\\d+");

  144.     @Override
  145.     public boolean accept(File dir, String name) {
  146.       return pat.matcher(name).matches();
  147.     }
  148.   };

  149.   public static final FilenameFilter BATCH_NAME_FILTER = new FilenameFilter() {
  150.     private Pattern pat = Pattern.compile("head|tail-\\d+");

  151.     @Override
  152.     public boolean accept(File dir, String name) {
  153.       return pat.matcher(name).matches();
  154.     }
  155.   };

  156.   public static void main(String... args) throws Exception {
  157.     Gate.runInSandbox(true);
  158.     Gate.init();
  159.     int i = 0;
  160.     Pattern mavenPluginPattern = Pattern.compile("([^/]+?):([^/]+?):([^/]+)");
  161.     while(i < args.length) {
  162.       if("-d".equals(args[i])) {
  163.         // Maven cache directory
  164.         Utils.addCacheDirectory(new File(args[++i]));
  165.         i++;
  166.       } else if("-p".equals(args[i])) {
  167.         // plugin - either URL or file path to a directory plugin, or group:artifact:version for a Maven one
  168.         String plugin = args[++i];
  169.         plugin = plugin.trim();
  170.         try {
  171.           Matcher m = mavenPluginPattern.matcher(plugin);
  172.           if(m.matches()) {
  173.             // this looks like a Maven plugin
  174.             Gate.getCreoleRegister().registerPlugin(new Plugin.Maven(m.group(1), m.group(2), m.group(3)));
  175.           } else {
  176.             try {
  177.               URL u = new URL(plugin);
  178.               // succeeded in parsing as a URL, load that
  179.               Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(u));
  180.             } catch(MalformedURLException e) {
  181.               // not a URL, treat as a file
  182.               File pluginFile = new File(plugin);
  183.               Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(pluginFile.toURI().toURL()));
  184.             }
  185.           }
  186.         } catch(Exception e) {
  187.           log.error("Failed to load plugin " + plugin, e);
  188.           System.exit(1);
  189.         }
  190.         i++;
  191.       } else {
  192.         break;
  193.       }
  194.     }
  195.     truncateIndex(new File(args[i]));
  196.   }

  /**
   * Attempt to fix up a corrupted Mimir index by truncating some number
   * of documents off the end. There will be a certain number of
   * documents in complete index batches, and a (possibly different)
   * number of documents successfully persisted to disk in the zip files
   * of the DocumentCollection, the index will be truncated to the
   * smaller of those two numbers.
   *
   * @param indexDirectory the top-level directory of the Mimir index
   *          (containing config.xml)
   * @throws RuntimeException if every batch is corrupt or the zip
   *           collection is empty - the index cannot be salvaged
   */
  public static void truncateIndex(File indexDirectory) throws Exception {
    // 1. Repair the last zip file in the DocumentCollection
    repairLastZip(indexDirectory);

    // 2. Determine the last "good" batch (the greatest numbered head or
    // tail that is fully written to disk in every AtomicIndex) and
    // stash the bad ones
    String lastGoodBatch = determineLastGoodBatch(indexDirectory);

    if(lastGoodBatch == null) {
      throw new RuntimeException(
              "All batches are corrupt, sorry, this index is a write-off");
    }

    // 3. If the zip collection is at least as long as the sum of the
    // good batches, truncate it to match the batches and we're done.
    BatchDetails batches = batchEndPoints(indexDirectory);
    // endPoints is cumulative, so the last entry is the grand total
    long totalDocsInBatches = batches.endPoints[batches.endPoints.length - 1];
    long totalDocsInZips = totalDocumentsInZipCollection(indexDirectory);

    if(totalDocsInBatches == totalDocsInZips) {
      log.info("We're in luck, the batches and zips line up exactly");
      return;
    } else if(totalDocsInZips > totalDocsInBatches) {
      truncateZipCollectionTo(indexDirectory, totalDocsInBatches);
      return;
    } else if(totalDocsInZips == 0) {
      throw new RuntimeException("Zip collection is empty");
    }

    // 4. Otherwise, the zip collection stops in the middle of a batch B:
    // find the first batch whose end point reaches the zip total
    int endBatch = -1;
    for(int i = 0; i < batches.names.length; i++) {
      if(batches.endPoints[i] >= totalDocsInZips) {
        endBatch = i;
        break;
      }
    }
    log.info("Zip collection ends within " + batches.names[endBatch]);
    if(batches.endPoints[endBatch] == totalDocsInZips) {
      // special case - zip collection ends exactly at the end of a
      // batch. Stash subsequent batches and we're done
      log.info("Zip collection ends exactly at the end of batch "
              + batches.names[endBatch]);
      log.info("Stashing subsequent batches");
      stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
              .subList(endBatch + 1, batches.endPoints.length));
      log.info("Done");
      return;
    }
    // 4.1. Stash B (for every AtomicIndex) and any batches beyond it.
    stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
            .subList(endBatch, batches.endPoints.length));

    // 4.2. Read each stashed B and re-write it but with documents
    // beyond the end of the zip collection omitted. The new length of B
    // is the zip total minus the documents in all earlier batches.
    long endOfPreviousBatch = 0L;
    if(endBatch > 0) {
      endOfPreviousBatch = batches.endPoints[endBatch - 1];
    }
    trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips
            - endOfPreviousBatch);

    // 4.3. Truncate the direct indexes for those AtomicIndexes that
    // require it (direct indexes are term-major, so they must be
    // rewritten separately from the inverted batches handled above)
    IndexConfig indexConfig =
            IndexConfig.readConfigFromFile(new File(indexDirectory,
                    MimirIndex.INDEX_CONFIG_FILENAME));
    TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers();
    for(int i = 0; i < tokenIndexes.length; i++) {
      if(tokenIndexes[i].isDirectIndexEnabled()) {
        truncateDirectIndex(indexDirectory, "token-" + i,
                batches.names[endBatch], totalDocsInZips - 1);
      }
    }
    SemanticIndexerConfig[] semanticIndexes = indexConfig.getSemanticIndexers();
    for(int i = 0; i < semanticIndexes.length; i++) {
      if(semanticIndexes[i].isDirectIndexEnabled()) {
        truncateDirectIndex(indexDirectory, "mention-" + i,
                batches.names[endBatch], totalDocsInZips - 1);
      }
    }
  }

  /**
   * Ensures the last (highest-numbered) zip file of the document
   * collection is a well-formed zip: the original is moved to
   * broken-batches and copied back entry-by-entry, dropping a trailing
   * entry that was truncated by an unclean shutdown.
   *
   * @param indexDirectory the top-level index directory containing the
   *          mimir-collection-*.zip files
   * @throws IOException if the copy fails
   * @throws RuntimeException if the original file cannot be moved aside
   */
  public static void repairLastZip(File indexDirectory) throws IOException {
    log.info("Ensuring last zip file in " + indexDirectory.getAbsolutePath()
            + " is complete");
    File[] zipCollectionFiles =
            indexDirectory
                    .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
    if(zipCollectionFiles.length > 0) {
      // sort by trailing document number so the last array element is
      // the most recently written file
      java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
      File lastZip = zipCollectionFiles[zipCollectionFiles.length - 1];
      log.info("Last zip is " + lastZip.getName());
      File brokenBatches = new File(indexDirectory, "broken-batches");
      brokenBatches.mkdirs();
      File movedLastZip = new File(brokenBatches, lastZip.getName());
      if(movedLastZip.exists()) {
        // remove the leftover of a previous repair attempt
        movedLastZip.delete();
      }
      if(!lastZip.renameTo(movedLastZip)) {
        throw new RuntimeException("Could not stash " + lastZip.getName()
                + " in broken-batches");
      }
      log.debug("Moved " + lastZip.getName() + " to broken-batches");
      String lastGoodDoc = null;
      try(FileInputStream oldIn = new FileInputStream(movedLastZip);
              ZipInputStream zipIn = new ZipInputStream(oldIn);
              FileOutputStream newOut = new FileOutputStream(lastZip);
              ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
        ZipEntry entry = null;
        try {
          while((entry = zipIn.getNextEntry()) != null) {
            // buffer the whole entry first: copying can only hit EOF
            // *before* we start writing, so the output never contains a
            // half-copied entry
            ByteArrayOutputStream data = new ByteArrayOutputStream();
            IOUtils.copy(zipIn, data);
            // if we get here the input zip was not truncated mid-entry,
            // so it's safe to write this entry
            zipOut.putNextEntry(entry);
            IOUtils.write(data.toByteArray(), zipOut);
            zipOut.closeEntry();
            // entry names are document IDs
            lastGoodDoc = entry.getName();
          }
        } catch(EOFException eof) {
          // this is expected, if the zip was not properly closed
        }
      }
      log.info("Last good document ID was " + lastGoodDoc);
    } else {
      log.warn("No files in zip collection");
    }
  }

  331.   /**
  332.    * Determines the last "good" batch name (head or tail-N) for the
  333.    * given index, and stashes any bad batches in the broken-batches
  334.    * directory.
  335.    *
  336.    * @param indexDirectory
  337.    * @return
  338.    * @throws IOException
  339.    */
  340.   public static String determineLastGoodBatch(File indexDirectory)
  341.           throws IOException {
  342.     String lastGood = null;

  343.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
  344.     if(subIndexes.length == 0) {
  345.       throw new RuntimeException("Index has no AtomicIndexes!");
  346.     }
  347.     String[] batches = subIndexes[0].list(BATCH_NAME_FILTER);
  348.     java.util.Arrays.sort(batches, BATCH_COMPARATOR);
  349.     BATCH: for(String batch : batches) {
  350.       for(File subIndex : subIndexes) {
  351.         if(!new File(new File(subIndex, batch), subIndex.getName()
  352.                 + ".properties").exists()) {
  353.           break BATCH;
  354.         }
  355.       }
  356.       // if we get to here we know this batch exists in all sub-indexes
  357.       lastGood = batch;
  358.     }

  359.     if(lastGood != null) {
  360.       File brokenBatches = new File(indexDirectory, "broken-batches");
  361.       // stash bad batches
  362.       for(File subIndex : subIndexes) {
  363.         File[] thisIndexBatches = subIndex.listFiles(BATCH_NAME_FILTER);
  364.         for(File b : thisIndexBatches) {
  365.           if(BATCH_COMPARATOR.compare(lastGood, b.getName()) < 0) {
  366.             // this is a bad batch, stash it
  367.             File movedB =
  368.                     new File(brokenBatches, subIndex.getName() + "-"
  369.                             + b.getName());
  370.             if(movedB.exists()) {
  371.               FileUtils.deleteDirectory(movedB);
  372.             }
  373.             if(!b.renameTo(movedB)) {
  374.               throw new RuntimeException("Could not stash " + movedB.getName());
  375.             }
  376.           }
  377.         }
  378.       }
  379.     }

  380.     return lastGood;
  381.   }

  /**
   * Simple holder pairing an index's ordered batch names with the
   * cumulative document counts at the end of each batch.
   */
  public static class BatchDetails {
    // batch names (head, tail-N, ...) sorted in BATCH_COMPARATOR order
    String[] names;

    // endPoints[i] is the total number of documents in batches 0..i
    // inclusive, i.e. the exclusive end document index of batch i
    long[] endPoints;
  }

  386.   public static BatchDetails batchEndPoints(File indexDirectory)
  387.           throws IOException, ConfigurationException {
  388.     BatchDetails details = new BatchDetails();
  389.     long totalDocs = 0;
  390.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
  391.     if(subIndexes.length == 0) {
  392.       throw new RuntimeException("Index has no AtomicIndexes!");
  393.     }
  394.     details.names = subIndexes[0].list(BATCH_NAME_FILTER);
  395.     java.util.Arrays.sort(details.names, BATCH_COMPARATOR);

  396.     details.endPoints = new long[details.names.length];
  397.     for(int i = 0; i < details.names.length; i++) {
  398.       Properties batchProps = new Properties();
  399.       try(FileInputStream propsIn =
  400.               new FileInputStream(new File(new File(subIndexes[0],
  401.                       details.names[i]), subIndexes[0].getName()
  402.                       + ".properties"))) {
  403.         batchProps.load(propsIn);
  404.       }
  405.       totalDocs += batchProps.getLong("documents");
  406.       details.endPoints[i] = totalDocs;
  407.     }

  408.     return details;
  409.   }

  410.   public static long totalDocumentsInZipCollection(File indexDirectory)
  411.           throws IOException {
  412.     long totalDocs = 0;
  413.     File[] zipCollectionFiles =
  414.             indexDirectory
  415.                     .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
  416.     for(File zip : zipCollectionFiles) {
  417.       try(ZipFile zf = new ZipFile(zip)) {
  418.         totalDocs += zf.size();
  419.       }
  420.     }

  421.     return totalDocs;
  422.   }

  /**
   * Truncates the zip document collection so that it contains exactly
   * numDocs documents: the file containing the cut point is moved to
   * broken-batches and rewritten up to and including document
   * (numDocs - 1); any later collection files are left in place for the
   * caller to deal with.
   *
   * @param indexDirectory the top-level index directory
   * @param numDocs the number of documents to keep
   * @throws IOException if reading or rewriting a zip fails
   * @throws RuntimeException if no file contains the cut point or the
   *           target file cannot be moved aside
   */
  public static void truncateZipCollectionTo(File indexDirectory, long numDocs)
          throws IOException {
    File[] zipCollectionFiles =
            indexDirectory
                    .listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
    java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
    // the truncation point is somewhere within the last zip file whose
    // first entry is less than numDocs (document IDs are zero based, so
    // the document named numDocs is actually the (numDocs+1)th one).
    int targetFile = -1;
    for(int i = 0; i < zipCollectionFiles.length; i++) {
      try(FileInputStream fis = new FileInputStream(zipCollectionFiles[i]);
              ZipInputStream zipIn = new ZipInputStream(fis)) {
        // only the first entry is needed to know where this file starts
        ZipEntry firstEntry = zipIn.getNextEntry();
        if(firstEntry != null) {
          long documentId = Long.parseLong(firstEntry.getName());
          if(documentId >= numDocs) {
            break;
          } else {
            targetFile = i;
          }
        }
      }
    }

    if(targetFile < 0) {
      throw new RuntimeException(
              "Zip collection broken beyond repair - there is no zip file containing the cut point");
    }

    // we know that document (numDocs-1) is somewhere in
    // zipCollectionFiles[targetFile]. Move that file out of the way and
    // rewrite it, truncated appropriately.
    File origFile = zipCollectionFiles[targetFile];
    File brokenBatches = new File(indexDirectory, "broken-batches");
    brokenBatches.mkdirs();
    File movedFile =
            new File(brokenBatches, "to-truncate-" + origFile.getName());
    if(movedFile.exists()) {
      // remove the leftover of a previous repair attempt
      movedFile.delete();
    }
    if(!origFile.renameTo(movedFile)) {
      throw new RuntimeException("Could not stash " + origFile.getName()
              + " in broken-batches");
    }
    // entry names are document IDs, so this is the last entry to keep
    String lastEntryName = String.valueOf(numDocs - 1);
    try(FileInputStream oldIn = new FileInputStream(movedFile);
            ZipInputStream zipIn = new ZipInputStream(oldIn);
            FileOutputStream newOut = new FileOutputStream(origFile);
            ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
      ZipEntry entry = null;
      try {
        while((entry = zipIn.getNextEntry()) != null) {
          // buffer each entry fully before writing so a mid-entry EOF
          // in the input can never leave a half-written output entry
          ByteArrayOutputStream data = new ByteArrayOutputStream();
          IOUtils.copy(zipIn, data);
          // if we get here the input zip was not truncated mid-entry,
          // so it's safe to write this entry
          zipOut.putNextEntry(entry);
          IOUtils.write(data.toByteArray(), zipOut);
          zipOut.closeEntry();
          if(lastEntryName.equals(entry.getName())) {
            // reached the cut point, stop copying
            break;
          }
        }
      } catch(EOFException eof) {
        // this is expected, if the zip was not properly closed
      }
    }
    log.info("Truncated zip collection file " + origFile + " to document "
            + lastEntryName);
  }

  493.   public static void stashBatches(File indexDirectory, List<String> batches)
  494.           throws IOException {
  495.     File brokenBatches = new File(indexDirectory, "broken-batches");
  496.     File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);

  497.     for(File subIndex : subIndexes) {
  498.       for(String batchName : batches) {
  499.         File b = new File(subIndex, batchName);
  500.         File movedB =
  501.                 new File(brokenBatches, subIndex.getName() + "-" + batchName);
  502.         if(movedB.exists()) {
  503.           FileUtils.deleteDirectory(movedB);
  504.         }
  505.         if(!b.renameTo(movedB)) {
  506.           throw new RuntimeException("Could not stash " + movedB.getName());
  507.         }
  508.       }
  509.     }
  510.   }

  /**
   * Trim the given batch in all sub-indexes to the given length in
   * documents. Assumes the batch has already been stashed as
   * broken-batches/subindex-batchName. For each sub-index the stashed
   * inverted index is read term by term and re-written, dropping all
   * postings for documents at or beyond numDocs; terms whose postings
   * all fall beyond the cut are dropped entirely, and the terms file,
   * term map, Bloom filter, sizes file and properties are regenerated
   * to match.
   *
   * @param indexDirectory top level index directory
   * @param batchName name of the batch to trim
   * @param numDocs number of documents to which the batch should be
   *          trimmed.
   * @throws RuntimeException if a stashed batch is missing
   */
  public static void trimBatch(File indexDirectory, String batchName,
          long numDocs) throws Exception {
    File brokenBatches = new File(indexDirectory, "broken-batches");
    File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);

    for(File subIndex : subIndexes) {
      File stashedBatch =
              new File(brokenBatches, subIndex.getName() + "-" + batchName);
      if(!stashedBatch.exists()) {
        throw new RuntimeException("Stashed batch " + stashedBatch
                + " not found");
      }
      File batchDir = new File(subIndex, batchName);
      batchDir.mkdirs();
      log.info("Trimming batch " + batchDir);
      // MG4J index files share a common basename plus per-file extensions
      String stashedIndexBasename =
              new File(stashedBatch, subIndex.getName()).getAbsolutePath();
      String outputIndexBasename =
              new File(batchDir, subIndex.getName()).getAbsolutePath();

      Index stashedIndex = Index.getInstance(stashedIndexBasename, true, true);

      // when you read through an index sequentially, the IndexIterators
      // don't tell you what term they were for, so we need to read the
      // .terms file from the stashed batch in step with the index
      // reader.
      File stashedTermsFile =
              new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
      FileLinesCollection termsColl =
              new FileLinesCollection(stashedTermsFile.getAbsolutePath(),
                      "UTF-8");
      long numTerms = termsColl.size64();
      Iterator<MutableString> termsIter = termsColl.iterator();
      File newTermsFile =
              new File(outputIndexBasename + DiskBasedIndex.TERMS_EXTENSION);

      // there will certainly be no *more* than numTerms terms in the
      // final index, there may be fewer
      BloomFilter<Void> termFilter = BloomFilter.create(Math.max(numTerms, 1));

      // statistics accumulated while copying, needed for the new
      // .properties file at the end
      Properties writerProperties = null;
      long writtenBits = 0;
      int maxDocSize = 0;
      int maxCount = 0;
      long totalOccurrences = 0;
      try(IndexReader indexReader = stashedIndex.getReader();
              FileOutputStream termsOS = new FileOutputStream(newTermsFile);
              OutputStreamWriter termsOSW =
                      new OutputStreamWriter(termsOS, "UTF-8");
              PrintWriter termsWriter = new PrintWriter(termsOSW)) {
        QuasiSuccinctIndexWriter indexWriter =
                new QuasiSuccinctIndexWriter(
                        IOFactory.FILESYSTEM_FACTORY,
                        outputIndexBasename,
                        numDocs,
                        Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
                        QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
                        CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
                        ByteOrder.nativeOrder());

        IndexIterator iter;
        while((iter = indexReader.nextIterator()) != null) {
          // the terms file has one line per term, in iterator order
          MutableString term = termsIter.next();
          // we can't stream the inverted list, because we need to know
          // up front how many documents the term is found in so we can
          // write that number before writing the positions.
          LongList docPointers = new LongArrayList();
          IntList counts = new IntArrayList();
          List<IntArrayList> positions = new ArrayList<>();
          long frequency = 0;
          long curPointer;
          long occurrences = 0;
          long sumMaxPos = 0;
          while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
            if(curPointer < numDocs) {
              // document survives the cut - buffer its posting
              frequency++;
              docPointers.add(curPointer);
              counts.add(iter.count());
              IntArrayList thisDocPositions = new IntArrayList(iter.count());
              positions.add(thisDocPositions);
              occurrences += iter.count();
              totalOccurrences += iter.count();
              if(iter.count() > maxCount) {
                maxCount = iter.count();
              }
              int pos;
              int lastPos = 0;
              while((pos = iter.nextPosition()) != IndexIterator.END_OF_POSITIONS) {
                thisDocPositions.add(pos);
                lastPos = pos;
              }
              sumMaxPos += lastPos;
              if(lastPos > maxDocSize) {
                maxDocSize = lastPos;
              }
            } else {
              // pointers are ascending, so everything from here on is
              // beyond the cut point
              break;
            }
          }

          if(frequency > 0) {
            // this term occurred in at least one document that we're
            // not truncating, so now we know it's safe to write the
            // (truncated) inverted list to the new index and the term
            // to the terms file.

            term.println(termsWriter);
            termFilter.add(term);

            indexWriter.newInvertedList(frequency, occurrences, sumMaxPos);
            indexWriter.writeFrequency(frequency);
            for(int i = 0; i < frequency; i++) {
              OutputBitStream obs = indexWriter.newDocumentRecord();
              indexWriter.writeDocumentPointer(obs, docPointers.get(i));
              indexWriter.writePositionCount(obs, counts.get(i));
              indexWriter.writeDocumentPositions(obs, positions.get(i)
                      .elements(), 0, positions.get(i).size(), -1);
            }
          }
        }

        indexWriter.close();
        writerProperties = indexWriter.properties();
        // write stats file
        try(PrintStream statsPs =
                new PrintStream(new File(outputIndexBasename
                        + DiskBasedIndex.STATS_EXTENSION))) {
          indexWriter.printStats(statsPs);
        }
        writtenBits = indexWriter.writtenBits();
      }

      // regenerate the term map from the (possibly shorter) terms file
      AtomicIndex.generateTermMap(new File(outputIndexBasename
              + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
              + DiskBasedIndex.TERMMAP_EXTENSION), null);

      // write the bloom filter
      BinIO.storeObject(termFilter, new File(outputIndexBasename
              + DocumentalCluster.BLOOM_EXTENSION));

      // write the truncated sizes file: one gamma-coded size per
      // surviving document, copied from the stashed sizes file
      File stashedSizesFile =
              new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
      File sizesFile =
              new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
      try(InputBitStream stashedSizesStream =
              new InputBitStream(stashedSizesFile);
              OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
        for(long i = 0; i < numDocs; i++) {
          sizesStream.writeGamma(stashedSizesStream.readGamma());
        }
      }

      // generate the index properties: keep the original term processor
      // but substitute the statistics gathered during the rewrite
      Properties stashedProps = new Properties();
      try(FileInputStream stashedPropsStream =
              new FileInputStream(stashedIndexBasename
                      + DiskBasedIndex.PROPERTIES_EXTENSION)) {
        stashedProps.load(stashedPropsStream);
      }
      Properties newProps = new Properties();
      newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
              stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
      newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
      // -1 means unknown
      newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize);
      newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
      newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
      writerProperties.addAll(newProps);
      Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
              outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
    }
  }

  681.   /**
  682.    * Truncate the given direct index to remove documents beyond the
  683.    * given lastDocId. The original version of the batch is assumed to
  684.    * have been stashed as broken-batches/subIndexName-batchName
  685.    *
  686.    * @param indexDirectory the top-level index directory
  687.    * @param subIndexName the name of the sub-index (token-N or
  688.    *          mention-N)
  689.    * @param batchName the name of the batch (head or tail-N)
  690.    * @param lastDocId the last valid document ID
  691.    */
  692.   public static void truncateDirectIndex(File indexDirectory,
  693.           String subIndexName, String batchName, long lastDocId)
  694.           throws Exception {
  695.     File brokenBatches = new File(indexDirectory, "broken-batches");
  696.     File stashedBatch = new File(brokenBatches, subIndexName + "-" + batchName);
  697.     if(!stashedBatch.exists()) {
  698.       throw new RuntimeException("Stashed batch " + stashedBatch + " not found");
  699.     }
  700.     File batchDir = new File(new File(indexDirectory, subIndexName), batchName);
  701.     batchDir.mkdirs();
  702.     log.info("Trimming direct index for batch " + batchDir);

  703.     String stashedIndexBasename =
  704.             new File(stashedBatch, subIndexName
  705.                     + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
  706.     String outputIndexBasename =
  707.             new File(batchDir, subIndexName
  708.                     + AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();

  709.     // A direct index is modelled in Mimir as an inverted index where
  710.     // the terms are documents and vice versa. The "term string" is a
  711.     // zero-padded hex representation of the document ID, so we simply
  712.     // need to stream "inverted" lists from the stashed index to the new
  713.     // one until we reach the term string that is the hex representation
  714.     // of lastDocId.

  715.     MutableString lastDocIdAsHex =
  716.             new MutableString(AtomicIndex.longToTerm(lastDocId));

  717.     // determine the number of documents in this direct index (i.e. the
  718.     // number of entries in the .terms file that are lexicographically
  719.     // <= lastDocIdAsHex)
  720.     long numDocsInIndex = 0;

  721.     File stashedTermsFile =
  722.             new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
  723.     FileLinesCollection termsColl =
  724.             new FileLinesCollection(stashedTermsFile.getAbsolutePath(), "UTF-8");
  725.     try(FileLinesIterator docIdsIter = termsColl.iterator()) {
  726.       while(docIdsIter.hasNext()
  727.               && docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) {
  728.         numDocsInIndex++;
  729.       }
  730.     }
  731.     log.info("Trimmed index will contain " + numDocsInIndex + " documents");

  732.     // write the truncated "terms" file, term map and bloom filter
  733.     BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex);

  734.     try(FileLinesIterator docIdsIter = termsColl.iterator();
  735.             PrintWriter pw =
  736.                     new PrintWriter(new OutputStreamWriter(
  737.                             new FastBufferedOutputStream(new FileOutputStream(
  738.                                     outputIndexBasename
  739.                                             + DiskBasedIndex.TERMS_EXTENSION),
  740.                                     64 * 1024), "UTF-8"))) {
  741.       for(long i = 0; i < numDocsInIndex; i++) {
  742.         MutableString t = docIdsIter.next();
  743.         t.println(pw);
  744.         docBloomFilter.add(t);
  745.       }
  746.     }
  747.     AtomicIndex.generateTermMap(new File(outputIndexBasename
  748.             + DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
  749.             + DiskBasedIndex.TERMMAP_EXTENSION), null);
  750.     BinIO.storeObject(docBloomFilter, new File(outputIndexBasename
  751.             + DocumentalCluster.BLOOM_EXTENSION));

  752.     // stream "inverted lists" (i.e. documents) from the stashed to the
  753.     // new index, and build up a cache of "document sizes" (i.e. the
  754.     // number of documents that contain each term referenced in this
  755.     // index). We can't simply use the sizes from the stashed index
  756.     // because they will include the counts from the inverted lists
  757.     // we're trimming off.
  758.     Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap();
  759.     termSizes.defaultReturnValue(0);

  760.     // we need the total potential number of direct terms to create the
  761.     // index writer
  762.     File directTermsFile =
  763.             new File(new File(indexDirectory, subIndexName),
  764.                     AtomicIndex.DIRECT_TERMS_FILENAME);
  765.     FileLinesCollection directTerms =
  766.             new FileLinesCollection(directTermsFile.getAbsolutePath(), "UTF-8");

  767.     Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false);

  768.     int maxCount = 0;
  769.     long totalOccurrences = 0;
  770.     long writtenBits = 0;
  771.     int maxTermSize = -1; // -1 means unknown
  772.     Properties writerProperties;
  773.    
  774.     try(IndexReader indexReader = stashedIndex.getReader()) {
  775.       // copy the default compression flags, and remove positions
  776.       Map<Component, Coding> flags =
  777.               new HashMap<Component, Coding>(
  778.                       CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
  779.       flags.remove(Component.POSITIONS);
  780.       QuasiSuccinctIndexWriter directIndexWriter =
  781.               new QuasiSuccinctIndexWriter(
  782.                       IOFactory.FILESYSTEM_FACTORY,
  783.                       outputIndexBasename,
  784.                       directTerms.size64(),
  785.                       Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
  786.                       QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags,
  787.                       ByteOrder.nativeOrder());
  788.       IndexIterator iter;
  789.       int docCounter = 0;
  790.       long occurrences = 0;
  791.       while((iter = indexReader.nextIterator()) != null
  792.               && ++docCounter <= numDocsInIndex) {
  793.         // annoyingly we can't stream straight from the old inverted
  794.         // list to the new one, as we need to know up front the total
  795.         // occurrences value which is not exposed through any public
  796.         // API.
  797.         LongList docPointers = new LongArrayList();
  798.         IntList counts = new IntArrayList();
  799.         long frequency = iter.frequency();
  800.         long curPointer;
  801.         while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
  802.           docPointers.add(curPointer);
  803.           counts.add(iter.count());
  804.           termSizes.put(curPointer, termSizes.get(curPointer) + iter.count());
  805.           occurrences += iter.count();
  806.           totalOccurrences += iter.count();
  807.           if(iter.count() > maxCount) {
  808.             maxCount = iter.count();
  809.           }
  810.         }
  811.         directIndexWriter.newInvertedList(frequency, occurrences, 0);
  812.         directIndexWriter.writeFrequency(frequency);
  813.         for(int i = 0; i < frequency; i++) {
  814.           OutputBitStream obs = directIndexWriter.newDocumentRecord();
  815.           directIndexWriter.writeDocumentPointer(obs, docPointers.get(i));
  816.           directIndexWriter.writePositionCount(obs, counts.get(i));
  817.           // no positions in a direct index
  818.         }
  819.       }
  820.       directIndexWriter.close();
  821.       writtenBits = directIndexWriter.writtenBits();
  822.      
  823.       // write the new sizes file
  824.       File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
  825.       try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
  826.         for(long i = 0; i < directTerms.size64(); i++) {
  827.           int termSize = termSizes.get(i);
  828.           sizesStream.writeGamma(termSize);
  829.           if(termSize > maxTermSize) {
  830.             maxTermSize = termSize;
  831.           }
  832.         }
  833.       }
  834.       writerProperties = directIndexWriter.properties();
  835.       // write stats file
  836.       try(PrintStream statsPs =
  837.               new PrintStream(new File(outputIndexBasename
  838.                       + DiskBasedIndex.STATS_EXTENSION))) {
  839.         directIndexWriter.printStats(statsPs);
  840.       }
  841.     }
  842.    
  843.     // generate the index properties
  844.     Properties stashedProps = new Properties();
  845.     try(FileInputStream stashedPropsStream =
  846.             new FileInputStream(stashedIndexBasename
  847.                     + DiskBasedIndex.PROPERTIES_EXTENSION)) {
  848.       stashedProps.load(stashedPropsStream);
  849.     }
  850.     Properties newProps = new Properties();
  851.     newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
  852.             stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
  853.     newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
  854.     // -1 means unknown
  855.     newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize);
  856.     newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
  857.     newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
  858.     writerProperties.addAll(newProps);
  859.     Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
  860.             outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
  861.   }
  862. }