1 package org.apache.lucene.search;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicLong;
28 import java.util.concurrent.locks.Condition;
29 import java.util.concurrent.locks.ReentrantLock;
31 import org.apache.lucene.analysis.Analyzer;
32 import org.apache.lucene.document.Document;
33 import org.apache.lucene.index.CorruptIndexException;
34 import org.apache.lucene.index.IndexReader; // javadocs
35 import org.apache.lucene.index.IndexWriter;
36 import org.apache.lucene.index.Term;
37 import org.apache.lucene.search.IndexSearcher; // javadocs
38 import org.apache.lucene.store.Directory;
39 import org.apache.lucene.util.IOUtils;
40 import org.apache.lucene.util.ThreadInterruptedException;
43 * Utility class to manage sharing near-real-time searchers
44 * across multiple searching threads.
46 * <p>NOTE: to use this class, you must call {@link #maybeReopen(boolean)}
47 * periodically. The {@link NRTManagerReopenThread} is a
48 * simple class to do this on a periodic basis. If you
49 * implement your own reopener, be sure to call {@link
50 * #addWaitingListener} so your reopener is notified when a
51 * caller is waiting for a specific generation searcher. </p>
53 * @lucene.experimental
56 public class NRTManager implements Closeable {
// Sentinel generation: SearcherManagerRef.close() assigns it to its generation field,
// and maybeReopen() compares against it before attempting a reopen.
57 private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE;
// Writer that every indexing method of this class delegates to.
58 private final IndexWriter writer;
// Reference whose SearcherManager is created with applyDeletes=false; it is the very
// same object as withDeletes when the manager is built with alwaysApplyDeletes=true.
59 private final SearcherManagerRef withoutDeletes;
// Reference whose SearcherManager is created with applyDeletes=true.
60 private final SearcherManagerRef withDeletes;
// Indexing generation: initialised to 1 in the constructor, returned by the indexing
// methods, and post-incremented by maybeReopen().
61 private final AtomicLong indexingGen;
// Listeners called from waitForGeneration() when a caller has to wait for a generation.
62 private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
// Lock taken by maybeReopen() (tryLock) and waitForGeneration() (lockInterruptibly).
63 private final ReentrantLock reopenLock = new ReentrantLock();
// Condition of reopenLock; signalled when maybeReopen() publishes a generation and on close().
64 private final Condition newGeneration = reopenLock.newCondition();
/**
 * Creates a new NRTManager that searches segments sequentially and always
 * applies deletes.
 *
 * <p>Equivalent to calling the four-argument constructor with a
 * <code>null</code> executor and <code>alwaysApplyDeletes=true</code>.
 *
 * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is not invoked for the
 * initial searcher; you should warm it yourself if necessary.
 *
 * @param writer IndexWriter from which near-real-time searchers are opened
 * @param warmer optional {@link SearcherWarmer}. Pass null if you don't
 *        require the searcher to be warmed before going live. If this is
 *        non-null then a merged segment warmer is installed on the provided
 *        IndexWriter's config.
 */
public NRTManager(IndexWriter writer, SearcherWarmer warmer) throws IOException {
  this(writer, null, warmer, true);
}
/**
 * Creates a new NRTManager that always applies deletes.
 *
 * <p>Equivalent to calling the four-argument constructor with
 * <code>alwaysApplyDeletes=true</code>.
 *
 * <p><b>NOTE</b>: the provided {@link SearcherWarmer} is not invoked for the
 * initial searcher; you should warm it yourself if necessary.
 *
 * @param writer IndexWriter from which near-real-time searchers are opened
 * @param es optional ExecutorService so different segments can be searched
 *        concurrently (see
 *        {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}).
 *        Pass <code>null</code> to search segments sequentially.
 * @param warmer optional {@link SearcherWarmer}. Pass null if you don't
 *        require the searcher to be warmed before going live. If this is
 *        non-null then a merged segment warmer is installed on the provided
 *        IndexWriter's config.
 */
public NRTManager(IndexWriter writer, ExecutorService es,
    SearcherWarmer warmer) throws IOException {
  this(writer, es, warmer, true);
}
109 * Expert: just like {@link
110 * #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)},
111 * but you can also specify whether every searcher must
112 * apply deletes. This is useful for cases where certain
113 * uses can tolerate seeing some deleted docs, since
114 * reopen time is faster if deletes need not be applied. */
115 public NRTManager(IndexWriter writer, ExecutorService es,
116 SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException {
117 this.writer = writer;
118 if (alwaysApplyDeletes) {
119 withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es));
121 withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es));
122 withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, warmer, es));
124 indexingGen = new AtomicLong(1);
/**
 * Callback through which NRTManager tells a reopener that a caller is
 * waiting for a specific searcher generation.
 */
public interface WaitingListener {
  /**
   * Called when a caller starts waiting.
   *
   * @param requiresDeletes whether the awaited searcher must have deletes applied
   * @param targetGen the generation the caller is waiting for
   */
  void waiting(boolean requiresDeletes, long targetGen);
}
134 /** Adds a listener, to be notified when a caller is
135 * waiting for a specific generation searcher to be
137 public void addWaitingListener(WaitingListener l) {
138 waitingListeners.add(l);
141 /** Remove a listener added with {@link
142 * #addWaitingListener}. */
143 public void removeWaitingListener(WaitingListener l) {
144 waitingListeners.remove(l);
147 public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
148 writer.updateDocument(t, d, a);
149 // Return gen as of when indexing finished:
150 return indexingGen.get();
153 public long updateDocument(Term t, Document d) throws IOException {
154 writer.updateDocument(t, d);
155 // Return gen as of when indexing finished:
156 return indexingGen.get();
159 public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException {
160 writer.updateDocuments(t, docs, a);
161 // Return gen as of when indexing finished:
162 return indexingGen.get();
165 public long updateDocuments(Term t, Collection<Document> docs) throws IOException {
166 writer.updateDocuments(t, docs);
167 // Return gen as of when indexing finished:
168 return indexingGen.get();
171 public long deleteDocuments(Term t) throws IOException {
172 writer.deleteDocuments(t);
173 // Return gen as of when indexing finished:
174 return indexingGen.get();
177 public long deleteDocuments(Term... terms) throws IOException {
178 writer.deleteDocuments(terms);
179 // Return gen as of when indexing finished:
180 return indexingGen.get();
183 public long deleteDocuments(Query q) throws IOException {
184 writer.deleteDocuments(q);
185 // Return gen as of when indexing finished:
186 return indexingGen.get();
189 public long deleteDocuments(Query... queries) throws IOException {
190 writer.deleteDocuments(queries);
191 // Return gen as of when indexing finished:
192 return indexingGen.get();
195 public long deleteAll() throws IOException {
197 // Return gen as of when indexing finished:
198 return indexingGen.get();
201 public long addDocument(Document d, Analyzer a) throws IOException {
202 writer.addDocument(d, a);
203 // Return gen as of when indexing finished:
204 return indexingGen.get();
207 public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException {
208 writer.addDocuments(docs, a);
209 // Return gen as of when indexing finished:
210 return indexingGen.get();
213 public long addDocument(Document d) throws IOException {
214 writer.addDocument(d);
215 // Return gen as of when indexing finished:
216 return indexingGen.get();
219 public long addDocuments(Collection<Document> docs) throws IOException {
220 writer.addDocuments(docs);
221 // Return gen as of when indexing finished:
222 return indexingGen.get();
225 public long addIndexes(Directory... dirs) throws CorruptIndexException, IOException {
226 writer.addIndexes(dirs);
227 // Return gen as of when indexing finished:
228 return indexingGen.get();
231 public long addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
232 writer.addIndexes(readers);
233 // Return gen as of when indexing finished:
234 return indexingGen.get();
238 * Waits for a given {@link SearcherManager} target generation to be available
239 * via {@link #getSearcherManager(boolean)}. If the current generation is less
240 * than the given target generation this method will block until the
241 * correspondent {@link SearcherManager} is reopened by another thread via
242 * {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed.
244 * @param targetGen the generation to wait for
245 * @param requireDeletes <code>true</code> iff the generation requires deletes to be applied otherwise <code>false</code>
246 * @return the {@link SearcherManager} with the given target generation
248 public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) {
249 return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS);
253 * Waits for a given {@link SearcherManager} target generation to be available
254 * via {@link #getSearcherManager(boolean)}. If the current generation is less
255 * than the given target generation this method will block until the
256 * correspondent {@link SearcherManager} is reopened by another thread via
257 * {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until
258 * the {@link NRTManager} is closed.
260 * NOTE: if the waiting time elapses before the requested target generation is
261 * available the latest {@link SearcherManager} is returned instead.
264 * the generation to wait for
265 * @param requireDeletes
266 * <code>true</code> iff the generation requires deletes to be
267 * applied otherwise <code>false</code>
269 * the time to wait for the target generation
271 * the waiting time's time unit
272 * @return the {@link SearcherManager} with the given target generation or the
273 * latest {@link SearcherManager} if the waiting time elapsed before
274 * the requested generation is available.
276 public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) {
278 final long curGen = indexingGen.get();
279 if (targetGen > curGen) {
280 throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")");
282 reopenLock.lockInterruptibly();
284 if (targetGen > getCurrentSearchingGen(requireDeletes)) {
285 for (WaitingListener listener : waitingListeners) {
286 listener.waiting(requireDeletes, targetGen);
288 while (targetGen > getCurrentSearchingGen(requireDeletes)) {
289 if (!waitOnGenCondition(time, unit)) {
290 return getSearcherManager(requireDeletes);
298 } catch (InterruptedException ie) {
299 throw new ThreadInterruptedException(ie);
301 return getSearcherManager(requireDeletes);
304 private boolean waitOnGenCondition(long time, TimeUnit unit)
305 throws InterruptedException {
306 assert reopenLock.isHeldByCurrentThread();
308 newGeneration.await();
311 return newGeneration.await(time, unit);
315 /** Returns generation of current searcher. */
316 public long getCurrentSearchingGen(boolean applyAllDeletes) {
317 if (applyAllDeletes) {
318 return withDeletes.generation;
320 return Math.max(withoutDeletes.generation, withDeletes.generation);
324 public boolean maybeReopen(boolean applyAllDeletes) throws IOException {
325 if (reopenLock.tryLock()) {
327 final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes;
328 // Mark gen as of when reopen started:
329 final long newSearcherGen = indexingGen.getAndIncrement();
330 boolean setSearchGen = false;
331 if (reference.generation == MAX_SEARCHER_GEN) {
332 newGeneration.signalAll(); // wake up threads if we have a new generation
335 if (!(setSearchGen = reference.manager.isSearcherCurrent())) {
336 setSearchGen = reference.manager.maybeReopen();
339 reference.generation = newSearcherGen;// update searcher gen
340 newGeneration.signalAll(); // wake up threads if we have a new generation
351 * Close this NRTManager to future searching. Any searches still in process in
352 * other threads won't be affected, and they should still call
353 * {@link SearcherManager#release(IndexSearcher)} after they are done.
356 * <b>NOTE</b>: caller must separately close the writer.
358 public void close() throws IOException {
362 IOUtils.close(withDeletes, withoutDeletes);
363 } finally { // make sure we signal even if close throws an exception
364 newGeneration.signalAll();
368 assert withDeletes.generation == MAX_SEARCHER_GEN && withoutDeletes.generation == MAX_SEARCHER_GEN;
373 * Returns a {@link SearcherManager}. If <code>applyAllDeletes</code> is
374 * <code>true</code> the returned manager is guaranteed to have all deletes
375 * applied on the last reopen. Otherwise the latest manager with or without deletes
378 public SearcherManager getSearcherManager(boolean applyAllDeletes) {
379 if (applyAllDeletes) {
380 return withDeletes.manager;
382 if (withDeletes.generation > withoutDeletes.generation) {
383 return withDeletes.manager;
385 return withoutDeletes.manager;
390 static final class SearcherManagerRef implements Closeable {
391 final boolean applyDeletes;
392 volatile long generation;
393 final SearcherManager manager;
395 SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) {
397 this.applyDeletes = applyDeletes;
398 this.generation = generation;
399 this.manager = manager;
402 public void close() throws IOException {
403 generation = MAX_SEARCHER_GEN; // max it out to make sure nobody can wait on another gen