1 package org.apache.lucene.index;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.List;
28 import org.apache.lucene.analysis.Analyzer;
29 import org.apache.lucene.index.IndexReader; // javadocs
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.search.IndexSearcher;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.util.ThreadInterruptedException;
36 // - we could make this work also w/ "normal" reopen/commit?
/**
 * Utility class to manage sharing near-real-time searchers
 * across multiple searching threads.
 *
 * <p>NOTE: to use this class, you must call reopen
 * periodically. The {@link NRTManagerReopenThread} is a
 * simple class to do this on a periodic basis. If you
 * implement your own reopener, be sure to call {@link
 * #addWaitingListener} so your reopener is notified when a
 * caller is waiting for a specific generation searcher. </p>
 *
 * @lucene.experimental
 */
52 public class NRTManager implements Closeable {
53 private final IndexWriter writer;
54 private final ExecutorService es;
55 private final AtomicLong indexingGen;
56 private final AtomicLong searchingGen;
57 private final AtomicLong noDeletesSearchingGen;
58 private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
60 private volatile IndexSearcher currentSearcher;
61 private volatile IndexSearcher noDeletesCurrentSearcher;
// NOTE(review): incomplete extraction. The Javadoc opener and terminator, some
// Javadoc continuation lines, and the whole constructor body (embedded line
// markers 72-75) are not visible in this view.
// Presumably this delegates to NRTManager(IndexWriter, ExecutorService) without
// an executor -- TODO confirm against the full file.
64 * Create new NRTManager. Note that this installs a
65 * merged segment warmer on the provided IndexWriter's
68 * @param writer IndexWriter to open near-real-time
71 public NRTManager(IndexWriter writer) throws IOException {
// NOTE(review): incomplete extraction. The Javadoc delimiters, the lines at
// embedded markers 85-87 (presumably the assignments of the final fields
// `writer` and `es` -- TODO confirm), the line at marker 97 and every closing
// brace/parenthesis of the anonymous warmer are not visible in this view.
76 * Create new NRTManager. Note that this installs a
77 * merged segment warmer on the provided IndexWriter's
80 * @param writer IndexWriter to open near-real-time
82 * @param es ExecutorService to pass to the IndexSearcher
84 public NRTManager(IndexWriter writer, ExecutorService es) throws IOException {
// Indexing starts at generation 1 while both searching generations start at -1,
// so the initial swap below (generation 0) satisfies swapSearcher's
// "strictly newer" assertions and stays older than any generation handed to indexers.
88 indexingGen = new AtomicLong(1);
89 searchingGen = new AtomicLong(-1);
90 noDeletesSearchingGen = new AtomicLong(-1);
92 // Create initial reader:
// The initial searcher is installed as generation 0 with applyDeletes == true.
// The `true` given to IndexReader.open presumably asks for deletes to be
// applied -- TODO confirm against the IndexReader documentation.
93 swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);
// Register a merged-segment warmer on the writer's config that forwards each
// reader to this manager's overridable warm(IndexReader) hook.
95 writer.getConfig().setMergedSegmentWarmer(
96 new IndexWriter.IndexReaderWarmer() {
98 public void warm(IndexReader reader) throws IOException {
99 NRTManager.this.warm(reader);
104 /** NRTManager invokes this interface to notify it when a
105 * caller is waiting for a specific generation searcher
107 public static interface WaitingListener {
108 public void waiting(boolean requiresDeletes, long targetGen);
111 /** Adds a listener, to be notified when a caller is
112 * waiting for a specific generation searcher to be
114 public void addWaitingListener(WaitingListener l) {
115 waitingListeners.add(l);
118 /** Remove a listener added with {@link
119 * #addWaitingListener}. */
120 public void removeWaitingListener(WaitingListener l) {
121 waitingListeners.remove(l);
124 public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
125 writer.updateDocument(t, d, a);
126 // Return gen as of when indexing finished:
127 return indexingGen.get();
130 public long updateDocument(Term t, Document d) throws IOException {
131 writer.updateDocument(t, d);
132 // Return gen as of when indexing finished:
133 return indexingGen.get();
136 public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException {
137 writer.updateDocuments(t, docs, a);
138 // Return gen as of when indexing finished:
139 return indexingGen.get();
142 public long updateDocuments(Term t, Collection<Document> docs) throws IOException {
143 writer.updateDocuments(t, docs);
144 // Return gen as of when indexing finished:
145 return indexingGen.get();
148 public long deleteDocuments(Term t) throws IOException {
149 writer.deleteDocuments(t);
150 // Return gen as of when indexing finished:
151 return indexingGen.get();
154 public long deleteDocuments(Query q) throws IOException {
155 writer.deleteDocuments(q);
156 // Return gen as of when indexing finished:
157 return indexingGen.get();
160 public long addDocument(Document d, Analyzer a) throws IOException {
161 writer.addDocument(d, a);
162 // Return gen as of when indexing finished:
163 return indexingGen.get();
166 public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException {
167 writer.addDocuments(docs, a);
168 // Return gen as of when indexing finished:
169 return indexingGen.get();
172 public long addDocument(Document d) throws IOException {
173 writer.addDocument(d);
174 // Return gen as of when indexing finished:
175 return indexingGen.get();
178 public long addDocuments(Collection<Document> docs) throws IOException {
179 writer.addDocuments(docs);
180 // Return gen as of when indexing finished:
181 return indexingGen.get();
// NOTE(review): incomplete extraction. The end of the Javadoc and the whole
// method body (embedded line markers 190-192) are not visible in this view.
// Presumably the body forwards to get(boolean) with deletes required -- TODO
// confirm against the full file.
184 /** Returns the most current searcher. If you require a
185 * certain indexing generation be visible in the returned
186 * searcher, call {@link #get(long)}
189 public synchronized IndexSearcher get() {
// NOTE(review): incomplete extraction. The assignment in the first branch
// (embedded marker 200), the final else branch (markers 203-205) and the
// return statement (markers 207-208) are not visible in this view.
193 /** Just like {@link #get}, but by passing <code>false</code> for
194 * requireDeletes, you can get faster reopen time, but
195 * the returned reader is allowed to not reflect all
196 * deletions. See {@link IndexReader#open(IndexWriter,boolean)} */
197 public synchronized IndexSearcher get(boolean requireDeletes) {
198 final IndexSearcher s;
// Branch taken when the caller insists on deletes being reflected; the
// assignment itself is not visible here (presumably currentSearcher -- TODO confirm).
199 if (requireDeletes) {
// Deletes are optional: use the no-deletes searcher only when its generation is
// strictly newer than that of the deletes-applied searcher.
201 } else if (noDeletesSearchingGen.get() > searchingGen.get()) {
202 s = noDeletesCurrentSearcher;
// Take a reference on the reader for the caller; release(IndexSearcher) is the
// method in this class that decrements it again.
206 s.getIndexReader().incRef();
210 /** Call this if you require a searcher reflecting all
211 * changes as of the target generation.
213 * @param targetGen Returned searcher must reflect changes
214 * as of this generation
216 public synchronized IndexSearcher get(long targetGen) {
217 return get(targetGen, true);
// NOTE(review): incomplete extraction. Parts of the Javadoc (including its
// terminator), the try header and the statement that actually blocks (embedded
// line markers 247-248), and every closing brace are not visible in this view.
220 /** Call this if you require a searcher reflecting all
221 * changes as of the target generation, and you don't
222 * require deletions to be reflected. Note that the
223 * returned searcher may still reflect some or all
226 * @param targetGen Returned searcher must reflect changes
227 * as of this generation
229 * @param requireDeletes If true, the returned searcher must
230 * reflect all deletions. This can be substantially more
231 * costly than not applying deletes. Note that if you
232 * pass false, it's still possible that some or all
233 * deletes may have been applied.
235 public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
// Checked only when assertions are enabled: the no-deletes generation must
// never be behind the deletes-applied generation.
237 assert noDeletesSearchingGen.get() >= searchingGen.get();
// Slow path: the requested generation is not searchable yet.
239 if (targetGen > getCurrentSearchingGen(requireDeletes)) {
241 //final long t0 = System.nanoTime();
// Tell every registered listener that a caller is about to wait, so that a
// reopener can be notified of the generation being waited for.
242 for(WaitingListener listener : waitingListeners) {
243 listener.waiting(requireDeletes, targetGen);
// Re-check the generation on every pass until the requested one is reached.
245 while (targetGen > getCurrentSearchingGen(requireDeletes)) {
246 //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes);
// An interrupt raised by the (not visible) blocking statement is rethrown as
// ThreadInterruptedException wrapping the original exception.
249 } catch (InterruptedException ie) {
250 throw new ThreadInterruptedException(ie);
253 //final long waitNS = System.nanoTime()-t0;
254 //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0));
// Hand out the searcher through get(boolean), which takes the reader reference.
257 return get(requireDeletes);
260 /** Returns generation of current searcher. */
261 public long getCurrentSearchingGen(boolean requiresDeletes) {
262 return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get();
265 /** Release the searcher obtained from {@link
266 * #get()} or {@link #get(long)}. */
267 public void release(IndexSearcher s) throws IOException {
268 s.getIndexReader().decRef();
// NOTE(review): incomplete extraction. The Javadoc terminator, the lines at
// embedded markers 284-289 and 293-299 inside the two skip branches, the lines
// between markers 306 and 310, the remaining arguments of the swapSearcher call
// and the final return are not visible in this view.
271 /** Call this when you need the NRT reader to reopen.
273 * @param applyDeletes If true, the newly opened reader
274 * will reflect all deletes
276 public boolean reopen(boolean applyDeletes) throws IOException {
278 // Mark gen as of when reopen started:
// getAndIncrement: this reopen is tagged with the pre-increment value, and
// indexing calls that read indexingGen afterwards see the next generation.
279 final long newSearcherGen = indexingGen.getAndIncrement();
// Skip case 1: deletes are wanted and the deletes-applied reader is already
// current, so both searching generations are simply advanced. The disabled
// trace below suggests the method returns here without opening a reader, but
// that return statement is not visible -- TODO confirm.
281 if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
282 //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
283 searchingGen.set(newSearcherGen);
284 noDeletesSearchingGen.set(newSearcherGen);
288 //System.out.println("reopen: skip: return");
// Skip case 2: deletes are not wanted and the no-deletes reader is already
// current, so only the no-deletes generation is advanced.
290 } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
291 //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
292 noDeletesSearchingGen.set(newSearcherGen);
296 //System.out.println("reopen: skip: return");
300 //System.out.println("indexingGen now " + indexingGen);
302 // .reopen() returns a new reference:
304 // Start from whichever searcher is most current:
305 final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
306 final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);
310 // Transfer reference to swapSearcher:
// The new searcher shares this manager's ExecutorService.
311 swapSearcher(new IndexSearcher(nextReader, es),
// NOTE(review): incomplete extraction. Whatever follows the opening brace
// (embedded line markers 323-324) is not visible in this view; presumably the
// default body is empty -- TODO confirm against the full file.
// The merged-segment warmer installed by the two-argument constructor forwards
// to this hook.
318 /** Override this to warm the newly opened reader before
319 * it's swapped in. Note that this is called both for
320 * newly merged segments and for new top-level readers
321 * opened by #reopen. */
322 protected void warm(IndexReader reader) throws IOException {
// NOTE(review): incomplete extraction. The line at embedded marker 337
// (presumably the guard that makes the second half conditional on applyDeletes
// -- TODO confirm), the lines at markers 348-350 and every closing brace are
// not visible in this view.
325 // Steals a reference from newSearcher:
326 private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
327 //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
329 // Always replace noDeletesCurrentSearcher:
// Drop this manager's reference on the previous no-deletes searcher, if any.
330 if (noDeletesCurrentSearcher != null) {
331 noDeletesCurrentSearcher.getIndexReader().decRef();
333 noDeletesCurrentSearcher = newSearcher;
// Generations must move strictly forward (checked only with assertions enabled).
334 assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
335 noDeletesSearchingGen.set(newSearchingGen);
338 // Deletes were applied, so we also update currentSearcher:
// Drop this manager's reference on the previous deletes-applied searcher, if any.
339 if (currentSearcher != null) {
340 currentSearcher.getIndexReader().decRef();
342 currentSearcher = newSearcher;
// Both fields now point at the same searcher, so it needs one more reference.
// newSearcher can be null: close() passes null.
343 if (newSearcher != null) {
344 newSearcher.getIndexReader().incRef();
346 assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
347 searchingGen.set(newSearchingGen);
351 //System.out.println(Thread.currentThread().getName() + ": done");
// NOTE(review): this method runs up to the last visible line; its closing brace
// and anything after embedded marker 357 are not visible in this view.
354 /** NOTE: caller must separately close the writer. */
355 // @Override -- not until Java 1.6
356 public void close() throws IOException {
// Swap in a null searcher with applyDeletes == true and a fresh generation
// taken from indexingGen. swapSearcher drops this manager's reference on any
// searcher it replaces; the writer itself is not touched here.
357 swapSearcher(null, indexingGen.getAndIncrement(), true);