1 package org.apache.lucene.search;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.apache.lucene.analysis.Analyzer;
27 import org.apache.lucene.analysis.MockAnalyzer;
28 import org.apache.lucene.document.Document;
29 import org.apache.lucene.document.Field.Index;
30 import org.apache.lucene.document.Field.Store;
31 import org.apache.lucene.index.CorruptIndexException;
32 import org.apache.lucene.index.IndexWriter;
33 import org.apache.lucene.index.IndexWriterConfig;
34 import org.apache.lucene.index.Term;
35 import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
36 import org.apache.lucene.store.Directory;
37 import org.apache.lucene.store.LockObtainFailedException;
38 import org.apache.lucene.store.NRTCachingDirectory;
39 import org.apache.lucene.util.IOUtils;
40 import org.apache.lucene.util.ThreadInterruptedException;
42 public class TestNRTManager extends ThreadedIndexingAndSearchingTestCase {
44 private final ThreadLocal<Long> lastGens = new ThreadLocal<Long>();
45 private boolean warmCalled;
47 public void testNRTManager() throws Exception {
48 runTest("TestNRTManager");
52 protected IndexSearcher getFinalSearcher() throws Exception {
54 System.out.println("TEST: finalSearcher maxGen=" + maxGen);
56 final SearcherManager manager = nrt.waitForGeneration(maxGen, true);
57 return manager.acquire();
61 protected Directory getDirectory(Directory in) {
62 // Randomly swap in NRTCachingDir
63 if (random.nextBoolean()) {
65 System.out.println("TEST: wrap NRTCachingDir");
68 return new NRTCachingDirectory(in, 5.0, 60.0);
75 protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
76 final long gen = nrt.updateDocuments(id, docs);
78 // Randomly verify the update "took":
79 if (random.nextInt(20) == 2) {
81 System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
83 SearcherManager manager = nrt.waitForGeneration(gen, true);
84 final IndexSearcher s = manager.acquire();
86 System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
89 assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
99 protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
100 final long gen = nrt.addDocuments(docs);
101 // Randomly verify the add "took":
102 if (random.nextInt(20) == 2) {
104 System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
106 final SearcherManager manager = nrt.waitForGeneration(gen, false);
107 final IndexSearcher s = manager.acquire();
109 System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
112 assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
121 protected void addDocument(Term id, Document doc) throws Exception {
122 final long gen = nrt.addDocument(doc);
124 // Randomly verify the add "took":
125 if (random.nextInt(20) == 2) {
127 System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
129 final SearcherManager manager = nrt.waitForGeneration(gen, false);
130 final IndexSearcher s = manager.acquire();
132 System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
135 assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
144 protected void updateDocument(Term id, Document doc) throws Exception {
145 final long gen = nrt.updateDocument(id, doc);
146 // Randomly verify the udpate "took":
147 if (random.nextInt(20) == 2) {
149 System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
151 final SearcherManager manager = nrt.waitForGeneration(gen, true);
152 final IndexSearcher s = manager.acquire();
154 System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
157 assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
166 protected void deleteDocuments(Term id) throws Exception {
167 final long gen = nrt.deleteDocuments(id);
168 // randomly verify the delete "took":
169 if (random.nextInt(20) == 7) {
171 System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
173 final SearcherManager manager = nrt.waitForGeneration(gen, true);
174 final IndexSearcher s = manager.acquire();
176 System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
179 assertEquals(0, s.search(new TermQuery(id), 10).totalHits);
187 private NRTManager nrt;
188 private NRTManagerReopenThread nrtThread;
190 protected void doAfterWriter(ExecutorService es) throws Exception {
191 final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
192 final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());
195 System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
198 nrt = new NRTManager(writer, es,
199 new SearcherWarmer() {
200 // Not with Java 5: @Override
201 public void warm(IndexSearcher s) throws IOException {
202 TestNRTManager.this.warmCalled = true;
203 s.search(new TermQuery(new Term("body", "united")), 10);
207 nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
208 nrtThread.setName("NRT Reopen Thread");
209 nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
210 nrtThread.setDaemon(true);
215 protected void doAfterIndexingThreadDone() {
216 Long gen = lastGens.get();
222 private long maxGen = -1;
224 private synchronized void addMaxGen(long gen) {
225 maxGen = Math.max(gen, maxGen);
229 protected void doSearching(ExecutorService es, long stopTime) throws Exception {
230 runSearchThreads(stopTime);
234 protected IndexSearcher getCurrentSearcher() throws Exception {
235 // Test doesn't assert deletions until the end, so we
236 // can randomize whether dels must be applied
237 return nrt.getSearcherManager(random.nextBoolean()).acquire();
241 protected void releaseSearcher(IndexSearcher s) throws Exception {
242 // Test doesn't assert deletions until the end, so we
243 // can randomize whether dels must be applied
244 nrt.getSearcherManager(random.nextBoolean()).release(s);
248 protected void doClose() throws Exception {
249 assertTrue(warmCalled);
251 System.out.println("TEST: now close NRTManager");
258 * LUCENE-3528 - NRTManager hangs in certain situations
260 public void testThreadStarvationNoDeleteNRTReader() throws IOException, InterruptedException {
261 IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
262 Directory d = newDirectory();
263 final CountDownLatch latch = new CountDownLatch(1);
264 final CountDownLatch signal = new CountDownLatch(1);
266 LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
267 final NRTManager manager = new NRTManager(writer, null, null, false);
268 Document doc = new Document();
269 doc.add(newField("test","test", Store.YES, Index.ANALYZED));
270 long gen = manager.addDocument(doc);
271 assertTrue(manager.maybeReopen(false));
272 assertFalse(gen < manager.getCurrentSearchingGen(false));
273 Thread t = new Thread() {
277 assertTrue(manager.maybeReopen(false));
278 manager.deleteDocuments(new TermQuery(new Term("foo", "barista")));
279 manager.maybeReopen(false); // kick off another reopen so we inc. the internal gen
280 } catch (Exception e) {
283 latch.countDown(); // let the add below finish
288 writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
289 final long lastGen = manager.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
290 assertFalse(manager.getSearcherManager(false).isSearcherCurrent()); // false since there is a delete in the queue
292 IndexSearcher acquire = manager.getSearcherManager(false).acquire();
294 assertEquals(2, acquire.getIndexReader().numDocs());
296 acquire.getIndexReader().decRef();
298 NRTManagerReopenThread thread = new NRTManagerReopenThread(manager, 0.01, 0.01);
299 thread.start(); // start reopening
301 System.out.println("waiting now for generation " + lastGen);
304 final AtomicBoolean finished = new AtomicBoolean(false);
305 Thread waiter = new Thread() {
307 manager.waitForGeneration(lastGen, false);
312 manager.maybeReopen(false);
314 if (!finished.get()) {
316 fail("thread deadlocked on waitForGeneration");
320 IOUtils.close(manager, writer, d);
323 public static class LatchedIndexWriter extends IndexWriter {
325 private CountDownLatch latch;
326 boolean waitAfterUpdate = false;
327 private CountDownLatch signal;
329 public LatchedIndexWriter(Directory d, IndexWriterConfig conf,
330 CountDownLatch latch, CountDownLatch signal)
331 throws CorruptIndexException, LockObtainFailedException, IOException {
334 this.signal = signal;
338 public void updateDocument(Term term,
339 Document doc, Analyzer analyzer)
340 throws CorruptIndexException, IOException {
341 super.updateDocument(term, doc, analyzer);
343 if (waitAfterUpdate) {
347 } catch (InterruptedException e) {
348 throw new ThreadInterruptedException(e);