+++ /dev/null
-package org.apache.lucene.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Similarity;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMFile;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-
-/**
- * This class accepts multiple added documents and directly
- * writes a single segment file. It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
- *
- * Each added document is passed to the {@link DocConsumer},
- * which in turn processes the document and interacts with
- * other consumers in the indexing chain. Certain
- * consumers, like {@link StoredFieldsWriter} and {@link
- * TermVectorsTermsWriter}, digest a document and
- * immediately write bytes to the "doc store" files (ie,
- * they do not consume RAM per document, except while they
- * are processing the document).
- *
- * Other consumers, eg {@link FreqProxTermsWriter} and
- * {@link NormsWriter}, buffer bytes in RAM and flush only
- * when a new segment is produced.
- *
- * Once we have used our allowed RAM buffer, or the number
- * of added docs is large enough (in the case where we are
- * flushing by doc count instead of RAM usage), we create a
- * real segment and flush it to the Directory.
- *
- * Threads:
- *
- * Multiple threads are allowed into addDocument at once.
- * There is an initial synchronized call to getThreadState
- * which allocates a ThreadState for this thread. The same
- * thread will get the same ThreadState over time (thread
- * affinity) so that if there are consistent patterns (for
- * example each thread is indexing a different content
- * source) then we make better use of RAM. Then
- * processDocument is called on that ThreadState without
- * synchronization (most of the "heavy lifting" is in this
- * call). Finally the synchronized "finishDocument" is
- * called to flush changes to the directory.
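- *
- * As a rough, illustrative sketch (simplified from the
- * code below), one call to updateDocument follows this
- * shape:
- *
- * <pre>
- *   // synchronized, fast: bind/acquire this thread's state
- *   DocumentsWriterThreadState state = getThreadState(delTerm, 1);
- *   // unsynchronized: all the heavy lifting
- *   DocWriter perDoc = state.consumer.processDocument();
- *   // synchronized, fast: enqueue the result in docID order
- *   finishDocument(state, perDoc);
- * </pre>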
- *
- * When flush is called by IndexWriter we forcefully idle
- * all threads and flush only once they are all idle. This
- * means you can call flush with a given thread even while
- * other threads are actively adding/deleting documents.
- *
- *
- * Exceptions:
- *
- * Because this class directly updates in-memory posting
- * lists, and flushes stored fields and term vectors
- * directly to files in the directory, there are certain
- * limited times when an exception can corrupt this state.
- * For example, a disk-full error while flushing stored
- * fields leaves that file in a corrupt state. Or, an OOM
- * exception while appending to the in-memory posting lists
- * can corrupt that posting list. We call such exceptions
- * "aborting exceptions". In these cases we must call
- * abort() to discard all docs added since the last flush.
- *
- * All other exceptions ("non-aborting exceptions") can
- * still partially update the index structures. These
- * updates are consistent, but they represent only the
- * portion of the document indexed up until the exception
- * was hit.
- * When this happens, we immediately mark the document as
- * deleted so that the document is always atomically ("all
- * or none") added to the index.
- */
-
-final class DocumentsWriter {
- final AtomicLong bytesUsed = new AtomicLong(0);
- IndexWriter writer;
- Directory directory;
-
- String segment; // Current segment we are working on
-
- private int nextDocID; // Next docID to be added
- private int numDocs; // # of docs added, but not yet flushed
-
- // Max # ThreadState instances; if there are more threads
- // than this they share ThreadStates
- private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
- private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
-
- boolean bufferIsFull; // True when it's time to write segment
- private boolean aborting; // True if an abort is pending
-
- PrintStream infoStream;
- int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
- Similarity similarity;
-
-  // Max # ThreadState instances; if there are more threads
-  // than this, they share ThreadStates (waiting for a
-  // shared state to go idle before using it)
- private final int maxThreadStates;
-
- // Deletes for our still-in-RAM (to be flushed next) segment
- private BufferedDeletes pendingDeletes = new BufferedDeletes();
-
- static class DocState {
- DocumentsWriter docWriter;
- Analyzer analyzer;
- int maxFieldLength;
- PrintStream infoStream;
- Similarity similarity;
- int docID;
- Document doc;
- String maxTermPrefix;
-
- // Only called by asserts
- public boolean testPoint(String name) {
- return docWriter.writer.testPoint(name);
- }
-
- public void clear() {
-      // don't hold onto doc or analyzer, in case they are
-      // largish:
- doc = null;
- analyzer = null;
- }
- }
-
-  /** Consumer returns this on each doc. This holds any
-   * state that must be flushed in docID order, under
-   * synchronization. We gather these and flush them in order. */
- abstract static class DocWriter {
- DocWriter next;
- int docID;
- abstract void finish() throws IOException;
- abstract void abort();
- abstract long sizeInBytes();
-
- void setNext(DocWriter next) {
- this.next = next;
- }
- }
-
- /**
-   * Create and return a new PerDocBuffer.
- */
- PerDocBuffer newPerDocBuffer() {
- return new PerDocBuffer();
- }
-
- /**
- * RAMFile buffer for DocWriters.
- */
- class PerDocBuffer extends RAMFile {
-
- /**
- * Allocate bytes used from shared pool.
- */
- @Override
- protected byte[] newBuffer(int size) {
- assert size == PER_DOC_BLOCK_SIZE;
- return perDocAllocator.getByteBlock();
- }
-
- /**
- * Recycle the bytes used.
- */
- synchronized void recycle() {
- if (buffers.size() > 0) {
- setLength(0);
-
- // Recycle the blocks
- perDocAllocator.recycleByteBlocks(buffers);
- buffers.clear();
- sizeInBytes = 0;
-
- assert numBuffers() == 0;
- }
- }
- }
-
- /**
- * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
- * which returns the DocConsumer that the DocumentsWriter calls to process the
- * documents.
- */
- abstract static class IndexingChain {
- abstract DocConsumer getChain(DocumentsWriter documentsWriter);
- }
-
- static final IndexingChain defaultIndexingChain = new IndexingChain() {
-
- @Override
- DocConsumer getChain(DocumentsWriter documentsWriter) {
- /*
- This is the current indexing chain:
-
- DocConsumer / DocConsumerPerThread
- --> code: DocFieldProcessor / DocFieldProcessorPerThread
- --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
- --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
- --> code: DocInverter / DocInverterPerThread / DocInverterPerField
- --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
- --> code: TermsHash / TermsHashPerThread / TermsHashPerField
- --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
- --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
- --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
-      --> InvertedDocEndConsumer / InvertedDocEndConsumerPerThread / InvertedDocEndConsumerPerField
- --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
- --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
- */
-
- // Build up indexing chain:
-
- final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
- final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
-
- final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter,
- new TermsHash(documentsWriter, false, termVectorsWriter, null));
- final NormsWriter normsWriter = new NormsWriter();
- final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
- return new DocFieldProcessor(documentsWriter, docInverter);
- }
- };
-
- final DocConsumer consumer;
-
-  // The IndexWriterConfig supplies how much RAM we may use
-  // before flushing (or DISABLE_AUTO_FLUSH if we are
-  // flushing by doc count instead):
- private final IndexWriterConfig config;
-
- private boolean closed;
- private final FieldInfos fieldInfos;
-
- private final BufferedDeletesStream bufferedDeletesStream;
- private final IndexWriter.FlushControl flushControl;
-
- DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
- this.directory = directory;
- this.writer = writer;
- this.similarity = config.getSimilarity();
- this.maxThreadStates = config.getMaxThreadStates();
- this.fieldInfos = fieldInfos;
- this.bufferedDeletesStream = bufferedDeletesStream;
- flushControl = writer.flushControl;
-
- consumer = config.getIndexingChain().getChain(this);
- this.config = config;
- }
-
- // Buffer a specific docID for deletion. Currently only
-  // used when we hit an exception while adding a document
- synchronized void deleteDocID(int docIDUpto) {
- pendingDeletes.addDocID(docIDUpto);
-    // NOTE: we do not trigger flush here. This is
-    // potentially a RAM leak if an app keeps adding docs
-    // and every single doc hits a non-aborting exception.
-    // Allowing a flush here would get very messy: we are
-    // only invoked while handling an exception, and going
-    // off to flush new deletes at that point is risky (we
-    // would likely hit some other confounding exception).
- }
-
- boolean deleteQueries(Query... queries) {
- final boolean doFlush = flushControl.waitUpdate(0, queries.length);
- synchronized(this) {
- for (Query query : queries) {
- pendingDeletes.addQuery(query, numDocs);
- }
- }
- return doFlush;
- }
-
- boolean deleteQuery(Query query) {
- final boolean doFlush = flushControl.waitUpdate(0, 1);
- synchronized(this) {
- pendingDeletes.addQuery(query, numDocs);
- }
- return doFlush;
- }
-
- boolean deleteTerms(Term... terms) {
- final boolean doFlush = flushControl.waitUpdate(0, terms.length);
- synchronized(this) {
- for (Term term : terms) {
- pendingDeletes.addTerm(term, numDocs);
- }
- }
- return doFlush;
- }
-
- // TODO: we could check w/ FreqProxTermsWriter: if the
- // term doesn't exist, don't bother buffering into the
- // per-DWPT map (but still must go into the global map)
- boolean deleteTerm(Term term, boolean skipWait) {
- final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
- synchronized(this) {
- pendingDeletes.addTerm(term, numDocs);
- }
- return doFlush;
- }
-
- public FieldInfos getFieldInfos() {
- return fieldInfos;
- }
-
- /** If non-null, various details of indexing are printed
- * here. */
- synchronized void setInfoStream(PrintStream infoStream) {
- this.infoStream = infoStream;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].docState.infoStream = infoStream;
- }
- }
-
- synchronized void setMaxFieldLength(int maxFieldLength) {
- this.maxFieldLength = maxFieldLength;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].docState.maxFieldLength = maxFieldLength;
- }
- }
-
- synchronized void setSimilarity(Similarity similarity) {
- this.similarity = similarity;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].docState.similarity = similarity;
- }
- }
-
- /** Get current segment name we are writing. */
- synchronized String getSegment() {
- return segment;
- }
-
- /** Returns how many docs are currently buffered in RAM. */
- synchronized int getNumDocs() {
- return numDocs;
- }
-
- void message(String message) {
- if (infoStream != null) {
- writer.message("DW: " + message);
- }
- }
-
- synchronized void setAborting() {
- if (infoStream != null) {
- message("setAborting");
- }
- aborting = true;
- }
-
- /** Called if we hit an exception at a bad time (when
- * updating the index files) and must discard all
- * currently buffered docs. This resets our state,
- * discarding any docs added since last flush. */
- synchronized void abort() throws IOException {
- if (infoStream != null) {
- message("docWriter: abort");
- }
-
- boolean success = false;
-
- try {
-
-      // Forcefully abort any DocWriters still waiting in the wait queue
- try {
- waitQueue.abort();
- } catch (Throwable t) {
- }
-
- // Wait for all other threads to finish with
- // DocumentsWriter:
- try {
- waitIdle();
- } finally {
- if (infoStream != null) {
- message("docWriter: abort waitIdle done");
- }
-
- assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
- waitQueue.waitingBytes = 0;
-
- pendingDeletes.clear();
-
- for (DocumentsWriterThreadState threadState : threadStates) {
- try {
- threadState.consumer.abort();
- } catch (Throwable t) {
- }
- }
-
- try {
- consumer.abort();
- } catch (Throwable t) {
- }
-
- // Reset all postings data
- doAfterFlush();
- }
-
- success = true;
- } finally {
- aborting = false;
- notifyAll();
- if (infoStream != null) {
- message("docWriter: done abort; success=" + success);
- }
- }
- }
-
- /** Reset after a flush */
- private void doAfterFlush() throws IOException {
- // All ThreadStates should be idle when we are called
- assert allThreadsIdle();
- threadBindings.clear();
- waitQueue.reset();
- segment = null;
- numDocs = 0;
- nextDocID = 0;
- bufferIsFull = false;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].doAfterFlush();
- }
- }
-
- private synchronized boolean allThreadsIdle() {
- for(int i=0;i<threadStates.length;i++) {
- if (!threadStates[i].isIdle) {
- return false;
- }
- }
- return true;
- }
-
- synchronized boolean anyChanges() {
- return numDocs != 0 || pendingDeletes.any();
- }
-
- // for testing
- public BufferedDeletes getPendingDeletes() {
- return pendingDeletes;
- }
-
- private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
- // Lock order: DW -> BD
- final long delGen = bufferedDeletesStream.getNextGen();
- if (pendingDeletes.any()) {
- if (segmentInfos.size() > 0 || newSegment != null) {
- final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
- if (infoStream != null) {
- message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
- }
- bufferedDeletesStream.push(packet);
- if (infoStream != null) {
- message("flush: delGen=" + packet.gen);
- }
- if (newSegment != null) {
- newSegment.setBufferedDeletesGen(packet.gen);
- }
- } else {
- if (infoStream != null) {
- message("flush: drop buffered deletes: no segments");
- }
- // We can safely discard these deletes: since
- // there are no segments, the deletions cannot
- // affect anything.
- }
- pendingDeletes.clear();
- } else if (newSegment != null) {
- newSegment.setBufferedDeletesGen(delGen);
- }
- }
-
- public boolean anyDeletions() {
- return pendingDeletes.any();
- }
-
- /** Flush all pending docs to a new segment */
- // Lock order: IW -> DW
- synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
-
- final long startTime = System.currentTimeMillis();
-
- // We change writer's segmentInfos:
- assert Thread.holdsLock(writer);
-
- waitIdle();
-
- if (numDocs == 0) {
- // nothing to do!
- if (infoStream != null) {
- message("flush: no docs; skipping");
- }
- // Lock order: IW -> DW -> BD
- pushDeletes(null, segmentInfos);
- return null;
- }
-
- if (aborting) {
- if (infoStream != null) {
- message("flush: skip because aborting is set");
- }
- return null;
- }
-
- boolean success = false;
-
- SegmentInfo newSegment;
-
- try {
- //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
- assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
- assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
- assert waitQueue.waitingBytes == 0;
-
- if (infoStream != null) {
- message("flush postings as segment " + segment + " numDocs=" + numDocs);
- }
-
- final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
- numDocs, writer.getConfig().getTermIndexInterval(),
- pendingDeletes);
-      // Apply delete-by-docID now (delete-by-docID only
-      // happens when an exception is hit processing that
-      // doc, eg if the analyzer has some problem w/ the text):
- if (pendingDeletes.docIDs.size() > 0) {
- flushState.deletedDocs = new BitVector(numDocs);
- for(int delDocID : pendingDeletes.docIDs) {
- flushState.deletedDocs.set(delDocID);
- }
- pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
- pendingDeletes.docIDs.clear();
- }
-
- newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);
-
- Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
- for (DocumentsWriterThreadState threadState : threadStates) {
- threads.add(threadState.consumer);
- }
-
- double startMBUsed = bytesUsed()/1024./1024.;
-
- consumer.flush(threads, flushState);
-
- newSegment.setHasVectors(flushState.hasVectors);
-
- if (infoStream != null) {
- message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
- if (flushState.deletedDocs != null) {
- message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
- }
- message("flushedFiles=" + newSegment.files());
- }
-
- if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
- final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);
-
- if (infoStream != null) {
- message("flush: create compound file \"" + cfsFileName + "\"");
- }
-
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
- for(String fileName : newSegment.files()) {
- cfsWriter.addFile(fileName);
- }
- cfsWriter.close();
- deleter.deleteNewFiles(newSegment.files());
- newSegment.setUseCompoundFile(true);
- }
-
- // Must write deleted docs after the CFS so we don't
- // slurp the del file into CFS:
- if (flushState.deletedDocs != null) {
- final int delCount = flushState.deletedDocs.count();
- assert delCount > 0;
- newSegment.setDelCount(delCount);
- newSegment.advanceDelGen();
- final String delFileName = newSegment.getDelFileName();
- if (infoStream != null) {
- message("flush: write " + delCount + " deletes to " + delFileName);
- }
- boolean success2 = false;
- try {
- // TODO: in the NRT case it'd be better to hand
- // this del vector over to the
- // shortly-to-be-opened SegmentReader and let it
- // carry the changes; there's no reason to use
- // filesystem as intermediary here.
- flushState.deletedDocs.write(directory, delFileName);
- success2 = true;
- } finally {
- if (!success2) {
- try {
- directory.deleteFile(delFileName);
- } catch (Throwable t) {
- // suppress this so we keep throwing the
- // original exception
- }
- }
- }
- }
-
- if (infoStream != null) {
- message("flush: segment=" + newSegment);
- final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
- final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
- message(" ramUsed=" + nf.format(startMBUsed) + " MB" +
- " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
- " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
- " docs/MB=" + nf.format(numDocs / newSegmentSize) +
- " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
- }
-
- success = true;
- } finally {
- notifyAll();
- if (!success) {
- if (segment != null) {
- deleter.refresh(segment);
- }
- abort();
- }
- }
-
- doAfterFlush();
-
- // Lock order: IW -> DW -> BD
- pushDeletes(newSegment, segmentInfos);
- if (infoStream != null) {
- message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
- }
-
- return newSegment;
- }
-
- synchronized void close() {
- closed = true;
- notifyAll();
- }
-
-  /** Returns a free (idle) ThreadState that may be used for
-   * indexing the next docCount documents. This call also
-   * pauses if a flush is pending. If delTerm is non-null
-   * then we buffer this deleted term after the thread state
-   * has been acquired. */
- synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
-
- final Thread currentThread = Thread.currentThread();
- assert !Thread.holdsLock(writer);
-
- // First, find a thread state. If this thread already
- // has affinity to a specific ThreadState, use that one
- // again.
- DocumentsWriterThreadState state = threadBindings.get(currentThread);
- if (state == null) {
-
- // First time this thread has called us since last
- // flush. Find the least loaded thread state:
- DocumentsWriterThreadState minThreadState = null;
- for(int i=0;i<threadStates.length;i++) {
- DocumentsWriterThreadState ts = threadStates[i];
- if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
- minThreadState = ts;
- }
- }
- if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
- state = minThreadState;
- state.numThreads++;
- } else {
- // Just create a new "private" thread state
- DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
- if (threadStates.length > 0) {
- System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
- }
- state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
- threadStates = newArray;
- }
- threadBindings.put(currentThread, state);
- }
-
- // Next, wait until my thread state is idle (in case
- // it's shared with other threads), and no flush/abort
- // pending
- waitReady(state);
-
- // Allocate segment name if this is the first doc since
- // last flush:
- if (segment == null) {
- segment = writer.newSegmentName();
- assert numDocs == 0;
- }
-
- state.docState.docID = nextDocID;
- nextDocID += docCount;
-
- if (delTerm != null) {
- pendingDeletes.addTerm(delTerm, state.docState.docID);
- }
-
- numDocs += docCount;
- state.isIdle = false;
- return state;
- }
-
- boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
- return updateDocument(doc, analyzer, null);
- }
-
- boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
- throws CorruptIndexException, IOException {
-
- // Possibly trigger a flush, or wait until any running flush completes:
- boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
-
- // This call is synchronized but fast
- final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
-
- final DocState docState = state.docState;
- docState.doc = doc;
- docState.analyzer = analyzer;
-
- boolean success = false;
- try {
- // This call is not synchronized and does all the
- // work
- final DocWriter perDoc;
- try {
- perDoc = state.consumer.processDocument();
- } finally {
- docState.clear();
- }
-
- // This call is synchronized but fast
- finishDocument(state, perDoc);
-
- success = true;
- } finally {
- if (!success) {
-
- // If this thread state had decided to flush, we
- // must clear it so another thread can flush
- if (doFlush) {
- flushControl.clearFlushPending();
- }
-
- if (infoStream != null) {
- message("exception in updateDocument aborting=" + aborting);
- }
-
- synchronized(this) {
-
- state.isIdle = true;
- notifyAll();
-
- if (aborting) {
- abort();
- } else {
- skipDocWriter.docID = docState.docID;
- boolean success2 = false;
- try {
- waitQueue.add(skipDocWriter);
- success2 = true;
- } finally {
- if (!success2) {
- abort();
- return false;
- }
- }
-
- // Immediately mark this document as deleted
- // since likely it was partially added. This
- // keeps indexing as "all or none" (atomic) when
- // adding a document:
- deleteDocID(state.docState.docID);
- }
- }
- }
- }
-
- doFlush |= flushControl.flushByRAMUsage("new document");
-
- return doFlush;
- }
-
- boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
- throws CorruptIndexException, IOException {
-
- // Possibly trigger a flush, or wait until any running flush completes:
- boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
-
- final int docCount = docs.size();
-
- // This call is synchronized but fast -- we allocate the
- // N docIDs up front:
- final DocumentsWriterThreadState state = getThreadState(null, docCount);
- final DocState docState = state.docState;
-
- final int startDocID = docState.docID;
- int docID = startDocID;
-
- //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
- for(Document doc : docs) {
- docState.doc = doc;
- docState.analyzer = analyzer;
- // Assign next docID from our block:
- docState.docID = docID++;
-
- boolean success = false;
- try {
- // This call is not synchronized and does all the
- // work
- final DocWriter perDoc;
- try {
- perDoc = state.consumer.processDocument();
- } finally {
- docState.clear();
- }
-
- // Must call this w/o holding synchronized(this) else
- // we'll hit deadlock:
- balanceRAM();
-
- // Synchronized but fast
- synchronized(this) {
- if (aborting) {
- break;
- }
- assert perDoc == null || perDoc.docID == docState.docID;
-          // NOTE: we intentionally ignore waitQueue.add's
-          // doPause return value here; pausing is deferred
-          // until the whole doc block has been added (see below):
- if (perDoc != null) {
- waitQueue.add(perDoc);
- } else {
- skipDocWriter.docID = docState.docID;
- waitQueue.add(skipDocWriter);
- }
- }
-
- success = true;
- } finally {
- if (!success) {
- //System.out.println(Thread.currentThread().getName() + ": E");
-
- // If this thread state had decided to flush, we
- // must clear it so another thread can flush
- if (doFlush) {
- message("clearFlushPending!");
- flushControl.clearFlushPending();
- }
-
- if (infoStream != null) {
- message("exception in updateDocuments aborting=" + aborting);
- }
-
- synchronized(this) {
-
- state.isIdle = true;
- notifyAll();
-
- if (aborting) {
- abort();
- } else {
-
- // Fill hole in the doc stores for all
- // docIDs we pre-allocated
- //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
- final int endDocID = startDocID + docCount;
- docID = docState.docID;
- while(docID < endDocID) {
- skipDocWriter.docID = docID++;
- boolean success2 = false;
- try {
- waitQueue.add(skipDocWriter);
- success2 = true;
- } finally {
- if (!success2) {
- abort();
- return false;
- }
- }
- }
- //System.out.println(Thread.currentThread().getName() + ": F " + docCount + " done");
-
- // Mark all pre-allocated docIDs as deleted:
- docID = startDocID;
- while(docID < startDocID + docs.size()) {
- deleteDocID(docID++);
- }
- }
- }
- }
- }
- }
-
- synchronized(this) {
- // We must delay pausing until the full doc block is
- // added, else we can hit deadlock if more than one
- // thread is adding a block and we need to pause when
- // both are only part way done:
- if (waitQueue.doPause()) {
- waitForWaitQueue();
- }
-
- //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
-
- if (aborting) {
-
- // We are currently aborting, and another thread is
- // waiting for me to become idle. We just forcefully
- // idle this threadState; it will be fully reset by
- // abort()
- state.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
-
- abort();
-
- if (doFlush) {
- message("clearFlushPending!");
- flushControl.clearFlushPending();
- }
-
- return false;
- }
-
- // Apply delTerm only after all indexing has
- // succeeded, but apply it only to docs prior to when
- // this batch started:
- if (delTerm != null) {
- pendingDeletes.addTerm(delTerm, startDocID);
- }
-
- state.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
- }
-
- doFlush |= flushControl.flushByRAMUsage("new document");
-
- //System.out.println(Thread.currentThread().getName() + ": B " + docCount);
- return doFlush;
- }
-
- public synchronized void waitIdle() {
- while (!allThreadsIdle()) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
-
- synchronized void waitReady(DocumentsWriterThreadState state) {
- while (!closed && (!state.isIdle || aborting)) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- if (closed) {
- throw new AlreadyClosedException("this IndexWriter is closed");
- }
- }
-
- /** Does the synchronized work to finish/flush the
- * inverted document. */
- private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
-
- // Must call this w/o holding synchronized(this) else
- // we'll hit deadlock:
- balanceRAM();
-
- synchronized(this) {
-
- assert docWriter == null || docWriter.docID == perThread.docState.docID;
-
- if (aborting) {
-
- // We are currently aborting, and another thread is
- // waiting for me to become idle. We just forcefully
- // idle this threadState; it will be fully reset by
- // abort()
- if (docWriter != null) {
- try {
- docWriter.abort();
- } catch (Throwable t) {
- }
- }
-
- perThread.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
-
- return;
- }
-
- final boolean doPause;
-
- if (docWriter != null) {
- doPause = waitQueue.add(docWriter);
- } else {
- skipDocWriter.docID = perThread.docState.docID;
- doPause = waitQueue.add(skipDocWriter);
- }
-
- if (doPause) {
- waitForWaitQueue();
- }
-
- perThread.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
- }
- }
-
- synchronized void waitForWaitQueue() {
- do {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- } while (!waitQueue.doResume());
- }
-
- private static class SkipDocWriter extends DocWriter {
- @Override
- void finish() {
- }
- @Override
- void abort() {
- }
- @Override
- long sizeInBytes() {
- return 0;
- }
- }
- final SkipDocWriter skipDocWriter = new SkipDocWriter();
-
- NumberFormat nf = NumberFormat.getInstance();
-
-  /* Initial chunk size of the shared byte[] blocks used to
- store postings data */
- final static int BYTE_BLOCK_SHIFT = 15;
- final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
- final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
- final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
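-
-  /* Illustrative sketch only (no such helper exists in this
-   * class): the shift/mask constants above let a flat byte
-   * address in the shared pool be split into a block index
-   * and an offset inside that block, e.g.:
-   *
-   *   int blockIndex    = byteAddress >> BYTE_BLOCK_SHIFT;  // which byte[] in the pool
-   *   int offsetInBlock = byteAddress & BYTE_BLOCK_MASK;    // position inside that byte[]
-   */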
-
- private class ByteBlockAllocator extends ByteBlockPool.Allocator {
- final int blockSize;
-
- ByteBlockAllocator(int blockSize) {
- this.blockSize = blockSize;
- }
-
- ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
-
- /* Allocate another byte[] from the shared pool */
- @Override
- byte[] getByteBlock() {
- synchronized(DocumentsWriter.this) {
- final int size = freeByteBlocks.size();
- final byte[] b;
- if (0 == size) {
- b = new byte[blockSize];
- bytesUsed.addAndGet(blockSize);
- } else
- b = freeByteBlocks.remove(size-1);
- return b;
- }
- }
-
- /* Return byte[]'s to the pool */
-
- @Override
- void recycleByteBlocks(byte[][] blocks, int start, int end) {
- synchronized(DocumentsWriter.this) {
- for(int i=start;i<end;i++) {
- freeByteBlocks.add(blocks[i]);
- blocks[i] = null;
- }
- }
- }
-
- @Override
- void recycleByteBlocks(List<byte[]> blocks) {
- synchronized(DocumentsWriter.this) {
- final int size = blocks.size();
- for(int i=0;i<size;i++) {
- freeByteBlocks.add(blocks.get(i));
- blocks.set(i, null);
- }
- }
- }
- }
-
-  /* Initial chunk size of the shared int[] blocks used to
- store postings data */
- final static int INT_BLOCK_SHIFT = 13;
- final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
- final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
- private List<int[]> freeIntBlocks = new ArrayList<int[]>();
-
- /* Allocate another int[] from the shared pool */
- synchronized int[] getIntBlock() {
- final int size = freeIntBlocks.size();
- final int[] b;
- if (0 == size) {
- b = new int[INT_BLOCK_SIZE];
- bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
- } else {
- b = freeIntBlocks.remove(size-1);
- }
- return b;
- }
-
- synchronized void bytesUsed(long numBytes) {
- bytesUsed.addAndGet(numBytes);
- }
-
- long bytesUsed() {
- return bytesUsed.get() + pendingDeletes.bytesUsed.get();
- }
-
- /* Return int[]s to the pool */
- synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
- for(int i=start;i<end;i++) {
- freeIntBlocks.add(blocks[i]);
- blocks[i] = null;
- }
- }
-
- ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
-
- final static int PER_DOC_BLOCK_SIZE = 1024;
-
- final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
-
-
- /* Initial chunk size of the shared char[] blocks used to
- store term text */
- final static int CHAR_BLOCK_SHIFT = 14;
- final static int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
- final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
-
- final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
-
- private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();
-
- /* Allocate another char[] from the shared pool */
- synchronized char[] getCharBlock() {
- final int size = freeCharBlocks.size();
- final char[] c;
- if (0 == size) {
- bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
- c = new char[CHAR_BLOCK_SIZE];
- } else
- c = freeCharBlocks.remove(size-1);
- // We always track allocations of char blocks, for now,
- // because nothing that skips allocation tracking
- // (currently only term vectors) uses its own char
- // blocks.
- return c;
- }
-
- /* Return char[]s to the pool */
- synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
- for(int i=0;i<numBlocks;i++) {
- freeCharBlocks.add(blocks[i]);
- blocks[i] = null;
- }
- }
-
- String toMB(long v) {
- return nf.format(v/1024./1024.);
- }
-
-  /* We have four pools of RAM: Postings, byte blocks
-   * (holding freq/prox posting data), char blocks (holding
-   * characters in the term) and per-doc buffers (stored fields/term vectors).
-   * Different docs require varying amounts of storage from
-   * these four pools.
-   *
-   * For example, docs with many unique single-occurrence
-   * short terms will use up the Postings RAM and hardly any
-   * of the others, whereas docs with very large terms will
-   * use a lot of char block RAM and relatively little of
-   * the others. This method just frees allocations from
-   * the pools once we are over-budget, which balances the
-   * pools to match the current docs. */
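-
-  /* Worked example (illustrative numbers, not from the
-   * code): with ramBufferSizeMB=16 the flush trigger is
-   * 16 MB and freeLevel below is 0.95 * 16 MB ~= 15.2 MB.
-   * Each pass of the loop frees roughly 32 KB from one pool
-   * (one 32 KB byte block, one 16K-char block, one 8K-int
-   * block, or up to 32 per-doc 1 KB blocks), cycling
-   * through the pools by iter % 5, until usage drops below
-   * freeLevel or there is nothing left to free. */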
- void balanceRAM() {
-
- final boolean doBalance;
- final long deletesRAMUsed;
-
- deletesRAMUsed = bufferedDeletesStream.bytesUsed();
-
- final long ramBufferSize;
- final double mb = config.getRAMBufferSizeMB();
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
- } else {
- ramBufferSize = (long) (mb*1024*1024);
- }
-
- synchronized(this) {
- if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
- return;
- }
-
- doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
- }
-
- if (doBalance) {
-
- if (infoStream != null) {
- message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
- " vs trigger=" + toMB(ramBufferSize) +
- " deletesMB=" + toMB(deletesRAMUsed) +
- " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
- " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
- " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
- }
-
- final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-
- int iter = 0;
-
- // We free equally from each pool in 32 KB
- // chunks until we are below our threshold
- // (freeLevel)
-
- boolean any = true;
-
- final long freeLevel = (long) (0.95 * ramBufferSize);
-
- while(bytesUsed()+deletesRAMUsed > freeLevel) {
-
- synchronized(this) {
- if (0 == perDocAllocator.freeByteBlocks.size()
- && 0 == byteBlockAllocator.freeByteBlocks.size()
- && 0 == freeCharBlocks.size()
- && 0 == freeIntBlocks.size()
- && !any) {
- // Nothing else to free -- must flush now.
- bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
- if (infoStream != null) {
- if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
- message(" nothing to free; set bufferIsFull");
- } else {
- message(" nothing to free");
- }
- }
- break;
- }
-
- if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
- byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
- bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
- }
-
- if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
- freeCharBlocks.remove(freeCharBlocks.size()-1);
- bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
- }
-
- if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
- freeIntBlocks.remove(freeIntBlocks.size()-1);
- bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
- }
-
- if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
-            // Remove up to 32 blocks (each block is 1 KB)
- for (int i = 0; i < 32; ++i) {
- perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
- bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
- if (perDocAllocator.freeByteBlocks.size() == 0) {
- break;
- }
- }
- }
- }
-
- if ((4 == iter % 5) && any) {
- // Ask consumer to free any recycled state
- any = consumer.freeRAM();
- }
-
- iter++;
- }
-
- if (infoStream != null) {
- message(" after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
- }
- }
- }
-
- final WaitQueue waitQueue = new WaitQueue();
-
- private class WaitQueue {
- DocWriter[] waiting;
- int nextWriteDocID;
- int nextWriteLoc;
- int numWaiting;
- long waitingBytes;
-
- public WaitQueue() {
- waiting = new DocWriter[10];
- }
-
- synchronized void reset() {
- // NOTE: nextWriteLoc doesn't need to be reset
- assert numWaiting == 0;
- assert waitingBytes == 0;
- nextWriteDocID = 0;
- }
-
- synchronized boolean doResume() {
- final double mb = config.getRAMBufferSizeMB();
- final long waitQueueResumeBytes;
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- waitQueueResumeBytes = 2*1024*1024;
- } else {
- waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
- }
- return waitingBytes <= waitQueueResumeBytes;
- }
-
- synchronized boolean doPause() {
- final double mb = config.getRAMBufferSizeMB();
- final long waitQueuePauseBytes;
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- waitQueuePauseBytes = 4*1024*1024;
- } else {
- waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
- }
- return waitingBytes > waitQueuePauseBytes;
- }
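-
-    /* Illustrative numbers only: with ramBufferSizeMB=16 the
-     * queue pauses adding threads once more than 1.6 MB (10%
-     * of the buffer) of finished-but-unwritten docs are
-     * waiting, and resumes once that drops to 0.8 MB (5%) or
-     * less; with DISABLE_AUTO_FLUSH the fixed 4 MB / 2 MB
-     * thresholds above are used instead. */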
-
- synchronized void abort() {
- int count = 0;
- for(int i=0;i<waiting.length;i++) {
- final DocWriter doc = waiting[i];
- if (doc != null) {
- doc.abort();
- waiting[i] = null;
- count++;
- }
- }
- waitingBytes = 0;
- assert count == numWaiting;
- numWaiting = 0;
- }
-
- private void writeDocument(DocWriter doc) throws IOException {
- assert doc == skipDocWriter || nextWriteDocID == doc.docID;
- boolean success = false;
- try {
- doc.finish();
- nextWriteDocID++;
- nextWriteLoc++;
- assert nextWriteLoc <= waiting.length;
- if (nextWriteLoc == waiting.length) {
- nextWriteLoc = 0;
- }
- success = true;
- } finally {
- if (!success) {
- setAborting();
- }
- }
- }
-
- synchronized public boolean add(DocWriter doc) throws IOException {
-
- assert doc.docID >= nextWriteDocID;
-
- if (doc.docID == nextWriteDocID) {
- writeDocument(doc);
- while(true) {
- doc = waiting[nextWriteLoc];
- if (doc != null) {
- numWaiting--;
- waiting[nextWriteLoc] = null;
- waitingBytes -= doc.sizeInBytes();
- writeDocument(doc);
- } else {
- break;
- }
- }
- } else {
-
- // I finished before documents that were added
- // before me. This can easily happen when I am a
- // small doc and the docs before me were large, or,
- // just due to luck in the thread scheduling. Just
- // add myself to the queue and when that large doc
- // finishes, it will flush me:
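-        // For example (illustrative docIDs): if nextWriteDocID
-        // is 5 and my docID is 8, then gap is 3 and I am
-        // parked at slot (nextWriteLoc + 3) % waiting.length
-        // until docs 5, 6 and 7 are written: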
- int gap = doc.docID - nextWriteDocID;
- if (gap >= waiting.length) {
- // Grow queue
- DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
- assert nextWriteLoc >= 0;
- System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
- System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
- nextWriteLoc = 0;
- waiting = newArray;
- gap = doc.docID - nextWriteDocID;
- }
-
- int loc = nextWriteLoc + gap;
- if (loc >= waiting.length) {
- loc -= waiting.length;
- }
-
- // We should only wrap one time
- assert loc < waiting.length;
-
- // Nobody should be in my spot!
- assert waiting[loc] == null;
- waiting[loc] = doc;
- numWaiting++;
- waitingBytes += doc.sizeInBytes();
- }
-
- return doPause();
- }
- }
-}