ExecutorsList.java
/*
* ExecutorsList.java
*
* Copyright (c) 2007-2011, The University of Sheffield.
*
* This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Valentin Tablan, 27 Aug 2009
*
* $Id: ExecutorsList.java 15767 2012-05-11 15:45:23Z valyt $
*/
package gate.mimir.search.query;
import gate.mimir.search.QueryEngine;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class for managing a large list of {@link QueryExecutor}s. This class is
* responsible for accessing a large set of executors while only keeping a
* limited number of them loaded. It automatically manages the closing and
* reopening of executors as needed.
*
*/
public class ExecutorsList {
  /**
   * Constructor.
   *
   * @param maxLiveExecutors the maximum number of executors that should be
   * kept open at the same time. Serving a request always requires one open
   * executor, so values smaller than 1 behave like 1.
   * @param engine the {@link QueryEngine} used to create executors.
   * @param nodes the {@link QueryNode}s for which the executors are created.
   * The position of a node in this array is the <code>nodeId</code> used by
   * all the other methods of this class.
   */
  public ExecutorsList(int maxLiveExecutors, QueryEngine engine,
      QueryNode[] nodes) {
    this.maxLiveExecutors = maxLiveExecutors;
    this.engine = engine;
    this.nodes = nodes;
    this.closed = false;
    latestDocuments = new long[nodes.length];
    Arrays.fill(latestDocuments, EXECUTOR_NOT_STARTED);
    hitsOnLatestDocument = new Binding[nodes.length][];
    hitsReturned = new int[nodes.length];
    executorsOpened = 0;
    executorsClosed = 0;
    // the live executors, threaded as a doubly-linked list through the
    // executorsNext / executorsPrev arrays (-1 means "no link")
    executors = new QueryExecutor[nodes.length];
    executorsNext = new int[nodes.length];
    executorsPrev = new int[nodes.length];
    Arrays.fill(executorsNext, -1);
    Arrays.fill(executorsPrev, -1);
    executorsFirst = -1;
    executorsLast = -1;
    executorsSize = 0;
  }

  /**
   * Returns the number of nodes/executors managed by this list.
   *
   * @return the length of the nodes array supplied at construction time, or
   * 0 after {@link #close()} has been called.
   */
  public int size() {
    return nodes == null ? 0 : nodes.length;
  }

  /**
   * Constructor that uses the default maximum number of live executors.
   *
   * @param engine the {@link QueryEngine} used to create executors.
   * @param nodes the {@link QueryNode}s for which the executors are created.
   */
  public ExecutorsList(QueryEngine engine, QueryNode[] nodes) {
    this(DEFAULT_MAX_LIVE_EXECUTORS, engine, nodes);
  }

  /**
   * Gets the live executor for the given node, opening it first if it is not
   * currently loaded. The returned executor becomes the most recently used
   * one. If opening a new executor would exceed the configured maximum, the
   * least recently used executor is closed and dropped first.
   *
   * @param nodeId the index of the node (in the array supplied to the
   * constructor) for which the executor is requested.
   * @return the live executor for the node.
   * @throws IOException propagated from closing an evicted executor or from
   * opening the new one.
   */
  public QueryExecutor getExecutor(int nodeId) throws IOException {
    QueryExecutor executor = executors[nodeId];
    if(executor != null) {
      // already live: just mark it as the most recently used
      moveToFront(nodeId);
      return executor;
    }
    // make room before opening, so we never hold more than the maximum
    while(executorsSize >= maxLiveExecutors && executorsLast != -1) {
      evictLeastRecentlyUsed();
    }
    // open the new executor; book-keeping is only updated once this succeeds
    executor = nodes[nodeId].getQueryExecutor(engine);
    executorsOpened++;
    executors[nodeId] = executor;
    linkFirst(nodeId);
    executorsSize++;
    return executor;
  }

  /**
   * Moves an already linked node to the head of the live list.
   */
  private void moveToFront(int nodeId) {
    if(executorsFirst == nodeId) {
      // already the most recently used; re-linking would create a self-loop
      return;
    }
    unlink(nodeId);
    linkFirst(nodeId);
  }

