pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / backwards / src / test / org / apache / lucene / index / TestIndexWriterWithThreads.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.util.concurrent.CountDownLatch;
22
23 import org.apache.lucene.analysis.MockAnalyzer;
24 import org.apache.lucene.document.Document;
25 import org.apache.lucene.document.Field;
26 import org.apache.lucene.store.AlreadyClosedException;
27 import org.apache.lucene.store.Directory;
28 import org.apache.lucene.store.MockDirectoryWrapper;
29 import org.apache.lucene.util.LuceneTestCase;
30 import org.apache.lucene.util.ThreadInterruptedException;
31
32 /**
33  * MultiThreaded IndexWriter tests
34  */
35 public class TestIndexWriterWithThreads extends LuceneTestCase {
36
37   // Used by test cases below
38   private class IndexerThread extends Thread {
39
40     boolean diskFull;
41     Throwable error;
42     AlreadyClosedException ace;
43     IndexWriter writer;
44     boolean noErrors;
45     volatile int addCount;
46
47     public IndexerThread(IndexWriter writer, boolean noErrors) {
48       this.writer = writer;
49       this.noErrors = noErrors;
50     }
51
52     @Override
53     public void run() {
54
55       final Document doc = new Document();
56       doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
57
58       int idUpto = 0;
59       int fullCount = 0;
60       final long stopTime = System.currentTimeMillis() + 200;
61
62       do {
63         try {
64           writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
65           addCount++;
66         } catch (IOException ioe) {
67           if (VERBOSE) {
68             System.out.println("TEST: expected exc:");
69             ioe.printStackTrace(System.out);
70           }
71           //System.out.println(Thread.currentThread().getName() + ": hit exc");
72           //ioe.printStackTrace(System.out);
73           if (ioe.getMessage().startsWith("fake disk full at") ||
74               ioe.getMessage().equals("now failing on purpose")) {
75             diskFull = true;
76             try {
77               Thread.sleep(1);
78             } catch (InterruptedException ie) {
79               throw new ThreadInterruptedException(ie);
80             }
81             if (fullCount++ >= 5)
82               break;
83           } else {
84             if (noErrors) {
85               System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
86               ioe.printStackTrace(System.out);
87               error = ioe;
88             }
89             break;
90           }
91         } catch (Throwable t) {
92           //t.printStackTrace(System.out);
93           if (noErrors) {
94             System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
95             t.printStackTrace(System.out);
96             error = t;
97           }
98           break;
99         }
100       } while(System.currentTimeMillis() < stopTime);
101     }
102   }
103
104   // LUCENE-1130: make sure immediate disk full on creating
105   // an IndexWriter (hit during DW.ThreadState.init()), with
106   // multiple threads, is OK:
107   public void testImmediateDiskFullWithThreads() throws Exception {
108
109     int NUM_THREADS = 3;
110
111     for(int iter=0;iter<10;iter++) {
112       if (VERBOSE) {
113         System.out.println("\nTEST: iter=" + iter);
114       }
115       MockDirectoryWrapper dir = newDirectory();
116       IndexWriter writer = new IndexWriter(
117           dir,
118           newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
119               setMaxBufferedDocs(2).
120               setMergeScheduler(new ConcurrentMergeScheduler()).
121               setMergePolicy(newLogMergePolicy(4))
122       );
123       ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
124       dir.setMaxSizeInBytes(4*1024+20*iter);
125       writer.setInfoStream(VERBOSE ? System.out : null);
126
127       IndexerThread[] threads = new IndexerThread[NUM_THREADS];
128
129       for(int i=0;i<NUM_THREADS;i++)
130         threads[i] = new IndexerThread(writer, true);
131
132       for(int i=0;i<NUM_THREADS;i++)
133         threads[i].start();
134
135       for(int i=0;i<NUM_THREADS;i++) {
136         // Without fix for LUCENE-1130: one of the
137         // threads will hang
138         threads[i].join();
139         assertTrue("hit unexpected Throwable", threads[i].error == null);
140       }
141
142       // Make sure once disk space is avail again, we can
143       // cleanly close:
144       dir.setMaxSizeInBytes(0);
145       writer.close(false);
146       dir.close();
147     }
148   }
149   
150
151   // LUCENE-1130: make sure we can close() even while
152   // threads are trying to add documents.  Strictly
153   // speaking, this isn't valid us of Lucene's APIs, but we
154   // still want to be robust to this case:
155   public void testCloseWithThreads() throws Exception {
156     int NUM_THREADS = 3;
157
158     for(int iter=0;iter<7;iter++) {
159       Directory dir = newDirectory();
160       IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
161         .setMaxBufferedDocs(10).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy(4));
162       // We expect AlreadyClosedException
163       ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
164       IndexWriter writer = new IndexWriter(dir, conf);
165
166       IndexerThread[] threads = new IndexerThread[NUM_THREADS];
167
168       for(int i=0;i<NUM_THREADS;i++)
169         threads[i] = new IndexerThread(writer, false);
170
171       for(int i=0;i<NUM_THREADS;i++)
172         threads[i].start();
173
174       boolean done = false;
175       while(!done) {
176         Thread.sleep(100);
177         for(int i=0;i<NUM_THREADS;i++)
178           // only stop when at least one thread has added a doc
179           if (threads[i].addCount > 0) {
180             done = true;
181             break;
182           } else if (!threads[i].isAlive()) {
183             fail("thread failed before indexing a single document");
184           }
185       }
186
187       writer.close(false);
188
189       // Make sure threads that are adding docs are not hung:
190       for(int i=0;i<NUM_THREADS;i++) {
191         // Without fix for LUCENE-1130: one of the
192         // threads will hang
193         threads[i].join();
194         if (threads[i].isAlive())
195           fail("thread seems to be hung");
196       }
197
198       // Quick test to make sure index is not corrupt:
199       IndexReader reader = IndexReader.open(dir, true);
200       TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
201       int count = 0;
202       while(tdocs.next()) {
203         count++;
204       }
205       assertTrue(count > 0);
206       reader.close();
207       
208       dir.close();
209     }
210   }
211
212   // Runs test, with multiple threads, using the specific
213   // failure to trigger an IOException
214   public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception {
215
216     int NUM_THREADS = 3;
217
218     for(int iter=0;iter<2;iter++) {
219       if (VERBOSE) {
220         System.out.println("TEST: iter=" + iter);
221       }
222       MockDirectoryWrapper dir = newDirectory();
223       IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
224           new MockAnalyzer(random)).setMaxBufferedDocs(2)
225           .setMergeScheduler(new ConcurrentMergeScheduler())
226           .setMergePolicy(newLogMergePolicy(4));
227       // We expect disk full exceptions in the merge threads
228       ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
229       IndexWriter writer = new IndexWriter(dir, conf);
230       writer.setInfoStream(VERBOSE ? System.out : null);
231       
232       IndexerThread[] threads = new IndexerThread[NUM_THREADS];
233
234       for(int i=0;i<NUM_THREADS;i++)
235         threads[i] = new IndexerThread(writer, true);
236
237       for(int i=0;i<NUM_THREADS;i++)
238         threads[i].start();
239
240       Thread.sleep(10);
241
242       dir.failOn(failure);
243       failure.setDoFail();
244
245       for(int i=0;i<NUM_THREADS;i++) {
246         threads[i].join();
247         assertTrue("hit unexpected Throwable", threads[i].error == null);
248       }
249
250       boolean success = false;
251       try {
252         writer.close(false);
253         success = true;
254       } catch (IOException ioe) {
255         failure.clearDoFail();
256         writer.close(false);
257       }
258
259       if (success) {
260         IndexReader reader = IndexReader.open(dir, true);
261         for(int j=0;j<reader.maxDoc();j++) {
262           if (!reader.isDeleted(j)) {
263             reader.document(j);
264             reader.getTermFreqVectors(j);
265           }
266         }
267         reader.close();
268       }
269
270       dir.close();
271     }
272   }
273
274   // Runs test, with one thread, using the specific failure
275   // to trigger an IOException
276   public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException {
277     MockDirectoryWrapper dir = newDirectory();
278
279     IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))
280       .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()));
281     final Document doc = new Document();
282     doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
283
284     for(int i=0;i<6;i++)
285       writer.addDocument(doc);
286
287     dir.failOn(failure);
288     failure.setDoFail();
289     try {
290       writer.addDocument(doc);
291       writer.addDocument(doc);
292       writer.commit();
293       fail("did not hit exception");
294     } catch (IOException ioe) {
295     }
296     failure.clearDoFail();
297     writer.addDocument(doc);
298     writer.close(false);
299     dir.close();
300   }
301
302   // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
303   private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure {
304     private boolean onlyOnce;
305     public FailOnlyOnAbortOrFlush(boolean onlyOnce) {
306       this.onlyOnce = onlyOnce;
307     }
308     @Override
309     public void eval(MockDirectoryWrapper dir)  throws IOException {
310       if (doFail) {
311         StackTraceElement[] trace = new Exception().getStackTrace();
312         boolean sawAbortOrFlushDoc = false;
313         boolean sawClose = false;
314         for (int i = 0; i < trace.length; i++) {
315           if ("abort".equals(trace[i].getMethodName()) ||
316               "flushDocument".equals(trace[i].getMethodName())) {
317             sawAbortOrFlushDoc = true;
318           }
319           if ("close".equals(trace[i].getMethodName())) {
320             sawClose = true;
321           }
322         }
323         if (sawAbortOrFlushDoc && !sawClose) {
324           if (onlyOnce)
325             doFail = false;
326           //System.out.println(Thread.currentThread().getName() + ": now fail");
327           //new Throwable().printStackTrace(System.out);
328           throw new IOException("now failing on purpose");
329         }
330       }
331     }
332   }
333
334
335
336   // LUCENE-1130: make sure initial IOException, and then 2nd
337   // IOException during rollback(), is OK:
338   public void testIOExceptionDuringAbort() throws IOException {
339     _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(false));
340   }
341
342   // LUCENE-1130: make sure initial IOException, and then 2nd
343   // IOException during rollback(), is OK:
344   public void testIOExceptionDuringAbortOnlyOnce() throws IOException {
345     _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(true));
346   }
347
348   // LUCENE-1130: make sure initial IOException, and then 2nd
349   // IOException during rollback(), with multiple threads, is OK:
350   public void testIOExceptionDuringAbortWithThreads() throws Exception {
351     _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(false));
352   }
353
354   // LUCENE-1130: make sure initial IOException, and then 2nd
355   // IOException during rollback(), with multiple threads, is OK:
356   public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception {
357     _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(true));
358   }
359
360   // Throws IOException during DocumentsWriter.writeSegment
361   private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure {
362     private boolean onlyOnce;
363     public FailOnlyInWriteSegment(boolean onlyOnce) {
364       this.onlyOnce = onlyOnce;
365     }
366     @Override
367     public void eval(MockDirectoryWrapper dir)  throws IOException {
368       if (doFail) {
369         StackTraceElement[] trace = new Exception().getStackTrace();
370         for (int i = 0; i < trace.length; i++) {
371           if ("flush".equals(trace[i].getMethodName()) && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) {
372             if (onlyOnce)
373               doFail = false;
374             throw new IOException("now failing on purpose");
375           }
376         }
377       }
378     }
379   }
380
381   // LUCENE-1130: test IOException in writeSegment
382   public void testIOExceptionDuringWriteSegment() throws IOException {
383     _testSingleThreadFailure(new FailOnlyInWriteSegment(false));
384   }
385
386   // LUCENE-1130: test IOException in writeSegment
387   public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException {
388     _testSingleThreadFailure(new FailOnlyInWriteSegment(true));
389   }
390
391   // LUCENE-1130: test IOException in writeSegment, with threads
392   public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception {
393     _testMultipleThreadsFailure(new FailOnlyInWriteSegment(false));
394   }
395
396   // LUCENE-1130: test IOException in writeSegment, with threads
397   public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception {
398     _testMultipleThreadsFailure(new FailOnlyInWriteSegment(true));
399   }
400   
401   //  LUCENE-3365: Test adding two documents with the same field from two different IndexWriters 
402   //  that we attempt to open at the same time.  As long as the first IndexWriter completes
403   //  and closes before the second IndexWriter time's out trying to get the Lock,
404   //  we should see both documents
405   public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
406      final MockDirectoryWrapper dir = newDirectory();
407      CountDownLatch oneIWConstructed = new CountDownLatch(1);
408      DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable(
409          dir, oneIWConstructed);
410      DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable(
411          dir, oneIWConstructed);
412
413      thread1.start();
414      thread2.start();
415      oneIWConstructed.await();
416
417      thread1.startIndexing();
418      thread2.startIndexing();
419
420      thread1.join();
421      thread2.join();
422      
423      assertFalse("Failed due to: " + thread1.failure, thread1.failed);
424      assertFalse("Failed due to: " + thread2.failure, thread2.failed);
425      // now verify that we have two documents in the index
426      IndexReader reader = IndexReader.open(dir, true);
427      assertEquals("IndexReader should have one document per thread running", 2,
428          reader.numDocs());
429      
430      reader.close();
431      dir.close();
432   }
433   
434    static class DelayedIndexAndCloseRunnable extends Thread {
435      private final Directory dir;
436      boolean failed = false;
437      Throwable failure = null;
438      private final CountDownLatch startIndexing = new CountDownLatch(1);
439      private CountDownLatch iwConstructed;
440
441      public DelayedIndexAndCloseRunnable(Directory dir,
442          CountDownLatch iwConstructed) {
443        this.dir = dir;
444        this.iwConstructed = iwConstructed;
445      }
446
447      public void startIndexing() {
448        this.startIndexing.countDown();
449      }
450
451      @Override
452      public void run() {
453        try {
454          Document doc = new Document();
455          Field field = newField("field", "testData", Field.Store.YES,
456              Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
457          doc.add(field);
458          IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
459              TEST_VERSION_CURRENT, new MockAnalyzer(random)));
460          iwConstructed.countDown();
461          startIndexing.await();
462          writer.addDocument(doc);
463          writer.close();
464        } catch (Throwable e) {
465          failed = true;
466          failure = e;
467          failure.printStackTrace(System.out);
468          return;
469        }
470      }
471    }
472 }