// MimirConnector.java
/*
* MimirConnector.java
*
* Copyright (c) 2007-2011, The University of Sheffield.
*
* This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
* and is free software, licenced under the GNU Lesser General Public License,
* Version 3, June 2007 (also included with this distribution as file
* LICENCE-LGPL3.html).
*
* Ian Roberts, 26 Mar 2010
*
* $Id: MimirConnector.java 18954 2015-10-20 21:13:20Z ian_roberts $
*/
package gate.mimir.index;
import gate.Document;
import gate.mimir.tool.WebUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URL;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility class that implements the client side of the Mimir RPC indexing
 * protocol.
 * <p>
 * Documents passed to {@link #sendToMimir(Document, String)} are serialised
 * into an in-memory buffer, which is posted to the remote index when it grows
 * beyond {@link #BYTE_BUFFER_SIZE}, when more than the configured connection
 * interval has elapsed since the last write, or when the connector is
 * {@link #close() closed}. All access to the buffer is guarded by this
 * object's monitor.
 */
public class MimirConnector {

  protected static Logger logger = LoggerFactory.getLogger(MimirConnector.class);

  /**
   * The initial capacity of the {@link #byteBuffer}, and the size above which
   * the buffer is flushed to the remote server.
   */
  protected static final int BYTE_BUFFER_SIZE = 8 * 1024 * 1024;

  /**
   * Timer used to regularly check if there is any data to send to the remote
   * server. Only present while a positive connection interval is configured.
   */
  protected Timer backgroundTimer;

  /**
   * Has this connector been closed?
   */
  protected volatile boolean closed = false;

  /**
   * The last time we sent data to the remote server (or checked that there
   * was nothing to send).
   */
  protected volatile long lastWrite;

  /**
   * A byte buffer accumulating data to be sent to the remote server.
   */
  protected ByteArrayOutputStream byteBuffer;

  /**
   * An instance of {@link ObjectOutputStream} used to serialise documents for
   * transmission over the wire.
   */
  protected ObjectOutputStream objectOutputStream;

  /**
   * The name for the document feature used to hold the document URI.
   */
  public static final String MIMIR_URI_FEATURE = "gate.mimir.uri";

  /**
   * Helper used to perform the HTTP calls to the remote server.
   */
  protected WebUtils webUtils;

  /**
   * The default value for the connection interval (see
   * {@link #setConnectionInterval(int)}).
   */
  public static final int DEFAULT_CONNECTION_INTERVAL = -1;

  /**
   * The number of milliseconds between connections to the remote server. All
   * documents submitted for indexing are locally cached and get sent to the
   * server in batches, at intervals defined by this value.
   */
  private int connectionInterval = DEFAULT_CONNECTION_INTERVAL;

  /**
   * How many documents have been buffered since the last connection.
   */
  private int docsSinceLastConnection = 0;

  /**
   * The URL of the mimir index that is to receive the document. This would
   * typically be of the form
   * <code>http://server:port/mimir/&lt;index UUID&gt;/</code>.
   */
  protected URL indexURL;

  /**
   * Creates a connector for the given index, using the supplied
   * {@link WebUtils} for all communication with the server.
   *
   * @param indexUrl the URL of the index that will receive the documents.
   * @param webUtils the helper used to perform the HTTP calls.
   * @throws IOException if the serialisation stream cannot be initialised.
   */
  public MimirConnector(URL indexUrl, WebUtils webUtils) throws IOException {
    this.indexURL = indexUrl;
    this.webUtils = webUtils;
    byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
    objectOutputStream = new ObjectOutputStream(byteBuffer);
    lastWrite = System.currentTimeMillis();
  }

  /**
   * Creates a connector for the given index, using a newly created
   * {@link WebUtils} instance.
   *
   * @param indexUrl the URL of the index that will receive the documents.
   * @throws IOException if the serialisation stream cannot be initialised.
   */
  public MimirConnector(URL indexUrl) throws IOException {
    this(indexUrl, new WebUtils());
  }

  /**
   * Pass the given GATE document to the Mimir index at the given URL for
   * indexing. The document should match the expectations of the Mimir index
   * to which it is being sent, in particular it should include the token and
   * semantic annotation types that the index expects. This method has no way
   * of checking that those expectations are met, and will not complain if they
   * are not, but in that case the resulting index will not contain any useful
   * information.
   * <p>
   * While the document is being serialised, its {@link #MIMIR_URI_FEATURE}
   * feature is temporarily set to <code>documentURI</code>; the previous state
   * of that feature is always restored before this method returns, whether
   * normally or with an exception.
   *
   * @param doc the document to index. May be null, in which case nothing is
   *          buffered and the call only flushes data that is due to be sent.
   * @param documentURI the URI that should be used to represent the document
   *          in the index. May be null, in which case the index will assign a
   *          URI itself.
   * @throws IOException if this connector has been closed, or if any error has
   *           occurred serialising the document or communicating with the
   *           Mímir service.
   * @throws InterruptedException declared for backwards compatibility; the
   *           current implementation does not throw it.
   */
  public void sendToMimir(Document doc, String documentURI) throws IOException, InterruptedException {
    if(closed) throw new IOException("This Mímir connector has been closed.");
    final boolean overrideUri = documentURI != null && doc != null;
    boolean uriFeatureWasSet = false;
    Object oldUriFeatureValue = null;
    if(overrideUri) {
      // set the URI as a document feature, saving the old value (if any)
      uriFeatureWasSet = doc.getFeatures().containsKey(MIMIR_URI_FEATURE);
      oldUriFeatureValue = doc.getFeatures().get(MIMIR_URI_FEATURE);
      doc.getFeatures().put(MIMIR_URI_FEATURE, documentURI);
    }
    try {
      synchronized(this) {
        // re-check under the lock: close() may have completed its final flush
        // since the unsynchronised check above, in which case anything
        // buffered now would never be sent
        if(closed) throw new IOException("This Mímir connector has been closed.");
        if(doc != null) {
          objectOutputStream.writeUnshared(doc);
          docsSinceLastConnection++;
        }
        if(byteBuffer.size() > BYTE_BUFFER_SIZE) {
          writeBuffer(); // this will also empty (reset) the buffer
        }
        // if too long since last write, write the buffer
        if(System.currentTimeMillis() - lastWrite > connectionInterval) {
          writeBuffer();
        }
      }
    } finally {
      if(overrideUri) {
        // reset the URI feature to the value it had (or didn't have) before,
        // even if serialisation or sending failed
        if(uriFeatureWasSet) {
          doc.getFeatures().put(MIMIR_URI_FEATURE, oldUriFeatureValue);
        } else {
          doc.getFeatures().remove(MIMIR_URI_FEATURE);
        }
      }
    }
  }

  /**
   * Writes the current contents of the byte buffer to the remote server. If no
   * documents have been buffered since the last connection, no remote call is
   * made. In all cases {@link #lastWrite} is updated on normal completion.
   * <p>
   * The buffer is emptied and a fresh object stream is created even if the
   * remote call fails, so documents buffered at the time of a failure are
   * discarded rather than retried.
   *
   * @throws IOException if the object stream cannot be closed or re-created,
   *           or if communication with the remote server fails.
   */
  protected synchronized void writeBuffer() throws IOException {
    if(docsSinceLastConnection > 0) {
      StringBuilder indexURLString = new StringBuilder(indexURL.toExternalForm());
      if(indexURLString.length() == 0) {
        throw new IllegalArgumentException("No index URL specified");
      }
      if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
        // add a slash if necessary
        indexURLString.append('/');
      }
      indexURLString.append("manage/indexUrl");
      // close the object OS so that it writes its coda
      objectOutputStream.close();
      try {
        // first phase - call the indexUrl action to find out where to post the
        // data
        StringBuilder postUrlBuilder = new StringBuilder();
        webUtils.getText(postUrlBuilder, indexURLString.toString());
        // second phase - post to the URL we were given
        webUtils.postData(postUrlBuilder.toString(), byteBuffer);
      } finally {
        // start a new, empty batch regardless of the outcome
        byteBuffer.reset();
        objectOutputStream = new ObjectOutputStream(byteBuffer);
        docsSinceLastConnection = 0;
      }
    }
    lastWrite = System.currentTimeMillis();
  }

  /**
   * Gets the current value for the connection interval (see
   * {@link #setConnectionInterval(int)}).
   *
   * @return the number of milliseconds between connections to the remote
   *         server; zero or a negative value if no background flushing is
   *         scheduled.
   */
  public synchronized int getConnectionInterval() {
    return connectionInterval;
  }

  /**
   * Sets the number of milliseconds between connections to the remote server.
   * All documents submitted for indexing are locally cached and get sent to the
   * server in batches, at intervals defined by this value. Negative values mean
   * that no local caching should take place, and documents should be sent to
   * the remote server as soon as possible. Defaults to
   * {@value #DEFAULT_CONNECTION_INTERVAL}.
   * <p>
   * Any previously scheduled background timer is cancelled. A new one is
   * started only when the interval is strictly positive and the connector has
   * not been closed, because {@link Timer} rejects non-positive periods.
   *
   * @param connectionInterval the new interval, in milliseconds.
   */
  public synchronized void setConnectionInterval(int connectionInterval) {
    this.connectionInterval = connectionInterval;
    if(backgroundTimer != null) {
      backgroundTimer.cancel();
      backgroundTimer = null;
    }
    if(connectionInterval > 0 && !closed) {
      backgroundTimer = new Timer(getClass().getName() + " background timer");
      // we set a timer task that regularly submits a null value. This causes
      // the connector to flush any data that is getting too old.
      backgroundTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          // a run that was already due when close() was called has nothing
          // left to do
          if(closed) return;
          try {
            sendToMimir(null, null);
          } catch(Exception e) {
            // this should never happen
            logger.error(MimirConnector.class.getName() + " internal error", e);
          }
        }
      }, connectionInterval, connectionInterval);
    }
  }

  /**
   * Notifies this Mímir connector that no more documents remain to be sent.
   * The background timer (if any) is cancelled and any locally cached
   * documents are submitted to the remote server. This method then returns.
   * Subsequent calls to {@link #sendToMimir(Document, String)} will fail with
   * an {@link IOException}.
   *
   * @throws IOException if the final flush to the remote server fails.
   * @throws InterruptedException declared for backwards compatibility; the
   *           current implementation does not throw it.
   */
  public synchronized void close() throws IOException, InterruptedException {
    // holding the lock while flipping the flag guarantees that no document can
    // be buffered after the final flush below
    closed = true;
    if(backgroundTimer != null) {
      backgroundTimer.cancel();
    }
    // flush all cached content one last time
    writeBuffer();
  }
}