--- /dev/null
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util.CollectionUtil;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Comparator;
+
+/** A {@link MergeScheduler} that runs each merge using a
+ *  separate thread.
+ *
+ *  <p>Specify the max number of threads that may run at
+ *  once with {@link #setMaxThreadCount}.</p>
+ *
+ *  <p>Separately specify the maximum number of simultaneous
+ *  merges with {@link #setMaxMergeCount}.  If the number of
+ *  merges exceeds the max number of threads then the
+ *  largest merges are paused until one of the smaller
+ *  merges completes.</p>
+ *
+ *  <p>If more than {@link #getMaxMergeCount} merges are
+ *  requested then this class will forcefully throttle the
+ *  incoming threads by pausing until one or more merges
+ *  complete.</p>
+ */
+public class ConcurrentMergeScheduler extends MergeScheduler {
+
+  // Base priority for merge threads; -1 means "not yet
+  // initialized" (see initMergeThreadPriority):
+  private int mergeThreadPriority = -1;
+
+  // All merge threads launched by this scheduler.  Every
+  // access in this class holds this scheduler's monitor;
+  // dead threads are pruned by updateMergeThreads.
+  protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
+
+  // Max number of merge threads allowed to be running at
+  // once.  When there are more merges than this, we
+  // forcefully pause the larger ones, letting the smaller
+  // ones run, up until maxMergeCount merges at which point
+  // we forcefully pause incoming threads (that presumably
+  // are the ones causing so much merging).  We dynamically
+  // default this from 1 to 3, depending on how many cores
+  // you have:
+  private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
+
+  // Max number of merges we accept before forcefully
+  // throttling the incoming threads
+  private int maxMergeCount = maxThreadCount+2;
+
+  /** Directory of the writer most recently passed to
+   *  {@link #merge}; used to describe merges in messages
+   *  and exceptions. */
+  protected Directory dir;
+
+  // Set by close(); consulted by the test-only instance
+  // registry (addMyself) to drop finished schedulers.
+  private volatile boolean closed;
+
+  /** The writer most recently passed to {@link #merge}. */
+  protected IndexWriter writer;
+
+  /** Counter used to give each new merge thread a unique name. */
+  protected int mergeThreadCount;
+
+  /** Creates a scheduler with the default thread and merge
+   *  counts.  When test mode is enabled (see
+   *  {@link #setTestMode}) the new instance also registers
+   *  itself for test-time exception tracking. */
+  public ConcurrentMergeScheduler() {
+    if (allInstances != null) {
+      // Only for testing
+      addMyself();
+    }
+  }
+
+  /** Sets the max # simultaneous merge threads that should
+   *  be running at once.  This must be &lt;= {@link
+   *  #setMaxMergeCount}.
+   *
+   *  @throws IllegalArgumentException if count is less than
+   *  1 or greater than the current max merge count */
+  public void setMaxThreadCount(int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count > maxMergeCount) {
+      throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
+    }
+    maxThreadCount = count;
+  }
+
+  /** @see #setMaxThreadCount(int) */
+  public int getMaxThreadCount() {
+    return maxThreadCount;
+  }
+
+  /** Sets the max # simultaneous merges that are allowed.
+   *  If a merge is necessary yet we already have this many
+   *  threads running, the incoming thread (that is calling
+   *  add/updateDocument) will block until a merge thread
+   *  has completed.  Note that we will only run the
+   *  smallest {@link #setMaxThreadCount} merges at a time.
+   *
+   *  @throws IllegalArgumentException if count is less than
+   *  1 or less than the current max thread count */
+  public void setMaxMergeCount(int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count < maxThreadCount) {
+      throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
+    }
+    maxMergeCount = count;
+  }
+
+  /** See {@link #setMaxMergeCount}. */
+  public int getMaxMergeCount() {
+    return maxMergeCount;
+  }
+
+  /** Return the priority that merge threads run at.  By
+   *  default the priority is 1 plus the priority of (ie,
+   *  slightly higher priority than) the first thread that
+   *  calls merge (or this method), capped at
+   *  {@link Thread#MAX_PRIORITY}. */
+  public synchronized int getMergeThreadPriority() {
+    initMergeThreadPriority();
+    return mergeThreadPriority;
+  }
+
+  /** Set the base priority that merge threads run at.
+   *  Note that CMS may increase priority of some merge
+   *  threads beyond this base priority.  It's best not to
+   *  set this any higher than
+   *  Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
+   *  room to set relative priority among threads.
+   *
+   *  @throws IllegalArgumentException if pri is outside
+   *  {@link Thread#MIN_PRIORITY} .. {@link Thread#MAX_PRIORITY} */
+  public synchronized void setMergeThreadPriority(int pri) {
+    if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) {
+      throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
+    }
+    mergeThreadPriority = pri;
+    updateMergeThreads();
+  }
+
+  // Larger merges come first; a thread with no current
+  // merge is treated as the largest possible merge.
+  protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
+    public int compare(MergeThread t1, MergeThread t2) {
+      final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
+      final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
+
+      final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
+      final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
+
+      // Descending by doc count.  Compare explicitly instead
+      // of returning c2 - c1, which can overflow when the
+      // operands differ in sign (eg against the MAX_VALUE
+      // sentinel above):
+      if (c1 > c2) {
+        return -1;
+      } else if (c1 < c2) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  };
+
+  /**
+   * Called whenever the running merges have changed, to pause & unpause
+   * threads. This method sorts the merge threads by their merge size in
+   * descending order and then pauses/unpauses threads from first to last --
+   * that way, smaller merges are guaranteed to run before larger ones.
+   * Dead threads are removed from {@link #mergeThreads} as a side effect.
+   */
+  protected synchronized void updateMergeThreads() {
+
+    // Only look at threads that are alive & not in the
+    // process of stopping (ie have an active merge):
+    final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
+
+    int threadIdx = 0;
+    while (threadIdx < mergeThreads.size()) {
+      final MergeThread mergeThread = mergeThreads.get(threadIdx);
+      if (!mergeThread.isAlive()) {
+        // Prune any dead threads
+        mergeThreads.remove(threadIdx);
+        continue;
+      }
+      if (mergeThread.getCurrentMerge() != null) {
+        activeMerges.add(mergeThread);
+      }
+      threadIdx++;
+    }
+
+    // Sort the merge threads in descending order (largest first).
+    CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
+
+    int pri = mergeThreadPriority;
+    final int activeMergeCount = activeMerges.size();
+    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+      final MergeThread mergeThread = activeMerges.get(threadIdx);
+      final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
+      if (merge == null) {
+        // The thread finished its merge after it was collected above
+        continue;
+      }
+
+      // Pause the largest merges so that only the
+      // maxThreadCount smallest ones keep running:
+      final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
+
+      if (doPause != merge.getPause()) {
+        if (verbose()) {
+          if (doPause) {
+            message("pause thread " + mergeThread.getName());
+          } else {
+            message("unpause thread " + mergeThread.getName());
+          }
+        }
+        merge.setPause(doPause);
+      }
+
+      if (!doPause) {
+        if (verbose()) {
+          message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
+        }
+        mergeThread.setThreadPriority(pri);
+        // Each successively smaller running merge gets a
+        // slightly higher priority, capped at MAX_PRIORITY:
+        pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
+      }
+    }
+  }
+
+  /**
+   * Returns true if verbosing is enabled. This method is usually used in
+   * conjunction with {@link #message(String)}, like that:
+   *
+   * <pre>
+   * if (verbose()) {
+   *   message(&quot;your message&quot;);
+   * }
+   * </pre>
+   */
+  protected boolean verbose() {
+    return writer != null && writer.verbose();
+  }
+
+  /**
+   * Outputs the given message - this method assumes {@link #verbose()} was
+   * called and returned true.
+   */
+  protected void message(String message) {
+    writer.message("CMS: " + message);
+  }
+
+  // Lazily defaults the merge thread priority to one above
+  // the calling thread's priority, capped at MAX_PRIORITY.
+  private synchronized void initMergeThreadPriority() {
+    if (mergeThreadPriority == -1) {
+      // Default to slightly higher priority than our
+      // calling thread
+      mergeThreadPriority = 1+Thread.currentThread().getPriority();
+      if (mergeThreadPriority > Thread.MAX_PRIORITY) {
+        mergeThreadPriority = Thread.MAX_PRIORITY;
+      }
+    }
+  }
+
+  /** Marks this scheduler closed, then waits for all merge
+   *  threads to finish (see {@link #sync}). */
+  @Override
+  public void close() {
+    closed = true;
+    sync();
+  }
+
+  /** Wait for any running merge threads to finish.  The
+   *  monitor is held only while looking for a live thread,
+   *  not while joining it, so finishing threads can still
+   *  update this scheduler.
+   *
+   *  @throws ThreadInterruptedException if interrupted
+   *  while joining a merge thread */
+  public void sync() {
+    while (true) {
+      MergeThread toSync = null;
+      synchronized (this) {
+        for (MergeThread t : mergeThreads) {
+          if (t.isAlive()) {
+            toSync = t;
+            break;
+          }
+        }
+      }
+      if (toSync == null) {
+        break;
+      }
+      try {
+        toSync.join();
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    }
+  }
+
+  /**
+   * Returns the number of merge threads that are alive and still have a
+   * current merge (ie are not in the process of stopping). Note that this
+   * number is &le; {@link #mergeThreads} size.
+   */
+  protected synchronized int mergeThreadCount() {
+    int count = 0;
+    for (MergeThread mt : mergeThreads) {
+      if (mt.isAlive() && mt.getCurrentMerge() != null) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /** Launches background threads for the writer's pending
+   *  merges, stalling the calling thread while too many
+   *  merges are already running.  Returns once the writer
+   *  has no more pending merges.
+   *
+   *  @throws ThreadInterruptedException if interrupted
+   *  while stalled */
+  @Override
+  public void merge(IndexWriter writer) throws IOException {
+
+    assert !Thread.holdsLock(writer);
+
+    this.writer = writer;
+
+    initMergeThreadPriority();
+
+    dir = writer.getDirectory();
+
+    // First, quickly run through the newly proposed merges
+    // and add any orthogonal merges (ie a merge not
+    // involving segments already pending to be merged) to
+    // the queue.  If we are way behind on merging, many of
+    // these newly proposed merges will likely already be
+    // registered.
+
+    if (verbose()) {
+      message("now merge");
+      message(" index: " + writer.segString());
+    }
+
+    // Iterate, pulling from the IndexWriter's queue of
+    // pending merges, until it's empty:
+    while (true) {
+
+      synchronized(this) {
+        // Throttle the incoming thread while too many merges
+        // are already running:
+        long startStallTime = 0;
+        while (mergeThreadCount() >= 1+maxMergeCount) {
+          if (startStallTime == 0) {
+            // Record only when the stall began, so the
+            // message below reports the total stall rather
+            // than just the most recent wait:
+            startStallTime = System.currentTimeMillis();
+          }
+          if (verbose()) {
+            message(" too many merges; stalling...");
+          }
+          try {
+            // Merge threads call notifyAll when they finish;
+            // the loop re-checks the count after each wakeup.
+            wait();
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+        }
+
+        if (verbose()) {
+          if (startStallTime != 0) {
+            message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+          }
+        }
+      }
+
+      // TODO: we could be careful about which merges to do in
+      // the BG (eg maybe the "biggest" ones) vs FG, which
+      // merges to do first (the easiest ones?), etc.
+      MergePolicy.OneMerge merge = writer.getNextMerge();
+      if (merge == null) {
+        if (verbose()) {
+          message(" no more merges pending; now return");
+        }
+        return;
+      }
+
+      // We do this w/ the primary thread to keep
+      // deterministic assignment of segment names
+      writer.mergeInit(merge);
+
+      boolean success = false;
+      try {
+        synchronized(this) {
+          // Guarded like every other message: message()
+          // assumes verbose() returned true, and building
+          // the segment string is not free.
+          if (verbose()) {
+            message(" consider merge " + merge.segString(dir));
+          }
+
+          // OK to spawn a new merge thread to handle this
+          // merge:
+          final MergeThread merger = getMergeThread(writer, merge);
+          mergeThreads.add(merger);
+          if (verbose()) {
+            message(" launch new thread [" + merger.getName() + "]");
+          }
+
+          merger.start();
+
+          // Must call this after starting the thread else
+          // the new thread is removed from mergeThreads
+          // (since it's not alive yet):
+          updateMergeThreads();
+
+          success = true;
+        }
+      } finally {
+        if (!success) {
+          // The merge was initialized but no thread took it
+          // over; release it back to the writer:
+          writer.mergeFinish(merge);
+        }
+      }
+    }
+  }
+
+  /** Does the actual merge, by calling {@link IndexWriter#merge} */
+  protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+    writer.merge(merge);
+  }
+
+  /** Create and return a new MergeThread: a daemon thread
+   *  at the current base merge priority, named with the
+   *  next value of {@link #mergeThreadCount}.  The thread
+   *  is not started. */
+  protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
+    final MergeThread thread = new MergeThread(writer, merge);
+    thread.setThreadPriority(mergeThreadPriority);
+    thread.setDaemon(true);
+    thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
+    return thread;
+  }
+
+  /** Background thread that runs its starting merge, then
+   *  keeps pulling further merges from the writer until
+   *  none remain. */
+  protected class MergeThread extends Thread {
+
+    IndexWriter tWriter;
+    MergePolicy.OneMerge startMerge;
+    MergePolicy.OneMerge runningMerge;
+    // Set once run() exits; getCurrentMerge then reports null:
+    private volatile boolean done;
+
+    public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
+      this.tWriter = writer;
+      this.startMerge = startMerge;
+    }
+
+    /** Records the merge this thread is currently running. */
+    public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
+      runningMerge = merge;
+    }
+
+    /** Returns the merge most recently recorded via
+     *  {@link #setRunningMerge}, or null if none yet. */
+    public synchronized MergePolicy.OneMerge getRunningMerge() {
+      return runningMerge;
+    }
+
+    /** Returns null once this thread is done; otherwise the
+     *  running merge, falling back to the starting merge. */
+    public synchronized MergePolicy.OneMerge getCurrentMerge() {
+      if (done) {
+        return null;
+      } else if (runningMerge != null) {
+        return runningMerge;
+      } else {
+        return startMerge;
+      }
+    }
+
+    /** Best-effort priority change; failures are ignored. */
+    public void setThreadPriority(int pri) {
+      try {
+        setPriority(pri);
+      } catch (NullPointerException ignored) {
+        // Strangely, Sun's JDK 1.5 on Linux sometimes
+        // throws NPE out of here...
+      } catch (SecurityException ignored) {
+        // Ignore this because we will still run fine with
+        // normal thread priority
+      }
+    }
+
+    @Override
+    public void run() {
+
+      // First time through the while loop we do the merge
+      // that we were started with:
+      MergePolicy.OneMerge merge = this.startMerge;
+
+      try {
+
+        if (verbose()) {
+          message(" merge thread: start");
+        }
+
+        while(true) {
+          setRunningMerge(merge);
+          doMerge(merge);
+
+          // Subsequent times through the loop we do any new
+          // merge that writer says is necessary:
+          merge = tWriter.getNextMerge();
+          if (merge == null) {
+            break;
+          }
+          tWriter.mergeInit(merge);
+          updateMergeThreads();
+          if (verbose()) {
+            message(" merge thread: do another merge " + merge.segString(dir));
+          }
+        }
+
+        if (verbose()) {
+          message(" merge thread: done");
+        }
+
+      } catch (Throwable exc) {
+
+        // Ignore the exception if it was due to abort:
+        if (!(exc instanceof MergePolicy.MergeAbortedException)) {
+          if (!suppressExceptions) {
+            // suppressExceptions is normally only set during
+            // testing.
+            anyExceptions = true;
+            handleMergeException(exc);
+          }
+        }
+      } finally {
+        done = true;
+        synchronized(ConcurrentMergeScheduler.this) {
+          updateMergeThreads();
+          // Wake any incoming thread stalled in merge():
+          ConcurrentMergeScheduler.this.notifyAll();
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      MergePolicy.OneMerge merge = getRunningMerge();
+      if (merge == null) {
+        merge = startMerge;
+      }
+      return "merge thread: " + merge.segString(dir);
+    }
+  }
+
+  /** Called when an exception is hit in a background merge
+   *  thread.  Sleeps 250 msec, then rethrows the exception
+   *  wrapped in a {@link MergePolicy.MergeException}.
+   *
+   *  @throws ThreadInterruptedException if interrupted
+   *  while sleeping */
+  protected void handleMergeException(Throwable exc) {
+    try {
+      // When an exception is hit during merge, IndexWriter
+      // removes any partial files and then allows another
+      // merge to run.  If whatever caused the error is not
+      // transient then the exception will keep happening,
+      // so, we sleep here to avoid saturating CPU in such
+      // cases:
+      Thread.sleep(250);
+    } catch (InterruptedException ie) {
+      throw new ThreadInterruptedException(ie);
+    }
+    throw new MergePolicy.MergeException(exc, dir);
+  }
+
+  // Set by a merge thread that hit a non-abort exception
+  // while exceptions were not suppressed; read and reset by
+  // the test helpers below.
+  static boolean anyExceptions = false;
+
+  /** Used for testing: waits for every registered
+   *  scheduler's threads, then returns (and clears) whether
+   *  any merge thread hit an unhandled exception.
+   *
+   *  @throws RuntimeException if {@link #setTestMode} was
+   *  not called */
+  public static boolean anyUnhandledExceptions() {
+    if (allInstances == null) {
+      throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
+    }
+    synchronized(allInstances) {
+      final int count = allInstances.size();
+      // Make sure all outstanding threads are done so we see
+      // any exceptions they may produce:
+      for(int i=0;i<count;i++) {
+        allInstances.get(i).sync();
+      }
+      boolean v = anyExceptions;
+      anyExceptions = false;
+      return v;
+    }
+  }
+
+  /** Used for testing: clears the flag reported by
+   *  {@link #anyUnhandledExceptions}. */
+  public static void clearUnhandledExceptions() {
+    final List<ConcurrentMergeScheduler> instances = allInstances;
+    if (instances == null) {
+      // Test mode was never enabled, so there is no
+      // registry to lock on; just clear the flag instead of
+      // failing with a NullPointerException:
+      anyExceptions = false;
+      return;
+    }
+    synchronized(instances) {
+      anyExceptions = false;
+    }
+  }
+
+  /** Used for testing: registers this instance, first
+   *  dropping schedulers that are closed and have no
+   *  active merge threads. */
+  private void addMyself() {
+    synchronized(allInstances) {
+      final int size = allInstances.size();
+      int upto = 0;
+      for(int i=0;i<size;i++) {
+        final ConcurrentMergeScheduler other = allInstances.get(i);
+        if (!(other.closed && 0 == other.mergeThreadCount())) {
+          // Keep this one for now: it still has threads or
+          // may spawn new threads
+          allInstances.set(upto++, other);
+        }
+      }
+      allInstances.subList(upto, allInstances.size()).clear();
+      allInstances.add(this);
+    }
+  }
+
+  private boolean suppressExceptions;
+
+  /** Used for testing */
+  void setSuppressExceptions() {
+    suppressExceptions = true;
+  }
+
+  /** Used for testing */
+  void clearSuppressExceptions() {
+    suppressExceptions = false;
+  }
+
+  /** Used for testing */
+  private static List<ConcurrentMergeScheduler> allInstances;
+
+  /** Enables test-time tracking of all scheduler instances.
+   *  @deprecated this test mode code will be removed in a future release */
+  @Deprecated
+  public static void setTestMode() {
+    allInstances = new ArrayList<ConcurrentMergeScheduler>();
+  }
+}