1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.List;
24 import java.util.Random;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
29 import org.apache.lucene.analysis.MockAnalyzer;
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.document.Field;
32 import org.apache.lucene.search.IndexSearcher;
33 import org.apache.lucene.search.Query;
34 import org.apache.lucene.search.ScoreDoc;
35 import org.apache.lucene.search.TermQuery;
36 import org.apache.lucene.search.TopDocs;
37 import org.apache.lucene.store.Directory;
38 import org.apache.lucene.util.LuceneTestCase;
39 import org.apache.lucene.util._TestUtil;
// Stress test of near-real-time (NRT) reader reopening under concurrent
// indexing, deleting and committing.
// NOTE(review): this excerpt has dropped several lines of the original file
// (e.g. the declarations of syncArr/snapshotCount/lastId referenced below);
// consult the full source before structural edits.
41 public class TestStressNRT extends LuceneTestCase {
// Most recently published reader; volatile so reader threads always observe
// the latest instance installed by a committing writer thread.
42 volatile IndexReader reader;
// Live expected value per document id, updated by writer threads.
// A negative value marks the id as deleted (see model.put(id, -nextVal)).
44 final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
// Snapshot of `model` taken at commit time; read and replaced only while
// holding synchronized(TestStressNRT.this).
45 Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
// Version of the currently installed committedModel snapshot.
47 long committedModelClock;
// Name of the stored (unindexed) field holding each document's long value.
49 final String field = "val_l";
// Initializes per-test state for ndocs documents: resets the committed-model
// clock, allocates one lock object per document id (used later to serialize
// concurrent updates to the same id), and seeds committedModel from model.
// NOTE(review): lines are missing from this excerpt (e.g. whatever populates
// `model`, plus the loop/method closing braces) — the method is incomplete
// as shown; annotation-only changes here.
52 private void initModel(int ndocs) {
54 committedModelClock = 0;
57 syncArr = new Object[ndocs];
59 for (int i=0; i<ndocs; i++) {
61 syncArr[i] = new Object();
63 committedModel.putAll(model);
// Stress test: writer threads concurrently add/update/delete documents while
// tracking the expected per-id value in `model`; committers snapshot the
// model and install fresh NRT readers; reader threads verify that each
// visible document's value is at least the value in the committed model
// (per-id values only ever increase via nextVal = Math.abs(val)+1).
// NOTE(review): many lines of the original file are missing from this
// excerpt (run() headers, braces, try blocks, thread start/join, final
// close calls), so the code below is not compilable as shown; changes
// here are annotation-only.
66 public void test() throws Exception {
// Randomized workload mix: per-operation probability (out of 100) of a
// commit, the share of commits that are soft, and the delete frequencies.
68 final int commitPercent = random.nextInt(20);
69 final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
70 final int deletePercent = random.nextInt(50);
71 final int deleteByQueryPercent = random.nextInt(25);
72 final int ndocs = atLeast(50);
73 final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
74 final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5); // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
// When true, deletes also index a "-<id>" tombstone document so readers can
// distinguish "deleted" from "never added".
76 final boolean tombstones = random.nextBoolean();
80 final AtomicLong operations = new AtomicLong(atLeast(50000)); // number of query operations to perform in total
82 final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
86 System.out.println("\n");
87 System.out.println("TEST: commitPercent=" + commitPercent);
88 System.out.println("TEST: softCommitPercent=" + softCommitPercent);
89 System.out.println("TEST: deletePercent=" + deletePercent);
90 System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
91 System.out.println("TEST: ndocs=" + ndocs);
92 System.out.println("TEST: nWriteThreads=" + nWriteThreads);
93 System.out.println("TEST: nReadThreads=" + nReadThreads);
94 System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
95 System.out.println("TEST: tombstones=" + tombstones);
96 System.out.println("TEST: operations=" + operations);
97 System.out.println("\n");
100 final AtomicInteger numCommitting = new AtomicInteger();
102 List<Thread> threads = new ArrayList<Thread>();
104 Directory dir = newDirectory();
106 final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
107 writer.setDoRandomForceMergeAssert(false);
108 writer.w.setInfoStream(VERBOSE ? System.out : null);
110 reader = IndexReader.open(dir);
// Writer threads: each loops until the shared operation budget is exhausted,
// randomly choosing between committing and mutating a random document.
112 for (int i=0; i<nWriteThreads; i++) {
113 Thread thread = new Thread("WRITER"+i) {
// Per-thread RNG seeded from the shared test RNG for reproducibility.
114 Random rand = new Random(random.nextInt());
119 while (operations.get() > 0) {
120 int oper = rand.nextInt(100);
122 if (oper < commitPercent) {
// Commit path; cap the number of simultaneous committers.
123 if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
124 Map<Integer,Long> newCommittedModel;
126 IndexReader oldReader;
// Snapshot the model and take a version number atomically with respect to
// other committers and the install section below.
128 synchronized(TestStressNRT.this) {
129 newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
130 version = snapshotCount++;
132 oldReader.incRef(); // increment the reference since we will use this for reopening
135 IndexReader newReader;
// Soft-commit path: obtain an NRT reader, either directly from the writer
// or via openIfChanged against the writer, without a hard commit.
136 if (rand.nextInt(100) < softCommitPercent) {
137 // assertU(h.commit("softCommit","true"));
138 if (random.nextBoolean()) {
140 System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
142 newReader = writer.getReader(true);
145 System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
147 newReader = IndexReader.openIfChanged(oldReader, writer.w, true);
// Hard-commit path: commit, then a plain reopen of the old reader.
150 // assertU(commit());
152 System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
156 System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
158 newReader = IndexReader.openIfChanged(oldReader);
161 // Code below assumes newReader comes w/
// openIfChanged returns null when nothing changed: keep using the old reader.
163 if (newReader == null) {
165 newReader = oldReader;
// Publish the new reader and model snapshot only if they are newer than
// what is currently installed; otherwise skip (another committer won).
170 synchronized(TestStressNRT.this) {
171 // install the new reader if it's newest (and check the current version since another reader may have already been installed)
172 //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
173 assert newReader.getRefCount() > 0;
174 assert reader.getRefCount() > 0;
175 if (newReader.getVersion() > reader.getVersion()) {
177 System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
182 // Silly: forces fieldInfos to be
183 // loaded so we don't hit IOE on later
185 newReader.toString();
187 // install this snapshot only if it's newer than the current one
188 if (version >= committedModelClock) {
190 System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
192 committedModel = newCommittedModel;
193 committedModelClock = version;
196 System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
200 // if the same reader, don't decRef.
202 System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
208 numCommitting.decrementAndGet();
// Non-commit path: mutate one random document id, serialized per id via its
// lock object so concurrent updates to the same id cannot interleave.
211 int id = rand.nextInt(ndocs);
212 Object sync = syncArr[id];
214 // set the lastId before we actually change it sometimes to try and
215 // uncover more race conditions between writing and reading
216 boolean before = random.nextBoolean();
221 // We can't concurrently update the same document and retain our invariants of increasing values
222 // since we can't guarantee what order the updates will be executed.
223 synchronized (sync) {
224 Long val = model.get(id);
// Values strictly increase; a negative model value marks a delete.
225 long nextVal = Math.abs(val)+1;
// Delete-by-term path (optionally indexing a tombstone doc first).
227 if (oper < commitPercent + deletePercent) {
228 // assertU("<delete><id>" + id + "</id></delete>");
230 // add tombstone first
232 Document d = new Document();
233 d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
234 d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
235 writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
239 System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
241 writer.deleteDocuments(new Term("id",Integer.toString(id)));
242 model.put(id, -nextVal);
// Delete-by-query path (same tombstone handling, delete via TermQuery).
243 } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
244 //assertU("<delete><query>id:" + id + "</query></delete>");
246 // add tombstone first
248 Document d = new Document();
249 d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
250 d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
251 writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
255 System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
257 writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
258 model.put(id, -nextVal);
// Add/update path: reindex the doc with the next value, then clear any
// tombstone left over from a previous delete.
260 // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
261 Document d = new Document();
262 d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
263 d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
265 System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
267 writer.updateDocument(new Term("id", Integer.toString(id)), d);
269 // remove tombstone after new addition (this should be optional?)
270 writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
272 model.put(id, nextVal);
// Any unexpected failure in a writer thread aborts the test loudly.
281 } catch (Throwable e) {
282 System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
283 e.printStackTrace(System.out);
284 throw new RuntimeException(e);
// Reader threads: repeatedly look up a random (recency-biased) id in the
// currently installed reader and check it against the committed model.
292 for (int i=0; i<nReadThreads; i++) {
293 Thread thread = new Thread("READER"+i) {
294 Random rand = new Random(random.nextInt());
299 while (operations.decrementAndGet() >= 0) {
300 // bias toward a recently changed doc
301 int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
303 // when indexing, we update the index, then the model
304 // so when querying, we should first check the model, and then the index
// Read the committed expectation under the same lock committers hold when
// installing a new snapshot, so reader and model stay consistent.
308 synchronized(TestStressNRT.this) {
309 val = committedModel.get(id);
315 System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
318 // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
319 IndexSearcher searcher = new IndexSearcher(r);
320 Query q = new TermQuery(new Term("id",Integer.toString(id)));
321 TopDocs results = searcher.search(q, 10);
// If the doc is absent and tombstones are enabled, its tombstone must be
// findable (unless the doc was never added at all).
323 if (results.totalHits == 0 && tombstones) {
324 // if we couldn't find the doc, look for its tombstone
325 q = new TermQuery(new Term("id","-"+Integer.toString(id)));
326 results = searcher.search(q, 1);
327 if (results.totalHits == 0) {
329 // expected... no doc was added yet
333 fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
337 if (results.totalHits == 0 && !tombstones) {
338 // nothing to do - we can't tell anything from a deleted doc without tombstones
340 // we should have found the document, or its tombstone
341 if (results.totalHits != 1) {
342 System.out.println("FAIL: hits id:" + id + " val=" + val);
343 for(ScoreDoc sd : results.scoreDocs) {
344 final Document doc = r.document(sd.doc);
345 System.out.println(" docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
347 fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
// Monotonicity check: the indexed value must be at least the committed
// model's absolute value; anything smaller means a stale doc was visible.
349 Document doc = searcher.doc(results.scoreDocs[0].doc);
350 long foundVal = Long.parseLong(doc.get(field));
351 if (foundVal < Math.abs(val)) {
352 fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
// Any unexpected failure in a reader thread aborts the test loudly.
358 } catch (Throwable e) {
360 System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
361 e.printStackTrace(System.out);
362 throw new RuntimeException(e);
// Wait for all writer and reader threads to finish before tearing down.
370 for (Thread thread : threads) {
374 for (Thread thread : threads) {
380 System.out.println("TEST: close reader=" + reader);