pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / test / org / apache / lucene / search / TestNRTManager.java
1 package org.apache.lucene.search;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.apache.lucene.analysis.Analyzer;
27 import org.apache.lucene.analysis.MockAnalyzer;
28 import org.apache.lucene.document.Document;
29 import org.apache.lucene.document.Field.Index;
30 import org.apache.lucene.document.Field.Store;
31 import org.apache.lucene.index.CorruptIndexException;
32 import org.apache.lucene.index.IndexWriter;
33 import org.apache.lucene.index.IndexWriterConfig;
34 import org.apache.lucene.index.Term;
35 import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
36 import org.apache.lucene.store.Directory;
37 import org.apache.lucene.store.LockObtainFailedException;
38 import org.apache.lucene.store.NRTCachingDirectory;
39 import org.apache.lucene.util.IOUtils;
40 import org.apache.lucene.util.ThreadInterruptedException;
41
42 public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
43
44   private final ThreadLocal<Long> lastGens = new ThreadLocal<Long>();
45   private boolean warmCalled;
46
47   public void testNRTManager() throws Exception {
48     runTest("TestNRTManager");
49   }
50
51   @Override
52   protected IndexSearcher getFinalSearcher() throws Exception  {
53     if (VERBOSE) {
54       System.out.println("TEST: finalSearcher maxGen=" + maxGen);
55     }
56     final SearcherManager manager = nrt.waitForGeneration(maxGen, true);
57     return manager.acquire();
58   }
59
60   @Override
61   protected Directory getDirectory(Directory in) {
62     // Randomly swap in NRTCachingDir
63     if (random.nextBoolean()) {
64       if (VERBOSE) {
65         System.out.println("TEST: wrap NRTCachingDir");
66       }
67
68       return new NRTCachingDirectory(in, 5.0, 60.0);
69     } else {
70       return in;
71     }
72   }
73
74   @Override
75   protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
76     final long gen = nrt.updateDocuments(id, docs);
77
78     // Randomly verify the update "took":
79     if (random.nextInt(20) == 2) {
80       if (VERBOSE) {
81         System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
82       }
83       SearcherManager manager = nrt.waitForGeneration(gen, true);
84       final IndexSearcher s = manager.acquire();
85       if (VERBOSE) {
86         System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
87       }
88       try {
89         assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
90       } finally {
91         manager.release(s);
92       }
93     }
94     
95     lastGens.set(gen);
96   }
97
98   @Override
99   protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
100     final long gen = nrt.addDocuments(docs);
101     // Randomly verify the add "took":
102     if (random.nextInt(20) == 2) {
103       if (VERBOSE) {
104         System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
105       }
106       final SearcherManager manager = nrt.waitForGeneration(gen, false);
107       final IndexSearcher s = manager.acquire();
108       if (VERBOSE) {
109         System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
110       }
111       try {
112         assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
113       } finally {
114         manager.release(s);
115       }
116     }
117     lastGens.set(gen);
118   }
119
120   @Override
121   protected void addDocument(Term id, Document doc) throws Exception {
122     final long gen = nrt.addDocument(doc);
123
124     // Randomly verify the add "took":
125     if (random.nextInt(20) == 2) {
126       if (VERBOSE) {
127         System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
128       }
129       final SearcherManager manager = nrt.waitForGeneration(gen, false);
130       final IndexSearcher s = manager.acquire();
131       if (VERBOSE) {
132         System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
133       }
134       try {
135         assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
136       } finally {
137         manager.release(s);
138       }
139     }
140     lastGens.set(gen);
141   }
142
143   @Override
144   protected void updateDocument(Term id, Document doc) throws Exception {
145     final long gen = nrt.updateDocument(id, doc);
146     // Randomly verify the udpate "took":
147     if (random.nextInt(20) == 2) {
148       if (VERBOSE) {
149         System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
150       }
151       final SearcherManager manager = nrt.waitForGeneration(gen, true);
152       final IndexSearcher s = manager.acquire();
153       if (VERBOSE) {
154         System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
155       }
156       try {
157         assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
158       } finally {
159         manager.release(s);
160       }
161     }
162     lastGens.set(gen);
163   }
164
165   @Override
166   protected void deleteDocuments(Term id) throws Exception {
167     final long gen = nrt.deleteDocuments(id);
168     // randomly verify the delete "took":
169     if (random.nextInt(20) == 7) {
170       if (VERBOSE) {
171         System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
172       }
173       final SearcherManager manager = nrt.waitForGeneration(gen, true);
174       final IndexSearcher s = manager.acquire();
175       if (VERBOSE) {
176         System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
177       }
178       try {
179         assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
180       } finally {
181         manager.release(s);
182       }
183     }
184     lastGens.set(gen);
185   }
186
187   private NRTManager nrt;
188   private NRTManagerReopenThread nrtThread;
189   @Override
190   protected void doAfterWriter(ExecutorService es) throws Exception {
191     final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
192     final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());
193
194     if (VERBOSE) {
195       System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
196     }
197
198     nrt = new NRTManager(writer, es,
199                          new SearcherWarmer() {
200                            // Not with Java 5: @Override
201                            public void warm(IndexSearcher s) throws IOException {
202                              TestNRTManager.this.warmCalled = true;
203                              s.search(new TermQuery(new Term("body", "united")), 10);
204                            }
205                          }, false);
206                          
207     nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
208     nrtThread.setName("NRT Reopen Thread");
209     nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
210     nrtThread.setDaemon(true);
211     nrtThread.start();
212   }
213
214   @Override
215   protected void doAfterIndexingThreadDone() {
216     Long gen = lastGens.get();
217     if (gen != null) {
218       addMaxGen(gen);
219     }
220   }
221
222   private long maxGen = -1;
223
224   private synchronized void addMaxGen(long gen) {
225     maxGen = Math.max(gen, maxGen);
226   }
227
228   @Override
229   protected void doSearching(ExecutorService es, long stopTime) throws Exception {
230     runSearchThreads(stopTime);
231   }
232
233   @Override
234   protected IndexSearcher getCurrentSearcher() throws Exception {
235     // Test doesn't assert deletions until the end, so we
236     // can randomize whether dels must be applied
237     return nrt.getSearcherManager(random.nextBoolean()).acquire();
238   }
239
240   @Override
241   protected void releaseSearcher(IndexSearcher s) throws Exception {
242     // Test doesn't assert deletions until the end, so we
243     // can randomize whether dels must be applied
244     nrt.getSearcherManager(random.nextBoolean()).release(s);
245   }
246
247   @Override
248   protected void doClose() throws Exception {
249     assertTrue(warmCalled);
250     if (VERBOSE) {
251       System.out.println("TEST: now close NRTManager");
252     }
253     nrtThread.close();
254     nrt.close();
255   }
256   
257   /*
258    * LUCENE-3528 - NRTManager hangs in certain situations 
259    */
260   public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
261     IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
262     Directory d = newDirectory();
263     final CountDownLatch latch = new CountDownLatch(1);
264     final CountDownLatch signal = new CountDownLatch(1);
265
266     LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
267     final NRTManager manager = new NRTManager(writer, null, null, false);
268     Document doc = new Document();
269     doc.add(newField("test","test", Store.YES, Index.ANALYZED));
270     long gen = manager.addDocument(doc);
271     assertTrue(manager.maybeReopen(false));
272     assertFalse(gen < manager.getCurrentSearchingGen(false));
273     Thread t = new Thread() {
274       public void run() {
275         try {
276           signal.await();
277           assertTrue(manager.maybeReopen(false));
278           manager.deleteDocuments(new TermQuery(new Term("foo", "barista")));
279           manager.maybeReopen(false); // kick off another reopen so we inc. the internal gen
280         } catch (Exception e) {
281           e.printStackTrace();
282         } finally {
283           latch.countDown(); // let the add below finish
284         }
285       }
286     };
287     t.start();
288     writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
289     final long lastGen = manager.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
290     assertFalse(manager.getSearcherManager(false).isSearcherCurrent()); // false since there is a delete in the queue
291     
292     IndexSearcher acquire = manager.getSearcherManager(false).acquire();
293     try {
294       assertEquals(2, acquire.getIndexReader().numDocs());
295     } finally {
296       acquire.getIndexReader().decRef();
297     }
298     NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
299     thread.start(); // start reopening
300     if (VERBOSE) {
301       System.out.println("waiting now for generation " + lastGen);
302     }
303     
304     final AtomicBoolean finished = new AtomicBoolean(false);
305     Thread waiter = new Thread() {
306       public void run() {
307         manager.waitForGeneration(lastGen, false);
308         finished.set(true);
309       }
310     };
311     waiter.start();
312     manager.maybeReopen(false);
313     waiter.join(1000);
314     if (!finished.get()) {
315       waiter.interrupt();
316       fail("thread deadlocked on waitForGeneration");
317     }
318     thread.close();
319     thread.join();
320     IOUtils.close(manager, writer, d);
321   }
322   
323   public static class LatchedIndexWriter extends IndexWriter {
324
325     private CountDownLatch latch;
326     boolean waitAfterUpdate = false;
327     private CountDownLatch signal;
328
329     public LatchedIndexWriter(Directory d, IndexWriterConfig conf,
330         CountDownLatch latch, CountDownLatch signal)
331         throws CorruptIndexException, LockObtainFailedException, IOException {
332       super(d, conf);
333       this.latch = latch;
334       this.signal = signal;
335
336     }
337
338     public void updateDocument(Term term,
339         Document doc, Analyzer analyzer)
340         throws CorruptIndexException, IOException {
341       super.updateDocument(term, doc, analyzer);
342       try {
343         if (waitAfterUpdate) {
344           signal.countDown();
345           latch.await();
346         }
347       } catch (InterruptedException e) {
348         throw new ThreadInterruptedException(e);
349       }
350     }
351   }
352 }