package org.apache.lucene.search;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.RamUsageEstimator;

/**
 * Caches all docs, and optionally also scores, coming from
 * a search, and is then able to replay them to another
 * collector.  You specify the max RAM this class may use.
 * Once the collection is done, call {@link #isCached}.  If
 * this returns true, you can use {@link #replay} against a
 * new collector.  If it returns false, this means too much
 * RAM was required and you must instead re-run the original
 * search.
 *
 * <p><b>NOTE</b>: this class consumes 4 (or 8 bytes, if
 * scoring is cached) per collected document.  If the result
 * set is large this can easily be a very substantial amount
 * of RAM!
 * 
 * <p><b>NOTE</b>: this class caches at least 128 documents
 * before checking RAM limits.
 * 
 * <p>See the Lucene <tt>contrib/grouping</tt> module for more
 * details including a full code example.</p>
 *
 * @lucene.experimental
 */
public abstract class CachingCollector extends Collector {
  
  // Max out at 512K arrays
  private static final int MAX_ARRAY_SIZE = 512 * 1024;
  private static final int INITIAL_ARRAY_SIZE = 128;
  private final static int[] EMPTY_INT_ARRAY = new int[0];

  private static class SegStart {
    public final IndexReader reader;
    public final int base;
    public final int end;

    public SegStart(IndexReader reader, int base, int end) {
      this.reader = reader;
      this.base = base;
      this.end = end;
    }
  }
  
  private static final class CachedScorer extends Scorer {

    // NOTE: these members are package-private b/c that way accessing them from
    // the outer class does not incur access check by the JVM. The same
    // situation would be if they were defined in the outer class as private
    // members.
    int doc;
    float score;
    
    private CachedScorer() { super((Weight) null); }

    @Override
    public final float score() { return score; }

    @Override
    public final int advance(int target) { throw new UnsupportedOperationException(); }

    @Override
    public final int docID() { return doc; }

    @Override
    public final float freq() { throw new UnsupportedOperationException(); }

    @Override
    public final int nextDoc() { throw new UnsupportedOperationException(); }
  }

  // A CachingCollector which caches scores
  private static final class ScoreCachingCollector extends CachingCollector {

    private final CachedScorer cachedScorer;
    private final List<float[]> cachedScores;

    private Scorer scorer;
    private float[] curScores;

    ScoreCachingCollector(Collector other, double maxRAMMB) {
      super(other, maxRAMMB, true);

      cachedScorer = new CachedScorer();
      cachedScores = new ArrayList<float[]>();
      curScores = new float[128];
      cachedScores.add(curScores);
    }

    ScoreCachingCollector(Collector other, int maxDocsToCache) {
      super(other, maxDocsToCache);

      cachedScorer = new CachedScorer();
      cachedScores = new ArrayList<float[]>();
      curScores = new float[INITIAL_ARRAY_SIZE];
      cachedScores.add(curScores);
    }

    @Override
    public void collect(int doc) throws IOException {

      if (curDocs == null) {
        // Cache was too large
        cachedScorer.score = scorer.score();
        cachedScorer.doc = doc;
        other.collect(doc);
        return;
      }

      // Allocate a bigger array or abort caching
      if (upto == curDocs.length) {
        base += upto;
        
        // Compute next array length - don't allocate too big arrays
        int nextLength = 8*curDocs.length;
        if (nextLength > MAX_ARRAY_SIZE) {
          nextLength = MAX_ARRAY_SIZE;
        }

        if (base + nextLength > maxDocsToCache) {
          // try to allocate a smaller array
          nextLength = maxDocsToCache - base;
          if (nextLength <= 0) {
            // Too many docs to collect -- clear cache
            curDocs = null;
            curScores = null;
            cachedSegs.clear();
            cachedDocs.clear();
            cachedScores.clear();
            cachedScorer.score = scorer.score();
            cachedScorer.doc = doc;
            other.collect(doc);
            return;
          }
        }
        
        curDocs = new int[nextLength];
        cachedDocs.add(curDocs);
        curScores = new float[nextLength];
        cachedScores.add(curScores);
        upto = 0;
      }
      
      curDocs[upto] = doc;
      cachedScorer.score = curScores[upto] = scorer.score();
      upto++;
      cachedScorer.doc = doc;
      other.collect(doc);
    }

