--- /dev/null
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.document.FieldSelectorResult;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.MapBackedSet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/** An IndexReader which reads multiple, parallel indexes. Each index added
+ * must have the same number of documents, but typically each contains
+ * different fields. Each document contains the union of the fields of all
+ * documents with the same document number. When searching, matches for a
+ * query term are from the first index added that has the field.
+ *
+ * <p>This is useful, e.g., with collections that have large fields which
+ * change rarely and small fields that change more frequently. The smaller
+ * fields may be re-indexed in a new index and both indexes may be searched
+ * together.
+ *
+ * <p><strong>Warning:</strong> It is up to you to make sure all indexes
+ * are created and modified the same way. For example, if you add
+ * documents to one index, you need to add the same documents in the
+ * same order to the other indexes. <em>Failure to do so will result in
+ * undefined behavior</em>.
+ */
public class ParallelReader extends IndexReader {
  // Subreaders in the order they were added; all must agree on maxDoc/numDocs.
  private List<IndexReader> readers = new ArrayList<IndexReader>();
  private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
  boolean incRefReaders = false;
  // Maps each field name to the FIRST added reader containing it. Sorted so
  // ParallelTermEnum can walk fields in order via firstKey()/tailMap().
  private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
  // Cache of every reader's full field set, used by document(int, FieldSelector).
  private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
  // Readers whose stored fields are returned by document(); see add(reader, ignoreStoredFields).
  private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();

  // Cached from the first reader added; every later reader must match (checked in add()).
  private int maxDoc;
  private int numDocs;
  private boolean hasDeletions;
+
  /** Construct a ParallelReader.
   * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
   * @throws IOException if there is a low-level IO error
   */
  public ParallelReader() throws IOException { this(true); }
+
  /** Construct a ParallelReader.
   * @param closeSubReaders indicates whether the subreaders should be closed
   * when this ParallelReader is closed
   * @throws IOException if there is a low-level IO error
   */
  public ParallelReader(boolean closeSubReaders) throws IOException {
    super();
    // If we must NOT close the subreaders, we incRef each one in add() and
    // only decRef (not close) in doClose().
    this.incRefReaders = !closeSubReaders;
    readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
  }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ final StringBuilder buffer = new StringBuilder("ParallelReader(");
+ final Iterator<IndexReader> iter = readers.iterator();
+ if (iter.hasNext()) {
+ buffer.append(iter.next());
+ }
+ while (iter.hasNext()) {
+ buffer.append(", ").append(iter.next());
+ }
+ buffer.append(')');
+ return buffer.toString();
+ }
+
  /** Add an IndexReader.
   * <p>The reader's stored fields WILL be returned by {@link #document}.
   * @throws IOException if there is a low-level IO error
   */
  public void add(IndexReader reader) throws IOException {
    ensureOpen();
    add(reader, false);
  }
+
  /** Add an IndexReader whose stored fields will not be returned. This can
   * accelerate search when stored fields are only needed from a subset of
   * the IndexReaders.
   *
   * @throws IllegalArgumentException if not all indexes contain the same number
   * of documents
   * @throws IllegalArgumentException if not all indexes have the same value
   * of {@link IndexReader#maxDoc()}
   * @throws IOException if there is a low-level IO error
   */
  public void add(IndexReader reader, boolean ignoreStoredFields)
    throws IOException {

    ensureOpen();
    if (readers.size() == 0) {
      // First reader establishes the doc counts all later readers must match.
      this.maxDoc = reader.maxDoc();
      this.numDocs = reader.numDocs();
      this.hasDeletions = reader.hasDeletions();
    }

    if (reader.maxDoc() != maxDoc)                         // check compatibility
      throw new IllegalArgumentException
        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
    if (reader.numDocs() != numDocs)
      throw new IllegalArgumentException
        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());

    Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
    readerToFields.put(reader, fields);
    for (final String field : fields) {                         // update fieldToReader map
      // First reader to declare a field wins; later readers' duplicates are ignored.
      if (fieldToReader.get(field) == null)
        fieldToReader.put(field, reader);
    }

    if (!ignoreStoredFields)
      storedFieldReaders.add(reader);             // add to storedFieldReaders
    readers.add(reader);

    if (incRefReaders) {
      reader.incRef();
    }
    // Keep decrefOnClose parallel to readers; consulted in doClose().
    decrefOnClose.add(Boolean.valueOf(incRefReaders));
  }
