--- /dev/null
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * Caches all docs, and optionally also scores, coming from
+ * a search, and is then able to replay them to another
+ * collector. You specify the max RAM this class may use.
+ * Once the collection is done, call {@link #isCached}. If
+ * this returns true, you can use {@link #replay} against a
+ * new collector. If it returns false, this means too much
+ * RAM was required and you must instead re-run the original
+ * search.
+ *
+ * <p><b>NOTE</b>: this class consumes 4 bytes (or 8 bytes, if scoring is
+ * cached) per collected document. If the result set is large this can easily
+ * be a very substantial amount of RAM!
+ *
+ * <p><b>NOTE</b>: this class caches at least 128 documents
+ * before checking RAM limits.
+ *
+ * <p>See the Lucene <tt>contrib/grouping</tt> module for more
+ * details including a full code example.</p>
+ *
+ * @lucene.experimental
+ */
+public abstract class CachingCollector extends Collector {
+
+  // Max out at 512K entries per cached chunk; beyond that, additional chunks
+  // of this size are chained in cachedDocs/cachedScores.
+  private static final int MAX_ARRAY_SIZE = 512 * 1024;
+
+  // Number of documents cached before RAM limits are first checked.
+  private static final int INITIAL_ARRAY_SIZE = 128;
+
+  private final static int[] EMPTY_INT_ARRAY = new int[0];
+
+  /** Records that the cached docs in [base, end) were collected from {@code reader}. */
+  private static class SegStart {
+    public final IndexReader reader;
+    public final int base;
+    public final int end;
+
+    public SegStart(IndexReader reader, int base, int end) {
+      this.reader = reader;
+      this.base = base;
+      this.end = end;
+    }
+  }
+
+  /** Scorer that serves the currently cached doc/score, both during collection
+   *  and during replay. Iteration methods are unsupported by design. */
+  private static final class CachedScorer extends Scorer {
+
+    // NOTE: these members are package-private b/c that way accessing them from
+    // the outer class does not incur access check by the JVM. The same
+    // situation would be if they were defined in the outer class as private
+    // members.
+    int doc;
+    float score;
+
+    private CachedScorer() { super((Weight) null); }
+
+    @Override
+    public final float score() { return score; }
+
+    @Override
+    public final int advance(int target) { throw new UnsupportedOperationException(); }
+
+    @Override
+    public final int docID() { return doc; }
+
+    @Override
+    public final float freq() { throw new UnsupportedOperationException(); }
+
+    @Override
+    public final int nextDoc() { throw new UnsupportedOperationException(); }
+  }
+
+  // A CachingCollector which caches scores
+  private static final class ScoreCachingCollector extends CachingCollector {
+
+    private final CachedScorer cachedScorer;
+    private final List<float[]> cachedScores;
+
+    private Scorer scorer;
+    private float[] curScores;
+
+    ScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, true);
+
+      cachedScorer = new CachedScorer();
+      cachedScores = new ArrayList<float[]>();
+      curScores = new float[INITIAL_ARRAY_SIZE];
+      cachedScores.add(curScores);
+    }
+
+    ScoreCachingCollector(Collector other, int maxDocsToCache) {
+      super(other, maxDocsToCache);
+
+      cachedScorer = new CachedScorer();
+      cachedScores = new ArrayList<float[]>();
+      curScores = new float[INITIAL_ARRAY_SIZE];
+      cachedScores.add(curScores);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+
+      if (curDocs == null) {
+        // Cache was too large; still feed the wrapped collector, which sees
+        // cachedScorer (installed in setScorer), so keep it up to date.
+        cachedScorer.score = scorer.score();
+        cachedScorer.doc = doc;
+        other.collect(doc);
+        return;
+      }
+
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
+
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            curScores = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            cachedScores.clear();
+            cachedScorer.score = scorer.score();
+            cachedScorer.doc = doc;
+            other.collect(doc);
+            return;
+          }
+        }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        curScores = new float[nextLength];
+        cachedScores.add(curScores);
+        upto = 0;
+      }
+
+      curDocs[upto] = doc;
+      cachedScorer.score = curScores[upto] = scorer.score();
+      upto++;
+      cachedScorer.doc = doc;
+      other.collect(doc);
+    }
+
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
+
+      int curUpto = 0;
+      int curBase = 0;
+      int chunkUpto = 0;
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.reader, seg.base);
+        other.setScorer(cachedScorer);
+        while (curBase + curUpto < seg.end) {
+          if (curUpto == curDocs.length) {
+            // Exhausted the current chunk; advance to the next cached chunk.
+            curBase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            curScores = cachedScores.get(chunkUpto);
+            chunkUpto++;
+            curUpto = 0;
+          }
+          cachedScorer.score = curScores[curUpto];
+          cachedScorer.doc = curDocs[curUpto];
+          other.collect(curDocs[curUpto++]);
+        }
+      }
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      this.scorer = scorer;
+      // The wrapped collector always scores through cachedScorer, which we
+      // refresh per-doc in collect().
+      other.setScorer(cachedScorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs & scores cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+
+  }
+
+  // A CachingCollector which does not cache scores
+  private static final class NoScoreCachingCollector extends CachingCollector {
+
+    NoScoreCachingCollector(Collector other, double maxRAMMB) {
+      super(other, maxRAMMB, false);
+    }
+
+    NoScoreCachingCollector(Collector other, int maxDocsToCache) {
+      super(other, maxDocsToCache);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+
+      if (curDocs == null) {
+        // Cache was too large
+        other.collect(doc);
+        return;
+      }
+
+      // Allocate a bigger array or abort caching
+      if (upto == curDocs.length) {
+        base += upto;
+
+        // Compute next array length - don't allocate too big arrays
+        int nextLength = 8*curDocs.length;
+        if (nextLength > MAX_ARRAY_SIZE) {
+          nextLength = MAX_ARRAY_SIZE;
+        }
+
+        if (base + nextLength > maxDocsToCache) {
+          // try to allocate a smaller array
+          nextLength = maxDocsToCache - base;
+          if (nextLength <= 0) {
+            // Too many docs to collect -- clear cache
+            curDocs = null;
+            cachedSegs.clear();
+            cachedDocs.clear();
+            other.collect(doc);
+            return;
+          }
+        }
+
+        curDocs = new int[nextLength];
+        cachedDocs.add(curDocs);
+        upto = 0;
+      }
+
+      curDocs[upto] = doc;
+      upto++;
+      other.collect(doc);
+    }
+
+    @Override
+    public void replay(Collector other) throws IOException {
+      replayInit(other);
+
+      int curUpto = 0;
+      int curBase = 0;
+      int chunkUpto = 0;
+      curDocs = EMPTY_INT_ARRAY;
+      for (SegStart seg : cachedSegs) {
+        other.setNextReader(seg.reader, seg.base);
+        while (curBase + curUpto < seg.end) {
+          if (curUpto == curDocs.length) {
+            // Exhausted the current chunk; advance to the next cached chunk.
+            curBase += curDocs.length;
+            curDocs = cachedDocs.get(chunkUpto);
+            chunkUpto++;
+            curUpto = 0;
+          }
+          other.collect(curDocs[curUpto++]);
+        }
+      }
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+      other.setScorer(scorer);
+    }
+
+    @Override
+    public String toString() {
+      if (isCached()) {
+        return "CachingCollector (" + (base+upto) + " docs cached)";
+      } else {
+        return "CachingCollector (cache was cleared)";
+      }
+    }
+
+  }
+
+  // TODO: would be nice if a collector defined a
+  // needsScores() method so we can specialize / do checks
+  // up front. This is only relevant for the ScoreCaching
+  // version -- if the wrapped Collector does not need
+  // scores, it can avoid cachedScorer entirely.
+  protected final Collector other;
+
+  // Upper bound on the number of docs we may cache before clearing the cache.
+  protected final int maxDocsToCache;
+  protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
+  protected final List<int[]> cachedDocs;
+
+  private IndexReader lastReader;
+
+  protected int[] curDocs;   // current chunk being filled; null once the cache is cleared
+  protected int upto;        // next free slot in curDocs
+  protected int base;        // number of docs cached in previous (full) chunks
+  protected int lastDocBase; // docBase of the segment currently being collected
+
+  /**
+   * Creates a {@link CachingCollector} which does not wrap another collector.
+   * The cached documents and scores can later be {@link #replay(Collector)
+   * replayed}.
+   *
+   * @param acceptDocsOutOfOrder
+   *          whether documents are allowed to be collected out-of-order
+   */
+  public static CachingCollector create(final boolean acceptDocsOutOfOrder, boolean cacheScores, double maxRAMMB) {
+    // No-op collector that only reports the requested in/out-of-order policy.
+    Collector other = new Collector() {
+      @Override
+      public boolean acceptsDocsOutOfOrder() {
+        return acceptDocsOutOfOrder;
+      }
+
+      @Override
+      public void setScorer(Scorer scorer) throws IOException {}
+
+      @Override
+      public void collect(int doc) throws IOException {}
+
+      @Override
+      public void setNextReader(IndexReader reader, int docBase) throws IOException {}
+
+    };
+    return create(other, cacheScores, maxRAMMB);
+  }
+
+  /**
+   * Create a new {@link CachingCollector} that wraps the given collector and
+   * caches documents and scores up to the specified RAM threshold.
+   *
+   * @param other
+   *          the Collector to wrap and delegate calls to.
+   * @param cacheScores
+   *          whether to cache scores in addition to document IDs. Note that
+   *          this increases the RAM consumed per doc
+   * @param maxRAMMB
+   *          the maximum RAM in MB to consume for caching the documents and
+   *          scores. If the collector exceeds the threshold, no documents and
+   *          scores are cached.
+   */
+  public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
+    return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
+  }
+
+  /**
+   * Create a new {@link CachingCollector} that wraps the given collector and
+   * caches documents and scores up to the specified max docs threshold.
+   *
+   * @param other
+   *          the Collector to wrap and delegate calls to.
+   * @param cacheScores
+   *          whether to cache scores in addition to document IDs. Note that
+   *          this increases the RAM consumed per doc
+   * @param maxDocsToCache
+   *          the maximum number of documents for caching the documents and
+   *          possibly the scores. If the collector exceeds the threshold,
+   *          no documents and scores are cached.
+   */
+  public static CachingCollector create(Collector other, boolean cacheScores, int maxDocsToCache) {
+    return cacheScores ? new ScoreCachingCollector(other, maxDocsToCache) : new NoScoreCachingCollector(other, maxDocsToCache);
+  }
+
+  // Prevent extension from non-internal classes
+  private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
+    this.other = other;
+
+    cachedDocs = new ArrayList<int[]>();
+    curDocs = new int[INITIAL_ARRAY_SIZE];
+    cachedDocs.add(curDocs);
+
+    // Translate the RAM budget into a doc-count budget from the bytes/doc cost.
+    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
+    if (cacheScores) {
+      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
+    }
+    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
+  }
+
+  private CachingCollector(Collector other, int maxDocsToCache) {
+    this.other = other;
+
+    cachedDocs = new ArrayList<int[]>();
+    curDocs = new int[INITIAL_ARRAY_SIZE];
+    cachedDocs.add(curDocs);
+    this.maxDocsToCache = maxDocsToCache;
+  }
+
+  @Override
+  public boolean acceptsDocsOutOfOrder() {
+    return other.acceptsDocsOutOfOrder();
+  }
+
+  /** Returns true if the documents (and optionally scores) are still cached
+   *  and {@link #replay} may be used; false if the cache was cleared because
+   *  the configured limits were exceeded. */
+  public boolean isCached() {
+    return curDocs != null;
+  }
+
+  @Override
+  public void setNextReader(IndexReader reader, int docBase) throws IOException {
+    other.setNextReader(reader, docBase);
+    if (lastReader != null) {
+      // Close out the previous segment's range now that we know where it ends.
+      cachedSegs.add(new SegStart(lastReader, lastDocBase, base + upto));
+    }
+    lastDocBase = docBase;
+    lastReader = reader;
+  }
+
+  /** Reused by the specialized inner classes: validates the target collector
+   *  and records the final segment's range before replaying. */
+  void replayInit(Collector other) {
+    if (!isCached()) {
+      throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
+    }
+
+    if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
+      throw new IllegalArgumentException(
+          "cannot replay: given collector does not support "
+              + "out-of-order collection, while the wrapped collector does. "
+              + "Therefore cached documents may be out-of-order.");
+    }
+
+    //System.out.println("CC: replay totHits=" + (upto + base));
+    if (lastReader != null) {
+      cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
+      lastReader = null;
+    }
+  }
+
+  /**
+   * Replays the cached doc IDs (and scores) to the given Collector. If this
+   * instance does not cache scores, then the Scorer is not set via
+   * {@code other.setScorer}, and scores are not replayed.
+   *
+   * @throws IllegalStateException
+   *           if this collector is not cached (i.e., if the RAM limits were too
+   *           low for the number of documents + scores to cache).
+   * @throws IllegalArgumentException
+   *           if the given Collector does not support out-of-order collection,
+   *           while the collector passed to the ctor does.
+   */
+  public abstract void replay(Collector other) throws IOException;
+
+}