pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / BufferedDeletesStream.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.util.List;
23 import java.util.ArrayList;
24 import java.util.Date;
25 import java.util.Comparator;
26 import java.util.Collections;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.lucene.search.DocIdSet;
31 import org.apache.lucene.search.DocIdSetIterator;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.search.QueryWrapperFilter;
34 import org.apache.lucene.util.BytesRef;
35
36 /* Tracks the stream of {@link BuffereDeletes}.
37  * When DocumensWriter flushes, its buffered
38  * deletes are appended to this stream.  We later
39  * apply these deletes (resolve them to the actual
40  * docIDs, per segment) when a merge is started
41  * (only to the to-be-merged segments).  We
42  * also apply to all segments when NRT reader is pulled,
43  * commit/close is called, or when too many deletes are
44  * buffered and must be flushed (by RAM usage or by count).
45  *
46  * Each packet is assigned a generation, and each flushed or
47  * merged segment is also assigned a generation, so we can
48  * track which BufferedDeletes packets to apply to any given
49  * segment. */
50
51 class BufferedDeletesStream {
52
53   // TODO: maybe linked list?
54   private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
55
56   // Starts at 1 so that SegmentInfos that have never had
57   // deletes applied (whose bufferedDelGen defaults to 0)
58   // will be correct:
59   private long nextGen = 1;
60
61   // used only by assert
62   private Term lastDeleteTerm;
63   
64   private PrintStream infoStream;
65   private final AtomicLong bytesUsed = new AtomicLong();
66   private final AtomicInteger numTerms = new AtomicInteger();
67   private final int messageID;
68
69   public BufferedDeletesStream(int messageID) {
70     this.messageID = messageID;
71   }
72
73   private synchronized void message(String message) {
74     if (infoStream != null) {
75       infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
76     }
77   }
78   
79   public synchronized void setInfoStream(PrintStream infoStream) {
80     this.infoStream = infoStream;
81   }
82
83   // Appends a new packet of buffered deletes to the stream,
84   // setting its generation:
85   public synchronized void push(FrozenBufferedDeletes packet) {
86     assert packet.any();
87     assert checkDeleteStats();    
88     assert packet.gen < nextGen;
89     deletes.add(packet);
90     numTerms.addAndGet(packet.numTermDeletes);
91     bytesUsed.addAndGet(packet.bytesUsed);
92     if (infoStream != null) {
93       message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
94     }
95     assert checkDeleteStats();    
96   }
97     
98   public synchronized void clear() {
99     deletes.clear();
100     nextGen = 1;
101     numTerms.set(0);
102     bytesUsed.set(0);
103   }
104
105   public boolean any() {
106     return bytesUsed.get() != 0;
107   }
108
109   public int numTerms() {
110     return numTerms.get();
111   }
112
113   public long bytesUsed() {
114     return bytesUsed.get();
115   }
116
117   public static class ApplyDeletesResult {
118     // True if any actual deletes took place:
119     public final boolean anyDeletes;
120
121     // Current gen, for the merged segment:
122     public final long gen;
123
124     // If non-null, contains segments that are 100% deleted
125     public final List<SegmentInfo> allDeleted;
126
127     ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentInfo> allDeleted) {
128       this.anyDeletes = anyDeletes;
129       this.gen = gen;
130       this.allDeleted = allDeleted;
131     }
132   }
133
134   // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
135   private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
136     // @Override -- not until Java 1.6
137     public int compare(SegmentInfo si1, SegmentInfo si2) {
138       final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
139       if (cmp > 0) {
140         return 1;
141       } else if (cmp < 0) {
142         return -1;
143       } else {
144         return 0;
145       }
146     }
147   };
148
149   /** Resolves the buffered deleted Term/Query/docIDs, into
150    *  actual deleted docIDs in the deletedDocs BitVector for
151    *  each SegmentReader. */
152   public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, List<SegmentInfo> infos) throws IOException {
153     final long t0 = System.currentTimeMillis();
154
155     if (infos.size() == 0) {
156       return new ApplyDeletesResult(false, nextGen++, null);
157     }
158
159     assert checkDeleteStats();
160
161     if (!any()) {
162       message("applyDeletes: no deletes; skipping");
163       return new ApplyDeletesResult(false, nextGen++, null);
164     }
165
166     if (infoStream != null) {
167       message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());
168     }
169
170     List<SegmentInfo> infos2 = new ArrayList<SegmentInfo>();
171     infos2.addAll(infos);
172     Collections.sort(infos2, sortByDelGen);
173
174     CoalescedDeletes coalescedDeletes = null;
175     boolean anyNewDeletes = false;
176
177     int infosIDX = infos2.size()-1;
178     int delIDX = deletes.size()-1;
179
180     List<SegmentInfo> allDeleted = null;
181
182     while (infosIDX >= 0) {
183       //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
184
185       final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
186       final SegmentInfo info = infos2.get(infosIDX);
187       final long segGen = info.getBufferedDeletesGen();
188
189       if (packet != null && segGen < packet.gen) {
190         //System.out.println("  coalesce");
191         if (coalescedDeletes == null) {
192           coalescedDeletes = new CoalescedDeletes();
193         }
194         coalescedDeletes.update(packet);
195         delIDX--;
196       } else if (packet != null && segGen == packet.gen) {
197         //System.out.println("  eq");
198
199         // Lock order: IW -> BD -> RP
200         assert readerPool.infoIsLive(info);
201         SegmentReader reader = readerPool.get(info, false);
202         int delCount = 0;
203         final boolean segAllDeletes;
204         try {
205           if (coalescedDeletes != null) {
206             //System.out.println("    del coalesced");
207             delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
208             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
209           }
210           //System.out.println("    del exact");
211           // Don't delete by Term here; DocumentsWriter
212           // already did that on flush:
213           delCount += applyQueryDeletes(packet.queriesIterable(), reader);
214           segAllDeletes = reader.numDocs() == 0;
215         } finally {
216           readerPool.release(reader);
217         }
218         anyNewDeletes |= delCount > 0;
219
220         if (segAllDeletes) {
221           if (allDeleted == null) {
222             allDeleted = new ArrayList<SegmentInfo>();
223           }
224           allDeleted.add(info);
225         }
226
227         if (infoStream != null) {
228           message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
229         }
230
231         if (coalescedDeletes == null) {
232           coalescedDeletes = new CoalescedDeletes();
233         }
234         coalescedDeletes.update(packet);
235         delIDX--;
236         infosIDX--;
237         info.setBufferedDeletesGen(nextGen);
238
239       } else {
240         //System.out.println("  gt");
241
242         if (coalescedDeletes != null) {
243           // Lock order: IW -> BD -> RP
244           assert readerPool.infoIsLive(info);
245           SegmentReader reader = readerPool.get(info, false);
246           int delCount = 0;
247           final boolean segAllDeletes;
248           try {
249             delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
250             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
251             segAllDeletes = reader.numDocs() == 0;
252           } finally {
253             readerPool.release(reader);
254           }
255           anyNewDeletes |= delCount > 0;
256
257           if (segAllDeletes) {
258             if (allDeleted == null) {
259               allDeleted = new ArrayList<SegmentInfo>();
260             }
261             allDeleted.add(info);
262           }
263
264           if (infoStream != null) {
265             message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
266           }
267         }
268         info.setBufferedDeletesGen(nextGen);
269
270         infosIDX--;
271       }
272     }
273
274     assert checkDeleteStats();
275     if (infoStream != null) {
276       message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
277     }
278     // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
279     
280     return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
281   }
282
283   public synchronized long getNextGen() {
284     return nextGen++;
285   }
286
287   // Lock order IW -> BD
288   /* Removes any BufferedDeletes that we no longer need to
289    * store because all segments in the index have had the
290    * deletes applied. */
291   public synchronized void prune(SegmentInfos segmentInfos) {
292     assert checkDeleteStats();
293     long minGen = Long.MAX_VALUE;
294     for(SegmentInfo info : segmentInfos) {
295       minGen = Math.min(info.getBufferedDeletesGen(), minGen);
296     }
297
298     if (infoStream != null) {
299       message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
300     }
301
302     final int limit = deletes.size();
303     for(int delIDX=0;delIDX<limit;delIDX++) {
304       if (deletes.get(delIDX).gen >= minGen) {
305         prune(delIDX);
306         assert checkDeleteStats();
307         return;
308       }
309     }
310
311     // All deletes pruned
312     prune(limit);
313     assert !any();
314     assert checkDeleteStats();
315   }
316
317   private synchronized void prune(int count) {
318     if (count > 0) {
319       if (infoStream != null) {
320         message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
321       }
322       for(int delIDX=0;delIDX<count;delIDX++) {
323         final FrozenBufferedDeletes packet = deletes.get(delIDX);
324         numTerms.addAndGet(-packet.numTermDeletes);
325         assert numTerms.get() >= 0;
326         bytesUsed.addAndGet(-packet.bytesUsed);
327         assert bytesUsed.get() >= 0;
328       }
329       deletes.subList(0, count).clear();
330     }
331   }
332
333   // Delete by Term
334   private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
335     long delCount = 0;
336         
337     assert checkDeleteTerm(null);
338     
339     final TermDocs docs = reader.termDocs();
340
341     for (Term term : termsIter) {
342         
343       // Since we visit terms sorted, we gain performance
344       // by re-using the same TermsEnum and seeking only
345       // forwards
346       assert checkDeleteTerm(term);
347       docs.seek(term);
348           
349       while (docs.next()) {
350         final int docID = docs.doc();
351         reader.deleteDocument(docID);
352         // TODO: we could/should change
353         // reader.deleteDocument to return boolean
354         // true if it did in fact delete, because here
355         // we could be deleting an already-deleted doc
356         // which makes this an upper bound:
357         delCount++;
358       }
359     }
360
361     return delCount;
362   }
363
364   public static class QueryAndLimit {
365     public final Query query;
366     public final int limit;
367     public QueryAndLimit(Query query, int limit) {
368       this.query = query;       
369       this.limit = limit;
370     }
371   }
372
373   // Delete by query
374   private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
375     long delCount = 0;
376
377     for (QueryAndLimit ent : queriesIter) {
378       Query query = ent.query;
379       int limit = ent.limit;
380       final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(reader);
381       if (docs != null) {
382         final DocIdSetIterator it = docs.iterator();
383         if (it != null) {
384           while(true)  {
385             int doc = it.nextDoc();
386             if (doc >= limit)
387               break;
388
389             reader.deleteDocument(doc);
390             // TODO: we could/should change
391             // reader.deleteDocument to return boolean
392             // true if it did in fact delete, because here
393             // we could be deleting an already-deleted doc
394             // which makes this an upper bound:
395             delCount++;
396           }
397         }
398       }
399     }
400
401     return delCount;
402   }
403
404   // used only by assert
405   private boolean checkDeleteTerm(Term term) {
406     if (term != null) {
407       assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
408     }
409     // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
410     lastDeleteTerm = term == null ? null : new Term(term.field(), term.text());
411     return true;
412   }
413   
414   // only for assert
415   private boolean checkDeleteStats() {
416     int numTerms2 = 0;
417     long bytesUsed2 = 0;
418     for(FrozenBufferedDeletes packet : deletes) {
419       numTerms2 += packet.numTermDeletes;
420       bytesUsed2 += packet.bytesUsed;
421     }
422     assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
423     assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
424     return true;
425   }
426 }