+
  /** Clones this reader by cloning every subreader; see {@link #doReopen}. */
  @Override
  public synchronized Object clone() {
    // doReopen calls ensureOpen
    try {
      // doClone=true always produces a new instance, even if nothing changed.
      return doReopen(true);
    } catch (Exception ex) {
      // clone() cannot declare checked exceptions, so wrap and rethrow.
      throw new RuntimeException(ex);
    }
  }
+
  /**
   * Tries to reopen the subreaders.
   * <br>
   * If one or more subreaders could be re-opened (i. e. subReader.reopen()
   * returned a new instance != subReader), then a new ParallelReader instance
   * is returned, otherwise null is returned.
   * <p>
   * A re-opened instance might share one or more subreaders with the old
   * instance. Index modification operations result in undefined behavior
   * when performed before the old instance is closed.
   * (see {@link IndexReader#openIfChanged}).
   * <p>
   * If subreaders are shared, then the reference count of those
   * readers is increased to ensure that the subreaders remain open
   * until the last referring reader is closed.
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  @Override
  protected synchronized IndexReader doOpenIfChanged() throws CorruptIndexException, IOException {
    // doReopen calls ensureOpen; with doClone=false it returns null when no
    // subreader has changed.
    return doReopen(false);
  }
+
  /** Shared implementation of {@link #clone()} and {@link #doOpenIfChanged()}.
   * @param doClone if true, clone every subreader unconditionally; if false,
   *        reopen each subreader only if it changed
   * @return a new ParallelReader if any subreader was cloned/reopened,
   *         otherwise null
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
    ensureOpen();

    boolean reopened = false;
    List<IndexReader> newReaders = new ArrayList<IndexReader>();

    boolean success = false;

    try {
      for (final IndexReader oldReader : readers) {
        IndexReader newReader = null;
        if (doClone) {
          newReader = (IndexReader) oldReader.clone();
          reopened = true;
        } else {
          newReader = IndexReader.openIfChanged(oldReader);
          if (newReader != null) {
            reopened = true;
          } else {
            // Unchanged: reuse the old subreader instance.
            newReader = oldReader;
          }
        }
        newReaders.add(newReader);
      }
      success = true;
    } finally {
      // On failure, close only the NEW subreader instances we created so far;
      // shared (unchanged) subreaders still belong to this reader.
      if (!success && reopened) {
        for (int i = 0; i < newReaders.size(); i++) {
          IndexReader r = newReaders.get(i);
          if (r != readers.get(i)) {
            try {
              r.close();
            } catch (IOException ignore) {
              // keep going - we want to clean up as much as possible
            }
          }
        }
      }
    }

    if (reopened) {
      List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
      ParallelReader pr = new ParallelReader();
      for (int i = 0; i < readers.size(); i++) {
        IndexReader oldReader = readers.get(i);
        IndexReader newReader = newReaders.get(i);
        if (newReader == oldReader) {
          // Shared with the old ParallelReader: incRef now, decRef on close.
          newDecrefOnClose.add(Boolean.TRUE);
          newReader.incRef();
        } else {
          // this is a new subreader instance, so on close() we don't
          // decRef but close it
          newDecrefOnClose.add(Boolean.FALSE);
        }
        pr.add(newReader, !storedFieldReaders.contains(oldReader));
      }
      // Overwrite the bookkeeping pr.add() set up, preserving this reader's policy.
      pr.decrefOnClose = newDecrefOnClose;
      pr.incRefReaders = incRefReaders;
      return pr;
    } else {
      // No subreader was refreshed
      return null;
    }
  }
+
+
  /** Returns the number of non-deleted documents, cached from the first subreader. */
  @Override
  public int numDocs() {
    // Don't call ensureOpen() here (it could affect performance)
    return numDocs;
  }
