package org.apache.lucene.index;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util._TestUtil;
import org.junit.Test;
//   - mix in optimize, addIndexes
//   - randomly mix in non-congruent docs

// NOTE: This is a copy of TestNRTThreads, but swapping in
// NRTManager for adding/updating/searching

public class TestNRTManager extends LuceneTestCase {
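
  // Tracks a group of sub-documents ("pack") that were added or updated
  // together under one shared packID, so the test can later verify that the
  // whole pack was deleted or stayed adjacent in docID order.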
  private static class SubDocs {
    public final String packID;
    public final List<String> subIDs;
    public boolean deleted;

    public SubDocs(String packID, List<String> subIDs) {
      this.packID = packID;
      this.subIDs = subIDs;
    }
  }
  // TODO: is there a pre-existing way to do this!!!
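  // Best-effort copy of a Document: preserves each field's value plus its
  // stored/indexed/tokenized flags, omitNorms and index options.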
  private Document cloneDoc(Document doc1) {
    final Document doc2 = new Document();
    for(Fieldable f : doc1.getFields()) {
      Field field1 = (Field) f;

      Field field2 = new Field(field1.name(),
                               field1.stringValue(),
                               field1.isStored() ? Field.Store.YES : Field.Store.NO,
                               field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
      if (field1.getOmitNorms()) {
        field2.setOmitNorms(true);
      }
      field2.setIndexOptions(field1.getIndexOptions());
      doc2.add(field2);
    }
    return doc2;
  }
  public void testNRTManager() throws Exception {

    final long t0 = System.currentTimeMillis();

    final LineFileDocs docs = new LineFileDocs(random);
    final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
    final MockDirectoryWrapper _dir = newFSDirectory(tempDir);
    _dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves
    Directory dir = _dir;
    final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).setOpenMode(IndexWriterConfig.OpenMode.CREATE);
    if (LuceneTestCase.TEST_NIGHTLY) {
      // newIWConfig makes smallish max seg size, which
      // results in tons and tons of segments for this test
      MergePolicy mp = conf.getMergePolicy();
      if (mp instanceof TieredMergePolicy) {
        ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
      } else if (mp instanceof LogByteSizeMergePolicy) {
        ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
      } else if (mp instanceof LogMergePolicy) {
        ((LogMergePolicy) mp).setMaxMergeDocs(100000);
      }
    }
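
    // Warm each newly merged segment by loading a sample of its stored
    // documents and running one query, so the first NRT reader over it does
    // not pay that cost.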
    conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
        @Override
        public void warm(IndexReader reader) throws IOException {
          System.out.println("TEST: now warm merged reader=" + reader);
          final int maxDoc = reader.maxDoc();
          int sum = 0;
          final int inc = Math.max(1, maxDoc/50);
          for(int docID=0;docID<maxDoc;docID += inc) {
            if (!reader.isDeleted(docID)) {
              final Document doc = reader.document(docID);
              sum += doc.getFields().size();
            }
          }

          IndexSearcher searcher = newSearcher(reader);
          sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
          searcher.close();

          System.out.println("TEST: warm visited " + sum + " fields");
        }
      });
    if (random.nextBoolean()) {
      System.out.println("TEST: wrap NRTCachingDir");
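      // NRTCachingDirectory keeps small, newly flushed/merged files in RAM
      // (here merges up to 5 MB, with a 60 MB total cache) to reduce I/O for
      // short-lived NRT segments.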
      NRTCachingDirectory nrtDir = new NRTCachingDirectory(dir, 5.0, 60.0);
      conf.setMergeScheduler(nrtDir.getMergeScheduler());
      dir = nrtDir;
    }
    final IndexWriter writer = new IndexWriter(dir, conf);

    writer.setInfoStream(System.out);
    _TestUtil.reduceOpenFiles(writer);
    //System.out.println("TEST: conf=" + writer.getConfig());

    final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory("NRT search threads"));
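    // Randomly exercise both search paths: with a null executor the searchers
    // are single-threaded; otherwise NRTManager hands this pool to the
    // IndexSearchers it creates.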
    final double minReopenSec = 0.01 + 0.05 * random.nextDouble();
    final double maxReopenSec = minReopenSec * (1.0 + 10 * random.nextDouble());

    System.out.println("TEST: make NRTManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
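    // NRTManager wraps the writer: each add/update/delete below returns a
    // generation token, and nrt.get(gen, applyDeletes) later blocks until a
    // searcher reflecting at least that generation is available.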
    final NRTManager nrt = new NRTManager(writer, es);
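    // Background thread that reopens NRT searchers: roughly every
    // maxReopenSec in the steady state, and as quickly as minReopenSec when a
    // caller is waiting on a specific generation.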
    final NRTManagerReopenThread nrtThread = new NRTManagerReopenThread(nrt, maxReopenSec, minReopenSec);
    nrtThread.setName("NRT Reopen Thread");
    nrtThread.setPriority(Math.min(Thread.currentThread().getPriority()+2, Thread.MAX_PRIORITY));
    nrtThread.setDaemon(true);
    nrtThread.start();

    final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 1, 3);
    final int NUM_SEARCH_THREADS = _TestUtil.nextInt(random, 1, 3);
    //final int NUM_INDEX_THREADS = 1;
    //final int NUM_SEARCH_THREADS = 1;

    System.out.println("TEST: " + NUM_INDEX_THREADS + " index threads; " + NUM_SEARCH_THREADS + " search threads");
    final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;

    final AtomicBoolean failed = new AtomicBoolean();
    final AtomicInteger addCount = new AtomicInteger();
    final AtomicInteger delCount = new AtomicInteger();
    final AtomicInteger packCount = new AtomicInteger();
    final List<Long> lastGens = new ArrayList<Long>();

    final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
    final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());

    final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
    Thread[] threads = new Thread[NUM_INDEX_THREADS];
    for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
      threads[thread] = new Thread() {
          @Override
          public void run() {
            // TODO: would be better if this were cross-thread, so that we make sure one thread deleting another thread's added docs works:
            final List<String> toDeleteIDs = new ArrayList<String>();
            final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
            long gen = 0;

            while(System.currentTimeMillis() < stopTime && !failed.get()) {
              try {
                //System.out.println(Thread.currentThread().getName() + ": cycle");
                // Occasional longish pause if running nightly
                if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
                  System.out.println(Thread.currentThread().getName() + ": now long sleep");
                  Thread.sleep(_TestUtil.nextInt(random, 50, 500));
                }

                // Rate limit ingest rate:
                Thread.sleep(_TestUtil.nextInt(random, 1, 10));
                System.out.println(Thread.currentThread() + ": done sleep");

                Document doc = docs.nextDoc();
                final String addedField;
                if (random.nextBoolean()) {
                  addedField = "extra" + random.nextInt(10);
                  doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
                } else {
                  addedField = null;
                }

                if (random.nextBoolean()) {
                  if (random.nextBoolean()) {
                    // Add a pack of adjacent sub-docs
                    final String packID;
                    final SubDocs delSubDocs;
                    if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
                      delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
                      assert !delSubDocs.deleted;
                      toDeleteSubDocs.remove(delSubDocs);
                      // reuse prior packID
                      packID = delSubDocs.packID;
                    } else {
                      delSubDocs = null;
                      packID = packCount.getAndIncrement() + "";
                    }
                    final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
                    final List<String> docIDs = new ArrayList<String>();
                    final SubDocs subDocs = new SubDocs(packID, docIDs);
                    final List<Document> docsList = new ArrayList<Document>();

                    allSubDocs.add(subDocs);
                    doc.add(packIDField);
                    docsList.add(cloneDoc(doc));
                    docIDs.add(doc.get("docid"));

                    final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
                    while(docsList.size() < maxDocCount) {
                      doc = docs.nextDoc();
                      docsList.add(cloneDoc(doc));
                      docIDs.add(doc.get("docid"));
                    }
                    addCount.addAndGet(docsList.size());
                    if (delSubDocs != null) {
                      delSubDocs.deleted = true;
                      delIDs.addAll(delSubDocs.subIDs);
                      delCount.addAndGet(delSubDocs.subIDs.size());
                      System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
                      gen = nrt.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
                      /*
                      // non-atomic:
                      nrt.deleteDocuments(new Term("packID", delSubDocs.packID));
                      for(Document subDoc : docsList) {
                        nrt.addDocument(subDoc);
                      }
                      */
                    } else {
                      System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
                      gen = nrt.addDocuments(docsList);
                      /*
                      // non-atomic:
                      for(Document subDoc : docsList) {
                        nrt.addDocument(subDoc);
                      }
                      */
                    }
                    doc.removeField("packID");
                    if (random.nextInt(5) == 2) {
                      System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
                      toDeleteSubDocs.add(subDocs);
                    }
                    // randomly verify the add/update "took":
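                    // nrt.get(gen, applyDeletes) waits until a searcher that
                    // reflects at least this generation is available.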
                    if (random.nextInt(20) == 2) {
                      final boolean applyDeletes = delSubDocs != null;
                      final IndexSearcher s = nrt.get(gen, applyDeletes);
                      assertEquals(docsList.size(), s.search(new TermQuery(new Term("packID", packID)), 10).totalHits);
                    System.out.println(Thread.currentThread().getName() + ": add doc docid:" + doc.get("docid"));
                    gen = nrt.addDocument(doc);
                    addCount.getAndIncrement();

                    // randomly verify the add "took":
                    if (random.nextInt(20) == 2) {
                      //System.out.println(Thread.currentThread().getName() + ": verify");
                      final IndexSearcher s = nrt.get(gen, false);
                      //System.out.println(Thread.currentThread().getName() + ": got s=" + s);
                      assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);
                      //System.out.println(Thread.currentThread().getName() + ": done verify");
                    if (random.nextInt(5) == 3) {
                      System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
                      toDeleteIDs.add(doc.get("docid"));
                    }
                  // we use update but it never replaces a prior doc:
                  System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
                  gen = nrt.updateDocument(new Term("docid", doc.get("docid")), doc);
                  addCount.getAndIncrement();

                  // randomly verify the add "took":
                  if (random.nextInt(20) == 2) {
                    final IndexSearcher s = nrt.get(gen, true);
                    assertEquals(1, s.search(new TermQuery(new Term("docid", doc.get("docid"))), 10).totalHits);

                  if (random.nextInt(5) == 3) {
                    System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
                    toDeleteIDs.add(doc.get("docid"));
                  }
                if (random.nextInt(30) == 17) {
                  System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
                  for(String id : toDeleteIDs) {
                    System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
                    gen = nrt.deleteDocuments(new Term("docid", id));

                    // randomly verify the delete "took":
                    if (random.nextInt(20) == 7) {
                      final IndexSearcher s = nrt.get(gen, true);
                      assertEquals(0, s.search(new TermQuery(new Term("docid", id)), 10).totalHits);

                  final int count = delCount.addAndGet(toDeleteIDs.size());
                  System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
                  delIDs.addAll(toDeleteIDs);

                  for(SubDocs subDocs : toDeleteSubDocs) {
                    assertTrue(!subDocs.deleted);
                    gen = nrt.deleteDocuments(new Term("packID", subDocs.packID));
                    subDocs.deleted = true;
                    System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
                    delIDs.addAll(subDocs.subIDs);
                    delCount.addAndGet(subDocs.subIDs.size());

                    // randomly verify the delete "took":
                    if (random.nextInt(20) == 7) {
                      final IndexSearcher s = nrt.get(gen, true);
                      assertEquals(0, s.search(new TermQuery(new Term("packID", subDocs.packID)), 1).totalHits);

                  toDeleteSubDocs.clear();

                  if (addedField != null) {
                    doc.removeField(addedField);

              } catch (Throwable t) {
                System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
                throw new RuntimeException(t);

            System.out.println(Thread.currentThread().getName() + ": indexing done");
      threads[thread].setDaemon(true);
      threads[thread].start();
    }

    System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
    // let index build up a bit

    // silly starting guess:
    final AtomicInteger totTermCount = new AtomicInteger(100);

    // run search threads
    final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
    final AtomicInteger totHits = new AtomicInteger();

    System.out.println("TEST: start search threads");
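    // Each search thread repeatedly pulls the latest NRT searcher, smoke
    // tests it, then walks the terms of the "body" field, running a TermQuery
    // on a sample of them.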
    for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
      searchThreads[thread] = new Thread() {
          @Override
          public void run() {
            while(System.currentTimeMillis() < stopTime && !failed.get()) {
              final IndexSearcher s = nrt.get(random.nextBoolean());
              smokeTestSearcher(s);
              if (s.getIndexReader().numDocs() > 0) {
                TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
                int seenTermCount = 0;
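                // Query roughly every (totTermCount/10)th term, with a random
                // shift so threads don't all probe the same terms; totTermCount
                // is refreshed whenever a full term enumeration completes.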
                if (totTermCount.get() < 10) {
                  trigger = totTermCount.get()/10;
                  shift = random.nextInt(trigger);

                while(System.currentTimeMillis() < stopTime) {
                  Term term = termEnum.term();
                  if (seenTermCount == 0) {
                    totTermCount.set(seenTermCount);
                    if (totTermCount.get() < 10) {
                      trigger = totTermCount.get()/10;
                      //System.out.println("trigger " + trigger);
                      shift = random.nextInt(trigger);
                      termEnum = s.getIndexReader().terms(new Term("body", ""));

                  if ((seenTermCount + shift) % trigger == 0) {
                    //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
                    totHits.addAndGet(runQuery(s, new TermQuery(term)));

              System.out.println(Thread.currentThread().getName() + ": search done");
            } catch (Throwable t) {
              System.out.println(Thread.currentThread().getName() + ": FAILED: hit exc");
              t.printStackTrace(System.out);
              throw new RuntimeException(t);

      searchThreads[thread].setDaemon(true);
      searchThreads[thread].start();
    System.out.println("TEST: now join");
    for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
      threads[thread].join();
    }
    for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
      searchThreads[thread].join();
    }

    System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
    System.out.println("TEST: search totHits=" + totHits);
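
    // Final consistency checks: obtain a searcher that covers the highest
    // recorded generation, with deletes applied, and verify the index against
    // what the indexing threads tracked.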
    long maxGen = -1;
    for(long gen : lastGens) {
      maxGen = Math.max(maxGen, gen);
    }

    final IndexSearcher s = nrt.get(maxGen, true);
    boolean doFail = false;
    for(String id : delIDs) {
      final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
      if (hits.totalHits != 0) {
        System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
        doFail = true;
      }
    }
    // Make sure each group of sub-docs are still in docID order:
    for(SubDocs subDocs : allSubDocs) {
      if (!subDocs.deleted) {
        // We sort by relevance but the scores should be identical so sort falls back to by docID:
        TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
        assertEquals(subDocs.subIDs.size(), hits.totalHits);
        int lastDocID = -1;
        int startDocID = -1;
        for(ScoreDoc scoreDoc : hits.scoreDocs) {
          final int docID = scoreDoc.doc;
          if (lastDocID != -1) {
            assertEquals(1+lastDocID, docID);
          } else {
            startDocID = docID;
          }
          lastDocID = docID;
          final Document doc = s.doc(docID);
          assertEquals(subDocs.packID, doc.get("packID"));
        }

        lastDocID = startDocID - 1;
        for(String subID : subDocs.subIDs) {
          hits = s.search(new TermQuery(new Term("docid", subID)), 1);
          assertEquals(1, hits.totalHits);
          final int docID = hits.scoreDocs[0].doc;
          if (lastDocID != -1) {
            assertEquals(1+lastDocID, docID);
          }
          lastDocID = docID;
        }
      } else {
        for(String subID : subDocs.subIDs) {
          assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
        }
      }
    }
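
    // Every docid below endID that was never deleted must still be present
    // exactly once: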
    final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
    for(int id=0;id<endID;id++) {
      String stringID = ""+id;
      if (!delIDs.contains(stringID)) {
        final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
        if (hits.totalHits != 1) {
          System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
          doFail = true;
        }
      }
    }
    assertFalse(doFail);

    assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
    if (es != null) {
      es.shutdown();
      es.awaitTermination(1, TimeUnit.SECONDS);
    }
    assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());

    System.out.println("TEST: now close NRTManager");
    assertFalse(writer.anyNonBulkMerges);
    _TestUtil.checkIndex(dir);
    _TestUtil.rmDir(tempDir);

    System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
  }
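
  // Runs one query, sorted by the "title" field, and returns its hit count.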
  private int runQuery(IndexSearcher s, Query q) throws Exception {
    return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
  }

  private void smokeTestSearcher(IndexSearcher s) throws Exception {
    runQuery(s, new TermQuery(new Term("body", "united")));
    runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
    PhraseQuery pq = new PhraseQuery();
    pq.add(new Term("body", "united"));
    pq.add(new Term("body", "states"));
    runQuery(s, pq);
  }
}