pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / backwards / src / test / org / apache / lucene / index / TestIndexWriterWithThreads.java
diff --git a/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
new file mode 100644 (file)
index 0000000..885d0b9
--- /dev/null
@@ -0,0 +1,472 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * MultiThreaded IndexWriter tests
+ */
+public class TestIndexWriterWithThreads extends LuceneTestCase {
+
+  // Used by test cases below
+  // Indexing worker: for roughly 200 ms it keeps calling updateDocument with
+  // one fixed document.  It stops early on the sixth injected failure or on
+  // the first exception that is not an injected failure.
+  private class IndexerThread extends Thread {
+
+    // Set once an injected ("fake disk full at..." / "now failing on purpose")
+    // IOException has been seen.
+    boolean diskFull;
+    // The unexpected exception that stopped this thread; only recorded when
+    // noErrors is true.
+    Throwable error;
+    // Not assigned anywhere in this class.
+    AlreadyClosedException ace;
+    IndexWriter writer;
+    // When true, unexpected exceptions are printed and stored in error.
+    boolean noErrors;
+    // Number of successful updateDocument calls; volatile because
+    // testCloseWithThreads polls it from the test thread while this thread runs.
+    volatile int addCount;
+
+    public IndexerThread(IndexWriter writer, boolean noErrors) {
+      this.writer = writer;
+      this.noErrors = noErrors;
+    }
+
+    @Override
+    public void run() {
+
+      final Document doc = new Document();
+      doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+      int idUpto = 0;
+      int fullCount = 0;
+      final long stopTime = System.currentTimeMillis() + 200;
+
+      do {
+        try {
+          writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
+          addCount++;
+        } catch (IOException ioe) {
+          if (VERBOSE) {
+            System.out.println("TEST: expected exc:");
+            ioe.printStackTrace(System.out);
+          }
+          // getMessage() may return null.  Such an exception is not one of the
+          // injected failures, so it must take the "unexpected" branch below
+          // instead of killing this thread with a NullPointerException.
+          final String message = ioe.getMessage();
+          final boolean injectedFailure = message != null &&
+              (message.startsWith("fake disk full at") ||
+               message.equals("now failing on purpose"));
+          if (injectedFailure) {
+            diskFull = true;
+            try {
+              Thread.sleep(1);
+            } catch (InterruptedException ie) {
+              throw new ThreadInterruptedException(ie);
+            }
+            // Tolerate five injected failures; give up on the sixth.
+            if (fullCount++ >= 5)
+              break;
+          } else {
+            if (noErrors) {
+              System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
+              ioe.printStackTrace(System.out);
+              error = ioe;
+            }
+            break;
+          }
+        } catch (Throwable t) {
+          if (noErrors) {
+            System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
+            t.printStackTrace(System.out);
+            error = t;
+          }
+          break;
+        }
+      } while(System.currentTimeMillis() < stopTime);
+    }
+  }
+
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DW.ThreadState.init()), with
+  // multiple threads, is OK:
+  public void testImmediateDiskFullWithThreads() throws Exception {
+    // Ten rounds, each with a slightly larger size cap on the directory
+    // (4 KB plus 20 bytes per round).  Three indexing threads run against the
+    // capped directory; none may record an unexpected exception.
+    final int threadCount = 3;
+
+    for (int round = 0; round < 10; round++) {
+      if (VERBOSE) {
+        System.out.println("\nTEST: iter=" + round);
+      }
+      final MockDirectoryWrapper directory = newDirectory();
+      final IndexWriterConfig config =
+          newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+              .setMaxBufferedDocs(2)
+              .setMergeScheduler(new ConcurrentMergeScheduler())
+              .setMergePolicy(newLogMergePolicy(4));
+      final IndexWriter indexWriter = new IndexWriter(directory, config);
+      final ConcurrentMergeScheduler scheduler =
+          (ConcurrentMergeScheduler) indexWriter.getConfig().getMergeScheduler();
+      scheduler.setSuppressExceptions();
+      directory.setMaxSizeInBytes(4*1024 + 20*round);
+      indexWriter.setInfoStream(VERBOSE ? System.out : null);
+
+      final IndexerThread[] workers = new IndexerThread[threadCount];
+      for (int i = 0; i < workers.length; i++) {
+        workers[i] = new IndexerThread(indexWriter, true);
+      }
+      for (IndexerThread worker : workers) {
+        worker.start();
+      }
+
+      for (IndexerThread worker : workers) {
+        // Without fix for LUCENE-1130: one of the
+        // threads will hang
+        worker.join();
+        assertTrue("hit unexpected Throwable", worker.error == null);
+      }
+
+      // Lift the size cap again, then make sure the writer
+      // can be closed cleanly:
+      directory.setMaxSizeInBytes(0);
+      indexWriter.close(false);
+      directory.close();
+    }
+  }
+  
+
+  // LUCENE-1130: make sure we can close() even while
+  // threads are trying to add documents.  Strictly
+  // speaking, this isn't valid use of Lucene's APIs, but we
+  // still want to be robust to this case:
+  public void testCloseWithThreads() throws Exception {
+    // Seven rounds: start three indexing threads, wait until one of them has
+    // added a document, close the writer underneath them, then check that
+    // every thread terminates and that the index can still be read.
+    final int threadCount = 3;
+
+    for (int round = 0; round < 7; round++) {
+      final Directory directory = newDirectory();
+      final IndexWriterConfig config =
+          newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+              .setMaxBufferedDocs(10)
+              .setMergeScheduler(new ConcurrentMergeScheduler())
+              .setMergePolicy(newLogMergePolicy(4));
+      // We expect AlreadyClosedException
+      final ConcurrentMergeScheduler scheduler =
+          (ConcurrentMergeScheduler) config.getMergeScheduler();
+      scheduler.setSuppressExceptions();
+      final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+      final IndexerThread[] workers = new IndexerThread[threadCount];
+      for (int i = 0; i < workers.length; i++) {
+        workers[i] = new IndexerThread(indexWriter, false);
+      }
+      for (IndexerThread worker : workers) {
+        worker.start();
+      }
+
+      // Poll every 100 ms; only stop when at least one thread has added a doc
+      boolean sawAdd;
+      do {
+        Thread.sleep(100);
+        sawAdd = false;
+        for (IndexerThread worker : workers) {
+          if (worker.addCount > 0) {
+            sawAdd = true;
+            break;
+          }
+          if (!worker.isAlive()) {
+            fail("thread failed before indexing a single document");
+          }
+        }
+      } while (!sawAdd);
+
+      indexWriter.close(false);
+
+      // Make sure threads that are adding docs are not hung:
+      for (IndexerThread worker : workers) {
+        // Without fix for LUCENE-1130: one of the
+        // threads will hang.  join() has no timeout here, so it
+        // only returns once the worker has terminated.
+        worker.join();
+        if (worker.isAlive()) {
+          fail("thread seems to be hung");
+        }
+      }
+
+      // Quick test to make sure index is not corrupt:
+      final IndexReader reader = IndexReader.open(directory, true);
+      final TermDocs matches = reader.termDocs(new Term("field", "aaa"));
+      int hits = 0;
+      while (matches.next()) {
+        hits++;
+      }
+      assertTrue(hits > 0);
+      reader.close();
+
+      directory.close();
+    }
+  }
+
+  // Runs test, with multiple threads, using the specific
+  // failure to trigger an IOException
+  public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception {
+    // Two rounds: start three indexing threads, arm the given failure shortly
+    // afterwards, require that no thread records an unexpected exception, then
+    // close the writer.  If the first close succeeded, every live document
+    // (stored fields and term vectors) is loaded as a sanity check.
+    final int threadCount = 3;
+
+    for (int round = 0; round < 2; round++) {
+      if (VERBOSE) {
+        System.out.println("TEST: iter=" + round);
+      }
+      final MockDirectoryWrapper directory = newDirectory();
+      final IndexWriterConfig config =
+          newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+              .setMaxBufferedDocs(2)
+              .setMergeScheduler(new ConcurrentMergeScheduler())
+              .setMergePolicy(newLogMergePolicy(4));
+      // We expect disk full exceptions in the merge threads
+      final ConcurrentMergeScheduler scheduler =
+          (ConcurrentMergeScheduler) config.getMergeScheduler();
+      scheduler.setSuppressExceptions();
+      final IndexWriter indexWriter = new IndexWriter(directory, config);
+      indexWriter.setInfoStream(VERBOSE ? System.out : null);
+
+      final IndexerThread[] workers = new IndexerThread[threadCount];
+      for (int i = 0; i < workers.length; i++) {
+        workers[i] = new IndexerThread(indexWriter, true);
+      }
+      for (IndexerThread worker : workers) {
+        worker.start();
+      }
+
+      // Let the threads run briefly before arming the failure
+      Thread.sleep(10);
+
+      directory.failOn(failure);
+      failure.setDoFail();
+
+      for (IndexerThread worker : workers) {
+        worker.join();
+        assertTrue("hit unexpected Throwable", worker.error == null);
+      }
+
+      boolean closedCleanly;
+      try {
+        indexWriter.close(false);
+        closedCleanly = true;
+      } catch (IOException closeFailure) {
+        // First close hit the failure: disarm it and close again
+        closedCleanly = false;
+        failure.clearDoFail();
+        indexWriter.close(false);
+      }
+
+      if (closedCleanly) {
+        final IndexReader reader = IndexReader.open(directory, true);
+        for (int docId = 0; docId < reader.maxDoc(); docId++) {
+          if (reader.isDeleted(docId)) {
+            continue;
+          }
+          reader.document(docId);
+          reader.getTermFreqVectors(docId);
+        }
+        reader.close();
+      }
+
+      directory.close();
+    }
+  }
+
+  // Runs test, with one thread, using the specific failure
+  // to trigger an IOException
+  public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException {
+    // Index six documents, arm the failure, and require that adding two more
+    // documents plus a commit throws an IOException.  Then disarm the failure
+    // and check that the writer still accepts a document and closes.
+    final MockDirectoryWrapper directory = newDirectory();
+
+    final IndexWriterConfig config =
+        newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+            .setMaxBufferedDocs(2)
+            .setMergeScheduler(new ConcurrentMergeScheduler());
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+    final Document doc = new Document();
+    doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+    for (int added = 0; added < 6; added++) {
+      indexWriter.addDocument(doc);
+    }
+
+    directory.failOn(failure);
+    failure.setDoFail();
+    try {
+      indexWriter.addDocument(doc);
+      indexWriter.addDocument(doc);
+      indexWriter.commit();
+      fail("did not hit exception");
+    } catch (IOException expected) {
+      // the armed failure is supposed to surface here
+    }
+    failure.clearDoFail();
+
+    indexWriter.addDocument(doc);
+    indexWriter.close(false);
+    directory.close();
+  }
+
+  // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
+  private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure {
+    // When true, the failure disarms itself (doFail = false) the first time it fires.
+    private boolean onlyOnce;
+
+    public FailOnlyOnAbortOrFlush(boolean onlyOnce) {
+      this.onlyOnce = onlyOnce;
+    }
+
+    // While doFail is set, throws if the current call stack contains a method
+    // named "abort" or "flushDocument" and no method named "close".
+    @Override
+    public void eval(MockDirectoryWrapper dir)  throws IOException {
+      if (!doFail) {
+        return;
+      }
+
+      boolean inAbortOrFlushDocument = false;
+      boolean inClose = false;
+      for (StackTraceElement frame : new Exception().getStackTrace()) {
+        final String method = frame.getMethodName();
+        if ("abort".equals(method) || "flushDocument".equals(method)) {
+          inAbortOrFlushDocument = true;
+        }
+        if ("close".equals(method)) {
+          inClose = true;
+        }
+      }
+
+      if (inClose || !inAbortOrFlushDocument) {
+        return;
+      }
+      if (onlyOnce) {
+        doFail = false;
+      }
+      throw new IOException("now failing on purpose");
+    }
+  }
+
+
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), is OK:
+  public void testIOExceptionDuringAbort() throws IOException {
+    // onlyOnce == false: the failure does not disarm itself after firing
+    final MockDirectoryWrapper.Failure failure = new FailOnlyOnAbortOrFlush(false);
+    _testSingleThreadFailure(failure);
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), is OK:
+  public void testIOExceptionDuringAbortOnlyOnce() throws IOException {
+    // onlyOnce == true: the failure disarms itself the first time it fires
+    final MockDirectoryWrapper.Failure failure = new FailOnlyOnAbortOrFlush(true);
+    _testSingleThreadFailure(failure);
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), with multiple threads, is OK:
+  public void testIOExceptionDuringAbortWithThreads() throws Exception {
+    // onlyOnce == false: the failure does not disarm itself after firing
+    final MockDirectoryWrapper.Failure failure = new FailOnlyOnAbortOrFlush(false);
+    _testMultipleThreadsFailure(failure);
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), with multiple threads, is OK:
+  public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception {
+    // onlyOnce == true: the failure disarms itself the first time it fires
+    final MockDirectoryWrapper.Failure failure = new FailOnlyOnAbortOrFlush(true);
+    _testMultipleThreadsFailure(failure);
+  }
+
+  // Throws IOException (while doFail is set) when DocFieldProcessor.flush is on the call stack
+  private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure {
+    // When true, the failure disarms itself (doFail = false) the first time it fires.
+    private boolean onlyOnce;
+
+    public FailOnlyInWriteSegment(boolean onlyOnce) {
+      this.onlyOnce = onlyOnce;
+    }
+
+    // While doFail is set, throws as soon as a stack frame for
+    // org.apache.lucene.index.DocFieldProcessor.flush is found.
+    @Override
+    public void eval(MockDirectoryWrapper dir)  throws IOException {
+      if (!doFail) {
+        return;
+      }
+      for (StackTraceElement frame : new Exception().getStackTrace()) {
+        final boolean inDocFieldProcessorFlush =
+            "flush".equals(frame.getMethodName()) &&
+            "org.apache.lucene.index.DocFieldProcessor".equals(frame.getClassName());
+        if (!inDocFieldProcessorFlush) {
+          continue;
+        }
+        if (onlyOnce) {
+          doFail = false;
+        }
+        throw new IOException("now failing on purpose");
+      }
+    }
+  }
+
+  // LUCENE-1130: test IOException in writeSegment
+  public void testIOExceptionDuringWriteSegment() throws IOException {
+    // onlyOnce == false: the failure does not disarm itself after firing
+    final MockDirectoryWrapper.Failure failure = new FailOnlyInWriteSegment(false);
+    _testSingleThreadFailure(failure);
+  }
+
+  // LUCENE-1130: test IOException in writeSegment
+  public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException {
+    // onlyOnce == true: the failure disarms itself the first time it fires
+    final MockDirectoryWrapper.Failure failure = new FailOnlyInWriteSegment(true);
+    _testSingleThreadFailure(failure);
+  }
+
+  // LUCENE-1130: test IOException in writeSegment, with threads
+  public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception {
+    // onlyOnce == false: the failure does not disarm itself after firing
+    final MockDirectoryWrapper.Failure failure = new FailOnlyInWriteSegment(false);
+    _testMultipleThreadsFailure(failure);
+  }
+
+  // LUCENE-1130: test IOException in writeSegment, with threads
+  public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception {
+    // onlyOnce == true: the failure disarms itself the first time it fires
+    final MockDirectoryWrapper.Failure failure = new FailOnlyInWriteSegment(true);
+    _testMultipleThreadsFailure(failure);
+  }
+  
+  //  LUCENE-3365: Test adding two documents with the same field from two different IndexWriters 
+  //  that we attempt to open at the same time.  As long as the first IndexWriter completes
+  //  and closes before the second IndexWriter times out trying to get the Lock,
+  //  we should see both documents
+  public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
+    // Two threads each open an IndexWriter on the same directory.  Once one
+    // writer exists, both threads are told to index; afterwards the index
+    // must hold exactly one document per thread.
+    final MockDirectoryWrapper directory = newDirectory();
+    final CountDownLatch firstWriterReady = new CountDownLatch(1);
+    final DelayedIndexAndCloseRunnable[] workers = {
+        new DelayedIndexAndCloseRunnable(directory, firstWriterReady),
+        new DelayedIndexAndCloseRunnable(directory, firstWriterReady)
+    };
+
+    for (DelayedIndexAndCloseRunnable worker : workers) {
+      worker.start();
+    }
+    firstWriterReady.await();
+
+    for (DelayedIndexAndCloseRunnable worker : workers) {
+      worker.startIndexing();
+    }
+    for (DelayedIndexAndCloseRunnable worker : workers) {
+      worker.join();
+    }
+
+    for (DelayedIndexAndCloseRunnable worker : workers) {
+      assertFalse("Failed due to: " + worker.failure, worker.failed);
+    }
+
+    // now verify that we have two documents in the index
+    final IndexReader reader = IndexReader.open(directory, true);
+    final int docCount = reader.numDocs();
+    assertEquals("IndexReader should have one document per thread running", 2,
+        docCount);
+
+    reader.close();
+    directory.close();
+  }
+  
+   // Thread that opens an IndexWriter on the given directory, signals
+   // iwConstructed, waits for startIndexing(), then adds one document and
+   // closes the writer.  Any Throwable is captured in failed / failure.
+   static class DelayedIndexAndCloseRunnable extends Thread {
+     private final Directory dir;
+     // Written by this thread; read by the test after join().
+     boolean failed = false;
+     Throwable failure = null;
+     // Released by startIndexing() to let this thread add its document.
+     private final CountDownLatch startIndexing = new CountDownLatch(1);
+     // Latch the test thread waits on before calling startIndexing().
+     private final CountDownLatch iwConstructed;
+
+     public DelayedIndexAndCloseRunnable(Directory dir,
+         CountDownLatch iwConstructed) {
+       this.dir = dir;
+       this.iwConstructed = iwConstructed;
+     }
+
+     public void startIndexing() {
+       this.startIndexing.countDown();
+     }
+
+     @Override
+     public void run() {
+       try {
+         Document doc = new Document();
+         Field field = newField("field", "testData", Field.Store.YES,
+             Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
+         doc.add(field);
+         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
+             TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+         iwConstructed.countDown();
+         startIndexing.await();
+         writer.addDocument(doc);
+         writer.close();
+       } catch (Throwable e) {
+         failed = true;
+         failure = e;
+         failure.printStackTrace(System.out);
+         // If this thread failed before a writer was constructed, the latch
+         // may never be released, leaving the test thread blocked in await().
+         // Release it here so the failure is reported.  An extra countDown()
+         // on a latch that is already at zero has no effect.
+         iwConstructed.countDown();
+       }
+     }
+   }
+}