  /**
   * Detaches a node from the live list, repairing the head and tail pointers
   * when the node was the first and/or the last element.
   */
  private void unlink(int nodeId) {
    int prev = executorsPrev[nodeId];
    int next = executorsNext[nodeId];
    if(prev >= 0) {
      executorsNext[prev] = next;
    } else {
      executorsFirst = next;
    }
    if(next >= 0) {
      executorsPrev[next] = prev;
    } else {
      executorsLast = prev;
    }
    executorsPrev[nodeId] = -1;
    executorsNext[nodeId] = -1;
  }

  /**
   * Inserts a node that is currently not linked at the head of the live list.
   */
  private void linkFirst(int nodeId) {
    executorsPrev[nodeId] = -1;
    executorsNext[nodeId] = executorsFirst;
    if(executorsFirst >= 0) {
      // old first becomes second
      executorsPrev[executorsFirst] = nodeId;
    } else {
      // the list was empty, so the new node is also the last one
      executorsLast = nodeId;
    }
    executorsFirst = nodeId;
  }

  /**
   * Drops the executor at the tail of the live list and closes it. The list
   * state is updated before the executor is closed, so it stays consistent
   * even if closing fails.
   */
  private void evictLeastRecentlyUsed() throws IOException {
    int victimId = executorsLast;
    QueryExecutor victim = executors[victimId];
    unlink(victimId);
    executors[victimId] = null;
    executorsSize--;
    victim.close();
    executorsClosed++;
  }

  /**
   * Advances the executor for the given node to its next document.
   *
   * @param nodeId the index of the node.
   * @param greaterThan the value passed on to the executor's
   * <code>nextDocument</code> call. If the executor had to be re-created,
   * the latest document recorded for this node is used instead when it is
   * larger.
   * @return the document ID returned by the executor, or -1 if the executor
   * for this node was already exhausted.
   * @throws IOException propagated from the underlying executor.
   */
  public long nextDocument(int nodeId, long greaterThan) throws IOException {
    if(latestDocuments[nodeId] == -1) {
      // executor already exhausted
      return -1;
    }
    QueryExecutor executor = getExecutor(nodeId);
    if(executor.getLatestDocument() < 0) {
      // newly (re)created executor, so we need to skip ahead
      greaterThan = Math.max(greaterThan, latestDocuments[nodeId]);
    }
    long document = executor.nextDocument(greaterThan);
    latestDocuments[nodeId] = document;
    // moving to a new document invalidates the hits cache
    hitsReturned[nodeId] = 0;
    hitsOnLatestDocument[nodeId] = null;
    return document;
  }

  /**
   * Returns the next hit on the latest document of the given node. The first
   * call after {@link #nextDocument(int, long)} reads all the hits on the
   * document from the executor into a cache; the hits are then served from
   * that cache.
   *
   * @param nodeId the index of the node.
   * @return the next hit, or <code>null</code> if there are no more hits on
   * the latest document or the executor is exhausted.
   * @throws IOException propagated from the underlying executor.
   * @throws RuntimeException if a re-created executor does not scroll back
   * to the document recorded for this node.
   */
  public Binding nextHit(int nodeId) throws IOException {
    if(latestDocuments[nodeId] == -1) {
      // executor already exhausted
      return null;
    }
    Binding[] cachedHits = hitsOnLatestDocument[nodeId];
    if(cachedHits == null) {
      // no cache for this document yet: build it. Testing the cache itself
      // (rather than the returned-hits counter) avoids re-reading the
      // executor on every call when the document has no hits.
      QueryExecutor executor = getExecutor(nodeId);
      if(executor.getLatestDocument() < 0) {
        // newly (re)created executor, so we need to skip ahead
        long oldLatest = latestDocuments[nodeId];
        latestDocuments[nodeId] = executor.nextDocument(oldLatest - 1);
        if(oldLatest != latestDocuments[nodeId]) {
          throw new RuntimeException("Malfunction in " +
              this.getClass().getName() +
              ": executor scrolled to a different document after reload!");
        }
      }
      List<Binding> hits = new LinkedList<Binding>();
      for(Binding aHit = executor.nextHit(); aHit != null;
          aHit = executor.nextHit()) {
        hits.add(aHit);
      }
      cachedHits = hits.toArray(new Binding[hits.size()]);
      hitsOnLatestDocument[nodeId] = cachedHits;
    }
    // now return directly from cache
    int position = hitsReturned[nodeId];
    if(position >= cachedHits.length) {
      return null;
    }
    hitsReturned[nodeId] = position + 1;
    return cachedHits[position];
  }

