AtomicTokenIndex.java

  1. /*
  2.  *  AtomicTokenIndex.java
  3.  *
  4.  *  Copyright (c) 2007-2013, The University of Sheffield.
  5.  *
  6.  *  This file is part of GATE Mímir (see http://gate.ac.uk/family/mimir.html),
  7.  *  and is free software, licenced under the GNU Lesser General Public License,
  8.  *  Version 3, June 2007 (also included with this distribution as file
  9.  *  LICENCE-LGPL3.html).
  10.  *
  11.  *  Valentin Tablan, 19 Dec 2013
  12.  *
  13.  *  $Id: AtomicTokenIndex.java 17371 2014-02-20 15:45:05Z valyt $
  14.  */
  15. package gate.mimir.index;

  16. import gate.Annotation;
  17. import gate.FeatureMap;
  18. import gate.mimir.DocumentMetadataHelper;
  19. import gate.mimir.IndexConfig.TokenIndexerConfig;
  20. import gate.mimir.MimirIndex;
  21. import it.unimi.di.big.mg4j.index.Index;
  22. import it.unimi.dsi.lang.ObjectParser;

  23. import java.io.File;
  24. import java.io.IOException;
  25. import java.io.UnsupportedEncodingException;
  26. import java.nio.ByteBuffer;
  27. import java.nio.CharBuffer;
  28. import java.nio.charset.CharacterCodingException;
  29. import java.nio.charset.Charset;
  30. import java.nio.charset.CharsetDecoder;
  31. import java.nio.charset.CharsetEncoder;
  32. import java.nio.charset.CodingErrorAction;
  33. import java.util.LinkedList;
  34. import java.util.List;
  35. import java.util.concurrent.BlockingQueue;

  36. import org.slf4j.Logger;
  37. import org.slf4j.LoggerFactory;

  38. /**
  39.  * An {@link AtomicIndex} implementation for indexing tokens.
  40.  */
  41. public class AtomicTokenIndex extends AtomicIndex {
  42.  
  43.   private final static Logger logger = LoggerFactory.getLogger(AtomicTokenIndex.class);
  44.  
  45.   /**
  46.    * A constant (empty String array) used for filtering terms from indexing.
  47.    * @see #calculateTermStringForAnnotation(Annotation, GATEDocument)
  48.    * implementation.
  49.    */
  50.   private static final String[] DO_NOT_INDEX = new String[]{};
  51.  
  52.  
  53.   protected final CharsetEncoder UTF8_CHARSET_ENCODER = Charset.forName("UTF-8").newEncoder();
  54.  
  55.   protected final CharsetDecoder UTF8_CHARSET_DECODER = Charset.forName("UTF-8").newDecoder();
  56.  
  57.   /**
  58.    * Is this token index responsible for writing the zip collection?
  59.    */
  60.   protected boolean zipCollectionEnabled = false;
  61.  
  62.   /**
  63.    * Stores the document tokens for writing to the zip collection;
  64.    */
  65.   protected List<String> documentTokens;
  66.  
  67.   /**
  68.    * Stores the document non-tokens for writing to the zip collection;
  69.    */
  70.   protected List<String> documentNonTokens;
  71.  
  72.   /**
  73.    * An array of helpers for creating document metadata.
  74.    */
  75.   protected DocumentMetadataHelper[] docMetadataHelpers;
  76.  
  77.   /**
  78.    * GATE document factory used by the zip builder, and also to
  79.    * translate field indexes to field names.
  80.    */
  81.   protected GATEDocumentFactory factory;
  82.  
  83.  
  84.   /**
  85.    * The feature name corresponding to the field.
  86.    */
  87.   protected String featureName;
  88.  

  89.  
  90.   /**
  91.    * Creates a new atomic index for indexing tokens.
  92.    * @param parent the top level {@link MimirIndex} to which this new atomic
  93.    * index belongs.
  94.    * @param name the name for the new atomic index. This will be used as the
  95.    * name of the top level directory for this atomic index (which is a
  96.    * sub-directory of the parent) and as a base name for all the files of this
  97.    * atomic index.
  98.    * @param hasDirectIndex should a direct index be created as well.
  99.    * @param inputQueue the queue where documents are submitted for indexing;
  100.    * @param outputQueue the queue where indexed documents are returned to;
  101.    * @throws IndexException
  102.    * @throws IOException
  103.    */
  104.   public AtomicTokenIndex(MimirIndex parent, String name,
  105.       boolean hasDirectIndex, BlockingQueue<GATEDocument> inputQueue,
  106.       BlockingQueue<GATEDocument> outputQueue, TokenIndexerConfig config,
  107.       boolean zipCollection) throws IOException, IndexException {
  108.     super(parent, name, hasDirectIndex,
  109.         config.getTermProcessor(), inputQueue, outputQueue);
  110.     this.featureName = config.getFeatureName();
  111.     this.zipCollectionEnabled = zipCollection;
  112.     if(zipCollectionEnabled) {
  113.       documentTokens = new LinkedList<String>();
  114.       documentNonTokens = new LinkedList<String>();
  115.       docMetadataHelpers = parent.getIndexConfig().getDocMetadataHelpers();
  116.     }
  117.    
  118.     // save the term processor
  119.     additionalProperties.setProperty(Index.PropertyKeys.TERMPROCESSOR,
  120.         ObjectParser.toSpec(termProcessor));
  121.    
  122.     try {
  123.       UTF8_CHARSET_ENCODER.replaceWith("[?]".getBytes("UTF-8"));
  124.       UTF8_CHARSET_ENCODER.onMalformedInput(CodingErrorAction.REPLACE);
  125.       UTF8_CHARSET_ENCODER.onUnmappableCharacter(CodingErrorAction.REPLACE);
  126.     } catch(UnsupportedEncodingException e) {
  127.       // this should never happen
  128.       throw new RuntimeException("UTF-8 not supported");
  129.     }
  130.    
  131.     indexingThread = new Thread(this, "Mimir-" + name + " indexing thread");
  132.     indexingThread.start();
  133.   }

  134.  
  135.   /**
  136.    * If zipping, inform the collection builder that a new document
  137.    * is about to start.
  138.    */
  139.   protected void documentStarting(GATEDocument gateDocument) throws IndexException {
  140.     if(zipCollectionEnabled) {
  141.       // notify the metadata helpers
  142.       if(docMetadataHelpers != null){
  143.         for(DocumentMetadataHelper aHelper : docMetadataHelpers){
  144.           aHelper.documentStart(gateDocument);
  145.         }
  146.       }
  147.     }
  148.     // set lastTokenIndex to -1 so we don't have to special-case the first
  149.     // token in the document in calculateStartPosition
  150.     tokenPosition = -1;
  151.   }

  152.   /**
  153.    * If zipping, inform the collection builder that we finished
  154.    * the current document.
  155.    */
  156.   protected void documentEnding(GATEDocument gateDocument) throws IndexException {
  157.     if(zipCollectionEnabled) {
  158.       DocumentData docData = new DocumentData(
  159.           gateDocument.uri().toString(),
  160.           gateDocument.title().toString(),
  161.           documentTokens.toArray(new String[documentTokens.size()]),
  162.           documentNonTokens.toArray(new String[documentNonTokens.size()]));
  163.       if(docMetadataHelpers != null){
  164.         for(DocumentMetadataHelper aHelper : docMetadataHelpers){
  165.           aHelper.documentEnd(gateDocument, docData);
  166.         }
  167.       }
  168.       parent.writeZipDocumentData(docData);
  169.       documentTokens.clear();
  170.       documentNonTokens.clear();
  171.     }
  172.   }

  173.   /**
  174.    * Get the token annotations from this document, in increasing
  175.    * order of offset.
  176.    */
  177.   protected Annotation[] getAnnotsToProcess(GATEDocument gateDocument) {
  178.     Annotation[] tokens = gateDocument.getTokenAnnots();
  179.     return tokens;
  180.   }

  181.   /**
  182.    * This indexer always adds one posting per token, so the start
  183.    * position for the next annotation is always one more than the
  184.    * previous one.
  185.    *
  186.    * @param ann
  187.    * @param gateDocument
  188.    */
  189.   protected void calculateStartPositionForAnnotation(Annotation ann,
  190.           GATEDocument gateDocument) {
  191.     tokenPosition++;
  192.   }

  193.   /**
  194.    * For a token annotation, the "string" we index is the feature value
  195.    * corresponding to the name of the field to index.  As well as
  196.    * calculating the string, this method writes an entry to the zip
  197.    * collection builder if it exists.
  198.    *
  199.    * @param ann
  200.    * @param gateDocument
  201.    */
  202.   protected String[] calculateTermStringForAnnotation(Annotation ann,
  203.           GATEDocument gateDocument) throws IndexException {
  204.     FeatureMap tokenFeatures = ann.getFeatures();
  205.     String value = (String)tokenFeatures.get(featureName);
  206.     // make sure we get valid UTF-8 content
  207.    // illegal strings will simply be rendered as "[UNMAPPED]"
  208.     if(value != null) {
  209.       try {
  210.         CharBuffer cb = CharBuffer.wrap(value);
  211.         ByteBuffer bb = UTF8_CHARSET_ENCODER.encode(cb);
  212.         cb = UTF8_CHARSET_DECODER.decode(bb);
  213.         value  = cb.toString();
  214.       } catch(CharacterCodingException e) {
  215.         // this should not happen
  216.         value = null;
  217.         logger.error("Error while normalizing input", e);
  218.       }      
  219.     }

  220.    
  221.     currentTerm.replace(value == null ? "" : value);
  222.     //save the *unprocessed* term to the collection, if required.
  223.     if(zipCollectionEnabled) {
  224.       documentTokens.add(currentTerm.toString());
  225.       documentNonTokens.add(gateDocument.getNonTokens()[tokenPosition]);
  226.     }
  227.     if(termProcessor.processTerm(currentTerm)){
  228.       //the processor has changed the term, and allowed us to index it
  229.       return null;  
  230.     }else{
  231.       //the processor has filtered the term -> don't index it.
  232.       return DO_NOT_INDEX;
  233.     }
  234.   }

  235.   /**
  236.    * Overridden to close the zip collection builder.
  237.    */
  238.   @Override
  239.   protected void flush() throws IOException {
  240.   }
  241. }