package org.apache.lucene.facet.search;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.lucene.index.IndexReader;

import org.apache.lucene.facet.search.aggregator.Aggregator;
import org.apache.lucene.facet.search.params.FacetSearchParams;
import org.apache.lucene.facet.search.params.FacetRequest;
import org.apache.lucene.facet.search.results.FacetResult;
import org.apache.lucene.facet.search.results.IntermediateFacetResult;
import org.apache.lucene.facet.taxonomy.TaxonomyReader;
import org.apache.lucene.facet.util.PartitionsUtils;
import org.apache.lucene.facet.util.ScoredDocIdsUtils;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Standard implementation for {@link FacetsAccumulator}, utilizing partitions to save on memory.
 * <p>
 * Why partitions? Because if there are, say, 100M categories out of which
 * only the top K are required, we must first compute values for all 100M categories
 * (going over all documents) and only then can we select the top K.
 * This is made easier on memory by working in partitions of distinct categories:
 * once the values for a partition are found, we take the top K for that
 * partition and work on the next partition, then merge the top K of both,
 * and so forth, thereby computing the top K with RAM needs for the size of
 * a single partition rather than for the size of all 100M categories.
 * <p>
 * The decision on partition size is made at indexing time, and the facet information
 * for each partition is maintained separately.
 * <p>
 * <u>Implementation detail:</u> Since the facet information of each partition is
 * maintained in a separate "category list", we can be more efficient
 * at search time, because only the facet info for a single partition
 * needs to be read while processing that partition.
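 * <p>
 * A minimal usage sketch (assuming an open {@code IndexReader}, a matching
 * {@code TaxonomyReader}, and a {@code ScoredDocIDs} of matching documents;
 * the request shown is illustrative):
 * <pre>
 *   FacetSearchParams params = new FacetSearchParams();
 *   params.addFacetRequest(new CountFacetRequest(new CategoryPath("author"), 10));
 *   FacetsAccumulator accumulator =
 *       new StandardFacetsAccumulator(params, indexReader, taxonomyReader);
 *   List&lt;FacetResult&gt; results = accumulator.accumulate(docids);
 * </pre>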
 *
 * @lucene.experimental
 */
public class StandardFacetsAccumulator extends FacetsAccumulator {

  private static final Logger logger = Logger.getLogger(StandardFacetsAccumulator.class.getName());

  protected final IntArrayAllocator intArrayAllocator;
  protected final FloatArrayAllocator floatArrayAllocator;

  protected int partitionSize;
  protected int maxPartitions;
  protected boolean isUsingComplements;

  private TotalFacetCounts totalFacetCounts;

  private Object accumulateGuard;

  public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
      TaxonomyReader taxonomyReader, IntArrayAllocator intArrayAllocator,
      FloatArrayAllocator floatArrayAllocator) {

    super(searchParams, indexReader, taxonomyReader);
    int realPartitionSize = intArrayAllocator == null || floatArrayAllocator == null
        ? PartitionsUtils.partitionSize(searchParams, taxonomyReader) : -1; // -1 if not needed.
    this.intArrayAllocator = intArrayAllocator != null
        ? intArrayAllocator
        // create a default one if null was provided
        : new IntArrayAllocator(realPartitionSize, 1);
    this.floatArrayAllocator = floatArrayAllocator != null
        ? floatArrayAllocator
        // create a default one if null was provided
        : new FloatArrayAllocator(realPartitionSize, 1);
    // can only be computed later, when the docids size is known
    isUsingComplements = false;
    partitionSize = PartitionsUtils.partitionSize(searchParams, taxonomyReader);
    maxPartitions = (int) Math.ceil(this.taxonomyReader.getSize() / (double) partitionSize);
    accumulateGuard = new Object();
  }

  public StandardFacetsAccumulator(FacetSearchParams searchParams, IndexReader indexReader,
      TaxonomyReader taxonomyReader) {
    this(searchParams, indexReader, taxonomyReader, null, null);
  }

  @Override
  public List<FacetResult> accumulate(ScoredDocIDs docids) throws IOException {

    // Synchronize to prevent calling two accumulate()'s at the same time.
    // We decided not to synchronize the method itself because that might
    // mislead users into feeling encouraged to call it simultaneously.
    synchronized (accumulateGuard) {

      // only now can we compute this
      isUsingComplements = shouldComplement(docids);
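
      // When most documents match, it is cheaper to aggregate over the
      // (smaller) complement set: we start from precomputed total counts
      // and let the aggregator subtract the complement documents.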
      if (isUsingComplements) {
        try {
          totalFacetCounts = TotalFacetCountsCache.getSingleton()
              .getTotalCounts(indexReader, taxonomyReader,
                  searchParams.getFacetIndexingParams(), searchParams.getClCache());
          if (totalFacetCounts != null) {
            docids = ScoredDocIdsUtils.getComplementSet(docids, indexReader);
          } else {
            isUsingComplements = false;
          }
        } catch (UnsupportedOperationException e) {
          // TODO (Facet): this exception is thrown from TotalCountsKey if the
          // IndexReader used does not support getVersion(). We should re-think
          // this: is this tiny detail worth disabling total counts completely
          // for such readers? Currently, it's not supported by Parallel and
          // MultiReader, which might be problematic for several applications.
          // We could, for example, base our "isCurrent" logic on something other
          // than the reader's version. Need to think more deeply about it.
          if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "IndexReader used does not support complements: ", e);
          }
          isUsingComplements = false;
        } catch (IOException e) {
          if (logger.isLoggable(Level.FINEST)) {
            logger.log(Level.FINEST, "Failed to load/calculate total counts (complement counting disabled): ", e);
          }
          // silently fail if for some reason we failed to load/save from/to dir
          isUsingComplements = false;
        } catch (Exception e) {
          // give up: this should not happen!
          IOException ioEx = new IOException(
              "PANIC: Got unexpected exception while trying to get/calculate total counts: "
                  + e.getMessage());
          ioEx.initCause(e);
          throw ioEx;
        }
      }

      docids = actualDocsToAccumulate(docids);

      FacetArrays facetArrays = new FacetArrays(intArrayAllocator, floatArrayAllocator);

      HashMap<FacetRequest, IntermediateFacetResult> fr2tmpRes = new HashMap<FacetRequest, IntermediateFacetResult>();

      try {
        for (int part = 0; part < maxPartitions; part++) {

          // fill arrays from category lists
          fillArraysForPartition(docids, facetArrays, part);

          int offset = part * partitionSize;

          // For each partition we go over all requests and handle each,
          // where the request maintains the merged result.
          // In this implementation merges happen after each partition,
          // but other impls could merge only at the end.
          for (FacetRequest fr : searchParams.getFacetRequests()) {
            FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
            IntermediateFacetResult res4fr = frHndlr.fetchPartitionResult(facetArrays, offset);
            IntermediateFacetResult oldRes = fr2tmpRes.get(fr);
            if (oldRes != null) {
              res4fr = frHndlr.mergeResults(oldRes, res4fr);
            }
            fr2tmpRes.put(fr, res4fr);
          }
        }
      } finally {
        facetArrays.free();
      }

      // gather results from all requests into a list for returning them
      List<FacetResult> res = new ArrayList<FacetResult>();
      for (FacetRequest fr : searchParams.getFacetRequests()) {
        FacetResultsHandler frHndlr = fr.createFacetResultsHandler(taxonomyReader);
        IntermediateFacetResult tmpResult = fr2tmpRes.get(fr);
        if (tmpResult == null) {
          continue; // do not add a null to the list.
        }
        FacetResult facetRes = frHndlr.renderFacetResult(tmpResult);
        // final labeling if allowed (because labeling is a costly operation)
        if (isAllowLabeling()) {
          frHndlr.labelResult(facetRes);
        }
        res.add(facetRes);
      }

      return res;
    }
  }

  /**
   * Set the actual set of documents over which accumulation should take place.
   * <p>
   * Allows overriding the set of documents to accumulate for. Invoked just
   * before actual accumulating starts. From this point that set of documents
   * remains unmodified. The default implementation just returns the input
   * unchanged.
   *
   * @param docids
   *          candidate documents to accumulate for
   * @return actual documents to accumulate for
   */
  protected ScoredDocIDs actualDocsToAccumulate(ScoredDocIDs docids) throws IOException {
    return docids;
  }

  /**
   * Check if it is worth using complements.
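   * <p>
   * Illustrative arithmetic (the threshold value is an example, not
   * necessarily the default): with a complement threshold of 0.6 and an
   * index of 1,000,000 documents, complements are used once more than
   * 600,000 documents match, since aggregating the smaller complement set
   * is then cheaper.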
   */
  protected boolean shouldComplement(ScoredDocIDs docids) {
    return mayComplement() && (docids.size() > indexReader.numDocs() * getComplementThreshold());
  }

  /**
   * Iterate over the documents for this partition and fill the facet arrays with the correct
   * count / complement count / value.
   *
   * @param docids documents to aggregate over (or their complement, in complement mode)
   * @param facetArrays the arrays to fill
   * @param partition index of the partition to process
   * @throws IOException
   */
  private final void fillArraysForPartition(ScoredDocIDs docids,
      FacetArrays facetArrays, int partition) throws IOException {

    if (isUsingComplements) {
      initArraysByTotalCounts(facetArrays, partition, docids.size());
    } else {
      facetArrays.free(); // to get a cleared array for this partition
    }

    HashMap<CategoryListIterator, Aggregator> categoryLists = getCategoryListMap(
        facetArrays, partition);
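
    // Each category list iterator streams, per document, the ordinals of this
    // partition's categories; its matching aggregator folds every ordinal into
    // the facet arrays (count / complement count / value).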
    for (Entry<CategoryListIterator, Aggregator> entry : categoryLists.entrySet()) {
      CategoryListIterator categoryList = entry.getKey();
      if (!categoryList.init()) {
        continue;
      }

      Aggregator categorator = entry.getValue();
      ScoredDocIDsIterator iterator = docids.iterator();
      while (iterator.next()) {
        int docID = iterator.getDocID();
        if (!categoryList.skipTo(docID)) {
          continue;
        }
        categorator.setNextDoc(docID, iterator.getScore());
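        // nextCategory() returns a long; any value above Integer.MAX_VALUE
        // signals that the current document has no more categories.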
        long ordinal;
        while ((ordinal = categoryList.nextCategory()) <= Integer.MAX_VALUE) {
          categorator.aggregate((int) ordinal);
        }
      }
    }
  }

  /**
   * Init arrays for partition by total counts, optionally applying a factor.
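   * <p>
   * Rationale, as a sketch of the complement-counting math: the aggregator
   * later decrements a category's entry for every complement document, so in
   * effect count(matching docs) = totalCount - count(complement docs).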
   */
  private final void initArraysByTotalCounts(FacetArrays facetArrays, int partition, int nAccumulatedDocs) {
    int[] intArray = facetArrays.getIntArray();
    totalFacetCounts.fillTotalCountsForPartition(intArray, partition);
    double totalCountsFactor = getTotalCountsFactor();
    // fix total counts, but only if the effect of this would be meaningful.
    if (totalCountsFactor < 0.99999) {
      int delta = nAccumulatedDocs + 1;
      for (int i = 0; i < intArray.length; i++) {
        intArray[i] *= totalCountsFactor;
        // also translate to prevent loss of non-positive values
        // due to complement sampling (i.e. if sampled docs all decremented a certain category).
        intArray[i] += delta;
      }
    }
  }

  /**
   * Expert: factor by which counts should be multiplied when initializing
   * the count arrays from total counts.
   * The default implementation returns 1, which is a no-op.
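   * <p>
   * For example (an assumption about possible subclasses, not a contract of
   * this class): a sampling-based accumulator could return its sampling ratio,
   * so that the scaled-down total counts line up with counts aggregated over a
   * sampled complement set.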
   * @return a factor by which total counts should be multiplied
   */
  protected double getTotalCountsFactor() {
    return 1;
  }

  /**
   * Create an {@link Aggregator} and a {@link CategoryListIterator} for each
   * and every {@link FacetRequest}, generating a map that matches each
   * category list iterator to its aggregator.
   * <p>
   * If two CategoryListIterators are served by the same aggregator, a single
   * aggregator is returned for both.
   * <p>
   * <b>NOTE:</b> If a given category list iterator is needed with two different
   * aggregators (e.g. counting and association), an exception is thrown, as this
   * functionality is not supported at this time.
   */
  protected HashMap<CategoryListIterator, Aggregator> getCategoryListMap(FacetArrays facetArrays,
      int partition) throws IOException {

    HashMap<CategoryListIterator, Aggregator> categoryLists = new HashMap<CategoryListIterator, Aggregator>();

    for (FacetRequest facetRequest : searchParams.getFacetRequests()) {
      Aggregator categoryAggregator = facetRequest.createAggregator(
          isUsingComplements, facetArrays, indexReader, taxonomyReader);

      CategoryListIterator cli =
          facetRequest.createCategoryListIterator(indexReader, taxonomyReader, searchParams, partition);

      // get the aggregator
      Aggregator old = categoryLists.put(cli, categoryAggregator);

      if (old != null && !old.equals(categoryAggregator)) {
        // TODO (Facet): create a more meaningful RE class, and throw it.
        throw new RuntimeException(
            "Overriding existing category list with different aggregator. THAT'S A NO NO!");
      }
      // if the aggregator is the same, we're covered
    }

    return categoryLists;
  }
}