pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / index / DocumentsWriter.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.text.NumberFormat;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.lucene.analysis.Analyzer;
31 import org.apache.lucene.document.Document;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.search.Similarity;
34 import org.apache.lucene.store.AlreadyClosedException;
35 import org.apache.lucene.store.Directory;
36 import org.apache.lucene.store.RAMFile;
37 import org.apache.lucene.util.ArrayUtil;
38 import org.apache.lucene.util.BitVector;
39 import org.apache.lucene.util.RamUsageEstimator;
40 import org.apache.lucene.util.ThreadInterruptedException;
41
42
43 /**
44  * This class accepts multiple added documents and directly
45  * writes a single segment file.  It does this more
46  * efficiently than creating a single segment per document
47  * (with DocumentWriter) and doing standard merges on those
48  * segments.
49  *
50  * Each added document is passed to the {@link DocConsumer},
51  * which in turn processes the document and interacts with
52  * other consumers in the indexing chain.  Certain
53  * consumers, like {@link StoredFieldsWriter} and {@link
54  * TermVectorsTermsWriter}, digest a document and
55  * immediately write bytes to the "doc store" files (ie,
56  * they do not consume RAM per document, except while they
57  * are processing the document).
58  *
59  * Other consumers, eg {@link FreqProxTermsWriter} and
60  * {@link NormsWriter}, buffer bytes in RAM and flush only
61  * when a new segment is produced.
62
63  * Once we have used our allowed RAM buffer, or the number
64  * of added docs is large enough (in the case we are
65  * flushing by doc count instead of RAM usage), we create a
66  * real segment and flush it to the Directory.
67  *
68  * Threads:
69  *
70  * Multiple threads are allowed into addDocument at once.
71  * There is an initial synchronized call to getThreadState
72  * which allocates a ThreadState for this thread.  The same
73  * thread will get the same ThreadState over time (thread
74  * affinity) so that if there are consistent patterns (for
75  * example each thread is indexing a different content
76  * source) then we make better use of RAM.  Then
77  * processDocument is called on that ThreadState without
78  * synchronization (most of the "heavy lifting" is in this
79  * call).  Finally the synchronized "finishDocument" is
80  * called to flush changes to the directory.
81  *
82  * When flush is called by IndexWriter we forcefully idle
83  * all threads and flush only once they are all idle.  This
84  * means you can call flush with a given thread even while
85  * other threads are actively adding/deleting documents.
86  *
87  *
88  * Exceptions:
89  *
90  * Because this class directly updates in-memory posting
91  * lists, and flushes stored fields and term vectors
92  * directly to files in the directory, there are certain
93  * limited times when an exception can corrupt this state.
94  * For example, a disk full while flushing stored fields
95  * leaves this file in a corrupt state.  Or, an OOM
96  * exception while appending to the in-memory posting lists
97  * can corrupt that posting list.  We call such exceptions
98  * "aborting exceptions".  In these cases we must call
99  * abort() to discard all docs added since the last flush.
100  *
101  * All other exceptions ("non-aborting exceptions") can
102  * still partially update the index structures.  These
103  * updates are consistent, but, they represent only a part
104  * of the document seen up until the exception was hit.
105  * When this happens, we immediately mark the document as
106  * deleted so that the document is always atomically ("all
107  * or none") added to the index.
108  */
109
110 final class DocumentsWriter {
111   final AtomicLong bytesUsed = new AtomicLong(0);
112   IndexWriter writer;
113   Directory directory;
114
115   String segment;                         // Current segment we are working on
116
117   private int nextDocID;                  // Next docID to be added
118   private int numDocs;                    // # of docs added, but not yet flushed
119
120   // Max # ThreadState instances; if there are more threads
121   // than this they share ThreadStates
122   private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
123   private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
124
125   boolean bufferIsFull;                   // True when it's time to write segment
126   private boolean aborting;               // True if an abort is pending
127
128   PrintStream infoStream;
129   int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
130   Similarity similarity;
131
132   // max # simultaneous threads; if there are more than
133   // this, they wait for others to finish first
134   private final int maxThreadStates;
135
136   // Deletes for our still-in-RAM (to be flushed next) segment
137   private BufferedDeletes pendingDeletes = new BufferedDeletes();
138   
139   static class DocState {
140     DocumentsWriter docWriter;
141     Analyzer analyzer;
142     int maxFieldLength;
143     PrintStream infoStream;
144     Similarity similarity;
145     int docID;
146     Document doc;
147     String maxTermPrefix;
148
149     // Only called by asserts
150     public boolean testPoint(String name) {
151       return docWriter.writer.testPoint(name);
152     }
153
154     public void clear() {
155       // don't hold onto doc nor analyzer, in case it is
156       // largish:
157       doc = null;
158       analyzer = null;
159     }
160   }
161
162   /** Consumer returns this on each doc.  This holds any
163    *  state that must be flushed synchronized "in docID
164    *  order".  We gather these and flush them in order. */
165   abstract static class DocWriter {
166     DocWriter next;
167     int docID;
168     abstract void finish() throws IOException;
169     abstract void abort();
170     abstract long sizeInBytes();
171
172     void setNext(DocWriter next) {
173       this.next = next;
174     }
175   }
176
177   /**
178    * Create and return a new DocWriterBuffer.
179    */
180   PerDocBuffer newPerDocBuffer() {
181     return new PerDocBuffer();
182   }
183
184   /**
185    * RAMFile buffer for DocWriters.
186    */
187   class PerDocBuffer extends RAMFile {
188     
189     /**
190      * Allocate bytes used from shared pool.
191      */
192     @Override
193     protected byte[] newBuffer(int size) {
194       assert size == PER_DOC_BLOCK_SIZE;
195       return perDocAllocator.getByteBlock();
196     }
197     
198     /**
199      * Recycle the bytes used.
200      */
201     synchronized void recycle() {
202       if (buffers.size() > 0) {
203         setLength(0);
204         
205         // Recycle the blocks
206         perDocAllocator.recycleByteBlocks(buffers);
207         buffers.clear();
208         sizeInBytes = 0;
209         
210         assert numBuffers() == 0;
211       }
212     }
213   }
214   
215   /**
216    * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
217    * which returns the DocConsumer that the DocumentsWriter calls to process the
218    * documents. 
219    */
220   abstract static class IndexingChain {
221     abstract DocConsumer getChain(DocumentsWriter documentsWriter);
222   }
223   
224   static final IndexingChain defaultIndexingChain = new IndexingChain() {
225
226     @Override
227     DocConsumer getChain(DocumentsWriter documentsWriter) {
228       /*
229       This is the current indexing chain:
230
231       DocConsumer / DocConsumerPerThread
232         --> code: DocFieldProcessor / DocFieldProcessorPerThread
233           --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
234             --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
235               --> code: DocInverter / DocInverterPerThread / DocInverterPerField
236                 --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
237                   --> code: TermsHash / TermsHashPerThread / TermsHashPerField
238                     --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
239                       --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
240                       --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
241                 --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
242                   --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
243               --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
244     */
245
246     // Build up indexing chain:
247
248       final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
249       final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
250
251       final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
252                                                            new TermsHash(documentsWriter, false, termVectorsWriter, null));
253       final NormsWriter normsWriter = new NormsWriter();
254       final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
255       return new DocFieldProcessor(documentsWriter, docInverter);
256     }
257   };
258
259   final DocConsumer consumer;
260
261   // How much RAM we can use before flushing.  This is 0 if
262   // we are flushing by doc count instead.
263
264   private final IndexWriterConfig config;
265
266   private boolean closed;
267   private final FieldInfos fieldInfos;
268
269   private final BufferedDeletesStream bufferedDeletesStream;
270   private final IndexWriter.FlushControl flushControl;
271
272   DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
273     this.directory = directory;
274     this.writer = writer;
275     this.similarity = config.getSimilarity();
276     this.maxThreadStates = config.getMaxThreadStates();
277     this.fieldInfos = fieldInfos;
278     this.bufferedDeletesStream = bufferedDeletesStream;
279     flushControl = writer.flushControl;
280
281     consumer = config.getIndexingChain().getChain(this);
282     this.config = config;
283   }
284
285   // Buffer a specific docID for deletion.  Currently only
286   // used when we hit a exception when adding a document
287   synchronized void deleteDocID(int docIDUpto) {
288     pendingDeletes.addDocID(docIDUpto);
289     // NOTE: we do not trigger flush here.  This is
290     // potentially a RAM leak, if you have an app that tries
291     // to add docs but every single doc always hits a
292     // non-aborting exception.  Allowing a flush here gets
293     // very messy because we are only invoked when handling
294     // exceptions so to do this properly, while handling an
295     // exception we'd have to go off and flush new deletes
296     // which is risky (likely would hit some other
297     // confounding exception).
298   }
299   
300   boolean deleteQueries(Query... queries) {
301     final boolean doFlush = flushControl.waitUpdate(0, queries.length);
302     synchronized(this) {
303       for (Query query : queries) {
304         pendingDeletes.addQuery(query, numDocs);
305       }
306     }
307     return doFlush;
308   }
309   
310   boolean deleteQuery(Query query) { 
311     final boolean doFlush = flushControl.waitUpdate(0, 1);
312     synchronized(this) {
313       pendingDeletes.addQuery(query, numDocs);
314     }
315     return doFlush;
316   }
317   
318   boolean deleteTerms(Term... terms) {
319     final boolean doFlush = flushControl.waitUpdate(0, terms.length);
320     synchronized(this) {
321       for (Term term : terms) {
322         pendingDeletes.addTerm(term, numDocs);
323       }
324     }
325     return doFlush;
326   }
327
328   // TODO: we could check w/ FreqProxTermsWriter: if the
329   // term doesn't exist, don't bother buffering into the
330   // per-DWPT map (but still must go into the global map)
331   boolean deleteTerm(Term term, boolean skipWait) {
332     final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
333     synchronized(this) {
334       pendingDeletes.addTerm(term, numDocs);
335     }
336     return doFlush;
337   }
338
339   public FieldInfos getFieldInfos() {
340     return fieldInfos;
341   }
342
343   /** If non-null, various details of indexing are printed
344    *  here. */
345   synchronized void setInfoStream(PrintStream infoStream) {
346     this.infoStream = infoStream;
347     for(int i=0;i<threadStates.length;i++) {
348       threadStates[i].docState.infoStream = infoStream;
349     }
350   }
351
352   synchronized void setMaxFieldLength(int maxFieldLength) {
353     this.maxFieldLength = maxFieldLength;
354     for(int i=0;i<threadStates.length;i++) {
355       threadStates[i].docState.maxFieldLength = maxFieldLength;
356     }
357   }
358
359   synchronized void setSimilarity(Similarity similarity) {
360     this.similarity = similarity;
361     for(int i=0;i<threadStates.length;i++) {
362       threadStates[i].docState.similarity = similarity;
363     }
364   }
365
366   /** Get current segment name we are writing. */
367   synchronized String getSegment() {
368     return segment;
369   }
370
371   /** Returns how many docs are currently buffered in RAM. */
372   synchronized int getNumDocs() {
373     return numDocs;
374   }
375
376   void message(String message) {
377     if (infoStream != null) {
378       writer.message("DW: " + message);
379     }
380   }
381
382   synchronized void setAborting() {
383     if (infoStream != null) {
384       message("setAborting");
385     }
386     aborting = true;
387   }
388
389   /** Called if we hit an exception at a bad time (when
390    *  updating the index files) and must discard all
391    *  currently buffered docs.  This resets our state,
392    *  discarding any docs added since last flush. */
393   synchronized void abort() throws IOException {
394     if (infoStream != null) {
395       message("docWriter: abort");
396     }
397
398     boolean success = false;
399
400     try {
401
402       // Forcefully remove waiting ThreadStates from line
403       try {
404         waitQueue.abort();
405       } catch (Throwable t) {
406       }
407
408       // Wait for all other threads to finish with
409       // DocumentsWriter:
410       try {
411         waitIdle();
412       } finally {
413         if (infoStream != null) {
414           message("docWriter: abort waitIdle done");
415         }
416         
417         assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
418         waitQueue.waitingBytes = 0;
419         
420         pendingDeletes.clear();
421         
422         for (DocumentsWriterThreadState threadState : threadStates) {
423           try {
424             threadState.consumer.abort();
425           } catch (Throwable t) {
426           }
427         }
428           
429         try {
430           consumer.abort();
431         } catch (Throwable t) {
432         }
433         
434         // Reset all postings data
435         doAfterFlush();
436       }
437
438       success = true;
439     } finally {
440       aborting = false;
441       notifyAll();
442       if (infoStream != null) {
443         message("docWriter: done abort; success=" + success);
444       }
445     }
446   }
447
448   /** Reset after a flush */
449   private void doAfterFlush() throws IOException {
450     // All ThreadStates should be idle when we are called
451     assert allThreadsIdle();
452     threadBindings.clear();
453     waitQueue.reset();
454     segment = null;
455     numDocs = 0;
456     nextDocID = 0;
457     bufferIsFull = false;
458     for(int i=0;i<threadStates.length;i++) {
459       threadStates[i].doAfterFlush();
460     }
461   }
462
463   private synchronized boolean allThreadsIdle() {
464     for(int i=0;i<threadStates.length;i++) {
465       if (!threadStates[i].isIdle) {
466         return false;
467       }
468     }
469     return true;
470   }
471
472   synchronized boolean anyChanges() {
473     return numDocs != 0 || pendingDeletes.any();
474   }
475
476   // for testing
477   public BufferedDeletes getPendingDeletes() {
478     return pendingDeletes;
479   }
480
481   private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
482     // Lock order: DW -> BD
483     final long delGen = bufferedDeletesStream.getNextGen();
484     if (pendingDeletes.any()) {
485       if (segmentInfos.size() > 0 || newSegment != null) {
486         final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
487         if (infoStream != null) {
488           message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
489         }
490         bufferedDeletesStream.push(packet);
491         if (infoStream != null) {
492           message("flush: delGen=" + packet.gen);
493         }
494         if (newSegment != null) {
495           newSegment.setBufferedDeletesGen(packet.gen);
496         }
497       } else {
498         if (infoStream != null) {
499           message("flush: drop buffered deletes: no segments");
500         }
501         // We can safely discard these deletes: since
502         // there are no segments, the deletions cannot
503         // affect anything.
504       }
505       pendingDeletes.clear();
506     } else if (newSegment != null) {
507       newSegment.setBufferedDeletesGen(delGen);
508     }
509   }
510
511   public boolean anyDeletions() {
512     return pendingDeletes.any();
513   }
514
515   /** Flush all pending docs to a new segment */
516   // Lock order: IW -> DW
517   synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
518
519     final long startTime = System.currentTimeMillis();
520
521     // We change writer's segmentInfos:
522     assert Thread.holdsLock(writer);
523
524     waitIdle();
525
526     if (numDocs == 0) {
527       // nothing to do!
528       if (infoStream != null) {
529         message("flush: no docs; skipping");
530       }
531       // Lock order: IW -> DW -> BD
532       pushDeletes(null, segmentInfos);
533       return null;
534     }
535
536     if (aborting) {
537       if (infoStream != null) {
538         message("flush: skip because aborting is set");
539       }
540       return null;
541     }
542
543     boolean success = false;
544
545     SegmentInfo newSegment;
546
547     try {
548       //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
549       assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
550       assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
551       assert waitQueue.waitingBytes == 0;
552
553       if (infoStream != null) {
554         message("flush postings as segment " + segment + " numDocs=" + numDocs);
555       }
556
557       final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
558                                                                  numDocs, writer.getConfig().getTermIndexInterval(),
559                                                                  pendingDeletes);
560       // Apply delete-by-docID now (delete-byDocID only
561       // happens when an exception is hit processing that
562       // doc, eg if analyzer has some problem w/ the text):
563       if (pendingDeletes.docIDs.size() > 0) {
564         flushState.deletedDocs = new BitVector(numDocs);
565         for(int delDocID : pendingDeletes.docIDs) {
566           flushState.deletedDocs.set(delDocID);
567         }
568         pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
569         pendingDeletes.docIDs.clear();
570       }
571
572       newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
573
574       Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
575       for (DocumentsWriterThreadState threadState : threadStates) {
576         threads.add(threadState.consumer);
577       }
578
579       double startMBUsed = bytesUsed()/1024./1024.;
580
581       consumer.flush(threads, flushState);
582
583       newSegment.setHasVectors(flushState.hasVectors);
584
585       if (infoStream != null) {
586         message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
587         if (flushState.deletedDocs != null) {
588           message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
589         }
590         message("flushedFiles=" + newSegment.files());
591       }
592
593       if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
594         final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
595
596         if (infoStream != null) {
597           message("flush: create compound file \"" + cfsFileName + "\"");
598         }
599
600         CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
601         for(String fileName : newSegment.files()) {
602           cfsWriter.addFile(fileName);
603         }
604         cfsWriter.close();
605         deleter.deleteNewFiles(newSegment.files());
606         newSegment.setUseCompoundFile(true);
607       }
608
609       // Must write deleted docs after the CFS so we don't
610       // slurp the del file into CFS:
611       if (flushState.deletedDocs != null) {
612         final int delCount = flushState.deletedDocs.count();
613         assert delCount > 0;
614         newSegment.setDelCount(delCount);
615         newSegment.advanceDelGen();
616         final String delFileName = newSegment.getDelFileName();
617         if (infoStream != null) {
618           message("flush: write " + delCount + " deletes to " + delFileName);
619         }
620         boolean success2 = false;
621         try {
622           // TODO: in the NRT case it'd be better to hand
623           // this del vector over to the
624           // shortly-to-be-opened SegmentReader and let it
625           // carry the changes; there's no reason to use
626           // filesystem as intermediary here.
627           flushState.deletedDocs.write(directory, delFileName);
628           success2 = true;
629         } finally {
630           if (!success2) {
631             try {
632               directory.deleteFile(delFileName);
633             } catch (Throwable t) {
634               // suppress this so we keep throwing the
635               // original exception
636             }
637           }
638         }
639       }
640
641       if (infoStream != null) {
642         message("flush: segment=" + newSegment);
643         final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
644         final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
645         message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
646                 " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
647                 " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
648                 " docs/MB=" + nf.format(numDocs / newSegmentSize) +
649                 " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
650       }
651
652       success = true;
653     } finally {
654       notifyAll();
655       if (!success) {
656         if (segment != null) {
657           deleter.refresh(segment);
658         }
659         abort();
660       }
661     }
662
663     doAfterFlush();
664
665     // Lock order: IW -> DW -> BD
666     pushDeletes(newSegment, segmentInfos);
667     if (infoStream != null) {
668       message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
669     }
670
671     return newSegment;
672   }
673
674   synchronized void close() {
675     closed = true;
676     notifyAll();
677   }
678
679   /** Returns a free (idle) ThreadState that may be used for
680    * indexing this one document.  This call also pauses if a
681    * flush is pending.  If delTerm is non-null then we
682    * buffer this deleted term after the thread state has
683    * been acquired. */
684   synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
685
686     final Thread currentThread = Thread.currentThread();
687     assert !Thread.holdsLock(writer);
688
689     // First, find a thread state.  If this thread already
690     // has affinity to a specific ThreadState, use that one
691     // again.
692     DocumentsWriterThreadState state = threadBindings.get(currentThread);
693     if (state == null) {
694
695       // First time this thread has called us since last
696       // flush.  Find the least loaded thread state:
697       DocumentsWriterThreadState minThreadState = null;
698       for(int i=0;i<threadStates.length;i++) {
699         DocumentsWriterThreadState ts = threadStates[i];
700         if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
701           minThreadState = ts;
702         }
703       }
704       if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
705         state = minThreadState;
706         state.numThreads++;
707       } else {
708         // Just create a new "private" thread state
709         DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
710         if (threadStates.length > 0) {
711           System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
712         }
713         state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
714         threadStates = newArray;
715       }
716       threadBindings.put(currentThread, state);
717     }
718
719     // Next, wait until my thread state is idle (in case
720     // it's shared with other threads), and no flush/abort
721     // pending 
722     waitReady(state);
723
724     // Allocate segment name if this is the first doc since
725     // last flush:
726     if (segment == null) {
727       segment = writer.newSegmentName();
728       assert numDocs == 0;
729     }
730
731     state.docState.docID = nextDocID;
732     nextDocID += docCount;
733
734     if (delTerm != null) {
735       pendingDeletes.addTerm(delTerm, state.docState.docID);
736     }
737
738     numDocs += docCount;
739     state.isIdle = false;
740     return state;
741   }
742   
743   boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
744     return updateDocument(doc, analyzer, null);
745   }
746   
747   boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
748     throws CorruptIndexException, IOException {
749
750     // Possibly trigger a flush, or wait until any running flush completes:
751     boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
752
753     // This call is synchronized but fast
754     final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
755
756     final DocState docState = state.docState;
757     docState.doc = doc;
758     docState.analyzer = analyzer;
759
760     boolean success = false;
761     try {
762       // This call is not synchronized and does all the
763       // work
764       final DocWriter perDoc;
765       try {
766         perDoc = state.consumer.processDocument();
767       } finally {
768         docState.clear();
769       }
770
771       // This call is synchronized but fast
772       finishDocument(state, perDoc);
773
774       success = true;
775     } finally {
776       if (!success) {
777
778         // If this thread state had decided to flush, we
779         // must clear it so another thread can flush
780         if (doFlush) {
781           flushControl.clearFlushPending();
782         }
783
784         if (infoStream != null) {
785           message("exception in updateDocument aborting=" + aborting);
786         }
787
788         synchronized(this) {
789
790           state.isIdle = true;
791           notifyAll();
792             
793           if (aborting) {
794             abort();
795           } else {
796             skipDocWriter.docID = docState.docID;
797             boolean success2 = false;
798             try {
799               waitQueue.add(skipDocWriter);
800               success2 = true;
801             } finally {
802               if (!success2) {
803                 abort();
804                 return false;
805               }
806             }
807
808             // Immediately mark this document as deleted
809             // since likely it was partially added.  This
810             // keeps indexing as "all or none" (atomic) when
811             // adding a document:
812             deleteDocID(state.docState.docID);
813           }
814         }
815       }
816     }
817
818     doFlush |= flushControl.flushByRAMUsage("new document");
819
820     return doFlush;
821   }
822
823   boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
824     throws CorruptIndexException, IOException {
825
826     // Possibly trigger a flush, or wait until any running flush completes:
827     boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
828
829     final int docCount = docs.size();
830
831     // This call is synchronized but fast -- we allocate the
832     // N docIDs up front:
833     final DocumentsWriterThreadState state = getThreadState(null, docCount);
834     final DocState docState = state.docState;
835
836     final int startDocID = docState.docID;
837     int docID = startDocID;
838
839     //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
840     for(Document doc : docs) {
841       docState.doc = doc;
842       docState.analyzer = analyzer;
843       // Assign next docID from our block:
844       docState.docID = docID++;
845       
846       boolean success = false;
847       try {
848         // This call is not synchronized and does all the
849         // work
850         final DocWriter perDoc;
851         try {
852           perDoc = state.consumer.processDocument();
853         } finally {
854           docState.clear();
855         }
856
857         // Must call this w/o holding synchronized(this) else
858         // we'll hit deadlock:
859         balanceRAM();
860
861         // Synchronized but fast
862         synchronized(this) {
863           if (aborting) {
864             break;
865           }
866           assert perDoc == null || perDoc.docID == docState.docID;
867           final boolean doPause;
868           if (perDoc != null) {
869             waitQueue.add(perDoc);
870           } else {
871             skipDocWriter.docID = docState.docID;
872             waitQueue.add(skipDocWriter);
873           }
874         }
875
876         success = true;
877       } finally {
878         if (!success) {
879           //System.out.println(Thread.currentThread().getName() + ": E");
880
881           // If this thread state had decided to flush, we
882           // must clear it so another thread can flush
883           if (doFlush) {
884             message("clearFlushPending!");
885             flushControl.clearFlushPending();
886           }
887
888           if (infoStream != null) {
889             message("exception in updateDocuments aborting=" + aborting);
890           }
891
892           synchronized(this) {
893
894             state.isIdle = true;
895             notifyAll();
896
897             if (aborting) {
898               abort();
899             } else {
900
901               // Fill hole in the doc stores for all
902               // docIDs we pre-allocated
903               //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
904               final int endDocID = startDocID + docCount;
905               docID = docState.docID;
906               while(docID < endDocID) {
907                 skipDocWriter.docID = docID++;
908                 boolean success2 = false;
909                 try {
910                   waitQueue.add(skipDocWriter);
911                   success2 = true;
912                 } finally {
913                   if (!success2) {
914                     abort();
915                     return false;
916                   }
917                 }
918               }
919               //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");
920
921               // Mark all pre-allocated docIDs as deleted:
922               docID = startDocID;
923               while(docID < startDocID + docs.size()) {
924                 deleteDocID(docID++);
925               }
926             }
927           }
928         }
929       }
930     }
931
932     synchronized(this) {
933       // We must delay pausing until the full doc block is
934       // added, else we can hit deadlock if more than one
935       // thread is adding a block and we need to pause when
936       // both are only part way done:
937       if (waitQueue.doPause()) {
938         waitForWaitQueue();
939       }
940
941       //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);
942
943       if (aborting) {
944
945         // We are currently aborting, and another thread is
946         // waiting for me to become idle.  We just forcefully
947         // idle this threadState; it will be fully reset by
948         // abort()
949         state.isIdle = true;
950
951         // wakes up any threads waiting on the wait queue
952         notifyAll();
953
954         abort();
955
956         if (doFlush) {
957           message("clearFlushPending!");
958           flushControl.clearFlushPending();
959         }
960
961         return false;
962       }
963
964       // Apply delTerm only after all indexing has
965       // succeeded, but apply it only to docs prior to when
966       // this batch started:
967       if (delTerm != null) {
968         pendingDeletes.addTerm(delTerm, startDocID);
969       }
970
971       state.isIdle = true;
972
973       // wakes up any threads waiting on the wait queue
974       notifyAll();
975     }
976
977     doFlush |= flushControl.flushByRAMUsage("new document");
978
979     //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
980     return doFlush;
981   }
982
983   public synchronized void waitIdle() {
984     while (!allThreadsIdle()) {
985       try {
986         wait();
987       } catch (InterruptedException ie) {
988         throw new ThreadInterruptedException(ie);
989       }
990     }
991   }
992
993   synchronized void waitReady(DocumentsWriterThreadState state) {
994     while (!closed && (!state.isIdle || aborting)) {
995       try {
996         wait();
997       } catch (InterruptedException ie) {
998         throw new ThreadInterruptedException(ie);
999       }
1000     }
1001
1002     if (closed) {
1003       throw new AlreadyClosedException("this IndexWriter is closed");
1004     }
1005   }
1006
1007   /** Does the synchronized work to finish/flush the
1008    *  inverted document. */
1009   private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
1010
1011     // Must call this w/o holding synchronized(this) else
1012     // we'll hit deadlock:
1013     balanceRAM();
1014
1015     synchronized(this) {
1016
1017       assert docWriter == null || docWriter.docID == perThread.docState.docID;
1018
1019       if (aborting) {
1020
1021         // We are currently aborting, and another thread is
1022         // waiting for me to become idle.  We just forcefully
1023         // idle this threadState; it will be fully reset by
1024         // abort()
1025         if (docWriter != null) {
1026           try {
1027             docWriter.abort();
1028           } catch (Throwable t) {
1029           }
1030         }
1031
1032         perThread.isIdle = true;
1033
1034         // wakes up any threads waiting on the wait queue
1035         notifyAll();
1036
1037         return;
1038       }
1039
1040       final boolean doPause;
1041
1042       if (docWriter != null) {
1043         doPause = waitQueue.add(docWriter);
1044       } else {
1045         skipDocWriter.docID = perThread.docState.docID;
1046         doPause = waitQueue.add(skipDocWriter);
1047       }
1048
1049       if (doPause) {
1050         waitForWaitQueue();
1051       }
1052
1053       perThread.isIdle = true;
1054
1055       // wakes up any threads waiting on the wait queue
1056       notifyAll();
1057     }
1058   }
1059
1060   synchronized void waitForWaitQueue() {
1061     do {
1062       try {
1063         wait();
1064       } catch (InterruptedException ie) {
1065         throw new ThreadInterruptedException(ie);
1066       }
1067     } while (!waitQueue.doResume());
1068   }
1069
1070   private static class SkipDocWriter extends DocWriter {
1071     @Override
1072     void finish() {
1073     }
1074     @Override
1075     void abort() {
1076     }
1077     @Override
1078     long sizeInBytes() {
1079       return 0;
1080     }
1081   }
1082   final SkipDocWriter skipDocWriter = new SkipDocWriter();
1083
1084   NumberFormat nf = NumberFormat.getInstance();
1085
1086   /* Initial chunks size of the shared byte[] blocks used to
1087      store postings data */
1088   final static int BYTE_BLOCK_SHIFT = 15;
1089   final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
1090   final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
1091   final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
1092
1093   private class ByteBlockAllocator extends ByteBlockPool.Allocator {
1094     final int blockSize;
1095
1096     ByteBlockAllocator(int blockSize) {
1097       this.blockSize = blockSize;
1098     }
1099
1100     ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
1101     
1102     /* Allocate another byte[] from the shared pool */
1103     @Override
1104     byte[] getByteBlock() {
1105       synchronized(DocumentsWriter.this) {
1106         final int size = freeByteBlocks.size();
1107         final byte[] b;
1108         if (0 == size) {
1109           b = new byte[blockSize];
1110           bytesUsed.addAndGet(blockSize);
1111         } else
1112           b = freeByteBlocks.remove(size-1);
1113         return b;
1114       }
1115     }
1116
1117     /* Return byte[]'s to the pool */
1118
1119     @Override
1120     void recycleByteBlocks(byte[][] blocks, int start, int end) {
1121       synchronized(DocumentsWriter.this) {
1122         for(int i=start;i<end;i++) {
1123           freeByteBlocks.add(blocks[i]);
1124           blocks[i] = null;
1125         }
1126       }
1127     }
1128
1129     @Override
1130     void recycleByteBlocks(List<byte[]> blocks) {
1131       synchronized(DocumentsWriter.this) {
1132         final int size = blocks.size();
1133         for(int i=0;i<size;i++) {
1134           freeByteBlocks.add(blocks.get(i));
1135           blocks.set(i, null);
1136         }
1137       }
1138     }
1139   }
1140
1141   /* Initial chunks size of the shared int[] blocks used to
1142      store postings data */
1143   final static int INT_BLOCK_SHIFT = 13;
1144   final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
1145   final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
1146
1147   private List<int[]> freeIntBlocks = new ArrayList<int[]>();
1148
1149   /* Allocate another int[] from the shared pool */
1150   synchronized int[] getIntBlock() {
1151     final int size = freeIntBlocks.size();
1152     final int[] b;
1153     if (0 == size) {
1154       b = new int[INT_BLOCK_SIZE];
1155       bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
1156     } else {
1157       b = freeIntBlocks.remove(size-1);
1158     }
1159     return b;
1160   }
1161
1162   synchronized void bytesUsed(long numBytes) {
1163     bytesUsed.addAndGet(numBytes);
1164   }
1165
1166   long bytesUsed() {
1167     return bytesUsed.get() + pendingDeletes.bytesUsed.get();
1168   }
1169
1170   /* Return int[]s to the pool */
1171   synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
1172     for(int i=start;i<end;i++) {
1173       freeIntBlocks.add(blocks[i]);
1174       blocks[i] = null;
1175     }
1176   }
1177
1178   ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
1179
1180   final static int PER_DOC_BLOCK_SIZE = 1024;
1181
1182   final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
1183
1184
1185   /* Initial chunk size of the shared char[] blocks used to
1186      store term text */
1187   final static int CHAR_BLOCK_SHIFT = 14;
1188   final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
1189   final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
1190
1191   final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
1192
1193   private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
1194
1195   /* Allocate another char[] from the shared pool */
1196   synchronized char[] getCharBlock() {
1197     final int size = freeCharBlocks.size();
1198     final char[] c;
1199     if (0 == size) {
1200       bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1201       c = new char[CHAR_BLOCK_SIZE];
1202     } else
1203       c = freeCharBlocks.remove(size-1);
1204     // We always track allocations of char blocks, for now,
1205     // because nothing that skips allocation tracking
1206     // (currently only term vectors) uses its own char
1207     // blocks.
1208     return c;
1209   }
1210
1211   /* Return char[]s to the pool */
1212   synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
1213     for(int i=0;i<numBlocks;i++) {
1214       freeCharBlocks.add(blocks[i]);
1215       blocks[i] = null;
1216     }
1217   }
1218
1219   String toMB(long v) {
1220     return nf.format(v/1024./1024.);
1221   }
1222
1223   /* We have four pools of RAM: Postings, byte blocks
1224    * (holds freq/prox posting data), char blocks (holds
1225    * characters in the term) and per-doc buffers (stored fields/term vectors).  
1226    * Different docs require varying amount of storage from 
1227    * these four classes.
1228    * 
1229    * For example, docs with many unique single-occurrence
1230    * short terms will use up the Postings RAM and hardly any
1231    * of the other two.  Whereas docs with very large terms
1232    * will use alot of char blocks RAM and relatively less of
1233    * the other two.  This method just frees allocations from
1234    * the pools once we are over-budget, which balances the
1235    * pools to match the current docs. */
1236   void balanceRAM() {
1237
1238     final boolean doBalance;
1239     final long deletesRAMUsed;
1240
1241     deletesRAMUsed = bufferedDeletesStream.bytesUsed();
1242
1243     final long ramBufferSize;
1244     final double mb = config.getRAMBufferSizeMB();
1245     if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1246       ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
1247     } else {
1248       ramBufferSize = (long) (mb*1024*1024);
1249     }
1250
1251     synchronized(this) {
1252       if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
1253         return;
1254       }
1255     
1256       doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
1257     }
1258
1259     if (doBalance) {
1260
1261       if (infoStream != null) {
1262         message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
1263                 " vs trigger=" + toMB(ramBufferSize) +
1264                 " deletesMB=" + toMB(deletesRAMUsed) +
1265                 " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
1266                 " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
1267                 " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
1268       }
1269
1270       final long startBytesUsed = bytesUsed() + deletesRAMUsed;
1271
1272       int iter = 0;
1273
1274       // We free equally from each pool in 32 KB
1275       // chunks until we are below our threshold
1276       // (freeLevel)
1277
1278       boolean any = true;
1279
1280       final long freeLevel = (long) (0.95 * ramBufferSize);
1281
1282       while(bytesUsed()+deletesRAMUsed > freeLevel) {
1283       
1284         synchronized(this) {
1285           if (0 == perDocAllocator.freeByteBlocks.size() 
1286               && 0 == byteBlockAllocator.freeByteBlocks.size() 
1287               && 0 == freeCharBlocks.size() 
1288               && 0 == freeIntBlocks.size() 
1289               && !any) {
1290             // Nothing else to free -- must flush now.
1291             bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
1292             if (infoStream != null) {
1293               if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
1294                 message("    nothing to free; set bufferIsFull");
1295               } else {
1296                 message("    nothing to free");
1297               }
1298             }
1299             break;
1300           }
1301
1302           if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
1303             byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
1304             bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
1305           }
1306
1307           if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
1308             freeCharBlocks.remove(freeCharBlocks.size()-1);
1309             bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1310           }
1311
1312           if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
1313             freeIntBlocks.remove(freeIntBlocks.size()-1);
1314             bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
1315           }
1316
1317           if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
1318             // Remove upwards of 32 blocks (each block is 1K)
1319             for (int i = 0; i < 32; ++i) {
1320               perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
1321               bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
1322               if (perDocAllocator.freeByteBlocks.size() == 0) {
1323                 break;
1324               }
1325             }
1326           }
1327         }
1328
1329         if ((4 == iter % 5) && any) {
1330           // Ask consumer to free any recycled state
1331           any = consumer.freeRAM();
1332         }
1333
1334         iter++;
1335       }
1336
1337       if (infoStream != null) {
1338         message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
1339       }
1340     }
1341   }
1342
1343   final WaitQueue waitQueue = new WaitQueue();
1344
1345   private class WaitQueue {
1346     DocWriter[] waiting;
1347     int nextWriteDocID;
1348     int nextWriteLoc;
1349     int numWaiting;
1350     long waitingBytes;
1351
1352     public WaitQueue() {
1353       waiting = new DocWriter[10];
1354     }
1355
1356     synchronized void reset() {
1357       // NOTE: nextWriteLoc doesn't need to be reset
1358       assert numWaiting == 0;
1359       assert waitingBytes == 0;
1360       nextWriteDocID = 0;
1361     }
1362
1363     synchronized boolean doResume() {
1364       final double mb = config.getRAMBufferSizeMB();
1365       final long waitQueueResumeBytes;
1366       if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1367         waitQueueResumeBytes = 2*1024*1024;
1368       } else {
1369         waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
1370       }
1371       return waitingBytes <= waitQueueResumeBytes;
1372     }
1373
1374     synchronized boolean doPause() {
1375       final double mb = config.getRAMBufferSizeMB();
1376       final long waitQueuePauseBytes;
1377       if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1378         waitQueuePauseBytes = 4*1024*1024;
1379       } else {
1380         waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
1381       }
1382       return waitingBytes > waitQueuePauseBytes;
1383     }
1384
1385     synchronized void abort() {
1386       int count = 0;
1387       for(int i=0;i<waiting.length;i++) {
1388         final DocWriter doc = waiting[i];
1389         if (doc != null) {
1390           doc.abort();
1391           waiting[i] = null;
1392           count++;
1393         }
1394       }
1395       waitingBytes = 0;
1396       assert count == numWaiting;
1397       numWaiting = 0;
1398     }
1399
1400     private void writeDocument(DocWriter doc) throws IOException {
1401       assert doc == skipDocWriter || nextWriteDocID == doc.docID;
1402       boolean success = false;
1403       try {
1404         doc.finish();
1405         nextWriteDocID++;
1406         nextWriteLoc++;
1407         assert nextWriteLoc <= waiting.length;
1408         if (nextWriteLoc == waiting.length) {
1409           nextWriteLoc = 0;
1410         }
1411         success = true;
1412       } finally {
1413         if (!success) {
1414           setAborting();
1415         }
1416       }
1417     }
1418
1419     synchronized public boolean add(DocWriter doc) throws IOException {
1420
1421       assert doc.docID >= nextWriteDocID;
1422
1423       if (doc.docID == nextWriteDocID) {
1424         writeDocument(doc);
1425         while(true) {
1426           doc = waiting[nextWriteLoc];
1427           if (doc != null) {
1428             numWaiting--;
1429             waiting[nextWriteLoc] = null;
1430             waitingBytes -= doc.sizeInBytes();
1431             writeDocument(doc);
1432           } else {
1433             break;
1434           }
1435         }
1436       } else {
1437
1438         // I finished before documents that were added
1439         // before me.  This can easily happen when I am a
1440         // small doc and the docs before me were large, or,
1441         // just due to luck in the thread scheduling.  Just
1442         // add myself to the queue and when that large doc
1443         // finishes, it will flush me:
1444         int gap = doc.docID - nextWriteDocID;
1445         if (gap >= waiting.length) {
1446           // Grow queue
1447           DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
1448           assert nextWriteLoc >= 0;
1449           System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
1450           System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
1451           nextWriteLoc = 0;
1452           waiting = newArray;
1453           gap = doc.docID - nextWriteDocID;
1454         }
1455
1456         int loc = nextWriteLoc + gap;
1457         if (loc >= waiting.length) {
1458           loc -= waiting.length;
1459         }
1460
1461         // We should only wrap one time
1462         assert loc < waiting.length;
1463
1464         // Nobody should be in my spot!
1465         assert waiting[loc] == null;
1466         waiting[loc] = doc;
1467         numWaiting++;
1468         waitingBytes += doc.sizeInBytes();
1469       }
1470       
1471       return doPause();
1472     }
1473   }
1474 }