diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
new file mode 100644
index 0000000..a8bda0e
--- /dev/null
+++ b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
@@ -0,0 +1,426 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Comparator;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.search.DocIdSet;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryWrapperFilter;
+import org.apache.lucene.util.BytesRef;
+
+/* Tracks the stream of {@link BufferedDeletes}.
+ * When DocumentsWriter flushes, its buffered
+ * deletes are appended to this stream.  We later
+ * apply these deletes (resolve them to the actual
+ * docIDs, per segment) when a merge is started
+ * (only to the to-be-merged segments).  We
+ * also apply to all segments when an NRT reader is pulled,
+ * commit/close is called, or when too many deletes are
+ * buffered and must be flushed (by RAM usage or by count).
+ *
+ * Each packet is assigned a generation, and each flushed or
+ * merged segment is also assigned a generation, so we can
+ * track which BufferedDeletes packets to apply to any given
+ * segment. */
+
+class BufferedDeletesStream {
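+
+  // For example (illustrative numbers, not from the original comments):
+  // if packets were pushed at gens 2 and 4, a segment flushed with
+  // bufferedDelGen=3 still needs the gen-4 packet, while the gen-2
+  // deletes were already accounted for when that segment was flushed.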
+
+  // TODO: maybe linked list?
+  private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
+
+  // Starts at 1 so that SegmentInfos that have never had
+  // deletes applied (whose bufferedDelGen defaults to 0)
+  // will be correct:
+  private long nextGen = 1;
+
+  // used only by assert
+  private Term lastDeleteTerm;
+
+  private PrintStream infoStream;
+  private final AtomicLong bytesUsed = new AtomicLong();
+  private final AtomicInteger numTerms = new AtomicInteger();
+  private final int messageID;
+
+  public BufferedDeletesStream(int messageID) {
+    this.messageID = messageID;
+  }
+
+  private synchronized void message(String message) {
+    if (infoStream != null) {
+      infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
+    }
+  }
+
+  public synchronized void setInfoStream(PrintStream infoStream) {
+    this.infoStream = infoStream;
+  }
+
+  // Appends a new packet of buffered deletes to the stream,
+  // setting its generation:
+  public synchronized void push(FrozenBufferedDeletes packet) {
+    assert packet.any();
+    assert checkDeleteStats();
+    assert packet.gen < nextGen;
+    deletes.add(packet);
+    numTerms.addAndGet(packet.numTermDeletes);
+    bytesUsed.addAndGet(packet.bytesUsed);
+    if (infoStream != null) {
+      message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
+    }
+    assert checkDeleteStats();
+  }
+
+  public synchronized void clear() {
+    deletes.clear();
+    nextGen = 1;
+    numTerms.set(0);
+    bytesUsed.set(0);
+  }
+
+  public boolean any() {
+    return bytesUsed.get() != 0;
+  }
+
+  public int numTerms() {
+    return numTerms.get();
+  }
+
+  public long bytesUsed() {
+    return bytesUsed.get();
+  }
+
+  public static class ApplyDeletesResult {
+    // True if any actual deletes took place:
+    public final boolean anyDeletes;
+
+    // Current gen, for the merged segment:
+    public final long gen;
+
+    // If non-null, contains segments that are 100% deleted
+    public final List<SegmentInfo> allDeleted;
+
+    ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentInfo> allDeleted) {
+      this.anyDeletes = anyDeletes;
+      this.gen = gen;
+      this.allDeleted = allDeleted;
+    }
+  }
+
+  // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
+  private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
+    // @Override -- not until Java 1.6
+    public int compare(SegmentInfo si1, SegmentInfo si2) {
+      final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
+      if (cmp > 0) {
+        return 1;
+      } else if (cmp < 0) {
+        return -1;
+      } else {
+        return 0;
+      }
+    }
+  };
+
+  /** Resolves the buffered deleted Term/Query/docIDs into
+   *  actual deleted docIDs in the deletedDocs BitVector for
+   *  each SegmentReader.
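+   *
+   *  The segments are sorted by bufferedDelGen and walked
+   *  newest-to-oldest in step with the packet list, so each
+   *  packet is applied only to segments whose bufferedDelGen
+   *  is older than that packet's gen.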
+   */
+  public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, List<SegmentInfo> infos) throws IOException {
+    final long t0 = System.currentTimeMillis();
+
+    if (infos.size() == 0) {
+      return new ApplyDeletesResult(false, nextGen++, null);
+    }
+
+    assert checkDeleteStats();
+
+    if (!any()) {
+      message("applyDeletes: no deletes; skipping");
+      return new ApplyDeletesResult(false, nextGen++, null);
+    }
+
+    if (infoStream != null) {
+      message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());
+    }
+
+    List<SegmentInfo> infos2 = new ArrayList<SegmentInfo>();
+    infos2.addAll(infos);
+    Collections.sort(infos2, sortByDelGen);
+
+    CoalescedDeletes coalescedDeletes = null;
+    boolean anyNewDeletes = false;
+
+    int infosIDX = infos2.size()-1;
+    int delIDX = deletes.size()-1;
+
+    List<SegmentInfo> allDeleted = null;
+
+    while (infosIDX >= 0) {
+      //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
+
+      final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+      final SegmentInfo info = infos2.get(infosIDX);
+      final long segGen = info.getBufferedDeletesGen();
+
+      if (packet != null && segGen < packet.gen) {
+        //System.out.println("  coalesce");
+        if (coalescedDeletes == null) {
+          coalescedDeletes = new CoalescedDeletes();
+        }
+        coalescedDeletes.update(packet);
+        delIDX--;
+      } else if (packet != null && segGen == packet.gen) {
+        //System.out.println("  eq");
+
+        // Lock order: IW -> BD -> RP
+        assert readerPool.infoIsLive(info);
+        SegmentReader reader = readerPool.get(info, false);
+        int delCount = 0;
+        final boolean segAllDeletes;
+        try {
+          if (coalescedDeletes != null) {
+            //System.out.println("    del coalesced");
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+          }
+          //System.out.println("    del exact");
+          // Don't delete by Term here; DocumentsWriter
+          // already did that on flush:
+          delCount += applyQueryDeletes(packet.queriesIterable(), reader);
+          segAllDeletes = reader.numDocs() == 0;
+        } finally {
+          readerPool.release(reader);
+        }
+        anyNewDeletes |= delCount > 0;
+
+        if (segAllDeletes) {
+          if (allDeleted == null) {
+            allDeleted = new ArrayList<SegmentInfo>();
+          }
+          allDeleted.add(info);
+        }
+
+        if (infoStream != null) {
+          message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+        }
+
+        if (coalescedDeletes == null) {
+          coalescedDeletes = new CoalescedDeletes();
+        }
+        coalescedDeletes.update(packet);
+        delIDX--;
+        infosIDX--;
+        info.setBufferedDeletesGen(nextGen);
+
+      } else {
+        //System.out.println("  gt");
+
+        if (coalescedDeletes != null) {
+          // Lock order: IW -> BD -> RP
+          assert readerPool.infoIsLive(info);
+          SegmentReader reader = readerPool.get(info, false);
+          int delCount = 0;
+          final boolean segAllDeletes;
+          try {
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+            segAllDeletes = reader.numDocs() == 0;
+          } finally {
+            readerPool.release(reader);
+          }
+          anyNewDeletes |= delCount > 0;
+
+          if (segAllDeletes) {
+            if (allDeleted == null) {
+              allDeleted = new ArrayList<SegmentInfo>();
+            }
+            allDeleted.add(info);
+          }
+
+          if (infoStream != null) {
+            message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+          }
+        }
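+        // Mark the segment as current through nextGen so these
+        // packets are not re-applied on a later applyDeletes call: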
+        info.setBufferedDeletesGen(nextGen);
+
+        infosIDX--;
+      }
+    }
+
+    assert checkDeleteStats();
+    if (infoStream != null) {
+      message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+    }
+    // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
+
+    return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
+  }
+
+  public synchronized long getNextGen() {
+    return nextGen++;
+  }
+
+  // Lock order IW -> BD
+  /* Removes any BufferedDeletes that we no longer need to
+   * store because all segments in the index have had the
+   * deletes applied. */
+  public synchronized void prune(SegmentInfos segmentInfos) {
+    assert checkDeleteStats();
+    long minGen = Long.MAX_VALUE;
+    for(SegmentInfo info : segmentInfos) {
+      minGen = Math.min(info.getBufferedDeletesGen(), minGen);
+    }
+
+    if (infoStream != null) {
+      message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
+    }
+
+    final int limit = deletes.size();
+    for(int delIDX=0;delIDX<limit;delIDX++) {
+      if (deletes.get(delIDX).gen >= minGen) {
+        prune(delIDX);
+        assert checkDeleteStats();
+        return;
+      }
+    }
+
+    // All deletes pruned
+    prune(limit);
+    assert !any();
+    assert checkDeleteStats();
+  }
+
+  private synchronized void prune(int count) {
+    if (count > 0) {
+      if (infoStream != null) {
+        message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
+      }
+      for(int delIDX=0;delIDX<count;delIDX++) {
+        final FrozenBufferedDeletes packet = deletes.get(delIDX);
+        numTerms.addAndGet(-packet.numTermDeletes);
+        assert numTerms.get() >= 0;
+        bytesUsed.addAndGet(-packet.bytesUsed);
+        assert bytesUsed.get() >= 0;
+      }
+      deletes.subList(0, count).clear();
+    }
+  }
+
+  // Delete by Term
+  private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
+    long delCount = 0;
+
+    assert checkDeleteTerm(null);
+
+    final TermDocs docs = reader.termDocs();
+
+    for (Term term : termsIter) {
+
+      // Since we visit terms sorted, we gain performance
+      // by re-using the same TermDocs and seeking only
+      // forwards
+      assert checkDeleteTerm(term);
+      docs.seek(term);
+
+      while (docs.next()) {
+        final int docID = docs.doc();
+        reader.deleteDocument(docID);
+        // TODO: we could/should change
+        // reader.deleteDocument to return boolean
+        // true if it did in fact delete, because here
+        // we could be deleting an already-deleted doc
+        // which makes this an upper bound:
+        delCount++;
+      }
+    }
+
+    return delCount;
+  }
+
+  public static class QueryAndLimit {
+    public final Query query;
+    public final int limit;
+    public QueryAndLimit(Query query, int limit) {
+      this.query = query;
+      this.limit = limit;
+    }
+  }
+
+  // Delete by query
+  private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+    long delCount = 0;
+
+    for (QueryAndLimit ent : queriesIter) {
+      Query query = ent.query;
+      int limit = ent.limit;
+      final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(reader);
+      if (docs != null) {
+        final DocIdSetIterator it = docs.iterator();
+        if (it != null) {
+          while(true) {
+            int doc = it.nextDoc();
+            if (doc >= limit)
+              break;
+
+            reader.deleteDocument(doc);
+            // TODO: we could/should change
+            // reader.deleteDocument to return boolean
+            // true if it did in fact delete, because here
+            // we could be deleting an already-deleted doc
+            // which makes this an upper bound:
+            delCount++;
+          }
+        }
+      }
+    }
+
+    return delCount;
+  }
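+  // Asserts that applyTermDeletes really visits terms in sorted
+  // order, since it relies on forward-only TermDocs seeking: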
+  // used only by assert
+  private boolean checkDeleteTerm(Term term) {
+    if (term != null) {
+      assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+    }
+    // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
+    lastDeleteTerm = term == null ? null : new Term(term.field(), term.text());
+    return true;
+  }
+
+  // only for assert
+  private boolean checkDeleteStats() {
+    int numTerms2 = 0;
+    long bytesUsed2 = 0;
+    for(FrozenBufferedDeletes packet : deletes) {
+      numTerms2 += packet.numTermDeletes;
+      bytesUsed2 += packet.bytesUsed;
+    }
+    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+    return true;
+  }
+}
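+
+/* Typical call sequence, per the class comment above: IndexWriter pushes
+ * one FrozenBufferedDeletes packet per DocumentsWriter flush, calls
+ * applyDeletes when a merge starts or when an NRT reader/commit/close
+ * needs the deletes resolved, and calls prune once every segment's
+ * bufferedDelGen has advanced past the buffered packets. */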