1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.IOException;
21 import java.util.concurrent.CountDownLatch;
23 import org.apache.lucene.analysis.MockAnalyzer;
24 import org.apache.lucene.document.Document;
25 import org.apache.lucene.document.Field;
26 import org.apache.lucene.store.AlreadyClosedException;
27 import org.apache.lucene.store.Directory;
28 import org.apache.lucene.store.MockDirectoryWrapper;
29 import org.apache.lucene.util.LuceneTestCase;
30 import org.apache.lucene.util.ThreadInterruptedException;
33 * MultiThreaded IndexWriter tests
35 public class TestIndexWriterWithThreads extends LuceneTestCase {
// NOTE(review): extraction artifact — original source line numbers are fused
// into each line below and the fused numbering has gaps, so several lines
// (field declarations, the run() header, closing braces) are missing from
// this view. Restore from the original source before compiling.
37 // Used by test cases below
// Indexing worker: repeatedly updates documents until a deadline, tolerating
// or recording exceptions depending on the noErrors flag.
38 private class IndexerThread extends Thread {
// Records an AlreadyClosedException hit while indexing, if any.
42 AlreadyClosedException ace;
// Count of successful document updates; volatile so the controlling test
// thread can poll it while this thread runs.
45 volatile int addCount;
47 public IndexerThread(IndexWriter writer, boolean noErrors) {
// NOTE(review): `this.writer = writer;` presumably sat on a missing line —
// TODO confirm against upstream.
49 this.noErrors = noErrors;
// (run() declaration is on a missing line)
55 final Document doc = new Document();
56 doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
// Index for roughly 200 ms, then stop (checked by the do/while below).
60 final long stopTime = System.currentTimeMillis() + 200;
64 writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
66 } catch (IOException ioe) {
68 System.out.println("TEST: expected exc:");
69 ioe.printStackTrace(System.out);
71 //System.out.println(Thread.currentThread().getName() + ": hit exc");
72 //ioe.printStackTrace(System.out);
// These two messages identify the deliberately injected failures ("fake disk
// full" from MockDirectoryWrapper, "now failing on purpose" from the Failure
// classes below); anything else is an unexpected error.
73 if (ioe.getMessage().startsWith("fake disk full at") ||
74 ioe.getMessage().equals("now failing on purpose")) {
78 } catch (InterruptedException ie) {
// Re-throw as Lucene's unchecked wrapper, preserving the cause.
79 throw new ThreadInterruptedException(ie);
85 System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
86 ioe.printStackTrace(System.out);
91 } catch (Throwable t) {
92 //t.printStackTrace(System.out);
94 System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
95 t.printStackTrace(System.out);
// Loop until the deadline computed above expires.
100 } while(System.currentTimeMillis() < stopTime);
// NOTE(review): extraction artifact — original source line numbers are fused
// into each line and the numbering has gaps; thread start/join statements and
// closing braces are missing from this view.
104 // LUCENE-1130: make sure immediate disk full on creating
105 // an IndexWriter (hit during DW.ThreadState.init()), with
106 // multiple threads, is OK:
107 public void testImmediateDiskFullWithThreads() throws Exception {
111 for(int iter=0;iter<10;iter++) {
113 System.out.println("\nTEST: iter=" + iter);
115 MockDirectoryWrapper dir = newDirectory();
116 IndexWriter writer = new IndexWriter(
118 newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
// Tiny buffer so flushes (and hence disk writes) happen quickly.
119 setMaxBufferedDocs(2).
120 setMergeScheduler(new ConcurrentMergeScheduler()).
121 setMergePolicy(newLogMergePolicy(4))
// Merge-thread exceptions are expected in this test; suppress them so they
// don't fail the run on their own.
123 ((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
// Small, slowly-growing size quota forces "disk full" at slightly different
// points on each iteration.
124 dir.setMaxSizeInBytes(4*1024+20*iter);
125 writer.setInfoStream(VERBOSE ? System.out : null);
127 IndexerThread[] threads = new IndexerThread[NUM_THREADS];
// noErrors=true: workers record any unexpected Throwable, asserted below.
129 for(int i=0;i<NUM_THREADS;i++)
130 threads[i] = new IndexerThread(writer, true);
// NOTE(review): the statements this loop controls (presumably thread start
// and/or join) are on missing lines — TODO confirm against upstream.
132 for(int i=0;i<NUM_THREADS;i++)
135 for(int i=0;i<NUM_THREADS;i++) {
136 // Without fix for LUCENE-1130: one of the
139 assertTrue("hit unexpected Throwable", threads[i].error == null);
142 // Make sure once disk space is avail again, we can
// NOTE(review): presumably 0 lifts the size limit on MockDirectoryWrapper —
// TODO confirm.
144 dir.setMaxSizeInBytes(0);
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; several statements (thread starts,
// the close() call, loop bodies) and closing braces are missing here.
151 // LUCENE-1130: make sure we can close() even while
152 // threads are trying to add documents. Strictly
153 // speaking, this isn't valid use of Lucene's APIs, but we
154 // still want to be robust to this case:
155 public void testCloseWithThreads() throws Exception {
158 for(int iter=0;iter<7;iter++) {
159 Directory dir = newDirectory();
160 IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
161 .setMaxBufferedDocs(10).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy(4));
162 // We expect AlreadyClosedException
163 ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
164 IndexWriter writer = new IndexWriter(dir, conf);
166 IndexerThread[] threads = new IndexerThread[NUM_THREADS];
// noErrors=false: workers tolerate the exceptions triggered by closing the
// writer underneath them.
168 for(int i=0;i<NUM_THREADS;i++)
169 threads[i] = new IndexerThread(writer, false);
171 for(int i=0;i<NUM_THREADS;i++)
// Spin until at least one worker has indexed a document, or a worker dies
// before indexing anything.
174 boolean done = false;
177 for(int i=0;i<NUM_THREADS;i++)
178 // only stop when at least one thread has added a doc
179 if (threads[i].addCount > 0) {
182 } else if (!threads[i].isAlive()) {
183 fail("thread failed before indexing a single document");
189 // Make sure threads that are adding docs are not hung:
190 for(int i=0;i<NUM_THREADS;i++) {
191 // Without fix for LUCENE-1130: one of the
194 if (threads[i].isAlive())
195 fail("thread seems to be hung");
198 // Quick test to make sure index is not corrupt:
199 IndexReader reader = IndexReader.open(dir, true);
// Every doc indexed by IndexerThread contains the term "aaa" in "field";
// the postings walk below must therefore see at least one doc.
200 TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
202 while(tdocs.next()) {
205 assertTrue(count > 0);
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; thread start/join, the close() try
// block, and closing braces are missing from this view.
212 // Runs test, with multiple threads, using the specific
213 // failure to trigger an IOException
214 public void _testMultipleThreadsFailure(MockDirectoryWrapper.Failure failure) throws Exception {
218 for(int iter=0;iter<2;iter++) {
220 System.out.println("TEST: iter=" + iter);
222 MockDirectoryWrapper dir = newDirectory();
223 IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
// Tiny buffer so flushes happen quickly, giving the failure many chances
// to fire.
224 new MockAnalyzer(random)).setMaxBufferedDocs(2)
225 .setMergeScheduler(new ConcurrentMergeScheduler())
226 .setMergePolicy(newLogMergePolicy(4));
227 // We expect disk full exceptions in the merge threads
228 ((ConcurrentMergeScheduler) conf.getMergeScheduler()).setSuppressExceptions();
229 IndexWriter writer = new IndexWriter(dir, conf);
230 writer.setInfoStream(VERBOSE ? System.out : null);
232 IndexerThread[] threads = new IndexerThread[NUM_THREADS];
// noErrors=true: workers record unexpected Throwables, asserted below.
234 for(int i=0;i<NUM_THREADS;i++)
235 threads[i] = new IndexerThread(writer, true);
// NOTE(review): controlled statements (presumably thread start, then the
// failure activation and joins) are on missing lines — TODO confirm.
237 for(int i=0;i<NUM_THREADS;i++)
245 for(int i=0;i<NUM_THREADS;i++) {
247 assertTrue("hit unexpected Throwable", threads[i].error == null);
250 boolean success = false;
// Once close() fails, disarm the injected failure so cleanup can proceed.
254 } catch (IOException ioe) {
255 failure.clearDoFail();
// Sanity-check the surviving index: term vectors of every live doc must be
// readable without exception.
260 IndexReader reader = IndexReader.open(dir, true);
261 for(int j=0;j<reader.maxDoc();j++) {
262 if (!reader.isDeleted(j)) {
264 reader.getTermFreqVectors(j);
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; the failure arming (dir.failOn /
// setDoFail), the try wrapper, and closing braces are missing from this view.
274 // Runs test, with one thread, using the specific failure
275 // to trigger an IOException
276 public void _testSingleThreadFailure(MockDirectoryWrapper.Failure failure) throws IOException {
277 MockDirectoryWrapper dir = newDirectory();
279 IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random))
280 .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()));
281 final Document doc = new Document();
282 doc.add(newField("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
285 writer.addDocument(doc);
// With maxBufferedDocs=2, these adds force a flush, which is where the
// injected failure is expected to throw.
290 writer.addDocument(doc);
291 writer.addDocument(doc);
293 fail("did not hit exception");
294 } catch (IOException ioe) {
// Disarm the failure, then verify the writer still accepts documents.
296 failure.clearDoFail();
297 writer.addDocument(doc);
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; the doFail guard, the onlyOnce
// handling, and closing braces are missing from this view.
302 // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
303 private static class FailOnlyOnAbortOrFlush extends MockDirectoryWrapper.Failure {
// When true, the failure is presumably meant to fire a single time — the
// code that consults this flag sits on missing lines; TODO confirm.
304 private boolean onlyOnce;
305 public FailOnlyOnAbortOrFlush(boolean onlyOnce) {
306 this.onlyOnce = onlyOnce;
// Inspect the current stack: fail only when called from abort() or
// flushDocument(), and not when a close() frame is also present.
309 public void eval(MockDirectoryWrapper dir) throws IOException {
311 StackTraceElement[] trace = new Exception().getStackTrace();
312 boolean sawAbortOrFlushDoc = false;
313 boolean sawClose = false;
314 for (int i = 0; i < trace.length; i++) {
315 if ("abort".equals(trace[i].getMethodName()) ||
316 "flushDocument".equals(trace[i].getMethodName())) {
317 sawAbortOrFlushDoc = true;
319 if ("close".equals(trace[i].getMethodName())) {
323 if (sawAbortOrFlushDoc && !sawClose) {
326 //System.out.println(Thread.currentThread().getName() + ": now fail");
327 //new Throwable().printStackTrace(System.out);
// This exact message is matched by IndexerThread's "expected exception"
// check above.
328 throw new IOException("now failing on purpose");
336 // LUCENE-1130: make sure initial IOException, and then 2nd
337 // IOException during rollback(), is OK:
338 public void testIOExceptionDuringAbort() throws IOException {
339 _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(false));
342 // LUCENE-1130: make sure initial IOException, and then 2nd
343 // IOException during rollback(), is OK:
344 public void testIOExceptionDuringAbortOnlyOnce() throws IOException {
345 _testSingleThreadFailure(new FailOnlyOnAbortOrFlush(true));
348 // LUCENE-1130: make sure initial IOException, and then 2nd
349 // IOException during rollback(), with multiple threads, is OK:
350 public void testIOExceptionDuringAbortWithThreads() throws Exception {
351 _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(false));
354 // LUCENE-1130: make sure initial IOException, and then 2nd
355 // IOException during rollback(), with multiple threads, is OK:
356 public void testIOExceptionDuringAbortWithThreadsOnlyOnce() throws Exception {
357 _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush(true));
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; the doFail guard, the onlyOnce
// handling, and closing braces are missing from this view.
360 // Throws IOException during DocumentsWriter.writeSegment
361 private static class FailOnlyInWriteSegment extends MockDirectoryWrapper.Failure {
// When true, the failure is presumably meant to fire a single time — the
// code that consults this flag sits on missing lines; TODO confirm.
362 private boolean onlyOnce;
363 public FailOnlyInWriteSegment(boolean onlyOnce) {
364 this.onlyOnce = onlyOnce;
// Fail only when called from DocFieldProcessor.flush, i.e. while a segment
// is being written.
367 public void eval(MockDirectoryWrapper dir) throws IOException {
369 StackTraceElement[] trace = new Exception().getStackTrace();
370 for (int i = 0; i < trace.length; i++) {
371 if ("flush".equals(trace[i].getMethodName()) && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) {
// This exact message is matched by IndexerThread's "expected exception"
// check above.
374 throw new IOException("now failing on purpose");
381 // LUCENE-1130: test IOException in writeSegment
382 public void testIOExceptionDuringWriteSegment() throws IOException {
383 _testSingleThreadFailure(new FailOnlyInWriteSegment(false));
386 // LUCENE-1130: test IOException in writeSegment
387 public void testIOExceptionDuringWriteSegmentOnlyOnce() throws IOException {
388 _testSingleThreadFailure(new FailOnlyInWriteSegment(true));
391 // LUCENE-1130: test IOException in writeSegment, with threads
392 public void testIOExceptionDuringWriteSegmentWithThreads() throws Exception {
393 _testMultipleThreadsFailure(new FailOnlyInWriteSegment(false));
396 // LUCENE-1130: test IOException in writeSegment, with threads
397 public void testIOExceptionDuringWriteSegmentWithThreadsOnlyOnce() throws Exception {
398 _testMultipleThreadsFailure(new FailOnlyInWriteSegment(true));
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; the thread start/join statements,
// the tail of the assertEquals call, and cleanup are missing from this view.
401 // LUCENE-3365: Test adding two documents with the same field from two different IndexWriters
402 // that we attempt to open at the same time. As long as the first IndexWriter completes
403 // and closes before the second IndexWriter time's out trying to get the Lock,
404 // we should see both documents
405 public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
406 final MockDirectoryWrapper dir = newDirectory();
// Released by whichever runnable constructs its IndexWriter first.
407 CountDownLatch oneIWConstructed = new CountDownLatch(1);
408 DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable(
409 dir, oneIWConstructed);
410 DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable(
411 dir, oneIWConstructed);
// Wait until at least one IndexWriter exists before letting either index.
415 oneIWConstructed.await();
417 thread1.startIndexing();
418 thread2.startIndexing();
423 assertFalse("Failed due to: " + thread1.failure, thread1.failed);
424 assertFalse("Failed due to: " + thread2.failure, thread2.failed);
425 // now verify that we have two documents in the index
426 IndexReader reader = IndexReader.open(dir, true);
427 assertEquals("IndexReader should have one document per thread running", 2,
// NOTE(review): extraction artifact — original line numbers are fused into
// each line and the numbering has gaps; the run() declaration, the writer
// close, and the tail of the catch block run past / are missing from this
// view.
434 static class DelayedIndexAndCloseRunnable extends Thread {
435 private final Directory dir;
// Set (with `failure`) if anything thrown inside run() — read by the test
// thread after join.
436 boolean failed = false;
437 Throwable failure = null;
// Released by startIndexing(); run() blocks on it after constructing the
// writer so the test controls when indexing begins.
438 private final CountDownLatch startIndexing = new CountDownLatch(1);
// Counted down once this thread's IndexWriter is constructed.
439 private CountDownLatch iwConstructed;
441 public DelayedIndexAndCloseRunnable(Directory dir,
442 CountDownLatch iwConstructed) {
// NOTE(review): `this.dir = dir;` presumably sat on a missing line — TODO
// confirm against upstream.
444 this.iwConstructed = iwConstructed;
447 public void startIndexing() {
448 this.startIndexing.countDown();
// (run() declaration is on a missing line)
454 Document doc = new Document();
455 Field field = newField("field", "testData", Field.Store.YES,
456 Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
458 IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
459 TEST_VERSION_CURRENT, new MockAnalyzer(random)));
// Signal construction, then wait for the go-ahead before adding the doc.
460 iwConstructed.countDown();
461 startIndexing.await();
462 writer.addDocument(doc);
464 } catch (Throwable e) {
467 failure.printStackTrace(System.out);