    @Override
    public void replay(Collector other) throws IOException {
      replayInit(other);
      
      int curUpto = 0;
      int curBase = 0;
      int chunkUpto = 0;
      curDocs = EMPTY_INT_ARRAY;
      for (SegStart seg : cachedSegs) {
        other.setNextReader(seg.reader, seg.base);
        other.setScorer(cachedScorer);
        while (curBase + curUpto < seg.end) {
          if (curUpto == curDocs.length) {
            curBase += curDocs.length;
            curDocs = cachedDocs.get(chunkUpto);
            curScores = cachedScores.get(chunkUpto);
            chunkUpto++;
            curUpto = 0;
          }
          cachedScorer.score = curScores[curUpto];
          cachedScorer.doc = curDocs[curUpto];
          other.collect(curDocs[curUpto++]);
        }
      }
    }

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      this.scorer = scorer;
      other.setScorer(cachedScorer);
    }

    @Override
    public String toString() {
      if (isCached()) {
        return "CachingCollector (" + (base+upto) + " docs & scores cached)";
      } else {
        return "CachingCollector (cache was cleared)";
      }
    }

  }

  // A CachingCollector which does not cache scores
  private static final class NoScoreCachingCollector extends CachingCollector {
    
    NoScoreCachingCollector(Collector other, double maxRAMMB) {
     super(other, maxRAMMB, false);
    }

    NoScoreCachingCollector(Collector other, int maxDocsToCache) {
     super(other, maxDocsToCache);
    }
    
    @Override
    public void collect(int doc) throws IOException {

      if (curDocs == null) {
        // Cache was too large
        other.collect(doc);
        return;
      }

      // Allocate a bigger array or abort caching
      if (upto == curDocs.length) {
        base += upto;
        
        // Compute next array length - don't allocate too big arrays
        int nextLength = 8*curDocs.length;
        if (nextLength > MAX_ARRAY_SIZE) {
          nextLength = MAX_ARRAY_SIZE;
        }

        if (base + nextLength > maxDocsToCache) {
          // try to allocate a smaller array
          nextLength = maxDocsToCache - base;
          if (nextLength <= 0) {
            // Too many docs to collect -- clear cache
            curDocs = null;
            cachedSegs.clear();
            cachedDocs.clear();
            other.collect(doc);
            return;
          }
        }
        
        curDocs = new int[nextLength];
        cachedDocs.add(curDocs);
        upto = 0;
      }
      
      curDocs[upto] = doc;
      upto++;
      other.collect(doc);
    }

    @Override
    public void replay(Collector other) throws IOException {
      replayInit(other);
      
      int curUpto = 0;
      int curbase = 0;
      int chunkUpto = 0;
      curDocs = EMPTY_INT_ARRAY;
      for (SegStart seg : cachedSegs) {
        other.setNextReader(seg.reader, seg.base);
        while (curbase + curUpto < seg.end) {
          if (curUpto == curDocs.length) {
            curbase += curDocs.length;
            curDocs = cachedDocs.get(chunkUpto);
            chunkUpto++;
            curUpto = 0;
          }
          other.collect(curDocs[curUpto++]);
        }
      }
    }

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      other.setScorer(scorer);
    }

    @Override
    public String toString() {
      if (isCached()) {
        return "CachingCollector (" + (base+upto) + " docs cached)";
      } else {
        return "CachingCollector (cache was cleared)";
      }
    }

  }

  // TODO: would be nice if a collector defined a
  // needsScores() method so we can specialize / do checks
  // up front. This is only relevant for the ScoreCaching 
  // version -- if the wrapped Collector does not need 
  // scores, it can avoid cachedScorer entirely.
  protected final Collector other;

  protected final int maxDocsToCache;
  protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
  protected final List<int[]> cachedDocs;

  private IndexReader lastReader;
  
  protected int[] curDocs;
  protected int upto;
  protected int base;
  protected int lastDocBase;

  /**
   * Creates a {@link CachingCollector} which does not wrap another collector.
   * The cached documents and scores can later be {@link #replay(Collector)
   * replayed}.
   * 
   * @param acceptDocsOutOfOrder
   *          whether documents are allowed to be collected out-of-order
   */
  public static CachingCollector create(final boolean acceptDocsOutOfOrder, boolean cacheScores, double maxRAMMB) {
    Collector other = new Collector() {
      @Override
      public boolean acceptsDocsOutOfOrder() {
        return acceptDocsOutOfOrder;
      }
      
      @Override
      public void setScorer(Scorer scorer) throws IOException {}

      @Override
      public void collect(int doc) throws IOException {}

      @Override
      public void setNextReader(IndexReader reader, int docBase) throws IOException {}

    };
    return create(other, cacheScores, maxRAMMB);
  }

  /**
   * Create a new {@link CachingCollector} that wraps the given collector and
   * caches documents and scores up to the specified RAM threshold.
   * 
   * @param other
   *          the Collector to wrap and delegate calls to.
   * @param cacheScores
   *          whether to cache scores in addition to document IDs. Note that
   *          this increases the RAM consumed per doc
   * @param maxRAMMB
   *          the maximum RAM in MB to consume for caching the documents and
   *          scores. If the collector exceeds the threshold, no documents and
   *          scores are cached.
   */
  public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
    return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
  }

  /**
   * Create a new {@link CachingCollector} that wraps the given collector and
   * caches documents and scores up to the specified max docs threshold.
   *
   * @param other
   *          the Collector to wrap and delegate calls to.
   * @param cacheScores
   *          whether to cache scores in addition to document IDs. Note that
   *          this increases the RAM consumed per doc
   * @param maxDocsToCache
   *          the maximum number of documents for caching the documents and
   *          possible the scores. If the collector exceeds the threshold,
   *          no documents and scores are cached.
   */
  public static CachingCollector create(Collector other, boolean cacheScores, int maxDocsToCache) {
    return cacheScores ? new ScoreCachingCollector(other, maxDocsToCache) : new NoScoreCachingCollector(other, maxDocsToCache);
  }
  
  // Prevent extension from non-internal classes
  private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
    this.other = other;

    cachedDocs = new ArrayList<int[]>();
    curDocs = new int[INITIAL_ARRAY_SIZE];
    cachedDocs.add(curDocs);

    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
    if (cacheScores) {
      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
    }
    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
  }

  private CachingCollector(Collector other, int maxDocsToCache) {
    this.other = other;

    cachedDocs = new ArrayList<int[]>();
    curDocs = new int[INITIAL_ARRAY_SIZE];
    cachedDocs.add(curDocs);
    this.maxDocsToCache = maxDocsToCache;
  }

  @Override
  public boolean acceptsDocsOutOfOrder() {
    return other.acceptsDocsOutOfOrder();
  }

  public boolean isCached() {
    return curDocs != null;
  }

  @Override  
  public void setNextReader(IndexReader reader, int docBase) throws IOException {
    other.setNextReader(reader, docBase);
    if (lastReader != null) {
      cachedSegs.add(new SegStart(lastReader, lastDocBase, base + upto));
    }
    lastDocBase = docBase;
    lastReader = reader;
  }

  /** Reused by the specialized inner classes. */
  void replayInit(Collector other) {
    if (!isCached()) {
      throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
    }
    
    if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
      throw new IllegalArgumentException(
          "cannot replay: given collector does not support "
              + "out-of-order collection, while the wrapped collector does. "
              + "Therefore cached documents may be out-of-order.");
    }
    
    //System.out.println("CC: replay totHits=" + (upto + base));
    if (lastReader != null) {
      cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
      lastReader = null;
    }
  }

  /**
   * Replays the cached doc IDs (and scores) to the given Collector. If this
   * instance does not cache scores, then Scorer is not set on
   * {@code other.setScorer} as well as scores are not replayed.
   * 
   * @throws IllegalStateException
   *           if this collector is not cached (i.e., if the RAM limits were too
   *           low for the number of documents + scores to cache).
   * @throws IllegalArgumentException
   *           if the given Collect's does not support out-of-order collection,
   *           while the collector passed to the ctor does.
   */
  public abstract void replay(Collector other) throws IOException;
  
}
