FederatedQueryRunner.java
/*
* FederatedQueryRunner.java
*
* Copyright (c) 2007-2011, The University of Sheffield.
*
* This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Ian Roberts, 15 Dec 2011
*
* $Id: FederatedQueryRunner.java 16414 2012-12-10 10:54:47Z valyt $
*/
package gate.mimir.search;
import gate.mimir.index.IndexException;
import gate.mimir.search.query.Binding;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntBigArrayBigList;
import it.unimi.dsi.fastutil.ints.IntBigList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
import it.unimi.dsi.fastutil.longs.LongBigList;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link QueryRunner} that presents a set of sub-indexes (represented by
 * their own QueryRunners) as a single index.
 *
 * <p>Result documents are merged lazily: each federated rank is resolved, on
 * demand, to a (sub-runner index, sub-runner rank) pair by repeatedly taking
 * the highest-scoring not-yet-consumed document among the sub-runners. Ties
 * are broken by rotating the sub-runner at which each comparison starts, so
 * that equally scored documents are drawn from the sub-runners in turn.</p>
 *
 * <p>The merge itself ({@code checkRank}) and the one-off initialisation of
 * the merge state are guarded by this object's monitor.</p>
 */
public class FederatedQueryRunner implements QueryRunner {
  private static final Logger log = LoggerFactory.getLogger(FederatedQueryRunner.class);

  /**
   * The total number of result documents (or -1 if not yet known).
   * Volatile because it is read outside the monitor in
   * {@link #getDocumentsCount()} and {@link #getDocumentsCurrentCount()}.
   */
  private volatile long documentsCount = -1;

  /**
   * The query runners for the sub-indexes.
   */
  protected QueryRunner[] subRunners;

  /**
   * The next rank that needs to be merged from each sub runner, or -1 for a
   * sub-runner that has no more documents to supply. This is null until the
   * total documents count becomes known.
   */
  protected long[] nextSubRunnerRank;

  /**
   * For each result document rank, this list supplies the index for the
   * sub-runner that supplied the document.
   */
  protected IntBigList rank2runnerIndex;

  /**
   * Which of the sub-runners has provided the previous document. This is an
   * instance field so that we can rotate the sub-runners (when the scores are
   * equal).
   */
  private int bestSubRunnerIndex = -1;

  /**
   * For each result document rank, this list supplies the rank of the document
   * in the sub-runner that supplied it.
   */
  protected LongBigList rank2subRank;

  /**
   * Creates a federated runner over the given sub-runners.
   *
   * @param subrunners the query runners for the sub-indexes; the array is
   *          stored as supplied (it is not copied).
   */
  public FederatedQueryRunner(QueryRunner[] subrunners) {
    this.subRunners = subrunners;
    this.nextSubRunnerRank = null;
    this.rank2runnerIndex = new IntBigArrayBigList();
    this.rank2subRank = new LongBigArrayBigList();
  }

  /**
   * Returns the total number of result documents, which is the sum of the
   * counts reported by all sub-runners. The first time all sub-runners report
   * a non-negative count, the merge state is initialised and the total is
   * cached.
   *
   * @return the total documents count, or -1 if any sub-runner reports a
   *         negative count (i.e. the total is not yet known).
   */
  @Override
  public long getDocumentsCount() {
    if(documentsCount < 0) {
      // Collect the sub-counts once, outside the lock, so that no sub-runner
      // call has to be made while holding the monitor.
      long[] subCounts = new long[subRunners.length];
      long newDocumentsCount = 0;
      for(int i = 0; i < subRunners.length; i++) {
        subCounts[i] = subRunners[i].getDocumentsCount();
        if(subCounts[i] < 0) {
          return -1;
        }
        newDocumentsCount += subCounts[i];
      }
      synchronized(this) {
        // Re-check under the lock: another thread may have completed the
        // initialisation in the meantime, and re-creating nextSubRunnerRank
        // at that point would reset a merge that is already in progress.
        if(documentsCount < 0) {
          long[] initialRanks = new long[subRunners.length];
          for(int i = 0; i < initialRanks.length; i++) {
            // sub-runners with no documents start out as exhausted
            if(subCounts[i] == 0) {
              initialRanks[i] = -1;
            }
          }
          nextSubRunnerRank = initialRanks;
          documentsCount = newDocumentsCount;
        }
      }
    }
    return documentsCount;
  }

