1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import org.apache.lucene.store.Directory;
21 import org.apache.lucene.util.ThreadInterruptedException;
22 import org.apache.lucene.util.CollectionUtil;
24 import java.io.IOException;
25 import java.util.List;
26 import java.util.ArrayList;
27 import java.util.Comparator;
29 /** A {@link MergeScheduler} that runs each merge using a
32 * <p>Specify the max number of threads that may run at
33 * once with {@link #setMaxThreadCount}.</p>
35 * <p>Separately specify the maximum number of simultaneous
36 * merges with {@link #setMaxMergeCount}. If the number of
37 * merges exceeds the max number of threads then the
38 * largest merges are paused until one of the smaller
39 * merges completes.</p>
41 * <p>If more than {@link #getMaxMergeCount} merges are
42 * requested then this class will forcefully throttle the
43 * incoming threads by pausing until one or more merges
46 public class ConcurrentMergeScheduler extends MergeScheduler {
48 private int mergeThreadPriority = -1;
50 protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
52 // Max number of merge threads allowed to be running at
53 // once. When there are more merges than this, we
54 // forcefully pause the larger ones, letting the smaller
55 // ones run, up until maxMergeCount merges at which point
56 // we forcefully pause incoming threads (that presumably
57 // are the ones causing so much merging). We dynamically
58 // default this from 1 to 3, depending on how many cores
60 private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
62 // Max number of merges we accept before forcefully
63 // throttling the incoming threads
64 private int maxMergeCount = maxThreadCount+2;
66 protected Directory dir;
68 private volatile boolean closed;
69 protected IndexWriter writer;
70 protected int mergeThreadCount;
72 public ConcurrentMergeScheduler() {
73 if (allInstances != null) {
79 /** Sets the max # simultaneous merge threads that should
80 * be running at once. This must be <= {@link
81 * #setMaxMergeCount}. */
82 public void setMaxThreadCount(int count) {
84 throw new IllegalArgumentException("count should be at least 1");
86 if (count > maxMergeCount) {
87 throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
89 maxThreadCount = count;
92 /** @see #setMaxThreadCount(int) */
93 public int getMaxThreadCount() {
94 return maxThreadCount;
97 /** Sets the max # simultaneous merges that are allowed.
98 * If a merge is necessary yet we already have this many
99 * threads running, the incoming thread (that is calling
100 * add/updateDocument) will block until a merge thread
101 * has completed. Note that we will only run the
102 * smallest {@link #setMaxThreadCount} merges at a time. */
103 public void setMaxMergeCount(int count) {
105 throw new IllegalArgumentException("count should be at least 1");
107 if (count < maxThreadCount) {
108 throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
110 maxMergeCount = count;
113 /** See {@link #setMaxMergeCount}. */
114 public int getMaxMergeCount() {
115 return maxMergeCount;
118 /** Return the priority that merge threads run at. By
119 * default the priority is 1 plus the priority of (ie,
120 * slightly higher priority than) the first thread that
122 public synchronized int getMergeThreadPriority() {
123 initMergeThreadPriority();
124 return mergeThreadPriority;
127 /** Set the base priority that merge threads run at.
128 * Note that CMS may increase priority of some merge
129 * threads beyond this base priority. It's best not to
130 * set this any higher than
131 * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
132 * room to set relative priority among threads. */
133 public synchronized void setMergeThreadPriority(int pri) {
134 if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
135 throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
136 mergeThreadPriority = pri;
137 updateMergeThreads();
140 // Larger merges come first
141 protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
142 public int compare(MergeThread t1, MergeThread t2) {
143 final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
144 final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
146 final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
147 final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
154 * Called whenever the running merges have changed, to pause & unpause
155 * threads. This method sorts the merge threads by their merge size in
156 * descending order and then pauses/unpauses threads from first to last --
157 * that way, smaller merges are guaranteed to run before larger ones.
159 protected synchronized void updateMergeThreads() {
161 // Only look at threads that are alive & not in the
162 // process of stopping (ie have an active merge):
163 final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
166 while (threadIdx < mergeThreads.size()) {
167 final MergeThread mergeThread = mergeThreads.get(threadIdx);
168 if (!mergeThread.isAlive()) {
169 // Prune any dead threads
170 mergeThreads.remove(threadIdx);
173 if (mergeThread.getCurrentMerge() != null) {
174 activeMerges.add(mergeThread);
179 // Sort the merge threads in descending order.
180 CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
182 int pri = mergeThreadPriority;
183 final int activeMergeCount = activeMerges.size();
184 for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
185 final MergeThread mergeThread = activeMerges.get(threadIdx);
186 final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
191 // pause the thread if maxThreadCount is smaller than the number of merge threads.
192 final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
195 if (doPause != merge.getPause()) {
197 message("pause thread " + mergeThread.getName());
199 message("unpause thread " + mergeThread.getName());
203 if (doPause != merge.getPause()) {
204 merge.setPause(doPause);
209 message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
211 mergeThread.setThreadPriority(pri);
212 pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
218 * Returns true if verbose output is enabled. This method is usually used in
219 * conjunction with {@link #message(String)}, like this:
223 * message("your message");
227 protected boolean verbose() {
228 return writer != null && writer.verbose();
232 * Outputs the given message - this method assumes {@link #verbose()} was
233 * called and returned true.
235 protected void message(String message) {
236 writer.message("CMS: " + message);
239 private synchronized void initMergeThreadPriority() {
240 if (mergeThreadPriority == -1) {
241 // Default to slightly higher priority than our
243 mergeThreadPriority = 1+Thread.currentThread().getPriority();
244 if (mergeThreadPriority > Thread.MAX_PRIORITY)
245 mergeThreadPriority = Thread.MAX_PRIORITY;
250 public void close() {
255 /** Wait for any running merge threads to finish */
258 MergeThread toSync = null;
259 synchronized (this) {
260 for (MergeThread t : mergeThreads) {
267 if (toSync != null) {
270 } catch (InterruptedException ie) {
271 throw new ThreadInterruptedException(ie);
280 * Returns the number of merge threads that are alive and have a current merge. Note that this number
281 * is ≤ {@link #mergeThreads} size.
283 protected synchronized int mergeThreadCount() {
285 for (MergeThread mt : mergeThreads) {
286 if (mt.isAlive() && mt.getCurrentMerge() != null) {
294 public void merge(IndexWriter writer) throws IOException {
296 assert !Thread.holdsLock(writer);
298 this.writer = writer;
300 initMergeThreadPriority();
302 dir = writer.getDirectory();
304 // First, quickly run through the newly proposed merges
305 // and add any orthogonal merges (ie a merge not
306 // involving segments already pending to be merged) to
307 // the queue. If we are way behind on merging, many of
308 // these newly proposed merges will likely already be
312 message("now merge");
313 message(" index: " + writer.segString());
316 // Iterate, pulling from the IndexWriter's queue of
317 // pending merges, until it's empty:
321 long startStallTime = 0;
322 while (mergeThreadCount() >= 1+maxMergeCount) {
323 startStallTime = System.currentTimeMillis();
325 message(" too many merges; stalling...");
329 } catch (InterruptedException ie) {
330 throw new ThreadInterruptedException(ie);
335 if (startStallTime != 0) {
336 message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
342 // TODO: we could be careful about which merges to do in
343 // the BG (eg maybe the "biggest" ones) vs FG, which
344 // merges to do first (the easiest ones?), etc.
345 MergePolicy.OneMerge merge = writer.getNextMerge();
348 message(" no more merges pending; now return");
352 // We do this w/ the primary thread to keep
353 // deterministic assignment of segment names
354 writer.mergeInit(merge);
356 boolean success = false;
359 message(" consider merge " + merge.segString(dir));
361 // OK to spawn a new merge thread to handle this
363 final MergeThread merger = getMergeThread(writer, merge);
364 mergeThreads.add(merger);
366 message(" launch new thread [" + merger.getName() + "]");
371 // Must call this after starting the thread else
372 // the new thread is removed from mergeThreads
373 // (since it's not alive yet):
374 updateMergeThreads();
380 writer.mergeFinish(merge);
386 /** Does the actual merge, by calling {@link IndexWriter#merge} */
387 protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
391 /** Create and return a new MergeThread */
392 protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
393 final MergeThread thread = new MergeThread(writer, merge);
394 thread.setThreadPriority(mergeThreadPriority);
395 thread.setDaemon(true);
396 thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
400 protected class MergeThread extends Thread {
403 MergePolicy.OneMerge startMerge;
404 MergePolicy.OneMerge runningMerge;
405 private volatile boolean done;
407 public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
408 this.tWriter = writer;
409 this.startMerge = startMerge;
412 public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
413 runningMerge = merge;
416 public synchronized MergePolicy.OneMerge getRunningMerge() {
420 public synchronized MergePolicy.OneMerge getCurrentMerge() {
423 } else if (runningMerge != null) {
430 public void setThreadPriority(int pri) {
433 } catch (NullPointerException npe) {
434 // Strangely, Sun's JDK 1.5 on Linux sometimes
435 // throws NPE out of here...
436 } catch (SecurityException se) {
437 // Ignore this because we will still run fine with
438 // normal thread priority
445 // First time through the while loop we do the merge
446 // that we were started with:
447 MergePolicy.OneMerge merge = this.startMerge;
452 message(" merge thread: start");
455 setRunningMerge(merge);
458 // Subsequent times through the loop we do any new
459 // merge that writer says is necessary:
460 merge = tWriter.getNextMerge();
462 tWriter.mergeInit(merge);
463 updateMergeThreads();
465 message(" merge thread: do another merge " + merge.segString(dir));
472 message(" merge thread: done");
474 } catch (Throwable exc) {
476 // Ignore the exception if it was due to abort:
477 if (!(exc instanceof MergePolicy.MergeAbortedException)) {
478 if (!suppressExceptions) {
479 // suppressExceptions is normally only set during
481 anyExceptions = true;
482 handleMergeException(exc);
487 synchronized(ConcurrentMergeScheduler.this) {
488 updateMergeThreads();
489 ConcurrentMergeScheduler.this.notifyAll();
495 public String toString() {
496 MergePolicy.OneMerge merge = getRunningMerge();
499 return "merge thread: " + merge.segString(dir);
503 /** Called when an exception is hit in a background merge
505 protected void handleMergeException(Throwable exc) {
507 // When an exception is hit during merge, IndexWriter
508 // removes any partial files and then allows another
509 // merge to run. If whatever caused the error is not
510 // transient then the exception will keep happening,
511 // so, we sleep here to avoid saturating CPU in such
514 } catch (InterruptedException ie) {
515 throw new ThreadInterruptedException(ie);
517 throw new MergePolicy.MergeException(exc, dir);
520 static boolean anyExceptions = false;
522 /** Used for testing */
523 public static boolean anyUnhandledExceptions() {
524 if (allInstances == null) {
525 throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
527 synchronized(allInstances) {
528 final int count = allInstances.size();
529 // Make sure all outstanding threads are done so we see
530 // any exceptions they may produce:
531 for(int i=0;i<count;i++)
532 allInstances.get(i).sync();
533 boolean v = anyExceptions;
534 anyExceptions = false;
539 public static void clearUnhandledExceptions() {
540 synchronized(allInstances) {
541 anyExceptions = false;
545 /** Used for testing */
546 private void addMyself() {
547 synchronized(allInstances) {
548 final int size = allInstances.size();
550 for(int i=0;i<size;i++) {
551 final ConcurrentMergeScheduler other = allInstances.get(i);
552 if (!(other.closed && 0 == other.mergeThreadCount()))
553 // Keep this one for now: it still has threads or
554 // may spawn new threads
555 allInstances.set(upto++, other);
557 allInstances.subList(upto, allInstances.size()).clear();
558 allInstances.add(this);
562 private boolean suppressExceptions;
564 /** Used for testing */
565 void setSuppressExceptions() {
566 suppressExceptions = true;
569 /** Used for testing */
570 void clearSuppressExceptions() {
571 suppressExceptions = false;
574 /** Used for testing */
575 private static List<ConcurrentMergeScheduler> allInstances;
577 /** @deprecated this test mode code will be removed in a future release */
579 public static void setTestMode() {
580 allInstances = new ArrayList<ConcurrentMergeScheduler>();