pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / backwards / src / test / org / apache / lucene / index / TestNRTThreads.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.lucene.analysis.MockAnalyzer;
34 import org.apache.lucene.document.Document;
35 import org.apache.lucene.document.Field;
36 import org.apache.lucene.search.IndexSearcher;
37 import org.apache.lucene.search.PhraseQuery;
38 import org.apache.lucene.search.Query;
39 import org.apache.lucene.search.ScoreDoc;
40 import org.apache.lucene.search.Sort;
41 import org.apache.lucene.search.SortField;
42 import org.apache.lucene.search.TermQuery;
43 import org.apache.lucene.search.TopDocs;
44 import org.apache.lucene.store.MockDirectoryWrapper;
45 import org.apache.lucene.util.LineFileDocs;
46 import org.apache.lucene.util.LuceneTestCase;
47 import org.apache.lucene.util._TestUtil;
48 import org.junit.Test;
49
50 // TODO
51 //   - mix in optimize, addIndexes
52 //   - randomly mix in non-congruent docs
53
54 public class TestNRTThreads extends LuceneTestCase {
55
56   private static class SubDocs {
57     public final String packID;
58     public final List<String> subIDs;
59     public boolean deleted;
60
61     public SubDocs(String packID, List<String> subIDs) {
62       this.packID = packID;
63       this.subIDs = subIDs;
64     }
65   }
66
67   @Test
68   public void testNRTThreads() throws Exception {
69
70     final long t0 = System.currentTimeMillis();
71
72     final LineFileDocs docs = new LineFileDocs(random);
73     final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
74     final MockDirectoryWrapper dir = newFSDirectory(tempDir);
75     dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
76     final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
77     conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
78       @Override
79       public void warm(IndexReader reader) throws IOException {
80         if (VERBOSE) {
81           System.out.println("TEST: now warm merged reader=" + reader);
82         }
83         final int maxDoc = reader.maxDoc();
84         int sum = 0;
85         final int inc = Math.max(1, maxDoc/50);
86         for(int docID=0;docID<maxDoc;docID += inc) {
87           if (reader.isDeleted(docID)) {
88             final Document doc = reader.document(docID);
89             sum += doc.getFields().size();
90           }
91         }
92
93         IndexSearcher searcher = newSearcher(reader);
94         sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
95         searcher.close();
96
97         if (VERBOSE) {
98           System.out.println("TEST: warm visited " + sum + " fields");
99         }
100       }
101       });
102     
103     final IndexWriter writer = new IndexWriter(dir, conf);
104     if (VERBOSE) {
105       writer.setInfoStream(System.out);
106     }
107     _TestUtil.reduceOpenFiles(writer);
108
109     final int NUM_INDEX_THREADS = 2;
110     final int NUM_SEARCH_THREADS = 3;
111
112     final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;
113
114     final AtomicBoolean failed = new AtomicBoolean();
115     final AtomicInteger addCount = new AtomicInteger();
116     final AtomicInteger delCount = new AtomicInteger();
117     final AtomicInteger packCount = new AtomicInteger();
118
119     final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
120     final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
121
122     final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
123     Thread[] threads = new Thread[NUM_INDEX_THREADS];
124     for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
125       threads[thread] = new Thread() {
126           @Override
127           public void run() {
128             // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
129             final List<String> toDeleteIDs = new ArrayList<String>();
130             final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
131             while(System.currentTimeMillis() < stopTime && !failed.get()) {
132               try {
133                 Document doc = docs.nextDoc();
134                 if (doc == null) {
135                   break;
136                 }
137                 final String addedField;
138                 if (random.nextBoolean()) {
139                   addedField = "extra" + random.nextInt(10);
140                   doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
141                 } else {
142                   addedField = null;
143                 }
144                 if (random.nextBoolean()) {
145                   if (VERBOSE) {
146                     System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
147                   }
148
149                   if (random.nextBoolean()) {
150                     // Add a pack of adjacent sub-docs
151                     final String packID;
152                     final SubDocs delSubDocs;
153                     if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
154                       delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
155                       assert !delSubDocs.deleted;
156                       toDeleteSubDocs.remove(delSubDocs);
157                       // reuse prior packID
158                       packID = delSubDocs.packID;
159                     } else {
160                       delSubDocs = null;
161                       // make new packID
162                       packID = packCount.getAndIncrement() + "";
163                     }
164
165                     final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
166                     final List<String> docIDs = new ArrayList<String>();
167                     final SubDocs subDocs = new SubDocs(packID, docIDs);
168                     final List<Document> docsList = new ArrayList<Document>();
169
170                     allSubDocs.add(subDocs);
171                     doc.add(packIDField);
172                     docsList.add(_TestUtil.cloneDocument(doc));
173                     docIDs.add(doc.get("docid"));
174
175                     final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
176                     while(docsList.size() < maxDocCount) {
177                       doc = docs.nextDoc();
178                       if (doc == null) {
179                         break;
180                       }
181                       docsList.add(_TestUtil.cloneDocument(doc));
182                       docIDs.add(doc.get("docid"));
183                     }
184                     addCount.addAndGet(docsList.size());
185
186                     if (delSubDocs != null) {
187                       delSubDocs.deleted = true;
188                       delIDs.addAll(delSubDocs.subIDs);
189                       delCount.addAndGet(delSubDocs.subIDs.size());
190                       if (VERBOSE) {
191                         System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
192                       }
193                       writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
194                       /*
195                       // non-atomic:
196                       writer.deleteDocuments(new Term("packID", delSubDocs.packID));
197                       for(Document subDoc : docsList) {
198                         writer.addDocument(subDoc);
199                       }
200                       */
201                     } else {
202                       if (VERBOSE) {
203                         System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
204                       }
205                       writer.addDocuments(docsList);
206                       
207                       /*
208                       // non-atomic:
209                       for(Document subDoc : docsList) {
210                         writer.addDocument(subDoc);
211                       }
212                       */
213                     }
214                     doc.removeField("packID");
215
216                     if (random.nextInt(5) == 2) {
217                       if (VERBOSE) {
218                         //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
219                       }
220                       toDeleteSubDocs.add(subDocs);
221                     }
222
223                   } else {
224                     writer.addDocument(doc);
225                     addCount.getAndIncrement();
226
227                     if (random.nextInt(5) == 3) {
228                       if (VERBOSE) {
229                         //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
230                       }
231                       toDeleteIDs.add(doc.get("docid"));
232                     }
233                   }
234                 } else {
235                   // we use update but it never replaces a
236                   // prior doc
237                   if (VERBOSE) {
238                     System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
239                   }
240                   writer.updateDocument(new Term("docid", doc.get("docid")), doc);
241                   addCount.getAndIncrement();
242
243                   if (random.nextInt(5) == 3) {
244                     if (VERBOSE) {
245                       //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
246                     }
247                     toDeleteIDs.add(doc.get("docid"));
248                   }
249                 }
250
251                 if (random.nextInt(30) == 17) {
252                   if (VERBOSE) {
253                     System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
254                   }
255                   for(String id : toDeleteIDs) {
256                     if (VERBOSE) {
257                       System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
258                     }
259                     writer.deleteDocuments(new Term("docid", id));
260                   }
261                   final int count = delCount.addAndGet(toDeleteIDs.size());
262                   if (VERBOSE) {
263                     System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
264                   }
265                   delIDs.addAll(toDeleteIDs);
266                   toDeleteIDs.clear();
267
268                   for(SubDocs subDocs : toDeleteSubDocs) {
269                     assert !subDocs.deleted;
270                     writer.deleteDocuments(new Term("packID", subDocs.packID));
271                     subDocs.deleted = true;
272                     if (VERBOSE) {
273                       System.out.println("  del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
274                     }
275                     delIDs.addAll(subDocs.subIDs);
276                     delCount.addAndGet(subDocs.subIDs.size());
277                   }
278                   toDeleteSubDocs.clear();
279                 }
280                 if (addedField != null) {
281                   doc.removeField(addedField);
282                 }
283               } catch (Throwable t) {
284                 System.out.println(Thread.currentThread().getName() + ": hit exc");
285                 t.printStackTrace();
286                 failed.set(true);
287                 throw new RuntimeException(t);
288               }
289             }
290             if (VERBOSE) {
291               System.out.println(Thread.currentThread().getName() + ": indexing done");
292             }
293           }
294         };
295       threads[thread].setDaemon(true);
296       threads[thread].start();
297     }
298
299     if (VERBOSE) {
300       System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
301     }
302
303     // let index build up a bit
304     Thread.sleep(100);
305
306     IndexReader r = IndexReader.open(writer, true);
307     boolean any = false;
308
309     // silly starting guess:
310     final AtomicInteger totTermCount = new AtomicInteger(100);
311
312     final ExecutorService es = Executors.newCachedThreadPool();
313
314     while(System.currentTimeMillis() < stopTime && !failed.get()) {
315       if (random.nextBoolean()) {
316         if (VERBOSE) {
317           System.out.println("TEST: now reopen r=" + r);
318         }
319         final IndexReader r2 = r.reopen();
320         if (r != r2) {
321           r.close();
322           r = r2;
323         }
324       } else {
325         if (VERBOSE) {
326           System.out.println("TEST: now close reader=" + r);
327         }
328         r.close();
329         writer.commit();
330         final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
331         if (openDeletedFiles.size() > 0) {
332           System.out.println("OBD files: " + openDeletedFiles);
333         }
334         any |= openDeletedFiles.size() > 0;
335         //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
336         if (VERBOSE) {
337           System.out.println("TEST: now open");
338         }
339         r = IndexReader.open(writer, true);
340       }
341       if (VERBOSE) {
342         System.out.println("TEST: got new reader=" + r);
343       }
344       //System.out.println("numDocs=" + r.numDocs() + "
345       //openDelFileCount=" + dir.openDeleteFileCount());
346
347       smokeTestReader(r);
348
349       if (r.numDocs() > 0) {
350
351         final IndexSearcher s = new IndexSearcher(r, es);
352
353         // run search threads
354         final long searchStopTime = System.currentTimeMillis() + 500;
355         final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
356         final AtomicInteger totHits = new AtomicInteger();
357         for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
358           searchThreads[thread] = new Thread() {
359               @Override
360               public void run() {
361                 try {
362                   TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
363                   int seenTermCount = 0;
364                   int shift;
365                   int trigger;
366                   if (totTermCount.get() < 10) {
367                     shift = 0;
368                     trigger = 1;
369                   } else {
370                     trigger = totTermCount.get()/10;
371                     shift = random.nextInt(trigger);
372                   }
373                   while(System.currentTimeMillis() < searchStopTime) {
374                     Term term = termEnum.term();
375                     if (term == null) {
376                       if (seenTermCount < 10) {
377                         break;
378                       }
379                       totTermCount.set(seenTermCount);
380                       seenTermCount = 0;
381                       trigger = totTermCount.get()/10;
382                       //System.out.println("trigger " + trigger);
383                       shift = random.nextInt(trigger);
384                       termEnum = s.getIndexReader().terms(new Term("body", ""));
385                       continue;
386                     }
387                     seenTermCount++;
388                     // search 10 terms
389                     if (trigger == 0) {
390                       trigger = 1;
391                     }
392                     if ((seenTermCount + shift) % trigger == 0) {
393                       //if (VERBOSE) {
394                       //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
395                       //}
396                       totHits.addAndGet(runQuery(s, new TermQuery(term)));
397                     }
398                     termEnum.next();
399                   }
400                   if (VERBOSE) {
401                     System.out.println(Thread.currentThread().getName() + ": search done");
402                   }
403                 } catch (Throwable t) {
404                   System.out.println(Thread.currentThread().getName() + ": hit exc");
405                   failed.set(true);
406                   t.printStackTrace(System.out);
407                   throw new RuntimeException(t);
408                 }
409               }
410             };
411           searchThreads[thread].setDaemon(true);
412           searchThreads[thread].start();
413         }
414
415         for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
416           searchThreads[thread].join();
417         }
418
419         if (VERBOSE) {
420           System.out.println("TEST: DONE search: totHits=" + totHits);
421         }
422       } else {
423         Thread.sleep(100);
424       }
425     }
426
427     es.shutdown();
428     es.awaitTermination(1, TimeUnit.SECONDS);
429
430     if (VERBOSE) {
431       System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
432     }
433
434     //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
435     r.close();
436     final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
437     if (openDeletedFiles.size() > 0) {
438       System.out.println("OBD files: " + openDeletedFiles);
439     }
440     any |= openDeletedFiles.size() > 0;
441
442     assertFalse("saw non-zero open-but-deleted count", any);
443     if (VERBOSE) {
444       System.out.println("TEST: now join");
445     }
446     for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
447       threads[thread].join();
448     }
449     if (VERBOSE) {
450       System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
451     }
452
453     final IndexReader r2 = writer.getReader();
454     final IndexSearcher s = newSearcher(r2);
455     boolean doFail = false;
456     for(String id : delIDs) {
457       final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
458       if (hits.totalHits != 0) {
459         System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
460         doFail = true;
461       }
462     }
463
464     // Make sure each group of sub-docs are still in docID order:
465     for(SubDocs subDocs : allSubDocs) {
466       if (!subDocs.deleted) {
467         // We sort by relevance but the scores should be identical so sort falls back to by docID:
468         TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
469         assertEquals(subDocs.subIDs.size(), hits.totalHits);
470         int lastDocID = -1;
471         int startDocID = -1;
472         for(ScoreDoc scoreDoc : hits.scoreDocs) {
473           final int docID = scoreDoc.doc;
474           if (lastDocID != -1) {
475             assertEquals(1+lastDocID, docID);
476           } else {
477             startDocID = docID;
478           }
479           lastDocID = docID;
480           final Document doc = s.doc(docID);
481           assertEquals(subDocs.packID, doc.get("packID"));
482         }
483
484         lastDocID = startDocID - 1;
485         for(String subID : subDocs.subIDs) {
486           hits = s.search(new TermQuery(new Term("docid", subID)), 1);
487           assertEquals(1, hits.totalHits);
488           final int docID = hits.scoreDocs[0].doc;
489           if (lastDocID != -1) {
490             assertEquals(1+lastDocID, docID);
491           }
492           lastDocID = docID;
493         }          
494       } else {
495         for(String subID : subDocs.subIDs) {
496           assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
497         }
498       }
499     }
500     
501     final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
502     for(int id=0;id<endID;id++) {
503       String stringID = ""+id;
504       if (!delIDs.contains(stringID)) {
505         final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
506         if (hits.totalHits != 1) {
507           System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
508           doFail = true;
509         }
510       }
511     }
512     assertFalse(doFail);
513     
514     assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
515     r2.close();
516
517     writer.commit();
518     assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
519
520     assertFalse(writer.anyNonBulkMerges);
521     writer.close(false);
522     _TestUtil.checkIndex(dir);
523     s.close();
524     dir.close();
525     _TestUtil.rmDir(tempDir);
526     docs.close();
527     if (VERBOSE) {
528       System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
529     }
530   }
531
532   private int runQuery(IndexSearcher s, Query q) throws Exception {
533     s.search(q, 10);
534     return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
535   }
536
537   private void smokeTestReader(IndexReader r) throws Exception {
538     IndexSearcher s = newSearcher(r);
539     runQuery(s, new TermQuery(new Term("body", "united")));
540     runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
541     PhraseQuery pq = new PhraseQuery();
542     pq.add(new Term("body", "united"));
543     pq.add(new Term("body", "states"));
544     runQuery(s, pq);
545     s.close();
546   }
547 }