  /**
   * Calls {@code getDocumentsCountSync()} on every sub-runner in turn and
   * then returns the value of {@link #getDocumentsCount()}.
   */
  @Override
  public long getDocumentsCountSync() {
    for(QueryRunner subRunner : subRunners) {
      subRunner.getDocumentsCountSync();
    }
    return getDocumentsCount();
  }

  /**
   * Returns the cached total documents count if it is already known;
   * otherwise returns the sum of the current counts reported by the
   * sub-runners.
   */
  @Override
  public long getDocumentsCurrentCount() {
    long knownCount = documentsCount;
    if(knownCount >= 0) {
      return knownCount;
    }
    // Accumulate as long: the sub-counts are longs, and an int accumulator
    // would silently truncate the running total.
    long currentCount = 0;
    for(QueryRunner subRunner : subRunners) {
      currentCount += subRunner.getDocumentsCurrentCount();
    }
    return currentCount;
  }

  /**
   * Ensure that the given rank is resolved to the appropriate sub-runner rank,
   * extending the rank mappings as far as needed.
   *
   * <p>Note that if {@link #getDocumentsCount()} returns -1 (count not yet
   * known), every non-negative rank is rejected as too large.</p>
   *
   * @param rank the federated document rank to resolve.
   * @throws IndexOutOfBoundsException if rank is negative or beyond the last
   *           document.
   * @throws IOException declared for the sub-runner score look-ups performed
   *           while merging.
   */
  private final synchronized void checkRank(long rank)
      throws IndexOutOfBoundsException, IOException {
    if(rank < 0) throw new IndexOutOfBoundsException(
        "Document rank " + rank + " is negative.");
    long maxRank = getDocumentsCount();
    if(rank >= maxRank) throw new IndexOutOfBoundsException(
        "Document rank too large (" + rank + " >= " + maxRank + ").");
    // quick check to see if we need to do anything else
    if(rank < rank2runnerIndex.size64()) {
      return;
    }
    for(long nextRank = rank2runnerIndex.size64(); nextRank <= rank; nextRank++) {
      boolean allOut = true;
      // start with the runner next to the previously chosen one
      bestSubRunnerIndex = (bestSubRunnerIndex + 1) % subRunners.length;
      double maxScore = Double.NEGATIVE_INFINITY;
      if(nextSubRunnerRank[bestSubRunnerIndex] >= 0) {
        maxScore = subRunners[bestSubRunnerIndex].getDocumentScore(
            nextSubRunnerRank[bestSubRunnerIndex]);
        allOut = false;
      }
      // now check all remaining runners, in round-robin fashion
      final int from = bestSubRunnerIndex + 1;
      final int to = bestSubRunnerIndex + subRunners.length;
      for(int bigI = from; bigI < to; bigI++) {
        int i = bigI % subRunners.length;
        if(nextSubRunnerRank[i] >= 0) {
          // fetch the score once; the call may be expensive
          double score = subRunners[i].getDocumentScore(nextSubRunnerRank[i]);
          // The first live candidate is always accepted (allOut still true),
          // even if its score is -Infinity or NaN; otherwise an exhausted
          // starting runner could remain selected.
          if(allOut || score > maxScore) {
            bestSubRunnerIndex = i;
            maxScore = score;
          }
          allOut = false;
        }
      }
      if(allOut) {
        // we ran out of docs
        throw new IndexOutOfBoundsException("Requested rank was " + rank +
            " but ran out of documents at " + nextRank + "!");
      }
      // consume the next doc from bestSubRunnerIndex
      rank2runnerIndex.add(bestSubRunnerIndex);
      rank2subRank.add(nextSubRunnerRank[bestSubRunnerIndex]);
      if(nextSubRunnerRank[bestSubRunnerIndex] <
          subRunners[bestSubRunnerIndex].getDocumentsCount() - 1) {
        nextSubRunnerRank[bestSubRunnerIndex]++;
      } else {
        // this runner has run out of documents
        nextSubRunnerRank[bestSubRunnerIndex] = -1;
      }
    }
  }

