pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.4.0 / lucene / src / java / org / apache / lucene / index / DocumentsWriter.java
diff --git a/lucene-java-3.4.0/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene-java-3.4.0/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
deleted file mode 100644 (file)
index cad636b..0000000
+++ /dev/null
@@ -1,1474 +0,0 @@
-package org.apache.lucene.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Similarity;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMFile;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-
-/**
- * This class accepts multiple added documents and directly
- * writes a single segment file.  It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
- *
- * Each added document is passed to the {@link DocConsumer},
- * which in turn processes the document and interacts with
- * other consumers in the indexing chain.  Certain
- * consumers, like {@link StoredFieldsWriter} and {@link
- * TermVectorsTermsWriter}, digest a document and
- * immediately write bytes to the "doc store" files (ie,
- * they do not consume RAM per document, except while they
- * are processing the document).
- *
- * Other consumers, eg {@link FreqProxTermsWriter} and
- * {@link NormsWriter}, buffer bytes in RAM and flush only
- * when a new segment is produced.
-
- * Once we have used our allowed RAM buffer, or the number
- * of added docs is large enough (in the case we are
- * flushing by doc count instead of RAM usage), we create a
- * real segment and flush it to the Directory.
- *
- * Threads:
- *
- * Multiple threads are allowed into addDocument at once.
- * There is an initial synchronized call to getThreadState
- * which allocates a ThreadState for this thread.  The same
- * thread will get the same ThreadState over time (thread
- * affinity) so that if there are consistent patterns (for
- * example each thread is indexing a different content
- * source) then we make better use of RAM.  Then
- * processDocument is called on that ThreadState without
- * synchronization (most of the "heavy lifting" is in this
- * call).  Finally the synchronized "finishDocument" is
- * called to flush changes to the directory.
- *
- * When flush is called by IndexWriter we forcefully idle
- * all threads and flush only once they are all idle.  This
- * means you can call flush with a given thread even while
- * other threads are actively adding/deleting documents.
- *
- *
- * Exceptions:
- *
- * Because this class directly updates in-memory posting
- * lists, and flushes stored fields and term vectors
- * directly to files in the directory, there are certain
- * limited times when an exception can corrupt this state.
- * For example, a disk full while flushing stored fields
- * leaves this file in a corrupt state.  Or, an OOM
- * exception while appending to the in-memory posting lists
- * can corrupt that posting list.  We call such exceptions
- * "aborting exceptions".  In these cases we must call
- * abort() to discard all docs added since the last flush.
- *
- * All other exceptions ("non-aborting exceptions") can
- * still partially update the index structures.  These
- * updates are consistent, but, they represent only a part
- * of the document seen up until the exception was hit.
- * When this happens, we immediately mark the document as
- * deleted so that the document is always atomically ("all
- * or none") added to the index.
- */
-
-final class DocumentsWriter {
-  final AtomicLong bytesUsed = new AtomicLong(0);
-  IndexWriter writer;
-  Directory directory;
-
-  String segment;                         // Current segment we are working on
-
-  private int nextDocID;                  // Next docID to be added
-  private int numDocs;                    // # of docs added, but not yet flushed
-
-  // Max # ThreadState instances; if there are more threads
-  // than this they share ThreadStates
-  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
-  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
-
-  boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean aborting;               // True if an abort is pending
-
-  PrintStream infoStream;
-  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
-  Similarity similarity;
-
-  // max # simultaneous threads; if there are more than
-  // this, they wait for others to finish first
-  private final int maxThreadStates;
-
-  // Deletes for our still-in-RAM (to be flushed next) segment
-  private BufferedDeletes pendingDeletes = new BufferedDeletes();
-  
-  static class DocState {
-    DocumentsWriter docWriter;
-    Analyzer analyzer;
-    int maxFieldLength;
-    PrintStream infoStream;
-    Similarity similarity;
-    int docID;
-    Document doc;
-    String maxTermPrefix;
-
-    // Only called by asserts
-    public boolean testPoint(String name) {
-      return docWriter.writer.testPoint(name);
-    }
-
-    public void clear() {
-      // don't hold onto doc nor analyzer, in case it is
-      // largish:
-      doc = null;
-      analyzer = null;
-    }
-  }
-
-  /** Consumer returns this on each doc.  This holds any
-   *  state that must be flushed synchronized "in docID
-   *  order".  We gather these and flush them in order. */
-  abstract static class DocWriter {
-    DocWriter next;
-    int docID;
-    abstract void finish() throws IOException;
-    abstract void abort();
-    abstract long sizeInBytes();
-
-    void setNext(DocWriter next) {
-      this.next = next;
-    }
-  }
-
-  /**
-   * Create and return a new DocWriterBuffer.
-   */
-  PerDocBuffer newPerDocBuffer() {
-    return new PerDocBuffer();
-  }
-
-  /**
-   * RAMFile buffer for DocWriters.
-   */
-  class PerDocBuffer extends RAMFile {
-    
-    /**
-     * Allocate bytes used from shared pool.
-     */
-    @Override
-    protected byte[] newBuffer(int size) {
-      assert size == PER_DOC_BLOCK_SIZE;
-      return perDocAllocator.getByteBlock();
-    }
-    
-    /**
-     * Recycle the bytes used.
-     */
-    synchronized void recycle() {
-      if (buffers.size() > 0) {
-        setLength(0);
-        
-        // Recycle the blocks
-        perDocAllocator.recycleByteBlocks(buffers);
-        buffers.clear();
-        sizeInBytes = 0;
-        
-        assert numBuffers() == 0;
-      }
-    }
-  }
-  
-  /**
-   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
-   * which returns the DocConsumer that the DocumentsWriter calls to process the
-   * documents. 
-   */
-  abstract static class IndexingChain {
-    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
-  }
-  
-  static final IndexingChain defaultIndexingChain = new IndexingChain() {
-
-    @Override
-    DocConsumer getChain(DocumentsWriter documentsWriter) {
-      /*
-      This is the current indexing chain:
-
-      DocConsumer / DocConsumerPerThread
-        --> code: DocFieldProcessor / DocFieldProcessorPerThread
-          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
-            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
-              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
-                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
-                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
-                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
-                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
-                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
-              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
-    */
-
-    // Build up indexing chain:
-
-      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
-      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
-
-      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
-                                                           new TermsHash(documentsWriter, false, termVectorsWriter, null));
-      final NormsWriter normsWriter = new NormsWriter();
-      final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
-      return new DocFieldProcessor(documentsWriter, docInverter);
-    }
-  };
-
-  final DocConsumer consumer;
-
-  // How much RAM we can use before flushing.  This is 0 if
-  // we are flushing by doc count instead.
-
-  private final IndexWriterConfig config;
-
-  private boolean closed;
-  private final FieldInfos fieldInfos;
-
-  private final BufferedDeletesStream bufferedDeletesStream;
-  private final IndexWriter.FlushControl flushControl;
-
-  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
-    this.directory = directory;
-    this.writer = writer;
-    this.similarity = config.getSimilarity();
-    this.maxThreadStates = config.getMaxThreadStates();
-    this.fieldInfos = fieldInfos;
-    this.bufferedDeletesStream = bufferedDeletesStream;
-    flushControl = writer.flushControl;
-
-    consumer = config.getIndexingChain().getChain(this);
-    this.config = config;
-  }
-
-  // Buffer a specific docID for deletion.  Currently only
-  // used when we hit a exception when adding a document
-  synchronized void deleteDocID(int docIDUpto) {
-    pendingDeletes.addDocID(docIDUpto);
-    // NOTE: we do not trigger flush here.  This is
-    // potentially a RAM leak, if you have an app that tries
-    // to add docs but every single doc always hits a
-    // non-aborting exception.  Allowing a flush here gets
-    // very messy because we are only invoked when handling
-    // exceptions so to do this properly, while handling an
-    // exception we'd have to go off and flush new deletes
-    // which is risky (likely would hit some other
-    // confounding exception).
-  }
-  
-  boolean deleteQueries(Query... queries) {
-    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
-    synchronized(this) {
-      for (Query query : queries) {
-        pendingDeletes.addQuery(query, numDocs);
-      }
-    }
-    return doFlush;
-  }
-  
-  boolean deleteQuery(Query query) { 
-    final boolean doFlush = flushControl.waitUpdate(0, 1);
-    synchronized(this) {
-      pendingDeletes.addQuery(query, numDocs);
-    }
-    return doFlush;
-  }
-  
-  boolean deleteTerms(Term... terms) {
-    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
-    synchronized(this) {
-      for (Term term : terms) {
-        pendingDeletes.addTerm(term, numDocs);
-      }
-    }
-    return doFlush;
-  }
-
-  // TODO: we could check w/ FreqProxTermsWriter: if the
-  // term doesn't exist, don't bother buffering into the
-  // per-DWPT map (but still must go into the global map)
-  boolean deleteTerm(Term term, boolean skipWait) {
-    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
-    synchronized(this) {
-      pendingDeletes.addTerm(term, numDocs);
-    }
-    return doFlush;
-  }
-
-  public FieldInfos getFieldInfos() {
-    return fieldInfos;
-  }
-
-  /** If non-null, various details of indexing are printed
-   *  here. */
-  synchronized void setInfoStream(PrintStream infoStream) {
-    this.infoStream = infoStream;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].docState.infoStream = infoStream;
-    }
-  }
-
-  synchronized void setMaxFieldLength(int maxFieldLength) {
-    this.maxFieldLength = maxFieldLength;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].docState.maxFieldLength = maxFieldLength;
-    }
-  }
-
-  synchronized void setSimilarity(Similarity similarity) {
-    this.similarity = similarity;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].docState.similarity = similarity;
-    }
-  }
-
-  /** Get current segment name we are writing. */
-  synchronized String getSegment() {
-    return segment;
-  }
-
-  /** Returns how many docs are currently buffered in RAM. */
-  synchronized int getNumDocs() {
-    return numDocs;
-  }
-
-  void message(String message) {
-    if (infoStream != null) {
-      writer.message("DW: " + message);
-    }
-  }
-
-  synchronized void setAborting() {
-    if (infoStream != null) {
-      message("setAborting");
-    }
-    aborting = true;
-  }
-
-  /** Called if we hit an exception at a bad time (when
-   *  updating the index files) and must discard all
-   *  currently buffered docs.  This resets our state,
-   *  discarding any docs added since last flush. */
-  synchronized void abort() throws IOException {
-    if (infoStream != null) {
-      message("docWriter: abort");
-    }
-
-    boolean success = false;
-
-    try {
-
-      // Forcefully remove waiting ThreadStates from line
-      try {
-        waitQueue.abort();
-      } catch (Throwable t) {
-      }
-
-      // Wait for all other threads to finish with
-      // DocumentsWriter:
-      try {
-        waitIdle();
-      } finally {
-        if (infoStream != null) {
-          message("docWriter: abort waitIdle done");
-        }
-        
-        assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
-        waitQueue.waitingBytes = 0;
-        
-        pendingDeletes.clear();
-        
-        for (DocumentsWriterThreadState threadState : threadStates) {
-          try {
-            threadState.consumer.abort();
-          } catch (Throwable t) {
-          }
-        }
-          
-        try {
-          consumer.abort();
-        } catch (Throwable t) {
-        }
-        
-        // Reset all postings data
-        doAfterFlush();
-      }
-
-      success = true;
-    } finally {
-      aborting = false;
-      notifyAll();
-      if (infoStream != null) {
-        message("docWriter: done abort; success=" + success);
-      }
-    }
-  }
-
-  /** Reset after a flush */
-  private void doAfterFlush() throws IOException {
-    // All ThreadStates should be idle when we are called
-    assert allThreadsIdle();
-    threadBindings.clear();
-    waitQueue.reset();
-    segment = null;
-    numDocs = 0;
-    nextDocID = 0;
-    bufferIsFull = false;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].doAfterFlush();
-    }
-  }
-
-  private synchronized boolean allThreadsIdle() {
-    for(int i=0;i<threadStates.length;i++) {
-      if (!threadStates[i].isIdle) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  synchronized boolean anyChanges() {
-    return numDocs != 0 || pendingDeletes.any();
-  }
-
-  // for testing
-  public BufferedDeletes getPendingDeletes() {
-    return pendingDeletes;
-  }
-
-  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
-    // Lock order: DW -> BD
-    final long delGen = bufferedDeletesStream.getNextGen();
-    if (pendingDeletes.any()) {
-      if (segmentInfos.size() > 0 || newSegment != null) {
-        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
-        if (infoStream != null) {
-          message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
-        }
-        bufferedDeletesStream.push(packet);
-        if (infoStream != null) {
-          message("flush: delGen=" + packet.gen);
-        }
-        if (newSegment != null) {
-          newSegment.setBufferedDeletesGen(packet.gen);
-        }
-      } else {
-        if (infoStream != null) {
-          message("flush: drop buffered deletes: no segments");
-        }
-        // We can safely discard these deletes: since
-        // there are no segments, the deletions cannot
-        // affect anything.
-      }
-      pendingDeletes.clear();
-    } else if (newSegment != null) {
-      newSegment.setBufferedDeletesGen(delGen);
-    }
-  }
-
-  public boolean anyDeletions() {
-    return pendingDeletes.any();
-  }
-
-  /** Flush all pending docs to a new segment */
-  // Lock order: IW -> DW
-  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
-
-    final long startTime = System.currentTimeMillis();
-
-    // We change writer's segmentInfos:
-    assert Thread.holdsLock(writer);
-
-    waitIdle();
-
-    if (numDocs == 0) {
-      // nothing to do!
-      if (infoStream != null) {
-        message("flush: no docs; skipping");
-      }
-      // Lock order: IW -> DW -> BD
-      pushDeletes(null, segmentInfos);
-      return null;
-    }
-
-    if (aborting) {
-      if (infoStream != null) {
-        message("flush: skip because aborting is set");
-      }
-      return null;
-    }
-
-    boolean success = false;
-
-    SegmentInfo newSegment;
-
-    try {
-      //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
-      assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
-      assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
-      assert waitQueue.waitingBytes == 0;
-
-      if (infoStream != null) {
-        message("flush postings as segment " + segment + " numDocs=" + numDocs);
-      }
-
-      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
-                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
-                                                                 pendingDeletes);
-      // Apply delete-by-docID now (delete-byDocID only
-      // happens when an exception is hit processing that
-      // doc, eg if analyzer has some problem w/ the text):
-      if (pendingDeletes.docIDs.size() > 0) {
-        flushState.deletedDocs = new BitVector(numDocs);
-        for(int delDocID : pendingDeletes.docIDs) {
-          flushState.deletedDocs.set(delDocID);
-        }
-        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
-        pendingDeletes.docIDs.clear();
-      }
-
-      newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
-
-      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
-      for (DocumentsWriterThreadState threadState : threadStates) {
-        threads.add(threadState.consumer);
-      }
-
-      double startMBUsed = bytesUsed()/1024./1024.;
-
-      consumer.flush(threads, flushState);
-
-      newSegment.setHasVectors(flushState.hasVectors);
-
-      if (infoStream != null) {
-        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
-        if (flushState.deletedDocs != null) {
-          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
-        }
-        message("flushedFiles=" + newSegment.files());
-      }
-
-      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
-        final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
-
-        if (infoStream != null) {
-          message("flush: create compound file \"" + cfsFileName + "\"");
-        }
-
-        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
-        for(String fileName : newSegment.files()) {
-          cfsWriter.addFile(fileName);
-        }
-        cfsWriter.close();
-        deleter.deleteNewFiles(newSegment.files());
-        newSegment.setUseCompoundFile(true);
-      }
-
-      // Must write deleted docs after the CFS so we don't
-      // slurp the del file into CFS:
-      if (flushState.deletedDocs != null) {
-        final int delCount = flushState.deletedDocs.count();
-        assert delCount > 0;
-        newSegment.setDelCount(delCount);
-        newSegment.advanceDelGen();
-        final String delFileName = newSegment.getDelFileName();
-        if (infoStream != null) {
-          message("flush: write " + delCount + " deletes to " + delFileName);
-        }
-        boolean success2 = false;
-        try {
-          // TODO: in the NRT case it'd be better to hand
-          // this del vector over to the
-          // shortly-to-be-opened SegmentReader and let it
-          // carry the changes; there's no reason to use
-          // filesystem as intermediary here.
-          flushState.deletedDocs.write(directory, delFileName);
-          success2 = true;
-        } finally {
-          if (!success2) {
-            try {
-              directory.deleteFile(delFileName);
-            } catch (Throwable t) {
-              // suppress this so we keep throwing the
-              // original exception
-            }
-          }
-        }
-      }
-
-      if (infoStream != null) {
-        message("flush: segment=" + newSegment);
-        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
-        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
-        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
-                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
-                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
-                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
-                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
-      }
-
-      success = true;
-    } finally {
-      notifyAll();
-      if (!success) {
-        if (segment != null) {
-          deleter.refresh(segment);
-        }
-        abort();
-      }
-    }
-
-    doAfterFlush();
-
-    // Lock order: IW -> DW -> BD
-    pushDeletes(newSegment, segmentInfos);
-    if (infoStream != null) {
-      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
-    }
-
-    return newSegment;
-  }
-
-  synchronized void close() {
-    closed = true;
-    notifyAll();
-  }
-
-  /** Returns a free (idle) ThreadState that may be used for
-   * indexing this one document.  This call also pauses if a
-   * flush is pending.  If delTerm is non-null then we
-   * buffer this deleted term after the thread state has
-   * been acquired. */
-  synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
-
-    final Thread currentThread = Thread.currentThread();
-    assert !Thread.holdsLock(writer);
-
-    // First, find a thread state.  If this thread already
-    // has affinity to a specific ThreadState, use that one
-    // again.
-    DocumentsWriterThreadState state = threadBindings.get(currentThread);
-    if (state == null) {
-
-      // First time this thread has called us since last
-      // flush.  Find the least loaded thread state:
-      DocumentsWriterThreadState minThreadState = null;
-      for(int i=0;i<threadStates.length;i++) {
-        DocumentsWriterThreadState ts = threadStates[i];
-        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
-          minThreadState = ts;
-        }
-      }
-      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
-        state = minThreadState;
-        state.numThreads++;
-      } else {
-        // Just create a new "private" thread state
-        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
-        if (threadStates.length > 0) {
-          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
-        }
-        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
-        threadStates = newArray;
-      }
-      threadBindings.put(currentThread, state);
-    }
-
-    // Next, wait until my thread state is idle (in case
-    // it's shared with other threads), and no flush/abort
-    // pending 
-    waitReady(state);
-
-    // Allocate segment name if this is the first doc since
-    // last flush:
-    if (segment == null) {
-      segment = writer.newSegmentName();
-      assert numDocs == 0;
-    }
-
-    state.docState.docID = nextDocID;
-    nextDocID += docCount;
-
-    if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, state.docState.docID);
-    }
-
-    numDocs += docCount;
-    state.isIdle = false;
-    return state;
-  }
-  
-  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
-    return updateDocument(doc, analyzer, null);
-  }
-  
-  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
-    throws CorruptIndexException, IOException {
-
-    // Possibly trigger a flush, or wait until any running flush completes:
-    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
-
-    // This call is synchronized but fast
-    final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
-
-    final DocState docState = state.docState;
-    docState.doc = doc;
-    docState.analyzer = analyzer;
-
-    boolean success = false;
-    try {
-      // This call is not synchronized and does all the
-      // work
-      final DocWriter perDoc;
-      try {
-        perDoc = state.consumer.processDocument();
-      } finally {
-        docState.clear();
-      }
-
-      // This call is synchronized but fast
-      finishDocument(state, perDoc);
-
-      success = true;
-    } finally {
-      if (!success) {
-
-        // If this thread state had decided to flush, we
-        // must clear it so another thread can flush
-        if (doFlush) {
-          flushControl.clearFlushPending();
-        }
-
-        if (infoStream != null) {
-          message("exception in updateDocument aborting=" + aborting);
-        }
-
-        synchronized(this) {
-
-          state.isIdle = true;
-          notifyAll();
-            
-          if (aborting) {
-            abort();
-          } else {
-            skipDocWriter.docID = docState.docID;
-            boolean success2 = false;
-            try {
-              waitQueue.add(skipDocWriter);
-              success2 = true;
-            } finally {
-              if (!success2) {
-                abort();
-                return false;
-              }
-            }
-
-            // Immediately mark this document as deleted
-            // since likely it was partially added.  This
-            // keeps indexing as "all or none" (atomic) when
-            // adding a document:
-            deleteDocID(state.docState.docID);
-          }
-        }
-      }
-    }
-
-    doFlush |= flushControl.flushByRAMUsage("new document");
-
-    return doFlush;
-  }
-
-  boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
-    throws CorruptIndexException, IOException {
-
-    // Possibly trigger a flush, or wait until any running flush completes:
-    boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
-
-    final int docCount = docs.size();
-
-    // This call is synchronized but fast -- we allocate the
-    // N docIDs up front:
-    final DocumentsWriterThreadState state = getThreadState(null, docCount);
-    final DocState docState = state.docState;
-
-    final int startDocID = docState.docID;
-    int docID = startDocID;
-
-    //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
-    for(Document doc : docs) {
-      docState.doc = doc;
-      docState.analyzer = analyzer;
-      // Assign next docID from our block:
-      docState.docID = docID++;
-      
-      boolean success = false;
-      try {
-        // This call is not synchronized and does all the
-        // work
-        final DocWriter perDoc;
-        try {
-          perDoc = state.consumer.processDocument();
-        } finally {
-          docState.clear();
-        }
-
-        // Must call this w/o holding synchronized(this) else
-        // we'll hit deadlock:
-        balanceRAM();
-
-        // Synchronized but fast
-        synchronized(this) {
-          if (aborting) {
-            break;
-          }
-          assert perDoc == null || perDoc.docID == docState.docID;
-          final boolean doPause;
-          if (perDoc != null) {
-            waitQueue.add(perDoc);
-          } else {
-            skipDocWriter.docID = docState.docID;
-            waitQueue.add(skipDocWriter);
-          }
-        }
-
-        success = true;
-      } finally {
-        if (!success) {
-          //System.out.println(Thread.currentThread().getName() + ": E");
-
-          // If this thread state had decided to flush, we
-          // must clear it so another thread can flush
-          if (doFlush) {
-            message("clearFlushPending!");
-            flushControl.clearFlushPending();
-          }
-
-          if (infoStream != null) {
-            message("exception in updateDocuments aborting=" + aborting);
-          }
-
-          synchronized(this) {
-
-            state.isIdle = true;
-            notifyAll();
-
-            if (aborting) {
-              abort();
-            } else {
-
-              // Fill hole in the doc stores for all
-              // docIDs we pre-allocated
-              //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
-              final int endDocID = startDocID + docCount;
-              docID = docState.docID;
-              while(docID < endDocID) {
-                skipDocWriter.docID = docID++;
-                boolean success2 = false;
-                try {
-                  waitQueue.add(skipDocWriter);
-                  success2 = true;
-                } finally {
-                  if (!success2) {
-                    abort();
-                    return false;
-                  }
-                }
-              }
-              //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");
-
-              // Mark all pre-allocated docIDs as deleted:
-              docID = startDocID;
-              while(docID < startDocID + docs.size()) {
-                deleteDocID(docID++);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    synchronized(this) {
-      // We must delay pausing until the full doc block is
-      // added, else we can hit deadlock if more than one
-      // thread is adding a block and we need to pause when
-      // both are only part way done:
-      if (waitQueue.doPause()) {
-        waitForWaitQueue();
-      }
-
-      //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);
-
-      if (aborting) {
-
-        // We are currently aborting, and another thread is
-        // waiting for me to become idle.  We just forcefully
-        // idle this threadState; it will be fully reset by
-        // abort()
-        state.isIdle = true;
-
-        // wakes up any threads waiting on the wait queue
-        notifyAll();
-
-        abort();
-
-        if (doFlush) {
-          message("clearFlushPending!");
-          flushControl.clearFlushPending();
-        }
-
-        return false;
-      }
-
-      // Apply delTerm only after all indexing has
-      // succeeded, but apply it only to docs prior to when
-      // this batch started:
-      if (delTerm != null) {
-        pendingDeletes.addTerm(delTerm, startDocID);
-      }
-
-      state.isIdle = true;
-
-      // wakes up any threads waiting on the wait queue
-      notifyAll();
-    }
-
-    doFlush |= flushControl.flushByRAMUsage("new document");
-
-    //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
-    return doFlush;
-  }
-
-  public synchronized void waitIdle() {
-    while (!allThreadsIdle()) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    }
-  }
-
-  synchronized void waitReady(DocumentsWriterThreadState state) {
-    while (!closed && (!state.isIdle || aborting)) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    }
-
-    if (closed) {
-      throw new AlreadyClosedException("this IndexWriter is closed");
-    }
-  }
-
-  /** Does the synchronized work to finish/flush the
-   *  inverted document. */
-  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
-
-    // Must call this w/o holding synchronized(this) else
-    // we'll hit deadlock:
-    balanceRAM();
-
-    synchronized(this) {
-
-      assert docWriter == null || docWriter.docID == perThread.docState.docID;
-
-      if (aborting) {
-
-        // We are currently aborting, and another thread is
-        // waiting for me to become idle.  We just forcefully
-        // idle this threadState; it will be fully reset by
-        // abort()
-        if (docWriter != null) {
-          try {
-            docWriter.abort();
-          } catch (Throwable t) {
-          }
-        }
-
-        perThread.isIdle = true;
-
-        // wakes up any threads waiting on the wait queue
-        notifyAll();
-
-        return;
-      }
-
-      final boolean doPause;
-
-      if (docWriter != null) {
-        doPause = waitQueue.add(docWriter);
-      } else {
-        skipDocWriter.docID = perThread.docState.docID;
-        doPause = waitQueue.add(skipDocWriter);
-      }
-
-      if (doPause) {
-        waitForWaitQueue();
-      }
-
-      perThread.isIdle = true;
-
-      // wakes up any threads waiting on the wait queue
-      notifyAll();
-    }
-  }
-
-  synchronized void waitForWaitQueue() {
-    do {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    } while (!waitQueue.doResume());
-  }
-
-  private static class SkipDocWriter extends DocWriter {
-    @Override
-    void finish() {
-    }
-    @Override
-    void abort() {
-    }
-    @Override
-    long sizeInBytes() {
-      return 0;
-    }
-  }
-  final SkipDocWriter skipDocWriter = new SkipDocWriter();
-
-  NumberFormat nf = NumberFormat.getInstance();
-
-  /* Initial chunks size of the shared byte[] blocks used to
-     store postings data */
-  final static int BYTE_BLOCK_SHIFT = 15;
-  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
-  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
-  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
-
-  private class ByteBlockAllocator extends ByteBlockPool.Allocator {
-    final int blockSize;
-
-    ByteBlockAllocator(int blockSize) {
-      this.blockSize = blockSize;
-    }
-
-    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
-    
-    /* Allocate another byte[] from the shared pool */
-    @Override
-    byte[] getByteBlock() {
-      synchronized(DocumentsWriter.this) {
-        final int size = freeByteBlocks.size();
-        final byte[] b;
-        if (0 == size) {
-          b = new byte[blockSize];
-          bytesUsed.addAndGet(blockSize);
-        } else
-          b = freeByteBlocks.remove(size-1);
-        return b;
-      }
-    }
-
-    /* Return byte[]'s to the pool */
-
-    @Override
-    void recycleByteBlocks(byte[][] blocks, int start, int end) {
-      synchronized(DocumentsWriter.this) {
-        for(int i=start;i<end;i++) {
-          freeByteBlocks.add(blocks[i]);
-          blocks[i] = null;
-        }
-      }
-    }
-
-    @Override
-    void recycleByteBlocks(List<byte[]> blocks) {
-      synchronized(DocumentsWriter.this) {
-        final int size = blocks.size();
-        for(int i=0;i<size;i++) {
-          freeByteBlocks.add(blocks.get(i));
-          blocks.set(i, null);
-        }
-      }
-    }
-  }
-
-  /* Initial chunks size of the shared int[] blocks used to
-     store postings data */
-  final static int INT_BLOCK_SHIFT = 13;
-  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
-  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
-  private List<int[]> freeIntBlocks = new ArrayList<int[]>();
-
-  /* Allocate another int[] from the shared pool */
-  synchronized int[] getIntBlock() {
-    final int size = freeIntBlocks.size();
-    final int[] b;
-    if (0 == size) {
-      b = new int[INT_BLOCK_SIZE];
-      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
-    } else {
-      b = freeIntBlocks.remove(size-1);
-    }
-    return b;
-  }
-
-  synchronized void bytesUsed(long numBytes) {
-    bytesUsed.addAndGet(numBytes);
-  }
-
-  long bytesUsed() {
-    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
-  }
-
-  /* Return int[]s to the pool */
-  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
-    for(int i=start;i<end;i++) {
-      freeIntBlocks.add(blocks[i]);
-      blocks[i] = null;
-    }
-  }
-
-  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
-
-  final static int PER_DOC_BLOCK_SIZE = 1024;
-
-  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
-
-
-  /* Initial chunk size of the shared char[] blocks used to
-     store term text */
-  final static int CHAR_BLOCK_SHIFT = 14;
-  final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
-  final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
-
-  final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
-
-  private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
-
-  /* Allocate another char[] from the shared pool */
-  synchronized char[] getCharBlock() {
-    final int size = freeCharBlocks.size();
-    final char[] c;
-    if (0 == size) {
-      bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
-      c = new char[CHAR_BLOCK_SIZE];
-    } else
-      c = freeCharBlocks.remove(size-1);
-    // We always track allocations of char blocks, for now,
-    // because nothing that skips allocation tracking
-    // (currently only term vectors) uses its own char
-    // blocks.
-    return c;
-  }
-
-  /* Return char[]s to the pool */
-  synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
-    for(int i=0;i<numBlocks;i++) {
-      freeCharBlocks.add(blocks[i]);
-      blocks[i] = null;
-    }
-  }
-
-  String toMB(long v) {
-    return nf.format(v/1024./1024.);
-  }
-
-  /* We have four pools of RAM: Postings, byte blocks
-   * (holds freq/prox posting data), char blocks (holds
-   * characters in the term) and per-doc buffers (stored fields/term vectors).  
-   * Different docs require varying amount of storage from 
-   * these four classes.
-   * 
-   * For example, docs with many unique single-occurrence
-   * short terms will use up the Postings RAM and hardly any
-   * of the other two.  Whereas docs with very large terms
-   * will use alot of char blocks RAM and relatively less of
-   * the other two.  This method just frees allocations from
-   * the pools once we are over-budget, which balances the
-   * pools to match the current docs. */
-  void balanceRAM() {
-
-    final boolean doBalance;
-    final long deletesRAMUsed;
-
-    deletesRAMUsed = bufferedDeletesStream.bytesUsed();
-
-    final long ramBufferSize;
-    final double mb = config.getRAMBufferSizeMB();
-    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
-    } else {
-      ramBufferSize = (long) (mb*1024*1024);
-    }
-
-    synchronized(this) {
-      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
-        return;
-      }
-    
-      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
-    }
-
-    if (doBalance) {
-
-      if (infoStream != null) {
-        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
-                " vs trigger=" + toMB(ramBufferSize) +
-                " deletesMB=" + toMB(deletesRAMUsed) +
-                " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
-                " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
-                " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
-      }
-
-      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-
-      int iter = 0;
-
-      // We free equally from each pool in 32 KB
-      // chunks until we are below our threshold
-      // (freeLevel)
-
-      boolean any = true;
-
-      final long freeLevel = (long) (0.95 * ramBufferSize);
-
-      while(bytesUsed()+deletesRAMUsed > freeLevel) {
-      
-        synchronized(this) {
-          if (0 == perDocAllocator.freeByteBlocks.size() 
-              && 0 == byteBlockAllocator.freeByteBlocks.size() 
-              && 0 == freeCharBlocks.size() 
-              && 0 == freeIntBlocks.size() 
-              && !any) {
-            // Nothing else to free -- must flush now.
-            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
-            if (infoStream != null) {
-              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
-                message("    nothing to free; set bufferIsFull");
-              } else {
-                message("    nothing to free");
-              }
-            }
-            break;
-          }
-
-          if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
-            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
-            bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
-          }
-
-          if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
-            freeCharBlocks.remove(freeCharBlocks.size()-1);
-            bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
-          }
-
-          if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
-            freeIntBlocks.remove(freeIntBlocks.size()-1);
-            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
-          }
-
-          if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
-            // Remove upwards of 32 blocks (each block is 1K)
-            for (int i = 0; i < 32; ++i) {
-              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
-              bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
-              if (perDocAllocator.freeByteBlocks.size() == 0) {
-                break;
-              }
-            }
-          }
-        }
-
-        if ((4 == iter % 5) && any) {
-          // Ask consumer to free any recycled state
-          any = consumer.freeRAM();
-        }
-
-        iter++;
-      }
-
-      if (infoStream != null) {
-        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
-      }
-    }
-  }
-
-  final WaitQueue waitQueue = new WaitQueue();
-
-  private class WaitQueue {
-    DocWriter[] waiting;
-    int nextWriteDocID;
-    int nextWriteLoc;
-    int numWaiting;
-    long waitingBytes;
-
-    public WaitQueue() {
-      waiting = new DocWriter[10];
-    }
-
-    synchronized void reset() {
-      // NOTE: nextWriteLoc doesn't need to be reset
-      assert numWaiting == 0;
-      assert waitingBytes == 0;
-      nextWriteDocID = 0;
-    }
-
-    synchronized boolean doResume() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueueResumeBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueueResumeBytes = 2*1024*1024;
-      } else {
-        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
-      }
-      return waitingBytes <= waitQueueResumeBytes;
-    }
-
-    synchronized boolean doPause() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueuePauseBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueuePauseBytes = 4*1024*1024;
-      } else {
-        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
-      }
-      return waitingBytes > waitQueuePauseBytes;
-    }
-
-    synchronized void abort() {
-      int count = 0;
-      for(int i=0;i<waiting.length;i++) {
-        final DocWriter doc = waiting[i];
-        if (doc != null) {
-          doc.abort();
-          waiting[i] = null;
-          count++;
-        }
-      }
-      waitingBytes = 0;
-      assert count == numWaiting;
-      numWaiting = 0;
-    }
-
-    private void writeDocument(DocWriter doc) throws IOException {
-      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
-      boolean success = false;
-      try {
-        doc.finish();
-        nextWriteDocID++;
-        nextWriteLoc++;
-        assert nextWriteLoc <= waiting.length;
-        if (nextWriteLoc == waiting.length) {
-          nextWriteLoc = 0;
-        }
-        success = true;
-      } finally {
-        if (!success) {
-          setAborting();
-        }
-      }
-    }
-
-    synchronized public boolean add(DocWriter doc) throws IOException {
-
-      assert doc.docID >= nextWriteDocID;
-
-      if (doc.docID == nextWriteDocID) {
-        writeDocument(doc);
-        while(true) {
-          doc = waiting[nextWriteLoc];
-          if (doc != null) {
-            numWaiting--;
-            waiting[nextWriteLoc] = null;
-            waitingBytes -= doc.sizeInBytes();
-            writeDocument(doc);
-          } else {
-            break;
-          }
-        }
-      } else {
-
-        // I finished before documents that were added
-        // before me.  This can easily happen when I am a
-        // small doc and the docs before me were large, or,
-        // just due to luck in the thread scheduling.  Just
-        // add myself to the queue and when that large doc
-        // finishes, it will flush me:
-        int gap = doc.docID - nextWriteDocID;
-        if (gap >= waiting.length) {
-          // Grow queue
-          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
-          assert nextWriteLoc >= 0;
-          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
-          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
-          nextWriteLoc = 0;
-          waiting = newArray;
-          gap = doc.docID - nextWriteDocID;
-        }
-
-        int loc = nextWriteLoc + gap;
-        if (loc >= waiting.length) {
-          loc -= waiting.length;
-        }
-
-        // We should only wrap one time
-        assert loc < waiting.length;
-
-        // Nobody should be in my spot!
-        assert waiting[loc] == null;
-        waiting[loc] = doc;
-        numWaiting++;
-        waitingBytes += doc.sizeInBytes();
-      }
-      
-      return doPause();
-    }
-  }
-}