+
  /** Returns maxDoc, cached from the first subreader (all subreaders agree). */
  @Override
  public int maxDoc() {
    // Don't call ensureOpen() here (it could affect performance)
    return maxDoc;
  }
+
  /** Returns true if any document has been deleted; updated by doDelete/doUndeleteAll. */
  @Override
  public boolean hasDeletions() {
    ensureOpen();
    return hasDeletions;
  }
+
+ // check first reader
+ @Override
+ public boolean isDeleted(int n) {
+ // Don't call ensureOpen() here (it could affect performance)
+ if (readers.size() > 0)
+ return readers.get(0).isDeleted(n);
+ return false;
+ }
+
  // delete in all readers
  /** Deletes document n from every subreader so they stay in lockstep. */
  @Override
  protected void doDelete(int n) throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.deleteDocument(n);
    }
    hasDeletions = true;
  }
+
  // undeleteAll in all readers
  /** Restores all deleted documents in every subreader. */
  @Override
  protected void doUndeleteAll() throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.undeleteAll();
    }
    hasDeletions = false;
  }
+
  // append fields from storedFieldReaders
  /** Builds a Document holding the union of stored fields for doc n, drawn
   * from every subreader registered in storedFieldReaders.
   * @param fieldSelector may be null, meaning load all stored fields
   */
  @Override
  public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
    ensureOpen();
    Document result = new Document();
    for (final IndexReader reader: storedFieldReaders) {

      // Skip a subreader entirely when the selector rejects all of its fields.
      boolean include = (fieldSelector==null);
      if (!include) {
        Collection<String> fields = readerToFields.get(reader);
        for (final String field : fields)
          if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
            include = true;
            break;
          }
      }
      if (include) {
        List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
        for (Fieldable field : fields) {
          result.add(field);
        }
      }
    }
    return result;
  }
+
  // get all vectors
  /** Collects one term-frequency vector per field (if present) for doc n,
   * each fetched from the subreader that owns the field. */
  @Override
  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
    ensureOpen();
    ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
    for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      TermFreqVector vector = reader.getTermFreqVector(n, field);
      if (vector != null)
        results.add(vector);
    }
    return results.toArray(new TermFreqVector[results.size()]);
  }
+
+ @Override
+ public TermFreqVector getTermFreqVector(int n, String field)
+ throws IOException {
+ ensureOpen();
+ IndexReader reader = fieldToReader.get(field);
+ return reader==null ? null : reader.getTermFreqVector(n, field);
+ }
+
+
  /** Streams the given field's term vector for docNumber into the mapper;
   * no-op if no subreader contains the field. */
  @Override
  public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    if (reader != null) {
      reader.getTermFreqVector(docNumber, field, mapper);
    }
  }
+
  /** Streams every field's term vector for docNumber into the mapper, each
   * field served by the subreader that owns it. */
  @Override
  public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
    ensureOpen();

    for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      reader.getTermFreqVector(docNumber, field, mapper);
    }

  }
+
+ @Override
+ public boolean hasNorms(String field) throws IOException {
+ ensureOpen();
+ IndexReader reader = fieldToReader.get(field);
+ return reader==null ? false : reader.hasNorms(field);
+ }
+
+ @Override
+ public byte[] norms(String field) throws IOException {
+ ensureOpen();
+ IndexReader reader = fieldToReader.get(field);
+ return reader==null ? null : reader.norms(field);
+ }
+
  /** Copies this field's norms into result starting at offset; no-op if no
   * subreader contains the field. */
  @Override
  public void norms(String field, byte[] result, int offset)
    throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.norms(field, result, offset);
  }
+
  /** Sets the norm on the subreader that owns the field; silently ignored for
   * unknown fields. */
  @Override
  protected void doSetNorm(int n, String field, byte value)
    throws CorruptIndexException, IOException {
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.doSetNorm(n, field, value);
  }
+
  /** Returns a TermEnum over all fields of all subreaders, in field order. */
  @Override
  public TermEnum terms() throws IOException {
    ensureOpen();
    return new ParallelTermEnum();
  }
