1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.apache.lucene.analysis.MockAnalyzer;
35 import org.apache.lucene.document.Document;
36 import org.apache.lucene.document.Field;
37 import org.apache.lucene.search.IndexSearcher;
38 import org.apache.lucene.search.PhraseQuery;
39 import org.apache.lucene.search.Query;
40 import org.apache.lucene.search.ScoreDoc;
41 import org.apache.lucene.search.Sort;
42 import org.apache.lucene.search.SortField;
43 import org.apache.lucene.search.TermQuery;
44 import org.apache.lucene.search.TopDocs;
45 import org.apache.lucene.store.Directory;
46 import org.apache.lucene.store.MockDirectoryWrapper;
47 import org.apache.lucene.util.LineFileDocs;
48 import org.apache.lucene.util.LuceneTestCase;
49 import org.apache.lucene.util.NamedThreadFactory;
50 import org.apache.lucene.util._TestUtil;
53 // TODO: - mix in forceMerge, addIndexes
54 //       - randomly mix in non-congruent docs
56 /** Utility class that spawns multiple indexing and
57 * searching threads. */
58 public abstract class ThreadedIndexingAndSearchingTestCase extends LuceneTestCase {
60 protected final AtomicBoolean failed = new AtomicBoolean();
61 protected final AtomicInteger addCount = new AtomicInteger();
62 protected final AtomicInteger delCount = new AtomicInteger();
63 protected final AtomicInteger packCount = new AtomicInteger();
65 protected Directory dir;
66 protected IndexWriter writer;
68 private static class SubDocs {
69 public final String packID;
70 public final List<String> subIDs;
71 public boolean deleted;
73 public SubDocs(String packID, List<String> subIDs) {
80 protected abstract IndexSearcher getCurrentSearcher() throws Exception;
82 protected abstract IndexSearcher getFinalSearcher() throws Exception;
84 protected void releaseSearcher(IndexSearcher s) throws Exception {
87 // Called once to run searching
88 protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception;
90 protected Directory getDirectory(Directory in) {
94 protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
95 writer.updateDocuments(id, docs);
98 protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
99 writer.addDocuments(docs);
102 protected void addDocument(Term id, Document doc) throws Exception {
103 writer.addDocument(doc);
106 protected void updateDocument(Term term, Document doc) throws Exception {
107 writer.updateDocument(term, doc);
110 protected void deleteDocuments(Term term) throws Exception {
111 writer.deleteDocuments(term);
114 protected void doAfterIndexingThreadDone() {
117 private Thread[] launchIndexingThreads(final LineFileDocs docs,
120 final Set<String> delIDs,
121 final Set<String> delPackIDs,
122 final List<SubDocs> allSubDocs)
124 final Thread[] threads = new Thread[numThreads];
125 for(int thread=0;thread<numThreads;thread++) {
126 threads[thread] = new Thread() {
129 // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
130 final List<String> toDeleteIDs = new ArrayList<String>();
131 final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
132 while(System.currentTimeMillis() < stopTime && !failed.get()) {
135 // Occasional longish pause if running
137 if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
139 System.out.println(Thread.currentThread().getName() + ": now long sleep");
141 Thread.sleep(_TestUtil.nextInt(random, 50, 500));
144 // Rate limit ingest rate:
145 if (random.nextInt(7) == 5) {
146 Thread.sleep(_TestUtil.nextInt(random, 1, 10));
148 System.out.println(Thread.currentThread().getName() + ": done sleep");
152 Document doc = docs.nextDoc();
157 // Maybe add randomly named field
158 final String addedField;
159 if (random.nextBoolean()) {
160 addedField = "extra" + random.nextInt(40);
161 doc.add(newField(addedField, "a random field", Field.Store.YES, Field.Index.ANALYZED));
166 if (random.nextBoolean()) {
168 if (random.nextBoolean()) {
169 // Add/update doc block:
171 final SubDocs delSubDocs;
172 if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
173 delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
174 assert !delSubDocs.deleted;
175 toDeleteSubDocs.remove(delSubDocs);
176 // Update doc block, replacing prior packID
177 packID = delSubDocs.packID;
180 // Add doc block, using new packID
181 packID = packCount.getAndIncrement() + "";
184 final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
185 final List<String> docIDs = new ArrayList<String>();
186 final SubDocs subDocs = new SubDocs(packID, docIDs);
187 final List<Document> docsList = new ArrayList<Document>();
189 allSubDocs.add(subDocs);
190 doc.add(packIDField);
191 docsList.add(_TestUtil.cloneDocument(doc));
192 docIDs.add(doc.get("docid"));
194 final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
195 while(docsList.size() < maxDocCount) {
196 doc = docs.nextDoc();
200 docsList.add(_TestUtil.cloneDocument(doc));
201 docIDs.add(doc.get("docid"));
203 addCount.addAndGet(docsList.size());
205 final Term packIDTerm = new Term("packID", packID);
207 if (delSubDocs != null) {
208 delSubDocs.deleted = true;
209 delIDs.addAll(delSubDocs.subIDs);
210 delCount.addAndGet(delSubDocs.subIDs.size());
212 System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
214 updateDocuments(packIDTerm, docsList);
217 System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
219 addDocuments(packIDTerm, docsList);
221 doc.removeField("packID");
223 if (random.nextInt(5) == 2) {
225 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
227 toDeleteSubDocs.add(subDocs);
232 final String docid = doc.get("docid");
234 System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid);
236 addDocument(new Term("docid", docid), doc);
237 addCount.getAndIncrement();
239 if (random.nextInt(5) == 3) {
241 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
243 toDeleteIDs.add(docid);
248 // Update single doc, but we never re-use
249 // and ID so the delete will never
252 System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
254 final String docid = doc.get("docid");
255 updateDocument(new Term("docid", docid), doc);
256 addCount.getAndIncrement();
258 if (random.nextInt(5) == 3) {
260 System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
262 toDeleteIDs.add(docid);
266 if (random.nextInt(30) == 17) {
268 System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
270 for(String id : toDeleteIDs) {
272 System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
274 deleteDocuments(new Term("docid", id));
276 final int count = delCount.addAndGet(toDeleteIDs.size());
278 System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
280 delIDs.addAll(toDeleteIDs);
283 for(SubDocs subDocs : toDeleteSubDocs) {
284 assert !subDocs.deleted;
285 delPackIDs.add(subDocs.packID);
286 deleteDocuments(new Term("packID", subDocs.packID));
287 subDocs.deleted = true;
289 System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
291 delIDs.addAll(subDocs.subIDs);
292 delCount.addAndGet(subDocs.subIDs.size());
294 toDeleteSubDocs.clear();
296 if (addedField != null) {
297 doc.removeField(addedField);
299 } catch (Throwable t) {
300 System.out.println(Thread.currentThread().getName() + ": hit exc");
303 throw new RuntimeException(t);
307 System.out.println(Thread.currentThread().getName() + ": indexing done");
310 doAfterIndexingThreadDone();
313 threads[thread].setDaemon(true);
314 threads[thread].start();
320 protected void runSearchThreads(final long stopTimeMS) throws Exception {
321 final int numThreads = _TestUtil.nextInt(random, 1, 5);
322 final Thread[] searchThreads = new Thread[numThreads];
323 final AtomicInteger totHits = new AtomicInteger();
325 // silly starting guess:
326 final AtomicInteger totTermCount = new AtomicInteger(100);
328 // TODO: we should enrich this to do more interesting searches
329 for(int thread=0;thread<searchThreads.length;thread++) {
330 searchThreads[thread] = new Thread() {
333 while (System.currentTimeMillis() < stopTimeMS) {
335 final IndexSearcher s = getCurrentSearcher();
337 if (s.getIndexReader().numDocs() > 0) {
338 smokeTestSearcher(s);
339 TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
340 int seenTermCount = 0;
343 if (totTermCount.get() < 10) {
347 trigger = totTermCount.get()/10;
348 shift = random.nextInt(trigger);
350 while(System.currentTimeMillis() < stopTimeMS) {
351 Term term = termEnum.term();
353 if (seenTermCount == 0) {
356 totTermCount.set(seenTermCount);
358 if (totTermCount.get() < 10) {
362 trigger = totTermCount.get()/10;
363 //System.out.println("trigger " + trigger);
364 shift = random.nextInt(trigger);
366 termEnum = s.getIndexReader().terms(new Term("body", ""));
374 if ((seenTermCount + shift) % trigger == 0) {
376 //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
378 totHits.addAndGet(runQuery(s, new TermQuery(term)));
382 //System.out.println(Thread.currentThread().getName() + ": search done");
388 } catch (Throwable t) {
389 System.out.println(Thread.currentThread().getName() + ": hit exc");
391 t.printStackTrace(System.out);
392 throw new RuntimeException(t);
397 searchThreads[thread].setDaemon(true);
398 searchThreads[thread].start();
401 for(int thread=0;thread<searchThreads.length;thread++) {
402 searchThreads[thread].join();
406 System.out.println("TEST: DONE search: totHits=" + totHits);
410 protected void doAfterWriter(ExecutorService es) throws Exception {
413 protected void doClose() throws Exception {
416 public void runTest(String testName) throws Exception {
423 final long t0 = System.currentTimeMillis();
425 final LineFileDocs docs = new LineFileDocs(random);
426 final File tempDir = _TestUtil.getTempDir(testName);
427 dir = newFSDirectory(tempDir);
428 ((MockDirectoryWrapper) dir).setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
429 final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
431 if (LuceneTestCase.TEST_NIGHTLY) {
432 // newIWConfig makes smallish max seg size, which
433 // results in tons and tons of segments for this test
435 MergePolicy mp = conf.getMergePolicy();
436 if (mp instanceof TieredMergePolicy) {
437 ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
438 } else if (mp instanceof LogByteSizeMergePolicy) {
439 ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
440 } else if (mp instanceof LogMergePolicy) {
441 ((LogMergePolicy) mp).setMaxMergeDocs(100000);
445 conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
447 public void warm(IndexReader reader) throws IOException {
449 System.out.println("TEST: now warm merged reader=" + reader);
451 final int maxDoc = reader.maxDoc();
453 final int inc = Math.max(1, maxDoc/50);
454 for(int docID=0;docID<maxDoc;docID += inc) {
455 if (!reader.isDeleted(docID)) {
456 final Document doc = reader.document(docID);
457 sum += doc.getFields().size();
461 IndexSearcher searcher = newSearcher(reader);
462 sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
466 System.out.println("TEST: warm visited " + sum + " fields");
471 writer = new IndexWriter(dir, conf);
473 writer.setInfoStream(System.out);
475 _TestUtil.reduceOpenFiles(writer);
477 final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory(testName));
481 final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 2, 4);
483 final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
485 final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
486 final Set<String> delPackIDs = Collections.synchronizedSet(new HashSet<String>());
487 final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
489 final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
491 final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs);
494 System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
497 // Let index build up a bit
500 doSearching(es, stopTime);
503 System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
506 for(int thread=0;thread<indexThreads.length;thread++) {
507 indexThreads[thread].join();
511 System.out.println("TEST: done join indexing threads [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
514 final IndexSearcher s = getFinalSearcher();
516 System.out.println("TEST: finalSearcher=" + s);
519 assertFalse(failed.get());
521 boolean doFail = false;
523 // Verify: make sure delIDs are in fact deleted:
524 for(String id : delIDs) {
525 final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
526 if (hits.totalHits != 0) {
527 System.out.println("doc id=" + id + " is supposed to be deleted, but got " + hits.totalHits + " hits; first docID=" + hits.scoreDocs[0].doc);
532 // Verify: make sure delPackIDs are in fact deleted:
533 for(String id : delPackIDs) {
534 final TopDocs hits = s.search(new TermQuery(new Term("packID", id)), 1);
535 if (hits.totalHits != 0) {
536 System.out.println("packID=" + id + " is supposed to be deleted, but got " + hits.totalHits + " matches");
541 // Verify: make sure each group of sub-docs are still in docID order:
542 for(SubDocs subDocs : allSubDocs) {
543 TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
544 if (!subDocs.deleted) {
545 // We sort by relevance but the scores should be identical so sort falls back to by docID:
546 if (hits.totalHits != subDocs.subIDs.size()) {
547 System.out.println("packID=" + subDocs.packID + ": expected " + subDocs.subIDs.size() + " hits but got " + hits.totalHits);
552 for(ScoreDoc scoreDoc : hits.scoreDocs) {
553 final int docID = scoreDoc.doc;
554 if (lastDocID != -1) {
555 assertEquals(1+lastDocID, docID);
560 final Document doc = s.doc(docID);
561 assertEquals(subDocs.packID, doc.get("packID"));
564 lastDocID = startDocID - 1;
565 for(String subID : subDocs.subIDs) {
566 hits = s.search(new TermQuery(new Term("docid", subID)), 1);
567 assertEquals(1, hits.totalHits);
568 final int docID = hits.scoreDocs[0].doc;
569 if (lastDocID != -1) {
570 assertEquals(1+lastDocID, docID);
576 // Pack was deleted -- make sure its docs are
577 // deleted. We can't verify packID is deleted
578 // because we can re-use packID for update:
579 for(String subID : subDocs.subIDs) {
580 assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
585 // Verify: make sure all not-deleted docs are in fact
587 final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
590 for(int id=0;id<endID;id++) {
591 String stringID = ""+id;
592 if (!delIDs.contains(stringID)) {
593 final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
594 if (hits.totalHits != 1) {
595 System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
602 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
607 assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
609 assertFalse(writer.anyNonBulkMerges);
613 // Cannot shutdown until after writer is closed because
614 // writer has merged segment warmer that uses IS to run
615 // searches, and that IS may be using this es!
618 es.awaitTermination(1, TimeUnit.SECONDS);
621 _TestUtil.checkIndex(dir);
623 _TestUtil.rmDir(tempDir);
626 System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
630 private int runQuery(IndexSearcher s, Query q) throws Exception {
632 return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
635 protected void smokeTestSearcher(IndexSearcher s) throws Exception {
636 runQuery(s, new TermQuery(new Term("body", "united")));
637 runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
638 PhraseQuery pq = new PhraseQuery();
639 pq.add(new Term("body", "united"));
640 pq.add(new Term("body", "states"));