+++ /dev/null
-package org.apache.lucene.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.ThreadInterruptedException;
-import org.apache.lucene.util.CollectionUtil;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Comparator;
-
-/** A {@link MergeScheduler} that runs each merge using a
- * separate thread.
- *
- * <p>Specify the max number of threads that may run at
- * once with {@link #setMaxThreadCount}.</p>
- *
- * <p>Separately specify the maximum number of simultaneous
- * merges with {@link #setMaxMergeCount}. If the number of
- * merges exceeds the max number of threads then the
- * largest merges are paused until one of the smaller
- * merges completes.</p>
- *
- * <p>If more than {@link #getMaxMergeCount} merges are
- * requested then this class will forcefully throttle the
- * incoming threads by pausing until one more more merges
- * complete.</p>
- */
-public class ConcurrentMergeScheduler extends MergeScheduler {
-
- private int mergeThreadPriority = -1;
-
- protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
-
- // Max number of merge threads allowed to be running at
- // once. When there are more merges then this, we
- // forcefully pause the larger ones, letting the smaller
- // ones run, up until maxMergeCount merges at which point
- // we forcefully pause incoming threads (that presumably
- // are the ones causing so much merging). We dynamically
- // default this from 1 to 3, depending on how many cores
- // you have:
- private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
-
- // Max number of merges we accept before forcefully
- // throttling the incoming threads
- private int maxMergeCount = maxThreadCount+2;
-
- protected Directory dir;
-
- private volatile boolean closed;
- protected IndexWriter writer;
- protected int mergeThreadCount;
-
- public ConcurrentMergeScheduler() {
- if (allInstances != null) {
- // Only for testing
- addMyself();
- }
- }
-
- /** Sets the max # simultaneous merge threads that should
- * be running at once. This must be <= {@link
- * #setMaxMergeCount}. */
- public void setMaxThreadCount(int count) {
- if (count < 1) {
- throw new IllegalArgumentException("count should be at least 1");
- }
- if (count > maxMergeCount) {
- throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
- }
- maxThreadCount = count;
- }
-
- /** @see #setMaxThreadCount(int) */
- public int getMaxThreadCount() {
- return maxThreadCount;
- }
-
- /** Sets the max # simultaneous merges that are allowed.
- * If a merge is necessary yet we already have this many
- * threads running, the incoming thread (that is calling
- * add/updateDocument) will block until a merge thread
- * has completed. Note that we will only run the
- * smallest {@link #setMaxThreadCount} merges at a time. */
- public void setMaxMergeCount(int count) {
- if (count < 1) {
- throw new IllegalArgumentException("count should be at least 1");
- }
- if (count < maxThreadCount) {
- throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
- }
- maxMergeCount = count;
- }
-
- /** See {@link #setMaxMergeCount}. */
- public int getMaxMergeCount() {
- return maxMergeCount;
- }
-
- /** Return the priority that merge threads run at. By
- * default the priority is 1 plus the priority of (ie,
- * slightly higher priority than) the first thread that
- * calls merge. */
- public synchronized int getMergeThreadPriority() {
- initMergeThreadPriority();
- return mergeThreadPriority;
- }
-
- /** Set the base priority that merge threads run at.
- * Note that CMS may increase priority of some merge
- * threads beyond this base priority. It's best not to
- * set this any higher than
- * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
- * room to set relative priority among threads. */
- public synchronized void setMergeThreadPriority(int pri) {
- if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
- throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
- mergeThreadPriority = pri;
- updateMergeThreads();
- }
-
- // Larger merges come first
- protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
- public int compare(MergeThread t1, MergeThread t2) {
- final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
- final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
-
- final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
- final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
-
- return c2 - c1;
- }
- };
-
- /**
- * Called whenever the running merges have changed, to pause & unpause
- * threads. This method sorts the merge threads by their merge size in
- * descending order and then pauses/unpauses threads from first to last --
- * that way, smaller merges are guaranteed to run before larger ones.
- */
- protected synchronized void updateMergeThreads() {
-
- // Only look at threads that are alive & not in the
- // process of stopping (ie have an active merge):
- final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
-
- int threadIdx = 0;
- while (threadIdx < mergeThreads.size()) {
- final MergeThread mergeThread = mergeThreads.get(threadIdx);
- if (!mergeThread.isAlive()) {
- // Prune any dead threads
- mergeThreads.remove(threadIdx);
- continue;
- }
- if (mergeThread.getCurrentMerge() != null) {
- activeMerges.add(mergeThread);
- }
- threadIdx++;
- }
-
- // Sort the merge threads in descending order.
- CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
-
- int pri = mergeThreadPriority;
- final int activeMergeCount = activeMerges.size();
- for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
- final MergeThread mergeThread = activeMerges.get(threadIdx);
- final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
- if (merge == null) {
- continue;
- }
-
- // pause the thread if maxThreadCount is smaller than the number of merge threads.
- final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
-
- if (verbose()) {
- if (doPause != merge.getPause()) {
- if (doPause) {
- message("pause thread " + mergeThread.getName());
- } else {
- message("unpause thread " + mergeThread.getName());
- }
- }
- }
- if (doPause != merge.getPause()) {
- merge.setPause(doPause);
- }
-
- if (!doPause) {
- if (verbose()) {
- message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
- }
- mergeThread.setThreadPriority(pri);
- pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
- }
- }
- }
-
- /**
- * Returns true if verbosing is enabled. This method is usually used in
- * conjunction with {@link #message(String)}, like that:
- *
- * <pre>
- * if (verbose()) {
- * message("your message");
- * }
- * </pre>
- */
- protected boolean verbose() {
- return writer != null && writer.verbose();
- }
-
- /**
- * Outputs the given message - this method assumes {@link #verbose()} was
- * called and returned true.
- */
- protected void message(String message) {
- writer.message("CMS: " + message);
- }
-
- private synchronized void initMergeThreadPriority() {
- if (mergeThreadPriority == -1) {
- // Default to slightly higher priority than our
- // calling thread
- mergeThreadPriority = 1+Thread.currentThread().getPriority();
- if (mergeThreadPriority > Thread.MAX_PRIORITY)
- mergeThreadPriority = Thread.MAX_PRIORITY;
- }
- }
-
- @Override
- public void close() {
- closed = true;
- sync();
- }
-
- /** Wait for any running merge threads to finish */
- public void sync() {
- while (true) {
- MergeThread toSync = null;
- synchronized (this) {
- for (MergeThread t : mergeThreads) {
- if (t.isAlive()) {
- toSync = t;
- break;
- }
- }
- }
- if (toSync != null) {
- try {
- toSync.join();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- } else {
- break;
- }
- }
- }
-
- /**
- * Returns the number of merge threads that are alive. Note that this number
- * is ≤ {@link #mergeThreads} size.
- */
- protected synchronized int mergeThreadCount() {
- int count = 0;
- for (MergeThread mt : mergeThreads) {
- if (mt.isAlive() && mt.getCurrentMerge() != null) {
- count++;
- }
- }
- return count;
- }
-
- @Override
- public void merge(IndexWriter writer) throws IOException {
-
- assert !Thread.holdsLock(writer);
-
- this.writer = writer;
-
- initMergeThreadPriority();
-
- dir = writer.getDirectory();
-
- // First, quickly run through the newly proposed merges
- // and add any orthogonal merges (ie a merge not
- // involving segments already pending to be merged) to
- // the queue. If we are way behind on merging, many of
- // these newly proposed merges will likely already be
- // registered.
-
- if (verbose()) {
- message("now merge");
- message(" index: " + writer.segString());
- }
-
- // Iterate, pulling from the IndexWriter's queue of
- // pending merges, until it's empty:
- while (true) {
-
- synchronized(this) {
- long startStallTime = 0;
- while (mergeThreadCount() >= 1+maxMergeCount) {
- startStallTime = System.currentTimeMillis();
- if (verbose()) {
- message(" too many merges; stalling...");
- }
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- if (verbose()) {
- if (startStallTime != 0) {
- message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
- }
- }
- }
-
-
- // TODO: we could be careful about which merges to do in
- // the BG (eg maybe the "biggest" ones) vs FG, which
- // merges to do first (the easiest ones?), etc.
- MergePolicy.OneMerge merge = writer.getNextMerge();
- if (merge == null) {
- if (verbose())
- message(" no more merges pending; now return");
- return;
- }
-
- // We do this w/ the primary thread to keep
- // deterministic assignment of segment names
- writer.mergeInit(merge);
-
- boolean success = false;
- try {
- synchronized(this) {
- message(" consider merge " + merge.segString(dir));
-
- // OK to spawn a new merge thread to handle this
- // merge:
- final MergeThread merger = getMergeThread(writer, merge);
- mergeThreads.add(merger);
- if (verbose()) {
- message(" launch new thread [" + merger.getName() + "]");
- }
-
- merger.start();
-
- // Must call this after starting the thread else
- // the new thread is removed from mergeThreads
- // (since it's not alive yet):
- updateMergeThreads();
-
- success = true;
- }
- } finally {
- if (!success) {
- writer.mergeFinish(merge);
- }
- }
- }
- }
-
- /** Does the actual merge, by calling {@link IndexWriter#merge} */
- protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
- writer.merge(merge);
- }
-
- /** Create and return a new MergeThread */
- protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
- final MergeThread thread = new MergeThread(writer, merge);
- thread.setThreadPriority(mergeThreadPriority);
- thread.setDaemon(true);
- thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
- return thread;
- }
-
- protected class MergeThread extends Thread {
-
- IndexWriter tWriter;
- MergePolicy.OneMerge startMerge;
- MergePolicy.OneMerge runningMerge;
- private volatile boolean done;
-
- public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
- this.tWriter = writer;
- this.startMerge = startMerge;
- }
-
- public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
- runningMerge = merge;
- }
-
- public synchronized MergePolicy.OneMerge getRunningMerge() {
- return runningMerge;
- }
-
- public synchronized MergePolicy.OneMerge getCurrentMerge() {
- if (done) {
- return null;
- } else if (runningMerge != null) {
- return runningMerge;
- } else {
- return startMerge;
- }
- }
-
- public void setThreadPriority(int pri) {
- try {
- setPriority(pri);
- } catch (NullPointerException npe) {
- // Strangely, Sun's JDK 1.5 on Linux sometimes
- // throws NPE out of here...
- } catch (SecurityException se) {
- // Ignore this because we will still run fine with
- // normal thread priority
- }
- }
-
- @Override
- public void run() {
-
- // First time through the while loop we do the merge
- // that we were started with:
- MergePolicy.OneMerge merge = this.startMerge;
-
- try {
-
- if (verbose())
- message(" merge thread: start");
-
- while(true) {
- setRunningMerge(merge);
- doMerge(merge);
-
- // Subsequent times through the loop we do any new
- // merge that writer says is necessary:
- merge = tWriter.getNextMerge();
- if (merge != null) {
- tWriter.mergeInit(merge);
- updateMergeThreads();
- if (verbose())
- message(" merge thread: do another merge " + merge.segString(dir));
- } else {
- break;
- }
- }
-
- if (verbose())
- message(" merge thread: done");
-
- } catch (Throwable exc) {
-
- // Ignore the exception if it was due to abort:
- if (!(exc instanceof MergePolicy.MergeAbortedException)) {
- if (!suppressExceptions) {
- // suppressExceptions is normally only set during
- // testing.
- anyExceptions = true;
- handleMergeException(exc);
- }
- }
- } finally {
- done = true;
- synchronized(ConcurrentMergeScheduler.this) {
- updateMergeThreads();
- ConcurrentMergeScheduler.this.notifyAll();
- }
- }
- }
-
- @Override
- public String toString() {
- MergePolicy.OneMerge merge = getRunningMerge();
- if (merge == null)
- merge = startMerge;
- return "merge thread: " + merge.segString(dir);
- }
- }
-
- /** Called when an exception is hit in a background merge
- * thread */
- protected void handleMergeException(Throwable exc) {
- try {
- // When an exception is hit during merge, IndexWriter
- // removes any partial files and then allows another
- // merge to run. If whatever caused the error is not
- // transient then the exception will keep happening,
- // so, we sleep here to avoid saturating CPU in such
- // cases:
- Thread.sleep(250);
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- throw new MergePolicy.MergeException(exc, dir);
- }
-
- static boolean anyExceptions = false;
-
- /** Used for testing */
- public static boolean anyUnhandledExceptions() {
- if (allInstances == null) {
- throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
- }
- synchronized(allInstances) {
- final int count = allInstances.size();
- // Make sure all outstanding threads are done so we see
- // any exceptions they may produce:
- for(int i=0;i<count;i++)
- allInstances.get(i).sync();
- boolean v = anyExceptions;
- anyExceptions = false;
- return v;
- }
- }
-
- public static void clearUnhandledExceptions() {
- synchronized(allInstances) {
- anyExceptions = false;
- }
- }
-
- /** Used for testing */
- private void addMyself() {
- synchronized(allInstances) {
- final int size = allInstances.size();
- int upto = 0;
- for(int i=0;i<size;i++) {
- final ConcurrentMergeScheduler other = allInstances.get(i);
- if (!(other.closed && 0 == other.mergeThreadCount()))
- // Keep this one for now: it still has threads or
- // may spawn new threads
- allInstances.set(upto++, other);
- }
- allInstances.subList(upto, allInstances.size()).clear();
- allInstances.add(this);
- }
- }
-
- private boolean suppressExceptions;
-
- /** Used for testing */
- void setSuppressExceptions() {
- suppressExceptions = true;
- }
-
- /** Used for testing */
- void clearSuppressExceptions() {
- suppressExceptions = false;
- }
-
- /** Used for testing */
- private static List<ConcurrentMergeScheduler> allInstances;
-
- /** @deprecated this test mode code will be removed in a future release */
- @Deprecated
- public static void setTestMode() {
- allInstances = new ArrayList<ConcurrentMergeScheduler>();
- }
-}