pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / contrib / facet / src / java / org / apache / lucene / facet / search / StandardFacetsAccumulator.java
1 package org.apache.lucene.facet.search;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map.Entry;
8 import java.util.logging.Level;
9 import java.util.logging.Logger;
10
11 import org.apache.lucene.index.IndexReader;
12
13 import org.apache.lucene.facet.search.aggregator.Aggregator;
14 import org.apache.lucene.facet.search.params.FacetSearchParams;
15 import org.apache.lucene.facet.search.params.FacetRequest;
16 import org.apache.lucene.facet.search.results.FacetResult;
17 import org.apache.lucene.facet.search.results.IntermediateFacetResult;
18 import org.apache.lucene.facet.taxonomy.TaxonomyReader;
19 import org.apache.lucene.facet.util.PartitionsUtils;
20 import org.apache.lucene.facet.util.ScoredDocIdsUtils;
21
22 /**
23  * Licensed to the Apache Software Foundation (ASF) under one or more
24  * contributor license agreements.  See the NOTICE file distributed with
25  * this work for additional information regarding copyright ownership.
26  * The ASF licenses this file to You under the Apache License, Version 2.0
27  * (the "License"); you may not use this file except in compliance with
28  * the License.  You may obtain a copy of the License at
29  *
30  *     http://www.apache.org/licenses/LICENSE-2.0
31  *
32  * Unless required by applicable law or agreed to in writing, software
33  * distributed under the License is distributed on an "AS IS" BASIS,
34  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
35  * See the License for the specific language governing permissions and
36  * limitations under the License.
37  */
38
39 /**
40  * Standard implementation for {@link FacetsAccumulator}, utilizing partitions to save on memory.
41  * <p>
42  * Why partitions? Because if there are say 100M categories out of which 
43  * only top K are required, we must first compute value for all 100M categories
44  * (going over all documents) and only then could we select top K. 
45  * This is made easier on memory by working in partitions of distinct categories: 
46  * Once a values for a partition are found, we take the top K for that 
47  * partition and work on the next partition, them merge the top K of both, 
48  * and so forth, thereby computing top K with RAM needs for the size of 
49  * a single partition rather than for the size of all the 100M categories.
50  * <p>
51  * Decision on partitions size is done at indexing time, and the facet information
52  * for each partition is maintained separately.
53  * <p>
54  * <u>Implementation detail:</u> Since facets information of each partition is 
55  * maintained in a separate "category list", we can be more efficient
56  * at search time, because only the facet info for a single partition 
57  * need to be read while processing that partition. 
58  * 
59  * @lucene.experimental
60  */
61 public class StandardFacetsAccumulator extends FacetsAccumulator {
62
63   private static final Logger logger = Logger.getLogger(StandardFacetsAccumulator.class.getName());
64
65   protected final IntArrayAllocator intArrayAllocator;
66   protected final FloatArrayAllocator floatArrayAllocator;
67
68   protected int partitionSize;
69   protected int maxPartitions;
70   protected boolean isUsingComplements;
71
72   private TotalFacetCounts totalFacetCounts;
73
74   private Object accumulateGuard;
75
76   public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
77       TaxonomyReader taxonomyReader, IntArrayAllocator intArrayAllocator,
78       FloatArrayAllocator floatArrayAllocator) {
79     
80     super(searchParams,indexReader,taxonomyReader);
81     int realPartitionSize = intArrayAllocator == null || floatArrayAllocator == null 
82               ? PartitionsUtils.partitionSize(searchParams, taxonomyReader) : -1; // -1 if not needed.
83     this.intArrayAllocator = intArrayAllocator != null 
84         ? intArrayAllocator
85         // create a default one if null was provided
86         : new IntArrayAllocator(realPartitionSize, 1);
87     this.floatArrayAllocator = floatArrayAllocator != null 
88         ? floatArrayAllocator
89         // create a default one if null provided
90         : new FloatArrayAllocator(realPartitionSize, 1);
91     // can only be computed later when docids size is known
92     isUsingComplements = false;
93     partitionSize = PartitionsUtils.partitionSize(searchParams, taxonomyReader);
94     maxPartitions = (int) Math.ceil(this.taxonomyReader.getSize() / (double) partitionSize);
95     accumulateGuard = new Object();
96   }
97
98   public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
99       TaxonomyReader taxonomyReader) {
100     
101     this(searchParams, indexReader, taxonomyReader, null, null);
102   }
103
104   @Override
105   public List<FacetResult> accumulate(ScoredDocIDs docids) throws IOException {
106
107     // synchronize to prevent calling two accumulate()'s at the same time.
108     // We decided not to synchronize the method because that might mislead
109     // users to feel encouraged to call this method simultaneously.
110     synchronized (accumulateGuard) {
111
112       // only now we can compute this
113       isUsingComplements = shouldComplement(docids);
114
115       if (isUsingComplements) {
116         try {
117           totalFacetCounts = TotalFacetCountsCache.getSingleton()
118             .getTotalCounts(indexReader, taxonomyReader,
119                 searchParams.getFacetIndexingParams(), searchParams.getClCache());
120           if (totalFacetCounts != null) {
121             docids = ScoredDocIdsUtils.getComplementSet(docids, indexReader);
122           } else {
123             isUsingComplements = false;
124           }
125         } catch (UnsupportedOperationException e) {
126           // TODO (Facet): this exception is thrown from TotalCountsKey if the
127           // IndexReader used does not support getVersion(). We should re-think
128           // this: is this tiny detail worth disabling total counts completely
129           // for such readers? Currently, it's not supported by Parallel and
130           // MultiReader, which might be problematic for several applications.
131           // We could, for example, base our "isCurrent" logic on something else
132           // than the reader's version. Need to think more deeply about it.
133           if (logger.isLoggable(Level.FINEST)) {
134             logger.log(Level.FINEST, "IndexReader used does not support completents: ", e);
135           }
136           isUsingComplements = false;
137         } catch (IOException e) {
138           if (logger.isLoggable(Level.FINEST)) {
139             logger.log(Level.FINEST, "Failed to load/calculate total counts (complement counting disabled): ", e);
140           }
141           // silently fail if for some reason failed to load/save from/to dir 
142           isUsingComplements = false;
143         } catch (Exception e) {
144           // give up: this should not happen!
145           IOException ioEx = new IOException(
146               "PANIC: Got unexpected exception while trying to get/calculate total counts: "
147               +e.getMessage());
148           ioEx.initCause(e);
149           throw ioEx;
150         }
151       }
152
153       docids = actualDocsToAccumulate(docids);
154
155       FacetArrays facetArrays = new FacetArrays(intArrayAllocator, floatArrayAllocator);
156
157       HashMap<FacetRequest, IntermediateFacetResult> fr2tmpRes = new HashMap<FacetRequest, IntermediateFacetResult>();
158
159       try {
160         for (int part = 0; part < maxPartitions; part++) {
161
162           // fill arrays from category lists
163           fillArraysForPartition(docids, facetArrays, part);
164
165           int offset = part * partitionSize;
166
167           // for each partition we go over all requests and handle
168           // each, where
169           // the request maintains the merged result.
170           // In this implementation merges happen after each
171           // partition,
172           // but other impl could merge only at the end.
173           for (FacetRequest fr : searchParams.getFacetRequests()) {
174             FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
175             IntermediateFacetResult res4fr = frHndlr.fetchPartitionResult(facetArrays, offset);
176             IntermediateFacetResult oldRes = fr2tmpRes.get(fr);
177             if (oldRes != null) {
178               res4fr = frHndlr.mergeResults(oldRes, res4fr);
179             }
180             fr2tmpRes.put(fr, res4fr);
181           }
182         }
183       } finally {
184         facetArrays.free();
185       }
186
187       // gather results from all requests into a list for returning them
188       List<FacetResult> res = new ArrayList<FacetResult>();
189       for (FacetRequest fr : searchParams.getFacetRequests()) {
190         FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
191         IntermediateFacetResult tmpResult = fr2tmpRes.get(fr); 
192         if (tmpResult == null) {
193           continue; // do not add a null to the list.
194         }
195         FacetResult facetRes = frHndlr.renderFacetResult(tmpResult); 
196         // final labeling if allowed (because labeling is a costly operation)
197         if (isAllowLabeling()) {
198           frHndlr.labelResult(facetRes);
199         }
200         res.add(facetRes);
201       }
202
203       return res;
204     }
205   }
206
207   /**
208    * Set the actual set of documents over which accumulation should take place.
209    * <p>
210    * Allows to override the set of documents to accumulate for. Invoked just
211    * before actual accumulating starts. From this point that set of documents
212    * remains unmodified. Default implementation just returns the input
213    * unchanged.
214    * 
215    * @param docids
216    *          candidate documents to accumulate for
217    * @return actual documents to accumulate for
218    */
219   protected ScoredDocIDs actualDocsToAccumulate(ScoredDocIDs docids) throws IOException {
220     return docids;
221   }
222
223   /** Check if it is worth to use complements */
224   protected boolean shouldComplement(ScoredDocIDs docids) {
225     return 
226       mayComplement() && 
227       (docids.size() > indexReader.numDocs() * getComplementThreshold()) ;
228   }
229
230   /**
231    * Iterate over the documents for this partition and fill the facet arrays with the correct
232    * count/complement count/value.
233    * @param internalCollector
234    * @param facetArrays
235    * @param part
236    * @throws IOException
237    */
238   private final void fillArraysForPartition(ScoredDocIDs docids,
239       FacetArrays facetArrays, int partition) throws IOException {
240     
241     if (isUsingComplements) {
242       initArraysByTotalCounts(facetArrays, partition, docids.size());
243     } else {
244       facetArrays.free(); // to get a cleared array for this partition
245     }
246
247     HashMap<CategoryListIterator, Aggregator> categoryLists = getCategoryListMap(
248         facetArrays, partition);
249
250     for (Entry<CategoryListIterator, Aggregator> entry : categoryLists.entrySet()) {
251       CategoryListIterator categoryList = entry.getKey();
252       if (!categoryList.init()) {
253         continue;
254       }
255
256       Aggregator categorator = entry.getValue();
257       ScoredDocIDsIterator iterator = docids.iterator();
258       while (iterator.next()) {
259         int docID = iterator.getDocID();
260         if (!categoryList.skipTo(docID)) {
261           continue;
262         }
263         categorator.setNextDoc(docID, iterator.getScore());
264         long ordinal;
265         while ((ordinal = categoryList.nextCategory()) <= Integer.MAX_VALUE) {
266           categorator.aggregate((int) ordinal);
267         }
268       }
269     }
270   }
271
272   /**
273    * Init arrays for partition by total counts, optionally applying a factor
274    */
275   private final void initArraysByTotalCounts(FacetArrays facetArrays, int partition, int nAccumulatedDocs) {
276     int[] intArray = facetArrays.getIntArray();
277     totalFacetCounts.fillTotalCountsForPartition(intArray, partition);
278     double totalCountsFactor = getTotalCountsFactor();
279     // fix total counts, but only if the effect of this would be meaningfull. 
280     if (totalCountsFactor < 0.99999) {
281       int delta = nAccumulatedDocs + 1;
282       for (int i = 0; i < intArray.length; i++) {
283         intArray[i] *= totalCountsFactor;
284         // also translate to prevent loss of non-positive values
285         // due to complement sampling (ie if sampled docs all decremented a certain category). 
286         intArray[i] += delta; 
287       }
288     }
289   }
290
291   /**
292    * Expert: factor by which counts should be multiplied when initializing
293    * the count arrays from total counts.
294    * Default implementation for this returns 1, which is a no op.  
295    * @return a factor by which total counts should be multiplied
296    */
297   protected double getTotalCountsFactor() {
298     return 1;
299   }
300
301   /**
302    * Create an {@link Aggregator} and a {@link CategoryListIterator} for each
303    * and every {@link FacetRequest}. Generating a map, matching each
304    * categoryListIterator to its matching aggregator.
305    * <p>
306    * If two CategoryListIterators are served by the same aggregator, a single
307    * aggregator is returned for both.
308    * 
309    * <b>NOTE: </b>If a given category list iterator is needed with two different
310    * aggregators (e.g counting and association) - an exception is thrown as this
311    * functionality is not supported at this time.
312    */
313   protected HashMap<CategoryListIterator, Aggregator> getCategoryListMap(FacetArrays facetArrays,
314       int partition) throws IOException {
315     
316     HashMap<CategoryListIterator, Aggregator> categoryLists = new HashMap<CategoryListIterator, Aggregator>();
317
318     for (FacetRequest facetRequest : searchParams.getFacetRequests()) {
319       Aggregator categoryAggregator = facetRequest.createAggregator(
320           isUsingComplements, facetArrays, indexReader,  taxonomyReader);
321
322       CategoryListIterator cli = 
323         facetRequest.createCategoryListIterator(indexReader, taxonomyReader, searchParams, partition);
324       
325       // get the aggregator
326       Aggregator old = categoryLists.put(cli, categoryAggregator);
327
328       if (old != null && !old.equals(categoryAggregator)) {
329         // TODO (Facet): create a more meaningful RE class, and throw it.
330         throw new RuntimeException(
331         "Overriding existing category list with different aggregator. THAT'S A NO NO!");
332       }
333       // if the aggregator is the same we're covered
334     }
335
336     return categoryLists;
337   }
338 }