MimirIndex.java
/*
* Index.java
*
* Copyright (c) 2007-2013, The University of Sheffield.
*
* This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Valentin Tablan, 30 Oct 2013
*
* $Id: MimirIndex.java 17795 2014-04-10 12:00:46Z ian_roberts $
*/
package gate.mimir;
import gate.Document;
import gate.Gate;
import gate.creole.AnalyserRunningStrategy;
import gate.mimir.IndexConfig.SemanticIndexerConfig;
import gate.mimir.IndexConfig.TokenIndexerConfig;
import gate.mimir.index.AtomicAnnotationIndex;
import gate.mimir.index.AtomicIndex;
import gate.mimir.index.AtomicTokenIndex;
import gate.mimir.index.DocumentCollection;
import gate.mimir.index.DocumentData;
import gate.mimir.index.GATEDocument;
import gate.mimir.index.IndexException;
import gate.mimir.search.QueryEngine;
import gate.util.GateRuntimeException;
import it.unimi.di.big.mg4j.index.cluster.IndexCluster;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* A Mímir index which can index documents and answer queries. This class is the
* main entry point to the Mímir API.
* </p>
* A Mímir index is a compound index comprising the following data elements:
* <ul>
* <li>one or more sub-indexes (implemented by classes that extend
* {@link AtomicIndex}.</li>
* <li>a document collection containing the document textual content and
* metadata</li>
* </ul>
* <p>
* Each sub-index indexes either a certain feature of token annotations
* ({@link AtomicTokenIndex}) or one or more annotation types
* ({@link AtomicAnnotationIndex}).
* </p>
* <p>
* A Mímir index is continually accepting documents to be indexed (through calls
* to {@link #indexDocument(Document)}) and can answer queries though the
* {@link QueryEngine} instance returned by {@link #getQueryEngine()}.
* </p>
* <p>
* Documents submitted for indexing are initially accumulated in RAM, during
* which time they are not available for being searched. After documents in RAM
* are written to disk (a <em>sync-to-disk</em> operation), they become
* searchable. In-RAM documents are synced to disk after a certain amount of
* data has been accumulated (see {@link #setOccurrencesPerBatch(long)}) and
* also at regular time intervals (see {@link #setTimeBetweenBatches(int)}).
* </p>
* <p>
* Client code can request a <em>sync to disk</em> operation by calling
* {@link #requestSyncToDisk()}.
* </p>
* <p>
* Every sync-to-disk operation causes a new index <em>batch</em> to be created.
* All the batches are merged into a {@link IndexCluster} which is then used to
* serve queries. If the number of clusters gets too large, it can harm
* efficiency or the system can run into problems due to too large a number of
* files being open. To avoid this, the index batches can be <em>compacted</em>
* into a single batch. The index will automatically do that once the number of
* batches exceeds {@link IndexConfig#setMaximumBatches(int)}.
* </p>
* <p>
* Client code can request a compact operation by calling
* {@link #requestCompactIndex()}.
* </p>
* <p>
* In order to keep its consistency, a Mímir index <strong>must</strong> be
* closed orderly by calling {@link #close()} before the JVM is shut down.
* </p>
*/
public class MimirIndex {
/**
* The name of the file in the index directory where the index config is
* saved.
*/
public static final String INDEX_CONFIG_FILENAME = "config.xml";
/**
* The name for the file (stored in the root index directory) containing
* the serialised version of the {@link #deletedDocumentIds}.
*/
public static final String DELETED_DOCUMENT_IDS_FILE_NAME = "deleted.ser";
/**
* How many occurrences to index in each batch. This metric is more reliable,
* than document counts, as it does not depend on average document size.
*/
public static final int DEFAULT_OCCURRENCES_PER_BATCH = 100 * 1000 * 1000;
/**
* The default length for the buffer input / output queues for sub-indexers.
*/
public static final int DEFAULT_INDEXING_QUEUE_SIZE = 30;
/**
* Special value used to indicate that the index is closing and there will be
* no more sync tasks to process (an END_OF_QUEUE value for
* {@link #syncRequests}).
*/
protected final static Future<Long> NO_MORE_TASKS = new FutureTask<Long>(
new Callable<Long>() {
@Override
public Long call() throws Exception {
return 0l;
}
});
/**
* How many occurrences to be accumulated in RAM before a new tail batch is
* written to disk.
*/
protected long occurrencesPerBatch = DEFAULT_OCCURRENCES_PER_BATCH;
private static final Logger logger = LoggerFactory.getLogger(MimirIndex.class);
/**
* A {@link Runnable} used in a background thread to perform various index
* maintenance tasks:
* <ul>
* <li>check that the documents are being returned from the sub-indexers
* in the same order as they were submitted for indexing;</li>
* <li>update the {@link MimirIndex#occurrencesInRam} value by adding the
* occurrences produced by indexing new documents.</li>
* <li>delete indexed documents from GATE</li>
* </ul>
*/
protected class IndexMaintenanceRunner implements Runnable {
public void run(){
boolean finished = false;
while(!finished){
GATEDocument currentDocument = null;
try {
//get one document from each of the sub-indexers
//check identity and add to output queue.
for(AtomicIndex aSubIndexer : subIndexes){
GATEDocument aDoc = aSubIndexer.getOutputQueue().take();
if(currentDocument == null){
currentDocument = aDoc;
}else if(aDoc != currentDocument){
//malfunction!
throw new RuntimeException(
"Out of order document received from sub-indexer!");
}
}
//we obtained the same document from all the sub-indexers
if(currentDocument != GATEDocument.END_OF_QUEUE) {
occurrencesInRam += currentDocument.getOccurrences();
// let's delete it
logger.debug("Deleting document "
+ currentDocument.getDocument().getName());
gate.Factory.deleteResource(currentDocument.getDocument());
logger.debug("Document deleted. "
+ Gate.getCreoleRegister().getLrInstances(
currentDocument.getDocument().getClass().getName())
.size() + " documents still live.");
} else {
// we're done
finished = true;
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* A {@link Runnable} used in a background thread to perform various index
* maintenance tasks:
* <ul>
* <li>update the {@link MimirIndex#occurrencesInRam} value by subtracting
* the occurrence counts for all the batches that have recently been written
* to disk. It finds these by consuming the {@link Future}s in
* {@link MimirIndex#syncRequests}.</li>
* <li>Compact the index when too many on-disk batches have been created.</li>
* <li>compact the document collection when too many archive files have
* been created.</li>
* </ul>
*
* We use a different background thread (instead of adding more work to
* {@link IndexMaintenanceRunner}) because each of the threads gets its tasks
* from a different blocking queue, which allows us to sleep the background
* threads while they're not needed.
*/
protected class IndexMaintenanceRunner2 implements Runnable {
@Override
public void run() {
try {
Future<Long> aTask = syncRequests.take();
while(aTask != NO_MORE_TASKS) {
try {
occurrencesInRam -= aTask.get();
if(syncRequests.isEmpty()) {
// latest dump finished: compact index if needed;
boolean compactNeeded = false;
for(AtomicIndex aSubIndex : subIndexes) {
if(aSubIndex.getBatchCount() > indexConfig.getMaximumBatches()) {
compactNeeded = true;
break;
}
}
if(compactNeeded && !closed){
logger.debug("Compacting sub-indexes");
compactIndexSync();
}
if(documentCollection.getArchiveCount() > indexConfig.getMaximumBatches()
&& !closed) {
try {
logger.debug("Compacting document collection");
compactDocumentCollection();
} catch(Exception e) {
logger.error("Error while compacting document collection. "
+ "Index is now invalid. Closing index to avoid further damage.",
e);
try {
close();
} catch(InterruptedException e1) {
logger.error("Received interrupt request while closing "
+ "operation in progress", e);
Thread.currentThread().interrupt();
} catch(IOException e1) {
logger.error("Further IO exception while closing index.", e1);
}
}
}
}
} catch(ExecutionException e) {
// a sync request has failed. The index may be damaged, so we will
// close it to avoid further damage.
logger.error("A sync-to-disk request has failed. Closing index "
+ "to avoid further damage.", e);
try {
close();
} catch(IOException e1) {
logger.error("A further error was generated while attmepting "
+ "to close index.", e1);
}
}
aTask = syncRequests.take();
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Request the index compaction, and waits for all the operations to
* complete.
* @throws InterruptedException
*/
protected void compactIndexSync() throws InterruptedException {
List<Future<Void>> futures = requestCompactIndex();
for(Future<Void> f : futures){
try {
f.get();
} catch(InterruptedException e) {
// we were interrupted while waiting for a compacting operation
logger.error("Received interrupt request while compacting "
+ "operation in progress", e);
Thread.currentThread().interrupt();
} catch(ExecutionException e) {
logger.error("Execution exception while compacting the index. Index "
+ "may now be corrupted, closing it to avoid further damage", e);
try {
close();
} catch(InterruptedException e1) {
logger.error("Received interrupt request while closing "
+ "operation in progress", e);
} catch(IOException e1) {
logger.error("Further IO exception while closing index.", e1);
}
}
}
}
}
private class WriteDeletedDocsTask extends TimerTask {
public void run() {
synchronized(maintenanceTimer) {
File delFile = new File(indexDirectory, DELETED_DOCUMENT_IDS_FILE_NAME);
if(delFile.exists()) {
delFile.delete();
}
try{
logger.debug("Writing deleted documents set");
ObjectOutputStream oos = new ObjectOutputStream(
new GZIPOutputStream(
new BufferedOutputStream(
new FileOutputStream(delFile))));
oos.writeObject(deletedDocumentIds);
oos.flush();
oos.close();
logger.debug("Writing deleted documents set completed.");
}catch (IOException e) {
logger.error("Exception while writing deleted documents set", e);
}
}
}
}
/**
* {@link TimerTask} used to regularly dump the latest document to an on-disk
* batch, allowing them to become searchable.
*/
protected class SyncToDiskTask extends TimerTask {
@Override
public void run() {
if(occurrencesInRam > 0) {
try {
requestSyncToDisk();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* The {@link IndexConfig} used for this index.
*/
protected IndexConfig indexConfig;
/**
* The top level directory containing this index.
*/
protected File indexDirectory;
/**
* The zipped document collection from MG4J (built during the indexing of the
* first token feature). This can be used to obtain the document text and to
* display the content of the hits.
*/
protected DocumentCollection documentCollection;
/**
* The thread used to clean-up GATE documents after they have been indexed.
*/
protected Thread maintenanceThread;
/**
* Background thread used to subtract occurrence counts for batches that have
* recently been dumped to disk.
*/
protected Thread maintenanceThread2;
protected volatile boolean closed = false;
/**
* A list of futures representing sync-to-disk operations currently
* in-progress in all of the sub-indexes.
*/
protected BlockingQueue<Future<Long>> syncRequests;
/**
* The set of IDs for the documents marked as deleted.
*/
private transient SortedSet<Long> deletedDocumentIds;
/**
* A timer used to execute various regular index maintenance tasks, such as
* the writing of deleted documents data to disk, and making sure regular
* dumps to disk are performed.
*/
private transient Timer maintenanceTimer;
/**
* The timer task used to top write to disk the deleted documents data.
* This value is non-null only when there is a pending write.
*/
private volatile transient WriteDeletedDocsTask writeDeletedDocsTask;
/**
* Timer task used to schedule regular dumps to disk making sure recent
* documents become searcheable after at most {@link #timeBetweenBatches} #
* milliseconds.
*/
private volatile transient SyncToDiskTask syncToDiskTask;
/**
* The token indexes, in the order they are listed in the {@link #indexConfig}.
*/
protected AtomicTokenIndex[] tokenIndexes;
/**
* The annotation indexes, in the order they are listed in the
* {@link #indexConfig}.
*/
protected AtomicAnnotationIndex[] mentionIndexes;
/**
* The {@link #tokenIndexes} and {@link #mentionIndexes} in one single array.
*/
protected AtomicIndex[] subIndexes;
protected int indexingQueueSize = DEFAULT_INDEXING_QUEUE_SIZE;
/**
* The total number of occurrences in all sub-indexes that have not yet been
* written to disk.
*/
protected volatile long occurrencesInRam;
/**
* The {@link QueryEngine} used to run searches on this index.
*/
protected QueryEngine queryEngine;
/**
* Creates a new Mímir index.
*
* @param indexConfig the configuration for the index.
* @throws IOException
* @throws IndexException
*/
public MimirIndex(IndexConfig indexConfig) throws IOException, IndexException {
this.indexConfig = indexConfig;
this.indexDirectory = this.indexConfig.getIndexDirectory();
openIndex();
// save the config for the new index
IndexConfig.writeConfigToFile(indexConfig, new File(indexDirectory,
INDEX_CONFIG_FILENAME));
}
/**
* Open and existing Mímir index.
* @param indexDirectory the on-disk directory containing the index to be
* opened.
* @throws IndexException if the index cannot be opened
* @throws IllegalArgumentException if an index cannot be found at the
* specified location.
* @throws IOException if the index cannot be opened.
*/
public MimirIndex(File indexDirectory ) throws IOException, IndexException {
if(!indexDirectory.isDirectory()) throw new IllegalArgumentException(
"No index found at " + indexDirectory);
File indexConfigFile = new File(indexDirectory, INDEX_CONFIG_FILENAME);
if(!indexConfigFile.canRead()) throw new IllegalArgumentException(
"Cannot read index config from " + indexConfigFile);
this.indexConfig = IndexConfig.readConfigFromFile(indexConfigFile,
indexDirectory);
this.indexDirectory = this.indexConfig.getIndexDirectory();
if(indexConfig.getFormatVersion() < 7){
throw new IndexException("The index at " + indexDirectory +
" uses too old a format and cannot be opened.");
}
openIndex();
}
/**
* Opens the index files, if any, prepares all the sub-indexers specified in
* the index config, and gets this index ready to start indexing documents and
* answer queries.
* @throws IOException
* @throws IndexException
*/
protected void openIndex() throws IOException, IndexException {
// ####################
// Prepare for indexing
// ####################
// read the index config and create the sub-indexers
TokenIndexerConfig tokConfs[] = indexConfig.getTokenIndexers();
tokenIndexes = new AtomicTokenIndex[tokConfs.length];
for(int i = 0; i < tokConfs.length; i++) {
String subIndexname = "token-" + i;
tokenIndexes[i] = new AtomicTokenIndex(
this,
subIndexname,
tokConfs[i].isDirectIndexEnabled(),
new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
new LinkedBlockingQueue<GATEDocument>(indexingQueueSize),
tokConfs[i],
i == 0);
}
SemanticIndexerConfig sics[] = indexConfig.getSemanticIndexers();
mentionIndexes = new AtomicAnnotationIndex[sics.length];
for(int i = 0; i < sics.length; i++) {
String subIndexname = "mention-" + i;
mentionIndexes[i] = new AtomicAnnotationIndex(
this,
subIndexname,
sics[i].isDirectIndexEnabled(),
new LinkedBlockingQueue<GATEDocument>(),
new LinkedBlockingQueue<GATEDocument>(),
sics[i]);
}
// construct the joint array of sub-indexes
subIndexes = new AtomicIndex[tokenIndexes.length + mentionIndexes.length];
System.arraycopy(tokenIndexes, 0, subIndexes, 0, tokenIndexes.length);
System.arraycopy(mentionIndexes, 0, subIndexes, tokenIndexes.length,
mentionIndexes.length);
occurrencesInRam = 0;
syncRequests = new LinkedBlockingQueue<Future<Long>>();
// #####################
// Prepare for searching
// #####################
readDeletedDocs();
// #####################
// Index maintenance
// #####################
// start the documents collector thread
maintenanceThread = new Thread(new IndexMaintenanceRunner(),
indexDirectory.getAbsolutePath() + " index maintenance");
maintenanceThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in background tread", e);
}
});
maintenanceThread.start();
// start the occurrences subtractor thread
maintenanceThread2 = new Thread(
new IndexMaintenanceRunner2(),
indexDirectory.getAbsolutePath() + " index maintenance 2");
maintenanceThread2.setPriority(Thread.MIN_PRIORITY);
maintenanceThread2.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in background tread", e);
}
});
maintenanceThread2.start();
// start the timer for regular sync-ing, and maintenance of the deleted docs
maintenanceTimer = new Timer("Mímir index maintenance timer");
synchronized(maintenanceTimer) {
syncToDiskTask = new SyncToDiskTask();
if(indexConfig.getTimeBetweenBatches() <= 0) {
indexConfig.setTimeBetweenBatches(IndexConfig.DEFAULT_TIME_BETWEEN_BATCHES);
}
maintenanceTimer.schedule(syncToDiskTask,
indexConfig.getTimeBetweenBatches(),
indexConfig.getTimeBetweenBatches());
}
// open the zipped document collection
documentCollection = new DocumentCollection(indexDirectory);
}
/**
* Queues a new document for indexing. The document will first go into the
* indexing queue, from where the various sub-indexes take their input. Once
* processed, the document data is stored in RAM until a sync-to-disk
* operation occurs. Only after that does the document become searchable.
*
* @param document the document to be indexed.
* @throws InterruptedException if the process of posting the new document
* to all the input queues is interrupted.
* @throws IllegalStateException if the index has already been closed.
*/
public void indexDocument(Document document) throws InterruptedException {
if(closed) throw new IllegalStateException("This index has been closed, "
+ "no further documents can be indexed.");
// check if we need to write a new batch:
// we have too many occurrences and
// there are no outstanding batch writing operations
if( occurrencesInRam > occurrencesPerBatch && syncRequests.isEmpty()) {
requestSyncToDisk();
}
GATEDocument gDocument = new GATEDocument(document, indexConfig);
synchronized(subIndexes) {
for(AtomicIndex aSubIndex: subIndexes){
aSubIndex.getInputQueue().put(gDocument);
}
}
}
/**
* Asks this index to write to disk all the index data currently stored in
* RAM so that it can become searchable. The work happens in several
* background threads (one for each sub-index) at the earliest opportunity.
* @return a list of futures that can be used to find out when the operation
* has completed.
* @throws InterruptedException if the current thread has been interrupted
* while trying to queue the sync request.
*/
public List<Future<Long>> requestSyncToDisk() throws InterruptedException {
List<Future<Long>> futures = new ArrayList<Future<Long>>();
if(syncRequests.isEmpty()) {
synchronized(subIndexes) {
for(AtomicIndex aSubIndex : subIndexes) {
Future<Long> task = aSubIndex.requestSyncToDisk();
futures.add(task);
syncRequests.put(task);
}
}
} else {
// sync already in progress: instead of causing a new sync, notify caller
// when current operation completes
futures.addAll(syncRequests);
}
return futures;
}
/**
* Asks each of the sub-indexes in this index to compact all their batches
* into a single index. This reduces the number of open file handles required.
* The work happens in several background threads (one for each sub-index) at
* the earliest opportunity.
*
* @return a list of futures (one for each sub-index) that can be used to find
* out when the operation has completed.
* @throws InterruptedException
* if the current thread has been interrupted while trying to queue
* the compaction request.
*/
public List<Future<Void>> requestCompactIndex() throws InterruptedException {
List<Future<Void>> futures = new ArrayList<Future<Void>>();
synchronized(subIndexes) {
for(AtomicIndex aSubIndex : subIndexes) {
futures.add(aSubIndex.requestCompactIndex());
}
}
return futures;
}
/**
* Requests that the {@link DocumentCollection} contained by this index is
* compacted. This method blocks until the compaction has completed.
*
* In normal operation, the index maintains the collection, which includes
* regular compactions, so there should be no reason to call this method.
*
* @throws ZipException
* @throws IOException
* @throws IndexException
*/
public void compactDocumentCollection() throws ZipException, IOException, IndexException {
documentCollection.compact();
}
/**
* Called by the first token indexer when a new document has been indexed
* to ask the main index to save the necessary zip collection data
* @param gDocument
* @throws IndexException
*/
public void writeZipDocumentData(DocumentData docData) throws IndexException {
documentCollection.writeDocument(docData);
}
/**
* Stops this index from accepting any further document for indexing, stops
* this index from accepting any more queries, finishes indexing all the
* currently queued documents, writes all the files to disk, after which it
* returns control to the calling thread.
* This may be a lengthy operation, depending on the amount of data that still
* needs to be written to disk.
*
* @throws InterruptedException
* @throws IOException
*/
public void close() throws InterruptedException, IOException {
if(closed) return;
closed = true;
// close the query engine
if(queryEngine != null) queryEngine.close();
// stop the indexing
synchronized(subIndexes) {
for(AtomicIndex aSubIndex : subIndexes) {
aSubIndex.getInputQueue().put(GATEDocument.END_OF_QUEUE);
}
}
synchronized(maintenanceTimer) {
// write the deleted documents set
if(writeDeletedDocsTask != null) {
writeDeletedDocsTask.cancel();
}
// explicitly call it one last time
new WriteDeletedDocsTask().run();
maintenanceTimer.cancel();
}
// wait for indexing to end
maintenanceThread.join();
syncRequests.put(NO_MORE_TASKS);
maintenanceThread2.join();
// close the document collection
documentCollection.close();
// write the config file
try {
IndexConfig.writeConfigToFile(indexConfig, new File(indexDirectory,
INDEX_CONFIG_FILENAME));
} catch(IOException e) {
throw new GateRuntimeException("Could not save the index configuration!",
e);
}
logger.info("Index shutdown complete");
}
/**
* Gets the {@link IndexConfig} value for this index.
* @return
*/
public IndexConfig getIndexConfig() {
return indexConfig;
}
/**
* Returns the {@link QueryEngine} instance that can be used to post queries
* to this index. Each index holds one single query engine, so the same value
* will always be returned by repeated calls.
* @return
*/
public QueryEngine getQueryEngine() {
if(queryEngine == null) {
queryEngine = new QueryEngine(this);
}
return queryEngine;
}
/**
* Gets the top level directory for this index.
* @return
*/
public File getIndexDirectory() {
return indexDirectory;
}
/**
* Gets the current estimated number of occurrences in RAM. An occurrence
* represents one term (either a token or an annotation) occurring in an
* indexed document. This value can be used as a good measurement of the total
* amount of data that is currently being stored in RAM and waiting to be
* synced to disk.
* @return
*/
public long getOccurrencesInRam() {
return occurrencesInRam;
}
/**
* Returns the size of the indexing queue. See
* {@link #setIndexingQueueSize(int)} for more comments.
* @return
*/
public int getIndexingQueueSize() {
return indexingQueueSize;
}
/**
* Sets the size of the indexing queue(s) used by this index.
* Documents submitted for indexing are held in a queue until the indexers
* become ready to process them. One queue is used for each of the
* sub-indexes. A larger queue size can smooth out bursts of activity, but
* requires more memory (as a larger number of documents may need to be stored
* at the same time). A smaller value is more economical, but it can leads to
* slow-downs when certain documents take too long to index, and can clog up
* the queue. Defaults to {@value #DEFAULT_INDEXING_QUEUE_SIZE}.
* @param indexingQueueSize
*/
public void setIndexingQueueSize(int indexingQueueSize) {
this.indexingQueueSize = indexingQueueSize;
}
/**
* Gets the number of occurrences that should be used as a trigger for a sync
* to disk operation, leading to the creation of a new index batch.
* @return
*/
public long getOccurrencesPerBatch() {
return occurrencesPerBatch;
}
/**
* Sets the number of occurrences that should trigger a sync-to-disk operation
* leading to a new batch being created from the data previously stored in
* RAM.
*
* An occurrence represents one term (either a token or an annotation)
* occurring in an indexed document. This value can be used as a good
* measurement of the total amount of data that is currently being stored in
* RAM and waiting to be synced to disk.
*
* @param occurrencesPerBatch
*/
public void setOccurrencesPerBatch(long occurrencesPerBatch) {
this.occurrencesPerBatch = occurrencesPerBatch;
}
/**
* Gets the time interval (in milliseconds) between sync-to-disk operations.
* This is approximately the maximum amount of time that a document can spend
* being stored in RAM (and thus not searchable) after having been submitted
* for indexing. The measurement is not precise because of the time spent by
* the document in the indexing queue (after being received but before being
* processed) and the time take to write a new index batch to disk.
*
* @return
*/
public int getTimeBetweenBatches() {
return getIndexConfig().getTimeBetweenBatches();
}
/**
* Sets the time interval (in milliseconds) between sync-to-disk operations.
* This is approximately the maximum amount of time that a document can spend
* being stored in RAM (and thus not searchable) after having been submitted
* for indexing. The measurement is not precise because of the time spent by
* the document in the indexing queue (after being received but before being
* processed) and the time take to write a new index batch to disk.
*
* @return
*/
public void setTimeBetweenBatches(int timeBetweenBatches) {
if(indexConfig.getTimeBetweenBatches() != timeBetweenBatches) {
indexConfig.setTimeBetweenBatches(timeBetweenBatches);
synchronized(maintenanceTimer) {
if(syncToDiskTask != null) {
syncToDiskTask.cancel();
}
syncToDiskTask = new SyncToDiskTask();
maintenanceTimer.schedule(syncToDiskTask, timeBetweenBatches,
timeBetweenBatches);
}
}
}
/**
* Gets the {@link DocumentCollection} instance used by this index. The
* document collection is normally fully managed by the index, so there should
* be no need to access it directly through this method.
*
* @return
*/
public DocumentCollection getDocumentCollection() {
return documentCollection;
}
/**
* Gets the total number of documents currently searcheable
* @return
*/
public long getIndexedDocumentsCount() {
if(subIndexes != null && subIndexes.length > 0 &&
subIndexes[0].getIndex() != null){
return subIndexes[0].getIndex().numberOfDocuments;
} else {
return 0;
}
}
/**
* Gets the {@link DocumentData} for a given document ID, from the on disk
* document collection. In memory caching is performed to reduce the cost of
* this call.
* @param documentID
* the ID of the document to be obtained.
* @return the {@link DocumentData} associated with the given document ID.
* @throws IOException
*/
public synchronized DocumentData getDocumentData(long documentID)
throws IndexException, IOException {
if(isDeleted(documentID)) {
throw new IndexException("Invalid document ID " + documentID);
}
return documentCollection.getDocumentData(documentID);
}
/**
* Gets the size (number of tokens) for a document.
* @param documentId the document being requested.
*
* @return
*/
public int getDocumentSize(long documentId) {
return tokenIndexes[0].getIndex().sizes.get(documentId);
}
/**
* Marks a given document (identified by its ID) as deleted. Deleted documents
* are never returned as search results.
* @param documentId
*/
public void deleteDocument(long documentId) {
if(deletedDocumentIds.add(documentId)) {
writeDeletedDocsLater();
}
}
/**
* Marks the given batch of documents (identified by ID) as deleted. Deleted
* documents are never returned as search results.
* @param documentIds
*/
public void deleteDocuments(Collection<? extends Number> documentIds) {
List<Long> idsToDelete = new ArrayList<Long>(documentIds.size());
for(Number n : documentIds) {
idsToDelete.add(Long.valueOf(n.longValue()));
}
if(deletedDocumentIds.addAll(idsToDelete)) {
writeDeletedDocsLater();
}
}
/**
* Checks whether a given document (specified by its ID) is marked as deleted.
* @param documentId
* @return
*/
public boolean isDeleted(long documentId) {
return deletedDocumentIds.contains(documentId);
}
/**
* Mark the given document (identified by ID) as <i>not</i> deleted. Calling
* this method for a document ID that is not currently marked as deleted has
* no effect.
*/
public void undeleteDocument(long documentId) {
if(deletedDocumentIds.remove(documentId)) {
writeDeletedDocsLater();
}
}
/**
* Mark the given documents (identified by ID) as <i>not</i> deleted. Calling
* this method for a document ID that is not currently marked as deleted has
* no effect.
*/
public void undeleteDocuments(Collection<? extends Number> documentIds) {
List<Long> idsToUndelete = new ArrayList<Long>(documentIds.size());
for(Number n : documentIds) {
idsToUndelete.add(Long.valueOf(n.longValue()));
}
if(deletedDocumentIds.removeAll(idsToUndelete)) {
writeDeletedDocsLater();
}
}
/**
* Writes the set of deleted document to disk in a background thread, after a
* short delay. If a previous request has not started yet, this new request
* will replace it.
*/
protected void writeDeletedDocsLater() {
synchronized(maintenanceTimer) {
if(writeDeletedDocsTask != null) {
writeDeletedDocsTask.cancel();
}
writeDeletedDocsTask = new WriteDeletedDocsTask();
maintenanceTimer.schedule(writeDeletedDocsTask, 1000);
}
}
/**
* Reads the list of deleted documents from disk.
*/
@SuppressWarnings("unchecked")
protected synchronized void readDeletedDocs() throws IOException{
deletedDocumentIds = Collections.synchronizedSortedSet(
new TreeSet<Long>());
File delFile = new File(indexDirectory, DELETED_DOCUMENT_IDS_FILE_NAME);
if(delFile.exists()) {
try {
ObjectInputStream ois = new ObjectInputStream(
new GZIPInputStream(
new BufferedInputStream(
new FileInputStream(delFile))));
// an old index will have saved a Set<Integer>, a new one will be
// Set<Long>
Set<? extends Number> savedSet = (Set<? extends Number>)ois.readObject();
for(Number n : savedSet) {
deletedDocumentIds.add(Long.valueOf(n.longValue()));
}
} catch(ClassNotFoundException e) {
// this should never happen
throw new RuntimeException(e);
}
}
}
/**
* Returns the {@link AtomicTokenIndex} responsible for indexing a particular
* feature on token annotations.
*
* @param featureName
* @return
*/
public AtomicTokenIndex getTokenIndex(String featureName) {
if(featureName == null) {
// return the default token index
return tokenIndexes[0];
} else {
for(int i = 0; i < indexConfig.getTokenIndexers().length; i++) {
if(indexConfig.getTokenIndexers()[i].getFeatureName().equals(featureName)) {
return tokenIndexes[i];
}
}
}
return null;
}
/**
* Returns the {@link AtomicAnnotationIndex} instance responsible for indexing
* annotations of the type specified.
*
* @param annotationType
* @return
*/
public AtomicAnnotationIndex getAnnotationIndex(String annotationType) {
for(int i = 0; i < indexConfig.getSemanticIndexers().length; i++) {
for(String aType :
indexConfig.getSemanticIndexers()[i].getAnnotationTypes()) {
if(aType.equals(annotationType)) {
return mentionIndexes[i];
}
}
}
return null;
}
}