add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / src / java / org / apache / lucene / index / FreqProxTermsWriter.java
1 package org.apache.lucene.index;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import org.apache.lucene.index.FieldInfo.IndexOptions;
21 import org.apache.lucene.util.UnicodeUtil;
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.Map;
26 import java.util.ArrayList;
27 import java.util.List;
28 import org.apache.lucene.util.BitVector;
29 import org.apache.lucene.util.CollectionUtil;
30
31 final class FreqProxTermsWriter extends TermsHashConsumer {
32
33   @Override
34   public TermsHashConsumerPerThread addThread(TermsHashPerThread perThread) {
35     return new FreqProxTermsWriterPerThread(perThread);
36   }
37
38   private static int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
39     while(true) {
40       final char c1 = text1[pos1++];
41       final char c2 = text2[pos2++];
42       if (c1 != c2) {
43         if (0xffff == c2)
44           return 1;
45         else if (0xffff == c1)
46           return -1;
47         else
48           return c1-c2;
49       } else if (0xffff == c1)
50         return 0;
51     }
52   }
53
54   @Override
55   void abort() {}
56
57   // TODO: would be nice to factor out more of this, eg the
58   // FreqProxFieldMergeState, and code to visit all Fields
59   // under the same FieldInfo together, up into TermsHash*.
60   // Other writers would presumably share alot of this...
61
62   @Override
63   public void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
64
65     // Gather all FieldData's that have postings, across all
66     // ThreadStates
67     List<FreqProxTermsWriterPerField> allFields = new ArrayList<FreqProxTermsWriterPerField>();
68
69     for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
70
71       Collection<TermsHashConsumerPerField> fields = entry.getValue();
72
73       for (final TermsHashConsumerPerField i : fields) {
74         final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) i;
75         if (perField.termsHashPerField.numPostings > 0)
76           allFields.add(perField);
77       }
78     }
79
80     // Sort by field name
81     CollectionUtil.quickSort(allFields);
82     final int numAllFields = allFields.size();
83
84     // TODO: allow Lucene user to customize this consumer:
85     final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
86     /*
87     Current writer chain:
88       FormatPostingsFieldsConsumer
89         -> IMPL: FormatPostingsFieldsWriter
90           -> FormatPostingsTermsConsumer
91             -> IMPL: FormatPostingsTermsWriter
92               -> FormatPostingsDocConsumer
93                 -> IMPL: FormatPostingsDocWriter
94                   -> FormatPostingsPositionsConsumer
95                     -> IMPL: FormatPostingsPositionsWriter
96     */
97     try {
98       int start = 0;
99       while(start < numAllFields) {
100         final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
101         final String fieldName = fieldInfo.name;
102         
103         int end = start+1;
104         while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
105           end++;
106         
107         FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
108         for(int i=start;i<end;i++) {
109           fields[i-start] = allFields.get(i);
110           
111           // Aggregate the storePayload as seen by the same
112           // field across multiple threads
113           if (fieldInfo.indexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
114             fieldInfo.storePayloads |= fields[i-start].hasPayloads;
115           }
116         }
117         
118         // If this field has postings then add them to the
119         // segment
120         appendPostings(fieldName, state, fields, consumer);
121         
122         for(int i=0;i<fields.length;i++) {
123           TermsHashPerField perField = fields[i].termsHashPerField;
124           int numPostings = perField.numPostings;
125           perField.reset();
126           perField.shrinkHash(numPostings);
127           fields[i].reset();
128         }
129         
130         start = end;
131       }
132       
133       for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
134         FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
135         perThread.termsHashPerThread.reset(true);
136       }
137     } finally {
138       consumer.finish();
139     }
140   }
141
142   private byte[] payloadBuffer;
143
144   /* Walk through all unique text tokens (Posting
145    * instances) found in this field and serialize them
146    * into a single RAM segment. */
147   void appendPostings(String fieldName, SegmentWriteState state,
148                       FreqProxTermsWriterPerField[] fields,
149                       FormatPostingsFieldsConsumer consumer)
150     throws CorruptIndexException, IOException {
151
152     int numFields = fields.length;
153
154     final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
155
156     for(int i=0;i<numFields;i++) {
157       FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);
158
159       assert fms.field.fieldInfo == fields[0].fieldInfo;
160
161       // Should always be true
162       boolean result = fms.nextTerm();
163       assert result;
164     }
165
166     final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
167     final Term protoTerm = new Term(fieldName);
168
169     FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
170
171     final IndexOptions currentFieldIndexOptions = fields[0].fieldInfo.indexOptions;
172
173     final Map<Term,Integer> segDeletes;
174     if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
175       segDeletes = state.segDeletes.terms;
176     } else {
177       segDeletes = null;
178     }
179
180     try {
181       // TODO: really TermsHashPerField should take over most
182       // of this loop, including merge sort of terms from
183       // multiple threads and interacting with the
184       // TermsConsumer, only calling out to us (passing us the
185       // DocsConsumer) to handle delivery of docs/positions
186       while(numFields > 0) {
187
188         // Get the next term to merge
189         termStates[0] = mergeStates[0];
190         int numToMerge = 1;
191
192         // TODO: pqueue
193         for(int i=1;i<numFields;i++) {
194           final char[] text = mergeStates[i].text;
195           final int textOffset = mergeStates[i].textOffset;
196           final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
197
198           if (cmp < 0) {
199             termStates[0] = mergeStates[i];
200             numToMerge = 1;
201           } else if (cmp == 0)
202             termStates[numToMerge++] = mergeStates[i];
203         }
204
205         final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);
206
207         final int delDocLimit;
208         if (segDeletes != null) {
209           final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(termStates[0].termText()));
210           if (docIDUpto != null) {
211             delDocLimit = docIDUpto;
212           } else {
213             delDocLimit = 0;
214           }
215         } else {
216           delDocLimit = 0;
217         }
218
219         try {
220           // Now termStates has numToMerge FieldMergeStates
221           // which all share the same term.  Now we must
222           // interleave the docID streams.
223           while(numToMerge > 0) {
224             
225             FreqProxFieldMergeState minState = termStates[0];
226             for(int i=1;i<numToMerge;i++)
227               if (termStates[i].docID < minState.docID)
228                 minState = termStates[i];
229
230             final int termDocFreq = minState.termFreq;
231
232             final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);
233
234             // NOTE: we could check here if the docID was
235             // deleted, and skip it.  However, this is somewhat
236             // dangerous because it can yield non-deterministic
237             // behavior since we may see the docID before we see
238             // the term that caused it to be deleted.  This
239             // would mean some (but not all) of its postings may
240             // make it into the index, which'd alter the docFreq
241             // for those terms.  We could fix this by doing two
242             // passes, ie first sweep marks all del docs, and
243             // 2nd sweep does the real flush, but I suspect
244             // that'd add too much time to flush.
245
246             if (minState.docID < delDocLimit) {
247               // Mark it deleted.  TODO: we could also skip
248               // writing its postings; this would be
249               // deterministic (just for this Term's docs).
250               if (state.deletedDocs == null) {
251                 state.deletedDocs = new BitVector(state.numDocs);
252               }
253               state.deletedDocs.set(minState.docID);
254             }
255
256             final ByteSliceReader prox = minState.prox;
257
258             // Carefully copy over the prox + payload info,
259             // changing the format to match Lucene's segment
260             // format.
261             if (currentFieldIndexOptions == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
262               // omitTermFreqAndPositions == false so we do write positions &
263               // payload  
264               try {
265                 int position = 0;
266                 for(int j=0;j<termDocFreq;j++) {
267                   final int code = prox.readVInt();
268                   position += code >> 1;
269                 
270                 final int payloadLength;
271                 if ((code & 1) != 0) {
272                   // This position has a payload
273                   payloadLength = prox.readVInt();
274                   
275                   if (payloadBuffer == null || payloadBuffer.length < payloadLength)
276                     payloadBuffer = new byte[payloadLength];
277                   
278                   prox.readBytes(payloadBuffer, 0, payloadLength);
279                   
280                 } else
281                   payloadLength = 0;
282                 
283                 posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
284                 } //End for
285               } finally {
286                 posConsumer.finish();
287               }
288             }
289
290             if (!minState.nextDoc()) {
291
292               // Remove from termStates
293               int upto = 0;
294               for(int i=0;i<numToMerge;i++)
295                 if (termStates[i] != minState)
296                   termStates[upto++] = termStates[i];
297               numToMerge--;
298               assert upto == numToMerge;
299
300               // Advance this state to the next term
301
302               if (!minState.nextTerm()) {
303                 // OK, no more terms, so remove from mergeStates
304                 // as well
305                 upto = 0;
306                 for(int i=0;i<numFields;i++)
307                   if (mergeStates[i] != minState)
308                     mergeStates[upto++] = mergeStates[i];
309                 numFields--;
310                 assert upto == numFields;
311               }
312             }
313           }
314         } finally {
315           docConsumer.finish();
316         }
317       }
318     } finally {
319       termsConsumer.finish();
320     }
321   }
322
323   final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();
324 }