PyLucene 3.4.0-1 import
[pylucene.git] / lucene-java-3.4.0 / lucene / src / java / org / apache / lucene / search / ParallelMultiSearcher.java
1 package org.apache.lucene.search;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.NoSuchElementException;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.CompletionService;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ExecutorCompletionService;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.locks.Lock;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.apache.lucene.index.IndexReader;
36 import org.apache.lucene.index.Term;
37 import org.apache.lucene.util.NamedThreadFactory;
38 import org.apache.lucene.util.ThreadInterruptedException;
39
40 /** Implements parallel search over a set of <code>Searchables</code>.
41  *
42  * <p>Applications usually need only call the inherited {@link #search(Query,int)}
43  * or {@link #search(Query,Filter,int)} methods.
44  * 
45  * @deprecated Please pass an ExecutorService to {@link
46  * IndexSearcher}, instead.
47  */
48 @Deprecated
49 public class ParallelMultiSearcher extends MultiSearcher {
50   private final ExecutorService executor;
51   private final Searchable[] searchables;
52   private final int[] starts;
53
54   /** Creates a {@link Searchable} which searches <i>searchables</i> with the default 
55    * executor service (a cached thread pool). */
56   public ParallelMultiSearcher(Searchable... searchables) throws IOException {
57     this(Executors.newCachedThreadPool(new NamedThreadFactory(ParallelMultiSearcher.class.getSimpleName())), searchables);
58   }
59
60   /**
61    * Creates a {@link Searchable} which searches <i>searchables</i> with the specified ExecutorService.
62    */
63   public ParallelMultiSearcher(ExecutorService executor, Searchable... searchables) throws IOException {
64     super(searchables);
65     this.searchables = searchables;
66     this.starts = getStarts();
67     this.executor = executor;
68   }
69   /**
70    * Executes each {@link Searchable}'s docFreq() in its own thread and waits for each search to complete and merge
71    * the results back together.
72    */
73   @Override
74   public int docFreq(final Term term) throws IOException {
75     final ExecutionHelper<Integer> runner = new ExecutionHelper<Integer>(executor);
76     for(int i = 0; i < searchables.length; i++) {
77       final Searchable searchable = searchables[i];
78       runner.submit(new Callable<Integer>() {
79         public Integer call() throws IOException {
80           return Integer.valueOf(searchable.docFreq(term));
81         }
82       });
83     }
84     int docFreq = 0;
85     for (Integer num : runner) {
86       docFreq += num.intValue();
87     }
88     return docFreq;
89   }
90
91   /**
92    * A search implementation which executes each 
93    * {@link Searchable} in its own thread and waits for each search to complete and merge
94    * the results back together.
95    */
96   @Override
97   public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
98     final HitQueue hq = new HitQueue(nDocs, false);
99     final Lock lock = new ReentrantLock();
100     final ExecutionHelper<TopDocs> runner = new ExecutionHelper<TopDocs>(executor);
101     
102     for (int i = 0; i < searchables.length; i++) { // search each searchable
103       runner.submit(
104           new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq, i, starts));
105     }
106
107     int totalHits = 0;
108     float maxScore = Float.NEGATIVE_INFINITY;
109     for (final TopDocs topDocs : runner) {
110       totalHits += topDocs.totalHits;
111       maxScore = Math.max(maxScore, topDocs.getMaxScore());
112     }
113
114     final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
115     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
116       scoreDocs[i] = hq.pop();
117
118     return new TopDocs(totalHits, scoreDocs, maxScore);
119   }
120
121   /**
122    * A search implementation allowing sorting which spans a new thread for each
123    * Searchable, waits for each search to complete and merges
124    * the results back together.
125    */
126   @Override
127   public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException {
128     if (sort == null) throw new NullPointerException();
129
130     final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
131     final Lock lock = new ReentrantLock();
132     final ExecutionHelper<TopFieldDocs> runner = new ExecutionHelper<TopFieldDocs>(executor);
133     for (int i = 0; i < searchables.length; i++) { // search each searchable
134       runner.submit(
135           new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs, hq, sort, i, starts));
136     }
137     int totalHits = 0;
138     float maxScore = Float.NEGATIVE_INFINITY;
139     for (final TopFieldDocs topFieldDocs : runner) {
140       totalHits += topFieldDocs.totalHits;
141       maxScore = Math.max(maxScore, topFieldDocs.getMaxScore());
142     }
143     final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
144     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
145       scoreDocs[i] = hq.pop();
146
147     return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
148   }
149
150   /** Lower-level search API.
151   *
152   * <p>{@link Collector#collect(int)} is called for every matching document.
153   *
154   * <p>Applications should only use this if they need <i>all</i> of the
155   * matching documents.  The high-level search API ({@link
156   * Searcher#search(Query,int)}) is usually more efficient, as it skips
157   * non-high-scoring hits.
158   * 
159   * <p>This method cannot be parallelized, because {@link Collector}
160   * supports no concurrent access.
161   *
162   * @param weight to match documents
163   * @param filter if non-null, a bitset used to eliminate some documents
164   * @param collector to receive hits
165   */
166   @Override
167   public void search(final Weight weight, final Filter filter, final Collector collector)
168    throws IOException {
169    for (int i = 0; i < searchables.length; i++) {
170
171      final int start = starts[i];
172
173      final Collector hc = new Collector() {
174        @Override
175        public void setScorer(final Scorer scorer) throws IOException {
176          collector.setScorer(scorer);
177        }
178        
179        @Override
180        public void collect(final int doc) throws IOException {
181          collector.collect(doc);
182        }
183        
184        @Override
185        public void setNextReader(final IndexReader reader, final int docBase) throws IOException {
186          collector.setNextReader(reader, start + docBase);
187        }
188        
189        @Override
190        public boolean acceptsDocsOutOfOrder() {
191          return collector.acceptsDocsOutOfOrder();
192        }
193      };
194      
195      searchables[i].search(weight, filter, hc);
196    }
197   }
198
199   @Override
200   public void close() throws IOException {
201     executor.shutdown();
202     super.close();
203   }
204
205   @Override
206   HashMap<Term, Integer> createDocFrequencyMap(Set<Term> terms) throws IOException {
207     final Term[] allTermsArray = terms.toArray(new Term[terms.size()]);
208     final int[] aggregatedDocFreqs = new int[terms.size()];
209     final ExecutionHelper<int[]> runner = new ExecutionHelper<int[]>(executor);
210     for (Searchable searchable : searchables) {
211       runner.submit(
212           new DocumentFrequencyCallable(searchable, allTermsArray));
213     }
214     final int docFreqLen = aggregatedDocFreqs.length;
215     for (final int[] docFreqs : runner) {
216       for(int i=0; i < docFreqLen; i++){
217         aggregatedDocFreqs[i] += docFreqs[i];
218       }
219     }
220
221     final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
222     for(int i=0; i<allTermsArray.length; i++) {
223       dfMap.put(allTermsArray[i], Integer.valueOf(aggregatedDocFreqs[i]));
224     }
225     return dfMap;
226   }
227
228   
229   /**
230    * A {@link Callable} to retrieve the document frequencies for a Term array  
231    */
232   private static final class DocumentFrequencyCallable implements Callable<int[]> {
233     private final Searchable searchable;
234     private final Term[] terms;
235     
236     public DocumentFrequencyCallable(Searchable searchable, Term[] terms) {
237       this.searchable = searchable;
238       this.terms = terms;
239     }
240     
241     public int[] call() throws Exception {
242       return searchable.docFreqs(terms);
243     }
244   }
245   
246   /**
247    * A helper class that wraps a {@link CompletionService} and provides an
248    * iterable interface to the completed {@link Callable} instances.
249    * 
250    * @param <T>
251    *          the type of the {@link Callable} return value
252    */
253   private static final class ExecutionHelper<T> implements Iterator<T>, Iterable<T> {
254     private final CompletionService<T> service;
255     private int numTasks;
256
257     ExecutionHelper(final Executor executor) {
258       this.service = new ExecutorCompletionService<T>(executor);
259     }
260
261     public boolean hasNext() {
262       return numTasks > 0;
263     }
264
265     public void submit(Callable<T> task) {
266       this.service.submit(task);
267       ++numTasks;
268     }
269
270     public T next() {
271      if(!this.hasNext())
272        throw new NoSuchElementException();
273       try {
274         return service.take().get();
275       } catch (InterruptedException e) {
276         throw new ThreadInterruptedException(e);
277       } catch (ExecutionException e) {
278         throw new RuntimeException(e);
279       } finally {
280         --numTasks;
281       }
282     }
283
284     public void remove() {
285       throw new UnsupportedOperationException();
286     }
287
288     public Iterator<T> iterator() {
289       // use the shortcut here - this is only used in a privat context
290       return this;
291     }
292
293   }
294 }