diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
new file mode 100644
index 0000000..cad636b
--- /dev/null
+++ b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -0,0 +1,1474 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+
+/**
+ * This class accepts multiple added documents and directly
+ * writes a single segment file.  It does this more
+ * efficiently than creating a single segment per document
+ * (with DocumentWriter) and doing standard merges on those
+ * segments.
+ *
+ * Each added document is passed to the {@link DocConsumer},
+ * which in turn processes the document and interacts with
+ * other consumers in the indexing chain.  Certain
+ * consumers, like {@link StoredFieldsWriter} and {@link
+ * TermVectorsTermsWriter}, digest a document and
+ * immediately write bytes to the "doc store" files (ie,
+ * they do not consume RAM per document, except while they
+ * are processing the document).
+ *
+ * Other consumers, eg {@link FreqProxTermsWriter} and
+ * {@link NormsWriter}, buffer bytes in RAM and flush only
+ * when a new segment is produced.
+ *
+ * Once we have used our allowed RAM buffer, or the number
+ * of added docs is large enough (when we are flushing by
+ * doc count instead of RAM usage), we create a real
+ * segment and flush it to the Directory.
+ *
+ * Threads:
+ *
+ * Multiple threads are allowed into addDocument at once.
+ * There is an initial synchronized call to getThreadState
+ * which allocates a ThreadState for this thread.  The same
+ * thread will get the same ThreadState over time (thread
+ * affinity) so that if there are consistent patterns (for
+ * example each thread is indexing a different content
+ * source) then we make better use of RAM.  Then
+ * processDocument is called on that ThreadState without
+ * synchronization (most of the "heavy lifting" is in this
+ * call).  Finally the synchronized "finishDocument" is
+ * called to flush changes to the directory.
+ *
+ * When flush is called by IndexWriter we forcefully idle
+ * all threads and flush only once they are all idle.  This
+ * means you can call flush with a given thread even while
+ * other threads are actively adding/deleting documents.
+ *
+ *
+ * Exceptions:
+ *
+ * Because this class directly updates in-memory posting
+ * lists, and flushes stored fields and term vectors
+ * directly to files in the directory, there are certain
+ * limited times when an exception can corrupt this state.
+ * For example, a disk full while flushing stored fields
+ * leaves this file in a corrupt state.  Or, an OOM
+ * exception while appending to the in-memory posting lists
+ * can corrupt that posting list.  We call such exceptions
+ * "aborting exceptions".  In these cases we must call
+ * abort() to discard all docs added since the last flush.
+ *
+ * All other exceptions ("non-aborting exceptions") can
+ * still partially update the index structures.  These
+ * updates are consistent, but they represent only the
+ * part of the document indexed before the exception was hit.
+ * When this happens, we immediately mark the document as
+ * deleted so that the document is always atomically ("all
+ * or none") added to the index.
+ */
+
+final class DocumentsWriter {
+  final AtomicLong bytesUsed = new AtomicLong(0);
+  IndexWriter writer;
+  Directory directory;
+
+  String segment;                         // Current segment we are working on
+
+  private int nextDocID;                  // Next docID to be added
+  private int numDocs;                    // # of docs added, but not yet flushed
+
+  // One ThreadState per bound thread (at most maxThreadStates
+  // of them); threadBindings maps each thread to its ThreadState
+  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
+  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
+
+  boolean bufferIsFull;                   // True when it's time to write segment
+  private boolean aborting;               // True if an abort is pending
+
+  PrintStream infoStream;
+  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
+  Similarity similarity;
+
+  // Max # ThreadState instances; if there are more threads
+  // than this, they share ThreadStates and may have to wait
+  // for a shared state to become idle
+  private final int maxThreadStates;
+
+  // Deletes for our still-in-RAM (to be flushed next) segment
+  private BufferedDeletes pendingDeletes = new BufferedDeletes();
+  
+  static class DocState {
+    DocumentsWriter docWriter;
+    Analyzer analyzer;
+    int maxFieldLength;
+    PrintStream infoStream;
+    Similarity similarity;
+    int docID;
+    Document doc;
+    String maxTermPrefix;
+
+    // Only called by asserts
+    public boolean testPoint(String name) {
+      return docWriter.writer.testPoint(name);
+    }
+
+    public void clear() {
+      // don't hold onto doc nor analyzer, in case it is
+      // largish:
+      doc = null;
+      analyzer = null;
+    }
+  }
+
+  /** Consumer returns this on each doc.  This holds any
+   *  state that must be flushed, under synchronization, in
+   *  docID order.  We gather these and flush them in order. */
+  abstract static class DocWriter {
+    DocWriter next;
+    int docID;
+    abstract void finish() throws IOException;
+    abstract void abort();
+    abstract long sizeInBytes();
+
+    void setNext(DocWriter next) {
+      this.next = next;
+    }
+  }
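+
+  // The smallest concrete DocWriter is SkipDocWriter (further down in this
+  // file): finish() and abort() do nothing and sizeInBytes() is 0.  As a rough
+  // sketch of the heavier case, consumers that buffer per-document bytes
+  // (stored fields, term vectors) hand back DocWriters backed by a
+  // PerDocBuffer (below), whose finish() the WaitQueue invokes in docID order.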
+
+  /**
+   * Create and return a new PerDocBuffer.
+   */
+  PerDocBuffer newPerDocBuffer() {
+    return new PerDocBuffer();
+  }
+
+  /**
+   * RAMFile buffer for DocWriters.
+   */
+  class PerDocBuffer extends RAMFile {
+    
+    /**
+     * Allocate bytes used from shared pool.
+     */
+    @Override
+    protected byte[] newBuffer(int size) {
+      assert size == PER_DOC_BLOCK_SIZE;
+      return perDocAllocator.getByteBlock();
+    }
+    
+    /**
+     * Recycle the bytes used.
+     */
+    synchronized void recycle() {
+      if (buffers.size() > 0) {
+        setLength(0);
+        
+        // Recycle the blocks
+        perDocAllocator.recycleByteBlocks(buffers);
+        buffers.clear();
+        sizeInBytes = 0;
+        
+        assert numBuffers() == 0;
+      }
+    }
+  }
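+
+  // Lifecycle sketch (assuming the usual stored-fields / term-vectors usage):
+  //
+  //   PerDocBuffer buffer = newPerDocBuffer();   // blocks come from perDocAllocator
+  //   ... the consumer appends the doc's bytes to the buffer ...
+  //   ... DocWriter.finish() copies them to the real files, then:
+  //   buffer.recycle();                          // 1 KB blocks go back to the pool
+  //
+  // so per-doc bytes stay inside the shared pool accounted for in bytesUsed.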
+  
+  /**
+   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
+   * which returns the DocConsumer that the DocumentsWriter calls to process the
+   * documents. 
+   */
+  abstract static class IndexingChain {
+    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
+  }
+  
+  static final IndexingChain defaultIndexingChain = new IndexingChain() {
+
+    @Override
+    DocConsumer getChain(DocumentsWriter documentsWriter) {
+      /*
+      This is the current indexing chain:
+
+      DocConsumer / DocConsumerPerThread
+        --> code: DocFieldProcessor / DocFieldProcessorPerThread
+          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+    */
+
+    // Build up indexing chain:
+
+      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
+      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
+                                                           new TermsHash(documentsWriter, false, termVectorsWriter, null));
+      final NormsWriter normsWriter = new NormsWriter();
+      final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
+      return new DocFieldProcessor(documentsWriter, docInverter);
+    }
+  };
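+
+  // The constructor below asks the config for its chain
+  // (config.getIndexingChain().getChain(this)); unless a test swaps in a
+  // custom IndexingChain, that is expected to be this defaultIndexingChain.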
+
+  final DocConsumer consumer;
+
+  // How much RAM we can use before flushing comes from the
+  // config (getRAMBufferSizeMB()); it is DISABLE_AUTO_FLUSH
+  // if we are flushing by doc count instead.
+
+  private final IndexWriterConfig config;
+
+  private boolean closed;
+  private final FieldInfos fieldInfos;
+
+  private final BufferedDeletesStream bufferedDeletesStream;
+  private final IndexWriter.FlushControl flushControl;
+
+  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
+    this.directory = directory;
+    this.writer = writer;
+    this.similarity = config.getSimilarity();
+    this.maxThreadStates = config.getMaxThreadStates();
+    this.fieldInfos = fieldInfos;
+    this.bufferedDeletesStream = bufferedDeletesStream;
+    flushControl = writer.flushControl;
+
+    consumer = config.getIndexingChain().getChain(this);
+    this.config = config;
+  }
+
+  // Buffer a specific docID for deletion.  Currently only
+  // used when we hit an exception while adding a document
+  synchronized void deleteDocID(int docIDUpto) {
+    pendingDeletes.addDocID(docIDUpto);
+    // NOTE: we do not trigger flush here.  This is
+    // potentially a RAM leak, if you have an app that tries
+    // to add docs but every single doc always hits a
+    // non-aborting exception.  Allowing a flush here gets
+    // very messy because we are only invoked when handling
+    // exceptions so to do this properly, while handling an
+    // exception we'd have to go off and flush new deletes
+    // which is risky (likely would hit some other
+    // confounding exception).
+  }
+  
+  boolean deleteQueries(Query... queries) {
+    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
+    synchronized(this) {
+      for (Query query : queries) {
+        pendingDeletes.addQuery(query, numDocs);
+      }
+    }
+    return doFlush;
+  }
+  
+  boolean deleteQuery(Query query) { 
+    final boolean doFlush = flushControl.waitUpdate(0, 1);
+    synchronized(this) {
+      pendingDeletes.addQuery(query, numDocs);
+    }
+    return doFlush;
+  }
+  
+  boolean deleteTerms(Term... terms) {
+    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
+    synchronized(this) {
+      for (Term term : terms) {
+        pendingDeletes.addTerm(term, numDocs);
+      }
+    }
+    return doFlush;
+  }
+
+  // TODO: we could check w/ FreqProxTermsWriter: if the
+  // term doesn't exist, don't bother buffering into the
+  // per-DWPT map (but still must go into the global map)
+  boolean deleteTerm(Term term, boolean skipWait) {
+    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
+    synchronized(this) {
+      pendingDeletes.addTerm(term, numDocs);
+    }
+    return doFlush;
+  }
+
+  public FieldInfos getFieldInfos() {
+    return fieldInfos;
+  }
+
+  /** If non-null, various details of indexing are printed
+   *  here. */
+  synchronized void setInfoStream(PrintStream infoStream) {
+    this.infoStream = infoStream;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].docState.infoStream = infoStream;
+    }
+  }
+
+  synchronized void setMaxFieldLength(int maxFieldLength) {
+    this.maxFieldLength = maxFieldLength;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].docState.maxFieldLength = maxFieldLength;
+    }
+  }
+
+  synchronized void setSimilarity(Similarity similarity) {
+    this.similarity = similarity;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].docState.similarity = similarity;
+    }
+  }
+
+  /** Get current segment name we are writing. */
+  synchronized String getSegment() {
+    return segment;
+  }
+
+  /** Returns how many docs are currently buffered in RAM. */
+  synchronized int getNumDocs() {
+    return numDocs;
+  }
+
+  void message(String message) {
+    if (infoStream != null) {
+      writer.message("DW: " + message);
+    }
+  }
+
+  synchronized void setAborting() {
+    if (infoStream != null) {
+      message("setAborting");
+    }
+    aborting = true;
+  }
+
+  /** Called if we hit an exception at a bad time (when
+   *  updating the index files) and must discard all
+   *  currently buffered docs.  This resets our state,
+   *  discarding any docs added since last flush. */
+  synchronized void abort() throws IOException {
+    if (infoStream != null) {
+      message("docWriter: abort");
+    }
+
+    boolean success = false;
+
+    try {
+
+      // Forcefully remove waiting ThreadStates from line
+      try {
+        waitQueue.abort();
+      } catch (Throwable t) {
+        // best effort: ignore exceptions while aborting
+      }
+
+      // Wait for all other threads to finish with
+      // DocumentsWriter:
+      try {
+        waitIdle();
+      } finally {
+        if (infoStream != null) {
+          message("docWriter: abort waitIdle done");
+        }
+        
+        assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
+        waitQueue.waitingBytes = 0;
+        
+        pendingDeletes.clear();
+        
+        for (DocumentsWriterThreadState threadState : threadStates) {
+          try {
+            threadState.consumer.abort();
+          } catch (Throwable t) {
+            // best effort: ignore exceptions while aborting
+          }
+        }
+          
+        try {
+          consumer.abort();
+        } catch (Throwable t) {
+          // best effort: ignore exceptions while aborting
+        }
+        
+        // Reset all postings data
+        doAfterFlush();
+      }
+
+      success = true;
+    } finally {
+      aborting = false;
+      notifyAll();
+      if (infoStream != null) {
+        message("docWriter: done abort; success=" + success);
+      }
+    }
+  }
+
+  /** Reset after a flush */
+  private void doAfterFlush() throws IOException {
+    // All ThreadStates should be idle when we are called
+    assert allThreadsIdle();
+    threadBindings.clear();
+    waitQueue.reset();
+    segment = null;
+    numDocs = 0;
+    nextDocID = 0;
+    bufferIsFull = false;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].doAfterFlush();
+    }
+  }
+
+  private synchronized boolean allThreadsIdle() {
+    for(int i=0;i<threadStates.length;i++) {
+      if (!threadStates[i].isIdle) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  synchronized boolean anyChanges() {
+    return numDocs != 0 || pendingDeletes.any();
+  }
+
+  // for testing
+  public BufferedDeletes getPendingDeletes() {
+    return pendingDeletes;
+  }
+
+  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
+    // Lock order: DW -> BD
+    final long delGen = bufferedDeletesStream.getNextGen();
+    if (pendingDeletes.any()) {
+      if (segmentInfos.size() > 0 || newSegment != null) {
+        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
+        if (infoStream != null) {
+          message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
+        }
+        bufferedDeletesStream.push(packet);
+        if (infoStream != null) {
+          message("flush: delGen=" + packet.gen);
+        }
+        if (newSegment != null) {
+          newSegment.setBufferedDeletesGen(packet.gen);
+        }
+      } else {
+        if (infoStream != null) {
+          message("flush: drop buffered deletes: no segments");
+        }
+        // We can safely discard these deletes: since
+        // there are no segments, the deletions cannot
+        // affect anything.
+      }
+      pendingDeletes.clear();
+    } else if (newSegment != null) {
+      newSegment.setBufferedDeletesGen(delGen);
+    }
+  }
+
+  public boolean anyDeletions() {
+    return pendingDeletes.any();
+  }
+
+  /** Flush all pending docs to a new segment */
+  // Lock order: IW -> DW
+  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
+
+    final long startTime = System.currentTimeMillis();
+
+    // We change writer's segmentInfos:
+    assert Thread.holdsLock(writer);
+
+    waitIdle();
+
+    if (numDocs == 0) {
+      // nothing to do!
+      if (infoStream != null) {
+        message("flush: no docs; skipping");
+      }
+      // Lock order: IW -> DW -> BD
+      pushDeletes(null, segmentInfos);
+      return null;
+    }
+
+    if (aborting) {
+      if (infoStream != null) {
+        message("flush: skip because aborting is set");
+      }
+      return null;
+    }
+
+    boolean success = false;
+
+    SegmentInfo newSegment;
+
+    try {
+      //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
+      assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
+      assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
+      assert waitQueue.waitingBytes == 0;
+
+      if (infoStream != null) {
+        message("flush postings as segment " + segment + " numDocs=" + numDocs);
+      }
+
+      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
+                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
+                                                                 pendingDeletes);
+      // Apply delete-by-docID now (delete-by-docID only
+      // happens when an exception is hit processing that
+      // doc, eg if analyzer has some problem w/ the text):
+      if (pendingDeletes.docIDs.size() > 0) {
+        flushState.deletedDocs = new BitVector(numDocs);
+        for(int delDocID : pendingDeletes.docIDs) {
+          flushState.deletedDocs.set(delDocID);
+        }
+        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
+        pendingDeletes.docIDs.clear();
+      }
+
+      newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
+
+      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
+      for (DocumentsWriterThreadState threadState : threadStates) {
+        threads.add(threadState.consumer);
+      }
+
+      double startMBUsed = bytesUsed()/1024./1024.;
+
+      consumer.flush(threads, flushState);
+
+      newSegment.setHasVectors(flushState.hasVectors);
+
+      if (infoStream != null) {
+        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
+        if (flushState.deletedDocs != null) {
+          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+        }
+        message("flushedFiles=" + newSegment.files());
+      }
+
+      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
+        final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
+
+        if (infoStream != null) {
+          message("flush: create compound file \"" + cfsFileName + "\"");
+        }
+
+        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
+        for(String fileName : newSegment.files()) {
+          cfsWriter.addFile(fileName);
+        }
+        cfsWriter.close();
+        deleter.deleteNewFiles(newSegment.files());
+        newSegment.setUseCompoundFile(true);
+      }
+
+      // Must write deleted docs after the CFS so we don't
+      // slurp the del file into CFS:
+      if (flushState.deletedDocs != null) {
+        final int delCount = flushState.deletedDocs.count();
+        assert delCount > 0;
+        newSegment.setDelCount(delCount);
+        newSegment.advanceDelGen();
+        final String delFileName = newSegment.getDelFileName();
+        if (infoStream != null) {
+          message("flush: write " + delCount + " deletes to " + delFileName);
+        }
+        boolean success2 = false;
+        try {
+          // TODO: in the NRT case it'd be better to hand
+          // this del vector over to the
+          // shortly-to-be-opened SegmentReader and let it
+          // carry the changes; there's no reason to use
+          // filesystem as intermediary here.
+          flushState.deletedDocs.write(directory, delFileName);
+          success2 = true;
+        } finally {
+          if (!success2) {
+            try {
+              directory.deleteFile(delFileName);
+            } catch (Throwable t) {
+              // suppress this so we keep throwing the
+              // original exception
+            }
+          }
+        }
+      }
+
+      if (infoStream != null) {
+        message("flush: segment=" + newSegment);
+        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
+        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
+        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
+                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
+                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
+                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
+                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
+      }
+
+      success = true;
+    } finally {
+      notifyAll();
+      if (!success) {
+        if (segment != null) {
+          deleter.refresh(segment);
+        }
+        abort();
+      }
+    }
+
+    doAfterFlush();
+
+    // Lock order: IW -> DW -> BD
+    pushDeletes(newSegment, segmentInfos);
+    if (infoStream != null) {
+      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
+    }
+
+    return newSegment;
+  }
+
+  synchronized void close() {
+    closed = true;
+    notifyAll();
+  }
+
+  /** Returns a free (idle) ThreadState that may be used for
+   * indexing this one document.  This call also pauses if a
+   * flush is pending.  If delTerm is non-null then we
+   * buffer this deleted term after the thread state has
+   * been acquired. */
+  synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
+
+    final Thread currentThread = Thread.currentThread();
+    assert !Thread.holdsLock(writer);
+
+    // First, find a thread state.  If this thread already
+    // has affinity to a specific ThreadState, use that one
+    // again.
+    DocumentsWriterThreadState state = threadBindings.get(currentThread);
+    if (state == null) {
+
+      // First time this thread has called us since last
+      // flush.  Find the least loaded thread state:
+      DocumentsWriterThreadState minThreadState = null;
+      for(int i=0;i<threadStates.length;i++) {
+        DocumentsWriterThreadState ts = threadStates[i];
+        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
+          minThreadState = ts;
+        }
+      }
+      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
+        state = minThreadState;
+        state.numThreads++;
+      } else {
+        // Just create a new "private" thread state
+        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
+        if (threadStates.length > 0) {
+          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
+        }
+        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
+        threadStates = newArray;
+      }
+      threadBindings.put(currentThread, state);
+    }
+
+    // Next, wait until my thread state is idle (in case
+    // it's shared with other threads), and no flush/abort
+    // pending 
+    waitReady(state);
+
+    // Allocate segment name if this is the first doc since
+    // last flush:
+    if (segment == null) {
+      segment = writer.newSegmentName();
+      assert numDocs == 0;
+    }
+
+    state.docState.docID = nextDocID;
+    nextDocID += docCount;
+
+    if (delTerm != null) {
+      pendingDeletes.addTerm(delTerm, state.docState.docID);
+    }
+
+    numDocs += docCount;
+    state.isIdle = false;
+    return state;
+  }
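+
+  // For illustration, assuming the 3.x default maxThreadStates of 8: the first
+  // 8 distinct threads each get a private ThreadState via the "create" branch
+  // above; a 9th thread is bound to the least-loaded existing state and shares
+  // it, waiting in waitReady() whenever that state is busy.  A returning
+  // thread always reuses its binding from threadBindings until the next flush
+  // clears it.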
+  
+  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
+    return updateDocument(doc, analyzer, null);
+  }
+  
+  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
+    throws CorruptIndexException, IOException {
+
+    // Possibly trigger a flush, or wait until any running flush completes:
+    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
+
+    // This call is synchronized but fast
+    final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
+
+    final DocState docState = state.docState;
+    docState.doc = doc;
+    docState.analyzer = analyzer;
+
+    boolean success = false;
+    try {
+      // This call is not synchronized and does all the
+      // work
+      final DocWriter perDoc;
+      try {
+        perDoc = state.consumer.processDocument();
+      } finally {
+        docState.clear();
+      }
+
+      // This call is synchronized but fast
+      finishDocument(state, perDoc);
+
+      success = true;
+    } finally {
+      if (!success) {
+
+        // If this thread state had decided to flush, we
+        // must clear it so another thread can flush
+        if (doFlush) {
+          flushControl.clearFlushPending();
+        }
+
+        if (infoStream != null) {
+          message("exception in updateDocument aborting=" + aborting);
+        }
+
+        synchronized(this) {
+
+          state.isIdle = true;
+          notifyAll();
+            
+          if (aborting) {
+            abort();
+          } else {
+            skipDocWriter.docID = docState.docID;
+            boolean success2 = false;
+            try {
+              waitQueue.add(skipDocWriter);
+              success2 = true;
+            } finally {
+              if (!success2) {
+                abort();
+                return false;
+              }
+            }
+
+            // Immediately mark this document as deleted
+            // since likely it was partially added.  This
+            // keeps indexing as "all or none" (atomic) when
+            // adding a document:
+            deleteDocID(state.docState.docID);
+          }
+        }
+      }
+    }
+
+    doFlush |= flushControl.flushByRAMUsage("new document");
+
+    return doFlush;
+  }
+
+  boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
+    throws CorruptIndexException, IOException {
+
+    // Possibly trigger a flush, or wait until any running flush completes:
+    boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
+
+    final int docCount = docs.size();
+
+    // This call is synchronized but fast -- we allocate the
+    // N docIDs up front:
+    final DocumentsWriterThreadState state = getThreadState(null, docCount);
+    final DocState docState = state.docState;
+
+    final int startDocID = docState.docID;
+    int docID = startDocID;
+
+    //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
+    for(Document doc : docs) {
+      docState.doc = doc;
+      docState.analyzer = analyzer;
+      // Assign next docID from our block:
+      docState.docID = docID++;
+      
+      boolean success = false;
+      try {
+        // This call is not synchronized and does all the
+        // work
+        final DocWriter perDoc;
+        try {
+          perDoc = state.consumer.processDocument();
+        } finally {
+          docState.clear();
+        }
+
+        // Must call this w/o holding synchronized(this) else
+        // we'll hit deadlock:
+        balanceRAM();
+
+        // Synchronized but fast
+        synchronized(this) {
+          if (aborting) {
+            break;
+          }
+          assert perDoc == null || perDoc.docID == docState.docID;
+          // NOTE: we deliberately ignore waitQueue.add()'s pause hint here;
+          // pausing is deferred until the whole doc block has been added
+          // (see the doPause() check after the loop below)
+          if (perDoc != null) {
+            waitQueue.add(perDoc);
+          } else {
+            skipDocWriter.docID = docState.docID;
+            waitQueue.add(skipDocWriter);
+          }
+        }
+
+        success = true;
+      } finally {
+        if (!success) {
+          //System.out.println(Thread.currentThread().getName() + ": E");
+
+          // If this thread state had decided to flush, we
+          // must clear it so another thread can flush
+          if (doFlush) {
+            message("clearFlushPending!");
+            flushControl.clearFlushPending();
+          }
+
+          if (infoStream != null) {
+            message("exception in updateDocuments aborting=" + aborting);
+          }
+
+          synchronized(this) {
+
+            state.isIdle = true;
+            notifyAll();
+
+            if (aborting) {
+              abort();
+            } else {
+
+              // Fill hole in the doc stores for all
+              // docIDs we pre-allocated
+              //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
+              final int endDocID = startDocID + docCount;
+              docID = docState.docID;
+              while(docID < endDocID) {
+                skipDocWriter.docID = docID++;
+                boolean success2 = false;
+                try {
+                  waitQueue.add(skipDocWriter);
+                  success2 = true;
+                } finally {
+                  if (!success2) {
+                    abort();
+                    return false;
+                  }
+                }
+              }
+              //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");
+
+              // Mark all pre-allocated docIDs as deleted:
+              docID = startDocID;
+              while(docID < startDocID + docs.size()) {
+                deleteDocID(docID++);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    synchronized(this) {
+      // We must delay pausing until the full doc block is
+      // added, else we can hit deadlock if more than one
+      // thread is adding a block and we need to pause when
+      // both are only part way done:
+      if (waitQueue.doPause()) {
+        waitForWaitQueue();
+      }
+
+      //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);
+
+      if (aborting) {
+
+        // We are currently aborting, and another thread is
+        // waiting for me to become idle.  We just forcefully
+        // idle this threadState; it will be fully reset by
+        // abort()
+        state.isIdle = true;
+
+        // wakes up any threads waiting on the wait queue
+        notifyAll();
+
+        abort();
+
+        if (doFlush) {
+          message("clearFlushPending!");
+          flushControl.clearFlushPending();
+        }
+
+        return false;
+      }
+
+      // Apply delTerm only after all indexing has
+      // succeeded, but apply it only to docs prior to when
+      // this batch started:
+      if (delTerm != null) {
+        pendingDeletes.addTerm(delTerm, startDocID);
+      }
+
+      state.isIdle = true;
+
+      // wakes up any threads waiting on the wait queue
+      notifyAll();
+    }
+
+    doFlush |= flushControl.flushByRAMUsage("new document");
+
+    //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
+    return doFlush;
+  }
+
+  public synchronized void waitIdle() {
+    while (!allThreadsIdle()) {
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    }
+  }
+
+  synchronized void waitReady(DocumentsWriterThreadState state) {
+    while (!closed && (!state.isIdle || aborting)) {
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    }
+
+    if (closed) {
+      throw new AlreadyClosedException("this IndexWriter is closed");
+    }
+  }
+
+  /** Does the synchronized work to finish/flush the
+   *  inverted document. */
+  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
+
+    // Must call this w/o holding synchronized(this) else
+    // we'll hit deadlock:
+    balanceRAM();
+
+    synchronized(this) {
+
+      assert docWriter == null || docWriter.docID == perThread.docState.docID;
+
+      if (aborting) {
+
+        // We are currently aborting, and another thread is
+        // waiting for me to become idle.  We just forcefully
+        // idle this threadState; it will be fully reset by
+        // abort()
+        if (docWriter != null) {
+          try {
+            docWriter.abort();
+          } catch (Throwable t) {
+          }
+        }
+
+        perThread.isIdle = true;
+
+        // wakes up any threads waiting on the wait queue
+        notifyAll();
+
+        return;
+      }
+
+      final boolean doPause;
+
+      if (docWriter != null) {
+        doPause = waitQueue.add(docWriter);
+      } else {
+        skipDocWriter.docID = perThread.docState.docID;
+        doPause = waitQueue.add(skipDocWriter);
+      }
+
+      if (doPause) {
+        waitForWaitQueue();
+      }
+
+      perThread.isIdle = true;
+
+      // wakes up any threads waiting on the wait queue
+      notifyAll();
+    }
+  }
+
+  synchronized void waitForWaitQueue() {
+    do {
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    } while (!waitQueue.doResume());
+  }
+
+  private static class SkipDocWriter extends DocWriter {
+    @Override
+    void finish() {
+    }
+    @Override
+    void abort() {
+    }
+    @Override
+    long sizeInBytes() {
+      return 0;
+    }
+  }
+  final SkipDocWriter skipDocWriter = new SkipDocWriter();
+
+  NumberFormat nf = NumberFormat.getInstance();
+
+  /* Initial chunk size of the shared byte[] blocks used to
+     store postings data */
+  final static int BYTE_BLOCK_SHIFT = 15;
+  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
+  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
+  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
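+
+  // With BYTE_BLOCK_SHIFT = 15 these are 32 KB blocks.  A sketch of the usual
+  // decoding done by the postings code, assuming a flat address of the form
+  // (blockIndex << BYTE_BLOCK_SHIFT) + offset:
+  //
+  //   byte[] block = pool.buffers[address >> BYTE_BLOCK_SHIFT];
+  //   int offset   = address & BYTE_BLOCK_MASK;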
+
+  private class ByteBlockAllocator extends ByteBlockPool.Allocator {
+    final int blockSize;
+
+    ByteBlockAllocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
+
+    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
+    
+    /* Allocate another byte[] from the shared pool */
+    @Override
+    byte[] getByteBlock() {
+      synchronized(DocumentsWriter.this) {
+        final int size = freeByteBlocks.size();
+        final byte[] b;
+        if (0 == size) {
+          b = new byte[blockSize];
+          bytesUsed.addAndGet(blockSize);
+        } else
+          b = freeByteBlocks.remove(size-1);
+        return b;
+      }
+    }
+
+    /* Return byte[]'s to the pool */
+
+    @Override
+    void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      synchronized(DocumentsWriter.this) {
+        for(int i=start;i<end;i++) {
+          freeByteBlocks.add(blocks[i]);
+          blocks[i] = null;
+        }
+      }
+    }
+
+    @Override
+    void recycleByteBlocks(List<byte[]> blocks) {
+      synchronized(DocumentsWriter.this) {
+        final int size = blocks.size();
+        for(int i=0;i<size;i++) {
+          freeByteBlocks.add(blocks.get(i));
+          blocks.set(i, null);
+        }
+      }
+    }
+  }
+
+  /* Initial chunk size of the shared int[] blocks used to
+     store postings data */
+  final static int INT_BLOCK_SHIFT = 13;
+  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
+  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+  private List<int[]> freeIntBlocks = new ArrayList<int[]>();
+
+  /* Allocate another int[] from the shared pool */
+  synchronized int[] getIntBlock() {
+    final int size = freeIntBlocks.size();
+    final int[] b;
+    if (0 == size) {
+      b = new int[INT_BLOCK_SIZE];
+      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
+    } else {
+      b = freeIntBlocks.remove(size-1);
+    }
+    return b;
+  }
+
+  synchronized void bytesUsed(long numBytes) {
+    bytesUsed.addAndGet(numBytes);
+  }
+
+  long bytesUsed() {
+    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
+  }
+
+  /* Return int[]s to the pool */
+  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
+    for(int i=start;i<end;i++) {
+      freeIntBlocks.add(blocks[i]);
+      blocks[i] = null;
+    }
+  }
+
+  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+
+  final static int PER_DOC_BLOCK_SIZE = 1024;
+
+  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
+
+  /* Initial chunk size of the shared char[] blocks used to
+     store term text */
+  final static int CHAR_BLOCK_SHIFT = 14;
+  final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
+  final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
+
+  final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
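+
+  // MAX_TERM_LENGTH works out to 16,383 chars.  As a sketch of the expected
+  // behavior (enforced elsewhere in the indexing chain, not here): longer
+  // terms are skipped and a prefix is recorded in DocState.maxTermPrefix so a
+  // warning can be logged.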
+
+  private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
+
+  /* Allocate another char[] from the shared pool */
+  synchronized char[] getCharBlock() {
+    final int size = freeCharBlocks.size();
+    final char[] c;
+    if (0 == size) {
+      bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
+      c = new char[CHAR_BLOCK_SIZE];
+    } else
+      c = freeCharBlocks.remove(size-1);
+    // We always track allocations of char blocks, for now,
+    // because nothing that skips allocation tracking
+    // (currently only term vectors) uses its own char
+    // blocks.
+    return c;
+  }
+
+  /* Return char[]s to the pool */
+  synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
+    for(int i=0;i<numBlocks;i++) {
+      freeCharBlocks.add(blocks[i]);
+      blocks[i] = null;
+    }
+  }
+
+  String toMB(long v) {
+    return nf.format(v/1024./1024.);
+  }
+
+  /* We have four pools of RAM: Postings, byte blocks
+   * (holds freq/prox posting data), char blocks (holds
+   * characters in the term) and per-doc buffers (stored fields/term vectors).  
+   * Different docs require varying amounts of storage from
+   * these four pools.
+   *
+   * For example, docs with many unique single-occurrence
+   * short terms will use up the Postings RAM and hardly any
+   * of the others, whereas docs with very large terms will
+   * use a lot of char block RAM and relatively little of
+   * the others.  This method just frees allocations from
+   * the pools once we are over budget, which balances the
+   * pools to match the current docs. */
+  void balanceRAM() {
+
+    final boolean doBalance;
+    final long deletesRAMUsed;
+
+    deletesRAMUsed = bufferedDeletesStream.bytesUsed();
+
+    final long ramBufferSize;
+    final double mb = config.getRAMBufferSizeMB();
+    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
+    } else {
+      ramBufferSize = (long) (mb*1024*1024);
+    }
+
+    synchronized(this) {
+      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
+        return;
+      }
+    
+      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
+    }
+
+    if (doBalance) {
+
+      if (infoStream != null) {
+        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
+                " vs trigger=" + toMB(ramBufferSize) +
+                " deletesMB=" + toMB(deletesRAMUsed) +
+                " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
+                " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
+                " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
+      }
+
+      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
+
+      int iter = 0;
+
+      // We free equally from each pool in 32 KB
+      // chunks until we are below our threshold
+      // (freeLevel)
+
+      boolean any = true;
+
+      final long freeLevel = (long) (0.95 * ramBufferSize);
+
+      while(bytesUsed()+deletesRAMUsed > freeLevel) {
+      
+        synchronized(this) {
+          if (0 == perDocAllocator.freeByteBlocks.size() 
+              && 0 == byteBlockAllocator.freeByteBlocks.size() 
+              && 0 == freeCharBlocks.size() 
+              && 0 == freeIntBlocks.size() 
+              && !any) {
+            // Nothing else to free -- must flush now.
+            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
+            if (infoStream != null) {
+              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
+                message("    nothing to free; set bufferIsFull");
+              } else {
+                message("    nothing to free");
+              }
+            }
+            break;
+          }
+
+          if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
+            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
+            bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
+          }
+
+          if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
+            freeCharBlocks.remove(freeCharBlocks.size()-1);
+            bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
+          }
+
+          if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
+            freeIntBlocks.remove(freeIntBlocks.size()-1);
+            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
+          }
+
+          if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
+            // Remove upwards of 32 blocks (each block is 1K)
+            for (int i = 0; i < 32; ++i) {
+              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
+              bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
+              if (perDocAllocator.freeByteBlocks.size() == 0) {
+                break;
+              }
+            }
+          }
+        }
+
+        if ((4 == iter % 5) && any) {
+          // Ask consumer to free any recycled state
+          any = consumer.freeRAM();
+        }
+
+        iter++;
+      }
+
+      if (infoStream != null) {
+        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
+      }
+    }
+  }
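+
+  // Worked example, assuming the 3.x default RAM buffer of 16 MB: once
+  // bytesUsed() + deletesRAMUsed crosses 16 MB, balanceRAM() drops free blocks
+  // round-robin (iter % 5 picks byte blocks, char blocks, int blocks, per-doc
+  // blocks, then consumer.freeRAM()) in roughly 32 KB steps until usage falls
+  // below freeLevel = 0.95 * 16 MB = 15.2 MB; only when there is nothing left
+  // to free does it set bufferIsFull to force a flush.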
+
+  final WaitQueue waitQueue = new WaitQueue();
+
+  private class WaitQueue {
+    DocWriter[] waiting;
+    int nextWriteDocID;
+    int nextWriteLoc;
+    int numWaiting;
+    long waitingBytes;
+
+    public WaitQueue() {
+      waiting = new DocWriter[10];
+    }
+
+    synchronized void reset() {
+      // NOTE: nextWriteLoc doesn't need to be reset
+      assert numWaiting == 0;
+      assert waitingBytes == 0;
+      nextWriteDocID = 0;
+    }
+
+    synchronized boolean doResume() {
+      final double mb = config.getRAMBufferSizeMB();
+      final long waitQueueResumeBytes;
+      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+        waitQueueResumeBytes = 2*1024*1024;
+      } else {
+        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
+      }
+      return waitingBytes <= waitQueueResumeBytes;
+    }
+
+    synchronized boolean doPause() {
+      final double mb = config.getRAMBufferSizeMB();
+      final long waitQueuePauseBytes;
+      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+        waitQueuePauseBytes = 4*1024*1024;
+      } else {
+        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
+      }
+      return waitingBytes > waitQueuePauseBytes;
+    }
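+
+    // Worked numbers: with auto-flush disabled the queue pauses above a fixed
+    // 4 MB of waiting bytes and resumes below 2 MB; otherwise, assuming the
+    // 3.x default 16 MB RAM buffer, it pauses above 10% (1.6 MB) and resumes
+    // below 5% (0.8 MB) of that buffer size.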
+
+    synchronized void abort() {
+      int count = 0;
+      for(int i=0;i<waiting.length;i++) {
+        final DocWriter doc = waiting[i];
+        if (doc != null) {
+          doc.abort();
+          waiting[i] = null;
+          count++;
+        }
+      }
+      waitingBytes = 0;
+      assert count == numWaiting;
+      numWaiting = 0;
+    }
+
+    private void writeDocument(DocWriter doc) throws IOException {
+      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
+      boolean success = false;
+      try {
+        doc.finish();
+        nextWriteDocID++;
+        nextWriteLoc++;
+        assert nextWriteLoc <= waiting.length;
+        if (nextWriteLoc == waiting.length) {
+          nextWriteLoc = 0;
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          setAborting();
+        }
+      }
+    }
+
+    synchronized public boolean add(DocWriter doc) throws IOException {
+
+      assert doc.docID >= nextWriteDocID;
+
+      if (doc.docID == nextWriteDocID) {
+        writeDocument(doc);
+        while(true) {
+          doc = waiting[nextWriteLoc];
+          if (doc != null) {
+            numWaiting--;
+            waiting[nextWriteLoc] = null;
+            waitingBytes -= doc.sizeInBytes();
+            writeDocument(doc);
+          } else {
+            break;
+          }
+        }
+      } else {
+
+        // I finished before documents that were added
+        // before me.  This can easily happen when I am a
+        // small doc and the docs before me were large, or,
+        // just due to luck in the thread scheduling.  Just
+        // add myself to the queue and when that large doc
+        // finishes, it will flush me:
+        int gap = doc.docID - nextWriteDocID;
+        if (gap >= waiting.length) {
+          // Grow queue
+          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
+          assert nextWriteLoc >= 0;
+          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
+          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
+          nextWriteLoc = 0;
+          waiting = newArray;
+          gap = doc.docID - nextWriteDocID;
+        }
+
+        int loc = nextWriteLoc + gap;
+        if (loc >= waiting.length) {
+          loc -= waiting.length;
+        }
+
+        // We should only wrap one time
+        assert loc < waiting.length;
+
+        // Nobody should be in my spot!
+        assert waiting[loc] == null;
+        waiting[loc] = doc;
+        numWaiting++;
+        waitingBytes += doc.sizeInBytes();
+      }
+      
+      return doPause();
+    }
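+
+    // Example of the out-of-order case: say nextWriteDocID == 5 and docs 6, 7
+    // and 8 finish first.  Each computes its gap and parks in the circular
+    // waiting[] array, growing waitingBytes.  When doc 5 finally arrives it is
+    // written immediately, and the while loop above then drains 6, 7 and 8 in
+    // docID order, shrinking waitingBytes again.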
+  }
+}