1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
33 import org.apache.lucene.analysis.MockAnalyzer;
34 import org.apache.lucene.document.Document;
35 import org.apache.lucene.document.Field;
36 import org.apache.lucene.document.Fieldable;
37 import org.apache.lucene.search.IndexSearcher;
38 import org.apache.lucene.search.PhraseQuery;
39 import org.apache.lucene.search.Query;
40 import org.apache.lucene.search.ScoreDoc;
41 import org.apache.lucene.search.Sort;
42 import org.apache.lucene.search.SortField;
43 import org.apache.lucene.search.TermQuery;
44 import org.apache.lucene.search.TopDocs;
45 import org.apache.lucene.store.MockDirectoryWrapper;
46 import org.apache.lucene.util.LineFileDocs;
47 import org.apache.lucene.util.LuceneTestCase;
48 import org.apache.lucene.util._TestUtil;
49 import org.junit.Test;
52 // - mix in optimize, addIndexes
53 // - randomly mix in non-congruent docs
55 public class TestNRTThreads extends LuceneTestCase {
// Records a "pack" of adjacent sub-documents that were added together under a
// shared packID, so the test can later verify that the pack either survived
// contiguously (in docID order) or was deleted as a unit.
// NOTE(review): this listing is truncated (non-contiguous source line numbers);
// the constructor body/field assignments are missing here — verify against the
// upstream file before editing.
57 private static class SubDocs {
58 public final String packID;
59 public final List<String> subIDs;
60 public boolean deleted;
62 public SubDocs(String packID, List<String> subIDs) {
68 // TODO: is there a pre-existing way to do this!!!
// Makes a shallow field-by-field copy of a Document, preserving each field's
// stored/indexed/tokenized flags plus omitNorms and omitTermFreqAndPositions.
// Needed because the same Document instance cannot safely be buffered twice.
// NOTE(review): listing truncated — the field-value copy, doc2.add(field2) and
// the return statement are missing from this view; confirm against upstream.
69 private Document cloneDoc(Document doc1) {
70 final Document doc2 = new Document();
71 for(Fieldable f : doc1.getFields()) {
72 Field field1 = (Field) f;
74 Field field2 = new Field(field1.name(),
76 field1.isStored() ? Field.Store.YES : Field.Store.NO,
77 field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
78 if (field1.getOmitNorms()) {
79 field2.setOmitNorms(true);
81 if (field1.getOmitTermFreqAndPositions()) {
82 field2.setOmitTermFreqAndPositions(true);
// Stress test: several indexing threads concurrently add/update/delete docs
// (including atomic "packs" of adjacent sub-docs) while the main thread keeps
// (re)opening near-real-time readers from the still-open IndexWriter and
// search threads query them; at the end, deletions, pack adjacency, and doc
// counts are all verified.
// NOTE(review): this listing is truncated (non-contiguous source line numbers)
// — many closing braces, else-branches and statements are absent from this
// view. Comments below describe only what the visible lines show.
91 public void testNRTThreads() throws Exception {
93 final long t0 = System.currentTimeMillis();
95 final LineFileDocs docs = new LineFileDocs(random);
96 final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
97 final MockDirectoryWrapper dir = newFSDirectory(tempDir);
98 dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
99 final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
// The warmer visits a ~2% sample of docs and runs one query so newly merged
// segments are exercised before being exposed to NRT readers.
100 conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
102 public void warm(IndexReader reader) throws IOException {
104 System.out.println("TEST: now warm merged reader=" + reader);
106 final int maxDoc = reader.maxDoc();
108 final int inc = Math.max(1, maxDoc/50);
109 for(int docID=0;docID<maxDoc;docID += inc) {
110 if (reader.isDeleted(docID)) {
111 final Document doc = reader.document(docID);
112 sum += doc.getFields().size();
116 IndexSearcher searcher = newSearcher(reader);
117 sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
121 System.out.println("TEST: warm visited " + sum + " fields");
126 final IndexWriter writer = new IndexWriter(dir, conf);
128 writer.setInfoStream(System.out);
130 _TestUtil.reduceOpenFiles(writer);
132 final int NUM_INDEX_THREADS = 2;
133 final int NUM_SEARCH_THREADS = 3;
// Nightly builds run 300s; normal runs scale with RANDOM_MULTIPLIER seconds.
135 final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
137 final AtomicBoolean failed = new AtomicBoolean();
138 final AtomicInteger addCount = new AtomicInteger();
139 final AtomicInteger delCount = new AtomicInteger();
140 final AtomicInteger packCount = new AtomicInteger();
// Shared across indexing threads and read by the final verification, hence
// the synchronized wrappers.
142 final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
143 final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
145 final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
146 Thread[] threads = new Thread[NUM_INDEX_THREADS];
147 for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
148 threads[thread] = new Thread() {
151 // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
152 final List<String> toDeleteIDs = new ArrayList<String>();
153 final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
154 while(System.currentTimeMillis() < stopTime && !failed.get()) {
156 Document doc = docs.nextDoc();
// Randomly attach an extra field (removed again at the bottom of the loop so
// the shared LineFileDocs doc isn't permanently mutated).
160 final String addedField;
161 if (random.nextBoolean()) {
162 addedField = "extra" + random.nextInt(10);
163 doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
167 if (random.nextBoolean()) {
169 System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
// Half the adds create a "pack": several adjacent sub-docs sharing one packID,
// sometimes reusing a previously buffered pack's ID so the old pack gets
// replaced.
172 if (random.nextBoolean()) {
173 // Add a pack of adjacent sub-docs
175 final SubDocs delSubDocs;
176 if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
177 delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
178 assert !delSubDocs.deleted;
179 toDeleteSubDocs.remove(delSubDocs);
180 // reuse prior packID
181 packID = delSubDocs.packID;
185 packID = packCount.getAndIncrement() + "";
188 final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
189 final List<String> docIDs = new ArrayList<String>();
190 final SubDocs subDocs = new SubDocs(packID, docIDs);
191 final List<Document> docsList = new ArrayList<Document>();
193 allSubDocs.add(subDocs);
194 doc.add(packIDField);
// cloneDoc is required: the same doc instance can't be buffered repeatedly.
195 docsList.add(cloneDoc(doc));
196 docIDs.add(doc.get("docid"));
198 final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
199 while(docsList.size() < maxDocCount) {
200 doc = docs.nextDoc();
204 docsList.add(cloneDoc(doc));
205 docIDs.add(doc.get("docid"));
207 addCount.addAndGet(docsList.size());
209 if (delSubDocs != null) {
210 delSubDocs.deleted = true;
211 delIDs.addAll(delSubDocs.subIDs);
212 delCount.addAndGet(delSubDocs.subIDs.size());
214 System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
// Atomic replace of the old pack (visible path); the alternate path below
// does a non-atomic delete + re-add.
216 writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
219 writer.deleteDocuments(new Term("packID", delSubDocs.packID));
220 for(Document subDoc : docsList) {
221 writer.addDocument(subDoc);
226 System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
228 writer.addDocuments(docsList);
232 for(Document subDoc : docsList) {
233 writer.addDocument(subDoc);
237 doc.removeField("packID");
239 if (random.nextInt(5) == 2) {
241 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
243 toDeleteSubDocs.add(subDocs);
247 writer.addDocument(doc);
248 addCount.getAndIncrement();
250 if (random.nextInt(5) == 3) {
252 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
254 toDeleteIDs.add(doc.get("docid"));
258 // we use update but it never replaces a
261 System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
263 writer.updateDocument(new Term("docid", doc.get("docid")), doc);
264 addCount.getAndIncrement();
266 if (random.nextInt(5) == 3) {
268 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
270 toDeleteIDs.add(doc.get("docid"));
// Occasionally flush the buffered deletes: first single docids, then whole
// packs by packID, recording everything in the shared delIDs/delCount.
274 if (random.nextInt(30) == 17) {
276 System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
278 for(String id : toDeleteIDs) {
280 System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
282 writer.deleteDocuments(new Term("docid", id));
284 final int count = delCount.addAndGet(toDeleteIDs.size());
286 System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
288 delIDs.addAll(toDeleteIDs);
291 for(SubDocs subDocs : toDeleteSubDocs) {
292 assert !subDocs.deleted;
293 writer.deleteDocuments(new Term("packID", subDocs.packID));
294 subDocs.deleted = true;
296 System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
298 delIDs.addAll(subDocs.subIDs);
299 delCount.addAndGet(subDocs.subIDs.size());
301 toDeleteSubDocs.clear();
303 if (addedField != null) {
304 doc.removeField(addedField);
306 } catch (Throwable t) {
307 System.out.println(Thread.currentThread().getName() + ": hit exc");
310 throw new RuntimeException(t);
314 System.out.println(Thread.currentThread().getName() + ": indexing done");
318 threads[thread].setDaemon(true);
319 threads[thread].start();
323 System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
326 // let index build up a bit
// Open an NRT reader straight from the still-open writer (applyAllDeletes=true)
// and keep reopening it in the loop below while search threads run against it.
329 IndexReader r = IndexReader.open(writer, true);
332 // silly starting guess:
333 final AtomicInteger totTermCount = new AtomicInteger(100);
335 final ExecutorService es = Executors.newCachedThreadPool();
337 while(System.currentTimeMillis() < stopTime && !failed.get()) {
338 if (random.nextBoolean()) {
340 System.out.println("TEST: now reopen r=" + r);
342 final IndexReader r2 = r.reopen();
349 System.out.println("TEST: now close reader=" + r);
// Track open-but-deleted files; only reported here, asserted after the loop.
353 final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
354 if (openDeletedFiles.size() > 0) {
355 System.out.println("OBD files: " + openDeletedFiles);
357 any |= openDeletedFiles.size() > 0;
358 //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
360 System.out.println("TEST: now open");
362 r = IndexReader.open(writer, true);
365 System.out.println("TEST: got new reader=" + r);
367 //System.out.println("numDocs=" + r.numDocs() + "
368 //openDelFileCount=" + dir.openDeleteFileCount());
372 if (r.numDocs() > 0) {
374 final IndexSearcher s = new IndexSearcher(r, es);
376 // run search threads
377 final long searchStopTime = System.currentTimeMillis() + 500;
378 final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
379 final AtomicInteger totHits = new AtomicInteger();
380 for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
381 searchThreads[thread] = new Thread() {
// Each search thread walks the "body" term dictionary, querying roughly every
// trigger'th term; totTermCount adapts as the index grows across reopens.
385 TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
386 int seenTermCount = 0;
389 if (totTermCount.get() < 10) {
393 trigger = totTermCount.get()/10;
394 shift = random.nextInt(trigger);
396 while(System.currentTimeMillis() < searchStopTime) {
397 Term term = termEnum.term();
399 if (seenTermCount < 10) {
402 totTermCount.set(seenTermCount);
404 trigger = totTermCount.get()/10;
405 //System.out.println("trigger " + trigger);
406 shift = random.nextInt(trigger);
407 termEnum = s.getIndexReader().terms(new Term("body", ""));
415 if ((seenTermCount + shift) % trigger == 0) {
417 //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
419 totHits.addAndGet(runQuery(s, new TermQuery(term)));
424 System.out.println(Thread.currentThread().getName() + ": search done");
426 } catch (Throwable t) {
427 System.out.println(Thread.currentThread().getName() + ": hit exc");
429 t.printStackTrace(System.out);
430 throw new RuntimeException(t);
434 searchThreads[thread].setDaemon(true);
435 searchThreads[thread].start();
438 for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
439 searchThreads[thread].join();
443 System.out.println("TEST: DONE search: totHits=" + totHits);
451 es.awaitTermination(1, TimeUnit.SECONDS);
454 System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
457 //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
459 final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
460 if (openDeletedFiles.size() > 0) {
461 System.out.println("OBD files: " + openDeletedFiles);
463 any |= openDeletedFiles.size() > 0;
465 assertFalse("saw non-zero open-but-deleted count", any);
467 System.out.println("TEST: now join");
469 for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
470 threads[thread].join();
473 System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
// Final verification phase: every deleted id must be gone, every surviving
// pack must still be adjacent in docID order, and the live doc count must
// equal addCount - delCount in both the reader and the writer.
476 final IndexReader r2 = writer.getReader();
477 final IndexSearcher s = newSearcher(r2);
478 boolean doFail = false;
479 for(String id : delIDs) {
480 final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
481 if (hits.totalHits != 0) {
482 System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
487 // Make sure each group of sub-docs are still in docID order:
488 for(SubDocs subDocs : allSubDocs) {
489 if (!subDocs.deleted) {
490 // We sort by relevance but the scores should be identical so sort falls back to by docID:
491 TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
492 assertEquals(subDocs.subIDs.size(), hits.totalHits);
495 for(ScoreDoc scoreDoc : hits.scoreDocs) {
496 final int docID = scoreDoc.doc;
497 if (lastDocID != -1) {
498 assertEquals(1+lastDocID, docID);
503 final Document doc = s.doc(docID);
504 assertEquals(subDocs.packID, doc.get("packID"));
// Cross-check adjacency again by looking each subID up individually.
507 lastDocID = startDocID - 1;
508 for(String subID : subDocs.subIDs) {
509 hits = s.search(new TermQuery(new Term("docid", subID)), 1);
510 assertEquals(1, hits.totalHits);
511 final int docID = hits.scoreDocs[0].doc;
512 if (lastDocID != -1) {
513 assertEquals(1+lastDocID, docID);
518 for(String subID : subDocs.subIDs) {
519 assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
// LineFileDocs ids are sequential, so the next id bounds every id ever added.
524 final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
525 for(int id=0;id<endID;id++) {
526 String stringID = ""+id;
527 if (!delIDs.contains(stringID)) {
528 final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
529 if (hits.totalHits != 1) {
530 System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
537 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
541 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
543 assertFalse(writer.anyNonBulkMerges);
545 _TestUtil.checkIndex(dir);
548 _TestUtil.rmDir(tempDir);
551 System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
// Runs the given query sorted by the "title" string field (top 10, no filter)
// and returns the total hit count.
555 private int runQuery(IndexSearcher s, Query q) throws Exception {
557 return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
560 private void smokeTestReader(IndexReader r) throws Exception {
561 IndexSearcher s = newSearcher(r);
562 runQuery(s, new TermQuery(new Term("body", "united")));
563 runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
564 PhraseQuery pq = new PhraseQuery();
565 pq.add(new Term("body", "united"));
566 pq.add(new Term("body", "states"));