pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / test-framework / java / org / apache / lucene / index / ThreadedIndexingAndSearchingTestCase.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.apache.lucene.analysis.MockAnalyzer;
35 import org.apache.lucene.document.Document;
36 import org.apache.lucene.document.Field;
37 import org.apache.lucene.search.IndexSearcher;
38 import org.apache.lucene.search.PhraseQuery;
39 import org.apache.lucene.search.Query;
40 import org.apache.lucene.search.ScoreDoc;
41 import org.apache.lucene.search.Sort;
42 import org.apache.lucene.search.SortField;
43 import org.apache.lucene.search.TermQuery;
44 import org.apache.lucene.search.TopDocs;
45 import org.apache.lucene.store.Directory;
46 import org.apache.lucene.store.MockDirectoryWrapper;
47 import org.apache.lucene.util.LineFileDocs;
48 import org.apache.lucene.util.LuceneTestCase;
49 import org.apache.lucene.util.NamedThreadFactory;
50 import org.apache.lucene.util._TestUtil;
51
52 // TODO
53 //   - mix in forceMerge, addIndexes
54 //   - randomly mix in non-congruent docs
55
56 /** Utility class that spawns multiple indexing and
57  *  searching threads. */
58 public abstract class ThreadedIndexingAndSearchingTestCase extends LuceneTestCase {
59
60   protected final AtomicBoolean failed = new AtomicBoolean();
61   protected final AtomicInteger addCount = new AtomicInteger();
62   protected final AtomicInteger delCount = new AtomicInteger();
63   protected final AtomicInteger packCount = new AtomicInteger();
64
65   protected Directory dir;
66   protected IndexWriter writer;
67
68   private static class SubDocs {
69     public final String packID;
70     public final List<String> subIDs;
71     public boolean deleted;
72
73     public SubDocs(String packID, List<String> subIDs) {
74       this.packID = packID;
75       this.subIDs = subIDs;
76     }
77   }
78
79   // Called per-search
80   protected abstract IndexSearcher getCurrentSearcher() throws Exception;
81
82   protected abstract IndexSearcher getFinalSearcher() throws Exception;
83
84   protected void releaseSearcher(IndexSearcher s) throws Exception {
85   }
86
87   // Called once to run searching
88   protected abstract void doSearching(ExecutorService es, long stopTime) throws Exception;
89
90   protected Directory getDirectory(Directory in) {
91     return in;
92   }
93
94   protected void updateDocuments(Term id, Collection<Document> docs) throws Exception {
95     writer.updateDocuments(id, docs);
96   }
97
98   protected void addDocuments(Term id, Collection<Document> docs) throws Exception {
99     writer.addDocuments(docs);
100   }
101
102   protected void addDocument(Term id, Document doc) throws Exception {
103     writer.addDocument(doc);
104   }
105
106   protected void updateDocument(Term term, Document doc) throws Exception {
107     writer.updateDocument(term, doc);
108   }
109
110   protected void deleteDocuments(Term term) throws Exception {
111     writer.deleteDocuments(term);
112   }
113
114   protected void doAfterIndexingThreadDone() {
115   }
116
117   private Thread[] launchIndexingThreads(final LineFileDocs docs,
118                                          int numThreads,
119                                          final long stopTime,
120                                          final Set<String> delIDs,
121                                          final Set<String> delPackIDs,
122                                          final List<SubDocs> allSubDocs)
123     throws Exception {
124     final Thread[] threads = new Thread[numThreads];
125     for(int thread=0;thread<numThreads;thread++) {
126       threads[thread] = new Thread() {
127           @Override
128           public void run() {
129             // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
130             final List<String> toDeleteIDs = new ArrayList<String>();
131             final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
132             while(System.currentTimeMillis() < stopTime && !failed.get()) {
133               try {
134
135                 // Occasional longish pause if running
136                 // nightly
137                 if (LuceneTestCase.TEST_NIGHTLY && random.nextInt(6) == 3) {
138                   if (VERBOSE) {
139                     System.out.println(Thread.currentThread().getName() + ": now long sleep");
140                   }
141                   Thread.sleep(_TestUtil.nextInt(random, 50, 500));
142                 }
143
144                 // Rate limit ingest rate:
145                 if (random.nextInt(7) == 5) {
146                   Thread.sleep(_TestUtil.nextInt(random, 1, 10));
147                   if (VERBOSE) {
148                     System.out.println(Thread.currentThread().getName() + ": done sleep");
149                   }
150                 }
151
152                 Document doc = docs.nextDoc();
153                 if (doc == null) {
154                   break;
155                 }
156
157                 // Maybe add randomly named field
158                 final String addedField;
159                 if (random.nextBoolean()) {
160                   addedField = "extra" + random.nextInt(40);
161                   doc.add(newField(addedField, "a random field", Field.Store.YES, Field.Index.ANALYZED));
162                 } else {
163                   addedField = null;
164                 }
165
166                 if (random.nextBoolean()) {
167
168                   if (random.nextBoolean()) {
169                     // Add/update doc block:
170                     final String packID;
171                     final SubDocs delSubDocs;
172                     if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
173                       delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
174                       assert !delSubDocs.deleted;
175                       toDeleteSubDocs.remove(delSubDocs);
176                       // Update doc block, replacing prior packID
177                       packID = delSubDocs.packID;
178                     } else {
179                       delSubDocs = null;
180                       // Add doc block, using new packID
181                       packID = packCount.getAndIncrement() + "";
182                     }
183
184                     final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
185                     final List<String> docIDs = new ArrayList<String>();
186                     final SubDocs subDocs = new SubDocs(packID, docIDs);
187                     final List<Document> docsList = new ArrayList<Document>();
188
189                     allSubDocs.add(subDocs);
190                     doc.add(packIDField);
191                     docsList.add(_TestUtil.cloneDocument(doc));
192                     docIDs.add(doc.get("docid"));
193
194                     final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
195                     while(docsList.size() < maxDocCount) {
196                       doc = docs.nextDoc();
197                       if (doc == null) {
198                         break;
199                       }
200                       docsList.add(_TestUtil.cloneDocument(doc));
201                       docIDs.add(doc.get("docid"));
202                     }
203                     addCount.addAndGet(docsList.size());
204
205                     final Term packIDTerm = new Term("packID", packID);
206
207                     if (delSubDocs != null) {
208                       delSubDocs.deleted = true;
209                       delIDs.addAll(delSubDocs.subIDs);
210                       delCount.addAndGet(delSubDocs.subIDs.size());
211                       if (VERBOSE) {
212                         System.out.println(Thread.currentThread().getName() + ": update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
213                       }
214                       updateDocuments(packIDTerm, docsList);
215                     } else {
216                       if (VERBOSE) {
217                         System.out.println(Thread.currentThread().getName() + ": add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
218                       }
219                       addDocuments(packIDTerm, docsList);
220                     }
221                     doc.removeField("packID");
222
223                     if (random.nextInt(5) == 2) {
224                       if (VERBOSE) {
225                         System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
226                       }
227                       toDeleteSubDocs.add(subDocs);
228                     }
229
230                   } else {
231                     // Add single doc
232                     final String docid = doc.get("docid");
233                     if (VERBOSE) {
234                       System.out.println(Thread.currentThread().getName() + ": add doc docid:" + docid);
235                     }
236                     addDocument(new Term("docid", docid), doc);
237                     addCount.getAndIncrement();
238
239                     if (random.nextInt(5) == 3) {
240                       if (VERBOSE) {
241                         System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
242                       }
243                       toDeleteIDs.add(docid);
244                     }
245                   }
246                 } else {
247
248                   // Update single doc, but we never re-use
249                   // and ID so the delete will never
250                   // actually happen:
251                   if (VERBOSE) {
252                     System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
253                   }
254                   final String docid = doc.get("docid");
255                   updateDocument(new Term("docid", docid), doc);
256                   addCount.getAndIncrement();
257
258                   if (random.nextInt(5) == 3) {
259                     if (VERBOSE) {
260                       System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
261                     }
262                     toDeleteIDs.add(docid);
263                   }
264                 }
265
266                 if (random.nextInt(30) == 17) {
267                   if (VERBOSE) {
268                     System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
269                   }
270                   for(String id : toDeleteIDs) {
271                     if (VERBOSE) {
272                       System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
273                     }
274                     deleteDocuments(new Term("docid", id));
275                   }
276                   final int count = delCount.addAndGet(toDeleteIDs.size());
277                   if (VERBOSE) {
278                     System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
279                   }
280                   delIDs.addAll(toDeleteIDs);
281                   toDeleteIDs.clear();
282
283                   for(SubDocs subDocs : toDeleteSubDocs) {
284                     assert !subDocs.deleted;
285                     delPackIDs.add(subDocs.packID);
286                     deleteDocuments(new Term("packID", subDocs.packID));
287                     subDocs.deleted = true;
288                     if (VERBOSE) {
289                       System.out.println(Thread.currentThread().getName() + ": del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
290                     }
291                     delIDs.addAll(subDocs.subIDs);
292                     delCount.addAndGet(subDocs.subIDs.size());
293                   }
294                   toDeleteSubDocs.clear();
295                 }
296                 if (addedField != null) {
297                   doc.removeField(addedField);
298                 }
299               } catch (Throwable t) {
300                 System.out.println(Thread.currentThread().getName() + ": hit exc");
301                 t.printStackTrace();
302                 failed.set(true);
303                 throw new RuntimeException(t);
304               }
305             }
306             if (VERBOSE) {
307               System.out.println(Thread.currentThread().getName() + ": indexing done");
308             }
309
310             doAfterIndexingThreadDone();
311           }
312         };
313       threads[thread].setDaemon(true);
314       threads[thread].start();
315     }
316
317     return threads;
318   }
319
320   protected void runSearchThreads(final long stopTimeMS) throws Exception {
321     final int numThreads = _TestUtil.nextInt(random, 1, 5);
322     final Thread[] searchThreads = new Thread[numThreads];
323     final AtomicInteger totHits = new AtomicInteger();
324
325     // silly starting guess:
326     final AtomicInteger totTermCount = new AtomicInteger(100);
327
328     // TODO: we should enrich this to do more interesting searches
329     for(int thread=0;thread<searchThreads.length;thread++) {
330       searchThreads[thread] = new Thread() {
331           @Override
332           public void run() {
333             while (System.currentTimeMillis() < stopTimeMS) {
334               try {
335                 final IndexSearcher s = getCurrentSearcher();
336                 try {
337                   if (s.getIndexReader().numDocs() > 0) {
338                     smokeTestSearcher(s);
339                     TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
340                     int seenTermCount = 0;
341                     int shift;
342                     int trigger; 
343                     if (totTermCount.get() < 10) {
344                       shift = 0;
345                       trigger = 1;
346                     } else {
347                       trigger = totTermCount.get()/10;
348                       shift = random.nextInt(trigger);
349                     }
350                     while(System.currentTimeMillis() < stopTimeMS) {
351                       Term term = termEnum.term();
352                       if (term == null) {
353                         if (seenTermCount == 0) {
354                           break;
355                         }
356                         totTermCount.set(seenTermCount);
357                         seenTermCount = 0;
358                         if (totTermCount.get() < 10) {
359                           shift = 0;
360                           trigger = 1;
361                         } else {
362                           trigger = totTermCount.get()/10;
363                           //System.out.println("trigger " + trigger);
364                           shift = random.nextInt(trigger);
365                         }
366                         termEnum = s.getIndexReader().terms(new Term("body", ""));
367                         continue;
368                       }
369                       seenTermCount++;
370                       // search 10 terms
371                       if (trigger == 0) {
372                         trigger = 1;
373                       }
374                       if ((seenTermCount + shift) % trigger == 0) {
375                         //if (VERBOSE) {
376                         //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
377                         //}
378                         totHits.addAndGet(runQuery(s, new TermQuery(term)));
379                       }
380                     }
381                     //if (VERBOSE) {
382                     //System.out.println(Thread.currentThread().getName() + ": search done");
383                     //}
384                   }
385                 } finally {
386                   releaseSearcher(s);
387                 }
388               } catch (Throwable t) {
389                 System.out.println(Thread.currentThread().getName() + ": hit exc");
390                 failed.set(true);
391                 t.printStackTrace(System.out);
392                 throw new RuntimeException(t);
393               }
394             }
395           }
396         };
397       searchThreads[thread].setDaemon(true);
398       searchThreads[thread].start();
399     }
400
401     for(int thread=0;thread<searchThreads.length;thread++) {
402       searchThreads[thread].join();
403     }
404
405     if (VERBOSE) {
406       System.out.println("TEST: DONE search: totHits=" + totHits);
407     }
408   }
409
410   protected void doAfterWriter(ExecutorService es) throws Exception {
411   }
412
413   protected void doClose() throws Exception {
414   }
415
416   public void runTest(String testName) throws Exception {
417
418     failed.set(false);
419     addCount.set(0);
420     delCount.set(0);
421     packCount.set(0);
422
423     final long t0 = System.currentTimeMillis();
424
425     final LineFileDocs docs = new LineFileDocs(random);
426     final File tempDir = _TestUtil.getTempDir(testName);
427     dir = newFSDirectory(tempDir);
428     ((MockDirectoryWrapper) dir).setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
429     final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
430
431     if (LuceneTestCase.TEST_NIGHTLY) {
432       // newIWConfig makes smallish max seg size, which
433       // results in tons and tons of segments for this test
434       // when run nightly:
435       MergePolicy mp = conf.getMergePolicy();
436       if (mp instanceof TieredMergePolicy) {
437         ((TieredMergePolicy) mp).setMaxMergedSegmentMB(5000.);
438       } else if (mp instanceof LogByteSizeMergePolicy) {
439         ((LogByteSizeMergePolicy) mp).setMaxMergeMB(1000.);
440       } else if (mp instanceof LogMergePolicy) {
441         ((LogMergePolicy) mp).setMaxMergeDocs(100000);
442       }
443     }
444
445     conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
446       @Override
447       public void warm(IndexReader reader) throws IOException {
448         if (VERBOSE) {
449           System.out.println("TEST: now warm merged reader=" + reader);
450         }
451         final int maxDoc = reader.maxDoc();
452         int sum = 0;
453         final int inc = Math.max(1, maxDoc/50);
454         for(int docID=0;docID<maxDoc;docID += inc) {
455           if (!reader.isDeleted(docID)) {
456             final Document doc = reader.document(docID);
457             sum += doc.getFields().size();
458           }
459         }
460
461         IndexSearcher searcher = newSearcher(reader);
462         sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
463         searcher.close();
464
465         if (VERBOSE) {
466           System.out.println("TEST: warm visited " + sum + " fields");
467         }
468       }
469       });
470
471     writer = new IndexWriter(dir, conf);
472     if (VERBOSE) {
473       writer.setInfoStream(System.out);
474     }
475     _TestUtil.reduceOpenFiles(writer);
476
477     final ExecutorService es = random.nextBoolean() ? null : Executors.newCachedThreadPool(new NamedThreadFactory(testName));
478
479     doAfterWriter(es);
480
481     final int NUM_INDEX_THREADS = _TestUtil.nextInt(random, 2, 4);
482
483     final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
484
485     final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
486     final Set<String> delPackIDs = Collections.synchronizedSet(new HashSet<String>());
487     final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
488
489     final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
490
491     final Thread[] indexThreads = launchIndexingThreads(docs, NUM_INDEX_THREADS, stopTime, delIDs, delPackIDs, allSubDocs);
492
493     if (VERBOSE) {
494       System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
495     }
496
497     // Let index build up a bit
498     Thread.sleep(100);
499
500     doSearching(es, stopTime);
501
502     if (VERBOSE) {
503       System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
504     }
505     
506     for(int thread=0;thread<indexThreads.length;thread++) {
507       indexThreads[thread].join();
508     }
509
510     if (VERBOSE) {
511       System.out.println("TEST: done join indexing threads [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
512     }
513
514     final IndexSearcher s = getFinalSearcher();
515     if (VERBOSE) {
516       System.out.println("TEST: finalSearcher=" + s);
517     }
518
519     assertFalse(failed.get());
520
521     boolean doFail = false;
522
523     // Verify: make sure delIDs are in fact deleted:
524     for(String id : delIDs) {
525       final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
526       if (hits.totalHits != 0) {
527         System.out.println("doc id=" + id + " is supposed to be deleted, but got " + hits.totalHits + " hits; first docID=" + hits.scoreDocs[0].doc);
528         doFail = true;
529       }
530     }
531
532     // Verify: make sure delPackIDs are in fact deleted:
533     for(String id : delPackIDs) {
534       final TopDocs hits = s.search(new TermQuery(new Term("packID", id)), 1);
535       if (hits.totalHits != 0) {
536         System.out.println("packID=" + id + " is supposed to be deleted, but got " + hits.totalHits + " matches");
537         doFail = true;
538       }
539     }
540
541     // Verify: make sure each group of sub-docs are still in docID order:
542     for(SubDocs subDocs : allSubDocs) {
543       TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
544       if (!subDocs.deleted) {
545         // We sort by relevance but the scores should be identical so sort falls back to by docID:
546         if (hits.totalHits != subDocs.subIDs.size()) {
547           System.out.println("packID=" + subDocs.packID + ": expected " + subDocs.subIDs.size() + " hits but got " + hits.totalHits);
548           doFail = true;
549         } else {
550           int lastDocID = -1;
551           int startDocID = -1;
552           for(ScoreDoc scoreDoc : hits.scoreDocs) {
553             final int docID = scoreDoc.doc;
554             if (lastDocID != -1) {
555               assertEquals(1+lastDocID, docID);
556             } else {
557               startDocID = docID;
558             }
559             lastDocID = docID;
560             final Document doc = s.doc(docID);
561             assertEquals(subDocs.packID, doc.get("packID"));
562           }
563
564           lastDocID = startDocID - 1;
565           for(String subID : subDocs.subIDs) {
566             hits = s.search(new TermQuery(new Term("docid", subID)), 1);
567             assertEquals(1, hits.totalHits);
568             final int docID = hits.scoreDocs[0].doc;
569             if (lastDocID != -1) {
570               assertEquals(1+lastDocID, docID);
571             }
572             lastDocID = docID;
573           }
574         }
575       } else {
576         // Pack was deleted -- make sure its docs are
577         // deleted.  We can't verify packID is deleted
578         // because we can re-use packID for update:
579         for(String subID : subDocs.subIDs) {
580           assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
581         }
582       }
583     }
584
585     // Verify: make sure all not-deleted docs are in fact
586     // not deleted:
587     final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
588     docs.close();
589
590     for(int id=0;id<endID;id++) {
591       String stringID = ""+id;
592       if (!delIDs.contains(stringID)) {
593         final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
594         if (hits.totalHits != 1) {
595           System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
596           doFail = true;
597         }
598       }
599     }
600     assertFalse(doFail);
601     
602     assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), s.getIndexReader().numDocs());
603     releaseSearcher(s);
604
605     writer.commit();
606
607     assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
608
609     assertFalse(writer.anyNonBulkMerges);
610     doClose();
611     writer.close(false);
612
613     // Cannot shutdown until after writer is closed because
614     // writer has merged segment warmer that uses IS to run
615     // searches, and that IS may be using this es!
616     if (es != null) {
617       es.shutdown();
618       es.awaitTermination(1, TimeUnit.SECONDS);
619     }
620
621     _TestUtil.checkIndex(dir);
622     dir.close();
623     _TestUtil.rmDir(tempDir);
624
625     if (VERBOSE) {
626       System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
627     }
628   }
629
630   private int runQuery(IndexSearcher s, Query q) throws Exception {
631     s.search(q, 10);
632     return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
633   }
634
635   protected void smokeTestSearcher(IndexSearcher s) throws Exception {
636     runQuery(s, new TermQuery(new Term("body", "united")));
637     runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
638     PhraseQuery pq = new PhraseQuery();
639     pq.add(new Term("body", "united"));
640     pq.add(new Term("body", "states"));
641     runQuery(s, pq);
642   }
643 }