pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / ConcurrentMergeScheduler.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import org.apache.lucene.store.Directory;
21 import org.apache.lucene.util.ThreadInterruptedException;
22 import org.apache.lucene.util.CollectionUtil;
23
24 import java.io.IOException;
25 import java.util.List;
26 import java.util.ArrayList;
27 import java.util.Comparator;
28
29 /** A {@link MergeScheduler} that runs each merge using a
30  *  separate thread.
31  *
32  *  <p>Specify the max number of threads that may run at
33  *  once with {@link #setMaxThreadCount}.</p>
34  *
35  *  <p>Separately specify the maximum number of simultaneous
36  *  merges with {@link #setMaxMergeCount}.  If the number of
37  *  merges exceeds the max number of threads then the
38  *  largest merges are paused until one of the smaller
39  *  merges completes.</p>
40  *
41  *  <p>If more than {@link #getMaxMergeCount} merges are
42  *  requested then this class will forcefully throttle the
43  *  incoming threads by pausing until one more more merges
44  *  complete.</p>
45  */ 
46 public class ConcurrentMergeScheduler extends MergeScheduler {
47
48   private int mergeThreadPriority = -1;
49
50   protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
51
52   // Max number of merge threads allowed to be running at
53   // once.  When there are more merges then this, we
54   // forcefully pause the larger ones, letting the smaller
55   // ones run, up until maxMergeCount merges at which point
56   // we forcefully pause incoming threads (that presumably
57   // are the ones causing so much merging).  We dynamically
58   // default this from 1 to 3, depending on how many cores
59   // you have:
60   private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
61
62   // Max number of merges we accept before forcefully
63   // throttling the incoming threads
64   private int maxMergeCount = maxThreadCount+2;
65
66   protected Directory dir;
67
68   private volatile boolean closed;
69   protected IndexWriter writer;
70   protected int mergeThreadCount;
71
72   public ConcurrentMergeScheduler() {
73     if (allInstances != null) {
74       // Only for testing
75       addMyself();
76     }
77   }
78
79   /** Sets the max # simultaneous merge threads that should
80    *  be running at once.  This must be <= {@link
81    *  #setMaxMergeCount}. */
82   public void setMaxThreadCount(int count) {
83     if (count < 1) {
84       throw new IllegalArgumentException("count should be at least 1");
85     }
86     if (count > maxMergeCount) {
87       throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
88     }
89     maxThreadCount = count;
90   }
91
92   /** @see #setMaxThreadCount(int) */
93   public int getMaxThreadCount() {
94     return maxThreadCount;
95   }
96
97   /** Sets the max # simultaneous merges that are allowed.
98    *  If a merge is necessary yet we already have this many
99    *  threads running, the incoming thread (that is calling
100    *  add/updateDocument) will block until a merge thread
101    *  has completed.  Note that we will only run the
102    *  smallest {@link #setMaxThreadCount} merges at a time. */
103   public void setMaxMergeCount(int count) {
104     if (count < 1) {
105       throw new IllegalArgumentException("count should be at least 1");
106     }
107     if (count < maxThreadCount) {
108       throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
109     }
110     maxMergeCount = count;
111   }
112
113   /** See {@link #setMaxMergeCount}. */
114   public int getMaxMergeCount() {
115     return maxMergeCount;
116   }
117
118   /** Return the priority that merge threads run at.  By
119    *  default the priority is 1 plus the priority of (ie,
120    *  slightly higher priority than) the first thread that
121    *  calls merge. */
122   public synchronized int getMergeThreadPriority() {
123     initMergeThreadPriority();
124     return mergeThreadPriority;
125   }
126
127   /** Set the base priority that merge threads run at.
128    *  Note that CMS may increase priority of some merge
129    *  threads beyond this base priority.  It's best not to
130    *  set this any higher than
131    *  Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
132    *  room to set relative priority among threads.  */
133   public synchronized void setMergeThreadPriority(int pri) {
134     if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
135       throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
136     mergeThreadPriority = pri;
137     updateMergeThreads();
138   }
139
140   // Larger merges come first
141   protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
142     public int compare(MergeThread t1, MergeThread t2) {
143       final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
144       final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
145       
146       final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
147       final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
148
149       return c2 - c1;
150     }
151   };
152
153   /**
154    * Called whenever the running merges have changed, to pause & unpause
155    * threads. This method sorts the merge threads by their merge size in
156    * descending order and then pauses/unpauses threads from first to last --
157    * that way, smaller merges are guaranteed to run before larger ones.
158    */
159   protected synchronized void updateMergeThreads() {
160
161     // Only look at threads that are alive & not in the
162     // process of stopping (ie have an active merge):
163     final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
164
165     int threadIdx = 0;
166     while (threadIdx < mergeThreads.size()) {
167       final MergeThread mergeThread = mergeThreads.get(threadIdx);
168       if (!mergeThread.isAlive()) {
169         // Prune any dead threads
170         mergeThreads.remove(threadIdx);
171         continue;
172       }
173       if (mergeThread.getCurrentMerge() != null) {
174         activeMerges.add(mergeThread);
175       }
176       threadIdx++;
177     }
178
179     // Sort the merge threads in descending order.
180     CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
181     
182     int pri = mergeThreadPriority;
183     final int activeMergeCount = activeMerges.size();
184     for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
185       final MergeThread mergeThread = activeMerges.get(threadIdx);
186       final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
187       if (merge == null) { 
188         continue;
189       }
190
191       // pause the thread if maxThreadCount is smaller than the number of merge threads.
192       final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
193
194       if (verbose()) {
195         if (doPause != merge.getPause()) {
196           if (doPause) {
197             message("pause thread " + mergeThread.getName());
198           } else {
199             message("unpause thread " + mergeThread.getName());
200           }
201         }
202       }
203       if (doPause != merge.getPause()) {
204         merge.setPause(doPause);
205       }
206
207       if (!doPause) {
208         if (verbose()) {
209           message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
210         }
211         mergeThread.setThreadPriority(pri);
212         pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
213       }
214     }
215   }
216
217   /**
218    * Returns true if verbosing is enabled. This method is usually used in
219    * conjunction with {@link #message(String)}, like that:
220    * 
221    * <pre>
222    * if (verbose()) {
223    *   message(&quot;your message&quot;);
224    * }
225    * </pre>
226    */
227   protected boolean verbose() {
228     return writer != null && writer.verbose();
229   }
230   
231   /**
232    * Outputs the given message - this method assumes {@link #verbose()} was
233    * called and returned true.
234    */
235   protected void message(String message) {
236     writer.message("CMS: " + message);
237   }
238
239   private synchronized void initMergeThreadPriority() {
240     if (mergeThreadPriority == -1) {
241       // Default to slightly higher priority than our
242       // calling thread
243       mergeThreadPriority = 1+Thread.currentThread().getPriority();
244       if (mergeThreadPriority > Thread.MAX_PRIORITY)
245         mergeThreadPriority = Thread.MAX_PRIORITY;
246     }
247   }
248
249   @Override
250   public void close() {
251     closed = true;
252     sync();
253   }
254
255   /** Wait for any running merge threads to finish */
256   public void sync() {
257     while (true) {
258       MergeThread toSync = null;
259       synchronized (this) {
260         for (MergeThread t : mergeThreads) {
261           if (t.isAlive()) {
262             toSync = t;
263             break;
264           }
265         }
266       }
267       if (toSync != null) {
268         try {
269           toSync.join();
270         } catch (InterruptedException ie) {
271           throw new ThreadInterruptedException(ie);
272         }
273       } else {
274         break;
275       }
276     }
277   }
278
279   /**
280    * Returns the number of merge threads that are alive. Note that this number
281    * is &le; {@link #mergeThreads} size.
282    */
283   protected synchronized int mergeThreadCount() {
284     int count = 0;
285     for (MergeThread mt : mergeThreads) {
286       if (mt.isAlive() && mt.getCurrentMerge() != null) {
287         count++;
288       }
289     }
290     return count;
291   }
292
293   @Override
294   public void merge(IndexWriter writer) throws IOException {
295
296     assert !Thread.holdsLock(writer);
297
298     this.writer = writer;
299
300     initMergeThreadPriority();
301
302     dir = writer.getDirectory();
303
304     // First, quickly run through the newly proposed merges
305     // and add any orthogonal merges (ie a merge not
306     // involving segments already pending to be merged) to
307     // the queue.  If we are way behind on merging, many of
308     // these newly proposed merges will likely already be
309     // registered.
310
311     if (verbose()) {
312       message("now merge");
313       message("  index: " + writer.segString());
314     }
315     
316     // Iterate, pulling from the IndexWriter's queue of
317     // pending merges, until it's empty:
318     while (true) {
319
320       synchronized(this) {
321         long startStallTime = 0;
322         while (mergeThreadCount() >= 1+maxMergeCount) {
323           startStallTime = System.currentTimeMillis();
324           if (verbose()) {
325             message("    too many merges; stalling...");
326           }
327           try {
328             wait();
329           } catch (InterruptedException ie) {
330             throw new ThreadInterruptedException(ie);
331           }
332         }
333
334         if (verbose()) {
335           if (startStallTime != 0) {
336             message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
337           }
338         }
339       }
340
341
342       // TODO: we could be careful about which merges to do in
343       // the BG (eg maybe the "biggest" ones) vs FG, which
344       // merges to do first (the easiest ones?), etc.
345       MergePolicy.OneMerge merge = writer.getNextMerge();
346       if (merge == null) {
347         if (verbose())
348           message("  no more merges pending; now return");
349         return;
350       }
351
352       // We do this w/ the primary thread to keep
353       // deterministic assignment of segment names
354       writer.mergeInit(merge);
355
356       boolean success = false;
357       try {
358         synchronized(this) {
359           message("  consider merge " + merge.segString(dir));
360
361           // OK to spawn a new merge thread to handle this
362           // merge:
363           final MergeThread merger = getMergeThread(writer, merge);
364           mergeThreads.add(merger);
365           if (verbose()) {
366             message("    launch new thread [" + merger.getName() + "]");
367           }
368
369           merger.start();
370
371           // Must call this after starting the thread else
372           // the new thread is removed from mergeThreads
373           // (since it's not alive yet):
374           updateMergeThreads();
375
376           success = true;
377         }
378       } finally {
379         if (!success) {
380           writer.mergeFinish(merge);
381         }
382       }
383     }
384   }
385
386   /** Does the actual merge, by calling {@link IndexWriter#merge} */
387   protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
388     writer.merge(merge);
389   }
390
391   /** Create and return a new MergeThread */
392   protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
393     final MergeThread thread = new MergeThread(writer, merge);
394     thread.setThreadPriority(mergeThreadPriority);
395     thread.setDaemon(true);
396     thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
397     return thread;
398   }
399
400   protected class MergeThread extends Thread {
401
402     IndexWriter tWriter;
403     MergePolicy.OneMerge startMerge;
404     MergePolicy.OneMerge runningMerge;
405     private volatile boolean done;
406
407     public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
408       this.tWriter = writer;
409       this.startMerge = startMerge;
410     }
411
412     public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
413       runningMerge = merge;
414     }
415
416     public synchronized MergePolicy.OneMerge getRunningMerge() {
417       return runningMerge;
418     }
419
420     public synchronized MergePolicy.OneMerge getCurrentMerge() {
421       if (done) {
422         return null;
423       } else if (runningMerge != null) {
424         return runningMerge;
425       } else {
426         return startMerge;
427       }
428     }
429
430     public void setThreadPriority(int pri) {
431       try {
432         setPriority(pri);
433       } catch (NullPointerException npe) {
434         // Strangely, Sun's JDK 1.5 on Linux sometimes
435         // throws NPE out of here...
436       } catch (SecurityException se) {
437         // Ignore this because we will still run fine with
438         // normal thread priority
439       }
440     }
441
442     @Override
443     public void run() {
444       
445       // First time through the while loop we do the merge
446       // that we were started with:
447       MergePolicy.OneMerge merge = this.startMerge;
448       
449       try {
450
451         if (verbose())
452           message("  merge thread: start");
453
454         while(true) {
455           setRunningMerge(merge);
456           doMerge(merge);
457
458           // Subsequent times through the loop we do any new
459           // merge that writer says is necessary:
460           merge = tWriter.getNextMerge();
461           if (merge != null) {
462             tWriter.mergeInit(merge);
463             updateMergeThreads();
464             if (verbose())
465               message("  merge thread: do another merge " + merge.segString(dir));
466           } else {
467             break;
468           }
469         }
470
471         if (verbose())
472           message("  merge thread: done");
473
474       } catch (Throwable exc) {
475
476         // Ignore the exception if it was due to abort:
477         if (!(exc instanceof MergePolicy.MergeAbortedException)) {
478           if (!suppressExceptions) {
479             // suppressExceptions is normally only set during
480             // testing.
481             anyExceptions = true;
482             handleMergeException(exc);
483           }
484         }
485       } finally {
486         done = true;
487         synchronized(ConcurrentMergeScheduler.this) {
488           updateMergeThreads();
489           ConcurrentMergeScheduler.this.notifyAll();
490         }
491       }
492     }
493
494     @Override
495     public String toString() {
496       MergePolicy.OneMerge merge = getRunningMerge();
497       if (merge == null)
498         merge = startMerge;
499       return "merge thread: " + merge.segString(dir);
500     }
501   }
502
503   /** Called when an exception is hit in a background merge
504    *  thread */
505   protected void handleMergeException(Throwable exc) {
506     try {
507       // When an exception is hit during merge, IndexWriter
508       // removes any partial files and then allows another
509       // merge to run.  If whatever caused the error is not
510       // transient then the exception will keep happening,
511       // so, we sleep here to avoid saturating CPU in such
512       // cases:
513       Thread.sleep(250);
514     } catch (InterruptedException ie) {
515       throw new ThreadInterruptedException(ie);
516     }
517     throw new MergePolicy.MergeException(exc, dir);
518   }
519
520   static boolean anyExceptions = false;
521
522   /** Used for testing */
523   public static boolean anyUnhandledExceptions() {
524     if (allInstances == null) {
525       throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
526     }
527     synchronized(allInstances) {
528       final int count = allInstances.size();
529       // Make sure all outstanding threads are done so we see
530       // any exceptions they may produce:
531       for(int i=0;i<count;i++)
532         allInstances.get(i).sync();
533       boolean v = anyExceptions;
534       anyExceptions = false;
535       return v;
536     }
537   }
538
539   public static void clearUnhandledExceptions() {
540     synchronized(allInstances) {
541       anyExceptions = false;
542     }
543   }
544
545   /** Used for testing */
546   private void addMyself() {
547     synchronized(allInstances) {
548       final int size = allInstances.size();
549       int upto = 0;
550       for(int i=0;i<size;i++) {
551         final ConcurrentMergeScheduler other = allInstances.get(i);
552         if (!(other.closed && 0 == other.mergeThreadCount()))
553           // Keep this one for now: it still has threads or
554           // may spawn new threads
555           allInstances.set(upto++, other);
556       }
557       allInstances.subList(upto, allInstances.size()).clear();
558       allInstances.add(this);
559     }
560   }
561
562   private boolean suppressExceptions;
563
564   /** Used for testing */
565   void setSuppressExceptions() {
566     suppressExceptions = true;
567   }
568
569   /** Used for testing */
570   void clearSuppressExceptions() {
571     suppressExceptions = false;
572   }
573
574   /** Used for testing */
575   private static List<ConcurrentMergeScheduler> allInstances;
576   
577   /** @deprecated this test mode code will be removed in a future release */
578   @Deprecated
579   public static void setTestMode() {
580     allInstances = new ArrayList<ConcurrentMergeScheduler>();
581   }
582 }