/*
* TruncateIndex.java
*
* Copyright (c) 2007-2016, The University of Sheffield.
*
 * This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Ian Roberts, 1st September 2016
*
* $Id: TruncateIndex.java 19708 2016-10-29 16:23:28Z ian_roberts $
*/
package gate.mimir.util;
import gate.Gate;
import gate.creole.Plugin;
import gate.mimir.IndexConfig;
import gate.mimir.IndexConfig.SemanticIndexerConfig;
import gate.mimir.IndexConfig.TokenIndexerConfig;
import gate.mimir.MimirIndex;
import gate.mimir.index.AtomicIndex;
import gate.mimir.index.DocumentCollection;
import gate.util.maven.Utils;
import it.unimi.di.big.mg4j.index.CompressionFlags;
import it.unimi.di.big.mg4j.index.CompressionFlags.Coding;
import it.unimi.di.big.mg4j.index.CompressionFlags.Component;
import it.unimi.di.big.mg4j.index.DiskBasedIndex;
import it.unimi.di.big.mg4j.index.Index;
import it.unimi.di.big.mg4j.index.IndexIterator;
import it.unimi.di.big.mg4j.index.IndexReader;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndex;
import it.unimi.di.big.mg4j.index.QuasiSuccinctIndexWriter;
import it.unimi.di.big.mg4j.index.cluster.DocumentalCluster;
import it.unimi.di.big.mg4j.io.IOFactory;
import it.unimi.di.big.mg4j.tool.Scan;
import it.unimi.dsi.big.io.FileLinesCollection;
import it.unimi.dsi.big.io.FileLinesCollection.FileLinesIterator;
import it.unimi.dsi.bits.Fast;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.io.InputBitStream;
import it.unimi.dsi.io.OutputBitStream;
import it.unimi.dsi.lang.MutableString;
import it.unimi.dsi.util.BloomFilter;
import it.unimi.dsi.util.Properties;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility class to fix up a Mimir index that has been corrupted, e.g.
 * by an unclean shutdown or an out-of-memory condition. The index must
 * be closed before this tool is used, which means either that the
 * Mimir web application is not running or that the index has been
 * deleted from the running Mimir. It is very strongly recommended to
 * back up an index before attempting this procedure. The clean-up
 * process will unavoidably remove some documents from the tail of the
 * index, but it attempts to keep the number of lost documents to a
 * minimum.
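 *
 * <p>
 * A minimal usage sketch (the plugin coordinates and index path below
 * are illustrative placeholders, not real values):
 * </p>
 *
 * <pre>{@code
 * Gate.runInSandbox(true);
 * Gate.init();
 * // register the plugin(s) backing this index's annotation helpers
 * Gate.getCreoleRegister().registerPlugin(
 *     new Plugin.Maven("some.group", "some-plugin", "1.0"));
 * TruncateIndex.truncateIndex(new File("/path/to/index"));
 * }</pre>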
*
* @author ian
*
*/
public class TruncateIndex {
private static final Logger log = LoggerFactory.getLogger(TruncateIndex.class);
/**
* Comparator that orders mimir zip collection files by number (e.g.
* mimir-collection-16.zip comes after mimir-collection-12-15.zip but
* before mimir-collection-100-120.zip)
*/
public static final Comparator<File> ZIP_COLLECTION_COMPARATOR =
new Comparator<File>() {
public int compare(File a, File b) {
int numA =
Integer.parseInt(a.getName().substring(
a.getName().lastIndexOf('-') + 1,
a.getName().length() - 4));
int numB =
Integer.parseInt(b.getName().substring(
b.getName().lastIndexOf('-') + 1,
b.getName().length() - 4));
        return Integer.compare(numA, numB);
}
};
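  /**
   * Comparator that orders batch names with "head" first, followed by
   * tails in numeric order (e.g. tail-2 before tail-10).
   */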
public static final Comparator<String> BATCH_COMPARATOR =
new Comparator<String>() {
public int compare(String a, String b) {
if(a.equals("head")) {
if(b.equals("head")) {
// both heads
return 0;
} else {
// head before tail
return -1;
}
} else {
if(b.equals("head")) {
// tail after head
return 1;
} else {
// both tails, compare by number
int numA =
Integer.parseInt(a.substring(a.lastIndexOf('-') + 1));
int numB =
Integer.parseInt(b.substring(b.lastIndexOf('-') + 1));
          return Integer.compare(numA, numB);
}
}
}
};
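  /** Matches the names of AtomicIndex directories (token-N or mention-N). */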
public static final FilenameFilter INDEX_NAME_FILTER = new FilenameFilter() {
private Pattern pat = Pattern.compile("(?:token|mention)-\\d+");
@Override
public boolean accept(File dir, String name) {
return pat.matcher(name).matches();
}
};
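  /** Matches the names of batch directories (head or tail-N). */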
public static final FilenameFilter BATCH_NAME_FILTER = new FilenameFilter() {
private Pattern pat = Pattern.compile("head|tail-\\d+");
@Override
public boolean accept(File dir, String name) {
return pat.matcher(name).matches();
}
};
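  /**
   * Command-line entry point: zero or more options followed by the path
   * to the index directory. Sketch of the expected invocation (the
   * option values here are illustrative placeholders):
   *
   * <pre>
   * TruncateIndex [-d /path/to/maven/cache]...
   *               [-p group:artifact:version | -p /path/or/URL/of/plugin]...
   *               indexDirectory
   * </pre>
   */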
public static void main(String... args) throws Exception {
Gate.runInSandbox(true);
Gate.init();
int i = 0;
Pattern mavenPluginPattern = Pattern.compile("([^/]+?):([^/]+?):([^/]+)");
while(i < args.length) {
if("-d".equals(args[i])) {
// Maven cache directory
Utils.addCacheDirectory(new File(args[++i]));
i++;
} else if("-p".equals(args[i])) {
        // plugin - either a URL or file path to a directory plugin, or
        // group:artifact:version for a Maven one
String plugin = args[++i];
plugin = plugin.trim();
try {
Matcher m = mavenPluginPattern.matcher(plugin);
if(m.matches()) {
// this looks like a Maven plugin
Gate.getCreoleRegister().registerPlugin(new Plugin.Maven(m.group(1), m.group(2), m.group(3)));
} else {
try {
URL u = new URL(plugin);
// succeeded in parsing as a URL, load that
Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(u));
} catch(MalformedURLException e) {
// not a URL, treat as a file
File pluginFile = new File(plugin);
Gate.getCreoleRegister().registerPlugin(new Plugin.Directory(pluginFile.toURI().toURL()));
}
}
} catch(Exception e) {
log.error("Failed to load plugin " + plugin, e);
System.exit(1);
}
i++;
} else {
break;
}
}
    if(i >= args.length) {
      System.err.println("Usage: TruncateIndex [-d mavenCacheDir]"
              + " [-p plugin]... indexDirectory");
      System.exit(1);
    }
    truncateIndex(new File(args[i]));
}
/**
* Attempt to fix up a corrupted Mimir index by truncating some number
* of documents off the end. There will be a certain number of
* documents in complete index batches, and a (possibly different)
* number of documents successfully persisted to disk in the zip files
 * of the DocumentCollection; the index will be truncated to the
* smaller of those two numbers.
*
* @param indexDirectory the top-level directory of the Mimir index
* (containing config.xml)
*/
public static void truncateIndex(File indexDirectory) throws Exception {
// 1. Repair the last zip file in the DocumentCollection
repairLastZip(indexDirectory);
// 2. Determine the last "good" batch (the greatest numbered head or
// tail that is fully written to disk in every AtomicIndex) and
// stash the bad ones
String lastGoodBatch = determineLastGoodBatch(indexDirectory);
if(lastGoodBatch == null) {
throw new RuntimeException(
"All batches are corrupt, sorry, this index is a write-off");
}
// 3. If the zip collection is at least as long as the sum of the
// good batches, truncate it to match the batches and we're done.
BatchDetails batches = batchEndPoints(indexDirectory);
long totalDocsInBatches = batches.endPoints[batches.endPoints.length - 1];
long totalDocsInZips = totalDocumentsInZipCollection(indexDirectory);
if(totalDocsInBatches == totalDocsInZips) {
log.info("We're in luck, the batches and zips line up exactly");
return;
} else if(totalDocsInZips > totalDocsInBatches) {
truncateZipCollectionTo(indexDirectory, totalDocsInBatches);
return;
} else if(totalDocsInZips == 0) {
throw new RuntimeException("Zip collection is empty");
}
// 4. Otherwise, the zip collection stops in the middle of a batch B
int endBatch = -1;
for(int i = 0; i < batches.names.length; i++) {
if(batches.endPoints[i] >= totalDocsInZips) {
endBatch = i;
break;
}
}
log.info("Zip collection ends within " + batches.names[endBatch]);
if(batches.endPoints[endBatch] == totalDocsInZips) {
// special case - zip collection ends exactly at the end of a
// batch. Stash subsequent batches and we're done
log.info("Zip collection ends exactly at the end of batch "
+ batches.names[endBatch]);
log.info("Stashing subsequent batches");
      stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
              .subList(endBatch + 1, batches.names.length));
log.info("Done");
return;
}
// 4.1. Stash B (for every AtomicIndex) and any batches beyond it.
    stashBatches(indexDirectory, java.util.Arrays.asList(batches.names)
            .subList(endBatch, batches.names.length));
// 4.2. Read each stashed B and re-write it but with documents
// beyond the end of the zip collection omitted
long endOfPreviousBatch = 0L;
if(endBatch > 0) {
endOfPreviousBatch = batches.endPoints[endBatch - 1];
}
trimBatch(indexDirectory, batches.names[endBatch], totalDocsInZips
- endOfPreviousBatch);
// 4.3. Truncate the direct indexes for those AtomicIndexes that
// require it
IndexConfig indexConfig =
IndexConfig.readConfigFromFile(new File(indexDirectory,
MimirIndex.INDEX_CONFIG_FILENAME));
TokenIndexerConfig[] tokenIndexes = indexConfig.getTokenIndexers();
for(int i = 0; i < tokenIndexes.length; i++) {
if(tokenIndexes[i].isDirectIndexEnabled()) {
truncateDirectIndex(indexDirectory, "token-" + i,
batches.names[endBatch], totalDocsInZips - 1);
}
}
SemanticIndexerConfig[] semanticIndexes = indexConfig.getSemanticIndexers();
for(int i = 0; i < semanticIndexes.length; i++) {
if(semanticIndexes[i].isDirectIndexEnabled()) {
truncateDirectIndex(indexDirectory, "mention-" + i,
batches.names[endBatch], totalDocsInZips - 1);
}
}
}
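  /**
   * Makes sure the last zip file of the DocumentCollection is complete,
   * by stashing the original in broken-batches and re-writing it entry
   * by entry, dropping a trailing entry that was truncated by an
   * unclean shutdown.
   *
   * @param indexDirectory the top-level index directory
   */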
public static void repairLastZip(File indexDirectory) throws IOException {
log.info("Ensuring last zip file in " + indexDirectory.getAbsolutePath()
+ " is complete");
File[] zipCollectionFiles =
indexDirectory
.listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
if(zipCollectionFiles.length > 0) {
java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
File lastZip = zipCollectionFiles[zipCollectionFiles.length - 1];
log.info("Last zip is " + lastZip.getName());
File brokenBatches = new File(indexDirectory, "broken-batches");
brokenBatches.mkdirs();
File movedLastZip = new File(brokenBatches, lastZip.getName());
if(movedLastZip.exists()) {
movedLastZip.delete();
}
if(!lastZip.renameTo(movedLastZip)) {
throw new RuntimeException("Could not stash " + lastZip.getName()
+ " in broken-batches");
}
log.debug("Moved " + lastZip.getName() + " to broken-batches");
String lastGoodDoc = null;
try(FileInputStream oldIn = new FileInputStream(movedLastZip);
ZipInputStream zipIn = new ZipInputStream(oldIn);
FileOutputStream newOut = new FileOutputStream(lastZip);
ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
ZipEntry entry = null;
try {
while((entry = zipIn.getNextEntry()) != null) {
ByteArrayOutputStream data = new ByteArrayOutputStream();
IOUtils.copy(zipIn, data);
// if we get here the input zip was not truncated mid-entry,
// so it's safe to write this entry
zipOut.putNextEntry(entry);
IOUtils.write(data.toByteArray(), zipOut);
zipOut.closeEntry();
lastGoodDoc = entry.getName();
}
} catch(EOFException eof) {
          // this is expected if the zip was not properly closed
}
}
log.info("Last good document ID was " + lastGoodDoc);
} else {
log.warn("No files in zip collection");
}
}
/**
* Determines the last "good" batch name (head or tail-N) for the
* given index, and stashes any bad batches in the broken-batches
* directory.
*
   * @param indexDirectory the top-level index directory
   * @return the name of the last good batch, or null if no batch is
   *         complete in every sub-index
   * @throws IOException if a stale stashed copy of a bad batch cannot
   *           be deleted
*/
public static String determineLastGoodBatch(File indexDirectory)
throws IOException {
String lastGood = null;
File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
if(subIndexes.length == 0) {
throw new RuntimeException("Index has no AtomicIndexes!");
}
String[] batches = subIndexes[0].list(BATCH_NAME_FILTER);
java.util.Arrays.sort(batches, BATCH_COMPARATOR);
BATCH: for(String batch : batches) {
for(File subIndex : subIndexes) {
if(!new File(new File(subIndex, batch), subIndex.getName()
+ ".properties").exists()) {
break BATCH;
}
}
// if we get to here we know this batch exists in all sub-indexes
lastGood = batch;
}
if(lastGood != null) {
File brokenBatches = new File(indexDirectory, "broken-batches");
// stash bad batches
for(File subIndex : subIndexes) {
File[] thisIndexBatches = subIndex.listFiles(BATCH_NAME_FILTER);
for(File b : thisIndexBatches) {
if(BATCH_COMPARATOR.compare(lastGood, b.getName()) < 0) {
// this is a bad batch, stash it
File movedB =
new File(brokenBatches, subIndex.getName() + "-"
+ b.getName());
if(movedB.exists()) {
FileUtils.deleteDirectory(movedB);
}
if(!b.renameTo(movedB)) {
throw new RuntimeException("Could not stash " + movedB.getName());
}
}
}
}
}
return lastGood;
}
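  /**
   * Holder for the sorted batch names of an index and the corresponding
   * cumulative document end points.
   */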
public static class BatchDetails {
String[] names;
long[] endPoints;
}
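  /**
   * Computes the cumulative document end points of the batches of this
   * index, in head-then-tails order. For example (illustrative numbers),
   * if "head" holds 100 documents and "tail-1" holds 50, the result is
   * names = {"head", "tail-1"} and endPoints = {100, 150}.
   */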
public static BatchDetails batchEndPoints(File indexDirectory)
throws IOException, ConfigurationException {
BatchDetails details = new BatchDetails();
long totalDocs = 0;
File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
if(subIndexes.length == 0) {
throw new RuntimeException("Index has no AtomicIndexes!");
}
details.names = subIndexes[0].list(BATCH_NAME_FILTER);
java.util.Arrays.sort(details.names, BATCH_COMPARATOR);
details.endPoints = new long[details.names.length];
for(int i = 0; i < details.names.length; i++) {
Properties batchProps = new Properties();
try(FileInputStream propsIn =
new FileInputStream(new File(new File(subIndexes[0],
details.names[i]), subIndexes[0].getName()
+ ".properties"))) {
batchProps.load(propsIn);
}
totalDocs += batchProps.getLong("documents");
details.endPoints[i] = totalDocs;
}
return details;
}
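  /**
   * Counts the documents persisted in the zip collection by summing the
   * number of entries in each collection zip file.
   */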
public static long totalDocumentsInZipCollection(File indexDirectory)
throws IOException {
long totalDocs = 0;
File[] zipCollectionFiles =
indexDirectory
.listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
for(File zip : zipCollectionFiles) {
try(ZipFile zf = new ZipFile(zip)) {
totalDocs += zf.size();
}
}
return totalDocs;
}
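  /**
   * Truncates the zip document collection so that it contains exactly
   * numDocs documents (IDs 0 to numDocs-1): the zip file containing the
   * cut point is stashed in broken-batches and re-written up to and
   * including the entry named numDocs-1.
   *
   * @param indexDirectory the top-level index directory
   * @param numDocs the number of documents to keep
   */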
public static void truncateZipCollectionTo(File indexDirectory, long numDocs)
throws IOException {
File[] zipCollectionFiles =
indexDirectory
.listFiles(DocumentCollection.CollectionFile.FILENAME_FILTER);
java.util.Arrays.sort(zipCollectionFiles, ZIP_COLLECTION_COMPARATOR);
// the truncation point is somewhere within the last zip file whose
// first entry is less than numDocs (document IDs are zero based, so
// the document named numDocs is actually the (numDocs+1)th one).
int targetFile = -1;
for(int i = 0; i < zipCollectionFiles.length; i++) {
try(FileInputStream fis = new FileInputStream(zipCollectionFiles[i]);
ZipInputStream zipIn = new ZipInputStream(fis)) {
ZipEntry firstEntry = zipIn.getNextEntry();
if(firstEntry != null) {
long documentId = Long.parseLong(firstEntry.getName());
if(documentId >= numDocs) {
break;
} else {
targetFile = i;
}
}
}
}
if(targetFile < 0) {
throw new RuntimeException(
"Zip collection broken beyond repair - there is no zip file containing the cut point");
}
// we know that document (numDocs-1) is somewhere in
// zipCollectionFiles[targetFile]. Move that file out of the way and
// rewrite it, truncated appropriately.
File origFile = zipCollectionFiles[targetFile];
File brokenBatches = new File(indexDirectory, "broken-batches");
brokenBatches.mkdirs();
File movedFile =
new File(brokenBatches, "to-truncate-" + origFile.getName());
if(movedFile.exists()) {
movedFile.delete();
}
if(!origFile.renameTo(movedFile)) {
throw new RuntimeException("Could not stash " + origFile.getName()
+ " in broken-batches");
}
String lastEntryName = String.valueOf(numDocs - 1);
try(FileInputStream oldIn = new FileInputStream(movedFile);
ZipInputStream zipIn = new ZipInputStream(oldIn);
FileOutputStream newOut = new FileOutputStream(origFile);
ZipOutputStream zipOut = new ZipOutputStream(newOut)) {
ZipEntry entry = null;
try {
while((entry = zipIn.getNextEntry()) != null) {
ByteArrayOutputStream data = new ByteArrayOutputStream();
IOUtils.copy(zipIn, data);
// if we get here the input zip was not truncated mid-entry,
// so it's safe to write this entry
zipOut.putNextEntry(entry);
IOUtils.write(data.toByteArray(), zipOut);
zipOut.closeEntry();
if(lastEntryName.equals(entry.getName())) {
// reached the cut point, stop copying
break;
}
}
} catch(EOFException eof) {
        // this is expected if the zip was not properly closed
}
}
log.info("Truncated zip collection file " + origFile + " to document "
+ lastEntryName);
}
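  /**
   * Moves the given batches of every AtomicIndex into the broken-batches
   * directory; e.g. the tail-3 batch of token-0 would be stashed as
   * broken-batches/token-0-tail-3.
   *
   * @param indexDirectory the top-level index directory
   * @param batches the names of the batches to stash
   */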
public static void stashBatches(File indexDirectory, List<String> batches)
throws IOException {
File brokenBatches = new File(indexDirectory, "broken-batches");
File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
for(File subIndex : subIndexes) {
for(String batchName : batches) {
File b = new File(subIndex, batchName);
File movedB =
new File(brokenBatches, subIndex.getName() + "-" + batchName);
if(movedB.exists()) {
FileUtils.deleteDirectory(movedB);
}
if(!b.renameTo(movedB)) {
throw new RuntimeException("Could not stash " + movedB.getName());
}
}
}
}
/**
* Trim the given batch in all sub-indexes to the given length in
* documents. Assumes the batch has already been stashed as
* broken-batches/subindex-batchName.
*
* @param indexDirectory top level index directory
* @param batchName name of the batch to trim
* @param numDocs number of documents to which the batch should be
* trimmed.
*/
public static void trimBatch(File indexDirectory, String batchName,
long numDocs) throws Exception {
File brokenBatches = new File(indexDirectory, "broken-batches");
File[] subIndexes = indexDirectory.listFiles(INDEX_NAME_FILTER);
for(File subIndex : subIndexes) {
File stashedBatch =
new File(brokenBatches, subIndex.getName() + "-" + batchName);
if(!stashedBatch.exists()) {
throw new RuntimeException("Stashed batch " + stashedBatch
+ " not found");
}
File batchDir = new File(subIndex, batchName);
batchDir.mkdirs();
log.info("Trimming batch " + batchDir);
String stashedIndexBasename =
new File(stashedBatch, subIndex.getName()).getAbsolutePath();
String outputIndexBasename =
new File(batchDir, subIndex.getName()).getAbsolutePath();
Index stashedIndex = Index.getInstance(stashedIndexBasename, true, true);
// when you read through an index sequentially, the IndexIterators
// don't tell you what term they were for, so we need to read the
// .terms file from the stashed batch in step with the index
// reader.
File stashedTermsFile =
new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
FileLinesCollection termsColl =
new FileLinesCollection(stashedTermsFile.getAbsolutePath(),
"UTF-8");
long numTerms = termsColl.size64();
Iterator<MutableString> termsIter = termsColl.iterator();
File newTermsFile =
new File(outputIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
      // there will certainly be no *more* than numTerms terms in the
      // final index, but there may be fewer
BloomFilter<Void> termFilter = BloomFilter.create(Math.max(numTerms, 1));
Properties writerProperties = null;
long writtenBits = 0;
int maxDocSize = 0;
int maxCount = 0;
long totalOccurrences = 0;
try(IndexReader indexReader = stashedIndex.getReader();
FileOutputStream termsOS = new FileOutputStream(newTermsFile);
OutputStreamWriter termsOSW =
new OutputStreamWriter(termsOS, "UTF-8");
PrintWriter termsWriter = new PrintWriter(termsOSW)) {
QuasiSuccinctIndexWriter indexWriter =
new QuasiSuccinctIndexWriter(
IOFactory.FILESYSTEM_FACTORY,
outputIndexBasename,
numDocs,
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE,
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX,
ByteOrder.nativeOrder());
IndexIterator iter;
while((iter = indexReader.nextIterator()) != null) {
MutableString term = termsIter.next();
// we can't stream the inverted list, because we need to know
// up front how many documents the term is found in so we can
// write that number before writing the positions.
LongList docPointers = new LongArrayList();
IntList counts = new IntArrayList();
List<IntArrayList> positions = new ArrayList<>();
long frequency = 0;
long curPointer;
long occurrences = 0;
long sumMaxPos = 0;
while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
if(curPointer < numDocs) {
frequency++;
docPointers.add(curPointer);
counts.add(iter.count());
IntArrayList thisDocPositions = new IntArrayList(iter.count());
positions.add(thisDocPositions);
occurrences += iter.count();
totalOccurrences += iter.count();
if(iter.count() > maxCount) {
maxCount = iter.count();
}
int pos;
int lastPos = 0;
while((pos = iter.nextPosition()) != IndexIterator.END_OF_POSITIONS) {
thisDocPositions.add(pos);
lastPos = pos;
}
sumMaxPos += lastPos;
if(lastPos > maxDocSize) {
maxDocSize = lastPos;
}
} else {
break;
}
}
if(frequency > 0) {
// this term occurred in at least one document that we're
// not truncating, so now we know it's safe to write the
// (truncated) inverted list to the new index and the term
// to the terms file.
term.println(termsWriter);
termFilter.add(term);
indexWriter.newInvertedList(frequency, occurrences, sumMaxPos);
indexWriter.writeFrequency(frequency);
for(int i = 0; i < frequency; i++) {
OutputBitStream obs = indexWriter.newDocumentRecord();
indexWriter.writeDocumentPointer(obs, docPointers.get(i));
indexWriter.writePositionCount(obs, counts.get(i));
indexWriter.writeDocumentPositions(obs, positions.get(i)
.elements(), 0, positions.get(i).size(), -1);
}
}
}
indexWriter.close();
writerProperties = indexWriter.properties();
// write stats file
try(PrintStream statsPs =
new PrintStream(new File(outputIndexBasename
+ DiskBasedIndex.STATS_EXTENSION))) {
indexWriter.printStats(statsPs);
}
writtenBits = indexWriter.writtenBits();
}
// regenerate the term map from the (possibly shorter) terms file
AtomicIndex.generateTermMap(new File(outputIndexBasename
+ DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
+ DiskBasedIndex.TERMMAP_EXTENSION), null);
// write the bloom filter
BinIO.storeObject(termFilter, new File(outputIndexBasename
+ DocumentalCluster.BLOOM_EXTENSION));
// write the truncated sizes file
File stashedSizesFile =
new File(stashedIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
File sizesFile =
new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
try(InputBitStream stashedSizesStream =
new InputBitStream(stashedSizesFile);
OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
for(long i = 0; i < numDocs; i++) {
sizesStream.writeGamma(stashedSizesStream.readGamma());
}
}
// generate the index properties
Properties stashedProps = new Properties();
try(FileInputStream stashedPropsStream =
new FileInputStream(stashedIndexBasename
+ DiskBasedIndex.PROPERTIES_EXTENSION)) {
stashedProps.load(stashedPropsStream);
}
Properties newProps = new Properties();
newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxDocSize);
newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
writerProperties.addAll(newProps);
Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
}
}
/**
* Truncate the given direct index to remove documents beyond the
* given lastDocId. The original version of the batch is assumed to
* have been stashed as broken-batches/subIndexName-batchName
*
* @param indexDirectory the top-level index directory
* @param subIndexName the name of the sub-index (token-N or
* mention-N)
* @param batchName the name of the batch (head or tail-N)
* @param lastDocId the last valid document ID
*/
public static void truncateDirectIndex(File indexDirectory,
String subIndexName, String batchName, long lastDocId)
throws Exception {
File brokenBatches = new File(indexDirectory, "broken-batches");
File stashedBatch = new File(brokenBatches, subIndexName + "-" + batchName);
if(!stashedBatch.exists()) {
throw new RuntimeException("Stashed batch " + stashedBatch + " not found");
}
File batchDir = new File(new File(indexDirectory, subIndexName), batchName);
batchDir.mkdirs();
log.info("Trimming direct index for batch " + batchDir);
String stashedIndexBasename =
new File(stashedBatch, subIndexName
+ AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
String outputIndexBasename =
new File(batchDir, subIndexName
+ AtomicIndex.DIRECT_INDEX_NAME_SUFFIX).getAbsolutePath();
// A direct index is modelled in Mimir as an inverted index where
// the terms are documents and vice versa. The "term string" is a
// zero-padded hex representation of the document ID, so we simply
// need to stream "inverted" lists from the stashed index to the new
// one until we reach the term string that is the hex representation
// of lastDocId.
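    // (for illustration: with zero-padded hex term strings of equal
    // width, lexicographic order matches numeric order, which is what
    // makes the compareTo test below correct)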
MutableString lastDocIdAsHex =
new MutableString(AtomicIndex.longToTerm(lastDocId));
// determine the number of documents in this direct index (i.e. the
// number of entries in the .terms file that are lexicographically
// <= lastDocIdAsHex)
long numDocsInIndex = 0;
File stashedTermsFile =
new File(stashedIndexBasename + DiskBasedIndex.TERMS_EXTENSION);
FileLinesCollection termsColl =
new FileLinesCollection(stashedTermsFile.getAbsolutePath(), "UTF-8");
try(FileLinesIterator docIdsIter = termsColl.iterator()) {
while(docIdsIter.hasNext()
&& docIdsIter.next().compareTo(lastDocIdAsHex) <= 0) {
numDocsInIndex++;
}
}
log.info("Trimmed index will contain " + numDocsInIndex + " documents");
// write the truncated "terms" file, term map and bloom filter
BloomFilter<Void> docBloomFilter = BloomFilter.create(numDocsInIndex);
try(FileLinesIterator docIdsIter = termsColl.iterator();
PrintWriter pw =
new PrintWriter(new OutputStreamWriter(
new FastBufferedOutputStream(new FileOutputStream(
outputIndexBasename
+ DiskBasedIndex.TERMS_EXTENSION),
64 * 1024), "UTF-8"))) {
for(long i = 0; i < numDocsInIndex; i++) {
MutableString t = docIdsIter.next();
t.println(pw);
docBloomFilter.add(t);
}
}
AtomicIndex.generateTermMap(new File(outputIndexBasename
+ DiskBasedIndex.TERMS_EXTENSION), new File(outputIndexBasename
+ DiskBasedIndex.TERMMAP_EXTENSION), null);
BinIO.storeObject(docBloomFilter, new File(outputIndexBasename
+ DocumentalCluster.BLOOM_EXTENSION));
// stream "inverted lists" (i.e. documents) from the stashed to the
// new index, and build up a cache of "document sizes" (i.e. the
    // total number of occurrences of each term referenced in this
// index). We can't simply use the sizes from the stashed index
// because they will include the counts from the inverted lists
// we're trimming off.
Long2IntOpenHashMap termSizes = new Long2IntOpenHashMap();
termSizes.defaultReturnValue(0);
// we need the total potential number of direct terms to create the
// index writer
File directTermsFile =
new File(new File(indexDirectory, subIndexName),
AtomicIndex.DIRECT_TERMS_FILENAME);
FileLinesCollection directTerms =
new FileLinesCollection(directTermsFile.getAbsolutePath(), "UTF-8");
Index stashedIndex = Index.getInstance(stashedIndexBasename, true, false);
int maxCount = 0;
long totalOccurrences = 0;
long writtenBits = 0;
int maxTermSize = -1; // -1 means unknown
Properties writerProperties;
try(IndexReader indexReader = stashedIndex.getReader()) {
// copy the default compression flags, and remove positions
Map<Component, Coding> flags =
new HashMap<Component, Coding>(
CompressionFlags.DEFAULT_QUASI_SUCCINCT_INDEX);
flags.remove(Component.POSITIONS);
QuasiSuccinctIndexWriter directIndexWriter =
new QuasiSuccinctIndexWriter(
IOFactory.FILESYSTEM_FACTORY,
outputIndexBasename,
directTerms.size64(),
Fast.mostSignificantBit(QuasiSuccinctIndex.DEFAULT_QUANTUM),
QuasiSuccinctIndexWriter.DEFAULT_CACHE_SIZE, flags,
ByteOrder.nativeOrder());
      IndexIterator iter;
      int docCounter = 0;
      while((iter = indexReader.nextIterator()) != null
              && ++docCounter <= numDocsInIndex) {
        // annoyingly we can't stream straight from the old inverted
        // list to the new one, as we need to know up front the total
        // occurrences value, which is not exposed through any public
        // API.
        LongList docPointers = new LongArrayList();
        IntList counts = new IntArrayList();
        long frequency = iter.frequency();
        // per-list occurrence count; reset for each list, since it is
        // passed to newInvertedList below
        long occurrences = 0;
        long curPointer;
while((curPointer = iter.nextDocument()) != IndexIterator.END_OF_LIST) {
docPointers.add(curPointer);
counts.add(iter.count());
termSizes.put(curPointer, termSizes.get(curPointer) + iter.count());
occurrences += iter.count();
totalOccurrences += iter.count();
if(iter.count() > maxCount) {
maxCount = iter.count();
}
}
directIndexWriter.newInvertedList(frequency, occurrences, 0);
directIndexWriter.writeFrequency(frequency);
for(int i = 0; i < frequency; i++) {
OutputBitStream obs = directIndexWriter.newDocumentRecord();
directIndexWriter.writeDocumentPointer(obs, docPointers.get(i));
directIndexWriter.writePositionCount(obs, counts.get(i));
// no positions in a direct index
}
}
directIndexWriter.close();
writtenBits = directIndexWriter.writtenBits();
// write the new sizes file
File sizesFile = new File(outputIndexBasename + DiskBasedIndex.SIZES_EXTENSION);
try(OutputBitStream sizesStream = new OutputBitStream(sizesFile)) {
for(long i = 0; i < directTerms.size64(); i++) {
int termSize = termSizes.get(i);
sizesStream.writeGamma(termSize);
if(termSize > maxTermSize) {
maxTermSize = termSize;
}
}
}
writerProperties = directIndexWriter.properties();
// write stats file
try(PrintStream statsPs =
new PrintStream(new File(outputIndexBasename
+ DiskBasedIndex.STATS_EXTENSION))) {
directIndexWriter.printStats(statsPs);
}
}
// generate the index properties
Properties stashedProps = new Properties();
try(FileInputStream stashedPropsStream =
new FileInputStream(stashedIndexBasename
+ DiskBasedIndex.PROPERTIES_EXTENSION)) {
stashedProps.load(stashedPropsStream);
}
Properties newProps = new Properties();
newProps.setProperty(Index.PropertyKeys.TERMPROCESSOR,
stashedProps.getProperty(Index.PropertyKeys.TERMPROCESSOR));
newProps.setProperty(Index.PropertyKeys.SIZE, writtenBits);
// -1 means unknown
newProps.setProperty(Index.PropertyKeys.MAXDOCSIZE, maxTermSize);
newProps.setProperty(Index.PropertyKeys.MAXCOUNT, maxCount);
newProps.setProperty(Index.PropertyKeys.OCCURRENCES, totalOccurrences);
writerProperties.addAll(newProps);
Scan.saveProperties(IOFactory.FILESYSTEM_FACTORY, writerProperties,
outputIndexBasename + DiskBasedIndex.PROPERTIES_EXTENSION);
}
}