+
  /** Returns a TermEnum positioned at (or after) the given term. */
  @Override
  public TermEnum terms(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermEnum(term);
  }
+
+ @Override
+ public int docFreq(Term term) throws IOException {
+ ensureOpen();
+ IndexReader reader = fieldToReader.get(term.field());
+ return reader==null ? 0 : reader.docFreq(term);
+ }
+
  /** Returns a TermDocs seeked to the given term. */
  @Override
  public TermDocs termDocs(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermDocs(term);
  }
+
  /** Returns an unpositioned TermDocs; callers must seek() before iterating. */
  @Override
  public TermDocs termDocs() throws IOException {
    ensureOpen();
    return new ParallelTermDocs();
  }
+
  /** Returns a TermPositions seeked to the given term. */
  @Override
  public TermPositions termPositions(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermPositions(term);
  }
+
  /** Returns an unpositioned TermPositions; callers must seek() before iterating. */
  @Override
  public TermPositions termPositions() throws IOException {
    ensureOpen();
    return new ParallelTermPositions();
  }
+
  /**
   * Checks recursively if all subreaders are up to date.
   * @return false as soon as any subreader reports it is stale
   */
  @Override
  public boolean isCurrent() throws CorruptIndexException, IOException {
    ensureOpen();
    for (final IndexReader reader : readers) {
      if (!reader.isCurrent()) {
        return false;
      }
    }

    // all subreaders are up to date
    return true;
  }
+
  /** Checks recursively if all subindexes are optimized.
   * @deprecated inherited deprecation from {@link IndexReader#isOptimized()}
   */
  @Deprecated
  @Override
  public boolean isOptimized() {
    ensureOpen();
    for (final IndexReader reader : readers) {
      if (!reader.isOptimized()) {
        return false;
      }
    }

    // all subindexes are optimized
    return true;
  }
+
  /** Not implemented.
   * <p>There is no single meaningful version across parallel subindexes.
   * @throws UnsupportedOperationException always
   */
  @Override
  public long getVersion() {
    throw new UnsupportedOperationException("ParallelReader does not support this method.");
  }
+
  // for testing
  /** Returns a snapshot array of the subreaders in addition order. */
  IndexReader[] getSubReaders() {
    return readers.toArray(new IndexReader[readers.size()]);
  }
+
  /** Commits pending changes on every subreader, passing the same user data
   * to each. */
  @Override
  protected void doCommit(Map<String,String> commitUserData) throws IOException {
    for (final IndexReader reader : readers)
      reader.commit(commitUserData);
  }
+
  /** Releases every subreader: decRef those we incRef'd in add(), close the
   * rest. decrefOnClose is kept parallel to readers by add()/doReopen(). */
  @Override
  protected synchronized void doClose() throws IOException {
    for (int i = 0; i < readers.size(); i++) {
      if (decrefOnClose.get(i).booleanValue()) {
        readers.get(i).decRef();
      } else {
        readers.get(i).close();
      }
    }
  }
+
  /** Returns the union of the matching field names of all subreaders. */
  @Override
  public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
    ensureOpen();
    Set<String> fieldSet = new HashSet<String>();
    for (final IndexReader reader : readers) {
      Collection<String> names = reader.getFieldNames(fieldNames);
      fieldSet.addAll(names);
    }
    return fieldSet;
  }
