X-Git-Url: https://git.mdrn.pl/pylucene.git/blobdiff_plain/a2e61f0c04805cfcb8706176758d1283c7e3a55c..aaeed5504b982cf3545252ab528713250aa33eed:/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestStressNRT.java

diff --git a/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestStressNRT.java b/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestStressNRT.java
new file mode 100644
index 0000000..9bdbb9e
--- /dev/null
+++ b/lucene-java-3.5.0/lucene/backwards/src/test/org/apache/lucene/index/TestStressNRT.java
@@ -0,0 +1,384 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestStressNRT extends LuceneTestCase {
+  volatile IndexReader reader;
+
+  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
+  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  long snapshotCount;
+  long committedModelClock;
+  volatile int lastId;
+  final String field = "val_l";
+  Object[] syncArr;
+
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, -1L);
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  public void test() throws Exception {
+    // update variables
+    final int commitPercent = random.nextInt(20);
+    final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
+    final int deletePercent = random.nextInt(50);
+    final int deleteByQueryPercent = random.nextInt(25);
+    final int ndocs = atLeast(50);
+    final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);   // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
+
+    final boolean tombstones = random.nextBoolean();
+
+    // query variables
+    final AtomicLong operations = new AtomicLong(atLeast(10000));  // number of query operations to perform in total
+
+    final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    Directory dir = newDirectory();
+
+    final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+    writer.setDoRandomOptimizeAssert(false);
+    writer.w.setInfoStream(VERBOSE ? System.out : null);
+    writer.commit();
+    reader = IndexReader.open(dir);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,Long> newCommittedModel;
+                  long version;
+                  IndexReader oldReader;
+
+                  synchronized(TestStressNRT.this) {
+                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                    version = snapshotCount++;
+                    oldReader = reader;
+                    oldReader.incRef();  // increment the reference since we will use this for reopening
+                  }
+
+                  IndexReader newReader;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    // assertU(h.commit("softCommit","true"));
+                    if (random.nextBoolean()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
+                      }
+                      newReader = writer.getReader(true);
+                    } else {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
+                      }
+                      newReader = oldReader.reopen(writer.w, true);
+                    }
+                  } else {
+                    // assertU(commit());
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
+                    }
+                    writer.commit();
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
+                    }
+                    newReader = oldReader.reopen();
+                  }
+
+                  // Code below assumes newReader comes w/
+                  // extra ref:
+                  if (newReader == oldReader) {
+                    newReader.incRef();
+                  }
+
+                  oldReader.decRef();
+
+                  synchronized(TestStressNRT.this) {
+                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+                    //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
+                    assert newReader.getRefCount() > 0;
+                    assert reader.getRefCount() > 0;
+                    if (newReader.getVersion() > reader.getVersion()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
+                      }
+                      reader.decRef();
+                      reader = newReader;
+
+                      // Silly: forces fieldInfos to be
+                      // loaded so we don't hit IOE on later
+                      // reader.toString
+                      newReader.toString();
+
+                      // install this snapshot only if it's newer than the current one
+                      if (version >= committedModelClock) {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
+                        }
+                        committedModel = newCommittedModel;
+                        committedModelClock = version;
+                      } else {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
+                        }
+                      }
+                    } else {
+                      // if the same reader, don't decRef.
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
+                      }
+                      newReader.decRef();
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+              } else {
+
+                int id = rand.nextInt(ndocs);
+                Object sync = syncArr[id];
+
+                // set the lastId before we actually change it sometimes to try and
+                // uncover more race conditions between writing and reading
+                boolean before = random.nextBoolean();
+                if (before) {
+                  lastId = id;
+                }
+
+                // We can't concurrently update the same document and retain our invariants of increasing values
+                // since we can't guarantee what order the updates will be executed.
+                synchronized (sync) {
+                  Long val = model.get(id);
+                  long nextVal = Math.abs(val)+1;
+
+                  if (oper < commitPercent + deletePercent) {
+                    // assertU("<delete><id>" + id + "</id></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new Term("id",Integer.toString(id)));
+                    model.put(id, -nextVal);
+                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                    //assertU("<delete><query>id:" + id + "</query></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+                    model.put(id, -nextVal);
+                  } else {
+                    // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                    Document d = new Document();
+                    d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                    d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
+                    }
+                    writer.updateDocument(new Term("id", Integer.toString(id)), d);
+                    if (tombstones) {
+                      // remove tombstone after new addition (this should be optional?)
+                      writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+                    }
+                    model.put(id, nextVal);
+                  }
+                }
+
+                if (!before) {
+                  lastId = id;
+                }
+              }
+            }
+          } catch (Throwable e) {
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              long val;
+              IndexReader r;
+              synchronized(TestStressNRT.this) {
+                val = committedModel.get(id);
+                r = reader;
+                r.incRef();
+              }
+
+              if (VERBOSE) {
+                System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
+              }
+
+              //  sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              IndexSearcher searcher = new IndexSearcher(r);
+              Query q = new TermQuery(new Term("id",Integer.toString(id)));
+              TopDocs results = searcher.search(q, 10);
+
+              if (results.totalHits == 0 && tombstones) {
+                // if we couldn't find the doc, look for its tombstone
+                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
+                results = searcher.search(q, 1);
+                if (results.totalHits == 0) {
+                  if (val == -1L) {
+                    // expected... no doc was added yet
+                    r.decRef();
+                    continue;
+                  }
+                  fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
+                }
+              }
+
+              if (results.totalHits == 0 && !tombstones) {
+                // nothing to do - we can't tell anything from a deleted doc without tombstones
+              } else {
+                // we should have found the document, or its tombstone
+                if (results.totalHits != 1) {
+                  System.out.println("FAIL: hits id:" + id + " val=" + val);
+                  for(ScoreDoc sd : results.scoreDocs) {
+                    final Document doc = r.document(sd.doc);
+                    System.out.println("  docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
+                  }
+                  fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
+                }
+                Document doc = searcher.doc(results.scoreDocs[0].doc);
+                long foundVal = Long.parseLong(doc.get(field));
+                if (foundVal < Math.abs(val)) {
+                  fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
+                }
+              }
+
+              r.decRef();
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    writer.close();
+    if (VERBOSE) {
+      System.out.println("TEST: close reader=" + reader);
+    }
+    reader.close();
+    dir.close();
+  }
+}