  /**
   * Returns the federated ID of the document at the given rank, computed as
   * {@code subId * subRunners.length + runnerIndex}, where {@code subId} is
   * the ID reported by the sub-runner that supplied the document and
   * {@code runnerIndex} is that sub-runner's position in the sub-runners
   * array.
   */
  @Override
  public long getDocumentID(long rank) throws IndexOutOfBoundsException,
      IOException {
    checkRank(rank);
    int runnerIndex = rank2runnerIndex.getInt(rank);
    long subId = subRunners[runnerIndex].getDocumentID(
        rank2subRank.getLong(rank));
    return subId * subRunners.length + runnerIndex;
  }

  /**
   * Returns the score of the document at the given rank, as reported by the
   * sub-runner that supplied it.
   */
  @Override
  public double getDocumentScore(long rank) throws IndexOutOfBoundsException,
      IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentScore(
        rank2subRank.getLong(rank));
  }

  /**
   * Returns the hits for the document at the given rank, as reported by the
   * sub-runner that supplied it.
   */
  @Override
  public List<Binding> getDocumentHits(long rank)
      throws IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentHits(
        rank2subRank.getLong(rank));
  }

  /**
   * Delegates to the {@code getDocumentText} method of the sub-runner that
   * supplied the document at the given rank.
   */
  @Override
  public String[][] getDocumentText(long rank, int termPosition, int length)
      throws IndexException, IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentText(
        rank2subRank.getLong(rank), termPosition, length);
  }

  /**
   * Delegates to the {@code getDocumentURI} method of the sub-runner that
   * supplied the document at the given rank.
   */
  @Override
  public String getDocumentURI(long rank) throws IndexException,
      IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentURI(
        rank2subRank.getLong(rank));
  }

  /**
   * Delegates to the {@code getDocumentTitle} method of the sub-runner that
   * supplied the document at the given rank.
   */
  @Override
  public String getDocumentTitle(long rank) throws IndexException,
      IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentTitle(
        rank2subRank.getLong(rank));
  }

  /**
   * Delegates to the {@code getDocumentMetadataField} method of the
   * sub-runner that supplied the document at the given rank.
   */
  @Override
  public Serializable getDocumentMetadataField(long rank, String fieldName)
      throws IndexException, IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataField(
        rank2subRank.getLong(rank), fieldName);
  }

  /**
   * Delegates to the {@code getDocumentMetadataFields} method of the
   * sub-runner that supplied the document at the given rank.
   */
  @Override
  public Map<String, Serializable> getDocumentMetadataFields(long rank,
      Set<String> fieldNames)
      throws IndexException, IndexOutOfBoundsException, IOException {
    checkRank(rank);
    return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataFields(
        rank2subRank.getLong(rank), fieldNames);
  }

  /**
   * Delegates to the {@code renderDocument} method of the sub-runner that
   * supplied the document at the given rank.
   */
  @Override
  public void renderDocument(long rank, Appendable out) throws IOException,
      IndexException {
    checkRank(rank);
    subRunners[rank2runnerIndex.getInt(rank)].renderDocument(
        rank2subRank.getLong(rank), out);
  }

  /**
   * Closes every sub-runner. A failure to close one sub-runner is logged and
   * does not prevent the remaining sub-runners from being closed.
   */
  @Override
  public void close() throws IOException {
    for(QueryRunner r : subRunners) {
      try {
        r.close();
      } catch (Throwable t) {
        // deliberate best-effort: keep closing the other sub-runners
        log.error("Error while closing sub-runner", t);
      }
    }
  }
}