add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / backwards / src / test / org / apache / lucene / index / TestNRTThreads.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.lucene.analysis.MockAnalyzer;
34 import org.apache.lucene.document.Document;
35 import org.apache.lucene.document.Field;
36 import org.apache.lucene.document.Fieldable;
37 import org.apache.lucene.search.IndexSearcher;
38 import org.apache.lucene.search.PhraseQuery;
39 import org.apache.lucene.search.Query;
40 import org.apache.lucene.search.ScoreDoc;
41 import org.apache.lucene.search.Sort;
42 import org.apache.lucene.search.SortField;
43 import org.apache.lucene.search.TermQuery;
44 import org.apache.lucene.search.TopDocs;
45 import org.apache.lucene.store.MockDirectoryWrapper;
46 import org.apache.lucene.util.LineFileDocs;
47 import org.apache.lucene.util.LuceneTestCase;
48 import org.apache.lucene.util._TestUtil;
49 import org.junit.Test;
50
51 // TODO
52 //   - mix in optimize, addIndexes
53 //   - randomoly mix in non-congruent docs
54
55 public class TestNRTThreads extends LuceneTestCase {
56
  /**
   * Bookkeeping for a "pack" of documents that were indexed together
   * under a shared packID field, so the test can later verify the pack
   * was added/updated/deleted atomically and remains in docID order.
   */
  private static class SubDocs {
    // Value stored in each sub-doc's "packID" field; shared by the whole pack.
    public final String packID;
    // The "docid" field values of every document in this pack.
    public final List<String> subIDs;
    // Set to true once the entire pack has been deleted from the index.
    public boolean deleted;

    public SubDocs(String packID, List<String> subIDs) {
      this.packID = packID;
      this.subIDs = subIDs;
    }
  }
67
68   // TODO: is there a pre-existing way to do this!!!
69   private Document cloneDoc(Document doc1) {
70     final Document doc2 = new Document();
71     for(Fieldable f : doc1.getFields()) {
72       Field field1 = (Field) f;
73       
74       Field field2 = new Field(field1.name(),
75                                field1.stringValue(),
76                                field1.isStored() ? Field.Store.YES : Field.Store.NO,
77                                field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
78       if (field1.getOmitNorms()) {
79         field2.setOmitNorms(true);
80       }
81       if (field1.getOmitTermFreqAndPositions()) {
82         field2.setOmitTermFreqAndPositions(true);
83       }
84       doc2.add(field2);
85     }
86
87     return doc2;
88   }
89
  /**
   * Stress test for near-real-time (NRT) reader usage: spawns index threads
   * that add/update/delete docs (including atomic "packs" via
   * addDocuments/updateDocuments) while the main thread repeatedly
   * reopens an NRT reader and search threads iterate terms and run queries.
   * Afterwards verifies deletes took effect, packs stayed contiguous in
   * docID order, and final doc counts match addCount - delCount.
   */
  @Test
  public void testNRTThreads() throws Exception {

    final long t0 = System.currentTimeMillis();

    final LineFileDocs docs = new LineFileDocs(random);
    final File tempDir = _TestUtil.getTempDir("nrtopenfiles");
    final MockDirectoryWrapper dir = newFSDirectory(tempDir);
    dir.setCheckIndexOnClose(false); // don't double-checkIndex, we do it ourselves.
    final IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
    // Warmer exercises the newly merged segment (doc loads + a search) before
    // it is exposed to NRT readers, mimicking production warm-up.
    conf.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
      @Override
      public void warm(IndexReader reader) throws IOException {
        if (VERBOSE) {
          System.out.println("TEST: now warm merged reader=" + reader);
        }
        final int maxDoc = reader.maxDoc();
        int sum = 0;
        // Sample roughly 50 docIDs spread across the segment.
        final int inc = Math.max(1, maxDoc/50);
        for(int docID=0;docID<maxDoc;docID += inc) {
          // NOTE(review): this loads docs where isDeleted(docID) is TRUE;
          // a warmer usually visits live docs — looks inverted, confirm intent.
          if (reader.isDeleted(docID)) {
            final Document doc = reader.document(docID);
            sum += doc.getFields().size();
          }
        }

        IndexSearcher searcher = newSearcher(reader);
        sum += searcher.search(new TermQuery(new Term("body", "united")), 10).totalHits;
        searcher.close();

        if (VERBOSE) {
          System.out.println("TEST: warm visited " + sum + " fields");
        }
      }
      });
    
    final IndexWriter writer = new IndexWriter(dir, conf);
    if (VERBOSE) {
      writer.setInfoStream(System.out);
    }
    _TestUtil.reduceOpenFiles(writer);

    final int NUM_INDEX_THREADS = 2;
    final int NUM_SEARCH_THREADS = 3;

    // Nightly runs hammer for 5 minutes; normal runs scale with the multiplier.
    final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : RANDOM_MULTIPLIER;

    final AtomicBoolean failed = new AtomicBoolean();
    final AtomicInteger addCount = new AtomicInteger();
    final AtomicInteger delCount = new AtomicInteger();
    final AtomicInteger packCount = new AtomicInteger();

    // Shared, thread-safe records of what was deleted / which packs exist,
    // consulted by the verification phase at the end.
    final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
    final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());

    final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
    // --- Indexing threads: randomly add single docs, add/update packs of
    // adjacent sub-docs, buffer ids for later deletion, and occasionally
    // flush the buffered deletes.
    Thread[] threads = new Thread[NUM_INDEX_THREADS];
    for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
      threads[thread] = new Thread() {
          @Override
          public void run() {
            // TODO: would be better if this were cross thread, so that we make sure one thread deleting anothers added docs works:
            final List<String> toDeleteIDs = new ArrayList<String>();
            final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
            while(System.currentTimeMillis() < stopTime && !failed.get()) {
              try {
                Document doc = docs.nextDoc();
                if (doc == null) {
                  break;
                }
                // Sometimes tack on a random extra field (removed again below)
                // so segments are not all congruent.
                final String addedField;
                if (random.nextBoolean()) {
                  addedField = "extra" + random.nextInt(10);
                  doc.add(new Field(addedField, "a random field", Field.Store.NO, Field.Index.ANALYZED));
                } else {
                  addedField = null;
                }
                if (random.nextBoolean()) {
                  if (VERBOSE) {
                    System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
                  }

                  if (random.nextBoolean()) {
                    // Add a pack of adjacent sub-docs
                    final String packID;
                    final SubDocs delSubDocs;
                    if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
                      delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
                      assert !delSubDocs.deleted;
                      toDeleteSubDocs.remove(delSubDocs);
                      // reuse prior packID
                      packID = delSubDocs.packID;
                    } else {
                      delSubDocs = null;
                      // make new packID
                      packID = packCount.getAndIncrement() + "";
                    }

                    final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
                    final List<String> docIDs = new ArrayList<String>();
                    final SubDocs subDocs = new SubDocs(packID, docIDs);
                    final List<Document> docsList = new ArrayList<Document>();

                    allSubDocs.add(subDocs);
                    doc.add(packIDField);
                    // doc is reused by LineFileDocs, so clone each member of
                    // the pack before buffering it.
                    docsList.add(cloneDoc(doc));
                    docIDs.add(doc.get("docid"));

                    final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
                    while(docsList.size() < maxDocCount) {
                      doc = docs.nextDoc();
                      if (doc == null) {
                        break;
                      }
                      docsList.add(cloneDoc(doc));
                      docIDs.add(doc.get("docid"));
                    }
                    addCount.addAndGet(docsList.size());

                    if (delSubDocs != null) {
                      // Atomically replace the prior pack with the new one.
                      delSubDocs.deleted = true;
                      delIDs.addAll(delSubDocs.subIDs);
                      delCount.addAndGet(delSubDocs.subIDs.size());
                      if (VERBOSE) {
                        System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
                      }
                      writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
                      /*
                      // non-atomic:
                      writer.deleteDocuments(new Term("packID", delSubDocs.packID));
                      for(Document subDoc : docsList) {
                        writer.addDocument(subDoc);
                      }
                      */
                    } else {
                      if (VERBOSE) {
                        System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
                      }
                      writer.addDocuments(docsList);
                      
                      /*
                      // non-atomic:
                      for(Document subDoc : docsList) {
                        writer.addDocument(subDoc);
                      }
                      */
                    }
                    doc.removeField("packID");

                    if (random.nextInt(5) == 2) {
                      if (VERBOSE) {
                        //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
                      }
                      toDeleteSubDocs.add(subDocs);
                    }

                  } else {
                    // Plain single-doc add.
                    writer.addDocument(doc);
                    addCount.getAndIncrement();

                    if (random.nextInt(5) == 3) {
                      if (VERBOSE) {
                        //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
                      }
                      toDeleteIDs.add(doc.get("docid"));
                    }
                  }
                } else {
                  // we use update but it never replaces a
                  // prior doc
                  if (VERBOSE) {
                    System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
                  }
                  writer.updateDocument(new Term("docid", doc.get("docid")), doc);
                  addCount.getAndIncrement();

                  if (random.nextInt(5) == 3) {
                    if (VERBOSE) {
                      //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
                    }
                    toDeleteIDs.add(doc.get("docid"));
                  }
                }

                // Occasionally flush all buffered deletes (single ids and
                // whole packs) to the writer and record them for verification.
                if (random.nextInt(30) == 17) {
                  if (VERBOSE) {
                    System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
                  }
                  for(String id : toDeleteIDs) {
                    if (VERBOSE) {
                      System.out.println(Thread.currentThread().getName() + ": del term=id:" + id);
                    }
                    writer.deleteDocuments(new Term("docid", id));
                  }
                  final int count = delCount.addAndGet(toDeleteIDs.size());
                  if (VERBOSE) {
                    System.out.println(Thread.currentThread().getName() + ": tot " + count + " deletes");
                  }
                  delIDs.addAll(toDeleteIDs);
                  toDeleteIDs.clear();

                  for(SubDocs subDocs : toDeleteSubDocs) {
                    assert !subDocs.deleted;
                    writer.deleteDocuments(new Term("packID", subDocs.packID));
                    subDocs.deleted = true;
                    if (VERBOSE) {
                      System.out.println("  del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
                    }
                    delIDs.addAll(subDocs.subIDs);
                    delCount.addAndGet(subDocs.subIDs.size());
                  }
                  toDeleteSubDocs.clear();
                }
                if (addedField != null) {
                  doc.removeField(addedField);
                }
              } catch (Throwable t) {
                // Record failure so the other loops stop, then rethrow.
                System.out.println(Thread.currentThread().getName() + ": hit exc");
                t.printStackTrace();
                failed.set(true);
                throw new RuntimeException(t);
              }
            }
            if (VERBOSE) {
              System.out.println(Thread.currentThread().getName() + ": indexing done");
            }
          }
        };
      threads[thread].setDaemon(true);
      threads[thread].start();
    }

    if (VERBOSE) {
      System.out.println("TEST: DONE start indexing threads [" + (System.currentTimeMillis()-t0) + " ms]");
    }

    // let index build up a bit
    Thread.sleep(100);

    IndexReader r = IndexReader.open(writer, true);
    boolean any = false;

    // silly starting guess:
    final AtomicInteger totTermCount = new AtomicInteger(100);

    final ExecutorService es = Executors.newCachedThreadPool();

    // --- Main NRT loop: while indexing runs, randomly either reopen the NRT
    // reader or fully close/commit/reopen it, then smoke-test and search it.
    while(System.currentTimeMillis() < stopTime && !failed.get()) {
      if (random.nextBoolean()) {
        if (VERBOSE) {
          System.out.println("TEST: now reopen r=" + r);
        }
        final IndexReader r2 = r.reopen();
        if (r != r2) {
          r.close();
          r = r2;
        }
      } else {
        if (VERBOSE) {
          System.out.println("TEST: now close reader=" + r);
        }
        r.close();
        writer.commit();
        // Track open-but-deleted files; asserted to be zero at the end.
        final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
        if (openDeletedFiles.size() > 0) {
          System.out.println("OBD files: " + openDeletedFiles);
        }
        any |= openDeletedFiles.size() > 0;
        //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
        if (VERBOSE) {
          System.out.println("TEST: now open");
        }
        r = IndexReader.open(writer, true);
      }
      if (VERBOSE) {
        System.out.println("TEST: got new reader=" + r);
      }
      //System.out.println("numDocs=" + r.numDocs() + "
      //openDelFileCount=" + dir.openDeleteFileCount());

      smokeTestReader(r);

      if (r.numDocs() > 0) {

        final IndexSearcher s = new IndexSearcher(r, es);

        // run search threads
        final long searchStopTime = System.currentTimeMillis() + 500;
        final Thread[] searchThreads = new Thread[NUM_SEARCH_THREADS];
        final AtomicInteger totHits = new AtomicInteger();
        for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
          searchThreads[thread] = new Thread() {
              @Override
              public void run() {
                try {
                  // Walk the "body" terms, searching roughly every
                  // trigger-th term (shift randomizes which ones).
                  TermEnum termEnum = s.getIndexReader().terms(new Term("body", ""));
                  int seenTermCount = 0;
                  int shift;
                  int trigger;
                  if (totTermCount.get() < 10) {
                    shift = 0;
                    trigger = 1;
                  } else {
                    trigger = totTermCount.get()/10;
                    shift = random.nextInt(trigger);
                  }
                  while(System.currentTimeMillis() < searchStopTime) {
                    Term term = termEnum.term();
                    if (term == null) {
                      if (seenTermCount < 10) {
                        break;
                      }
                      // Exhausted the enum: record the true term count and
                      // restart from the beginning with a fresh shift.
                      totTermCount.set(seenTermCount);
                      seenTermCount = 0;
                      trigger = totTermCount.get()/10;
                      //System.out.println("trigger " + trigger);
                      shift = random.nextInt(trigger);
                      termEnum = s.getIndexReader().terms(new Term("body", ""));
                      continue;
                    }
                    seenTermCount++;
                    // search 10 terms
                    if (trigger == 0) {
                      trigger = 1;
                    }
                    if ((seenTermCount + shift) % trigger == 0) {
                      //if (VERBOSE) {
                      //System.out.println(Thread.currentThread().getName() + " now search body:" + term.utf8ToString());
                      //}
                      totHits.addAndGet(runQuery(s, new TermQuery(term)));
                    }
                    termEnum.next();
                  }
                  if (VERBOSE) {
                    System.out.println(Thread.currentThread().getName() + ": search done");
                  }
                } catch (Throwable t) {
                  System.out.println(Thread.currentThread().getName() + ": hit exc");
                  failed.set(true);
                  t.printStackTrace(System.out);
                  throw new RuntimeException(t);
                }
              }
            };
          searchThreads[thread].setDaemon(true);
          searchThreads[thread].start();
        }

        for(int thread=0;thread<NUM_SEARCH_THREADS;thread++) {
          searchThreads[thread].join();
        }

        if (VERBOSE) {
          System.out.println("TEST: DONE search: totHits=" + totHits);
        }
      } else {
        Thread.sleep(100);
      }
    }

    es.shutdown();
    es.awaitTermination(1, TimeUnit.SECONDS);

    if (VERBOSE) {
      System.out.println("TEST: all searching done [" + (System.currentTimeMillis()-t0) + " ms]");
    }

    //System.out.println("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
    r.close();
    final Set<String> openDeletedFiles = dir.getOpenDeletedFiles();
    if (openDeletedFiles.size() > 0) {
      System.out.println("OBD files: " + openDeletedFiles);
    }
    any |= openDeletedFiles.size() > 0;

    assertFalse("saw non-zero open-but-deleted count", any);
    if (VERBOSE) {
      System.out.println("TEST: now join");
    }
    for(int thread=0;thread<NUM_INDEX_THREADS;thread++) {
      threads[thread].join();
    }
    if (VERBOSE) {
      System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
    }

    // --- Verification phase: every recorded delete must be gone.
    final IndexReader r2 = writer.getReader();
    final IndexSearcher s = newSearcher(r2);
    boolean doFail = false;
    for(String id : delIDs) {
      final TopDocs hits = s.search(new TermQuery(new Term("docid", id)), 1);
      if (hits.totalHits != 0) {
        System.out.println("doc id=" + id + " is supposed to be deleted, but got docID=" + hits.scoreDocs[0].doc);
        doFail = true;
      }
    }

    // Make sure each group of sub-docs are still in docID order:
    for(SubDocs subDocs : allSubDocs) {
      if (!subDocs.deleted) {
        // We sort by relevance but the scores should be identical so sort falls back to by docID:
        TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
        assertEquals(subDocs.subIDs.size(), hits.totalHits);
        int lastDocID = -1;
        int startDocID = -1;
        for(ScoreDoc scoreDoc : hits.scoreDocs) {
          final int docID = scoreDoc.doc;
          if (lastDocID != -1) {
            assertEquals(1+lastDocID, docID);
          } else {
            startDocID = docID;
          }
          lastDocID = docID;
          final Document doc = s.doc(docID);
          assertEquals(subDocs.packID, doc.get("packID"));
        }

        // Re-check contiguity by looking up each sub-doc's docid directly.
        lastDocID = startDocID - 1;
        for(String subID : subDocs.subIDs) {
          hits = s.search(new TermQuery(new Term("docid", subID)), 1);
          assertEquals(1, hits.totalHits);
          final int docID = hits.scoreDocs[0].doc;
          if (lastDocID != -1) {
            assertEquals(1+lastDocID, docID);
          }
          lastDocID = docID;
        }          
      } else {
        // Deleted pack: no sub-doc may remain searchable.
        for(String subID : subDocs.subIDs) {
          assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
        }
      }
    }
    
    // Every docid below the next unconsumed id must still be present
    // unless it was explicitly deleted.
    final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
    for(int id=0;id<endID;id++) {
      String stringID = ""+id;
      if (!delIDs.contains(stringID)) {
        final TopDocs hits = s.search(new TermQuery(new Term("docid", stringID)), 1);
        if (hits.totalHits != 1) {
          System.out.println("doc id=" + stringID + " is not supposed to be deleted, but got hitCount=" + hits.totalHits);
          doFail = true;
        }
      }
    }
    assertFalse(doFail);
    
    assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), r2.numDocs());
    r2.close();

    writer.commit();
    assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());

    assertFalse(writer.anyNonBulkMerges);
    writer.close(false);
    _TestUtil.checkIndex(dir);
    s.close();
    dir.close();
    _TestUtil.rmDir(tempDir);
    docs.close();
    if (VERBOSE) {
      System.out.println("TEST: done [" + (System.currentTimeMillis()-t0) + " ms]");
    }
  }
554
555   private int runQuery(IndexSearcher s, Query q) throws Exception {
556     s.search(q, 10);
557     return s.search(q, null, 10, new Sort(new SortField("title", SortField.STRING))).totalHits;
558   }
559
560   private void smokeTestReader(IndexReader r) throws Exception {
561     IndexSearcher s = newSearcher(r);
562     runQuery(s, new TermQuery(new Term("body", "united")));
563     runQuery(s, new TermQuery(new Term("titleTokenized", "states")));
564     PhraseQuery pq = new PhraseQuery();
565     pq.add(new Term("body", "united"));
566     pq.add(new Term("body", "states"));
567     runQuery(s, pq);
568     s.close();
569   }
570 }