1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.util.List;
23 import java.util.ArrayList;
24 import java.util.Date;
25 import java.util.Comparator;
26 import java.util.Collections;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
30 import org.apache.lucene.search.DocIdSet;
31 import org.apache.lucene.search.DocIdSetIterator;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.search.QueryWrapperFilter;
34 import org.apache.lucene.util.BytesRef;
/* Tracks the stream of {@link BufferedDeletes}.
 * When DocumentsWriter flushes, its buffered
 * deletes are appended to this stream. We later
 * apply these deletes (resolve them to the actual
 * docIDs, per segment) when a merge is started
 * (only to the to-be-merged segments). We
 * also apply to all segments when an NRT reader is pulled,
 * commit/close is called, or when too many deletes are
 * buffered and must be flushed (by RAM usage or by count).
 *
 * Each packet is assigned a generation, and each flushed or
 * merged segment is also assigned a generation, so we can
 * track which BufferedDeletes packets to apply to any given
 * segment. */
class BufferedDeletesStream {

  // TODO: maybe linked list?
  // Delete packets, held in generation order (oldest first).
  private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();

  // Starts at 1 so that SegmentInfos that have never had
  // deletes applied (whose bufferedDelGen defaults to 0)
  // always sort before any real packet generation.
  private long nextGen = 1;

  // used only by assert
  private Term lastDeleteTerm;

