pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / test / org / apache / lucene / index / TestStressNRT.java
diff --git a/lucene-java-3.5.0/lucene/src/test/org/apache/lucene/index/TestStressNRT.java b/lucene-java-3.5.0/lucene/src/test/org/apache/lucene/index/TestStressNRT.java
new file mode 100644 (file)
index 0000000..e8b6bc0
--- /dev/null
@@ -0,0 +1,385 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestStressNRT extends LuceneTestCase {
+  volatile IndexReader reader;
+
+  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
+  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  long snapshotCount;
+  long committedModelClock;
+  volatile int lastId;
+  final String field = "val_l";
+  Object[] syncArr;
+
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, -1L);
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  public void test() throws Exception {
+    // update variables
+    final int commitPercent = random.nextInt(20);
+    final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
+    final int deletePercent = random.nextInt(50);
+    final int deleteByQueryPercent = random.nextInt(25);
+    final int ndocs = atLeast(50);
+    final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);   // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
+    
+    final boolean tombstones = random.nextBoolean();
+    
+
+    // query variables
+    final AtomicLong operations = new AtomicLong(atLeast(50000));  // number of query operations to perform in total
+
+    final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    initModel(ndocs);
+
+    if (VERBOSE) {
+      System.out.println("\n");
+      System.out.println("TEST: commitPercent=" + commitPercent);
+      System.out.println("TEST: softCommitPercent=" + softCommitPercent);
+      System.out.println("TEST: deletePercent=" + deletePercent);
+      System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
+      System.out.println("TEST: ndocs=" + ndocs);
+      System.out.println("TEST: nWriteThreads=" + nWriteThreads);
+      System.out.println("TEST: nReadThreads=" + nReadThreads);
+      System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
+      System.out.println("TEST: tombstones=" + tombstones);
+      System.out.println("TEST: operations=" + operations);
+      System.out.println("\n");
+    }
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    Directory dir = newDirectory();
+
+    final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+    writer.setDoRandomForceMergeAssert(false);
+    writer.w.setInfoStream(VERBOSE ? System.out : null);
+    writer.commit();
+    reader = IndexReader.open(dir);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,Long> newCommittedModel;
+                  long version;
+                  IndexReader oldReader;
+
+                  synchronized(TestStressNRT.this) {
+                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                    version = snapshotCount++;
+                    oldReader = reader;
+                    oldReader.incRef();  // increment the reference since we will use this for reopening
+                  }
+
+                  IndexReader newReader;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    // assertU(h.commit("softCommit","true"));
+                    if (random.nextBoolean()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
+                      }
+                      newReader = writer.getReader(true);
+                    } else {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
+                      }
+                      newReader = IndexReader.openIfChanged(oldReader, writer.w, true);
+                    }
+                  } else {
+                    // assertU(commit());
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
+                    }
+                    writer.commit();
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
+                    }
+                    newReader = IndexReader.openIfChanged(oldReader);
+                  }
+
+                  // Code below assumes newReader comes w/
+                  // extra ref:
+                  if (newReader == null) {
+                    oldReader.incRef();
+                    newReader = oldReader;
+                  }
+
+                  oldReader.decRef();
+
+                  synchronized(TestStressNRT.this) {
+                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+                    //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
+                    assert newReader.getRefCount() > 0;
+                    assert reader.getRefCount() > 0;
+                    if (newReader.getVersion() > reader.getVersion()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
+                      }
+                      reader.decRef();
+                      reader = newReader;
+
+                      // Silly: forces fieldInfos to be
+                      // loaded so we don't hit IOE on later
+                      // reader.toString
+                      newReader.toString();
+
+                      // install this snapshot only if it's newer than the current one
+                      if (version >= committedModelClock) {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
+                        }
+                        committedModel = newCommittedModel;
+                        committedModelClock = version;
+                      } else {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
+                        }
+                      }
+                    } else {
+                      // if the same reader, don't decRef.
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
+                      }
+                      newReader.decRef();
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+              } else {
+
+                int id = rand.nextInt(ndocs);
+                Object sync = syncArr[id];
+
+                // set the lastId before we actually change it sometimes to try and
+                // uncover more race conditions between writing and reading
+                boolean before = random.nextBoolean();
+                if (before) {
+                  lastId = id;
+                }
+
+                // We can't concurrently update the same document and retain our invariants of increasing values
+                // since we can't guarantee what order the updates will be executed.
+                synchronized (sync) {
+                  Long val = model.get(id);
+                  long nextVal = Math.abs(val)+1;
+
+                  if (oper < commitPercent + deletePercent) {
+                    // assertU("<delete><id>" + id + "</id></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new Term("id",Integer.toString(id)));
+                    model.put(id, -nextVal);
+                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                    //assertU("<delete><query>id:" + id + "</query></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+                    model.put(id, -nextVal);
+                  } else {
+                    // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                    Document d = new Document();
+                    d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                    d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
+                    }
+                    writer.updateDocument(new Term("id", Integer.toString(id)), d);
+                    if (tombstones) {
+                      // remove tombstone after new addition (this should be optional?)
+                      writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+                    }
+                    model.put(id, nextVal);
+                  }
+                }
+
+                if (!before) {
+                  lastId = id;
+                }
+              }
+            }
+          } catch (Throwable e) {
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              long val;
+              IndexReader r;
+              synchronized(TestStressNRT.this) {
+                val = committedModel.get(id);
+                r = reader;
+                r.incRef();
+              }
+
+              if (VERBOSE) {
+                System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
+              }
+
+              //  sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              IndexSearcher searcher = new IndexSearcher(r);
+              Query q = new TermQuery(new Term("id",Integer.toString(id)));
+              TopDocs results = searcher.search(q, 10);
+
+              if (results.totalHits == 0 && tombstones) {
+                // if we couldn't find the doc, look for its tombstone
+                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
+                results = searcher.search(q, 1);
+                if (results.totalHits == 0) {
+                  if (val == -1L) {
+                    // expected... no doc was added yet
+                    r.decRef();
+                    continue;
+                  }
+                  fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
+                }
+              }
+
+              if (results.totalHits == 0 && !tombstones) {
+                // nothing to do - we can't tell anything from a deleted doc without tombstones
+              } else {
+                // we should have found the document, or its tombstone
+                if (results.totalHits != 1) {
+                  System.out.println("FAIL: hits id:" + id + " val=" + val);
+                  for(ScoreDoc sd : results.scoreDocs) {
+                    final Document doc = r.document(sd.doc);
+                    System.out.println("  docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
+                  }
+                  fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
+                }
+                Document doc = searcher.doc(results.scoreDocs[0].doc);
+                long foundVal = Long.parseLong(doc.get(field));
+                if (foundVal < Math.abs(val)) {
+                  fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
+                }
+              }
+
+              r.decRef();
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    writer.close();
+    if (VERBOSE) {
+      System.out.println("TEST: close reader=" + reader);
+    }
+    reader.close();
+    dir.close();
+  }
+}