pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / search / NRTManager.java
1 package org.apache.lucene.search;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicLong;
28 import java.util.concurrent.locks.Condition;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.lucene.analysis.Analyzer;
32 import org.apache.lucene.document.Document;
33 import org.apache.lucene.index.CorruptIndexException;
34 import org.apache.lucene.index.IndexReader; // javadocs
35 import org.apache.lucene.index.IndexWriter;
36 import org.apache.lucene.index.Term;
37 import org.apache.lucene.search.IndexSearcher; // javadocs
38 import org.apache.lucene.store.Directory;
39 import org.apache.lucene.util.IOUtils;
40 import org.apache.lucene.util.ThreadInterruptedException;
41
42 /**
43  * Utility class to manage sharing near-real-time searchers
44  * across multiple searching threads.
45  *
46  * <p>NOTE: to use this class, you must call {@link #maybeReopen(boolean)}
47  * periodically.  The {@link NRTManagerReopenThread} is a
48  * simple class to do this on a periodic basis.  If you
49  * implement your own reopener, be sure to call {@link
50  * #addWaitingListener} so your reopener is notified when a
51  * caller is waiting for a specific generation searcher. </p>
52  *
53  * @lucene.experimental
54  */
55
56 public class NRTManager implements Closeable {
57   private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE;
58   private final IndexWriter writer;
59   private final SearcherManagerRef withoutDeletes;
60   private final SearcherManagerRef withDeletes;
61   private final AtomicLong indexingGen;
62   private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
63   private final ReentrantLock reopenLock = new ReentrantLock();
64   private final Condition newGeneration = reopenLock.newCondition();
65
66   /**
67    * Create new NRTManager.
68    * 
69    *  @param writer IndexWriter to open near-real-time
70    *         readers
71    *  @param warmer optional {@link SearcherWarmer}.  Pass
72    *         null if you don't require the searcher to warmed
73    *         before going live.  If this is non-null then a
74    *         merged segment warmer is installed on the
75    *         provided IndexWriter's config.
76    *
77    *  <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
78    *  not invoked for the initial searcher; you should
79    *  warm it yourself if necessary.
80    */
81   public NRTManager(IndexWriter writer, SearcherWarmer warmer) throws IOException {
82     this(writer, null, warmer, true);
83   }
84
85   /**
86    * Create new NRTManager.
87    * 
88    *  @param writer IndexWriter to open near-real-time
89    *         readers
90    *  @param es optional ExecutorService so different segments can
91    *         be searched concurrently (see {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}.
92    *         Pass <code>null</code> to search segments sequentially.
93    *  @param warmer optional {@link SearcherWarmer}.  Pass
94    *         null if you don't require the searcher to warmed
95    *         before going live.  If this is non-null then a
96    *         merged segment warmer is installed on the
97    *         provided IndexWriter's config.
98    *
99    *  <p><b>NOTE</b>: the provided {@link SearcherWarmer} is
100    *  not invoked for the initial searcher; you should
101    *  warm it yourself if necessary.
102    */
103   public NRTManager(IndexWriter writer, ExecutorService es,
104       SearcherWarmer warmer) throws IOException {
105     this(writer, es, warmer, true);
106   }
107
108   /**
109    * Expert: just like {@link
110    * #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)},
111    * but you can also specify whether every searcher must
112    * apply deletes.  This is useful for cases where certain
113    * uses can tolerate seeing some deleted docs, since
114    * reopen time is faster if deletes need not be applied. */
115   public NRTManager(IndexWriter writer, ExecutorService es,
116       SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException {
117     this.writer = writer;
118     if (alwaysApplyDeletes) {
119       withoutDeletes = withDeletes = new SearcherManagerRef(true, 0,  new SearcherManager(writer, true, warmer, es));
120     } else {
121       withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es));
122       withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, warmer, es));
123     }
124     indexingGen = new AtomicLong(1);
125   }
126   
127   /** NRTManager invokes this interface to notify it when a
128    *  caller is waiting for a specific generation searcher
129    *  to be visible. */
130   public static interface WaitingListener {
131     public void waiting(boolean requiresDeletes, long targetGen);
132   }
133
134   /** Adds a listener, to be notified when a caller is
135    *  waiting for a specific generation searcher to be
136    *  visible. */
137   public void addWaitingListener(WaitingListener l) {
138     waitingListeners.add(l);
139   }
140
141   /** Remove a listener added with {@link
142    *  #addWaitingListener}. */
143   public void removeWaitingListener(WaitingListener l) {
144     waitingListeners.remove(l);
145   }
146
147   public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
148     writer.updateDocument(t, d, a);
149     // Return gen as of when indexing finished:
150     return indexingGen.get();
151   }
152
153   public long updateDocument(Term t, Document d) throws IOException {
154     writer.updateDocument(t, d);
155     // Return gen as of when indexing finished:
156     return indexingGen.get();
157   }
158
159   public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException {
160     writer.updateDocuments(t, docs, a);
161     // Return gen as of when indexing finished:
162     return indexingGen.get();
163   }
164
165   public long updateDocuments(Term t, Collection<Document> docs) throws IOException {
166     writer.updateDocuments(t, docs);
167     // Return gen as of when indexing finished:
168     return indexingGen.get();
169   }
170
171   public long deleteDocuments(Term t) throws IOException {
172     writer.deleteDocuments(t);
173     // Return gen as of when indexing finished:
174     return indexingGen.get();
175   }
176
177   public long deleteDocuments(Term... terms) throws IOException {
178     writer.deleteDocuments(terms);
179     // Return gen as of when indexing finished:
180     return indexingGen.get();
181   }
182
183   public long deleteDocuments(Query q) throws IOException {
184     writer.deleteDocuments(q);
185     // Return gen as of when indexing finished:
186     return indexingGen.get();
187   }
188
189   public long deleteDocuments(Query... queries) throws IOException {
190     writer.deleteDocuments(queries);
191     // Return gen as of when indexing finished:
192     return indexingGen.get();
193   }
194
195   public long deleteAll() throws IOException {
196     writer.deleteAll();
197     // Return gen as of when indexing finished:
198     return indexingGen.get();
199   }
200
201   public long addDocument(Document d, Analyzer a) throws IOException {
202     writer.addDocument(d, a);
203     // Return gen as of when indexing finished:
204     return indexingGen.get();
205   }
206
207   public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException {
208     writer.addDocuments(docs, a);
209     // Return gen as of when indexing finished:
210     return indexingGen.get();
211   }
212
213   public long addDocument(Document d) throws IOException {
214     writer.addDocument(d);
215     // Return gen as of when indexing finished:
216     return indexingGen.get();
217   }
218
219   public long addDocuments(Collection<Document> docs) throws IOException {
220     writer.addDocuments(docs);
221     // Return gen as of when indexing finished:
222     return indexingGen.get();
223   }
224
225   public long addIndexes(Directory... dirs) throws CorruptIndexException, IOException {
226     writer.addIndexes(dirs);
227     // Return gen as of when indexing finished:
228     return indexingGen.get();
229   }
230
231   public long addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
232     writer.addIndexes(readers);
233     // Return gen as of when indexing finished:
234     return indexingGen.get();
235   }
236
237   /**
238    * Waits for a given {@link SearcherManager} target generation to be available
239    * via {@link #getSearcherManager(boolean)}. If the current generation is less
240    * than the given target generation this method will block until the
241    * correspondent {@link SearcherManager} is reopened by another thread via
242    * {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed.
243    * 
244    * @param targetGen the generation to wait for
245    * @param requireDeletes <code>true</code> iff the generation requires deletes to be applied otherwise <code>false</code>
246    * @return the {@link SearcherManager} with the given target generation
247    */
248   public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) {
249     return waitForGeneration(targetGen, requireDeletes, -1,  TimeUnit.NANOSECONDS);
250   }
251
252   /**
253    * Waits for a given {@link SearcherManager} target generation to be available
254    * via {@link #getSearcherManager(boolean)}. If the current generation is less
255    * than the given target generation this method will block until the
256    * correspondent {@link SearcherManager} is reopened by another thread via
257    * {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until
258    * the {@link NRTManager} is closed.
259    * <p>
260    * NOTE: if the waiting time elapses before the requested target generation is
261    * available the latest {@link SearcherManager} is returned instead.
262    * 
263    * @param targetGen
264    *          the generation to wait for
265    * @param requireDeletes
266    *          <code>true</code> iff the generation requires deletes to be
267    *          applied otherwise <code>false</code>
268    * @param time
269    *          the time to wait for the target generation
270    * @param unit
271    *          the waiting time's time unit
272    * @return the {@link SearcherManager} with the given target generation or the
273    *         latest {@link SearcherManager} if the waiting time elapsed before
274    *         the requested generation is available.
275    */
276   public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) {
277     try {
278       final long curGen = indexingGen.get();
279       if (targetGen > curGen) {
280         throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
281       }
282       reopenLock.lockInterruptibly();
283       try {
284         if (targetGen > getCurrentSearchingGen(requireDeletes)) {
285           for (WaitingListener listener : waitingListeners) {
286             listener.waiting(requireDeletes, targetGen);
287           }
288           while (targetGen > getCurrentSearchingGen(requireDeletes)) {
289             if (!waitOnGenCondition(time, unit)) {
290               return getSearcherManager(requireDeletes);
291             }
292           }
293         }
294
295       } finally {
296         reopenLock.unlock();
297       }
298     } catch (InterruptedException ie) {
299       throw new ThreadInterruptedException(ie);
300     }
301     return getSearcherManager(requireDeletes);
302   }
303   
304   private boolean waitOnGenCondition(long time, TimeUnit unit)
305       throws InterruptedException {
306     assert reopenLock.isHeldByCurrentThread();
307     if (time < 0) {
308       newGeneration.await();
309       return true;
310     } else {
311       return newGeneration.await(time, unit);
312     }
313   }
314
315   /** Returns generation of current searcher. */
316   public long getCurrentSearchingGen(boolean applyAllDeletes) {
317     if (applyAllDeletes) {
318       return withDeletes.generation;
319     } else {
320       return Math.max(withoutDeletes.generation, withDeletes.generation);
321     }
322   }
323
324   public boolean maybeReopen(boolean applyAllDeletes) throws IOException {
325     if (reopenLock.tryLock()) {
326       try {
327         final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes;
328         // Mark gen as of when reopen started:
329         final long newSearcherGen = indexingGen.getAndIncrement();
330         boolean setSearchGen = false;
331         if (reference.generation == MAX_SEARCHER_GEN) {
332           newGeneration.signalAll(); // wake up threads if we have a new generation
333           return false;
334         }
335         if (!(setSearchGen = reference.manager.isSearcherCurrent())) {
336           setSearchGen = reference.manager.maybeReopen();
337         }
338         if (setSearchGen) {
339           reference.generation = newSearcherGen;// update searcher gen
340           newGeneration.signalAll(); // wake up threads if we have a new generation
341         }
342         return setSearchGen;
343       } finally {
344         reopenLock.unlock();
345       }
346     }
347     return false;
348   }
349
350   /**
351    * Close this NRTManager to future searching. Any searches still in process in
352    * other threads won't be affected, and they should still call
353    * {@link SearcherManager#release(IndexSearcher)} after they are done.
354    * 
355    * <p>
356    * <b>NOTE</b>: caller must separately close the writer.
357    */
358   public void close() throws IOException {
359     reopenLock.lock();
360     try {
361       try {
362         IOUtils.close(withDeletes, withoutDeletes);
363       } finally { // make sure we signal even if close throws an exception
364         newGeneration.signalAll();
365       }
366     } finally {
367       reopenLock.unlock();
368       assert withDeletes.generation == MAX_SEARCHER_GEN && withoutDeletes.generation == MAX_SEARCHER_GEN;
369     }
370   }
371
372   /**
373    * Returns a {@link SearcherManager}. If <code>applyAllDeletes</code> is
374    * <code>true</code> the returned manager is guaranteed to have all deletes
375    * applied on the last reopen. Otherwise the latest manager with or without deletes
376    * is returned.
377    */
378   public SearcherManager getSearcherManager(boolean applyAllDeletes) {
379     if (applyAllDeletes) {
380       return withDeletes.manager;
381     } else {
382       if (withDeletes.generation > withoutDeletes.generation) {
383         return withDeletes.manager;
384       } else {
385         return withoutDeletes.manager;
386       }
387     }
388   }
389   
390   static final class SearcherManagerRef implements Closeable {
391     final boolean applyDeletes;
392     volatile long generation;
393     final SearcherManager manager;
394
395     SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) {
396       super();
397       this.applyDeletes = applyDeletes;
398       this.generation = generation;
399       this.manager = manager;
400     }
401     
402     public void close() throws IOException {
403       generation = MAX_SEARCHER_GEN; // max it out to make sure nobody can wait on another gen
404       manager.close();
405     }
406   }
407 }