FederatedQueryRunner.java

  1. /*
  2.  *  FederatedQueryRunner.java
  3.  *
  4.  *  Copyright (c) 2007-2011, The University of Sheffield.
  5.  *
  6.  *  This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
  7.  *  and is free software, licenced under the GNU Lesser General Public License,
  8.  *  Version 3, June 2007 (also included with this distribution as file
  9.  *  LICENCE-LGPL3.html).
  10.  *
  11.  *  Ian Roberts, 15 Dec 2011
  12.  *
  13.  *  $Id: FederatedQueryRunner.java 16414 2012-12-10 10:54:47Z valyt $
  14.  */
  15. package gate.mimir.search;

  16. import gate.mimir.index.IndexException;
  17. import gate.mimir.search.query.Binding;

  18. import it.unimi.dsi.fastutil.ints.IntArrayList;
  19. import it.unimi.dsi.fastutil.ints.IntBigArrayBigList;
  20. import it.unimi.dsi.fastutil.ints.IntBigList;
  21. import it.unimi.dsi.fastutil.ints.IntList;
  22. import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
  23. import it.unimi.dsi.fastutil.longs.LongBigList;

  24. import java.io.IOException;
  25. import java.io.Serializable;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Set;

  29. import org.slf4j.Logger;
  30. import org.slf4j.LoggerFactory;

  31. /**
  32.  * A {@link QueryRunner} that presents a set of sub-indexes (represented by
  33.  * their own QueryRunners) as a single index.
  34.  */
  35. public class FederatedQueryRunner implements QueryRunner {
  36.  
  37.   private static final Logger log = LoggerFactory.getLogger(FederatedQueryRunner.class);
  38.  
  39.   /**
  40.    * The total number of result documents (or -1 if not yet known).
  41.    */
  42.   private long documentsCount = -1;
  43.  
  44.   /**
  45.    * The query runners for the sub-indexes.
  46.    */
  47.   protected QueryRunner[] subRunners;
  48.  
  49.   /**
  50.    * The next rank that needs to be merged from each sub runner.
  51.    */
  52.   protected long[] nextSubRunnerRank;
  53.  
  54.   /**
  55.    * For each result document rank, this list supplies the index for the
  56.    * sub-runner that supplied the document.
  57.    */
  58.   protected IntBigList rank2runnerIndex;
  59.  
  60.   /**
  61.    * Which of the sub-runners has provided the previous document. This is an
  62.    * instance field so that we can rotate the sub-runners (when the scores are
  63.    * equal)
  64.    */
  65.   private int bestSubRunnerIndex = -1;
  66.  
  67.   /**
  68.    * For each result document rank, this list supplies the rank of the document
  69.    * in sub-runner that supplied it.
  70.    */
  71.   protected LongBigList rank2subRank;
  72.  
  73.   public FederatedQueryRunner(QueryRunner[] subrunners) {
  74.     this.subRunners = subrunners;
  75.     this.nextSubRunnerRank = null;
  76.     this.rank2runnerIndex = new IntBigArrayBigList();
  77.     this.rank2subRank = new LongBigArrayBigList();
  78.   }

  79.   /* (non-Javadoc)
  80.    * @see gate.mimir.search.QueryRunner#getDocumentsCount()
  81.    */
  82.   @Override
  83.   public long getDocumentsCount() {
  84.     if(documentsCount < 0) {
  85.       long newDocumentsCount = 0;
  86.       for(QueryRunner subRunner : subRunners) {
  87.         long subDocumentsCount = subRunner.getDocumentsCount();
  88.         if(subDocumentsCount < 0) {
  89.           return -1;
  90.         } else {
  91.           newDocumentsCount += subDocumentsCount;
  92.         }
  93.       }
  94.       synchronized(this) {
  95.         // initialize the nextSubRunnerRank array
  96.         nextSubRunnerRank = new long[subRunners.length];
  97.         for(int i = 0; i < nextSubRunnerRank.length; i++) {
  98.           if(subRunners[i].getDocumentsCount() == 0) {
  99.             nextSubRunnerRank[i] = -1;
  100.           }
  101.         }
  102.         documentsCount = newDocumentsCount;
  103.       }
  104.     }
  105.     return documentsCount;
  106.   }

  107.  
  108.  
  109.   /* (non-Javadoc)
  110.    * @see gate.mimir.search.QueryRunner#getDocumentsCountSync()
  111.    */
  112.   @Override
  113.   public long getDocumentsCountSync() {
  114.     for(QueryRunner subRunner : subRunners) {
  115.       subRunner.getDocumentsCountSync();
  116.     }
  117.     return getDocumentsCount();
  118.   }

  119.   /* (non-Javadoc)
  120.    * @see gate.mimir.search.QueryRunner#getCurrentDocumentsCount()
  121.    */
  122.   @Override
  123.   public long getDocumentsCurrentCount() {
  124.     if(documentsCount >= 0) {
  125.       return documentsCount;
  126.     } else {
  127.       int newDocumentsCount = 0;
  128.       for(QueryRunner subRunner : subRunners) {
  129.         newDocumentsCount += subRunner.getDocumentsCurrentCount();
  130.       }
  131.       return newDocumentsCount;
  132.     }
  133.   }
  134.  
  135.   /**
  136.    * Ensure that the given rank is resolved to the appropriate sub-runner rank.
  137.    * @throws IndexOutOfBoundsException if rank is beyond the last document.
  138.    */
  139.   private final synchronized void checkRank(long rank) throws IndexOutOfBoundsException, IOException {
  140.     if(rank < 0) throw new IndexOutOfBoundsException(
  141.       "Document rank " + rank + " is negative.");
  142.     long maxRank = getDocumentsCount();
  143.     if(rank >= maxRank) throw new IndexOutOfBoundsException(
  144.         "Document rank too large (" + rank + " >= " + maxRank + ").");    
  145.     // quick check to see if we need to do anything else
  146.     if(rank < rank2runnerIndex.size64()) {
  147.       return;
  148.     }
  149.     for(long nextRank = rank2runnerIndex.size64(); nextRank <= rank; nextRank++) {
  150.       boolean allOut = true;
  151.       // start with the runner next the previously chosen one
  152.       bestSubRunnerIndex = (bestSubRunnerIndex + 1) % subRunners.length;
  153.       double maxScore = Double.NEGATIVE_INFINITY;
  154.       if(nextSubRunnerRank[bestSubRunnerIndex] >= 0) {
  155.         maxScore = subRunners[bestSubRunnerIndex].getDocumentScore(
  156.             nextSubRunnerRank[bestSubRunnerIndex]);
  157.         allOut = false;
  158.       }
  159.       // now check all remaining runners, in round-robin fashion
  160.       final int from = bestSubRunnerIndex + 1;
  161.       final int to = bestSubRunnerIndex + subRunners.length;
  162.       for(int bigI = from; bigI < to; bigI++) {
  163.         int i = bigI % subRunners.length;
  164.         if(nextSubRunnerRank[i] >= 0) {
  165.           allOut = false;
  166.           if(subRunners[i].getDocumentScore(nextSubRunnerRank[i]) > maxScore) {
  167.             bestSubRunnerIndex = i;
  168.             maxScore = subRunners[i].getDocumentScore(nextSubRunnerRank[i]);
  169.           }
  170.         }
  171.       }
  172.       if(allOut) {
  173.         // we ran out of docs
  174.         throw new IndexOutOfBoundsException("Requested rank was " + rank +
  175.           " but ran out of documents at " + nextRank + "!");
  176.       }
  177.       // consume the next doc from bestSubRunnerIndex
  178.       rank2runnerIndex.add(bestSubRunnerIndex);
  179.       rank2subRank.add(nextSubRunnerRank[bestSubRunnerIndex]);
  180.       if(nextSubRunnerRank[bestSubRunnerIndex] <
  181.           subRunners[bestSubRunnerIndex].getDocumentsCount() -1) {
  182.         nextSubRunnerRank[bestSubRunnerIndex]++;
  183.       } else {
  184.         // this runner has run out of documents
  185.         nextSubRunnerRank[bestSubRunnerIndex] = -1;
  186.       }
  187.     }
  188.   }
  189.  
  190.   /* (non-Javadoc)
  191.    * @see gate.mimir.search.QueryRunner#getDocumentID(int)
  192.    */
  193.   @Override
  194.   public long getDocumentID(long rank) throws IndexOutOfBoundsException,
  195.     IOException {
  196.     checkRank(rank);
  197.     long subId = subRunners[rank2runnerIndex.getInt(rank)].getDocumentID(
  198.       rank2subRank.getLong(rank));
  199.     return subId * subRunners.length + rank2runnerIndex.getInt(rank);
  200.   }

  201.   /* (non-Javadoc)
  202.    * @see gate.mimir.search.QueryRunner#getDocumentScore(int)
  203.    */
  204.   @Override
  205.   public double getDocumentScore(long rank) throws IndexOutOfBoundsException,
  206.     IOException {
  207.     checkRank(rank);
  208.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentScore(
  209.         rank2subRank.getLong(rank));
  210.   }

  211.   /* (non-Javadoc)
  212.    * @see gate.mimir.search.QueryRunner#getDocumentHits(int)
  213.    */
  214.   @Override
  215.   public List<Binding> getDocumentHits(long rank)
  216.     throws IndexOutOfBoundsException, IOException {
  217.     checkRank(rank);
  218.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentHits(
  219.         rank2subRank.getLong(rank));
  220.   }

  221.   /* (non-Javadoc)
  222.    * @see gate.mimir.search.QueryRunner#getDocumentText(int, int, int)
  223.    */
  224.   @Override
  225.   public String[][] getDocumentText(long rank, int termPosition, int length)
  226.     throws IndexException, IndexOutOfBoundsException, IOException {
  227.     checkRank(rank);
  228.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentText(
  229.         rank2subRank.getLong(rank), termPosition, length);
  230.   }

  231.   /* (non-Javadoc)
  232.    * @see gate.mimir.search.QueryRunner#getDocumentURI(int)
  233.    */
  234.   @Override
  235.   public String getDocumentURI(long rank) throws IndexException,
  236.     IndexOutOfBoundsException, IOException {
  237.     checkRank(rank);
  238.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentURI(
  239.         rank2subRank.getLong(rank));
  240.   }

  241.   /* (non-Javadoc)
  242.    * @see gate.mimir.search.QueryRunner#getDocumentTitle(int)
  243.    */
  244.   @Override
  245.   public String getDocumentTitle(long rank) throws IndexException,
  246.     IndexOutOfBoundsException, IOException {
  247.     checkRank(rank);
  248.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentTitle(
  249.         rank2subRank.getLong(rank));
  250.   }

  251.   /* (non-Javadoc)
  252.    * @see gate.mimir.search.QueryRunner#getDocumentMetadataField(int, java.lang.String)
  253.    */
  254.   @Override
  255.   public Serializable getDocumentMetadataField(long rank, String fieldName)
  256.     throws IndexException, IndexOutOfBoundsException, IOException {
  257.     checkRank(rank);
  258.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataField(
  259.         rank2subRank.getLong(rank), fieldName);
  260.   }

  261.   /* (non-Javadoc)
  262.    * @see gate.mimir.search.QueryRunner#getDocumentMetadataFields(int, java.util.Set)
  263.    */
  264.   @Override
  265.   public Map<String, Serializable> getDocumentMetadataFields(long rank,
  266.                                                              Set<String> fieldNames)
  267.     throws IndexException, IndexOutOfBoundsException, IOException {
  268.     checkRank(rank);
  269.     return subRunners[rank2runnerIndex.getInt(rank)].getDocumentMetadataFields(
  270.           rank2subRank.getLong(rank), fieldNames);
  271.   }

  272.   /* (non-Javadoc)
  273.    * @see gate.mimir.search.QueryRunner#renderDocument(int, java.lang.Appendable)
  274.    */
  275.   @Override
  276.   public void renderDocument(long rank, Appendable out) throws IOException,
  277.     IndexException {
  278.     checkRank(rank);
  279.     subRunners[rank2runnerIndex.getInt(rank)].renderDocument(
  280.         rank2subRank.getLong(rank), out);
  281.   }

  282.   /* (non-Javadoc)
  283.    * @see gate.mimir.search.QueryRunner#close()
  284.    */
  285.   @Override
  286.   public void close() throws IOException {
  287.     for(QueryRunner r : subRunners) {
  288.       try{
  289.         r.close();
  290.       } catch (Throwable t) {
  291.         log.error("Error while closing sub-runner", t);
  292.       }
  293.     }
  294.   }
  295. }