1 package org.apache.lucene.index;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import org.apache.lucene.index.FieldInfo.IndexOptions;
21 import org.apache.lucene.util.UnicodeUtil;
23 import java.io.IOException;
24 import java.util.Collection;
26 import java.util.ArrayList;
27 import java.util.List;
28 import org.apache.lucene.util.BitVector;
29 import org.apache.lucene.util.CollectionUtil;
// Consumer in the TermsHash chain that writes frequency + proximity (postings)
// data gathered during indexing. NOTE(review): the class body continues past
// this view; several lines (including the closing brace of addThread) are
// elided from this extraction.
31 final class FreqProxTermsWriter extends TermsHashConsumer {
// Factory hook: creates the per-thread companion that accumulates postings
// for one indexing thread.
34 public TermsHashConsumerPerThread addThread(TermsHashPerThread perThread) {
35 return new FreqProxTermsWriterPerThread(perThread);
// Compares two in-memory terms stored as char[] starting at the given
// offsets. The 0xffff checks below indicate the terms are terminated by a
// 0xffff sentinel char rather than carrying an explicit length.
// NOTE(review): several body lines (the loop header, the return statements
// and else-branches) are elided from this extraction, so the exact ordering
// contract cannot be confirmed here — presumably a standard negative/zero/
// positive comparator result; verify against the full file.
38 private static int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
40 final char c1 = text1[pos1++];
41 final char c2 = text2[pos2++];
45 else if (0xffff == c1)
49 } else if (0xffff == c1)
57 // TODO: would be nice to factor out more of this, eg the
58 // FreqProxFieldMergeState, and code to visit all Fields
59 // under the same FieldInfo together, up into TermsHash*.
60 // Other writers would presumably share a lot of this...
// Flushes all buffered postings for this segment: collects every per-thread,
// per-field accumulator that actually holds postings, sorts them so same-named
// fields (from different threads) are adjacent, then writes each field group
// via appendPostings and resets the per-thread state.
// NOTE(review): many lines are elided from this extraction (closing braces,
// the declarations of `start`/`end`, consumer close/cleanup) — the comments
// below describe only what the visible lines establish.
63 public void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
65 // Gather all FieldData's that have postings, across all
67 List<FreqProxTermsWriterPerField> allFields = new ArrayList<FreqProxTermsWriterPerField>();
69 for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
71 Collection<TermsHashConsumerPerField> fields = entry.getValue();
73 for (final TermsHashConsumerPerField i : fields) {
74 final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) i;
// Only fields that actually buffered at least one posting are flushed.
75 if (perField.termsHashPerField.numPostings > 0)
76 allFields.add(perField);
// Sort so that all per-thread instances of the same field are contiguous;
// the grouping loop below relies on this adjacency.
81 CollectionUtil.quickSort(allFields);
82 final int numAllFields = allFields.size();
84 // TODO: allow Lucene user to customize this consumer:
85 final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
88 FormatPostingsFieldsConsumer
89 -> IMPL: FormatPostingsFieldsWriter
90 -> FormatPostingsTermsConsumer
91 -> IMPL: FormatPostingsTermsWriter
92 -> FormatPostingsDocConsumer
93 -> IMPL: FormatPostingsDocWriter
94 -> FormatPostingsPositionsConsumer
95 -> IMPL: FormatPostingsPositionsWriter
// Walk the sorted list one field-name group at a time.
// NOTE(review): the declarations/updates of `start` and `end` are elided here.
99 while(start < numAllFields) {
100 final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
101 final String fieldName = fieldInfo.name;
// Advance `end` past every entry sharing this field name (one per thread).
104 while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
107 FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
108 for(int i=start;i<end;i++) {
109 fields[i-start] = allFields.get(i);
111 // Aggregate the storePayload as seen by the same
112 // field across multiple threads
113 if (fieldInfo.indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
// If any thread saw a payload for this field, the merged FieldInfo must
// record that payloads are stored.
114 fieldInfo.storePayloads |= fields[i-start].hasPayloads;
118 // If this field has postings then add them to the
// Merge and write this field group's postings into the segment.
120 appendPostings(fieldName, state, fields, consumer);
// Release the per-field posting hashes now that they have been written.
122 for(int i=0;i<fields.length;i++) {
123 TermsHashPerField perField = fields[i].termsHashPerField;
124 int numPostings = perField.numPostings;
126 perField.shrinkHash(numPostings);
// Reset every per-thread accumulator so the next segment starts clean.
133 for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
134 FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
135 perThread.termsHashPerThread.reset(true);
142 private byte[] payloadBuffer;
144 /* Walk through all unique text tokens (Posting
145 * instances) found in this field and serialize them
146 * into a single RAM segment. */
// Merges the per-thread postings for one field (all `fields` entries share the
// same FieldInfo — see the assert below) into the segment via `consumer`,
// performing a k-way merge first over terms, then over docIDs within each term.
// Documents matched by buffered delete terms (state.segDeletes) are marked in
// state.deletedDocs as they are encountered.
// NOTE(review): many interior lines are elided from this extraction (loop
// braces, the declarations of `numToMerge`/`position`/`upto`, else-branches,
// the decrements of numFields/numToMerge) — comments below only describe what
// the visible lines establish.
147 void appendPostings(String fieldName, SegmentWriteState state,
148 FreqProxTermsWriterPerField[] fields,
149 FormatPostingsFieldsConsumer consumer)
150 throws CorruptIndexException, IOException {
152 int numFields = fields.length;
// One merge cursor per thread-local instance of this field.
154 final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
156 for(int i=0;i<numFields;i++) {
157 FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);
// All entries must belong to the same field.
159 assert fms.field.fieldInfo == fields[0].fieldInfo;
161 // Should always be true
// Prime each cursor on its first term; only fields with numPostings > 0
// reach this method (see flush), so nextTerm() should succeed.
162 boolean result = fms.nextTerm();
166 final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
// Template Term used to build lookup keys into the buffered-deletes map.
167 final Term protoTerm = new Term(fieldName);
// Holds the subset of cursors currently positioned on the smallest term.
169 FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
171 final IndexOptions currentFieldIndexOptions = fields[0].fieldInfo.indexOptions;
// Buffered deletes-by-term for this segment, or null if there are none.
173 final Map<Term,Integer> segDeletes;
174 if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
175 segDeletes = state.segDeletes.terms;
181 // TODO: really TermsHashPerField should take over most
182 // of this loop, including merge sort of terms from
183 // multiple threads and interacting with the
184 // TermsConsumer, only calling out to us (passing us the
185 // DocsConsumer) to handle delivery of docs/positions
// Outer loop: one iteration per distinct term, until every cursor is drained.
186 while(numFields > 0) {
188 // Get the next term to merge
189 termStates[0] = mergeStates[0];
// Scan the remaining cursors: keep the smallest term seen so far in
// termStates[0], and collect every cursor tied on that term.
// NOTE(review): the branch using `cmp` and the reset of `numToMerge` are
// elided here.
193 for(int i=1;i<numFields;i++) {
194 final char[] text = mergeStates[i].text;
195 final int textOffset = mergeStates[i].textOffset;
196 final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
199 termStates[0] = mergeStates[i];
202 termStates[numToMerge++] = mergeStates[i];
205 final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
// If this term has a buffered delete, delDocLimit is the docIDUpto bound:
// docs with smaller IDs under this term must be marked deleted.
207 final int delDocLimit;
208 if (segDeletes != null) {
209 final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
210 if (docIDUpto != null) {
211 delDocLimit = docIDUpto;
220 // Now termStates has numToMerge FieldMergeStates
221 // which all share the same term. Now we must
222 // interleave the docID streams.
223 while(numToMerge > 0) {
// Linear scan for the cursor with the smallest current docID.
225 FreqProxFieldMergeState minState = termStates[0];
226 for(int i=1;i<numToMerge;i++)
227 if (termStates[i].docID < minState.docID)
228 minState = termStates[i];
230 final int termDocFreq = minState.termFreq;
232 final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
234 // NOTE: we could check here if the docID was
235 // deleted, and skip it. However, this is somewhat
236 // dangerous because it can yield non-deterministic
237 // behavior since we may see the docID before we see
238 // the term that caused it to be deleted. This
239 // would mean some (but not all) of its postings may
240 // make it into the index, which'd alter the docFreq
241 // for those terms. We could fix this by doing two
242 // passes, ie first sweep marks all del docs, and
243 // 2nd sweep does the real flush, but I suspect
244 // that'd add too much time to flush.
246 if (minState.docID < delDocLimit) {
247 // Mark it deleted. TODO: we could also skip
248 // writing its postings; this would be
249 // deterministic (just for this Term's docs).
250 if (state.deletedDocs == null) {
// Lazily allocate the deleted-docs bitset, sized to the segment.
251 state.deletedDocs = new BitVector(state.numDocs);
253 state.deletedDocs.set(minState.docID);
256 final ByteSliceReader prox = minState.prox;
258 // Carefully copy over the prox + payload info,
259 // changing the format to match Lucene's segment
// Positions (and payloads) are only stored when the field indexes them.
261 if (currentFieldIndexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
262 // omitTermFreqAndPositions == false so we do write positions &
// One encoded entry per occurrence of the term in this doc.
// NOTE(review): the declaration/reset of `position` is elided here.
266 for(int j=0;j<termDocFreq;j++) {
// Position deltas are stored shifted left by 1; the low bit flags
// whether a payload follows.
267 final int code = prox.readVInt();
268 position += code >> 1;
270 final int payloadLength;
271 if ((code & 1) != 0) {
272 // This position has a payload
273 payloadLength = prox.readVInt();
// Grow the shared scratch buffer if needed, then copy the payload bytes.
275 if (payloadBuffer == null || payloadBuffer.length < payloadLength)
276 payloadBuffer = new byte[payloadLength];
278 prox.readBytes(payloadBuffer, 0, payloadLength);
283 posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
286 posConsumer.finish();
// This cursor has no more docs for the current term.
290 if (!minState.nextDoc()) {
292 // Remove from termStates
// Compact termStates in place, dropping minState.
// NOTE(review): the reset of `upto` and numToMerge decrement are elided.
294 for(int i=0;i<numToMerge;i++)
295 if (termStates[i] != minState)
296 termStates[upto++] = termStates[i];
298 assert upto == numToMerge;
300 // Advance this state to the next term
302 if (!minState.nextTerm()) {
303 // OK, no more terms, so remove from mergeStates
// Cursor fully drained: compact mergeStates in place as well.
306 for(int i=0;i<numFields;i++)
307 if (mergeStates[i] != minState)
308 mergeStates[upto++] = mergeStates[i];
310 assert upto == numFields;
315 docConsumer.finish();
319 termsConsumer.finish();
323 final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();