package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;

/**
 *  Merges segments of approximately equal size, subject to
 *  an allowed number of segments per tier.  This is similar
 *  to {@link LogByteSizeMergePolicy}, except this merge
 *  policy is able to merge non-adjacent segments, and
 *  separates how many segments are merged at once ({@link
 *  #setMaxMergeAtOnce}) from how many segments are allowed
 *  per tier ({@link #setSegmentsPerTier}).  This merge
 *  policy also does not over-merge (ie, cascade merges). 
 *
 *  <p>For normal merging, this policy first computes a
 *  "budget" of how many segments are allowed to be in the
 *  index.  If the index is over-budget, then the policy
 *  sorts segments by decreasing size (pro-rating by percent
 *  deletes), and then finds the least-cost merge.  Merge
 *  cost is measured by a combination of the "skew" of the
 *  merge (size of the largest segment divided by the total
 *  merged size), the total merge size and the percent of
 *  deletes reclaimed, so that merges with lower skew,
 *  smaller size and more reclaimed deletes are favored.
 *
 *  <p>If a merge will produce a segment that's larger than
 *  {@link #setMaxMergedSegmentMB}, then the policy will
 *  merge fewer segments (down to 1 at once, if that one has
 *  deletions) to keep the segment size under budget.
 *      
 *  <p><b>NOTE</b>: this policy freely merges non-adjacent
 *  segments; if this is a problem, use {@link
 *  LogMergePolicy}.
 *
 *  <p><b>NOTE</b>: This policy always merges by byte size
 *  of the segments, always pro-rates by percent deletes,
 *  and does not apply any maximum segment size during
 *  forceMerge (unlike {@link LogByteSizeMergePolicy}).
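 *
 *  <p>A minimal usage sketch (assuming {@code iwc} is an
 *  {@link IndexWriterConfig}; the parameter values shown
 *  are only illustrative, not recommendations):
 *
 *  <pre>
 *    TieredMergePolicy tmp = new TieredMergePolicy();
 *    tmp.setMaxMergeAtOnce(10);           // merge up to 10 segments at once
 *    tmp.setSegmentsPerTier(10.0);        // allow 10 segments per tier; keep &gt;= maxMergeAtOnce
 *    tmp.setMaxMergedSegmentMB(5*1024.0); // cap merged segments at ~5 GB
 *    iwc.setMergePolicy(tmp);
 *  </pre>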
 *
 *  @lucene.experimental
 */

// TODO
//   - we could try to take into account whether a large
//     merge is already running (under CMS) and then bias
//     ourselves towards picking smaller merges if so (or,
//     maybe CMS should do so)

public class TieredMergePolicy extends MergePolicy {

  private int maxMergeAtOnce = 10;
  private long maxMergedSegmentBytes = 5*1024*1024*1024L;
  private int maxMergeAtOnceExplicit = 30;

  private long floorSegmentBytes = 2*1024*1024L;
  private double segsPerTier = 10.0;
  private double forceMergeDeletesPctAllowed = 10.0;
  private boolean useCompoundFile = true;
  private double noCFSRatio = 0.1;
  private double reclaimDeletesWeight = 2.0;

  /** Maximum number of segments to be merged at a time
   *  during "normal" merging.  For explicit merging (eg,
   *  forceMerge or forceMergeDeletes was called), see {@link
   *  #setMaxMergeAtOnceExplicit}.  Default is 10. */
  public TieredMergePolicy setMaxMergeAtOnce(int v) {
    if (v < 2) {
      throw new IllegalArgumentException("maxMergeAtOnce must be > 1 (got " + v + ")");
    }
    maxMergeAtOnce = v;
    return this;
  }

  /** @see #setMaxMergeAtOnce */
  public int getMaxMergeAtOnce() {
    return maxMergeAtOnce;
  }

  // TODO: should addIndexes do explicit merging, too?  And,
  // if user calls IW.maybeMerge "explicitly"

  /** Maximum number of segments to be merged at a time,
   *  during forceMerge or forceMergeDeletes. Default is 30. */
  public TieredMergePolicy setMaxMergeAtOnceExplicit(int v) {
    if (v < 2) {
      throw new IllegalArgumentException("maxMergeAtOnceExplicit must be > 1 (got " + v + ")");
    }
    maxMergeAtOnceExplicit = v;
    return this;
  }

  /** @see #setMaxMergeAtOnceExplicit */
  public int getMaxMergeAtOnceExplicit() {
    return maxMergeAtOnceExplicit;
  }

  /** Maximum size of a segment to produce during
   *  normal merging.  This setting is approximate: the
   *  estimate of the merged segment size is made by summing
   *  sizes of to-be-merged segments (compensating for
   *  percent deleted docs).  Default is 5 GB. */
  public TieredMergePolicy setMaxMergedSegmentMB(double v) {
    maxMergedSegmentBytes = (long) (v*1024*1024);
    return this;
  }

  /** @see #setMaxMergedSegmentMB */
  public double getMaxMergedSegmentMB() {
    return maxMergedSegmentBytes/1024/1024.;
  }

  /** Controls how aggressively merges that reclaim more
   *  deletions are favored.  Higher values favor selecting
   *  merges that reclaim deletions.  A value of 0.0 means
   *  deletions don't impact merge selection. */
  public TieredMergePolicy setReclaimDeletesWeight(double v) {
    if (v < 0.0) {
      throw new IllegalArgumentException("reclaimDeletesWeight must be >= 0.0 (got " + v + ")");
    }
    reclaimDeletesWeight = v;
    return this;
  }

  /** @see #setReclaimDeletesWeight */
  public double getReclaimDeletesWeight() {
    return reclaimDeletesWeight;
  }

  /** Segments smaller than this are "rounded up" to this
   *  size, ie treated as equal (floor) size for merge
   *  selection.  This is to prevent frequent flushing of
   *  tiny segments from allowing a long tail in the index.
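   *  With the 2 MB default, for example, a 100 KB flushed
   *  segment is treated as a 2 MB segment when computing
   *  the index budget and merge skew.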
   *  Default is 2 MB. */
  public TieredMergePolicy setFloorSegmentMB(double v) {
    if (v <= 0.0) {
      throw new IllegalArgumentException("floorSegmentMB must be >= 0.0 (got " + v + ")");
    }
    floorSegmentBytes = (long) (v*1024*1024);
    return this;
  }

  /** @see #setFloorSegmentMB */
  public double getFloorSegmentMB() {
    return floorSegmentBytes/1024/1024.;
  }

  /** When forceMergeDeletes is called, we only merge away a
   *  segment if its delete percentage is over this
   *  threshold.  Default is 10%. */ 
  public TieredMergePolicy setForceMergeDeletesPctAllowed(double v) {
    if (v < 0.0 || v > 100.0) {
      throw new IllegalArgumentException("forceMergeDeletesPctAllowed must be between 0.0 and 100.0 inclusive (got " + v + ")");
    }
    forceMergeDeletesPctAllowed = v;
    return this;
  }

  /** @see #setForceMergeDeletesPctAllowed */
  public double getForceMergeDeletesPctAllowed() {
    return forceMergeDeletesPctAllowed;
  }

  /** Sets the allowed number of segments per tier.  Smaller
   *  values mean more merging but fewer segments.
   *
   *  <p><b>NOTE</b>: this value should be >= the {@link
   *  #setMaxMergeAtOnce} otherwise you'll force too much
   *  merging to occur.</p>
   *
   *  <p>Default is 10.0.</p> */
  public TieredMergePolicy setSegmentsPerTier(double v) {
    if (v < 2.0) {
      throw new IllegalArgumentException("segmentsPerTier must be >= 2.0 (got " + v + ")");
    }
    segsPerTier = v;
    return this;
  }

  /** @see #setSegmentsPerTier */
  public double getSegmentsPerTier() {
    return segsPerTier;
  }

  /** Sets whether compound file format should be used for
   *  newly flushed and newly merged segments.  Default
   *  true. */
  public TieredMergePolicy setUseCompoundFile(boolean useCompoundFile) {
    this.useCompoundFile = useCompoundFile;
    return this;
  }

  /** @see #setUseCompoundFile */
  public boolean getUseCompoundFile() {
    return useCompoundFile;
  }

  /** If a merged segment will be more than this percentage
   *  of the total size of the index, leave the segment as
   *  non-compound file even if compound file is enabled.
   *  Set to 1.0 to always use CFS regardless of merge
   *  size.  Default is 0.1. */
  public TieredMergePolicy setNoCFSRatio(double noCFSRatio) {
    if (noCFSRatio < 0.0 || noCFSRatio > 1.0) {
      throw new IllegalArgumentException("noCFSRatio must be 0.0 to 1.0 inclusive; got " + noCFSRatio);
    }
    this.noCFSRatio = noCFSRatio;
    return this;
  }
  
  /** @see #setNoCFSRatio */
  public double getNoCFSRatio() {
    return noCFSRatio;
  }

  private class SegmentByteSizeDescending implements Comparator<SegmentInfo> {
    public int compare(SegmentInfo o1, SegmentInfo o2) {
      try {
        final long sz1 = size(o1);
        final long sz2 = size(o2);
        if (sz1 > sz2) {
          return -1;
        } else if (sz2 > sz1) {
          return 1;
        } else {
          return o1.name.compareTo(o2.name);
        }
      } catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
    }
  }

  private final Comparator<SegmentInfo> segmentByteSizeDescending = new SegmentByteSizeDescending();

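  /** Holds the score and a human-readable explanation for
   *  one candidate merge; lower scores are better. */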
  protected static abstract class MergeScore {
    abstract double getScore();
    abstract String getExplanation();
  }

  @Override
  public MergeSpecification findMerges(SegmentInfos infos) throws IOException {
    if (verbose()) {
      message("findMerges: " + infos.size() + " segments");
    }
    if (infos.size() == 0) {
      return null;
    }
    final Collection<SegmentInfo> merging = writer.get().getMergingSegments();
    final Collection<SegmentInfo> toBeMerged = new HashSet<SegmentInfo>();

    final List<SegmentInfo> infosSorted = new ArrayList<SegmentInfo>(infos.asList());
    Collections.sort(infosSorted, segmentByteSizeDescending);

    // Compute total index bytes & print details about the index
    long totIndexBytes = 0;
    long minSegmentBytes = Long.MAX_VALUE;
    for(SegmentInfo info : infosSorted) {
      final long segBytes = size(info);
      if (verbose()) {
        String extra = merging.contains(info) ? " [merging]" : "";
        if (segBytes >= maxMergedSegmentBytes/2.0) {
          extra += " [skip: too large]";
        } else if (segBytes < floorSegmentBytes) {
          extra += " [floored]";
        }
        message("  seg=" + writer.get().segString(info) + " size=" + String.format("%.3f", segBytes/1024/1024.) + " MB" + extra);
      }

      minSegmentBytes = Math.min(segBytes, minSegmentBytes);
      // Accum total byte size
      totIndexBytes += segBytes;
    }

    // If we have too-large segments, exclude them from the
    // allowed segment count budget:
    int tooBigCount = 0;
    while (tooBigCount < infosSorted.size() && size(infosSorted.get(tooBigCount)) >= maxMergedSegmentBytes/2.0) {
      totIndexBytes -= size(infosSorted.get(tooBigCount));
      tooBigCount++;
    }

    minSegmentBytes = floorSize(minSegmentBytes);

    // Compute max allowed segs in the index
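    // Worked example with the defaults (segsPerTier=10,
    // maxMergeAtOnce=10, 2 MB floor): tier 1 allows 10
    // segments of ~2 MB, tier 2 allows 10 segments of
    // ~20 MB, tier 3 allows 10 of ~200 MB, etc., until the
    // remaining bytes fit within a partial tier: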
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;

    MergeSpecification spec = null;

    // Cycle to possibly select more than one merge:
    while(true) {

      long mergingBytes = 0;

      // Gather eligible segments for merging, ie segments
      // not already being merged and not already picked (by
      // prior iteration of this loop) for merging:
      final List<SegmentInfo> eligible = new ArrayList<SegmentInfo>();
      for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
        final SegmentInfo info = infosSorted.get(idx);
        if (merging.contains(info)) {
          mergingBytes += info.sizeInBytes(true);
        } else if (!toBeMerged.contains(info)) {
          eligible.add(info);
        }
      }

      final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

      message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount);

      if (eligible.size() == 0) {
        return spec;
      }

      if (eligible.size() >= allowedSegCountInt) {

        // OK we are over budget -- find best merge!
        MergeScore bestScore = null;
        List<SegmentInfo> best = null;
        boolean bestTooLarge = false;
        long bestMergeBytes = 0;

        // Consider all merge starts:
        for(int startIdx = 0;startIdx <= eligible.size()-maxMergeAtOnce; startIdx++) {

          long totAfterMergeBytes = 0;

          final List<SegmentInfo> candidate = new ArrayList<SegmentInfo>();
          boolean hitTooLarge = false;
          for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
            final SegmentInfo info = eligible.get(idx);
            final long segBytes = size(info);

            if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
              hitTooLarge = true;
              // NOTE: we continue, so that we can try
              // "packing" smaller segments into this merge
              // to see if we can get closer to the max
              // size; this in general is not perfect since
              // this is really "bin packing" and we'd have
              // to try different permutations.
              continue;
            }
            candidate.add(info);
            totAfterMergeBytes += segBytes;
          }

          final MergeScore score = score(candidate, hitTooLarge, mergingBytes);
          message("  maybe=" + writer.get().segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format("%.3f MB", totAfterMergeBytes/1024./1024.));

          // If we are already running a max sized merge
          // (maxMergeIsRunning), don't allow another max
          // sized merge to kick off:
          if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
            best = candidate;
            bestScore = score;
            bestTooLarge = hitTooLarge;
            bestMergeBytes = totAfterMergeBytes;
          }
        }
        
        if (best != null) {
          if (spec == null) {
            spec = new MergeSpecification();
          }
          final OneMerge merge = new OneMerge(best);
          spec.add(merge);
          for(SegmentInfo info : merge.segments) {
            toBeMerged.add(info);
          }

          if (verbose()) {
            message("  add merge=" + writer.get().segString(merge.segments) + " size=" + String.format("%.3f MB", bestMergeBytes/1024./1024.) + " score=" + String.format("%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""));
          }
        } else {
          return spec;
        }
      } else {
        return spec;
      }
    }
  }

  /** Expert: scores one merge; subclasses can override. */
  protected MergeScore score(List<SegmentInfo> candidate, boolean hitTooLarge, long mergingBytes) throws IOException {
    long totBeforeMergeBytes = 0;
    long totAfterMergeBytes = 0;
    long totAfterMergeBytesFloored = 0;
    for(SegmentInfo info : candidate) {
      final long segBytes = size(info);
      totAfterMergeBytes += segBytes;
      totAfterMergeBytesFloored += floorSize(segBytes);
      totBeforeMergeBytes += info.sizeInBytes(true);
    }

    // Measure "skew" of the merge, which can range
    // from 1.0/numSegsBeingMerged (good) to 1.0
    // (poor):
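    // For example (all sizes above the floor): merging ten
    // 10 MB segments gives skew 10/100 = 0.1, while merging
    // one 90 MB segment with nine 10 MB segments gives
    // skew 90/180 = 0.5.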
    final double skew;
    if (hitTooLarge) {
      // Pretend the merge has perfect skew; skew doesn't
      // matter in this case because this merge will not
      // "cascade" and so it cannot lead to N^2 merge cost
      // over time:
      skew = 1.0/maxMergeAtOnce;
    } else {
      skew = ((double) floorSize(size(candidate.get(0))))/totAfterMergeBytesFloored;
    }

    // Strongly favor merges with less skew (smaller
    // mergeScore is better):
    double mergeScore = skew;

    // Gently favor smaller merges over bigger ones.  We
    // don't want to make this exponent too large else we
    // can end up doing poor merges of small segments in
    // order to avoid the large merges:
    mergeScore *= Math.pow(totAfterMergeBytes, 0.05);
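    // (eg a 1 GB merge scores only ~1.4x worse here than a
    // 1 MB merge: pow(2^30, 0.05) / pow(2^20, 0.05) = 2^0.5)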

    // Strongly favor merges that reclaim deletes:
    final double nonDelRatio = ((double) totAfterMergeBytes)/totBeforeMergeBytes;
    mergeScore *= Math.pow(nonDelRatio, reclaimDeletesWeight);
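    // (eg with the default reclaimDeletesWeight of 2.0, a
    // merge reclaiming 20% deletes (nonDelRatio=0.8)
    // multiplies its score by 0.8^2 = 0.64, favoring it)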

    final double finalMergeScore = mergeScore;

    return new MergeScore() {

      @Override
      public double getScore() {
        return finalMergeScore;
      }

      @Override
      public String getExplanation() {
        return "skew=" + String.format("%.3f", skew) + " nonDelRatio=" + String.format("%.3f", nonDelRatio);
      }
    };
  }

  @Override
  public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentInfo,Boolean> segmentsToMerge) throws IOException {
    if (verbose()) {
      message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + writer.get().segString(infos) + " segmentsToMerge=" + segmentsToMerge);
    }

    List<SegmentInfo> eligible = new ArrayList<SegmentInfo>();
    boolean forceMergeRunning = false;
    final Collection<SegmentInfo> merging = writer.get().getMergingSegments();
    boolean segmentIsOriginal = false;
    for(SegmentInfo info : infos) {
      final Boolean isOriginal = segmentsToMerge.get(info);
      if (isOriginal != null) {
        segmentIsOriginal = isOriginal;
        if (!merging.contains(info)) {
          eligible.add(info);
        } else {
          forceMergeRunning = true;
        }
      }
    }

    if (eligible.size() == 0) {
      return null;
    }

    if ((maxSegmentCount > 1 && eligible.size() <= maxSegmentCount) ||
        (maxSegmentCount == 1 && eligible.size() == 1 && (!segmentIsOriginal || isMerged(eligible.get(0))))) {
      if (verbose()) {
        message("already merged");
      }
      return null;
    }

    Collections.sort(eligible, segmentByteSizeDescending);

    if (verbose()) {
      message("eligible=" + eligible);
      message("forceMergeRunning=" + forceMergeRunning);
    }

    int end = eligible.size();
    
    MergeSpecification spec = null;

    // Do full merges, first, backwards:
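    // (a full merge of the maxMergeAtOnceExplicit smallest
    // segments is picked only while, counting the merged
    // result, at least maxSegmentCount segments would remain):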
    while(end >= maxMergeAtOnceExplicit + maxSegmentCount - 1) {
      if (spec == null) {
        spec = new MergeSpecification();
      }
      final OneMerge merge = new OneMerge(eligible.subList(end-maxMergeAtOnceExplicit, end));
      if (verbose()) {
        message("add merge=" + writer.get().segString(merge.segments));
      }
      spec.add(merge);
      end -= maxMergeAtOnceExplicit;
    }

    if (spec == null && !forceMergeRunning) {
      // Do final merge
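      // (merge the numToMerge smallest segments into one so
      // that exactly maxSegmentCount segments remain):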
      final int numToMerge = end - maxSegmentCount + 1;
      final OneMerge merge = new OneMerge(eligible.subList(end-numToMerge, end));
      if (verbose()) {
        message("add final merge=" + merge.segString(writer.get().getDirectory()));
      }
      spec = new MergeSpecification();
      spec.add(merge);
    }

    return spec;
  }

  @Override
  public MergeSpecification findForcedDeletesMerges(SegmentInfos infos)
      throws CorruptIndexException, IOException {
    if (verbose()) {
      message("findForcedDeletesMerges infos=" + writer.get().segString(infos) + " forceMergeDeletesPctAllowed=" + forceMergeDeletesPctAllowed);
    }
    final List<SegmentInfo> eligible = new ArrayList<SegmentInfo>();
    final Collection<SegmentInfo> merging = writer.get().getMergingSegments();
    for(SegmentInfo info : infos) {
      double pctDeletes = 100.*((double) writer.get().numDeletedDocs(info))/info.docCount;
      if (pctDeletes > forceMergeDeletesPctAllowed && !merging.contains(info)) {
        eligible.add(info);
      }
    }

    if (eligible.size() == 0) {
      return null;
    }

    Collections.sort(eligible, segmentByteSizeDescending);

    if (verbose()) {
      message("eligible=" + eligible);
    }

    int start = 0;
    MergeSpecification spec = null;

    while(start < eligible.size()) {
      // Don't enforce max merged size here: app is explicitly
      // calling forceMergeDeletes, and knows this may take a
      // long time / produce big segments (like forceMerge):
      final int end = Math.min(start + maxMergeAtOnceExplicit, eligible.size());
      if (spec == null) {
        spec = new MergeSpecification();
      }

      final OneMerge merge = new OneMerge(eligible.subList(start, end));
      if (verbose()) {
        message("add merge=" + writer.get().segString(merge.segments));
      }
      spec.add(merge);
      start = end;
    }

    return spec;
  }

  @Override
  public boolean useCompoundFile(SegmentInfos infos, SegmentInfo mergedInfo) throws IOException {
    final boolean doCFS;

    if (!useCompoundFile) {
      doCFS = false;
    } else if (noCFSRatio == 1.0) {
      doCFS = true;
    } else {
      long totalSize = 0;
      for (SegmentInfo info : infos) {
        totalSize += size(info);
      }

      doCFS = size(mergedInfo) <= noCFSRatio * totalSize;
    }
    return doCFS;
  }

  @Override
  public void close() {
  }

  private boolean isMerged(SegmentInfo info)
    throws IOException {
    IndexWriter w = writer.get();
    assert w != null;
    boolean hasDeletions = w.numDeletedDocs(info) > 0;
    return !hasDeletions &&
      !info.hasSeparateNorms() &&
      info.dir == w.getDirectory() &&
      (info.getUseCompoundFile() == useCompoundFile || noCFSRatio < 1.0);
  }

  // Segment size in bytes, pro-rated by % deleted
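  // (eg a 100 MB segment with 30% of its docs deleted
  // counts as 70 MB):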
  private long size(SegmentInfo info) throws IOException {
    final long byteSize = info.sizeInBytes(true);    
    final int delCount = writer.get().numDeletedDocs(info);
    final double delRatio = (info.docCount <= 0 ? 0.0f : ((double)delCount / (double)info.docCount));    
    assert delRatio <= 1.0;
    return (long) (byteSize * (1.0-delRatio));
  }

  private long floorSize(long bytes) {
    return Math.max(floorSegmentBytes, bytes);
  }

  private boolean verbose() {
    IndexWriter w = writer.get();
    return w != null && w.verbose();
  }

  private void message(String message) {
    if (verbose()) {
      writer.get().message("TMP: " + message);
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("[" + getClass().getSimpleName() + ": ");
    sb.append("maxMergeAtOnce=").append(maxMergeAtOnce).append(", ");
    sb.append("maxMergeAtOnceExplicit=").append(maxMergeAtOnceExplicit).append(", ");
    sb.append("maxMergedSegmentMB=").append(maxMergedSegmentBytes/1024/1024.).append(", ");
    sb.append("floorSegmentMB=").append(floorSegmentBytes/1024/1024.).append(", ");
    sb.append("forceMergeDeletesPctAllowed=").append(forceMergeDeletesPctAllowed).append(", ");
    sb.append("segmentsPerTier=").append(segsPerTier).append(", ");
    sb.append("useCompoundFile=").append(useCompoundFile).append(", ");
    sb.append("noCFSRatio=").append(noCFSRatio);
    return sb.toString();
  }
}
