PyLucene 3.4.0-1 import
[pylucene.git] / lucene-java-3.4.0 / lucene / src / java / org / apache / lucene / index / ParallelReader.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import org.apache.lucene.document.Document;
21 import org.apache.lucene.document.FieldSelector;
22 import org.apache.lucene.document.FieldSelectorResult;
23 import org.apache.lucene.document.Fieldable;
24 import org.apache.lucene.util.MapBackedSet;
25
26 import java.io.IOException;
27 import java.util.*;
28 import java.util.concurrent.ConcurrentHashMap;
29
30
31 /** An IndexReader which reads multiple, parallel indexes.  Each index added
32  * must have the same number of documents, but typically each contains
33  * different fields.  Each document contains the union of the fields of all
34  * documents with the same document number.  When searching, matches for a
35  * query term are from the first index added that has the field.
36  *
37  * <p>This is useful, e.g., with collections that have large fields which
38  * change rarely and small fields that change more frequently.  The smaller
39  * fields may be re-indexed in a new index and both indexes may be searched
40  * together.
41  *
42  * <p><strong>Warning:</strong> It is up to you to make sure all indexes
43  * are created and modified the same way. For example, if you add
44  * documents to one index, you need to add the same documents in the
45  * same order to the other indexes. <em>Failure to do so will result in
46  * undefined behavior</em>.
47  */
48 public class ParallelReader extends IndexReader {
49   private List<IndexReader> readers = new ArrayList<IndexReader>();
50   private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
51   boolean incRefReaders = false;
52   private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
53   private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
54   private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();
55
56   private int maxDoc;
57   private int numDocs;
58   private boolean hasDeletions;
59
60  /** Construct a ParallelReader. 
61   * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
62   */
63   public ParallelReader() throws IOException { this(true); }
64    
65  /** Construct a ParallelReader. 
66   * @param closeSubReaders indicates whether the subreaders should be closed
67   * when this ParallelReader is closed
68   */
69   public ParallelReader(boolean closeSubReaders) throws IOException {
70     super();
71     this.incRefReaders = !closeSubReaders;
72     readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
73   }
74
75   /** {@inheritDoc} */
76   @Override
77   public String toString() {
78     final StringBuilder buffer = new StringBuilder("ParallelReader(");
79     final Iterator<IndexReader> iter = readers.iterator();
80     if (iter.hasNext()) {
81       buffer.append(iter.next());
82     }
83     while (iter.hasNext()) {
84       buffer.append(", ").append(iter.next());
85     }
86     buffer.append(')');
87     return buffer.toString();
88   }
89
90  /** Add an IndexReader.
91   * @throws IOException if there is a low-level IO error
92   */
93   public void add(IndexReader reader) throws IOException {
94     ensureOpen();
95     add(reader, false);
96   }
97
98  /** Add an IndexReader whose stored fields will not be returned.  This can
99   * accelerate search when stored fields are only needed from a subset of
100   * the IndexReaders.
101   *
102   * @throws IllegalArgumentException if not all indexes contain the same number
103   *     of documents
104   * @throws IllegalArgumentException if not all indexes have the same value
105   *     of {@link IndexReader#maxDoc()}
106   * @throws IOException if there is a low-level IO error
107   */
108   public void add(IndexReader reader, boolean ignoreStoredFields)
109     throws IOException {
110
111     ensureOpen();
112     if (readers.size() == 0) {
113       this.maxDoc = reader.maxDoc();
114       this.numDocs = reader.numDocs();
115       this.hasDeletions = reader.hasDeletions();
116     }
117
118     if (reader.maxDoc() != maxDoc)                // check compatibility
119       throw new IllegalArgumentException
120         ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
121     if (reader.numDocs() != numDocs)
122       throw new IllegalArgumentException
123         ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
124
125     Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
126     readerToFields.put(reader, fields);
127     for (final String field : fields) {                         // update fieldToReader map
128       if (fieldToReader.get(field) == null)
129         fieldToReader.put(field, reader);
130     }
131
132     if (!ignoreStoredFields)
133       storedFieldReaders.add(reader);             // add to storedFieldReaders
134     readers.add(reader);
135     
136     if (incRefReaders) {
137       reader.incRef();
138     }
139     decrefOnClose.add(Boolean.valueOf(incRefReaders));
140   }
141   
142   @Override
143   public synchronized Object clone() {
144     try {
145       return doReopen(true);
146     } catch (Exception ex) {
147       throw new RuntimeException(ex);
148     }
149   }
150   
151   /**
152    * Tries to reopen the subreaders.
153    * <br>
154    * If one or more subreaders could be re-opened (i. e. subReader.reopen() 
155    * returned a new instance != subReader), then a new ParallelReader instance 
156    * is returned, otherwise this instance is returned.
157    * <p>
158    * A re-opened instance might share one or more subreaders with the old 
159    * instance. Index modification operations result in undefined behavior
160    * when performed before the old instance is closed.
161    * (see {@link IndexReader#reopen()}).
162    * <p>
163    * If subreaders are shared, then the reference count of those
164    * readers is increased to ensure that the subreaders remain open
165    * until the last referring reader is closed.
166    * 
167    * @throws CorruptIndexException if the index is corrupt
168    * @throws IOException if there is a low-level IO error 
169    */
170   @Override
171   public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
172     return doReopen(false);
173   }
174     
175   protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
176     ensureOpen();
177     
178     boolean reopened = false;
179     List<IndexReader> newReaders = new ArrayList<IndexReader>();
180     
181     boolean success = false;
182     
183     try {
184       for (final IndexReader oldReader : readers) {
185         IndexReader newReader = null;
186         if (doClone) {
187           newReader = (IndexReader) oldReader.clone();
188         } else {
189           newReader = oldReader.reopen();
190         }
191         newReaders.add(newReader);
192         // if at least one of the subreaders was updated we remember that
193         // and return a new ParallelReader
194         if (newReader != oldReader) {
195           reopened = true;
196         }
197       }
198       success = true;
199     } finally {
200       if (!success && reopened) {
201         for (int i = 0; i < newReaders.size(); i++) {
202           IndexReader r = newReaders.get(i);
203           if (r != readers.get(i)) {
204             try {
205               r.close();
206             } catch (IOException ignore) {
207               // keep going - we want to clean up as much as possible
208             }
209           }
210         }
211       }
212     }
213
214     if (reopened) {
215       List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
216       ParallelReader pr = new ParallelReader();
217       for (int i = 0; i < readers.size(); i++) {
218         IndexReader oldReader = readers.get(i);
219         IndexReader newReader = newReaders.get(i);
220         if (newReader == oldReader) {
221           newDecrefOnClose.add(Boolean.TRUE);
222           newReader.incRef();
223         } else {
224           // this is a new subreader instance, so on close() we don't
225           // decRef but close it 
226           newDecrefOnClose.add(Boolean.FALSE);
227         }
228         pr.add(newReader, !storedFieldReaders.contains(oldReader));
229       }
230       pr.decrefOnClose = newDecrefOnClose;
231       pr.incRefReaders = incRefReaders;
232       return pr;
233     } else {
234       // No subreader was refreshed
235       return this;
236     }
237   }
238
239
240   @Override
241   public int numDocs() {
242     // Don't call ensureOpen() here (it could affect performance)
243     return numDocs;
244   }
245
246   @Override
247   public int maxDoc() {
248     // Don't call ensureOpen() here (it could affect performance)
249     return maxDoc;
250   }
251
252   @Override
253   public boolean hasDeletions() {
254     // Don't call ensureOpen() here (it could affect performance)
255     return hasDeletions;
256   }
257
258   // check first reader
259   @Override
260   public boolean isDeleted(int n) {
261     // Don't call ensureOpen() here (it could affect performance)
262     if (readers.size() > 0)
263       return readers.get(0).isDeleted(n);
264     return false;
265   }
266
267   // delete in all readers
268   @Override
269   protected void doDelete(int n) throws CorruptIndexException, IOException {
270     for (final IndexReader reader : readers) {
271       reader.deleteDocument(n);
272     }
273     hasDeletions = true;
274   }
275
276   // undeleteAll in all readers
277   @Override
278   protected void doUndeleteAll() throws CorruptIndexException, IOException {
279     for (final IndexReader reader : readers) {
280       reader.undeleteAll();
281     }
282     hasDeletions = false;
283   }
284
285   // append fields from storedFieldReaders
286   @Override
287   public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
288     ensureOpen();
289     Document result = new Document();
290     for (final IndexReader reader: storedFieldReaders) {
291
292       boolean include = (fieldSelector==null);
293       if (!include) {
294         Collection<String> fields = readerToFields.get(reader);
295         for (final String field : fields)
296           if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
297             include = true;
298             break;
299           }
300       }
301       if (include) {
302         List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
303         for (Fieldable field : fields) {
304           result.add(field);
305         }
306       }
307     }
308     return result;
309   }
310
311   // get all vectors
312   @Override
313   public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
314     ensureOpen();
315     ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
316     for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {
317
318       String field = e.getKey();
319       IndexReader reader = e.getValue();
320       TermFreqVector vector = reader.getTermFreqVector(n, field);
321       if (vector != null)
322         results.add(vector);
323     }
324     return results.toArray(new TermFreqVector[results.size()]);
325   }
326
327   @Override
328   public TermFreqVector getTermFreqVector(int n, String field)
329     throws IOException {
330     ensureOpen();
331     IndexReader reader = fieldToReader.get(field);
332     return reader==null ? null : reader.getTermFreqVector(n, field);
333   }
334
335
336   @Override
337   public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
338     ensureOpen();
339     IndexReader reader = fieldToReader.get(field);
340     if (reader != null) {
341       reader.getTermFreqVector(docNumber, field, mapper); 
342     }
343   }
344
345   @Override
346   public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
347     ensureOpen();
348
349     for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
350
351       String field = e.getKey();
352       IndexReader reader = e.getValue();
353       reader.getTermFreqVector(docNumber, field, mapper);
354     }
355
356   }
357
358   @Override
359   public boolean hasNorms(String field) throws IOException {
360     ensureOpen();
361     IndexReader reader = fieldToReader.get(field);
362     return reader==null ? false : reader.hasNorms(field);
363   }
364
365   @Override
366   public byte[] norms(String field) throws IOException {
367     ensureOpen();
368     IndexReader reader = fieldToReader.get(field);
369     return reader==null ? null : reader.norms(field);
370   }
371
372   @Override
373   public void norms(String field, byte[] result, int offset)
374     throws IOException {
375     ensureOpen();
376     IndexReader reader = fieldToReader.get(field);
377     if (reader!=null)
378       reader.norms(field, result, offset);
379   }
380
381   @Override
382   protected void doSetNorm(int n, String field, byte value)
383     throws CorruptIndexException, IOException {
384     IndexReader reader = fieldToReader.get(field);
385     if (reader!=null)
386       reader.doSetNorm(n, field, value);
387   }
388
389   @Override
390   public TermEnum terms() throws IOException {
391     ensureOpen();
392     return new ParallelTermEnum();
393   }
394
395   @Override
396   public TermEnum terms(Term term) throws IOException {
397     ensureOpen();
398     return new ParallelTermEnum(term);
399   }
400
401   @Override
402   public int docFreq(Term term) throws IOException {
403     ensureOpen();
404     IndexReader reader = fieldToReader.get(term.field());
405     return reader==null ? 0 : reader.docFreq(term);
406   }
407
408   @Override
409   public TermDocs termDocs(Term term) throws IOException {
410     ensureOpen();
411     return new ParallelTermDocs(term);
412   }
413
414   @Override
415   public TermDocs termDocs() throws IOException {
416     ensureOpen();
417     return new ParallelTermDocs();
418   }
419
420   @Override
421   public TermPositions termPositions(Term term) throws IOException {
422     ensureOpen();
423     return new ParallelTermPositions(term);
424   }
425
426   @Override
427   public TermPositions termPositions() throws IOException {
428     ensureOpen();
429     return new ParallelTermPositions();
430   }
431   
432   /**
433    * Checks recursively if all subreaders are up to date. 
434    */
435   @Override
436   public boolean isCurrent() throws CorruptIndexException, IOException {
437     for (final IndexReader reader : readers) {
438       if (!reader.isCurrent()) {
439         return false;
440       }
441     }
442     
443     // all subreaders are up to date
444     return true;
445   }
446
447   /**
448    * Checks recursively if all subindexes are optimized 
449    */
450   @Override
451   public boolean isOptimized() {
452     for (final IndexReader reader : readers) {
453       if (!reader.isOptimized()) {
454         return false;
455       }
456     }
457     
458     // all subindexes are optimized
459     return true;
460   }
461
462   
463   /** Not implemented.
464    * @throws UnsupportedOperationException
465    */
466   @Override
467   public long getVersion() {
468     throw new UnsupportedOperationException("ParallelReader does not support this method.");
469   }
470
471   // for testing
472   IndexReader[] getSubReaders() {
473     return readers.toArray(new IndexReader[readers.size()]);
474   }
475
476   @Override
477   protected void doCommit(Map<String,String> commitUserData) throws IOException {
478     for (final IndexReader reader : readers)
479       reader.commit(commitUserData);
480   }
481
482   @Override
483   protected synchronized void doClose() throws IOException {
484     for (int i = 0; i < readers.size(); i++) {
485       if (decrefOnClose.get(i).booleanValue()) {
486         readers.get(i).decRef();
487       } else {
488         readers.get(i).close();
489       }
490     }
491   }
492
493   @Override
494   public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
495     ensureOpen();
496     Set<String> fieldSet = new HashSet<String>();
497     for (final IndexReader reader : readers) {
498       Collection<String> names = reader.getFieldNames(fieldNames);
499       fieldSet.addAll(names);
500     }
501     return fieldSet;
502   }
503
504   private class ParallelTermEnum extends TermEnum {
505     private String field;
506     private Iterator<String> fieldIterator;
507     private TermEnum termEnum;
508
509     public ParallelTermEnum() throws IOException {
510       try {
511         field = fieldToReader.firstKey();
512       } catch(NoSuchElementException e) {
513         // No fields, so keep field == null, termEnum == null
514         return;
515       }
516       if (field != null)
517         termEnum = fieldToReader.get(field).terms();
518     }
519
520     public ParallelTermEnum(Term term) throws IOException {
521       field = term.field();
522       IndexReader reader = fieldToReader.get(field);
523       if (reader!=null)
524         termEnum = reader.terms(term);
525     }
526
527     @Override
528     public boolean next() throws IOException {
529       if (termEnum==null)
530         return false;
531
532       // another term in this field?
533       if (termEnum.next() && termEnum.term().field()==field)
534         return true;                              // yes, keep going
535
536       termEnum.close();                           // close old termEnum
537
538       // find the next field with terms, if any
539       if (fieldIterator==null) {
540         fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
541         fieldIterator.next();                     // Skip field to get next one
542       }
543       while (fieldIterator.hasNext()) {
544         field = fieldIterator.next();
545         termEnum = fieldToReader.get(field).terms(new Term(field));
546         Term term = termEnum.term();
547         if (term!=null && term.field()==field)
548           return true;
549         else
550           termEnum.close();
551       }
552  
553       return false;                               // no more fields
554     }
555
556     @Override
557     public Term term() {
558       if (termEnum==null)
559         return null;
560
561       return termEnum.term();
562     }
563
564     @Override
565     public int docFreq() {
566       if (termEnum==null)
567         return 0;
568
569       return termEnum.docFreq();
570     }
571
572     @Override
573     public void close() throws IOException {
574       if (termEnum!=null)
575         termEnum.close();
576     }
577
578   }
579
580   // wrap a TermDocs in order to support seek(Term)
581   private class ParallelTermDocs implements TermDocs {
582     protected TermDocs termDocs;
583
584     public ParallelTermDocs() {}
585     public ParallelTermDocs(Term term) throws IOException {
586       if (term == null)
587         termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
588       else
589         seek(term);
590     }
591
592     public int doc() { return termDocs.doc(); }
593     public int freq() { return termDocs.freq(); }
594
595     public void seek(Term term) throws IOException {
596       IndexReader reader = fieldToReader.get(term.field());
597       termDocs = reader!=null ? reader.termDocs(term) : null;
598     }
599
600     public void seek(TermEnum termEnum) throws IOException {
601       seek(termEnum.term());
602     }
603
604     public boolean next() throws IOException {
605       if (termDocs==null)
606         return false;
607
608       return termDocs.next();
609     }
610
611     public int read(final int[] docs, final int[] freqs) throws IOException {
612       if (termDocs==null)
613         return 0;
614
615       return termDocs.read(docs, freqs);
616     }
617
618     public boolean skipTo(int target) throws IOException {
619       if (termDocs==null)
620         return false;
621
622       return termDocs.skipTo(target);
623     }
624
625     public void close() throws IOException {
626       if (termDocs!=null)
627         termDocs.close();
628     }
629
630   }
631
632   private class ParallelTermPositions
633     extends ParallelTermDocs implements TermPositions {
634
635     public ParallelTermPositions() {}
636     public ParallelTermPositions(Term term) throws IOException { seek(term); }
637
638     @Override
639     public void seek(Term term) throws IOException {
640       IndexReader reader = fieldToReader.get(term.field());
641       termDocs = reader!=null ? reader.termPositions(term) : null;
642     }
643
644     public int nextPosition() throws IOException {
645       // It is an error to call this if there is no next position, e.g. if termDocs==null
646       return ((TermPositions)termDocs).nextPosition();
647     }
648
649     public int getPayloadLength() {
650       return ((TermPositions)termDocs).getPayloadLength();
651     }
652
653     public byte[] getPayload(byte[] data, int offset) throws IOException {
654       return ((TermPositions)termDocs).getPayload(data, offset);
655     }
656
657
658     // TODO: Remove warning after API has been finalized
659     public boolean isPayloadAvailable() {
660       return ((TermPositions) termDocs).isPayloadAvailable();
661     }
662   }
663
664   @Override
665   public void addReaderFinishedListener(ReaderFinishedListener listener) {
666     super.addReaderFinishedListener(listener);
667     for (IndexReader reader : readers) {
668       reader.addReaderFinishedListener(listener);
669     }
670   }
671
672   @Override
673   public void removeReaderFinishedListener(ReaderFinishedListener listener) {
674     super.removeReaderFinishedListener(listener);
675     for (IndexReader reader : readers) {
676       reader.removeReaderFinishedListener(listener);
677     }
678   }
679 }
680
681
682
683
684