pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / ConcurrentMergeScheduler.java
diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
new file mode 100644 (file)
index 0000000..bbb1f93
--- /dev/null
@@ -0,0 +1,582 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util.CollectionUtil;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+/** A {@link MergeScheduler} that runs each merge using a
+ *  separate thread.
+ *
+ *  <p>Specify the max number of threads that may run at
+ *  once with {@link #setMaxThreadCount}.</p>
+ *
+ *  <p>Separately specify the maximum number of simultaneous
+ *  merges with {@link #setMaxMergeCount}.  If the number of
+ *  merges exceeds the max number of threads then the
+ *  largest merges are paused until one of the smaller
+ *  merges completes.</p>
+ *
+ *  <p>If more than {@link #getMaxMergeCount} merges are
+ *  requested then this class will forcefully throttle the
+ *  incoming threads by pausing until one more more merges
+ *  complete.</p>
+ */ 
+public class ConcurrentMergeScheduler extends MergeScheduler {
+
+  private int mergeThreadPriority = -1;
+
+  protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
+
+  // Max number of merge threads allowed to be running at
+  // once.  When there are more merges then this, we
+  // forcefully pause the larger ones, letting the smaller
+  // ones run, up until maxMergeCount merges at which point
+  // we forcefully pause incoming threads (that presumably
+  // are the ones causing so much merging).  We dynamically
+  // default this from 1 to 3, depending on how many cores
+  // you have:
+  private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
+
+  // Max number of merges we accept before forcefully
+  // throttling the incoming threads
+  private int maxMergeCount = maxThreadCount+2;
+
+  protected Directory dir;
+
+  private volatile boolean closed;
+  protected IndexWriter writer;
+  protected int mergeThreadCount;
+
+  public ConcurrentMergeScheduler() {
+    if (allInstances != null) {
+      // Only for testing
+      addMyself();
+    }
+  }
+
+  /** Sets the max # simultaneous merge threads that should
+   *  be running at once.  This must be <= {@link
+   *  #setMaxMergeCount}. */
+  public void setMaxThreadCount(int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count > maxMergeCount) {
+      throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
+    }
+    maxThreadCount = count;
+  }
+
+  /** @see #setMaxThreadCount(int) */
+  public int getMaxThreadCount() {
+    return maxThreadCount;
+  }
+
+  /** Sets the max # simultaneous merges that are allowed.
+   *  If a merge is necessary yet we already have this many
+   *  threads running, the incoming thread (that is calling
+   *  add/updateDocument) will block until a merge thread
+   *  has completed.  Note that we will only run the
+   *  smallest {@link #setMaxThreadCount} merges at a time. */
+  public void setMaxMergeCount(int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count < maxThreadCount) {
+      throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
+    }
+    maxMergeCount = count;
+  }
+
+  /** See {@link #setMaxMergeCount}. */
+  public int getMaxMergeCount() {
+    return maxMergeCount;
+  }
+
+  /** Return the priority that merge threads run at.  By
+   *  default the priority is 1 plus the priority of (ie,
+   *  slightly higher priority than) the first thread that
+   *  calls merge. */
+  public synchronized int getMergeThreadPriority() {
+    initMergeThreadPriority();
+    return mergeThreadPriority;
+  }
+
+  /** Set the base priority that merge threads run at.
+   *  Note that CMS may increase priority of some merge
+   *  threads beyond this base priority.  It's best not to
+   *  set this any higher than
+   *  Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
+   *  room to set relative priority among threads.  */
+  public synchronized void setMergeThreadPriority(int pri) {
+    if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
+      throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
+    mergeThreadPriority = pri;
+    updateMergeThreads();
+  }
+
+  // Larger merges come first
+  protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
+    public int compare(MergeThread t1, MergeThread t2) {
+      final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
+      final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
+      
+      final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
+      final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
+
+      return c2 - c1;
+    }
+  };
+
+  /**
+   * Called whenever the running merges have changed, to pause & unpause
+   * threads. This method sorts the merge threads by their merge size in
+   * descending order and then pauses/unpauses threads from first to last --
+   * that way, smaller merges are guaranteed to run before larger ones.
+   */
+  protected synchronized void updateMergeThreads() {
+
+    // Only look at threads that are alive & not in the
+    // process of stopping (ie have an active merge):
+    final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
+
+    int threadIdx = 0;
+    while (threadIdx < mergeThreads.size()) {
+      final MergeThread mergeThread = mergeThreads.get(threadIdx);
+      if (!mergeThread.isAlive()) {
+        // Prune any dead threads
+        mergeThreads.remove(threadIdx);
+        continue;
+      }
+      if (mergeThread.getCurrentMerge() != null) {
+        activeMerges.add(mergeThread);
+      }
+      threadIdx++;
+    }
+
+    // Sort the merge threads in descending order.
+    CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
+    
+    int pri = mergeThreadPriority;
+    final int activeMergeCount = activeMerges.size();
+    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+      final MergeThread mergeThread = activeMerges.get(threadIdx);
+      final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
+      if (merge == null) { 
+        continue;
+      }
+
+      // pause the thread if maxThreadCount is smaller than the number of merge threads.
+      final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
+
+      if (verbose()) {
+        if (doPause != merge.getPause()) {
+          if (doPause) {
+            message("pause thread " + mergeThread.getName());
+          } else {
+            message("unpause thread " + mergeThread.getName());
+          }
+        }
+      }
+      if (doPause != merge.getPause()) {
+        merge.setPause(doPause);
+      }
+
+      if (!doPause) {
+        if (verbose()) {
+          message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
+        }
+        mergeThread.setThreadPriority(pri);
+        pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
+      }
+    }
+  }
+
+  /**
+   * Returns true if verbosing is enabled. This method is usually used in
+   * conjunction with {@link #message(String)}, like that:
+   * 
+   * <pre>
+   * if (verbose()) {
+   *   message(&quot;your message&quot;);
+   * }
+   * </pre>
+   */
+  protected boolean verbose() {
+    return writer != null && writer.verbose();
+  }
+  
+  /**
+   * Outputs the given message - this method assumes {@link #verbose()} was
+   * called and returned true.
+   */
+  protected void message(String message) {
+    writer.message("CMS: " + message);
+  }
+
+  private synchronized void initMergeThreadPriority() {
+    if (mergeThreadPriority == -1) {
+      // Default to slightly higher priority than our
+      // calling thread
+      mergeThreadPriority = 1+Thread.currentThread().getPriority();
+      if (mergeThreadPriority > Thread.MAX_PRIORITY)
+        mergeThreadPriority = Thread.MAX_PRIORITY;
+    }
+  }
+
+  @Override
+  public void close() {
+    closed = true;
+    sync();
+  }
+
+  /** Wait for any running merge threads to finish */
+  public void sync() {
+    while (true) {
+      MergeThread toSync = null;
+      synchronized (this) {
+        for (MergeThread t : mergeThreads) {
+          if (t.isAlive()) {
+            toSync = t;
+            break;
+          }
+        }
+      }
+      if (toSync != null) {
+        try {
+          toSync.join();
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Returns the number of merge threads that are alive. Note that this number
+   * is &le; {@link #mergeThreads} size.
+   */
+  protected synchronized int mergeThreadCount() {
+    int count = 0;
+    for (MergeThread mt : mergeThreads) {
+      if (mt.isAlive() && mt.getCurrentMerge() != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public void merge(IndexWriter writer) throws IOException {
+
+    assert !Thread.holdsLock(writer);
+
+    this.writer = writer;
+
+    initMergeThreadPriority();
+
+    dir = writer.getDirectory();
+
+    // First, quickly run through the newly proposed merges
+    // and add any orthogonal merges (ie a merge not
+    // involving segments already pending to be merged) to
+    // the queue.  If we are way behind on merging, many of
+    // these newly proposed merges will likely already be
+    // registered.
+
+    if (verbose()) {
+      message("now merge");
+      message("  index: " + writer.segString());
+    }
+    
+    // Iterate, pulling from the IndexWriter's queue of
+    // pending merges, until it's empty:
+    while (true) {
+
+      synchronized(this) {
+        long startStallTime = 0;
+        while (mergeThreadCount() >= 1+maxMergeCount) {
+          startStallTime = System.currentTimeMillis();
+          if (verbose()) {
+            message("    too many merges; stalling...");
+          }
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+        }
+
+        if (verbose()) {
+          if (startStallTime != 0) {
+            message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+          }
+        }
+      }
+
+
+      // TODO: we could be careful about which merges to do in
+      // the BG (eg maybe the "biggest" ones) vs FG, which
+      // merges to do first (the easiest ones?), etc.
+      MergePolicy.OneMerge merge = writer.getNextMerge();
+      if (merge == null) {
+        if (verbose())
+          message("  no more merges pending; now return");
+        return;
+      }
+
+      // We do this w/ the primary thread to keep
+      // deterministic assignment of segment names
+      writer.mergeInit(merge);
+
+      boolean success = false;
+      try {
+        synchronized(this) {
+          message("  consider merge " + merge.segString(dir));
+
+          // OK to spawn a new merge thread to handle this
+          // merge:
+          final MergeThread merger = getMergeThread(writer, merge);
+          mergeThreads.add(merger);
+          if (verbose()) {
+            message("    launch new thread [" + merger.getName() + "]");
+          }
+
+          merger.start();
+
+          // Must call this after starting the thread else
+          // the new thread is removed from mergeThreads
+          // (since it's not alive yet):
+          updateMergeThreads();
+
+          success = true;
+        }
+      } finally {
+        if (!success) {
+          writer.mergeFinish(merge);
+        }
+      }
+    }
+  }
+
+  /** Does the actual merge, by calling {@link IndexWriter#merge} */
+  protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+    writer.merge(merge);
+  }
+
+  /** Create and return a new MergeThread */
+  protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
+    final MergeThread thread = new MergeThread(writer, merge);
+    thread.setThreadPriority(mergeThreadPriority);
+    thread.setDaemon(true);
+    thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
+    return thread;
+  }
+
+  protected class MergeThread extends Thread {
+
+    IndexWriter tWriter;
+    MergePolicy.OneMerge startMerge;
+    MergePolicy.OneMerge runningMerge;
+    private volatile boolean done;
+
+    public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
+      this.tWriter = writer;
+      this.startMerge = startMerge;
+    }
+
+    public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
+      runningMerge = merge;
+    }
+
+    public synchronized MergePolicy.OneMerge getRunningMerge() {
+      return runningMerge;
+    }
+
+    public synchronized MergePolicy.OneMerge getCurrentMerge() {
+      if (done) {
+        return null;
+      } else if (runningMerge != null) {
+        return runningMerge;
+      } else {
+        return startMerge;
+      }
+    }
+
+    public void setThreadPriority(int pri) {
+      try {
+        setPriority(pri);
+      } catch (NullPointerException npe) {
+        // Strangely, Sun's JDK 1.5 on Linux sometimes
+        // throws NPE out of here...
+      } catch (SecurityException se) {
+        // Ignore this because we will still run fine with
+        // normal thread priority
+      }
+    }
+
+    @Override
+    public void run() {
+      
+      // First time through the while loop we do the merge
+      // that we were started with:
+      MergePolicy.OneMerge merge = this.startMerge;
+      
+      try {
+
+        if (verbose())
+          message("  merge thread: start");
+
+        while(true) {
+          setRunningMerge(merge);
+          doMerge(merge);
+
+          // Subsequent times through the loop we do any new
+          // merge that writer says is necessary:
+          merge = tWriter.getNextMerge();
+          if (merge != null) {
+            tWriter.mergeInit(merge);
+            updateMergeThreads();
+            if (verbose())
+              message("  merge thread: do another merge " + merge.segString(dir));
+          } else {
+            break;
+          }
+        }
+
+        if (verbose())
+          message("  merge thread: done");
+
+      } catch (Throwable exc) {
+
+        // Ignore the exception if it was due to abort:
+        if (!(exc instanceof MergePolicy.MergeAbortedException)) {
+          if (!suppressExceptions) {
+            // suppressExceptions is normally only set during
+            // testing.
+            anyExceptions = true;
+            handleMergeException(exc);
+          }
+        }
+      } finally {
+        done = true;
+        synchronized(ConcurrentMergeScheduler.this) {
+          updateMergeThreads();
+          ConcurrentMergeScheduler.this.notifyAll();
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      MergePolicy.OneMerge merge = getRunningMerge();
+      if (merge == null)
+        merge = startMerge;
+      return "merge thread: " + merge.segString(dir);
+    }
+  }
+
+  /** Called when an exception is hit in a background merge
+   *  thread */
+  protected void handleMergeException(Throwable exc) {
+    try {
+      // When an exception is hit during merge, IndexWriter
+      // removes any partial files and then allows another
+      // merge to run.  If whatever caused the error is not
+      // transient then the exception will keep happening,
+      // so, we sleep here to avoid saturating CPU in such
+      // cases:
+      Thread.sleep(250);
+    } catch (InterruptedException ie) {
+      throw new ThreadInterruptedException(ie);
+    }
+    throw new MergePolicy.MergeException(exc, dir);
+  }
+
+  static boolean anyExceptions = false;
+
+  /** Used for testing */
+  public static boolean anyUnhandledExceptions() {
+    if (allInstances == null) {
+      throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
+    }
+    synchronized(allInstances) {
+      final int count = allInstances.size();
+      // Make sure all outstanding threads are done so we see
+      // any exceptions they may produce:
+      for(int i=0;i<count;i++)
+        allInstances.get(i).sync();
+      boolean v = anyExceptions;
+      anyExceptions = false;
+      return v;
+    }
+  }
+
+  public static void clearUnhandledExceptions() {
+    synchronized(allInstances) {
+      anyExceptions = false;
+    }
+  }
+
+  /** Used for testing */
+  private void addMyself() {
+    synchronized(allInstances) {
+      final int size = allInstances.size();
+      int upto = 0;
+      for(int i=0;i<size;i++) {
+        final ConcurrentMergeScheduler other = allInstances.get(i);
+        if (!(other.closed && 0 == other.mergeThreadCount()))
+          // Keep this one for now: it still has threads or
+          // may spawn new threads
+          allInstances.set(upto++, other);
+      }
+      allInstances.subList(upto, allInstances.size()).clear();
+      allInstances.add(this);
+    }
+  }
+
+  private boolean suppressExceptions;
+
+  /** Used for testing */
+  void setSuppressExceptions() {
+    suppressExceptions = true;
+  }
+
+  /** Used for testing */
+  void clearSuppressExceptions() {
+    suppressExceptions = false;
+  }
+
+  /** Used for testing */
+  private static List<ConcurrentMergeScheduler> allInstances;
+  
+  /** @deprecated this test mode code will be removed in a future release */
+  @Deprecated
+  public static void setTestMode() {
+    allInstances = new ArrayList<ConcurrentMergeScheduler>();
+  }
+}