add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / contrib / misc / src / java / org / apache / lucene / index / NRTManager.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.List;
27
28 import org.apache.lucene.analysis.Analyzer;
29 import org.apache.lucene.index.IndexReader;       // javadocs
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.search.IndexSearcher;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.util.ThreadInterruptedException;
34
35 // TODO
36 //   - we could make this work also w/ "normal" reopen/commit?
37
38 /**
39  * Utility class to manage sharing near-real-time searchers
40  * across multiple searching threads.
41  *
42  * <p>NOTE: to use this class, you must call reopen
43  * periodically.  The {@link NRTManagerReopenThread} is a
44  * simple class to do this on a periodic basis.  If you
45  * implement your own reopener, be sure to call {@link
46  * #addWaitingListener} so your reopener is notified when a
47  * caller is waiting for a specific generation searcher. </p>
48  *
49  * @lucene.experimental
50 */
51
52 public class NRTManager implements Closeable {
53   private final IndexWriter writer;
54   private final ExecutorService es;
55   private final AtomicLong indexingGen;
56   private final AtomicLong searchingGen;
57   private final AtomicLong noDeletesSearchingGen;
58   private final List<WaitingListener> waitingListeners = new CopyOnWriteArrayList<WaitingListener>();
59
60   private volatile IndexSearcher currentSearcher;
61   private volatile IndexSearcher noDeletesCurrentSearcher;
62
63   /**
64    * Create new NRTManager.  Note that this installs a
65    * merged segment warmer on the provided IndexWriter's
66    * config.
67    * 
68    *  @param writer IndexWriter to open near-real-time
69    *         readers
70   */
71   public NRTManager(IndexWriter writer) throws IOException {
72     this(writer, null);
73   }
74
75   /**
76    * Create new NRTManager.  Note that this installs a
77    * merged segment warmer on the provided IndexWriter's
78    * config.
79    * 
80    *  @param writer IndexWriter to open near-real-time
81    *         readers
82    *  @param es ExecutorService to pass to the IndexSearcher
83   */
84   public NRTManager(IndexWriter writer, ExecutorService es) throws IOException {
85
86     this.writer = writer;
87     this.es = es;
88     indexingGen = new AtomicLong(1);
89     searchingGen = new AtomicLong(-1);
90     noDeletesSearchingGen = new AtomicLong(-1);
91
92     // Create initial reader:
93     swapSearcher(new IndexSearcher(IndexReader.open(writer, true), es), 0, true);
94
95     writer.getConfig().setMergedSegmentWarmer(
96          new IndexWriter.IndexReaderWarmer() {
97            @Override
98            public void warm(IndexReader reader) throws IOException {
99              NRTManager.this.warm(reader);
100            }
101          });
102   }
103
104   /** NRTManager invokes this interface to notify it when a
105    *  caller is waiting for a specific generation searcher
106    *  to be visible. */
107   public static interface WaitingListener {
108     public void waiting(boolean requiresDeletes, long targetGen);
109   }
110
111   /** Adds a listener, to be notified when a caller is
112    *  waiting for a specific generation searcher to be
113    *  visible. */
114   public void addWaitingListener(WaitingListener l) {
115     waitingListeners.add(l);
116   }
117
118   /** Remove a listener added with {@link
119    *  #addWaitingListener}. */
120   public void removeWaitingListener(WaitingListener l) {
121     waitingListeners.remove(l);
122   }
123
124   public long updateDocument(Term t, Document d, Analyzer a) throws IOException {
125     writer.updateDocument(t, d, a);
126     // Return gen as of when indexing finished:
127     return indexingGen.get();
128   }
129
130   public long updateDocument(Term t, Document d) throws IOException {
131     writer.updateDocument(t, d);
132     // Return gen as of when indexing finished:
133     return indexingGen.get();
134   }
135
136   public long updateDocuments(Term t, Collection<Document> docs, Analyzer a) throws IOException {
137     writer.updateDocuments(t, docs, a);
138     // Return gen as of when indexing finished:
139     return indexingGen.get();
140   }
141
142   public long updateDocuments(Term t, Collection<Document> docs) throws IOException {
143     writer.updateDocuments(t, docs);
144     // Return gen as of when indexing finished:
145     return indexingGen.get();
146   }
147
148   public long deleteDocuments(Term t) throws IOException {
149     writer.deleteDocuments(t);
150     // Return gen as of when indexing finished:
151     return indexingGen.get();
152   }
153
154   public long deleteDocuments(Query q) throws IOException {
155     writer.deleteDocuments(q);
156     // Return gen as of when indexing finished:
157     return indexingGen.get();
158   }
159
160   public long addDocument(Document d, Analyzer a) throws IOException {
161     writer.addDocument(d, a);
162     // Return gen as of when indexing finished:
163     return indexingGen.get();
164   }
165
166   public long addDocuments(Collection<Document> docs, Analyzer a) throws IOException {
167     writer.addDocuments(docs, a);
168     // Return gen as of when indexing finished:
169     return indexingGen.get();
170   }
171
172   public long addDocument(Document d) throws IOException {
173     writer.addDocument(d);
174     // Return gen as of when indexing finished:
175     return indexingGen.get();
176   }
177
178   public long addDocuments(Collection<Document> docs) throws IOException {
179     writer.addDocuments(docs);
180     // Return gen as of when indexing finished:
181     return indexingGen.get();
182   }
183
184   /** Returns the most current searcher.  If you require a
185    *  certain indexing generation be visible in the returned
186    *  searcher, call {@link #get(long)}
187    *  instead.
188    */
189   public synchronized IndexSearcher get() {
190     return get(true);
191   }
192
193   /** Just like {@link #get}, but by passing <code>false</code> for
194    *  requireDeletes, you can get faster reopen time, but
195    *  the returned reader is allowed to not reflect all
196    *  deletions.  See {@link IndexReader#open(IndexWriter,boolean)}  */
197   public synchronized IndexSearcher get(boolean requireDeletes) {
198     final IndexSearcher s;
199     if (requireDeletes) {
200       s = currentSearcher;
201     } else if (noDeletesSearchingGen.get() > searchingGen.get()) {
202       s = noDeletesCurrentSearcher;
203     } else {
204       s = currentSearcher;
205     }
206     s.getIndexReader().incRef();
207     return s;
208   }
209
210   /** Call this if you require a searcher reflecting all
211    *  changes as of the target generation.
212    *
213    * @param targetGen Returned searcher must reflect changes
214    * as of this generation
215    */
216   public synchronized IndexSearcher get(long targetGen) {
217     return get(targetGen, true);
218   }
219
220   /** Call this if you require a searcher reflecting all
221    *  changes as of the target generation, and you don't
222    *  require deletions to be reflected.  Note that the
223    *  returned searcher may still reflect some or all
224    *  deletions.
225    *
226    * @param targetGen Returned searcher must reflect changes
227    * as of this generation
228    *
229    * @param requireDeletes If true, the returned searcher must
230    * reflect all deletions.  This can be substantially more
231    * costly than not applying deletes.  Note that if you
232    * pass false, it's still possible that some or all
233    * deletes may have been applied.
234    **/
235   public synchronized IndexSearcher get(long targetGen, boolean requireDeletes) {
236
237     assert noDeletesSearchingGen.get() >= searchingGen.get();
238
239     if (targetGen > getCurrentSearchingGen(requireDeletes)) {
240       // Must wait
241       //final long t0 = System.nanoTime();
242       for(WaitingListener listener : waitingListeners) {
243         listener.waiting(requireDeletes, targetGen);
244       }
245       while (targetGen > getCurrentSearchingGen(requireDeletes)) {
246         //System.out.println(Thread.currentThread().getName() + ": wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes);
247         try {
248           wait();
249         } catch (InterruptedException ie) {
250           throw new ThreadInterruptedException(ie);
251         }
252       }
253       //final long waitNS = System.nanoTime()-t0;
254       //System.out.println(Thread.currentThread().getName() + ": done wait fresh searcher targetGen=" + targetGen + " vs searchingGen=" + getCurrentSearchingGen(requireDeletes) + " requireDeletes=" + requireDeletes + " WAIT msec=" + (waitNS/1000000.0));
255     }
256
257     return get(requireDeletes);
258   }
259
260   /** Returns generation of current searcher. */
261   public long getCurrentSearchingGen(boolean requiresDeletes) {
262     return requiresDeletes ? searchingGen.get() : noDeletesSearchingGen.get();
263   }
264
265   /** Release the searcher obtained from {@link
266    *  #get()} or {@link #get(long)}. */
267   public void release(IndexSearcher s) throws IOException {
268     s.getIndexReader().decRef();
269   }
270
271   /** Call this when you need the NRT reader to reopen.
272    *
273    * @param applyDeletes If true, the newly opened reader
274    *        will reflect all deletes
275    */
276   public boolean reopen(boolean applyDeletes) throws IOException {
277
278     // Mark gen as of when reopen started:
279     final long newSearcherGen = indexingGen.getAndIncrement();
280
281     if (applyDeletes && currentSearcher.getIndexReader().isCurrent()) {
282       //System.out.println("reopen: skip: isCurrent both force gen=" + newSearcherGen + " vs current gen=" + searchingGen);
283       searchingGen.set(newSearcherGen);
284       noDeletesSearchingGen.set(newSearcherGen);
285       synchronized(this) {
286         notifyAll();
287       }
288       //System.out.println("reopen: skip: return");
289       return false;
290     } else if (!applyDeletes && noDeletesCurrentSearcher.getIndexReader().isCurrent()) {
291       //System.out.println("reopen: skip: isCurrent force gen=" + newSearcherGen + " vs current gen=" + noDeletesSearchingGen);
292       noDeletesSearchingGen.set(newSearcherGen);
293       synchronized(this) {
294         notifyAll();
295       }
296       //System.out.println("reopen: skip: return");
297       return false;
298     }
299
300     //System.out.println("indexingGen now " + indexingGen);
301
302     // .reopen() returns a new reference:
303
304     // Start from whichever searcher is most current:
305     final IndexSearcher startSearcher = noDeletesSearchingGen.get() > searchingGen.get() ? noDeletesCurrentSearcher : currentSearcher;
306     final IndexReader nextReader = startSearcher.getIndexReader().reopen(writer, applyDeletes);
307
308     warm(nextReader);
309
310     // Transfer reference to swapSearcher:
311     swapSearcher(new IndexSearcher(nextReader, es),
312                  newSearcherGen,
313                  applyDeletes);
314
315     return true;
316   }
317
318   /** Override this to warm the newly opened reader before
319    *  it's swapped in.  Note that this is called both for
320    *  newly merged segments and for new top-level readers
321    *  opened by #reopen. */
322   protected void warm(IndexReader reader) throws IOException {
323   }
324
325   // Steals a reference from newSearcher:
326   private synchronized void swapSearcher(IndexSearcher newSearcher, long newSearchingGen, boolean applyDeletes) throws IOException {
327     //System.out.println(Thread.currentThread().getName() + ": swap searcher gen=" + newSearchingGen + " applyDeletes=" + applyDeletes);
328     
329     // Always replace noDeletesCurrentSearcher:
330     if (noDeletesCurrentSearcher != null) {
331       noDeletesCurrentSearcher.getIndexReader().decRef();
332     }
333     noDeletesCurrentSearcher = newSearcher;
334     assert newSearchingGen > noDeletesSearchingGen.get(): "newSearchingGen=" + newSearchingGen + " noDeletesSearchingGen=" + noDeletesSearchingGen;
335     noDeletesSearchingGen.set(newSearchingGen);
336
337     if (applyDeletes) {
338       // Deletes were applied, so we also update currentSearcher:
339       if (currentSearcher != null) {
340         currentSearcher.getIndexReader().decRef();
341       }
342       currentSearcher = newSearcher;
343       if (newSearcher != null) {
344         newSearcher.getIndexReader().incRef();
345       }
346       assert newSearchingGen > searchingGen.get(): "newSearchingGen=" + newSearchingGen + " searchingGen=" + searchingGen;
347       searchingGen.set(newSearchingGen);
348     }
349
350     notifyAll();
351     //System.out.println(Thread.currentThread().getName() + ": done");
352   }
353
354   /** NOTE: caller must separately close the writer. */
355   // @Override -- not until Java 1.6
356   public void close() throws IOException {
357     swapSearcher(null, indexingGen.getAndIncrement(), true);
358   }
359 }