+
+ private class ParallelTermEnum extends TermEnum {
+ private String field;
+ private Iterator<String> fieldIterator;
+ private TermEnum termEnum;
+
+ public ParallelTermEnum() throws IOException {
+ try {
+ field = fieldToReader.firstKey();
+ } catch(NoSuchElementException e) {
+ // No fields, so keep field == null, termEnum == null
+ return;
+ }
+ if (field != null)
+ termEnum = fieldToReader.get(field).terms();
+ }
+
+ public ParallelTermEnum(Term term) throws IOException {
+ field = term.field();
+ IndexReader reader = fieldToReader.get(field);
+ if (reader!=null)
+ termEnum = reader.terms(term);
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (termEnum==null)
+ return false;
+
+ // another term in this field?
+ if (termEnum.next() && termEnum.term().field()==field)
+ return true; // yes, keep going
+
+ termEnum.close(); // close old termEnum
+
+ // find the next field with terms, if any
+ if (fieldIterator==null) {
+ fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
+ fieldIterator.next(); // Skip field to get next one
+ }
+ while (fieldIterator.hasNext()) {
+ field = fieldIterator.next();
+ termEnum = fieldToReader.get(field).terms(new Term(field));
+ Term term = termEnum.term();
+ if (term!=null && term.field()==field)
+ return true;
+ else
+ termEnum.close();
+ }
+
+ return false; // no more fields
+ }
+
+ @Override
+ public Term term() {
+ if (termEnum==null)
+ return null;
+
+ return termEnum.term();
+ }
+
+ @Override
+ public int docFreq() {
+ if (termEnum==null)
+ return 0;
+
+ return termEnum.docFreq();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (termEnum!=null)
+ termEnum.close();
+ }
+ }
+
  // wrap a TermDocs in order to support seek(Term)
  /** Delegates to the TermDocs of the subreader owning the sought term's
   * field. With termDocs == null (unknown field / empty reader set) the
   * iteration methods report an empty enumeration. */
  private class ParallelTermDocs implements TermDocs {
    protected TermDocs termDocs;

    public ParallelTermDocs() {}
    public ParallelTermDocs(Term term) throws IOException {
      if (term == null)
        // null term: enumerate all non-deleted docs of the first subreader
        // (all subreaders share the same deletions).
        termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
      else
        seek(term);
    }

    // Undefined (NPE) when unpositioned, matching TermDocs contract usage.
    public int doc() { return termDocs.doc(); }
    public int freq() { return termDocs.freq(); }

    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      // Unknown field leaves termDocs null => empty enumeration.
      termDocs = reader!=null ? reader.termDocs(term) : null;
    }

    public void seek(TermEnum termEnum) throws IOException {
      seek(termEnum.term());
    }

    public boolean next() throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.next();
    }

    public int read(final int[] docs, final int[] freqs) throws IOException {
      if (termDocs==null)
        return 0;

      return termDocs.read(docs, freqs);
    }

    public boolean skipTo(int target) throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.skipTo(target);
    }

    public void close() throws IOException {
      if (termDocs!=null)
        termDocs.close();
    }

  }
+
  /** Position-aware variant of ParallelTermDocs: seek() installs a
   * TermPositions, so the casts below are safe whenever termDocs != null. */
  private class ParallelTermPositions
    extends ParallelTermDocs implements TermPositions {

    public ParallelTermPositions() {}
    public ParallelTermPositions(Term term) throws IOException { seek(term); }

    @Override
    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      termDocs = reader!=null ? reader.termPositions(term) : null;
    }

    public int nextPosition() throws IOException {
      // It is an error to call this if there is no next position, e.g. if termDocs==null
      return ((TermPositions)termDocs).nextPosition();
    }

    public int getPayloadLength() {
      return ((TermPositions)termDocs).getPayloadLength();
    }

    public byte[] getPayload(byte[] data, int offset) throws IOException {
      return ((TermPositions)termDocs).getPayload(data, offset);
    }


    // TODO: Remove warning after API has been finalized
    public boolean isPayloadAvailable() {
      return ((TermPositions) termDocs).isPayloadAvailable();
    }
  }
+
  /** Registers the listener on this reader and propagates it to every subreader. */
  @Override
  public void addReaderFinishedListener(ReaderFinishedListener listener) {
    super.addReaderFinishedListener(listener);
    for (IndexReader reader : readers) {
      reader.addReaderFinishedListener(listener);
    }
  }
+
  /** Unregisters the listener from this reader and from every subreader. */
  @Override
  public void removeReaderFinishedListener(ReaderFinishedListener listener) {
    super.removeReaderFinishedListener(listener);
    for (IndexReader reader : readers) {
      reader.removeReaderFinishedListener(listener);
    }
  }
+}
+
+
+
+
+