pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / ParallelReader.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import org.apache.lucene.document.Document;
21 import org.apache.lucene.document.FieldSelector;
22 import org.apache.lucene.document.FieldSelectorResult;
23 import org.apache.lucene.document.Fieldable;
24 import org.apache.lucene.util.MapBackedSet;
25
26 import java.io.IOException;
27 import java.util.*;
28 import java.util.concurrent.ConcurrentHashMap;
29
30
31 /** An IndexReader which reads multiple, parallel indexes.  Each index added
32  * must have the same number of documents, but typically each contains
33  * different fields.  Each document contains the union of the fields of all
34  * documents with the same document number.  When searching, matches for a
35  * query term are from the first index added that has the field.
36  *
37  * <p>This is useful, e.g., with collections that have large fields which
38  * change rarely and small fields that change more frequently.  The smaller
39  * fields may be re-indexed in a new index and both indexes may be searched
40  * together.
41  *
42  * <p><strong>Warning:</strong> It is up to you to make sure all indexes
43  * are created and modified the same way. For example, if you add
44  * documents to one index, you need to add the same documents in the
45  * same order to the other indexes. <em>Failure to do so will result in
46  * undefined behavior</em>.
47  */
48 public class ParallelReader extends IndexReader {
49   private List<IndexReader> readers = new ArrayList<IndexReader>();
50   private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
51   boolean incRefReaders = false;
52   private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
53   private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
54   private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
55
56   private int maxDoc;
57   private int numDocs;
58   private boolean hasDeletions;
59
60  /** Construct a ParallelReader. 
61   * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
62   */
63   public ParallelReader() throws IOException { this(true); }
64    
65  /** Construct a ParallelReader. 
66   * @param closeSubReaders indicates whether the subreaders should be closed
67   * when this ParallelReader is closed
68   */
69   public ParallelReader(boolean closeSubReaders) throws IOException {
70     super();
71     this.incRefReaders = !closeSubReaders;
72     readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
73   }
74
75   /** {@inheritDoc} */
76   @Override
77   public String toString() {
78     final StringBuilder buffer = new StringBuilder("ParallelReader(");
79     final Iterator<IndexReader> iter = readers.iterator();
80     if (iter.hasNext()) {
81       buffer.append(iter.next());
82     }
83     while (iter.hasNext()) {
84       buffer.append(", ").append(iter.next());
85     }
86     buffer.append(')');
87     return buffer.toString();
88   }
89
90  /** Add an IndexReader.
91   * @throws IOException if there is a low-level IO error
92   */
93   public void add(IndexReader reader) throws IOException {
94     ensureOpen();
95     add(reader, false);
96   }
97
98  /** Add an IndexReader whose stored fields will not be returned.  This can
99   * accelerate search when stored fields are only needed from a subset of
100   * the IndexReaders.
101   *
102   * @throws IllegalArgumentException if not all indexes contain the same number
103   *     of documents
104   * @throws IllegalArgumentException if not all indexes have the same value
105   *     of {@link IndexReader#maxDoc()}
106   * @throws IOException if there is a low-level IO error
107   */
108   public void add(IndexReader reader, boolean ignoreStoredFields)
109     throws IOException {
110
111     ensureOpen();
112     if (readers.size() == 0) {
113       this.maxDoc = reader.maxDoc();
114       this.numDocs = reader.numDocs();
115       this.hasDeletions = reader.hasDeletions();
116     }
117
118     if (reader.maxDoc() != maxDoc)                // check compatibility
119       throw new IllegalArgumentException
120         ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
121     if (reader.numDocs() != numDocs)
122       throw new IllegalArgumentException
123         ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
124
125     Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
126     readerToFields.put(reader, fields);
127     for (final String field : fields) {                         // update fieldToReader map
128       if (fieldToReader.get(field) == null)
129         fieldToReader.put(field, reader);
130     }
131
132     if (!ignoreStoredFields)
133       storedFieldReaders.add(reader);             // add to storedFieldReaders
134     readers.add(reader);
135     
136     if (incRefReaders) {
137       reader.incRef();
138     }
139     decrefOnClose.add(Boolean.valueOf(incRefReaders));
140   }
141   
142   @Override
143   public synchronized Object clone() {
144     // doReopen calls ensureOpen
145     try {
146       return doReopen(true);
147     } catch (Exception ex) {
148       throw new RuntimeException(ex);
149     }
150   }
151   
152   /**
153    * Tries to reopen the subreaders.
154    * <br>
155    * If one or more subreaders could be re-opened (i. e. subReader.reopen() 
156    * returned a new instance != subReader), then a new ParallelReader instance 
157    * is returned, otherwise null is returned.
158    * <p>
159    * A re-opened instance might share one or more subreaders with the old 
160    * instance. Index modification operations result in undefined behavior
161    * when performed before the old instance is closed.
162    * (see {@link IndexReader#openIfChanged}).
163    * <p>
164    * If subreaders are shared, then the reference count of those
165    * readers is increased to ensure that the subreaders remain open
166    * until the last referring reader is closed.
167    * 
168    * @throws CorruptIndexException if the index is corrupt
169    * @throws IOException if there is a low-level IO error 
170    */
171   @Override
172   protected synchronized IndexReader doOpenIfChanged() throws CorruptIndexException, IOException {
173     // doReopen calls ensureOpen
174     return doReopen(false);
175   }
176     
177   protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
178     ensureOpen();
179     
180     boolean reopened = false;
181     List<IndexReader> newReaders = new ArrayList<IndexReader>();
182     
183     boolean success = false;
184     
185     try {
186       for (final IndexReader oldReader : readers) {
187         IndexReader newReader = null;
188         if (doClone) {
189           newReader = (IndexReader) oldReader.clone();
190           reopened = true;
191         } else {
192           newReader = IndexReader.openIfChanged(oldReader);
193           if (newReader != null) {
194             reopened = true;
195           } else {
196             newReader = oldReader;
197           }
198         }
199         newReaders.add(newReader);
200       }
201       success = true;
202     } finally {
203       if (!success && reopened) {
204         for (int i = 0; i < newReaders.size(); i++) {
205           IndexReader r = newReaders.get(i);
206           if (r != readers.get(i)) {
207             try {
208               r.close();
209             } catch (IOException ignore) {
210               // keep going - we want to clean up as much as possible
211             }
212           }
213         }
214       }
215     }
216
217     if (reopened) {
218       List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
219       ParallelReader pr = new ParallelReader();
220       for (int i = 0; i < readers.size(); i++) {
221         IndexReader oldReader = readers.get(i);
222         IndexReader newReader = newReaders.get(i);
223         if (newReader == oldReader) {
224           newDecrefOnClose.add(Boolean.TRUE);
225           newReader.incRef();
226         } else {
227           // this is a new subreader instance, so on close() we don't
228           // decRef but close it 
229           newDecrefOnClose.add(Boolean.FALSE);
230         }
231         pr.add(newReader, !storedFieldReaders.contains(oldReader));
232       }
233       pr.decrefOnClose = newDecrefOnClose;
234       pr.incRefReaders = incRefReaders;
235       return pr;
236     } else {
237       // No subreader was refreshed
238       return null;
239     }
240   }
241
242
243   @Override
244   public int numDocs() {
245     // Don't call ensureOpen() here (it could affect performance)
246     return numDocs;
247   }
248
249   @Override
250   public int maxDoc() {
251     // Don't call ensureOpen() here (it could affect performance)
252     return maxDoc;
253   }
254
255   @Override
256   public boolean hasDeletions() {
257     ensureOpen();
258     return hasDeletions;
259   }
260
261   // check first reader
262   @Override
263   public boolean isDeleted(int n) {
264     // Don't call ensureOpen() here (it could affect performance)
265     if (readers.size() > 0)
266       return readers.get(0).isDeleted(n);
267     return false;
268   }
269
270   // delete in all readers
271   @Override
272   protected void doDelete(int n) throws CorruptIndexException, IOException {
273     for (final IndexReader reader : readers) {
274       reader.deleteDocument(n);
275     }
276     hasDeletions = true;
277   }
278
279   // undeleteAll in all readers
280   @Override
281   protected void doUndeleteAll() throws CorruptIndexException, IOException {
282     for (final IndexReader reader : readers) {
283       reader.undeleteAll();
284     }
285     hasDeletions = false;
286   }
287
288   // append fields from storedFieldReaders
289   @Override
290   public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
291     ensureOpen();
292     Document result = new Document();
293     for (final IndexReader reader: storedFieldReaders) {
294
295       boolean include = (fieldSelector==null);
296       if (!include) {
297         Collection<String> fields = readerToFields.get(reader);
298         for (final String field : fields)
299           if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
300             include = true;
301             break;
302           }
303       }
304       if (include) {
305         List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
306         for (Fieldable field : fields) {
307           result.add(field);
308         }
309       }
310     }
311     return result;
312   }
313
314   // get all vectors
315   @Override
316   public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
317     ensureOpen();
318     ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
319     for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {
320
321       String field = e.getKey();
322       IndexReader reader = e.getValue();
323       TermFreqVector vector = reader.getTermFreqVector(n, field);
324       if (vector != null)
325         results.add(vector);
326     }
327     return results.toArray(new TermFreqVector[results.size()]);
328   }
329
330   @Override
331   public TermFreqVector getTermFreqVector(int n, String field)
332     throws IOException {
333     ensureOpen();
334     IndexReader reader = fieldToReader.get(field);
335     return reader==null ? null : reader.getTermFreqVector(n, field);
336   }
337
338
339   @Override
340   public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
341     ensureOpen();
342     IndexReader reader = fieldToReader.get(field);
343     if (reader != null) {
344       reader.getTermFreqVector(docNumber, field, mapper); 
345     }
346   }
347
348   @Override
349   public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
350     ensureOpen();
351
352     for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
353
354       String field = e.getKey();
355       IndexReader reader = e.getValue();
356       reader.getTermFreqVector(docNumber, field, mapper);
357     }
358
359   }
360
361   @Override
362   public boolean hasNorms(String field) throws IOException {
363     ensureOpen();
364     IndexReader reader = fieldToReader.get(field);
365     return reader==null ? false : reader.hasNorms(field);
366   }
367
368   @Override
369   public byte[] norms(String field) throws IOException {
370     ensureOpen();
371     IndexReader reader = fieldToReader.get(field);
372     return reader==null ? null : reader.norms(field);
373   }
374
375   @Override
376   public void norms(String field, byte[] result, int offset)
377     throws IOException {
378     ensureOpen();
379     IndexReader reader = fieldToReader.get(field);
380     if (reader!=null)
381       reader.norms(field, result, offset);
382   }
383
384   @Override
385   protected void doSetNorm(int n, String field, byte value)
386     throws CorruptIndexException, IOException {
387     IndexReader reader = fieldToReader.get(field);
388     if (reader!=null)
389       reader.doSetNorm(n, field, value);
390   }
391
392   @Override
393   public TermEnum terms() throws IOException {
394     ensureOpen();
395     return new ParallelTermEnum();
396   }
397
398   @Override
399   public TermEnum terms(Term term) throws IOException {
400     ensureOpen();
401     return new ParallelTermEnum(term);
402   }
403
404   @Override
405   public int docFreq(Term term) throws IOException {
406     ensureOpen();
407     IndexReader reader = fieldToReader.get(term.field());
408     return reader==null ? 0 : reader.docFreq(term);
409   }
410
411   @Override
412   public TermDocs termDocs(Term term) throws IOException {
413     ensureOpen();
414     return new ParallelTermDocs(term);
415   }
416
417   @Override
418   public TermDocs termDocs() throws IOException {
419     ensureOpen();
420     return new ParallelTermDocs();
421   }
422
423   @Override
424   public TermPositions termPositions(Term term) throws IOException {
425     ensureOpen();
426     return new ParallelTermPositions(term);
427   }
428
429   @Override
430   public TermPositions termPositions() throws IOException {
431     ensureOpen();
432     return new ParallelTermPositions();
433   }
434   
435   /**
436    * Checks recursively if all subreaders are up to date. 
437    */
438   @Override
439   public boolean isCurrent() throws CorruptIndexException, IOException {
440     ensureOpen();
441     for (final IndexReader reader : readers) {
442       if (!reader.isCurrent()) {
443         return false;
444       }
445     }
446     
447     // all subreaders are up to date
448     return true;
449   }
450
451   @Deprecated
452   @Override
453   public boolean isOptimized() {
454     ensureOpen();
455     for (final IndexReader reader : readers) {
456       if (!reader.isOptimized()) {
457         return false;
458       }
459     }
460     
461     // all subindexes are optimized
462     return true;
463   }
464
465   /** Not implemented.
466    * @throws UnsupportedOperationException
467    */
468   @Override
469   public long getVersion() {
470     throw new UnsupportedOperationException("ParallelReader does not support this method.");
471   }
472
473   // for testing
474   IndexReader[] getSubReaders() {
475     return readers.toArray(new IndexReader[readers.size()]);
476   }
477
478   @Override
479   protected void doCommit(Map<String,String> commitUserData) throws IOException {
480     for (final IndexReader reader : readers)
481       reader.commit(commitUserData);
482   }
483
484   @Override
485   protected synchronized void doClose() throws IOException {
486     for (int i = 0; i < readers.size(); i++) {
487       if (decrefOnClose.get(i).booleanValue()) {
488         readers.get(i).decRef();
489       } else {
490         readers.get(i).close();
491       }
492     }
493   }
494
495   @Override
496   public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
497     ensureOpen();
498     Set<String> fieldSet = new HashSet<String>();
499     for (final IndexReader reader : readers) {
500       Collection<String> names = reader.getFieldNames(fieldNames);
501       fieldSet.addAll(names);
502     }
503     return fieldSet;
504   }
505
506   private class ParallelTermEnum extends TermEnum {
507     private String field;
508     private Iterator<String> fieldIterator;
509     private TermEnum termEnum;
510
511     public ParallelTermEnum() throws IOException {
512       try {
513         field = fieldToReader.firstKey();
514       } catch(NoSuchElementException e) {
515         // No fields, so keep field == null, termEnum == null
516         return;
517       }
518       if (field != null)
519         termEnum = fieldToReader.get(field).terms();
520     }
521
522     public ParallelTermEnum(Term term) throws IOException {
523       field = term.field();
524       IndexReader reader = fieldToReader.get(field);
525       if (reader!=null)
526         termEnum = reader.terms(term);
527     }
528
529     @Override
530     public boolean next() throws IOException {
531       if (termEnum==null)
532         return false;
533
534       // another term in this field?
535       if (termEnum.next() && termEnum.term().field()==field)
536         return true;                              // yes, keep going
537
538       termEnum.close();                           // close old termEnum
539
540       // find the next field with terms, if any
541       if (fieldIterator==null) {
542         fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
543         fieldIterator.next();                     // Skip field to get next one
544       }
545       while (fieldIterator.hasNext()) {
546         field = fieldIterator.next();
547         termEnum = fieldToReader.get(field).terms(new Term(field));
548         Term term = termEnum.term();
549         if (term!=null && term.field()==field)
550           return true;
551         else
552           termEnum.close();
553       }
554  
555       return false;                               // no more fields
556     }
557
558     @Override
559     public Term term() {
560       if (termEnum==null)
561         return null;
562
563       return termEnum.term();
564     }
565
566     @Override
567     public int docFreq() {
568       if (termEnum==null)
569         return 0;
570
571       return termEnum.docFreq();
572     }
573
574     @Override
575     public void close() throws IOException {
576       if (termEnum!=null)
577         termEnum.close();
578     }
579   }
580
581   // wrap a TermDocs in order to support seek(Term)
582   private class ParallelTermDocs implements TermDocs {
583     protected TermDocs termDocs;
584
585     public ParallelTermDocs() {}
586     public ParallelTermDocs(Term term) throws IOException {
587       if (term == null)
588         termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
589       else
590         seek(term);
591     }
592
593     public int doc() { return termDocs.doc(); }
594     public int freq() { return termDocs.freq(); }
595
596     public void seek(Term term) throws IOException {
597       IndexReader reader = fieldToReader.get(term.field());
598       termDocs = reader!=null ? reader.termDocs(term) : null;
599     }
600
601     public void seek(TermEnum termEnum) throws IOException {
602       seek(termEnum.term());
603     }
604
605     public boolean next() throws IOException {
606       if (termDocs==null)
607         return false;
608
609       return termDocs.next();
610     }
611
612     public int read(final int[] docs, final int[] freqs) throws IOException {
613       if (termDocs==null)
614         return 0;
615
616       return termDocs.read(docs, freqs);
617     }
618
619     public boolean skipTo(int target) throws IOException {
620       if (termDocs==null)
621         return false;
622
623       return termDocs.skipTo(target);
624     }
625
626     public void close() throws IOException {
627       if (termDocs!=null)
628         termDocs.close();
629     }
630
631   }
632
633   private class ParallelTermPositions
634     extends ParallelTermDocs implements TermPositions {
635
636     public ParallelTermPositions() {}
637     public ParallelTermPositions(Term term) throws IOException { seek(term); }
638
639     @Override
640     public void seek(Term term) throws IOException {
641       IndexReader reader = fieldToReader.get(term.field());
642       termDocs = reader!=null ? reader.termPositions(term) : null;
643     }
644
645     public int nextPosition() throws IOException {
646       // It is an error to call this if there is no next position, e.g. if termDocs==null
647       return ((TermPositions)termDocs).nextPosition();
648     }
649
650     public int getPayloadLength() {
651       return ((TermPositions)termDocs).getPayloadLength();
652     }
653
654     public byte[] getPayload(byte[] data, int offset) throws IOException {
655       return ((TermPositions)termDocs).getPayload(data, offset);
656     }
657
658
659     // TODO: Remove warning after API has been finalized
660     public boolean isPayloadAvailable() {
661       return ((TermPositions) termDocs).isPayloadAvailable();
662     }
663   }
664
665   @Override
666   public void addReaderFinishedListener(ReaderFinishedListener listener) {
667     super.addReaderFinishedListener(listener);
668     for (IndexReader reader : readers) {
669       reader.addReaderFinishedListener(listener);
670     }
671   }
672
673   @Override
674   public void removeReaderFinishedListener(ReaderFinishedListener listener) {
675     super.removeReaderFinishedListener(listener);
676     for (IndexReader reader : readers) {
677       reader.removeReaderFinishedListener(listener);
678     }
679   }
680 }
681
682
683
684
685