1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.text.NumberFormat;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicLong;
30 import org.apache.lucene.analysis.Analyzer;
31 import org.apache.lucene.document.Document;
32 import org.apache.lucene.search.Query;
33 import org.apache.lucene.search.Similarity;
34 import org.apache.lucene.store.AlreadyClosedException;
35 import org.apache.lucene.store.Directory;
36 import org.apache.lucene.store.RAMFile;
37 import org.apache.lucene.util.ArrayUtil;
38 import org.apache.lucene.util.BitVector;
39 import org.apache.lucene.util.RamUsageEstimator;
40 import org.apache.lucene.util.ThreadInterruptedException;
44 * This class accepts multiple added documents and directly
45 * writes a single segment file. It does this more
46 * efficiently than creating a single segment per document
47 * (with DocumentWriter) and doing standard merges on those
50 * Each added document is passed to the {@link DocConsumer},
51 * which in turn processes the document and interacts with
52 * other consumers in the indexing chain. Certain
53 * consumers, like {@link StoredFieldsWriter} and {@link
54 * TermVectorsTermsWriter}, digest a document and
55 * immediately write bytes to the "doc store" files (i.e.,
56 * they do not consume RAM per document, except while they
57 * are processing the document).
59 * Other consumers, e.g. {@link FreqProxTermsWriter} and
60 * {@link NormsWriter}, buffer bytes in RAM and flush only
61 * when a new segment is produced.
63 * Once we have used our allowed RAM buffer, or the number
64 * of added docs is large enough (in the case we are
65 * flushing by doc count instead of RAM usage), we create a
66 * real segment and flush it to the Directory.
70 * Multiple threads are allowed into addDocument at once.
71 * There is an initial synchronized call to getThreadState
72 * which allocates a ThreadState for this thread. The same
73 * thread will get the same ThreadState over time (thread
74 * affinity) so that if there are consistent patterns (for
75 * example each thread is indexing a different content
76 * source) then we make better use of RAM. Then
77 * processDocument is called on that ThreadState without
78 * synchronization (most of the "heavy lifting" is in this
79 * call). Finally the synchronized "finishDocument" is
80 * called to flush changes to the directory.
82 * When flush is called by IndexWriter we forcefully idle
83 * all threads and flush only once they are all idle. This
84 * means you can call flush with a given thread even while
85 * other threads are actively adding/deleting documents.
90 * Because this class directly updates in-memory posting
91 * lists, and flushes stored fields and term vectors
92 * directly to files in the directory, there are certain
93 * limited times when an exception can corrupt this state.
94 * For example, a disk full while flushing stored fields
95 * leaves this file in a corrupt state. Or, an OOM
96 * exception while appending to the in-memory posting lists
97 * can corrupt that posting list. We call such exceptions
98 * "aborting exceptions". In these cases we must call
99 * abort() to discard all docs added since the last flush.
101 * All other exceptions ("non-aborting exceptions") can
102 * still partially update the index structures. These
103 * updates are consistent, but they represent only a part
104 * of the document seen up until the exception was hit.
105 * When this happens, we immediately mark the document as
106 * deleted so that the document is always atomically ("all
107 * or none") added to the index.
110 final class DocumentsWriter {
111 final AtomicLong bytesUsed = new AtomicLong(0);
115 String segment; // Current segment we are working on
117 private int nextDocID; // Next docID to be added
118 private int numDocs; // # of docs added, but not yet flushed
120 // Max # ThreadState instances; if there are more threads
121 // than this they share ThreadStates
122 private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
123 private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
125 boolean bufferIsFull; // True when it's time to write segment
126 private boolean aborting; // True if an abort is pending
128 PrintStream infoStream;
129 int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
130 Similarity similarity;
132 // max # simultaneous threads; if there are more than
133 // this, they wait for others to finish first
134 private final int maxThreadStates;
136 // Deletes for our still-in-RAM (to be flushed next) segment
137 private BufferedDeletes pendingDeletes = new BufferedDeletes();
139 static class DocState {
140 DocumentsWriter docWriter;
143 PrintStream infoStream;
144 Similarity similarity;
147 String maxTermPrefix;
149 // Only called by asserts
150 public boolean testPoint(String name) {
151 return docWriter.writer.testPoint(name);
154 public void clear() {
155 // don't hold onto doc nor analyzer, in case it is
162 /** Consumer returns this on each doc. This holds any
163 * state that must be flushed synchronized "in docID
164 * order". We gather these and flush them in order. */
165 abstract static class DocWriter {
168 abstract void finish() throws IOException;
169 abstract void abort();
170 abstract long sizeInBytes();
172 void setNext(DocWriter next) {
178 * Create and return a new PerDocBuffer.
180 PerDocBuffer newPerDocBuffer() {
181 return new PerDocBuffer();
185 * RAMFile buffer for DocWriters.
187 class PerDocBuffer extends RAMFile {
190 * Allocate bytes used from shared pool.
193 protected byte[] newBuffer(int size) {
194 assert size == PER_DOC_BLOCK_SIZE;
195 return perDocAllocator.getByteBlock();
199 * Recycle the bytes used.
201 synchronized void recycle() {
202 if (buffers.size() > 0) {
205 // Recycle the blocks
206 perDocAllocator.recycleByteBlocks(buffers);
210 assert numBuffers() == 0;
216 * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
217 * which returns the DocConsumer that the DocumentsWriter calls to process the
220 abstract static class IndexingChain {
221 abstract DocConsumer getChain(DocumentsWriter documentsWriter);
224 static final IndexingChain defaultIndexingChain = new IndexingChain() {
227 DocConsumer getChain(DocumentsWriter documentsWriter) {
229 This is the current indexing chain:
231 DocConsumer / DocConsumerPerThread
232 --> code: DocFieldProcessor / DocFieldProcessorPerThread
233 --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
234 --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
235 --> code: DocInverter / DocInverterPerThread / DocInverterPerField
236 --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
237 --> code: TermsHash / TermsHashPerThread / TermsHashPerField
238 --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
239 --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
240 --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
241 --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
242 --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
243 --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
246 // Build up indexing chain:
248 final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
249 final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
251 final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
252 new TermsHash(documentsWriter, false, termVectorsWriter, null));
253 final NormsWriter normsWriter = new NormsWriter();
254 final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
255 return new DocFieldProcessor(documentsWriter, docInverter);
259 final DocConsumer consumer;
261 // How much RAM we can use before flushing. This is 0 if
262 // we are flushing by doc count instead.
264 private final IndexWriterConfig config;
266 private boolean closed;
267 private final FieldInfos fieldInfos;
269 private final BufferedDeletesStream bufferedDeletesStream;
270 private final IndexWriter.FlushControl flushControl;
272 DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
273 this.directory = directory;
274 this.writer = writer;
275 this.similarity = config.getSimilarity();
276 this.maxThreadStates = config.getMaxThreadStates();
277 this.fieldInfos = fieldInfos;
278 this.bufferedDeletesStream = bufferedDeletesStream;
279 flushControl = writer.flushControl;
281 consumer = config.getIndexingChain().getChain(this);
282 this.config = config;
285 // Buffer a specific docID for deletion. Currently only
286 // used when we hit an exception when adding a document
287 synchronized void deleteDocID(int docIDUpto) {
288 pendingDeletes.addDocID(docIDUpto);
289 // NOTE: we do not trigger flush here. This is
290 // potentially a RAM leak, if you have an app that tries
291 // to add docs but every single doc always hits a
292 // non-aborting exception. Allowing a flush here gets
293 // very messy because we are only invoked when handling
294 // exceptions so to do this properly, while handling an
295 // exception we'd have to go off and flush new deletes
296 // which is risky (likely would hit some other
297 // confounding exception).
300 boolean deleteQueries(Query... queries) {
301 final boolean doFlush = flushControl.waitUpdate(0, queries.length);
303 for (Query query : queries) {
304 pendingDeletes.addQuery(query, numDocs);
310 boolean deleteQuery(Query query) {
311 final boolean doFlush = flushControl.waitUpdate(0, 1);
313 pendingDeletes.addQuery(query, numDocs);
318 boolean deleteTerms(Term... terms) {
319 final boolean doFlush = flushControl.waitUpdate(0, terms.length);
321 for (Term term : terms) {
322 pendingDeletes.addTerm(term, numDocs);
328 // TODO: we could check w/ FreqProxTermsWriter: if the
329 // term doesn't exist, don't bother buffering into the
330 // per-DWPT map (but still must go into the global map)
331 boolean deleteTerm(Term term, boolean skipWait) {
332 final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
334 pendingDeletes.addTerm(term, numDocs);
339 public FieldInfos getFieldInfos() {
343 /** If non-null, various details of indexing are printed
345 synchronized void setInfoStream(PrintStream infoStream) {
346 this.infoStream = infoStream;
347 for(int i=0;i<threadStates.length;i++) {
348 threadStates[i].docState.infoStream = infoStream;
352 synchronized void setMaxFieldLength(int maxFieldLength) {
353 this.maxFieldLength = maxFieldLength;
354 for(int i=0;i<threadStates.length;i++) {
355 threadStates[i].docState.maxFieldLength = maxFieldLength;
359 synchronized void setSimilarity(Similarity similarity) {
360 this.similarity = similarity;
361 for(int i=0;i<threadStates.length;i++) {
362 threadStates[i].docState.similarity = similarity;
366 /** Get current segment name we are writing. */
367 synchronized String getSegment() {
371 /** Returns how many docs are currently buffered in RAM. */
372 synchronized int getNumDocs() {
376 void message(String message) {
377 if (infoStream != null) {
378 writer.message("DW: " + message);
382 synchronized void setAborting() {
383 if (infoStream != null) {
384 message("setAborting");
389 /** Called if we hit an exception at a bad time (when
390 * updating the index files) and must discard all
391 * currently buffered docs. This resets our state,
392 * discarding any docs added since last flush. */
393 synchronized void abort() throws IOException {
394 if (infoStream != null) {
395 message("docWriter: abort");
398 boolean success = false;
402 // Forcefully remove waiting ThreadStates from line
405 } catch (Throwable t) {
408 // Wait for all other threads to finish with
413 if (infoStream != null) {
414 message("docWriter: abort waitIdle done");
417 assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
418 waitQueue.waitingBytes = 0;
420 pendingDeletes.clear();
422 for (DocumentsWriterThreadState threadState : threadStates) {
424 threadState.consumer.abort();
425 } catch (Throwable t) {
431 } catch (Throwable t) {
434 // Reset all postings data
442 if (infoStream != null) {
443 message("docWriter: done abort; success=" + success);
448 /** Reset after a flush */
449 private void doAfterFlush() throws IOException {
450 // All ThreadStates should be idle when we are called
451 assert allThreadsIdle();
452 threadBindings.clear();
457 bufferIsFull = false;
458 for(int i=0;i<threadStates.length;i++) {
459 threadStates[i].doAfterFlush();
463 private synchronized boolean allThreadsIdle() {
464 for(int i=0;i<threadStates.length;i++) {
465 if (!threadStates[i].isIdle) {
472 synchronized boolean anyChanges() {
473 return numDocs != 0 || pendingDeletes.any();
477 public BufferedDeletes getPendingDeletes() {
478 return pendingDeletes;
481 private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
482 // Lock order: DW -> BD
483 final long delGen = bufferedDeletesStream.getNextGen();
484 if (pendingDeletes.any()) {
485 if (segmentInfos.size() > 0 || newSegment != null) {
486 final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
487 if (infoStream != null) {
488 message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
490 bufferedDeletesStream.push(packet);
491 if (infoStream != null) {
492 message("flush: delGen=" + packet.gen);
494 if (newSegment != null) {
495 newSegment.setBufferedDeletesGen(packet.gen);
498 if (infoStream != null) {
499 message("flush: drop buffered deletes: no segments");
501 // We can safely discard these deletes: since
502 // there are no segments, the deletions cannot
505 pendingDeletes.clear();
506 } else if (newSegment != null) {
507 newSegment.setBufferedDeletesGen(delGen);
511 public boolean anyDeletions() {
512 return pendingDeletes.any();
515 /** Flush all pending docs to a new segment */
516 // Lock order: IW -> DW
517 synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
519 final long startTime = System.currentTimeMillis();
521 // We change writer's segmentInfos:
522 assert Thread.holdsLock(writer);
528 if (infoStream != null) {
529 message("flush: no docs; skipping");
531 // Lock order: IW -> DW -> BD
532 pushDeletes(null, segmentInfos);
537 if (infoStream != null) {
538 message("flush: skip because aborting is set");
543 boolean success = false;
545 SegmentInfo newSegment;
548 //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
549 assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
550 assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
551 assert waitQueue.waitingBytes == 0;
553 if (infoStream != null) {
554 message("flush postings as segment " + segment + " numDocs=" + numDocs);
557 final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
558 numDocs, writer.getConfig().getTermIndexInterval(),
560 // Apply delete-by-docID now (delete-byDocID only
561 // happens when an exception is hit processing that
562 // doc, eg if analyzer has some problem w/ the text):
563 if (pendingDeletes.docIDs.size() > 0) {
564 flushState.deletedDocs = new BitVector(numDocs);
565 for(int delDocID : pendingDeletes.docIDs) {
566 flushState.deletedDocs.set(delDocID);
568 pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
569 pendingDeletes.docIDs.clear();
572 newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
574 Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
575 for (DocumentsWriterThreadState threadState : threadStates) {
576 threads.add(threadState.consumer);
579 double startMBUsed = bytesUsed()/1024./1024.;
581 consumer.flush(threads, flushState);
583 newSegment.setHasVectors(flushState.hasVectors);
585 if (infoStream != null) {
586 message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
587 if (flushState.deletedDocs != null) {
588 message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
590 message("flushedFiles=" + newSegment.files());
593 if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
594 final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
596 if (infoStream != null) {
597 message("flush: create compound file \"" + cfsFileName + "\"");
600 CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
601 for(String fileName : newSegment.files()) {
602 cfsWriter.addFile(fileName);
605 deleter.deleteNewFiles(newSegment.files());
606 newSegment.setUseCompoundFile(true);
609 // Must write deleted docs after the CFS so we don't
610 // slurp the del file into CFS:
611 if (flushState.deletedDocs != null) {
612 final int delCount = flushState.deletedDocs.count();
614 newSegment.setDelCount(delCount);
615 newSegment.advanceDelGen();
616 final String delFileName = newSegment.getDelFileName();
617 if (infoStream != null) {
618 message("flush: write " + delCount + " deletes to " + delFileName);
620 boolean success2 = false;
622 // TODO: in the NRT case it'd be better to hand
623 // this del vector over to the
624 // shortly-to-be-opened SegmentReader and let it
625 // carry the changes; there's no reason to use
626 // filesystem as intermediary here.
627 flushState.deletedDocs.write(directory, delFileName);
632 directory.deleteFile(delFileName);
633 } catch (Throwable t) {
634 // suppress this so we keep throwing the
635 // original exception
641 if (infoStream != null) {
642 message("flush: segment=" + newSegment);
643 final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
644 final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
645 message(" ramUsed=" + nf.format(startMBUsed) + " MB" +
646 " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
647 " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
648 " docs/MB=" + nf.format(numDocs / newSegmentSize) +
649 " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
656 if (segment != null) {
657 deleter.refresh(segment);
665 // Lock order: IW -> DW -> BD
666 pushDeletes(newSegment, segmentInfos);
667 if (infoStream != null) {
668 message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
674 synchronized void close() {
679 /** Returns a free (idle) ThreadState that may be used for
680 * indexing this one document. This call also pauses if a
681 * flush is pending. If delTerm is non-null then we
682 * buffer this deleted term after the thread state has
684 synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
686 final Thread currentThread = Thread.currentThread();
687 assert !Thread.holdsLock(writer);
689 // First, find a thread state. If this thread already
690 // has affinity to a specific ThreadState, use that one
692 DocumentsWriterThreadState state = threadBindings.get(currentThread);
695 // First time this thread has called us since last
696 // flush. Find the least loaded thread state:
697 DocumentsWriterThreadState minThreadState = null;
698 for(int i=0;i<threadStates.length;i++) {
699 DocumentsWriterThreadState ts = threadStates[i];
700 if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
704 if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
705 state = minThreadState;
708 // Just create a new "private" thread state
709 DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
710 if (threadStates.length > 0) {
711 System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
713 state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
714 threadStates = newArray;
716 threadBindings.put(currentThread, state);
719 // Next, wait until my thread state is idle (in case
720 // it's shared with other threads), and no flush/abort
724 // Allocate segment name if this is the first doc since
726 if (segment == null) {
727 segment = writer.newSegmentName();
731 state.docState.docID = nextDocID;
732 nextDocID += docCount;
734 if (delTerm != null) {
735 pendingDeletes.addTerm(delTerm, state.docState.docID);
739 state.isIdle = false;
743 boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
744 return updateDocument(doc, analyzer, null);
747 boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
748 throws CorruptIndexException, IOException {
750 // Possibly trigger a flush, or wait until any running flush completes:
751 boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
753 // This call is synchronized but fast
754 final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
756 final DocState docState = state.docState;
758 docState.analyzer = analyzer;
760 boolean success = false;
762 // This call is not synchronized and does all the
764 final DocWriter perDoc;
766 perDoc = state.consumer.processDocument();
771 // This call is synchronized but fast
772 finishDocument(state, perDoc);
778 // If this thread state had decided to flush, we
779 // must clear it so another thread can flush
781 flushControl.clearFlushPending();
784 if (infoStream != null) {
785 message("exception in updateDocument aborting=" + aborting);
796 skipDocWriter.docID = docState.docID;
797 boolean success2 = false;
799 waitQueue.add(skipDocWriter);
808 // Immediately mark this document as deleted
809 // since likely it was partially added. This
810 // keeps indexing as "all or none" (atomic) when
811 // adding a document:
812 deleteDocID(state.docState.docID);
818 doFlush |= flushControl.flushByRAMUsage("new document");
823 boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
824 throws CorruptIndexException, IOException {
826 // Possibly trigger a flush, or wait until any running flush completes:
827 boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
829 final int docCount = docs.size();
831 // This call is synchronized but fast -- we allocate the
832 // N docIDs up front:
833 final DocumentsWriterThreadState state = getThreadState(null, docCount);
834 final DocState docState = state.docState;
836 final int startDocID = docState.docID;
837 int docID = startDocID;
839 //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
840 for(Document doc : docs) {
842 docState.analyzer = analyzer;
843 // Assign next docID from our block:
844 docState.docID = docID++;
846 boolean success = false;
848 // This call is not synchronized and does all the
850 final DocWriter perDoc;
852 perDoc = state.consumer.processDocument();
857 // Must call this w/o holding synchronized(this) else
858 // we'll hit deadlock:
861 // Synchronized but fast
866 assert perDoc == null || perDoc.docID == docState.docID;
867 final boolean doPause;
868 if (perDoc != null) {
869 waitQueue.add(perDoc);
871 skipDocWriter.docID = docState.docID;
872 waitQueue.add(skipDocWriter);
879 //System.out.println(Thread.currentThread().getName() + ": E");
881 // If this thread state had decided to flush, we
882 // must clear it so another thread can flush
884 message("clearFlushPending!");
885 flushControl.clearFlushPending();
888 if (infoStream != null) {
889 message("exception in updateDocuments aborting=" + aborting);
901 // Fill hole in the doc stores for all
902 // docIDs we pre-allocated
903 //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
904 final int endDocID = startDocID + docCount;
905 docID = docState.docID;
906 while(docID < endDocID) {
907 skipDocWriter.docID = docID++;
908 boolean success2 = false;
910 waitQueue.add(skipDocWriter);
919 //System.out.println(Thread.currentThread().getName() + ": F " + docCount + " done");
921 // Mark all pre-allocated docIDs as deleted:
923 while(docID < startDocID + docs.size()) {
924 deleteDocID(docID++);
933 // We must delay pausing until the full doc block is
934 // added, else we can hit deadlock if more than one
935 // thread is adding a block and we need to pause when
936 // both are only part way done:
937 if (waitQueue.doPause()) {
941 //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
945 // We are currently aborting, and another thread is
946 // waiting for me to become idle. We just forcefully
947 // idle this threadState; it will be fully reset by
951 // wakes up any threads waiting on the wait queue
957 message("clearFlushPending!");
958 flushControl.clearFlushPending();
964 // Apply delTerm only after all indexing has
965 // succeeded, but apply it only to docs prior to when
966 // this batch started:
967 if (delTerm != null) {
968 pendingDeletes.addTerm(delTerm, startDocID);
973 // wakes up any threads waiting on the wait queue
977 doFlush |= flushControl.flushByRAMUsage("new document");
979 //System.out.println(Thread.currentThread().getName() + ": B " + docCount);
983 public synchronized void waitIdle() {
984 while (!allThreadsIdle()) {
987 } catch (InterruptedException ie) {
988 throw new ThreadInterruptedException(ie);
993 synchronized void waitReady(DocumentsWriterThreadState state) {
994 while (!closed && (!state.isIdle || aborting)) {
997 } catch (InterruptedException ie) {
998 throw new ThreadInterruptedException(ie);
1003 throw new AlreadyClosedException("this IndexWriter is closed");
1007 /** Does the synchronized work to finish/flush the
1008 * inverted document. */
1009 private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
1011 // Must call this w/o holding synchronized(this) else
1012 // we'll hit deadlock:
1015 synchronized(this) {
1017 assert docWriter == null || docWriter.docID == perThread.docState.docID;
1021 // We are currently aborting, and another thread is
1022 // waiting for me to become idle. We just forcefully
1023 // idle this threadState; it will be fully reset by
1025 if (docWriter != null) {
1028 } catch (Throwable t) {
1032 perThread.isIdle = true;
1034 // wakes up any threads waiting on the wait queue
1040 final boolean doPause;
1042 if (docWriter != null) {
1043 doPause = waitQueue.add(docWriter);
1045 skipDocWriter.docID = perThread.docState.docID;
1046 doPause = waitQueue.add(skipDocWriter);
1053 perThread.isIdle = true;
1055 // wakes up any threads waiting on the wait queue
1060 synchronized void waitForWaitQueue() {
1064 } catch (InterruptedException ie) {
1065 throw new ThreadInterruptedException(ie);
1067 } while (!waitQueue.doResume());
1070 private static class SkipDocWriter extends DocWriter {
1078 long sizeInBytes() {
1082 final SkipDocWriter skipDocWriter = new SkipDocWriter();
1084 NumberFormat nf = NumberFormat.getInstance();
1086 /* Initial chunk size of the shared byte[] blocks used to
1087 store postings data */
1088 final static int BYTE_BLOCK_SHIFT = 15;
1089 final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
1090 final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
1091 final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
1093 private class ByteBlockAllocator extends ByteBlockPool.Allocator {
1094 final int blockSize;
1096 ByteBlockAllocator(int blockSize) {
1097 this.blockSize = blockSize;
1100 ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
1102 /* Allocate another byte[] from the shared pool */
1104 byte[] getByteBlock() {
1105 synchronized(DocumentsWriter.this) {
1106 final int size = freeByteBlocks.size();
1109 b = new byte[blockSize];
1110 bytesUsed.addAndGet(blockSize);
1112 b = freeByteBlocks.remove(size-1);
1117 /* Return byte[]'s to the pool */
1120 void recycleByteBlocks(byte[][] blocks, int start, int end) {
1121 synchronized(DocumentsWriter.this) {
1122 for(int i=start;i<end;i++) {
1123 freeByteBlocks.add(blocks[i]);
1130 void recycleByteBlocks(List<byte[]> blocks) {
1131 synchronized(DocumentsWriter.this) {
1132 final int size = blocks.size();
1133 for(int i=0;i<size;i++) {
1134 freeByteBlocks.add(blocks.get(i));
1135 blocks.set(i, null);
1141 /* Initial chunk size of the shared int[] blocks used to
1142 store postings data */
1143 final static int INT_BLOCK_SHIFT = 13;
1144 final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
1145 final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
1147 private List<int[]> freeIntBlocks = new ArrayList<int[]>();
1149 /* Allocate another int[] from the shared pool */
1150 synchronized int[] getIntBlock() {
1151 final int size = freeIntBlocks.size();
1154 b = new int[INT_BLOCK_SIZE];
1155 bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
1157 b = freeIntBlocks.remove(size-1);
1162 synchronized void bytesUsed(long numBytes) {
1163 bytesUsed.addAndGet(numBytes);
1167 return bytesUsed.get() + pendingDeletes.bytesUsed.get();
1170 /* Return int[]s to the pool */
1171 synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
1172 for(int i=start;i<end;i++) {
1173 freeIntBlocks.add(blocks[i]);
1178 ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
1180 final static int PER_DOC_BLOCK_SIZE = 1024;
1182 final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
1185 /* Initial chunk size of the shared char[] blocks used to
1187 final static int CHAR_BLOCK_SHIFT = 14;
1188 final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
1189 final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
1191 final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
1193 private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
1195 /* Allocate another char[] from the shared pool */
1196 synchronized char[] getCharBlock() {
1197 final int size = freeCharBlocks.size();
1200 bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1201 c = new char[CHAR_BLOCK_SIZE];
1203 c = freeCharBlocks.remove(size-1);
1204 // We always track allocations of char blocks, for now,
1205 // because nothing that skips allocation tracking
1206 // (currently only term vectors) uses its own char
1211 /* Return char[]s to the pool */
1212 synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
1213 for(int i=0;i<numBlocks;i++) {
1214 freeCharBlocks.add(blocks[i]);
1219 String toMB(long v) {
1220 return nf.format(v/1024./1024.);
1223 /* We have four pools of RAM: Postings, byte blocks
1224 * (holds freq/prox posting data), char blocks (holds
1225 * characters in the term) and per-doc buffers (stored fields/term vectors).
1226 * Different docs require varying amounts of storage from
1227 * these four classes.
1229 * For example, docs with many unique single-occurrence
1230 * short terms will use up the Postings RAM and hardly any
1231 * of the other pools, whereas docs with very large terms
1232 * will use a lot of char-block RAM and relatively little of
1233 * the other pools. This method just frees allocations from
1234 * the pools once we are over budget, which balances the
1235 * pools to match the current docs. */
// NOTE(review): this listing omits the method header and several structural
// lines: else keywords, closing braces, the early return, the guard that
// presumably wraps the freeing logic on doBalance, and the declarations and
// updates of `any` and `iter`. The comments below describe only the
// visible statements.
1238 final boolean doBalance;
1239 final long deletesRAMUsed;
// Snapshot the buffered-deletes RAM once. Every budget comparison below adds
// this same value to bytesUsed().
1241 deletesRAMUsed = bufferedDeletesStream.bytesUsed();
// RAM budget in bytes, or the DISABLE_AUTO_FLUSH sentinel when RAM-based
// flushing is disabled in the config.
1243 final long ramBufferSize;
1244 final double mb = config.getRAMBufferSizeMB();
1245 if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1246 ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
1248 ramBufferSize = (long) (mb*1024*1024);
// Decide under the lock whether to balance. Nothing is done when RAM-based
// flushing is disabled or bufferIsFull is already set. Otherwise balance
// once tracked RAM (pools + deletes) reaches the budget.
1251 synchronized(this) {
1252 if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
1256 doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
1261 if (infoStream != null) {
1262 message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
1263 " vs trigger=" + toMB(ramBufferSize) +
1264 " deletesMB=" + toMB(deletesRAMUsed) +
1265 " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
1266 " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
1267 " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
// Remember the starting usage so the amount freed can be reported at the end.
1270 final long startBytesUsed = bytesUsed() + deletesRAMUsed;
1274 // We free equally from each pool in 32 KB
1275 // chunks until we are below our threshold
// Free down to 95% of the budget, not merely back under it.
1280 final long freeLevel = (long) (0.95 * ramBufferSize);
1282 while(bytesUsed()+deletesRAMUsed > freeLevel) {
1284 synchronized(this) {
// When every free list is empty, nothing more can be released. (The last
// clause of this test is not shown in this listing; it presumably involves
// `any`.) bufferIsFull is then set if usage is still over the full budget,
// which signals that a flush is needed.
1285 if (0 == perDocAllocator.freeByteBlocks.size()
1286 && 0 == byteBlockAllocator.freeByteBlocks.size()
1287 && 0 == freeCharBlocks.size()
1288 && 0 == freeIntBlocks.size()
1290 // Nothing else to free -- must flush now.
1291 bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
1292 if (infoStream != null) {
1293 if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
1294 message(" nothing to free; set bufferIsFull");
1296 message(" nothing to free");
// Round-robin over five sources, one per iteration, chosen by iter % 5:
// 0 = byte blocks, 1 = char blocks, 2 = int blocks, 3 = per-doc blocks,
// 4 = the consumer's recycled state. Steps 0-2 drop one block each and
// step 3 drops up to 32 per-doc blocks. Each dropped block's size is
// uncharged from bytesUsed.
1302 if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
1303 byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
1304 bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
1307 if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
1308 freeCharBlocks.remove(freeCharBlocks.size()-1);
1309 bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
1312 if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
1313 freeIntBlocks.remove(freeIntBlocks.size()-1);
1314 bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
1317 if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
1318 // Remove upwards of 32 blocks (each block is 1K)
1319 for (int i = 0; i < 32; ++i) {
1320 perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
1321 bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
// Stop early once the per-doc free list is empty. (The exit statement is not
// shown in this listing; presumably break.)
1322 if (perDocAllocator.freeByteBlocks.size() == 0) {
// The consumer is only asked while `any` is still true. Its freeRAM() result
// is stored back into `any`, presumably meaning "freed something"; confirm
// against DocConsumer.freeRAM.
1329 if ((4 == iter % 5) && any) {
1330 // Ask consumer to free any recycled state
1331 any = consumer.freeRAM();
// NOTE(review): these MB values could be formatted with toMB(...) for
// consistency with the message logged before freeing.
1337 if (infoStream != null) {
1338 message(" after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
// Queue that holds finished DocWriters until they can be written in docID
// order.
1343 final WaitQueue waitQueue = new WaitQueue();
// Holds DocWriters whose documents finished before earlier-numbered docs,
// until those earlier docs have been written. `waiting` is used as a circular
// buffer. A doc's slot is nextWriteLoc plus its docID distance from
// nextWriteDocID, wrapped once at waiting.length.
1345 private class WaitQueue {
// Pending DocWriters, indexed circularly starting from nextWriteLoc.
1346 DocWriter[] waiting;
// Starts with room for 10 pending docs. add() allocates a larger array when
// a doc's gap from nextWriteDocID does not fit.
1352 public WaitQueue() {
1353 waiting = new DocWriter[10];
// Resets the queue. The visible lines assert that nothing is pending (no
// waiting docs, no waiting bytes) and note that nextWriteLoc is left as is.
// Any further reset statements are not shown in this listing.
1356 synchronized void reset() {
1357 // NOTE: nextWriteLoc doesn't need to be reset
1358 assert numWaiting == 0;
1359 assert waitingBytes == 0;
// Returns true once the bytes held by waiting DocWriters have dropped to the
// resume threshold or below. The threshold is 5% of the configured RAM
// buffer, or a fixed 2 MB when RAM-based flushing is disabled. doPause()
// uses a higher threshold (10% / 4 MB), presumably so that pausing and
// resuming do not alternate on every small change in waitingBytes.
1363 synchronized boolean doResume() {
1364 final double mb = config.getRAMBufferSizeMB();
1365 final long waitQueueResumeBytes;
1366 if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1367 waitQueueResumeBytes = 2*1024*1024;
1369 waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
1371 return waitingBytes <= waitQueueResumeBytes;
// Returns true when the bytes held by waiting DocWriters exceed the pause
// threshold. The threshold is 10% of the configured RAM buffer, or a fixed
// 4 MB when RAM-based flushing is disabled.
1374 synchronized boolean doPause() {
1375 final double mb = config.getRAMBufferSizeMB();
1376 final long waitQueuePauseBytes;
1377 if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
1378 waitQueuePauseBytes = 4*1024*1024;
1380 waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
1382 return waitingBytes > waitQueuePauseBytes;
// Walks every slot of `waiting` and then asserts that a count accumulated
// during the walk equals numWaiting.
// NOTE(review): the count's declaration and increments, and the per-doc
// handling, are not present in this listing. Presumably each pending doc is
// aborted and its slot cleared; confirm in the full file.
1385 synchronized void abort() {
1387 for(int i=0;i<waiting.length;i++) {
1388 final DocWriter doc = waiting[i];
1396 assert count == numWaiting;
// Writes one DocWriter, which must be either the next doc in docID order or
// the shared skipDocWriter. The visible lines also check that nextWriteLoc
// has not run past the end of `waiting`, and test whether it sits exactly at
// the end (presumably to wrap it back to slot 0).
// NOTE(review): the write call, the position/docID increments, the wrap
// assignment and the success/finally handling are not present in this
// listing.
1400 private void writeDocument(DocWriter doc) throws IOException {
1401 assert doc == skipDocWriter || nextWriteDocID == doc.docID;
1402 boolean success = false;
1407 assert nextWriteLoc <= waiting.length;
1408 if (nextWriteLoc == waiting.length) {
1419 synchronized public boolean add(DocWriter doc) throws IOException {
1421 assert doc.docID >= nextWriteDocID;
1423 if (doc.docID == nextWriteDocID) {
1426 doc = waiting[nextWriteLoc];
1429 waiting[nextWriteLoc] = null;
1430 waitingBytes -= doc.sizeInBytes();
1438 // I finished before documents that were added
1439 // before me. This can easily happen when I am a
1440 // small doc and the docs before me were large, or,
1441 // just due to luck in the thread scheduling. Just
1442 // add myself to the queue and when that large doc
1443 // finishes, it will flush me:
1444 int gap = doc.docID - nextWriteDocID;
1445 if (gap >= waiting.length) {
1447 DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
1448 assert nextWriteLoc >= 0;
1449 System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
1450 System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
1453 gap = doc.docID - nextWriteDocID;
1456 int loc = nextWriteLoc + gap;
1457 if (loc >= waiting.length) {
1458 loc -= waiting.length;
1461 // We should only wrap one time
1462 assert loc < waiting.length;
1464 // Nobody should be in my spot!
1465 assert waiting[loc] == null;
1468 waitingBytes += doc.sizeInBytes();