pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / ParallelReader.java
diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/ParallelReader.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/ParallelReader.java
new file mode 100644 (file)
index 0000000..bf7e0a9
--- /dev/null
@@ -0,0 +1,685 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.document.FieldSelectorResult;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.MapBackedSet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/** An IndexReader which reads multiple, parallel indexes.  Each index added
+ * must have the same number of documents, but typically each contains
+ * different fields.  Each document contains the union of the fields of all
+ * documents with the same document number.  When searching, matches for a
+ * query term are from the first index added that has the field.
+ *
+ * <p>This is useful, e.g., with collections that have large fields which
+ * change rarely and small fields that change more frequently.  The smaller
+ * fields may be re-indexed in a new index and both indexes may be searched
+ * together.
+ *
+ * <p><strong>Warning:</strong> It is up to you to make sure all indexes
+ * are created and modified the same way. For example, if you add
+ * documents to one index, you need to add the same documents in the
+ * same order to the other indexes. <em>Failure to do so will result in
+ * undefined behavior</em>.
+ */
+public class ParallelReader extends IndexReader {
+  private List<IndexReader> readers = new ArrayList<IndexReader>();
+  private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
+  boolean incRefReaders = false;
+  private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
+  private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
+  private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
+
+  private int maxDoc;
+  private int numDocs;
+  private boolean hasDeletions;
+
+ /** Construct a ParallelReader. 
+  * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
+  */
+  public ParallelReader() throws IOException { this(true); }
+   
+ /** Construct a ParallelReader. 
+  * @param closeSubReaders indicates whether the subreaders should be closed
+  * when this ParallelReader is closed
+  */
+  public ParallelReader(boolean closeSubReaders) throws IOException {
+    super();
+    this.incRefReaders = !closeSubReaders;
+    readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder buffer = new StringBuilder("ParallelReader(");
+    final Iterator<IndexReader> iter = readers.iterator();
+    if (iter.hasNext()) {
+      buffer.append(iter.next());
+    }
+    while (iter.hasNext()) {
+      buffer.append(", ").append(iter.next());
+    }
+    buffer.append(')');
+    return buffer.toString();
+  }
+
+ /** Add an IndexReader.
+  * @throws IOException if there is a low-level IO error
+  */
+  public void add(IndexReader reader) throws IOException {
+    ensureOpen();
+    add(reader, false);
+  }
+
+ /** Add an IndexReader whose stored fields will not be returned.  This can
+  * accelerate search when stored fields are only needed from a subset of
+  * the IndexReaders.
+  *
+  * @throws IllegalArgumentException if not all indexes contain the same number
+  *     of documents
+  * @throws IllegalArgumentException if not all indexes have the same value
+  *     of {@link IndexReader#maxDoc()}
+  * @throws IOException if there is a low-level IO error
+  */
+  public void add(IndexReader reader, boolean ignoreStoredFields)
+    throws IOException {
+
+    ensureOpen();
+    if (readers.size() == 0) {
+      this.maxDoc = reader.maxDoc();
+      this.numDocs = reader.numDocs();
+      this.hasDeletions = reader.hasDeletions();
+    }
+
+    if (reader.maxDoc() != maxDoc)                // check compatibility
+      throw new IllegalArgumentException
+        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
+    if (reader.numDocs() != numDocs)
+      throw new IllegalArgumentException
+        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
+
+    Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
+    readerToFields.put(reader, fields);
+    for (final String field : fields) {                         // update fieldToReader map
+      if (fieldToReader.get(field) == null)
+        fieldToReader.put(field, reader);
+    }
+
+    if (!ignoreStoredFields)
+      storedFieldReaders.add(reader);             // add to storedFieldReaders
+    readers.add(reader);
+    
+    if (incRefReaders) {
+      reader.incRef();
+    }
+    decrefOnClose.add(Boolean.valueOf(incRefReaders));
+  }
+  
+  @Override
+  public synchronized Object clone() {
+    // doReopen calls ensureOpen
+    try {
+      return doReopen(true);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  /**
+   * Tries to reopen the subreaders.
+   * <br>
+   * If one or more subreaders could be re-opened (i. e. subReader.reopen() 
+   * returned a new instance != subReader), then a new ParallelReader instance 
+   * is returned, otherwise null is returned.
+   * <p>
+   * A re-opened instance might share one or more subreaders with the old 
+   * instance. Index modification operations result in undefined behavior
+   * when performed before the old instance is closed.
+   * (see {@link IndexReader#openIfChanged}).
+   * <p>
+   * If subreaders are shared, then the reference count of those
+   * readers is increased to ensure that the subreaders remain open
+   * until the last referring reader is closed.
+   * 
+   * @throws CorruptIndexException if the index is corrupt
+   * @throws IOException if there is a low-level IO error 
+   */
+  @Override
+  protected synchronized IndexReader doOpenIfChanged() throws CorruptIndexException, IOException {
+    // doReopen calls ensureOpen
+    return doReopen(false);
+  }
+    
+  protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
+    ensureOpen();
+    
+    boolean reopened = false;
+    List<IndexReader> newReaders = new ArrayList<IndexReader>();
+    
+    boolean success = false;
+    
+    try {
+      for (final IndexReader oldReader : readers) {
+        IndexReader newReader = null;
+        if (doClone) {
+          newReader = (IndexReader) oldReader.clone();
+          reopened = true;
+        } else {
+          newReader = IndexReader.openIfChanged(oldReader);
+          if (newReader != null) {
+            reopened = true;
+          } else {
+            newReader = oldReader;
+          }
+        }
+        newReaders.add(newReader);
+      }
+      success = true;
+    } finally {
+      if (!success && reopened) {
+        for (int i = 0; i < newReaders.size(); i++) {
+          IndexReader r = newReaders.get(i);
+          if (r != readers.get(i)) {
+            try {
+              r.close();
+            } catch (IOException ignore) {
+              // keep going - we want to clean up as much as possible
+            }
+          }
+        }
+      }
+    }
+
+    if (reopened) {
+      List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
+      ParallelReader pr = new ParallelReader();
+      for (int i = 0; i < readers.size(); i++) {
+        IndexReader oldReader = readers.get(i);
+        IndexReader newReader = newReaders.get(i);
+        if (newReader == oldReader) {
+          newDecrefOnClose.add(Boolean.TRUE);
+          newReader.incRef();
+        } else {
+          // this is a new subreader instance, so on close() we don't
+          // decRef but close it 
+          newDecrefOnClose.add(Boolean.FALSE);
+        }
+        pr.add(newReader, !storedFieldReaders.contains(oldReader));
+      }
+      pr.decrefOnClose = newDecrefOnClose;
+      pr.incRefReaders = incRefReaders;
+      return pr;
+    } else {
+      // No subreader was refreshed
+      return null;
+    }
+  }
+
+
+  @Override
+  public int numDocs() {
+    // Don't call ensureOpen() here (it could affect performance)
+    return numDocs;
+  }
+
+  @Override
+  public int maxDoc() {
+    // Don't call ensureOpen() here (it could affect performance)
+    return maxDoc;
+  }
+
+  @Override
+  public boolean hasDeletions() {
+    ensureOpen();
+    return hasDeletions;
+  }
+
+  // check first reader
+  @Override
+  public boolean isDeleted(int n) {
+    // Don't call ensureOpen() here (it could affect performance)
+    if (readers.size() > 0)
+      return readers.get(0).isDeleted(n);
+    return false;
+  }
+
+  // delete in all readers
+  @Override
+  protected void doDelete(int n) throws CorruptIndexException, IOException {
+    for (final IndexReader reader : readers) {
+      reader.deleteDocument(n);
+    }
+    hasDeletions = true;
+  }
+
+  // undeleteAll in all readers
+  @Override
+  protected void doUndeleteAll() throws CorruptIndexException, IOException {
+    for (final IndexReader reader : readers) {
+      reader.undeleteAll();
+    }
+    hasDeletions = false;
+  }
+
+  // append fields from storedFieldReaders
+  @Override
+  public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
+    ensureOpen();
+    Document result = new Document();
+    for (final IndexReader reader: storedFieldReaders) {
+
+      boolean include = (fieldSelector==null);
+      if (!include) {
+        Collection<String> fields = readerToFields.get(reader);
+        for (final String field : fields)
+          if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
+            include = true;
+            break;
+          }
+      }
+      if (include) {
+        List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
+        for (Fieldable field : fields) {
+          result.add(field);
+        }
+      }
+    }
+    return result;
+  }
+
+  // get all vectors
+  @Override
+  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
+    ensureOpen();
+    ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
+    for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {
+
+      String field = e.getKey();
+      IndexReader reader = e.getValue();
+      TermFreqVector vector = reader.getTermFreqVector(n, field);
+      if (vector != null)
+        results.add(vector);
+    }
+    return results.toArray(new TermFreqVector[results.size()]);
+  }
+
+  @Override
+  public TermFreqVector getTermFreqVector(int n, String field)
+    throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(field);
+    return reader==null ? null : reader.getTermFreqVector(n, field);
+  }
+
+
+  @Override
+  public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(field);
+    if (reader != null) {
+      reader.getTermFreqVector(docNumber, field, mapper); 
+    }
+  }
+
+  @Override
+  public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
+    ensureOpen();
+
+    for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
+
+      String field = e.getKey();
+      IndexReader reader = e.getValue();
+      reader.getTermFreqVector(docNumber, field, mapper);
+    }
+
+  }
+
+  @Override
+  public boolean hasNorms(String field) throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(field);
+    return reader==null ? false : reader.hasNorms(field);
+  }
+
+  @Override
+  public byte[] norms(String field) throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(field);
+    return reader==null ? null : reader.norms(field);
+  }
+
+  @Override
+  public void norms(String field, byte[] result, int offset)
+    throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(field);
+    if (reader!=null)
+      reader.norms(field, result, offset);
+  }
+
+  @Override
+  protected void doSetNorm(int n, String field, byte value)
+    throws CorruptIndexException, IOException {
+    IndexReader reader = fieldToReader.get(field);
+    if (reader!=null)
+      reader.doSetNorm(n, field, value);
+  }
+
+  @Override
+  public TermEnum terms() throws IOException {
+    ensureOpen();
+    return new ParallelTermEnum();
+  }
+
+  @Override
+  public TermEnum terms(Term term) throws IOException {
+    ensureOpen();
+    return new ParallelTermEnum(term);
+  }
+
+  @Override
+  public int docFreq(Term term) throws IOException {
+    ensureOpen();
+    IndexReader reader = fieldToReader.get(term.field());
+    return reader==null ? 0 : reader.docFreq(term);
+  }
+
+  @Override
+  public TermDocs termDocs(Term term) throws IOException {
+    ensureOpen();
+    return new ParallelTermDocs(term);
+  }
+
+  @Override
+  public TermDocs termDocs() throws IOException {
+    ensureOpen();
+    return new ParallelTermDocs();
+  }
+
+  @Override
+  public TermPositions termPositions(Term term) throws IOException {
+    ensureOpen();
+    return new ParallelTermPositions(term);
+  }
+
+  @Override
+  public TermPositions termPositions() throws IOException {
+    ensureOpen();
+    return new ParallelTermPositions();
+  }
+  
+  /**
+   * Checks recursively if all subreaders are up to date. 
+   */
+  @Override
+  public boolean isCurrent() throws CorruptIndexException, IOException {
+    ensureOpen();
+    for (final IndexReader reader : readers) {
+      if (!reader.isCurrent()) {
+        return false;
+      }
+    }
+    
+    // all subreaders are up to date
+    return true;
+  }
+
+  @Deprecated
+  @Override
+  public boolean isOptimized() {
+    ensureOpen();
+    for (final IndexReader reader : readers) {
+      if (!reader.isOptimized()) {
+        return false;
+      }
+    }
+    
+    // all subindexes are optimized
+    return true;
+  }
+
+  /** Not implemented.
+   * @throws UnsupportedOperationException
+   */
+  @Override
+  public long getVersion() {
+    throw new UnsupportedOperationException("ParallelReader does not support this method.");
+  }
+
+  // for testing
+  IndexReader[] getSubReaders() {
+    return readers.toArray(new IndexReader[readers.size()]);
+  }
+
+  @Override
+  protected void doCommit(Map<String,String> commitUserData) throws IOException {
+    for (final IndexReader reader : readers)
+      reader.commit(commitUserData);
+  }
+
+  @Override
+  protected synchronized void doClose() throws IOException {
+    for (int i = 0; i < readers.size(); i++) {
+      if (decrefOnClose.get(i).booleanValue()) {
+        readers.get(i).decRef();
+      } else {
+        readers.get(i).close();
+      }
+    }
+  }
+
+  @Override
+  public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
+    ensureOpen();
+    Set<String> fieldSet = new HashSet<String>();
+    for (final IndexReader reader : readers) {
+      Collection<String> names = reader.getFieldNames(fieldNames);
+      fieldSet.addAll(names);
+    }
+    return fieldSet;
+  }
+
+  private class ParallelTermEnum extends TermEnum {
+    private String field;
+    private Iterator<String> fieldIterator;
+    private TermEnum termEnum;
+
+    public ParallelTermEnum() throws IOException {
+      try {
+        field = fieldToReader.firstKey();
+      } catch(NoSuchElementException e) {
+        // No fields, so keep field == null, termEnum == null
+        return;
+      }
+      if (field != null)
+        termEnum = fieldToReader.get(field).terms();
+    }
+
+    public ParallelTermEnum(Term term) throws IOException {
+      field = term.field();
+      IndexReader reader = fieldToReader.get(field);
+      if (reader!=null)
+        termEnum = reader.terms(term);
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (termEnum==null)
+        return false;
+
+      // another term in this field?
+      if (termEnum.next() && termEnum.term().field()==field)
+        return true;                              // yes, keep going
+
+      termEnum.close();                           // close old termEnum
+
+      // find the next field with terms, if any
+      if (fieldIterator==null) {
+        fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
+        fieldIterator.next();                     // Skip field to get next one
+      }
+      while (fieldIterator.hasNext()) {
+        field = fieldIterator.next();
+        termEnum = fieldToReader.get(field).terms(new Term(field));
+        Term term = termEnum.term();
+        if (term!=null && term.field()==field)
+          return true;
+        else
+          termEnum.close();
+      }
+      return false;                               // no more fields
+    }
+
+    @Override
+    public Term term() {
+      if (termEnum==null)
+        return null;
+
+      return termEnum.term();
+    }
+
+    @Override
+    public int docFreq() {
+      if (termEnum==null)
+        return 0;
+
+      return termEnum.docFreq();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (termEnum!=null)
+        termEnum.close();
+    }
+  }
+
+  // wrap a TermDocs in order to support seek(Term)
+  private class ParallelTermDocs implements TermDocs {
+    protected TermDocs termDocs;
+
+    public ParallelTermDocs() {}
+    public ParallelTermDocs(Term term) throws IOException {
+      if (term == null)
+        termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
+      else
+        seek(term);
+    }
+
+    public int doc() { return termDocs.doc(); }
+    public int freq() { return termDocs.freq(); }
+
+    public void seek(Term term) throws IOException {
+      IndexReader reader = fieldToReader.get(term.field());
+      termDocs = reader!=null ? reader.termDocs(term) : null;
+    }
+
+    public void seek(TermEnum termEnum) throws IOException {
+      seek(termEnum.term());
+    }
+
+    public boolean next() throws IOException {
+      if (termDocs==null)
+        return false;
+
+      return termDocs.next();
+    }
+
+    public int read(final int[] docs, final int[] freqs) throws IOException {
+      if (termDocs==null)
+        return 0;
+
+      return termDocs.read(docs, freqs);
+    }
+
+    public boolean skipTo(int target) throws IOException {
+      if (termDocs==null)
+        return false;
+
+      return termDocs.skipTo(target);
+    }
+
+    public void close() throws IOException {
+      if (termDocs!=null)
+        termDocs.close();
+    }
+
+  }
+
+  private class ParallelTermPositions
+    extends ParallelTermDocs implements TermPositions {
+
+    public ParallelTermPositions() {}
+    public ParallelTermPositions(Term term) throws IOException { seek(term); }
+
+    @Override
+    public void seek(Term term) throws IOException {
+      IndexReader reader = fieldToReader.get(term.field());
+      termDocs = reader!=null ? reader.termPositions(term) : null;
+    }
+
+    public int nextPosition() throws IOException {
+      // It is an error to call this if there is no next position, e.g. if termDocs==null
+      return ((TermPositions)termDocs).nextPosition();
+    }
+
+    public int getPayloadLength() {
+      return ((TermPositions)termDocs).getPayloadLength();
+    }
+
+    public byte[] getPayload(byte[] data, int offset) throws IOException {
+      return ((TermPositions)termDocs).getPayload(data, offset);
+    }
+
+
+    // TODO: Remove warning after API has been finalized
+    public boolean isPayloadAvailable() {
+      return ((TermPositions) termDocs).isPayloadAvailable();
+    }
+  }
+
+  @Override
+  public void addReaderFinishedListener(ReaderFinishedListener listener) {
+    super.addReaderFinishedListener(listener);
+    for (IndexReader reader : readers) {
+      reader.addReaderFinishedListener(listener);
+    }
+  }
+
+  @Override
+  public void removeReaderFinishedListener(ReaderFinishedListener listener) {
+    super.removeReaderFinishedListener(listener);
+    for (IndexReader reader : readers) {
+      reader.removeReaderFinishedListener(listener);
+    }
+  }
+}
+
+
+
+
+