  // Destination for diagnostic messages; null disables logging.
  private PrintStream infoStream;
  // Total RAM accounted to all buffered packets (kept in sync by push/prune).
  private final AtomicLong bytesUsed = new AtomicLong();
  // Total count of buffered term deletes across all packets.
  private final AtomicInteger numTerms = new AtomicInteger();
  // Tag included in every log line to identify this writer instance.
  private final int messageID;
69 public BufferedDeletesStream(int messageID) {
70 this.messageID = messageID;
73 private synchronized void message(String message) {
74 if (infoStream != null) {
75 infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
79 public synchronized void setInfoStream(PrintStream infoStream) {
80 this.infoStream = infoStream;
83 // Appends a new packet of buffered deletes to the stream,
84 // setting its generation:
85 public synchronized void push(FrozenBufferedDeletes packet) {
87 assert checkDeleteStats();
88 assert packet.gen < nextGen;
90 numTerms.addAndGet(packet.numTermDeletes);
91 bytesUsed.addAndGet(packet.bytesUsed);
92 if (infoStream != null) {
93 message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
95 assert checkDeleteStats();
98 public synchronized void clear() {
105 public boolean any() {
106 return bytesUsed.get() != 0;
109 public int numTerms() {
110 return numTerms.get();
113 public long bytesUsed() {
114 return bytesUsed.get();
117 public static class ApplyDeletesResult {
118 // True if any actual deletes took place:
119 public final boolean anyDeletes;
121 // Current gen, for the merged segment:
122 public final long gen;
124 // If non-null, contains segments that are 100% deleted
125 public final List<SegmentInfo> allDeleted;
127 ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentInfo> allDeleted) {
128 this.anyDeletes = anyDeletes;
130 this.allDeleted = allDeleted;
134 // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
135 private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
136 // @Override -- not until Java 1.6
137 public int compare(SegmentInfo si1, SegmentInfo si2) {
138 final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
141 } else if (cmp < 0) {
  /** Resolves the buffered deleted Term/Query/docIDs, into
   * actual deleted docIDs in the deletedDocs BitVector for
   * each SegmentReader. */
  public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, List<SegmentInfo> infos) throws IOException {
    final long t0 = System.currentTimeMillis();

    // No segments: nothing to apply, but still advance the generation.
    if (infos.size() == 0) {
      return new ApplyDeletesResult(false, nextGen++, null);

    assert checkDeleteStats();

    // NOTE(review): an `if (!any()) { ... }` fast-path guard appears to be
    // missing from this view before the next two lines — confirm upstream.
      message("applyDeletes: no deletes; skipping");
      return new ApplyDeletesResult(false, nextGen++, null);

    if (infoStream != null) {
      message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());

    // Work on a copy sorted by bufferedDelGen (smallest first) so packets
    // and segments can be walked in step:
    List<SegmentInfo> infos2 = new ArrayList<SegmentInfo>();
    infos2.addAll(infos);
    Collections.sort(infos2, sortByDelGen);

    // Accumulates every packet newer than the segment being visited:
    CoalescedDeletes coalescedDeletes = null;
    boolean anyNewDeletes = false;

    // Walk both lists from newest to oldest:
    int infosIDX = infos2.size()-1;
    int delIDX = deletes.size()-1;

    List<SegmentInfo> allDeleted = null;

    while (infosIDX >= 0) {
      //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);

      final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
      final SegmentInfo info = infos2.get(infosIDX);
      final long segGen = info.getBufferedDeletesGen();

      if (packet != null && segGen < packet.gen) {
        // Packet is newer than this segment: fold it into the coalesced
        // set to be applied when we reach segments it covers.
        //System.out.println(" coalesce");
        if (coalescedDeletes == null) {
          coalescedDeletes = new CoalescedDeletes();
        coalescedDeletes.update(packet);
        // NOTE(review): a `delIDX--` step appears to be missing in this
        // view — without it this loop could not terminate.

      } else if (packet != null && segGen == packet.gen) {
        // Segment was flushed together with this packet: apply the
        // coalesced deletes plus this packet's own query deletes.
        //System.out.println(" eq");

        // Lock order: IW -> BD -> RP
        assert readerPool.infoIsLive(info);
        SegmentReader reader = readerPool.get(info, false);
        // NOTE(review): a `long delCount = 0;` declaration appears to be
        // missing from this view.
        final boolean segAllDeletes;
        if (coalescedDeletes != null) {
          //System.out.println(" del coalesced");
          delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
          delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
        //System.out.println(" del exact");
        // Don't delete by Term here; DocumentsWriter
        // already did that on flush:
        delCount += applyQueryDeletes(packet.queriesIterable(), reader);
        segAllDeletes = reader.numDocs() == 0;
        readerPool.release(reader);
        anyNewDeletes |= delCount > 0;

        // Record segments that became 100% deleted so the caller can
        // drop them:
        if (allDeleted == null) {
          allDeleted = new ArrayList<SegmentInfo>();
        allDeleted.add(info);

        if (infoStream != null) {
          message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));

        // This packet now joins the coalesced set for older segments:
        if (coalescedDeletes == null) {
          coalescedDeletes = new CoalescedDeletes();
        coalescedDeletes.update(packet);

        info.setBufferedDeletesGen(nextGen);

        // Segment is newer than all remaining packets: only the
        // coalesced deletes (if any) apply to it.
        //System.out.println(" gt");

        if (coalescedDeletes != null) {
          // Lock order: IW -> BD -> RP
          assert readerPool.infoIsLive(info);
          SegmentReader reader = readerPool.get(info, false);
          // NOTE(review): a `long delCount = 0;` declaration appears to
          // be missing from this view.
          final boolean segAllDeletes;
          delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
          delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
          segAllDeletes = reader.numDocs() == 0;
          readerPool.release(reader);
          anyNewDeletes |= delCount > 0;

          if (allDeleted == null) {
            allDeleted = new ArrayList<SegmentInfo>();
          allDeleted.add(info);

          if (infoStream != null) {
            message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));

        info.setBufferedDeletesGen(nextGen);

    assert checkDeleteStats();
    if (infoStream != null) {
      message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
    // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;

    return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
283 public synchronized long getNextGen() {
287 // Lock order IW -> BD
288 /* Removes any BufferedDeletes that we no longer need to
289 * store because all segments in the index have had the
290 * deletes applied. */
291 public synchronized void prune(SegmentInfos segmentInfos) {
292 assert checkDeleteStats();
293 long minGen = Long.MAX_VALUE;
294 for(SegmentInfo info : segmentInfos) {
295 minGen = Math.min(info.getBufferedDeletesGen(), minGen);
298 if (infoStream != null) {
299 message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
302 final int limit = deletes.size();
303 for(int delIDX=0;delIDX<limit;delIDX++) {
304 if (deletes.get(delIDX).gen >= minGen) {
306 assert checkDeleteStats();
311 // All deletes pruned
314 assert checkDeleteStats();
317 private synchronized void prune(int count) {
319 if (infoStream != null) {
320 message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
322 for(int delIDX=0;delIDX<count;delIDX++) {
323 final FrozenBufferedDeletes packet = deletes.get(delIDX);
324 numTerms.addAndGet(-packet.numTermDeletes);
325 assert numTerms.get() >= 0;
326 bytesUsed.addAndGet(-packet.bytesUsed);
327 assert bytesUsed.get() >= 0;
329 deletes.subList(0, count).clear();
  // Delete by Term: resolves each buffered Term to its matching docIDs
  // in this reader and deletes them.  Returns an upper bound on the
  // number of docs deleted (see TODO below).
  private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
    // NOTE(review): a `long delCount = 0;` accumulator appears to be
    // missing from this view — the method must return a count.
    assert checkDeleteTerm(null);

    final TermDocs docs = reader.termDocs();

    for (Term term : termsIter) {
      // Since we visit terms sorted, we gain performance
      // by re-using the same TermsEnum and seeking only
      // forwards
      assert checkDeleteTerm(term);
      // NOTE(review): a seek positioning `docs` on `term` appears to be
      // missing here — without it the loop below has no term context.
      while (docs.next()) {
        final int docID = docs.doc();
        reader.deleteDocument(docID);
        // TODO: we could/should change
        // reader.deleteDocument to return boolean
        // true if it did in fact delete, because here
        // we could be deleting an already-deleted doc
        // which makes this an upper bound:
364 public static class QueryAndLimit {
365 public final Query query;
366 public final int limit;
367 public QueryAndLimit(Query query, int limit) {
  // Delete by Query: for each (query, limit) pair, deletes the docs the
  // query matches in this reader.  Returns an upper bound on the number
  // of docs deleted (see TODO below).
  private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
    // NOTE(review): a `long delCount = 0;` accumulator appears to be
    // missing from this view.
    for (QueryAndLimit ent : queriesIter) {
      Query query = ent.query;
      int limit = ent.limit;
      final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(reader);
      // NOTE(review): a null check on `docs` appears to be missing here
      // (getDocIdSet may return null when nothing matches).
      final DocIdSetIterator it = docs.iterator();
      // NOTE(review): the enclosing iteration loop and the `doc < limit`
      // bound check appear to be missing from this view.
      int doc = it.nextDoc();
      reader.deleteDocument(doc);
      // TODO: we could/should change
      // reader.deleteDocument to return boolean
      // true if it did in fact delete, because here
      // we could be deleting an already-deleted doc
      // which makes this an upper bound:
404 // used only by assert
405 private boolean checkDeleteTerm(Term term) {
407 assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
409 // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
410 lastDeleteTerm = term == null ? null : new Term(term.field(), term.text());
  // used only by assert: verifies that the numTerms/bytesUsed counters
  // agree with the sums over all buffered packets.
  private boolean checkDeleteStats() {
    // NOTE(review): the accumulator declarations appear to be missing
    // from this view (expected: `int numTerms2 = 0; long bytesUsed2 = 0;`),
    // as does the trailing `return true;` — confirm against upstream.
    for(FrozenBufferedDeletes packet : deletes) {
      numTerms2 += packet.numTermDeletes;
      bytesUsed2 += packet.bytesUsed;
    }
    assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
    assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;