1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
25 import org.apache.lucene.document.Document;
26 import org.apache.lucene.index.FieldInfo.IndexOptions;
27 import org.apache.lucene.index.IndexReader.FieldOption;
28 import org.apache.lucene.index.MergePolicy.MergeAbortedException;
29 import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
30 import org.apache.lucene.store.Directory;
31 import org.apache.lucene.store.IndexInput;
32 import org.apache.lucene.store.IndexOutput;
33 import org.apache.lucene.util.IOUtils;
34 import org.apache.lucene.util.ReaderUtil;
37 * The SegmentMerger class combines two or more Segments, each represented by an IndexReader (see {@link #add}),
38 * into a single Segment. After adding the appropriate readers, call the merge method to combine the
44 final class SegmentMerger {
// Directory handed to the writers/outputs created during the merge (FieldsWriter, TermVectorsWriter, norms output).
45 private Directory directory;
// Name of the segment being written; used to build file names such as segment + ".fnm".
46 private String segment;
// Term index interval forwarded to SegmentWriteState; overwritten by the constructor argument.
47 private int termIndexInterval = IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL;
// Readers to merge, filled by add() through ReaderUtil.gatherSubReaders.
49 private List<IndexReader> readers = new ArrayList<IndexReader>();
// Merged field metadata: supplied by the caller and extended in mergeFields().
50 private final FieldInfos fieldInfos;
// Result of mergeFields(); mergeVectors() validates the vectors index size against it.
52 private int mergedDocs;
// Abort hook whose work(...) is called from the copy loops.
54 private final CheckAbort checkAbort;
56 /** Maximum number of contiguous documents to bulk-copy
57 when merging stored fields */
58 private final static int MAX_RAW_MERGE_DOCS = 4192;
// Assigned at the end of mergeFields() and passed to the postings writer in mergeTerms().
60 private SegmentWriteState segmentWriteState;
// May be null (null-checked in mergeTermInfos); source of per-directory payload processors.
62 private final PayloadProcessorProvider payloadProcessorProvider;
// Creates a merger, storing the payload processor provider, the shared FieldInfos and the
// term index interval passed in by the caller, and installing a CheckAbort instance.
// NOTE(review): several constructor lines are not visible in this listing (the assignments of
// `directory` and `segment`, and the condition choosing between the two CheckAbort
// instances) — presumably the anonymous variant is used when `merge` is null; confirm.
64 SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, PayloadProcessorProvider payloadProcessorProvider, FieldInfos fieldInfos) {
65 this.payloadProcessorProvider = payloadProcessorProvider;
67 this.fieldInfos = fieldInfos;
// Abort checker bound to the given merge and directory.
70 checkAbort = new CheckAbort(merge, directory);
// Alternative: anonymous CheckAbort subclass that overrides work(); the override's body is not visible here.
72 checkAbort = new CheckAbort(null, null) {
74 public void work(double units) throws MergeAbortedException {
79 this.termIndexInterval = termIndexInterval;
// Accessor for the merged FieldInfos.
// NOTE(review): the body is not visible in this listing — presumably returns the fieldInfos field; confirm.
82 public FieldInfos fieldInfos() {
87 * Add an IndexReader to the collection of readers that are to be merged
// Registers `reader` for merging by passing it, together with the `readers` list, to
// ReaderUtil.gatherSubReaders (presumably a composite reader contributes its sub-readers — confirm).
90 final void add(IndexReader reader) {
91 ReaderUtil.gatherSubReaders(readers, reader);
95 * Merges the readers specified by the {@link #add} method into the directory passed to the constructor
96 * @return The number of documents that were merged
97 * @throws CorruptIndexException if the index is corrupt
98 * @throws IOException if there is a low-level IO error
// Entry point of the merge. Visible steps: stored fields are merged first and the resulting
// document count is cached in mergedDocs; a further step is guarded by fieldInfos.hasVectors().
// NOTE(review): the guarded statement, the remaining steps and the return are not visible in this listing.
100 final int merge() throws CorruptIndexException, IOException {
101 // NOTE: it's important to add calls to
102 // checkAbort.work(...) if you make any changes to this
103 // method that will spend a lot of time. The frequency
104 // of this check impacts how long
105 // IndexWriter.close(false) takes to actually stop the
108 mergedDocs = mergeFields();
112 if (fieldInfos.hasVectors())
119 * NOTE: this method creates a compound file for all files returned by
120 * info.files(). While, generally, this may include separate norms and
121 * deletion files, this SegmentInfo must not reference such files when this
122 * method is called, because they are not allowed within a compound file.
// Adds every file reported by info.files() to a CompoundFileWriter targeting `fileName` in this
// merger's directory. The writer is constructed with checkAbort.
// NOTE(review): the throws clause, the closing of the writer and the return statement are not
// visible in this listing — presumably the file collection is returned; confirm.
124 final Collection<String> createCompoundFile(String fileName, final SegmentInfo info)
126 // Now merge all added files
127 Collection<String> files = info.files();
128 CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, fileName, checkAbort);
129 for (String file : files) {
// Deletion files and separate norms files are not allowed inside a compound file.
130 assert !IndexFileNames.matchesExtension(file, IndexFileNames.DELETES_EXTENSION)
131 : ".del file is not allowed in .cfs: " + file;
132 assert !IndexFileNames.isSeparateNormsFile(file)
133 : "separate norms file (.s[0-9]+) is not allowed in .cfs: " + file;
134 cfsWriter.addFile(file);
// Adds each field name in `names` to fInfos with the supplied term-vector, payload and
// index-option flags. The second argument is always the literal true (presumably the
// "is indexed" flag — confirm against FieldInfos.add).
// The argument following the term-vector flags is !reader.hasNorms(field), i.e. true when the
// reader has no norms for that field (presumably the omitNorms flag — confirm).
143 private static void addIndexed(IndexReader reader, FieldInfos fInfos,
144 Collection<String> names, boolean storeTermVectors,
145 boolean storePositionWithTermVector, boolean storeOffsetWithTermVector,
146 boolean storePayloads, IndexOptions indexOptions)
148 for (String field : names) {
149 fInfos.add(field, true, storeTermVectors,
150 storePositionWithTermVector, storeOffsetWithTermVector, !reader
151 .hasNorms(field), storePayloads, indexOptions);
// Parallel to `readers`: non-null at index i when reader i qualifies for bulk copying
// (filled by setMatchingSegmentReaders()).
155 private SegmentReader[] matchingSegmentReaders;
// Scratch arrays of MAX_RAW_MERGE_DOCS entries passed to the rawDocs/addRawDocuments calls.
156 private int[] rawDocLengths;
// Second scratch array; only used by the term-vector raw copy calls.
157 private int[] rawDocLengths2;
// Compared against readers.size() in getAnyNonBulkMerges().
// NOTE(review): where it is incremented is not visible in this listing.
158 private int matchedCount;
// Accessor for the matched sub-reader count.
// NOTE(review): the body is not visible in this listing — presumably returns matchedCount; confirm.
160 public int getMatchedSubReaderCount() {
// Decides which readers can be bulk-copied: matchingSegmentReaders[i] is assigned reader i when
// it is a SegmentReader whose field names, compared position by position over its own
// FieldInfos size, equal the names in the merged fieldInfos. Also allocates the two scratch
// length arrays used by the raw copy paths.
// NOTE(review): the declaration of `same`, the guard around the assignment and the
// matchedCount bookkeeping are not visible in this listing.
164 private void setMatchingSegmentReaders() {
165 // If the i'th reader is a SegmentReader and has
166 // identical fieldName -> number mapping, then this
167 // array will be non-null at position i:
168 int numReaders = readers.size();
169 matchingSegmentReaders = new SegmentReader[numReaders];
171 // If this reader is a SegmentReader, and all of its
172 // field name -> number mappings match the "merged"
173 // FieldInfos, then we can do a bulk copy of the
175 for (int i = 0; i < numReaders; i++) {
176 IndexReader reader = readers.get(i);
177 if (reader instanceof SegmentReader) {
178 SegmentReader segmentReader = (SegmentReader) reader;
180 FieldInfos segmentFieldInfos = segmentReader.fieldInfos();
181 int numFieldInfos = segmentFieldInfos.size();
// Stops at the first position whose name differs (loop condition includes `same`).
182 for (int j = 0; same && j < numFieldInfos; j++) {
183 same = fieldInfos.fieldName(j).equals(segmentFieldInfos.fieldName(j));
186 matchingSegmentReaders[i] = segmentReader;
192 // Used for bulk-reading raw bytes for stored fields
193 rawDocLengths = new int[MAX_RAW_MERGE_DOCS];
194 rawDocLengths2 = new int[MAX_RAW_MERGE_DOCS];
199 * @return The number of documents in all of the readers
200 * @throws CorruptIndexException if the index is corrupt
201 * @throws IOException if there is a low-level IO error
// Merges field metadata and stored fields of all readers into the new segment.
// Step 1 folds every reader's fields into fieldInfos, step 2 writes <segment>.fnm, step 3
// copies stored fields through a FieldsWriter, step 4 validates the fields index file length
// and builds segmentWriteState.
// NOTE(review): the declarations of docCount/idx, the else branches and the return statement
// are not visible in this listing.
203 private int mergeFields() throws CorruptIndexException, IOException {
205 for (IndexReader reader : readers) {
// SegmentReaders contribute their FieldInfo entries directly.
206 if (reader instanceof SegmentReader) {
207 SegmentReader segmentReader = (SegmentReader) reader;
208 FieldInfos readerFieldInfos = segmentReader.fieldInfos();
209 int numReaderFieldInfos = readerFieldInfos.size();
210 for (int j = 0; j < numReaderFieldInfos; j++) {
211 fieldInfos.add(readerFieldInfos.fieldInfo(j));
// Flags derived from reader.getFieldNames(...) per FieldOption category.
// NOTE(review): presumably this is the fallback for readers that are not SegmentReaders — the
// branch keyword is not visible here; confirm.
214 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.TERMVECTOR_WITH_POSITION_OFFSET), true, true, true, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
215 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.TERMVECTOR_WITH_POSITION), true, true, false, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
216 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.TERMVECTOR_WITH_OFFSET), true, false, true, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
217 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.TERMVECTOR), true, false, false, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
218 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.OMIT_POSITIONS), false, false, false, false, IndexOptions.DOCS_AND_FREQS);
219 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.OMIT_TERM_FREQ_AND_POSITIONS), false, false, false, false, IndexOptions.DOCS_ONLY);
220 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.STORES_PAYLOADS), false, false, false, true, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
221 addIndexed(reader, fieldInfos, reader.getFieldNames(FieldOption.INDEXED), false, false, false, false, IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
222 fieldInfos.add(reader.getFieldNames(FieldOption.UNINDEXED), false);
// Persist the merged field metadata.
225 fieldInfos.write(directory, segment + ".fnm");
// Must run after fieldInfos is complete, since it compares each reader's names against it.
229 setMatchingSegmentReaders();
231 final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
235 for (IndexReader reader : readers) {
236 final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
// Stays null unless the reader matched and its FieldsReader can serve raw documents.
237 FieldsReader matchingFieldsReader = null;
238 if (matchingSegmentReader != null) {
239 final FieldsReader fieldsReader = matchingSegmentReader.getFieldsReader();
240 if (fieldsReader != null && fieldsReader.canReadRawDocs()) {
241 matchingFieldsReader = fieldsReader;
244 if (reader.hasDeletions()) {
245 docCount += copyFieldsWithDeletions(fieldsWriter,
246 reader, matchingFieldsReader);
248 docCount += copyFieldsNoDeletions(fieldsWriter,
249 reader, matchingFieldsReader);
253 fieldsWriter.close();
256 final String fileName = IndexFileNames.segmentFileName(segment, IndexFileNames.FIELDS_INDEX_EXTENSION);
257 final long fdxFileLength = directory.fileLength(fileName);
// The fields index file must be exactly 4 + 8 * docCount bytes long; otherwise the merge is aborted.
259 if (4+((long) docCount)*8 != fdxFileLength)
260 // This is most likely a bug in Sun JRE 1.6.0_04/_05;
261 // we detect that the bug has struck, here, and
262 // throw an exception to prevent the corruption from
263 // entering the index. See LUCENE-1282 for
265 throw new RuntimeException("mergeFields produced an invalid result: docCount is " + docCount + " but fdx file size is " + fdxFileLength + " file=" + fileName + " file exists?=" + directory.fileExists(fileName) + "; now aborting this merge to prevent index corruption");
// State consumed later by mergeTerms() when it creates the postings writer.
267 segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, null);
// Copies stored fields from a reader that has deletions into fieldsWriter.
// With a non-null matchingFieldsReader, documents are copied as raw bytes in chunks of at most
// MAX_RAW_MERGE_DOCS; otherwise each document is loaded via reader.document(j) and re-added.
// NOTE(review): the statements that skip deleted docs, advance j/numDocs, end a chunk, and
// accumulate/return the copied-document count are not visible in this listing.
271 private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
272 final FieldsReader matchingFieldsReader)
273 throws IOException, MergeAbortedException, CorruptIndexException {
275 final int maxDoc = reader.maxDoc();
276 if (matchingFieldsReader != null) {
277 // We can bulk-copy because the fieldInfos are "congruent"
// Note: j is not advanced in the loop header; it is advanced inside the body.
278 for (int j = 0; j < maxDoc;) {
279 if (reader.isDeleted(j)) {
284 // We can optimize this case (doing a bulk byte copy) since the field
285 // numbers are identical
286 int start = j, numDocs = 0;
290 if (j >= maxDoc) break;
291 if (reader.isDeleted(j)) {
295 } while(numDocs < MAX_RAW_MERGE_DOCS);
// Copy the chunk [start, start + numDocs) as raw bytes; rawDocLengths receives the per-document lengths.
297 IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs);
298 fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs);
300 checkAbort.work(300 * numDocs);
// Slow path: document-by-document copy.
303 for (int j = 0; j < maxDoc; j++) {
304 if (reader.isDeleted(j)) {
308 // NOTE: it's very important to first assign to doc then pass it to
309 // termVectorsWriter.addAllDocVectors; see LUCENE-1282
310 Document doc = reader.document(j);
311 fieldsWriter.addDocument(doc);
313 checkAbort.work(300);
// Copies stored fields of every document (0 .. maxDoc-1) of a reader without deletions.
// With a non-null matchingFieldsReader, documents are copied as raw bytes in chunks of at most
// MAX_RAW_MERGE_DOCS; otherwise each document is loaded and re-added individually.
// NOTE(review): the declaration of docCount, its advancement in the bulk loop and the return
// statement are not visible in this listing.
319 private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
320 final FieldsReader matchingFieldsReader)
321 throws IOException, MergeAbortedException, CorruptIndexException {
322 final int maxDoc = reader.maxDoc();
324 if (matchingFieldsReader != null) {
325 // We can bulk-copy because the fieldInfos are "congruent"
326 while (docCount < maxDoc) {
// Chunk size is capped both by the scratch array size and by the documents remaining.
327 int len = Math.min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
328 IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, docCount, len);
329 fieldsWriter.addRawDocuments(stream, rawDocLengths, len);
331 checkAbort.work(300 * len);
// Slow path: document-by-document copy.
334 for (; docCount < maxDoc; docCount++) {
335 // NOTE: it's very important to first assign to doc then pass it to
336 // termVectorsWriter.addAllDocVectors; see LUCENE-1282
337 Document doc = reader.document(docCount);
338 fieldsWriter.addDocument(doc);
339 checkAbort.work(300);
346 * Merge the TermVectors from each of the segments into the new one.
347 * @throws IOException
// Writes the term vectors of all readers through a TermVectorsWriter, choosing per reader
// between the with-deletions and no-deletions copy routines, then validates the size of the
// vectors index file against mergedDocs.
// NOTE(review): the declaration of idx and any try/finally around the writer are not visible in this listing.
349 private final void mergeVectors() throws IOException {
350 TermVectorsWriter termVectorsWriter =
351 new TermVectorsWriter(directory, segment, fieldInfos);
355 for (final IndexReader reader : readers) {
356 final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
// Stays null unless the reader matched and its vectors reader can serve raw documents.
357 TermVectorsReader matchingVectorsReader = null;
358 if (matchingSegmentReader != null) {
359 TermVectorsReader vectorsReader = matchingSegmentReader.getTermVectorsReader();
361 // If the TV* files are an older format then they cannot read raw docs:
362 if (vectorsReader != null && vectorsReader.canReadRawDocs()) {
363 matchingVectorsReader = vectorsReader;
366 if (reader.hasDeletions()) {
367 copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader);
369 copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader);
374 termVectorsWriter.close();
377 final String fileName = IndexFileNames.segmentFileName(segment, IndexFileNames.VECTORS_INDEX_EXTENSION);
378 final long tvxSize = directory.fileLength(fileName);
// The vectors index file must be exactly 4 + 16 * mergedDocs bytes long; otherwise the merge is aborted.
380 if (4+((long) mergedDocs)*16 != tvxSize)
381 // This is most likely a bug in Sun JRE 1.6.0_04/_05;
382 // we detect that the bug has struck, here, and
383 // throw an exception to prevent the corruption from
384 // entering the index. See LUCENE-1282 for
386 throw new RuntimeException("mergeVectors produced an invalid result: mergedDocs is " + mergedDocs + " but tvx size is " + tvxSize + " file=" + fileName + " file exists?=" + directory.fileExists(fileName) + "; now aborting this merge to prevent index corruption");
// Copies term vectors from a reader that has deletions into termVectorsWriter.
// With a non-null matchingVectorsReader, documents are copied in raw form in chunks of at
// most MAX_RAW_MERGE_DOCS; otherwise each document's vectors are loaded and re-added.
// NOTE(review): the statements that skip deleted docs, advance docNum/numDocs and end a chunk
// are not visible in this listing.
389 private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter,
390 final TermVectorsReader matchingVectorsReader,
391 final IndexReader reader)
392 throws IOException, MergeAbortedException {
393 final int maxDoc = reader.maxDoc();
394 if (matchingVectorsReader != null) {
395 // We can bulk-copy because the fieldInfos are "congruent"
// Note: docNum is not advanced in the loop header; it is advanced inside the body.
396 for (int docNum = 0; docNum < maxDoc;) {
397 if (reader.isDeleted(docNum)) {
402 // We can optimize this case (doing a bulk byte copy) since the field
403 // numbers are identical
404 int start = docNum, numDocs = 0;
408 if (docNum >= maxDoc) break;
409 if (reader.isDeleted(docNum)) {
413 } while(numDocs < MAX_RAW_MERGE_DOCS);
// rawDocs fills both length arrays for the chunk; the writer then copies from the same reader.
415 matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
416 termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
417 checkAbort.work(300 * numDocs);
// Slow path: document-by-document copy.
420 for (int docNum = 0; docNum < maxDoc; docNum++) {
421 if (reader.isDeleted(docNum)) {
426 // NOTE: it's very important to first assign to vectors then pass it to
427 // termVectorsWriter.addAllDocVectors; see LUCENE-1282
428 TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
429 termVectorsWriter.addAllDocVectors(vectors);
430 checkAbort.work(300);
// Copies term vectors of every document (0 .. maxDoc-1) of a reader without deletions.
// With a non-null matchingVectorsReader, documents are copied in raw form in chunks of at
// most MAX_RAW_MERGE_DOCS; otherwise each document's vectors are loaded and re-added.
// NOTE(review): the declaration and advancement of docCount in the bulk loop are not visible in this listing.
435 private void copyVectorsNoDeletions(final TermVectorsWriter termVectorsWriter,
436 final TermVectorsReader matchingVectorsReader,
437 final IndexReader reader)
438 throws IOException, MergeAbortedException {
439 final int maxDoc = reader.maxDoc();
440 if (matchingVectorsReader != null) {
441 // We can bulk-copy because the fieldInfos are "congruent"
443 while (docCount < maxDoc) {
// Chunk size is capped both by the scratch array size and by the documents remaining.
444 int len = Math.min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);
445 matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, docCount, len);
446 termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, len);
448 checkAbort.work(300 * len);
// Slow path: document-by-document copy.
451 for (int docNum = 0; docNum < maxDoc; docNum++) {
452 // NOTE: it's very important to first assign to vectors then pass it to
453 // termVectorsWriter.addAllDocVectors; see LUCENE-1282
454 TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
455 termVectorsWriter.addAllDocVectors(vectors);
456 checkAbort.work(300);
// Queue of per-reader SegmentMergeInfo entries; created in mergeTerms() and drained in mergeTermInfos().
461 private SegmentMergeQueue queue = null;
// Merges the postings of all readers: creates the postings writer from segmentWriteState,
// sizes the merge queue to the number of readers, delegates to mergeTermInfos and finishes the consumer.
// NOTE(review): any try/finally and queue cleanup around these calls are not visible in this listing.
463 private final void mergeTerms() throws CorruptIndexException, IOException {
465 final FormatPostingsFieldsConsumer fieldsConsumer = new FormatPostingsFieldsWriter(segmentWriteState, fieldInfos);
468 queue = new SegmentMergeQueue(readers.size());
470 mergeTermInfos(fieldsConsumer);
474 fieldsConsumer.finish();
// Index options of the field currently being merged; set in mergeTermInfos() on each field
// change and read in appendPostings() to decide whether positions are written.
483 IndexOptions indexOptions;
// Drives the term merge. First builds one SegmentMergeInfo per reader (with its doc base and,
// when a provider is configured, a directory payload processor) and adds it to the queue.
// Then repeatedly pops all entries positioned on the same term, switches the terms consumer
// when the field changes, appends the merged postings, and re-queues or closes each entry.
// NOTE(review): the declaration of `base`, the docMaps slot assignment, the re-reading of
// queue.top() inside the matching loop, and the condition choosing between re-queueing and
// closing an entry are not visible in this listing.
485 private final void mergeTermInfos(final FormatPostingsFieldsConsumer consumer) throws CorruptIndexException, IOException {
487 final int readerCount = readers.size();
488 for (int i = 0; i < readerCount; i++) {
489 IndexReader reader = readers.get(i);
490 TermEnum termEnum = reader.terms();
491 SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
492 if (payloadProcessorProvider != null) {
493 smi.dirPayloadProcessor = payloadProcessorProvider.getDirProcessor(reader.directory());
495 int[] docMap = smi.getDocMap();
496 if (docMap != null) {
// docMaps is allocated lazily, only once some reader reports a non-null doc map.
497 if (docMaps == null) {
498 docMaps = new int[readerCount][];
// The next reader's documents start after this reader's live documents.
503 base += reader.numDocs();
505 assert reader.numDocs() == reader.maxDoc() - smi.delCount;
508 queue.add(smi); // initialize queue
513 SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];
515 String currentField = null;
516 FormatPostingsTermsConsumer termsConsumer = null;
518 while (queue.size() > 0) {
519 int matchSize = 0; // pop matching terms
520 match[matchSize++] = queue.pop();
521 Term term = match[0].term;
522 SegmentMergeInfo top = queue.top();
524 while (top != null && term.compareTo(top.term) == 0) {
525 match[matchSize++] = queue.pop();
// NOTE(review): reference comparison of Strings — presumably relies on field names being interned; confirm.
529 if (currentField != term.field) {
530 currentField = term.field;
// Finish the previous field's consumer before starting the next field.
531 if (termsConsumer != null)
532 termsConsumer.finish();
533 final FieldInfo fieldInfo = fieldInfos.fieldInfo(currentField);
534 termsConsumer = consumer.addField(fieldInfo);
535 indexOptions = fieldInfo.indexOptions;
538 int df = appendPostings(termsConsumer, match, matchSize); // add new TermInfo
539 checkAbort.work(df/3.0);
541 while (matchSize > 0) {
542 SegmentMergeInfo smi = match[--matchSize];
544 queue.add(smi); // restore queue
546 smi.close(); // done with a segment
// Reusable payload scratch buffer; (re)allocated in appendPostings() when null or too small.
551 private byte[] payloadBuffer;
// Per-reader doc maps; allocated lazily in mergeTermInfos() with one slot per reader.
552 private int[][] docMaps;
554 /** Process postings from multiple segments all positioned on the
555 * same term. Writes out merged entries into freqOutput and
556 * the proxOutput streams.
558 * @param smis array of segments
559 * @param n number of cells in the array actually occupied
560 * @return number of documents across all segments where this term was found
561 * @throws CorruptIndexException if the index is corrupt
562 * @throws IOException if there is a low-level IO error
// Writes the postings of one term, gathered from the first n entries of smis, to termsConsumer.
// For each entry it seeks the entry's positions enum to the term, walks the documents, remaps
// doc ids into the merged doc space, and forwards positions and payloads when the current
// field indexes positions.
// NOTE(review): the declarations of `base` and the document-frequency counter, the guard around
// the docMap lookup, and the return statement are not visible in this listing.
564 private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n)
565 throws CorruptIndexException, IOException {
// All n entries are positioned on the same term, so the first entry's text is used.
567 final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(smis[0].term.text);
569 for (int i = 0; i < n; i++) {
570 SegmentMergeInfo smi = smis[i];
571 TermPositions postings = smi.getPositions();
572 assert postings != null;
574 int[] docMap = smi.getDocMap();
575 postings.seek(smi.termEnum);
// A payload processor is looked up only when this entry has a directory processor.
577 PayloadProcessor payloadProcessor = null;
578 if (smi.dirPayloadProcessor != null) {
579 payloadProcessor = smi.dirPayloadProcessor.getProcessor(smi.term);
582 while (postings.next()) {
584 int doc = postings.doc();
586 doc = docMap[doc]; // map around deletions
587 doc += base; // convert to merged space
589 final int freq = postings.freq();
590 final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(doc, freq);
// Positions (and payloads) are copied only for fields indexed with positions.
592 if (indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
593 for (int j = 0; j < freq; j++) {
594 final int position = postings.nextPosition();
595 int payloadLength = postings.getPayloadLength();
596 if (payloadLength > 0) {
// Grow the shared scratch buffer on demand.
597 if (payloadBuffer == null || payloadBuffer.length < payloadLength)
598 payloadBuffer = new byte[payloadLength];
599 postings.getPayload(payloadBuffer, 0);
// The processor may return a different buffer and a different length.
600 if (payloadProcessor != null) {
601 payloadBuffer = payloadProcessor.processPayload(payloadBuffer, 0, payloadLength);
602 payloadLength = payloadProcessor.payloadLength();
605 posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
607 posConsumer.finish();
611 docConsumer.finish();
// Returns true when matchedCount differs from the number of readers, i.e. when not every
// reader was counted as matched.
616 public boolean getAnyNonBulkMerges() {
617 assert matchedCount <= readers.size();
618 return matchedCount != readers.size();
// Writes the norms of every indexed field that does not omit norms into a single norms file.
// The output (with its header) and the norm buffer are created lazily on the first such field.
// For each field, each reader's norms are read into the buffer and written out, skipping
// deleted documents when the reader has deletions.
// NOTE(review): the declaration of bufferSize, the try/finally structure and the updates of
// `success` are not visible in this listing — presumably the plain close runs on success and
// the exception-suppressing close otherwise; confirm.
621 private void mergeNorms() throws IOException {
622 // get needed buffer size by finding the largest segment
624 for (IndexReader reader : readers) {
625 bufferSize = Math.max(bufferSize, reader.maxDoc());
628 byte[] normBuffer = null;
629 IndexOutput output = null;
630 boolean success = false;
632 int numFieldInfos = fieldInfos.size();
633 for (int i = 0; i < numFieldInfos; i++) {
634 FieldInfo fi = fieldInfos.fieldInfo(i);
635 if (fi.isIndexed && !fi.omitNorms) {
636 if (output == null) {
637 output = directory.createOutput(IndexFileNames.segmentFileName(segment, IndexFileNames.NORMS_EXTENSION));
638 output.writeBytes(SegmentNorms.NORMS_HEADER, SegmentNorms.NORMS_HEADER.length);
640 if (normBuffer == null) {
641 normBuffer = new byte[bufferSize];
643 for (IndexReader reader : readers) {
644 final int maxDoc = reader.maxDoc();
// Load this reader's norms for the field into the buffer, starting at offset 0.
645 reader.norms(fi.name, normBuffer, 0);
646 if (!reader.hasDeletions()) {
647 //optimized case for segments without deleted docs
648 output.writeBytes(normBuffer, maxDoc);
650 // this segment has deleted docs, so we have to
651 // check for every doc if it is deleted or not
652 for (int k = 0; k < maxDoc; k++) {
653 if (!reader.isDeleted(k)) {
654 output.writeByte(normBuffer[k]);
658 checkAbort.work(maxDoc);
665 IOUtils.close(output);
667 IOUtils.closeWhileHandlingException(output);
// Helper used by the merge loops to poll for an aborted merge: once workCount has reached
// 10000.0, work() calls merge.checkAborted(dir).
// NOTE(review): the constructor body, the accumulation of `units` into workCount and any reset
// of workCount are not visible in this listing; the end of the class is also outside this view.
672 static class CheckAbort {
673 private double workCount;
674 private MergePolicy.OneMerge merge;
675 private Directory dir;
676 public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
682 * Records the fact that roughly units amount of work
683 * have been done since this method was last called.
684 * When adding time-consuming code into SegmentMerger,
685 * you should test different values for units to ensure
686 * that the time in between calls to merge.checkAborted
687 * is up to ~ 1 second.
689 public void work(double units) throws MergePolicy.MergeAbortedException {
// Threshold check: the abort poll happens only once enough work has been recorded.
691 if (workCount >= 10000.0) {
692 merge.checkAborted(dir);