1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
33 import org.apache.lucene.analysis.MockAnalyzer;
34 import org.apache.lucene.document.Document;
35 import org.apache.lucene.document.Field;
36 import org.apache.lucene.search.IndexSearcher;
37 import org.apache.lucene.search.PhraseQuery;
38 import org.apache.lucene.search.Query;
39 import org.apache.lucene.search.ScoreDoc;
40 import org.apache.lucene.search.Sort;
41 import org.apache.lucene.search.SortField;
42 import org.apache.lucene.search.TermQuery;
43 import org.apache.lucene.search.TopDocs;
44 import org.apache.lucene.store.MockDirectoryWrapper;
45 import org.apache.lucene.util.LineFileDocs;
46 import org.apache.lucene.util.LuceneTestCase;
47 import org.apache.lucene.util._TestUtil;
48 import org.junit.Test;
51 // - mix in optimize, addIndexes
52 // - randomly mix in non-congruent docs
54 public class TestNRTThreads extends LuceneTestCase {
// Bookkeeping for a "pack" of adjacent sub-documents that share one
// packID field value; used below to verify that docs added together via
// addDocuments/updateDocuments remain adjacent and in docID order.
// NOTE(review): the constructor body and closing brace fall in a gap of
// this listing (original lines 62-67 are not visible).
56 private static class SubDocs {
// Value stored in the "packID" field of every sub-doc in this pack.
57 public final String packID;
// The "docid" values of the sub-docs belonging to this pack.
58 public final List<String> subIDs;
// Set to true once the whole pack has been deleted (by packID term).
59 public boolean deleted;
61 public SubDocs(String packID, List<String> subIDs) {
// Long-running NRT stress test: NUM_INDEX_THREADS threads concurrently
// add/update/delete documents (both singly and in adjacent "packs" via
// addDocuments/updateDocuments), while the main thread repeatedly opens
// near-real-time readers from the writer and NUM_SEARCH_THREADS threads
// search them.  At the end the index is checked against the add/delete
// bookkeeping (addCount/delCount/delIDs/allSubDocs).
// NOTE(review): the original line numbering embedded in this listing has
// gaps, so some statements (closing braces, else-branches, the run()
// headers of the anonymous Threads, etc.) are not visible here; the
// comments below describe only what is visible.
68 public void testNRTThreads() throws Exception {
70 final long t0 = System.currentTimeMillis();
// Document source plus a fresh on-disk test directory.
72 final LineFileDocs docs = new LineFileDocs(random);
73 final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
74 final MockDirectoryWrapper dir = newFSDirectory(tempDir);
75 dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
76 final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
// Warmer touches a sample of docs and runs one query on each newly
// merged segment, simulating real NRT warm-up work.
77 conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
79 public void warm(IndexReader reader) throws IOException {
81 System.out.println("TEST: now warm merged reader=" + reader);
83 final int maxDoc = reader.maxDoc();
// Visit roughly 50 docIDs spread evenly across the segment.
85 final int inc = Math.max(1, maxDoc/50);
86 for(int docID=0;docID<maxDoc;docID += inc) {
87 if (reader.isDeleted(docID)) {
88 final Document doc = reader.document(docID);
89 sum += doc.getFields().size();
93 IndexSearcher searcher = newSearcher(reader);
94 sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
98 System.out.println("TEST: warm visited " + sum + " fields");
103 final IndexWriter writer = new IndexWriter(dir, conf);
105 writer.setInfoStream(System.out);
107 _TestUtil.reduceOpenFiles(writer);
109 final int NUM_INDEX_THREADS = 2;
110 final int NUM_SEARCH_THREADS = 3;
// Nightly runs stress for 5 minutes; normal runs scale with the
// test multiplier.
112 final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
// Shared counters/sets used by all threads for end-of-test verification.
114 final AtomicBoolean failed = new AtomicBoolean();
115 final AtomicInteger addCount = new AtomicInteger();
116 final AtomicInteger delCount = new AtomicInteger();
117 final AtomicInteger packCount = new AtomicInteger();
119 final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
120 final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
122 final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
// --- indexing threads ---
123 Thread[] threads = new Thread[NUM_INDEX_THREADS];
124 for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
125 threads[thread] = new Thread() {
128 // TODO: would be better if this were cross thread, so that we make sure one thread deleting another's added docs works:
// Deletes are buffered per-thread and applied in batches below.
129 final List<String> toDeleteIDs = new ArrayList<String>();
130 final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
131 while(System.currentTimeMillis() < stopTime && !failed.get()) {
133 Document doc = docs.nextDoc();
// Sometimes add an extra field, removed again at loop end, so
// schemas are not fully congruent across docs.
137 final String addedField;
138 if (random.nextBoolean()) {
139 addedField = "extra" + random.nextInt(10);
140 doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
144 if (random.nextBoolean()) {
146 System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
149 if (random.nextBoolean()) {
150 // Add a pack of adjacent sub-docs
// Either reuse the packID of a previously-buffered pack (turning
// this into an update of that pack) or mint a new packID.
152 final SubDocs delSubDocs;
153 if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
154 delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
155 assert !delSubDocs.deleted;
156 toDeleteSubDocs.remove(delSubDocs);
157 // reuse prior packID
158 packID = delSubDocs.packID;
162 packID = packCount.getAndIncrement() + "";
165 final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
166 final List<String> docIDs = new ArrayList<String>();
167 final SubDocs subDocs = new SubDocs(packID, docIDs);
168 final List<Document> docsList = new ArrayList<Document>();
170 allSubDocs.add(subDocs);
171 doc.add(packIDField);
// Documents are cloned because LineFileDocs reuses its Document.
172 docsList.add(_TestUtil.cloneDocument(doc));
173 docIDs.add(doc.get("docid"));
// Build the rest of the pack: 1..10 docs total.
175 final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
176 while(docsList.size() < maxDocCount) {
177 doc = docs.nextDoc();
181 docsList.add(_TestUtil.cloneDocument(doc));
182 docIDs.add(doc.get("docid"));
184 addCount.addAndGet(docsList.size());
186 if (delSubDocs != null) {
// Replacing an existing pack: record its sub-docs as deleted,
// then either atomically updateDocuments, or (the visible
// fallback path) deleteDocuments + addDocument one-by-one.
187 delSubDocs.deleted = true;
188 delIDs.addAll(delSubDocs.subIDs);
189 delCount.addAndGet(delSubDocs.subIDs.size());
191 System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
193 writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
196 writer.deleteDocuments(new Term("packID", delSubDocs.packID));
197 for(Document subDoc : docsList) {
198 writer.addDocument(subDoc);
// Brand-new pack: atomically addDocuments, or add one-by-one.
203 System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
205 writer.addDocuments(docsList);
209 for(Document subDoc : docsList) {
210 writer.addDocument(subDoc);
// Strip packID so the reused Document is clean for the next iteration.
214 doc.removeField("packID");
// 1-in-5 chance: buffer this pack for later deletion.
216 if (random.nextInt(5) == 2) {
218 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
220 toDeleteSubDocs.add(subDocs);
// Plain single-document add.
224 writer.addDocument(doc);
225 addCount.getAndIncrement();
227 if (random.nextInt(5) == 3) {
229 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
231 toDeleteIDs.add(doc.get("docid"));
235 // we use update but it never replaces a
// (comment truncated in this listing — presumably "...a prior doc";
// docids from LineFileDocs are unique, so updateDocument acts as add)
238 System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
240 writer.updateDocument(new Term("docid", doc.get("docid")), doc);
241 addCount.getAndIncrement();
243 if (random.nextInt(5) == 3) {
245 //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
247 toDeleteIDs.add(doc.get("docid"));
// Occasionally (1 in 30) flush all buffered deletes to the writer
// and move the IDs into the shared delIDs set.
251 if (random.nextInt(30) == 17) {
253 System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
255 for(String id : toDeleteIDs) {
257 System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
259 writer.deleteDocuments(new Term("docid", id));
261 final int count = delCount.addAndGet(toDeleteIDs.size());
263 System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
265 delIDs.addAll(toDeleteIDs);
// Delete the buffered packs by their packID term.
268 for(SubDocs subDocs : toDeleteSubDocs) {
269 assert !subDocs.deleted;
270 writer.deleteDocuments(new Term("packID", subDocs.packID));
271 subDocs.deleted = true;
273 System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
275 delIDs.addAll(subDocs.subIDs);
276 delCount.addAndGet(subDocs.subIDs.size());
278 toDeleteSubDocs.clear();
280 if (addedField != null) {
281 doc.removeField(addedField);
// Any failure in an indexing thread is fatal to the test.
283 } catch (Throwable t) {
284 System.out.println(Thread.currentThread().getName() + ": hit exc");
287 throw new RuntimeException(t);
291 System.out.println(Thread.currentThread().getName() + ": indexing done");
295 threads[thread].setDaemon(true);
296 threads[thread].start();
300 System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
303 // let index build up a bit
// --- NRT reader open/reopen + search loop (main thread) ---
306 IndexReader r = IndexReader.open(writer, true);
309 // silly starting guess:
310 final AtomicInteger totTermCount = new AtomicInteger(100);
// Executor handed to IndexSearcher so searches run across segments.
312 final ExecutorService es = Executors.newCachedThreadPool();
314 while(System.currentTimeMillis() < stopTime && !failed.get()) {
// Randomly either reopen the current NRT reader or close and open
// a fresh one, checking for open-but-deleted files each time.
315 if (random.nextBoolean()) {
317 System.out.println("TEST: now reopen r=" + r);
319 final IndexReader r2 = r.reopen();
326 System.out.println("TEST: now close reader=" + r);
330 final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
331 if (openDeletedFiles.size() > 0) {
332 System.out.println("OBD files: " + openDeletedFiles);
334 any |= openDeletedFiles.size() > 0;
335 //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
337 System.out.println("TEST: now open");
339 r = IndexReader.open(writer, true);
342 System.out.println("TEST: got new reader=" + r);
344 //System.out.println("numDocs=" + r.numDocs() + "
345 //openDelFileCount=" + dir.openDeleteFileCount());
349 if (r.numDocs() > 0) {
351 final IndexSearcher s = new IndexSearcher(r, es);
353 // run search threads
// Each search thread walks the "body" terms for ~500 ms, querying
// roughly every 10th term (randomly shifted so threads differ).
354 final long searchStopTime = System.currentTimeMillis() + 500;
355 final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
356 final AtomicInteger totHits = new AtomicInteger();
357 for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
358 searchThreads[thread] = new Thread() {
362 TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
363 int seenTermCount = 0;
366 if (totTermCount.get() < 10) {
370 trigger = totTermCount.get()/10;
371 shift = random.nextInt(trigger);
373 while(System.currentTimeMillis() < searchStopTime) {
374 Term term = termEnum.term();
// End of enum: update the shared term-count estimate and restart
// the enumeration from the first "body" term.
376 if (seenTermCount < 10) {
379 totTermCount.set(seenTermCount);
381 trigger = totTermCount.get()/10;
382 //System.out.println("trigger " + trigger);
383 shift = random.nextInt(trigger);
384 termEnum = s.getIndexReader().terms(new Term("body", ""));
392 if ((seenTermCount + shift) % trigger == 0) {
394 //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
396 totHits.addAndGet(runQuery(s, new TermQuery(term)));
401 System.out.println(Thread.currentThread().getName() + ": search done");
403 } catch (Throwable t) {
404 System.out.println(Thread.currentThread().getName() + ": hit exc");
406 t.printStackTrace(System.out);
407 throw new RuntimeException(t);
411 searchThreads[thread].setDaemon(true);
412 searchThreads[thread].start();
415 for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
416 searchThreads[thread].join();
420 System.out.println("TEST: DONE search: totHits=" + totHits);
428 es.awaitTermination(1, TimeUnit.SECONDS);
431 System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
434 //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
// Final open-but-deleted file check, then fail if any loop iteration saw one.
436 final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
437 if (openDeletedFiles.size() > 0) {
438 System.out.println("OBD files: " + openDeletedFiles);
440 any |= openDeletedFiles.size() > 0;
442 assertFalse("saw non-zero open-but-deleted count", any);
444 System.out.println("TEST: now join");
446 for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
447 threads[thread].join();
450 System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
// --- verification phase ---
// 1) every deleted docid must be absent:
453 final IndexReader r2 = writer.getReader();
454 final IndexSearcher s = newSearcher(r2);
455 boolean doFail = false;
456 for(String id : delIDs) {
457 final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
458 if (hits.totalHits != 0) {
459 System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
464 // Make sure each group of sub-docs are still in docID order:
465 for(SubDocs subDocs : allSubDocs) {
466 if (!subDocs.deleted) {
467 // We sort by relevance but the scores should be identical so sort falls back to by docID:
468 TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
469 assertEquals(subDocs.subIDs.size(), hits.totalHits);
// Pack hits must form a contiguous, ascending docID run.
472 for(ScoreDoc scoreDoc : hits.scoreDocs) {
473 final int docID = scoreDoc.doc;
474 if (lastDocID != -1) {
475 assertEquals(1+lastDocID, docID);
480 final Document doc = s.doc(docID);
481 assertEquals(subDocs.packID, doc.get("packID"));
// Cross-check: each subID resolves to exactly one doc, adjacent to
// the previous one.
484 lastDocID = startDocID - 1;
485 for(String subID : subDocs.subIDs) {
486 hits = s.search(new TermQuery(new Term("docid", subID)), 1);
487 assertEquals(1, hits.totalHits);
488 final int docID = hits.scoreDocs[0].doc;
489 if (lastDocID != -1) {
490 assertEquals(1+lastDocID, docID);
// Deleted packs: none of their subIDs may match.
495 for(String subID : subDocs.subIDs) {
496 assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
// 2) every non-deleted docid up to the last handed out must be present:
501 final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
502 for(int id=0;id<endID;id++) {
503 String stringID = ""+id;
504 if (!delIDs.contains(stringID)) {
505 final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
506 if (hits.totalHits != 1) {
507 System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
// 3) doc counts must match the add/delete bookkeeping exactly.
514 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
518 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
// All merges in this test should have used the bulk (congruent-field) path.
520 assertFalse(writer.anyNonBulkMerges);
522 _TestUtil.checkIndex(dir);
525 _TestUtil.rmDir(tempDir);
528 System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
// Runs q sorted by the "title" field (scores are expected to be tied, so
// this exercises the sort path) and returns the total hit count.
// NOTE(review): the method's closing brace falls in a gap of this listing.
532 private int runQuery(IndexSearcher s, Query q) throws Exception {
534 return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
// Sanity-checks a reader by running a few fixed term queries and building
// a phrase query over the LineFileDocs corpus ("united states").
// NOTE(review): this listing is cut off here — the phrase-query execution
// and the rest of the method are not visible.
537 private void smokeTestReader(IndexReader r) throws Exception {
538 IndexSearcher s = newSearcher(r);
539 runQuery(s, new TermQuery(new Term("body", "united")));
540 runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
541 PhraseQuery pq = new PhraseQuery();
542 pq.add(new Term("body", "united"));
543 pq.add(new Term("body", "states"));