pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / test / org / apache / lucene / index / TestStressNRT.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Random;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.lucene.analysis.MockAnalyzer;
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.document.Field;
32 import org.apache.lucene.search.IndexSearcher;
33 import org.apache.lucene.search.Query;
34 import org.apache.lucene.search.ScoreDoc;
35 import org.apache.lucene.search.TermQuery;
36 import org.apache.lucene.search.TopDocs;
37 import org.apache.lucene.store.Directory;
38 import org.apache.lucene.util.LuceneTestCase;
39 import org.apache.lucene.util._TestUtil;
40
41 public class TestStressNRT extends LuceneTestCase {
42   volatile IndexReader reader;
43
44   final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
45   Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
46   long snapshotCount;
47   long committedModelClock;
48   volatile int lastId;
49   final String field = "val_l";
50   Object[] syncArr;
51
52   private void initModel(int ndocs) {
53     snapshotCount = 0;
54     committedModelClock = 0;
55     lastId = 0;
56
57     syncArr = new Object[ndocs];
58
59     for (int i=0; i<ndocs; i++) {
60       model.put(i, -1L);
61       syncArr[i] = new Object();
62     }
63     committedModel.putAll(model);
64   }
65
66   public void test() throws Exception {
67     // update variables
68     final int commitPercent = random.nextInt(20);
69     final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
70     final int deletePercent = random.nextInt(50);
71     final int deleteByQueryPercent = random.nextInt(25);
72     final int ndocs = atLeast(50);
73     final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
74     final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);   // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
75     
76     final boolean tombstones = random.nextBoolean();
77     
78
79     // query variables
80     final AtomicLong operations = new AtomicLong(atLeast(50000));  // number of query operations to perform in total
81
82     final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
83     initModel(ndocs);
84
85     if (VERBOSE) {
86       System.out.println("\n");
87       System.out.println("TEST: commitPercent=" + commitPercent);
88       System.out.println("TEST: softCommitPercent=" + softCommitPercent);
89       System.out.println("TEST: deletePercent=" + deletePercent);
90       System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
91       System.out.println("TEST: ndocs=" + ndocs);
92       System.out.println("TEST: nWriteThreads=" + nWriteThreads);
93       System.out.println("TEST: nReadThreads=" + nReadThreads);
94       System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
95       System.out.println("TEST: tombstones=" + tombstones);
96       System.out.println("TEST: operations=" + operations);
97       System.out.println("\n");
98     }
99
100     final AtomicInteger numCommitting = new AtomicInteger();
101
102     List<Thread> threads = new ArrayList<Thread>();
103
104     Directory dir = newDirectory();
105
106     final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
107     writer.setDoRandomForceMergeAssert(false);
108     writer.w.setInfoStream(VERBOSE ? System.out : null);
109     writer.commit();
110     reader = IndexReader.open(dir);
111
112     for (int i=0; i<nWriteThreads; i++) {
113       Thread thread = new Thread("WRITER"+i) {
114         Random rand = new Random(random.nextInt());
115
116         @Override
117         public void run() {
118           try {
119             while (operations.get() > 0) {
120               int oper = rand.nextInt(100);
121
122               if (oper < commitPercent) {
123                 if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
124                   Map<Integer,Long> newCommittedModel;
125                   long version;
126                   IndexReader oldReader;
127
128                   synchronized(TestStressNRT.this) {
129                     newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
130                     version = snapshotCount++;
131                     oldReader = reader;
132                     oldReader.incRef();  // increment the reference since we will use this for reopening
133                   }
134
135                   IndexReader newReader;
136                   if (rand.nextInt(100) < softCommitPercent) {
137                     // assertU(h.commit("softCommit","true"));
138                     if (random.nextBoolean()) {
139                       if (VERBOSE) {
140                         System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
141                       }
142                       newReader = writer.getReader(true);
143                     } else {
144                       if (VERBOSE) {
145                         System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
146                       }
147                       newReader = IndexReader.openIfChanged(oldReader, writer.w, true);
148                     }
149                   } else {
150                     // assertU(commit());
151                     if (VERBOSE) {
152                       System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
153                     }
154                     writer.commit();
155                     if (VERBOSE) {
156                       System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
157                     }
158                     newReader = IndexReader.openIfChanged(oldReader);
159                   }
160
161                   // Code below assumes newReader comes w/
162                   // extra ref:
163                   if (newReader == null) {
164                     oldReader.incRef();
165                     newReader = oldReader;
166                   }
167
168                   oldReader.decRef();
169
170                   synchronized(TestStressNRT.this) {
171                     // install the new reader if it's newest (and check the current version since another reader may have already been installed)
172                     //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
173                     assert newReader.getRefCount() > 0;
174                     assert reader.getRefCount() > 0;
175                     if (newReader.getVersion() > reader.getVersion()) {
176                       if (VERBOSE) {
177                         System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
178                       }
179                       reader.decRef();
180                       reader = newReader;
181
182                       // Silly: forces fieldInfos to be
183                       // loaded so we don't hit IOE on later
184                       // reader.toString
185                       newReader.toString();
186
187                       // install this snapshot only if it's newer than the current one
188                       if (version >= committedModelClock) {
189                         if (VERBOSE) {
190                           System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
191                         }
192                         committedModel = newCommittedModel;
193                         committedModelClock = version;
194                       } else {
195                         if (VERBOSE) {
196                           System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
197                         }
198                       }
199                     } else {
200                       // if the same reader, don't decRef.
201                       if (VERBOSE) {
202                         System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
203                       }
204                       newReader.decRef();
205                     }
206                   }
207                 }
208                 numCommitting.decrementAndGet();
209               } else {
210
211                 int id = rand.nextInt(ndocs);
212                 Object sync = syncArr[id];
213
214                 // set the lastId before we actually change it sometimes to try and
215                 // uncover more race conditions between writing and reading
216                 boolean before = random.nextBoolean();
217                 if (before) {
218                   lastId = id;
219                 }
220
221                 // We can't concurrently update the same document and retain our invariants of increasing values
222                 // since we can't guarantee what order the updates will be executed.
223                 synchronized (sync) {
224                   Long val = model.get(id);
225                   long nextVal = Math.abs(val)+1;
226
227                   if (oper < commitPercent + deletePercent) {
228                     // assertU("<delete><id>" + id + "</id></delete>");
229
230                     // add tombstone first
231                     if (tombstones) {
232                       Document d = new Document();
233                       d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
234                       d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
235                       writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
236                     }
237
238                     if (VERBOSE) {
239                       System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
240                     }
241                     writer.deleteDocuments(new Term("id",Integer.toString(id)));
242                     model.put(id, -nextVal);
243                   } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
244                     //assertU("<delete><query>id:" + id + "</query></delete>");
245
246                     // add tombstone first
247                     if (tombstones) {
248                       Document d = new Document();
249                       d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
250                       d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
251                       writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
252                     }
253
254                     if (VERBOSE) {
255                       System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
256                     }
257                     writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
258                     model.put(id, -nextVal);
259                   } else {
260                     // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
261                     Document d = new Document();
262                     d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
263                     d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
264                     if (VERBOSE) {
265                       System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
266                     }
267                     writer.updateDocument(new Term("id", Integer.toString(id)), d);
268                     if (tombstones) {
269                       // remove tombstone after new addition (this should be optional?)
270                       writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
271                     }
272                     model.put(id, nextVal);
273                   }
274                 }
275
276                 if (!before) {
277                   lastId = id;
278                 }
279               }
280             }
281           } catch (Throwable e) {
282             System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
283             e.printStackTrace(System.out);
284             throw new RuntimeException(e);
285           }
286         }
287       };
288
289       threads.add(thread);
290     }
291
292     for (int i=0; i<nReadThreads; i++) {
293       Thread thread = new Thread("READER"+i) {
294         Random rand = new Random(random.nextInt());
295
296         @Override
297         public void run() {
298           try {
299             while (operations.decrementAndGet() >= 0) {
300               // bias toward a recently changed doc
301               int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
302
303               // when indexing, we update the index, then the model
304               // so when querying, we should first check the model, and then the index
305
306               long val;
307               IndexReader r;
308               synchronized(TestStressNRT.this) {
309                 val = committedModel.get(id);
310                 r = reader;
311                 r.incRef();
312               }
313
314               if (VERBOSE) {
315                 System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
316               }
317
318               //  sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
319               IndexSearcher searcher = new IndexSearcher(r);
320               Query q = new TermQuery(new Term("id",Integer.toString(id)));
321               TopDocs results = searcher.search(q, 10);
322
323               if (results.totalHits == 0 && tombstones) {
324                 // if we couldn't find the doc, look for its tombstone
325                 q = new TermQuery(new Term("id","-"+Integer.toString(id)));
326                 results = searcher.search(q, 1);
327                 if (results.totalHits == 0) {
328                   if (val == -1L) {
329                     // expected... no doc was added yet
330                     r.decRef();
331                     continue;
332                   }
333                   fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
334                 }
335               }
336
337               if (results.totalHits == 0 && !tombstones) {
338                 // nothing to do - we can't tell anything from a deleted doc without tombstones
339               } else {
340                 // we should have found the document, or its tombstone
341                 if (results.totalHits != 1) {
342                   System.out.println("FAIL: hits id:" + id + " val=" + val);
343                   for(ScoreDoc sd : results.scoreDocs) {
344                     final Document doc = r.document(sd.doc);
345                     System.out.println("  docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
346                   }
347                   fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
348                 }
349                 Document doc = searcher.doc(results.scoreDocs[0].doc);
350                 long foundVal = Long.parseLong(doc.get(field));
351                 if (foundVal < Math.abs(val)) {
352                   fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
353                 }
354               }
355
356               r.decRef();
357             }
358           } catch (Throwable e) {
359             operations.set(-1L);
360             System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
361             e.printStackTrace(System.out);
362             throw new RuntimeException(e);
363           }
364         }
365       };
366
367       threads.add(thread);
368     }
369
370     for (Thread thread : threads) {
371       thread.start();
372     }
373
374     for (Thread thread : threads) {
375       thread.join();
376     }
377
378     writer.close();
379     if (VERBOSE) {
380       System.out.println("TEST: close reader=" + reader);
381     }
382     reader.close();
383     dir.close();
384   }
385 }