  /**
   * Returns the latest document ID recorded for the given node.
   *
   * @param nodeId the index of the node.
   * @return the value returned by the most recent
   * {@link #nextDocument(int, long)} call for this node, or
   * {@link #EXECUTOR_NOT_STARTED} if there was no such call yet.
   */
  public long latestDocument(int nodeId) {
    return latestDocuments[nodeId];
  }

  /**
   * Closes all executors still live, and releases all memory resources. All
   * live executors are closed even if closing some of them fails; the first
   * failure is re-thrown at the end and later ones are logged. The internal
   * arrays are released, so apart from {@link #size()} the methods of this
   * object cannot be used after this call.
   *
   * @throws IOException the first exception raised while closing an executor.
   */
  public void close() throws IOException {
    closed = true;
    IOException firstFailure = null;
    for(int i = 0; i < executors.length; i++) {
      QueryExecutor executor = executors[i];
      if(executor == null) {
        continue;
      }
      executors[i] = null;
      try {
        executor.close();
        executorsClosed++;
      } catch(IOException e) {
        if(firstFailure == null) {
          firstFailure = e;
        } else {
          logger.warn("Exception while closing executor for node " + i, e);
        }
      }
    }
    // nothing is live any more
    executorsFirst = -1;
    executorsLast = -1;
    executorsSize = 0;
    engine = null;
    hitsReturned = null;
    hitsOnLatestDocument = null;
    latestDocuments = null;
    nodes = null;
    logger.debug("Closing executors list. Operations (open/close): {}/{}",
        executorsOpened, executorsClosed);
    if(firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * The default maximum number of executors to be kept live.
   */
  public static final int DEFAULT_MAX_LIVE_EXECUTORS = 200000;

  /**
   * The load factor used when none specified in constructor. Not referenced
   * by this class; kept because it is visible to subclasses.
   */
  protected static final float DEFAULT_LOAD_FACTOR = 0.75f;

  /**
   * Value returned when {@link #latestDocument(int)} is called for an executor
   * that was not started yet (i.e. nextDocument was not called yet).
   */
  public static final int EXECUTOR_NOT_STARTED = -2;

  /**
   * The maximum number of executors that should be kept in memory at any one
   * time.
   */
  protected int maxLiveExecutors;

  /**
   * The {@link QueryEngine} used to create executors.
   */
  protected QueryEngine engine;

  /**
   * Has {@link #close()} been called?
   */
  protected boolean closed;

  /**
   * How many executors have been closed so far (statistics, logged on close).
   */
  private long executorsClosed;

  /**
   * How many executors have been opened so far (statistics, logged on close).
   */
  private long executorsOpened;

  protected static Logger logger = LoggerFactory.getLogger(ExecutorsList.class);

  /**
   * The {@link QueryNode}s used to create executors.
   */
  protected QueryNode[] nodes;

  /**
   * Array that holds the latest document ID returned by each executor.
   */
  protected long[] latestDocuments;

  /**
   * The number of hits already returned from the latest document, for each
   * executor. Used as the read position into {@link #hitsOnLatestDocument}.
   */
  protected int[] hitsReturned;

  /**
   * A cache storing all the hits on the latest document for each executor.
   * First array index selects the executor, second array index selects the hit.
   * An entry is <code>null</code> until the hits for the executor's latest
   * document have been read.
   */
  protected Binding[][] hitsOnLatestDocument;

  /**
   * An array containing the executors (some positions may be null, if the
   * executor on that location has been dropped from RAM).
   */
  protected QueryExecutor[] executors;

  /**
   * For each live executor, the index of the next (less recently used) live
   * executor, or -1 if there is none.
   */
  protected int[] executorsNext;

  /**
   * For each live executor, the index of the previous (more recently used)
   * live executor, or -1 if there is none.
   */
  protected int[] executorsPrev;

  /**
   * Index of the most recently used live executor, or -1 if none is live.
   */
  protected int executorsFirst;

  /**
   * Index of the least recently used live executor, or -1 if none is live.
   */
  protected int executorsLast;

  /**
   * The number of executors currently live.
   */
  protected int executorsSize;
}