/*
 *  RankingQueryRunnerImpl.java
 *
 *  Copyright (c) 1995-2010, The University of Sheffield. See the file
 *  COPYRIGHT.txt in the software or at http://gate.ac.uk/gate/COPYRIGHT.txt
 *
 *  This file is part of GATE (see http://gate.ac.uk/), and is free
 *  software, licenced under the GNU Library General Public License,
 *  Version 2, June 1991 (in the distribution as file licence.html,
 *  and also available at http://gate.ac.uk/gate/licence.html).
 *
 *  Valentin Tablan, 16 Nov 2011
 *
 *  $Id: RankingQueryRunnerImpl.java 18271 2014-08-21 13:40:10Z ian_roberts $
 */
package gate.mimir.search;

import gate.mimir.index.IndexException;
import gate.mimir.search.query.Binding;
import gate.mimir.search.query.QueryExecutor;
import gate.mimir.search.query.QueryNode;
import gate.mimir.search.score.MimirScorer;
import it.unimi.dsi.fastutil.doubles.DoubleBigArrayBigList;
import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectBigArrayBigList;
import it.unimi.dsi.fastutil.objects.ObjectBigList;
import it.unimi.dsi.fastutil.objects.ObjectList;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A QueryRunner implementation that can perform ranking.
 * This query runner has two modes of functioning: ranking and non-ranking, 
 * depending on whether a {@link MimirScorer} is provided  during construction
 * or not.
 * All documents are referred to using their rank (i.e. position in the list of 
 * results). When working in non-ranking mode, ranking order is the same as 
 * document ID order.
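 * <p>
 * A minimal usage sketch (illustrative only: {@code engine} and 
 * {@code queryNode} stand for a previously obtained {@link QueryEngine} and a 
 * parsed query node, and exception handling is omitted):
 * <pre>{@code
 *   QueryExecutor executor = queryNode.getQueryExecutor(engine);
 *   // pass a MimirScorer implementation to enable ranking,
 *   // or null for non-ranking mode
 *   QueryRunner runner = new RankingQueryRunnerImpl(executor, null);
 *   long docCount = runner.getDocumentsCountSync();
 *   for(long rank = 0; rank < docCount && rank < 10; rank++) {
 *     long docId = runner.getDocumentID(rank);
 *     List<Binding> hits = runner.getDocumentHits(rank);
 *   }
 *   runner.close();
 * }</pre>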
 */
public class RankingQueryRunnerImpl implements QueryRunner {
  
  
  /**
   * Constant used as a flag to mark the end of a list of tasks.
   */
  private static final Runnable NO_MORE_TASKS = new Runnable(){ 
    public void run() {}
  };
  
  /**
   * The background thread implementation: simply collects {@link Runnable}s 
   * from the {@link RankingQueryRunnerImpl#backgroundTasks} queue and runs them. 
   */
  protected class BackgroundRunner implements Runnable {
    @Override
    public void run() {
      try {
        while(!closed) {
          Runnable job = backgroundTasks.take();
          if(job == NO_MORE_TASKS) break;
          else  job.run();
        }
      } catch(InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }
   
  
  /**
   * Collects the document hits (i.e. {@link Binding}s) for the documents 
   * between the two provided ranks (indexes in the {@link #documentsOrder} 
   * list). If ranking is not being performed ({@link #documentsOrder} is
   * <code>null</code>), then the indexes are used against the 
   * {@link #documentIds} list.
   * 
   * This is the only actor that writes to the {@link #documentHits} list.
   */  
  protected class HitsCollector implements Runnable {
    /**
     * The starting rank
     */
    long start;
    
    /**
     * The ending rank
     */
    long end;
    
    public HitsCollector(long rangeStart, long rangeEnd) {
      this.start = rangeStart;
      this.end = rangeEnd;
    }
    
    @Override
    public void run() {
      long[] documentIndexes = null;
      if(ranking) {
        // we're ranking -> first calculate the range of documents in ID order
        documentIndexes = new long[(int)(end - start)];
        for(long i = start; i < end; i++) {
          documentIndexes[(int)(i - start)] = documentsOrder.getLong(i);
        }
        Arrays.sort(documentIndexes);
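        // documentIds is kept in ascending document ID order, so visiting the
        // sorted indexes below means the executor only needs to move forward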
      }
      
      try {
        // see if we can get at the first document
        long docIndex = (documentIndexes != null ? documentIndexes[0] : start);
        long docId = documentIds.getLong(docIndex);
        if(queryExecutor.getLatestDocument() < 0 ||
           queryExecutor.getLatestDocument() >= docId) {
          // we need to 'scroll back' the executor: get a new executor
          QueryExecutor oldExecutor = queryExecutor;
          queryExecutor = queryExecutor.getQueryNode().getQueryExecutor(
                  queryEngine);
          oldExecutor.close();
        }
        for(long i = start; i < end; i++) {
          docIndex = (documentIndexes != null ? 
              documentIndexes[(int)(i - start)] : i);
          docId = documentIds.getLong(docIndex);
          // don't need to check for deletion here as we know for sure that this
          // doc ID is ok.  The only exception would be if it was deleted since
          // this query was originally issued, but I think we can live with that
          long newDoc = queryExecutor.nextDocument(docId - 1);
          // sanity check
          if(newDoc == docId) {
            List<Binding> hits = new ObjectArrayList<Binding>();
            Binding aHit = queryExecutor.nextHit();
            while(aHit != null) {
              hits.add(aHit);
              aHit = queryExecutor.nextHit();
            }
            documentHits.set(docIndex, hits);
          } else {
            // this could happen if we've been closed in the mean time
            if(closed) return;
            // we got the wrong document ID
            logger.error("Unexpected document ID returned by executor " +
            		"(got " + newDoc + " while expecting " + docId + "!");
          }
        }
      } catch(IOException e) {
        // this could happen if we've been closed in the mean time
        if(closed) return;
        // otherwise, it's an error
        logger.error("Exception while restarting the query executor.", e);
        try {
          close();
        } catch(IOException e1) {
          logger.error("Exception while closing the query runner.", e1);
        }
      }
    }
  }
  
  
  /**
   * The first action started when a new {@link RankingQueryRunnerImpl} is 
   * created. It performs the following actions:
   * <ul>
   *   <li>collects all document IDs in 
   *   {@link RankingQueryRunnerImpl#documentIds}</li>
   *   <li>if ranking enabled
   *     <ul>
   *     <li>it collects all document scores
   *     </ul>
   *   </li>  
   *   <li>if ranking not enabled
   *     <ul>
   *       <li>it collects the document hits for the first 
   *       block of documents</li>
   *     </ul>
   *   </li>
   *   <li>If ranking enabled, after all document IDs are obtained, it starts 
   *   the work for ranking the first block of documents (which, upon 
   *   completion, will also start a background job to collect all the hits for 
   *   that block).</li>  
   * </ul>
   */
  protected class DocIdsCollector implements Runnable {
    @Override
    public void run() {
      try{
        // collect all documents and their scores
        if(ranking) scorer.wrap(queryExecutor);
        long docId = nextNotDeleted();
        while(docId >= 0) {
          // enlarge the hits list
          if(ranking){
            documentScores.add(scorer.score());
            documentHits.add(null);
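            // in ranking mode the hits are collected later (see HitsCollector),
            // once the top-ranked block of documents is known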
          } else {
            // not scoring: also collect the hits for the first block of documents
            if(documentIds.size64() < docBlockSize) {
              ObjectList<Binding> hits = new ObjectArrayList<Binding>();
              Binding hit = queryExecutor.nextHit();
              while(hit != null) {
                hits.add(hit);
                hit = queryExecutor.nextHit();
              }
              documentHits.add(hits);
            } else {
              documentHits.add(null);
            }
          }
          // and store the new doc ID
          documentIds.add(docId);
          docId = nextNotDeleted();
        }
        allDocIdsCollected = true;
        if(ranking) {
          // now rank the first batch of documents
          // this will also start a second background job to collect the hits
          rankDocuments(docBlockSize -1);
        }
      } catch (Exception e) {
        // this could happen if we've been closed in the mean time
        if(closed) return;
        // otherwise, it's an error
        logger.error("Exception while collecting document IDs", e);
        try {
          close();
        } catch(IOException e1) {
          logger.error("Exception while closing, after exception.", e1);
        }
      }
    }
  }
  
  /**
   * Shared logger instance.
   */
  protected static Logger logger =  LoggerFactory.getLogger(RankingQueryRunnerImpl.class);
  
  /**
   * The {@link QueryExecutor} for the query being run.
   */
  protected QueryExecutor queryExecutor;
  
  /**
   * The QueryEngine we run inside.
   */
  protected QueryEngine queryEngine;
  
  /**
   * The {@link MimirScorer} to be used for ranking documents.
   */
  protected MimirScorer scorer;
  
  /**
   * Flag set to <code>true</code> when ranking is being performed, or 
   * <code>false</code> otherwise.
   */
  final boolean ranking;

  /**
   * The number of documents to be ranked (or have their hits collected) as a 
   * block.
   */
  protected int docBlockSize;
  
  /**
   * The document IDs for the documents found to contain hits. This list is
   * sorted in ascending documentID order.
   */
  protected LongBigList documentIds;
  
  /**
   * If scoring is enabled ({@link #scorer} is not <code>null</code>), this list
   * contains the scores for the documents found to contain hits. This list is 
   * aligned to {@link #documentIds}.   
   */
  protected DoubleBigArrayBigList documentScores;
  
  /**
   * The sets of hits for each returned document. This data structure is lazily 
   * built, so some elements may be null. This list is aligned to 
   * {@link #documentIds}.   
   */
  protected ObjectBigList<List<Binding>> documentHits;

  /**
   * The order the documents should be returned in (elements in this list are 
   * indexes in {@link #documentIds}).
   */
  protected LongBigList documentsOrder;
  
  /**
   * Data structure holding references to {@link Future}s that are currently 
   * working (or have worked) on collecting hits for a range of document 
   * indexes.
   */
  protected SortedMap<long[], Future<?>> hitCollectors;
  
  /**
   * The background thread used for collecting hits.
   */
  protected Thread runningThread;
  
  /**
   * A queue with tasks to be executed by the background thread. 
   */
  protected BlockingQueue<Runnable> backgroundTasks;
  
  /**
   * Flag used to mark that all result documents have been counted.
   */
  protected volatile boolean allDocIdsCollected = false;
  
  /**
   * The task that's working on collecting all the document IDs. When this 
   * activity has finished, the precise document count is known.
   */
  protected volatile FutureTask<Object> docIdCollectorFuture;
  
  /**
   * Internal flag used to mark when this query runner has been closed.
   */
  protected volatile boolean closed;
  
  /**
   * Creates a new query runner.
   * @param executor the {@link QueryExecutor} for the query being executed.
   * @param scorer the {@link MimirScorer} to use for ranking the results, or 
   * <code>null</code> to run in non-ranking mode.
   * @throws IOException
   */
  public RankingQueryRunnerImpl(QueryExecutor executor, MimirScorer scorer) throws IOException {
    this.queryExecutor = executor;
    this.scorer = scorer;
    this.closed = false;
    ranking = scorer != null;
    queryEngine = queryExecutor.getQueryEngine();
    docBlockSize = queryEngine.getDocumentBlockSize();
    documentIds = new LongBigArrayBigList();
    documentHits = new ObjectBigArrayBigList<List<Binding>>();
    if(scorer != null) {
      documentScores = new DoubleBigArrayBigList();
      documentsOrder = new LongBigArrayBigList(docBlockSize);
    }
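    // hitCollectors is keyed on [start, end) rank intervals and ordered by
    // interval start, so collectHits() can use headMap()/tailMap() to locate
    // the neighbouring intervals that are already being collected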
    hitCollectors = new Object2ObjectAVLTreeMap<long[], Future<?>>(
        new Comparator<long[]>(){
          @Override
          public int compare(long[] o1, long[] o2) {
            long res = o1[0] - o2[0]; 
            return res > 0 ? 1 : (res == 0 ? 0 : -1); 
          }
        });
    // start the background thread
    backgroundTasks = new LinkedBlockingQueue<Runnable>();
    Runnable backgroundRunner = new BackgroundRunner();
    //get a thread from the executor, if one exists
    if(queryEngine.getExecutor() != null){
      try {
        queryEngine.getExecutor().execute(backgroundRunner);
      } catch(RejectedExecutionException e) {
        logger.warn("Could not allocate a new background thread", e);
        throw new RejectedExecutionException(
          "System overloaded, please try again later."); 
      }
    }else{
      Thread theThread = new Thread(backgroundRunner, getClass().getName());
      theThread.setDaemon(true);
      theThread.start();
    }

    // queue a job for collecting all document ids
    try {
      docIdCollectorFuture = new FutureTask<Object>(new DocIdsCollector(), null);
      backgroundTasks.put(docIdCollectorFuture);
      if(!ranking) {
        // if not ranking, the doc IDs collector will also collect the
        // hits for the first docBlockSize documents
        synchronized(hitCollectors) {
          hitCollectors.put(new long[]{0, docBlockSize}, docIdCollectorFuture);
        }
      }
    } catch(InterruptedException e) {
      Thread.currentThread().interrupt();
      logger.error("Could not queue a background task.", e);
    }
  }
  
  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentsCount()
   */
  @Override
  public long getDocumentsCount() {
    if(allDocIdsCollected) return documentIds.size64();
    else return -1;
  }
  
  /**
   * Synchronous version of {@link #getDocumentsCount()} that waits if necessary
   * before returning the correct result (instead of returning <code>-1</code>
   * if the value is not yet known).
   * @return the total number of documents found to match the query.
   */
  @Override
  public long getDocumentsCountSync() {
    try {
      docIdCollectorFuture.get();
    } catch(Exception e) {
      logger.error("Exception while getting all document IDs", e);
      throw new IllegalStateException(
        "Exception while getting all document IDs", e);
    }
    return getDocumentsCount();
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentsCurrentCount()
   */
  @Override
  public long getDocumentsCurrentCount() {
    return documentIds.size64();
  }
  
  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentID(long)
   */
  @Override
  public long getDocumentID(long rank) throws IndexOutOfBoundsException, IOException {
    return documentIds.getLong(getDocumentIndex(rank));
  }
  
  @Override
  public double getDocumentScore(long rank) throws IndexOutOfBoundsException, IOException {
    return (documentScores != null) ? 
        documentScores.getDouble(getDocumentIndex(rank)) : 
        DEFAULT_SCORE;
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentHits(long)
   */
  @Override
  public List<Binding> getDocumentHits(long rank) throws IndexOutOfBoundsException, IOException {
    long documentIndex = getDocumentIndex(rank);
    List<Binding> hits = documentHits.get(documentIndex);
    if(hits == null) {
      // hits not collected yet
      try {
        // find the Future working on it, or start a new one, 
        // then wait for it to complete
        collectHits(new long[]{rank, rank + 1}).get();
        hits = documentHits.get(documentIndex);
      } catch(Exception e) {
        logger.error("Exception while waiting for hits collection", e);
        throw new RuntimeException(
          "Exception while waiting for hits collection", e); 
      }
    }
    return hits;
  }
  
  /**
   * Given a document rank, return its index in the {@link #documentIds} list.
   * If ranking is not being performed, then the rank is interpreted as an index 
   * against the {@link #documentIds} list and is simply returned. 
   * @param rank the rank of the desired document.
   * @return the index of the document in the {@link #documentIds} list.
   * @throws IOException if ranking further documents fails.
   * @throws IndexOutOfBoundsException if the rank is beyond the number of 
   * result documents.
   */
  protected long getDocumentIndex(long rank) throws IOException, 
      IndexOutOfBoundsException {
    long maxRank = documentIds.size64();
    if(rank >= maxRank) throw new IndexOutOfBoundsException(
      "Document rank too large (" + rank + " > " + maxRank + ".");
    if(documentsOrder != null) {
      // we're in ranking mode
      if(rank >= documentsOrder.size64()) {
        // document exists, but has not been ranked yet
        rankDocuments(rank);
      }
      return documentsOrder.getLong(rank);
    } else {
      return rank;
    }
  }
  
  /**
   * Ranks some more documents (i.e. adds more entries to the 
   * {@link #documentsOrder} list), making sure that the document at the 
   * provided rank is included (if such a document exists). If the provided 
   * rank is larger than the number of result documents, then all documents 
   * will be ranked before this method returns. 
   * This is the only method that writes to the {@link #documentsOrder} list.
   * This method is executed synchronously in the client thread.
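   * For example (an illustrative sketch): with document scores [5, 9, 7, 8] in 
   * document ID order and a block size of 2, the first call is intended to 
   * produce the order [9, 8] and a subsequent call to append [7, 5].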
   *  
   * @param rank the rank that must be covered by the newly ranked documents.
   * @throws IOException 
   */
  protected void rankDocuments(long rank) throws IOException {
    if(rank < documentsOrder.size64()) return;
    synchronized(documentsOrder) {
      // rank some documents
      long rankRangeStart = documentsOrder.size64();
      // right boundary is exclusive
      long rankRangeEnd = rank + 1;
      if((rankRangeEnd - rankRangeStart) < (docBlockSize)) {
        // extend the size of the chunk of documents to be ranked
        rankRangeEnd = rankRangeStart + docBlockSize; 
      }
      // the ID of the document with the minimum score already ranked.
      long smallestOldScoreDocId = rankRangeStart > 0 ? 
        documentIds.getLong(documentsOrder.getLong(rankRangeStart -1))
        : -1;
      // the score for the document above, which is the upper limit for new scores
      double smallestOldScore = rankRangeStart > 0 ? 
          documentScores.getDouble(documentsOrder.getLong(rankRangeStart -1))
          : Double.POSITIVE_INFINITY;
      // now collect some more documents
      for(long i = 0; i < documentIds.size64(); i++) {
        long documentId = documentIds.getLong(i);
        double documentScore = documentScores.getDouble(i);
        // the index for the document with the smallest score, 
        // from the new ones being ranked 
        long smallestDocIndex = rankRangeStart < documentsOrder.size64() ?
            documentsOrder.getLong(documentsOrder.size64() - 1) : -1;
        // the smallest score that's been seen in this new round 
        double smallestNewScore = smallestDocIndex == -1 ? Double.NEGATIVE_INFINITY : 
            documentScores.getDouble(smallestDocIndex);
        // we care about this new document if:
        // - it has not already been ranked (i.e. it has a strictly smaller 
        // score than the smallest already ranked score, or the same score but 
        // a strictly larger document ID), and
        // - we either haven't collected enough documents for this block yet, 
        // or it has a better score than the smallest score collected in this 
        // round
        if((documentScore < smallestOldScore
            ||
            (documentScore == smallestOldScore && documentId > smallestOldScoreDocId))
           &&
           (documentsOrder.size64() < rankRangeEnd
            || documentScore > smallestNewScore)
           ) {
          // find the rank for the new doc in the documentsOrder list, and insert
          documentsOrder.add(findRank(documentScore, rankRangeStart, 
              documentsOrder.size64()), i);
          // if we have too many documents, drop the lowest scoring one
          if(documentsOrder.size64() > rankRangeEnd) {
            documentsOrder.removeLong(documentsOrder.size64() - 1);
          }          
        }
      }
      // start collecting the hits for the newly ranked documents (in a new thread)
      if(documentsOrder.size64() > rankRangeStart){
        collectHits(new long[] {rankRangeStart, documentsOrder.size64()});
      }
    }
  }
  
  /**
   * Given a document score, finds the correct insertion point into the 
   * {@link #documentsOrder} list, within a given range of ranks.
   * This method performs binary search followed by a linear scan so that the 
   * returned insertion point is the largest correct one (i.e. later documents 
   * with the same score get sorted after earlier ones, thus keeping the sorting
   * stable).
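   * For example (an illustrative sketch): if the ranked scores in the search 
   * range are [9, 7, 7, 5], inserting a new document with score 7 returns 
   * insertion point 3, i.e. after the existing documents with the same score.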
   *      
   * @param documentScore the score for the new document.
   * @param start the start of the search range within {@link #documentsOrder} 
   * @param end the end of the search range within {@link #documentsOrder} 
   * @return the largest correct insertion point
   */
  protected long findRank(double documentScore, long start, long end) {
    // standard binary search
    double midVal;
    end--;
    while (start <= end) {
     long mid = (start + end) >>> 1;
     midVal = documentScores.getDouble(documentsOrder.getLong(mid));
     // note that the documentsOrder list is in decreasing score order!
     if (midVal > documentScore) start = mid + 1;
     else if (midVal < documentScore) end = mid - 1;
     else {
       // we found a doc with exactly the same score: scan to the right
       while(mid < documentsOrder.size64() && 
             documentScores.getDouble(documentsOrder.getLong(mid)) == 
           documentScore){
         mid++;
       }
       return mid;
     }
    }
    return start;
  }
  
  /**
   * Makes sure all the documents in the specified range are queued for hit 
   * collection. 
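   * The requested interval is first expanded to cover (at least) 
   * {@link #docBlockSize} documents where possible; if an already queued 
   * collector covers the expanded interval, its {@link Future} is returned, 
   * otherwise a new {@link HitsCollector} is queued for (roughly) the part of 
   * the interval not already covered.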
   * @param interval the interval specified by 2 document ranks. The interval is
   * defined as the elements in {@link #documentsOrder} between ranks 
   * interval[0] and (interval[1]-1) inclusive. 
   * @return the future that has been queued for collecting the hits.
   */
  protected Future<?> collectHits(long[] interval) {
    // expand the interval to block size (or size of documentsOrder)
    if(interval[1] - interval[0] < docBlockSize) {
      final long expansion = docBlockSize - (interval[1] - interval[0]);
      // expand up to (expansion / 2) to the left
      interval[0] = Math.max(0, interval[0] - (expansion / 2));
      // expand to the right
      long upperBound = documentsOrder != null ? 
          documentsOrder.size64() : documentIds.size64();
      interval[1] = Math.min(upperBound, interval[0] + docBlockSize);
    }
    HitsCollector hitsCollector = null;
    synchronized(hitCollectors) {
      SortedMap<long[], Future<?>> headMap = hitCollectors.headMap(interval); 
      long[] previousInterval = headMap.isEmpty() ? new long[]{0, 0} : 
          headMap.lastKey();
      if(previousInterval[1] >= interval[1]) {
        // we're part of previous interval
        return hitCollectors.get(previousInterval);
      } else {
        // calculate an appropriate interval to collect hits for
        SortedMap<long[], Future<?>> tailMap = hitCollectors.tailMap(
          new long[]{interval[1], interval[1]});
        long[] followingInterval = tailMap.isEmpty() ? 
            new long[]{interval[1], interval[1]} : tailMap.firstKey();
        long start = Math.max(previousInterval[1] - 1, interval[0]);
        long end = Math.min(followingInterval[0], interval[1]);
        hitsCollector = new HitsCollector(start, end);
        FutureTask<?> future = new FutureTask<Object>(hitsCollector, null);
        hitCollectors.put(new long[]{start, end}, future);
        try {
          backgroundTasks.put(future);
        } catch(InterruptedException e) {
          logger.error("Error while queuing background work", e);
          throw new RuntimeException("Error while queuing background work", e);
        }
        return future;
      }
    }
  }
  
  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentText(long, int, int)
   */
  @Override
  public String[][] getDocumentText(long rank, int termPosition, int length) 
          throws IndexException, IndexOutOfBoundsException, IOException {
    return queryEngine.getText(getDocumentID(rank), termPosition, length);
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentURI(long)
   */
  @Override
  public String getDocumentURI(long rank) throws IndexException, 
      IndexOutOfBoundsException, IOException {
    return queryEngine.getDocumentURI(getDocumentID(rank));
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentTitle(long)
   */
  @Override
  public String getDocumentTitle(long rank) throws IndexException, 
      IndexOutOfBoundsException, IOException {
    return queryEngine.getDocumentTitle(getDocumentID(rank));
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentMetadataField(long, java.lang.String)
   */
  @Override
  public Serializable getDocumentMetadataField(long rank, String fieldName)
      throws IndexException, IndexOutOfBoundsException, IOException {
    return queryEngine.getDocumentMetadataField(getDocumentID(rank), fieldName);
  }

  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#getDocumentMetadataFields(long, java.util.Set)
   */
  @Override
  public Map<String, Serializable> getDocumentMetadataFields(long rank,
      Set<String> fieldNames) throws IndexException, IndexOutOfBoundsException, 
      IOException {
    Map<String, Serializable> res = new HashMap<String, Serializable>();
    for(String fieldName : fieldNames) {
      Serializable value = getDocumentMetadataField(rank, fieldName);
      if(value != null) res.put(fieldName, value);
    }
    return res;
  }
  
  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#renderDocument(long, java.lang.Appendable)
   */
  @Override
  public void renderDocument(long rank, Appendable out) throws IOException, 
      IndexException {
        queryEngine.renderDocument(getDocumentID(rank), 
                getDocumentHits(rank), out);
  }
  
  /* (non-Javadoc)
   * @see gate.mimir.search.QueryRunner#close()
   */
  @Override
  public void close() throws IOException {
    this.closed = true;
    try{
      if(queryEngine != null) queryEngine.releaseQueryRunner(this);
      if(queryExecutor != null) queryExecutor.close();
      scorer = null;      
    } finally {
      try {
        // stop the background tasks runnable, 
        // which will return the thread to the pool
        backgroundTasks.put(NO_MORE_TASKS);
      } catch(InterruptedException e) {
        // ignore
      }      
    }
  }

  /**
   * Find the next document ID for the current query executor which is not
   * marked as deleted in the index.
   */
  protected long nextNotDeleted() throws IOException {
    long docId = ranking ? scorer.nextDocument(-1)
                         : queryExecutor.nextDocument(-1);
    while(docId >= 0 && queryEngine.getIndex().isDeleted(docId)) {
      docId = ranking ? scorer.nextDocument(-1)
                      : queryExecutor.nextDocument(-1);
    }
    
    return docId;
  }
}