--- /dev/null
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * MultiThreaded IndexWriter tests
+ */
+public class TestIndexWriterWithThreads extends LuceneTestCase {
+
+ // Used by test cases below
+ private class IndexerThread extends Thread {
+
+ boolean diskFull;
+ Throwable error;
+ AlreadyClosedException ace;
+ IndexWriter writer;
+ boolean noErrors;
+ volatile int addCount;
+
+ public IndexerThread(IndexWriter writer, boolean noErrors) {
+ this.writer = writer;
+ this.noErrors = noErrors;
+ }
+
+ @Override
+ public void run() {
+
+ final Document doc = new Document();
+ doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+ int idUpto = 0;
+ int fullCount = 0;
+ final long stopTime = System.currentTimeMillis() + 200;
+
+ do {
+ try {
+ writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
+ addCount++;
+ } catch (IOException ioe) {
+ if (VERBOSE) {
+ System.out.println("TEST: expected exc:");
+ ioe.printStackTrace(System.out);
+ }
+ //System.out.println(Thread.currentThread().getName() + ": hit exc");
+ //ioe.printStackTrace(System.out);
+ if (ioe.getMessage().startsWith("fake disk full at") ||
+ ioe.getMessage().equals("now failing on purpose")) {
+ diskFull = true;
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ if (fullCount++ >= 5)
+ break;
+ } else {
+ if (noErrors) {
+ System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
+ ioe.printStackTrace(System.out);
+ error = ioe;
+ }
+ break;
+ }
+ } catch (Throwable t) {
+ //t.printStackTrace(System.out);
+ if (noErrors) {
+ System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
+ t.printStackTrace(System.out);
+ error = t;
+ }
+ break;
+ }
+ } while(System.currentTimeMillis() < stopTime);
+ }
+ }
+
+ // LUCENE-1130: make sure immediate disk full on creating
+ // an IndexWriter (hit during DW.ThreadState.init()), with
+ // multiple threads, is OK:
+ public void testImmediateDiskFullWithThreads() throws Exception {
+
+ int NUM_THREADS = 3;
+
+ for(int iter=0;iter<10;iter++) {
+ if (VERBOSE) {
+ System.out.println("\nTEST: iter=" + iter);
+ }
+ MockDirectoryWrapper dir = newDirectory();
+ IndexWriter writer = new IndexWriter(
+ dir,
+ newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+ setMaxBufferedDocs(2).
+ setMergeScheduler(new ConcurrentMergeScheduler()).
+ setMergePolicy(newLogMergePolicy(4))
+ );
+ ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
+ dir.setMaxSizeInBytes(4*1024+20*iter);
+ writer.setInfoStream(VERBOSE ? System.out : null);
+
+ IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i] = new IndexerThread(writer, true);
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i].start();
+
+ for(int i=0;i<NUM_THREADS;i++) {
+ // Without fix for LUCENE-1130: one of the
+ // threads will hang
+ threads[i].join();
+ assertTrue("hit unexpected Throwable", threads[i].error == null);
+ }
+
+ // Make sure once disk space is avail again, we can
+ // cleanly close:
+ dir.setMaxSizeInBytes(0);
+ writer.close(false);
+ dir.close();
+ }
+ }
+
+
+  // LUCENE-1130: make sure we can close() even while
+  // threads are trying to add documents.  Strictly
+  // speaking, this isn't valid use of Lucene's APIs, but we
+  // still want to be robust to this case:
+  public void testCloseWithThreads() throws Exception {
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<7;iter++) {
+      Directory dir = newDirectory();
+      IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+        .setMaxBufferedDocs(10).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy(4));
+      // We expect AlreadyClosedException
+      ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
+      IndexWriter writer = new IndexWriter(dir, conf);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        threads[i] = new IndexerThread(writer, false);
+      }
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        threads[i].start();
+      }
+
+      // Poll until at least one thread has indexed a document, then
+      // close the writer out from under all of them:
+      boolean done = false;
+      while(!done) {
+        Thread.sleep(100);
+        for(int i=0;i<NUM_THREADS;i++) {
+          // only stop when at least one thread has added a doc
+          if (threads[i].addCount > 0) {
+            done = true;
+            break;
+          } else if (!threads[i].isAlive()) {
+            fail("thread failed before indexing a single document");
+          }
+        }
+      }
+
+      writer.close(false);
+
+      // Make sure threads that are adding docs are not hung:
+      for(int i=0;i<NUM_THREADS;i++) {
+        // Without fix for LUCENE-1130: one of the
+        // threads will hang
+        threads[i].join();
+        if (threads[i].isAlive())
+          fail("thread seems to be hung");
+      }
+
+      // Quick test to make sure index is not corrupt:
+      IndexReader reader = IndexReader.open(dir, true);
+      TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
+      int count = 0;
+      try {
+        while(tdocs.next()) {
+          count++;
+        }
+      } finally {
+        // TermDocs holds index resources; close it before the reader
+        // (was previously leaked):
+        tdocs.close();
+      }
+      assertTrue(count > 0);
+      reader.close();
+
+      dir.close();
+    }
+  }
+
+  // Runs test, with multiple threads, using the specific
+  // failure to trigger an IOException.
+  //
+  // Starts NUM_THREADS IndexerThreads, lets them index briefly, then
+  // arms the injected failure; every thread must exit without recording
+  // an unexpected Throwable, and if the writer still closes cleanly the
+  // resulting index is fully read back to verify it isn't corrupt.
+  public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception {
+
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<2;iter++) {
+      if (VERBOSE) {
+        System.out.println("TEST: iter=" + iter);
+      }
+      MockDirectoryWrapper dir = newDirectory();
+      IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
+          new MockAnalyzer(random)).setMaxBufferedDocs(2)
+          .setMergeScheduler(new ConcurrentMergeScheduler())
+          .setMergePolicy(newLogMergePolicy(4));
+      // We expect disk full exceptions in the merge threads
+      ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
+      IndexWriter writer = new IndexWriter(dir, conf);
+      writer.setInfoStream(VERBOSE ? System.out : null);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i] = new IndexerThread(writer, true);
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i].start();
+
+      // Give the threads a head start so the failure hits mid-indexing:
+      Thread.sleep(10);
+
+      dir.failOn(failure);
+      failure.setDoFail();
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        threads[i].join();
+        assertTrue("hit unexpected Throwable", threads[i].error == null);
+      }
+
+      boolean success = false;
+      try {
+        writer.close(false);
+        success = true;
+      } catch (IOException ioe) {
+        // close() hit the injected failure; disarm it and close for real:
+        failure.clearDoFail();
+        writer.close(false);
+      }
+
+      if (success) {
+        // Only when the first close succeeded do we expect a consistent
+        // index; touch every live doc and its term vectors to verify.
+        IndexReader reader = IndexReader.open(dir, true);
+        for(int j=0;j<reader.maxDoc();j++) {
+          if (!reader.isDeleted(j)) {
+            reader.document(j);
+            reader.getTermFreqVectors(j);
+          }
+        }
+        reader.close();
+      }
+
+      dir.close();
+    }
+  }
+
+  // Runs test, with one thread, using the specific failure
+  // to trigger an IOException
+  public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException {
+    MockDirectoryWrapper dir = newDirectory();
+
+    IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))
+      .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()));
+    final Document doc = new Document();
+    doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+    // Seed the index with a few docs before arming the failure:
+    for (int docUpto = 0; docUpto < 6; docUpto++) {
+      writer.addDocument(doc);
+    }
+
+    dir.failOn(failure);
+    failure.setDoFail();
+    try {
+      writer.addDocument(doc);
+      writer.addDocument(doc);
+      writer.commit();
+      fail("did not hit exception");
+    } catch (IOException ioe) {
+      // expected: the injected failure fired
+    }
+    failure.clearDoFail();
+    // Once the failure is disarmed, indexing and closing must succeed:
+    writer.addDocument(doc);
+    writer.close(false);
+    dir.close();
+  }
+
+  // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
+  private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure {
+    private boolean onlyOnce;
+    public FailOnlyOnAbortOrFlush(boolean onlyOnce) {
+      this.onlyOnce = onlyOnce;
+    }
+    @Override
+    public void eval(MockDirectoryWrapper dir) throws IOException {
+      if (doFail) {
+        // Inspect the current stack: fail only when called from within
+        // abort()/flushDocument(), but never from inside close().
+        boolean inAbortOrFlushDoc = false;
+        boolean inClose = false;
+        for (StackTraceElement frame : new Exception().getStackTrace()) {
+          final String method = frame.getMethodName();
+          if ("abort".equals(method) || "flushDocument".equals(method)) {
+            inAbortOrFlushDoc = true;
+          }
+          if ("close".equals(method)) {
+            inClose = true;
+          }
+        }
+        if (inAbortOrFlushDoc && !inClose) {
+          if (onlyOnce) {
+            doFail = false;
+          }
+          throw new IOException("now failing on purpose");
+        }
+      }
+    }
+  }
+
+
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), is OK:
+  public void testIOExceptionDuringAbort() throws IOException {
+    _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(false));
+  }
+
+  // LUCENE-1130: same as above, but the injected failure only
+  // fires once:
+  public void testIOExceptionDuringAbortOnlyOnce() throws IOException {
+    _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(true));
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during rollback(), with multiple threads, is OK:
+  public void testIOExceptionDuringAbortWithThreads() throws Exception {
+    _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(false));
+  }
+
+  // LUCENE-1130: same as above with multiple threads, but the
+  // injected failure only fires once:
+  public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception {
+    _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(true));
+  }
+
+  // Throws IOException during DocumentsWriter.writeSegment
+  private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure {
+    private boolean onlyOnce;
+    public FailOnlyInWriteSegment(boolean onlyOnce) {
+      this.onlyOnce = onlyOnce;
+    }
+    @Override
+    public void eval(MockDirectoryWrapper dir) throws IOException {
+      if (doFail) {
+        // Fire only when called from DocFieldProcessor.flush:
+        for (StackTraceElement frame : new Exception().getStackTrace()) {
+          if ("flush".equals(frame.getMethodName())
+              && "org.apache.lucene.index.DocFieldProcessor".equals(frame.getClassName())) {
+            if (onlyOnce) {
+              doFail = false;
+            }
+            throw new IOException("now failing on purpose");
+          }
+        }
+      }
+    }
+  }
+
+  // LUCENE-1130: test IOException in writeSegment
+  public void testIOExceptionDuringWriteSegment() throws IOException {
+    _testSingleThreadFailure(new FailOnlyInWriteSegment(false));
+  }
+
+  // LUCENE-1130: test IOException in writeSegment; the injected
+  // failure only fires once:
+  public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException {
+    _testSingleThreadFailure(new FailOnlyInWriteSegment(true));
+  }
+
+  // LUCENE-1130: test IOException in writeSegment, with threads
+  public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception {
+    _testMultipleThreadsFailure(new FailOnlyInWriteSegment(false));
+  }
+
+  // LUCENE-1130: test IOException in writeSegment, with threads;
+  // the injected failure only fires once:
+  public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception {
+    _testMultipleThreadsFailure(new FailOnlyInWriteSegment(true));
+  }
+
+  // LUCENE-3365: Test adding two documents with the same field from two different IndexWriters
+  // that we attempt to open at the same time.  As long as the first IndexWriter completes
+  // and closes before the second IndexWriter time's out trying to get the Lock,
+  // we should see both documents
+  public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
+    final MockDirectoryWrapper dir = newDirectory();
+    final CountDownLatch oneIWConstructed = new CountDownLatch(1);
+    final DelayedIndexAndCloseRunnable firstWriterThread =
+        new DelayedIndexAndCloseRunnable(dir, oneIWConstructed);
+    final DelayedIndexAndCloseRunnable secondWriterThread =
+        new DelayedIndexAndCloseRunnable(dir, oneIWConstructed);
+
+    firstWriterThread.start();
+    secondWriterThread.start();
+    // Wait until at least one of the writers has been constructed:
+    oneIWConstructed.await();
+
+    firstWriterThread.startIndexing();
+    secondWriterThread.startIndexing();
+
+    firstWriterThread.join();
+    secondWriterThread.join();
+
+    assertFalse("Failed due to: " + firstWriterThread.failure, firstWriterThread.failed);
+    assertFalse("Failed due to: " + secondWriterThread.failure, secondWriterThread.failed);
+    // now verify that we have two documents in the index
+    IndexReader reader = IndexReader.open(dir, true);
+    assertEquals("IndexReader should have one document per thread running", 2,
+        reader.numDocs());
+
+    reader.close();
+    dir.close();
+  }
+
+  // Thread that constructs an IndexWriter, signals iwConstructed, then
+  // waits for startIndexing() before adding one document and closing.
+  // Any Throwable is captured in failed/failure for the test thread.
+  static class DelayedIndexAndCloseRunnable extends Thread {
+    private final Directory dir;
+    boolean failed = false;
+    Throwable failure = null;
+    private final CountDownLatch startIndexing = new CountDownLatch(1);
+    private CountDownLatch iwConstructed;
+
+    public DelayedIndexAndCloseRunnable(Directory dir,
+        CountDownLatch iwConstructed) {
+      this.dir = dir;
+      this.iwConstructed = iwConstructed;
+    }
+
+    // Releases this thread to index its document and close its writer.
+    public void startIndexing() {
+      this.startIndexing.countDown();
+    }
+
+    @Override
+    public void run() {
+      try {
+        Document doc = new Document();
+        Field field = newField("field", "testData", Field.Store.YES,
+            Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
+        doc.add(field);
+        IndexWriter writer;
+        try {
+          writer = new IndexWriter(dir, newIndexWriterConfig(
+              TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+        } finally {
+          // Count down even if construction threw: otherwise, if every
+          // thread fails to construct its writer, the test thread waiting
+          // on this latch hangs forever instead of reporting the failure.
+          iwConstructed.countDown();
+        }
+        startIndexing.await();
+        writer.addDocument(doc);
+        writer.close();
+      } catch (Throwable e) {
+        failed = true;
+        failure = e;
+        failure.printStackTrace(System.out);
+      }
+    }
+  }
+}