add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / contrib / misc / src / java / org / apache / lucene / store / DirectIOLinuxDirectory.java
1 package org.apache.lucene.store;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements. See the NOTICE file distributed with this
6  * work for additional information regarding copyright ownership. The ASF
7  * licenses this file to You under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  * http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16  * License for the specific language governing permissions and limitations under
17  * the License.
18  */
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.FileInputStream;
23 import java.io.FileDescriptor;
24 import java.io.FileOutputStream;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27
28 import org.apache.lucene.store.Directory; // javadoc
29 import org.apache.lucene.store.NativeFSLockFactory; // javadoc
30
31 /**
32  * An {@link Directory} implementation that uses the
33  * Linux-specific O_DIRECT flag to bypass all OS level
34  * caching.  To use this you must compile
35  * NativePosixUtil.cpp (exposes Linux-specific APIs through
36  * JNI) for your platform.
37  *
38  * <p><b>WARNING</b>: this code is very new and quite easily
39  * could contain horrible bugs.  For example, here's one
40  * known issue: if you use seek in IndexOutput, and then
41  * write more than one buffer's worth of bytes, then the
42  * file will be wrong.  Lucene does not do this (only writes
43  * small number of bytes after seek).
44
45  * @lucene.experimental
46  */
47 public class DirectIOLinuxDirectory extends FSDirectory {
48
49   private final static long ALIGN = 512;
50   private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
51
52   private final int forcedBufferSize;
53
54   /** Create a new NIOFSDirectory for the named location.
55    * 
56    * @param path the path of the directory
57    * @param lockFactory the lock factory to use, or null for the default
58    * ({@link NativeFSLockFactory});
59    * @param forcedBufferSize if this is 0, just use Lucene's
60    *    default buffer size; else, force this buffer size.
61    *    For best performance, force the buffer size to
62    *    something fairly large (eg 1 MB), but note that this
63    *    will eat up the JRE's direct buffer storage space
64    * @throws IOException
65    */
66   public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
67     super(path, lockFactory);
68     this.forcedBufferSize = forcedBufferSize;
69   }
70
71   @Override
72   public IndexInput openInput(String name, int bufferSize) throws IOException {
73     ensureOpen();
74     return new DirectIOLinuxIndexInput(new File(getDirectory(), name), forcedBufferSize == 0 ? bufferSize : forcedBufferSize);
75   }
76
77   @Override
78   public IndexOutput createOutput(String name) throws IOException {
79     ensureOpen();
80     ensureCanWrite(name);
81     return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), forcedBufferSize == 0 ? BufferedIndexOutput.BUFFER_SIZE : forcedBufferSize);
82   }
83
84   private final static class DirectIOLinuxIndexOutput extends IndexOutput {
85     private final ByteBuffer buffer;
86     private final FileOutputStream fos;
87     private final FileChannel channel;
88     private final int bufferSize;
89
90     //private final File path;
91
92     private int bufferPos;
93     private long filePos;
94     private long fileLength;
95     private boolean isOpen;
96
97     public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
98       //this.path = path;
99       FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
100       fos = new FileOutputStream(fd);
101       //fos = new FileOutputStream(path);
102       channel = fos.getChannel();
103       buffer = ByteBuffer.allocateDirect(bufferSize);
104       this.bufferSize = bufferSize;
105       isOpen = true;
106     }
107
108     @Override
109     public void writeByte(byte b) throws IOException {
110       assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
111       buffer.put(b);
112       if (++bufferPos == bufferSize) {
113         dump();
114       }
115     }
116
117     @Override
118     public void writeBytes(byte[] src, int offset, int len) throws IOException {
119       int toWrite = len;
120       while(true) {
121         final int left = bufferSize - bufferPos;
122         if (left <= toWrite) {
123           buffer.put(src, offset, left);
124           toWrite -= left;
125           offset += left;
126           bufferPos = bufferSize;
127           dump();
128         } else {
129           buffer.put(src, offset, toWrite);
130           bufferPos += toWrite;
131           break;
132         }
133       }
134     }
135
136     //@Override
137     //public void setLength() throws IOException {
138     //   TODO -- how to impl this?  neither FOS nor
139     //   FileChannel provides an API?
140     //}
141
142     @Override
143     public void flush() throws IOException {
144       // TODO -- I don't think this method is necessary?
145     }
146
147     private void dump() throws IOException {
148       buffer.flip();
149       final long limit = filePos + buffer.limit();
150       if (limit > fileLength) {
151         // this dump extends the file
152         fileLength = limit;
153       } else {
154         // we had seek'd back & wrote some changes
155       }
156
157       // must always round to next block
158       buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
159
160       assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
161       assert (filePos & ALIGN_NOT_MASK) == filePos;
162       //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
163       channel.write(buffer, filePos);
164       filePos += bufferPos;
165       bufferPos = 0;
166       buffer.clear();
167       //System.out.println("dump: done");
168
169       // TODO: the case where we'd seek'd back, wrote an
170       // entire buffer, we must here read the next buffer;
171       // likely Lucene won't trip on this since we only
172       // write smallish amounts on seeking back
173     }
174
175     @Override
176     public long getFilePointer() {
177       return filePos + bufferPos;
178     }
179
180     // TODO: seek is fragile at best; it can only properly
181     // handle seek & then change bytes that fit entirely
182     // within one buffer
183     @Override
184     public void seek(long pos) throws IOException {
185       if (pos != getFilePointer()) {
186         dump();
187         final long alignedPos = pos & ALIGN_NOT_MASK;
188         filePos = alignedPos;
189         int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
190         if (n < bufferSize) {
191           buffer.limit(n);
192         }
193         //System.out.println("seek refill=" + n);
194         final int delta = (int) (pos - alignedPos);
195         buffer.position(delta);
196         bufferPos = delta;
197       }
198     }
199
200     @Override
201     public long length() throws IOException {
202       return fileLength;
203     }
204
205     @Override
206     public void close() throws IOException {
207       if (isOpen) {
208         isOpen = false;
209         try {
210           dump();
211         } finally {
212           try {
213             //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
214             channel.truncate(fileLength);
215             //System.out.println("  now: " + channel.size());
216           } finally {
217             try {
218               channel.close();
219             } finally {
220               fos.close();
221               //System.out.println("  final len=" + path.length());
222             }
223           }
224         }
225       }
226     }
227   }
228
229   private final static class DirectIOLinuxIndexInput extends IndexInput {
230     private final ByteBuffer buffer;
231     private final FileInputStream fis;
232     private final FileChannel channel;
233     private final int bufferSize;
234
235     private boolean isOpen;
236     private boolean isClone;
237     private long filePos;
238     private int bufferPos;
239
240     public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
241       FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
242       fis = new FileInputStream(fd);
243       channel = fis.getChannel();
244       this.bufferSize = bufferSize;
245       buffer = ByteBuffer.allocateDirect(bufferSize);
246       isOpen = true;
247       isClone = false;
248       filePos = -bufferSize;
249       bufferPos = bufferSize;
250       //System.out.println("D open " + path + " this=" + this);
251     }
252
253     // for clone
254     public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
255       this.fis = null;
256       channel = other.channel;
257       this.bufferSize = other.bufferSize;
258       buffer = ByteBuffer.allocateDirect(bufferSize);
259       filePos = -bufferSize;
260       bufferPos = bufferSize;
261       isOpen = true;
262       isClone = true;
263       //System.out.println("D clone this=" + this);
264       seek(other.getFilePointer());
265     }
266
267     @Override
268     public void close() throws IOException {
269       if (isOpen && !isClone) {
270         try {
271           channel.close();
272         } finally {
273           if (!isClone) {
274             fis.close();
275           }
276         }
277       }
278     }
279
280     @Override
281     public long getFilePointer() {
282       return filePos + bufferPos;
283     }
284
285     @Override
286     public void seek(long pos) throws IOException {
287       if (pos != getFilePointer()) {
288         final long alignedPos = pos & ALIGN_NOT_MASK;
289         //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
290         filePos = alignedPos-bufferSize;
291         refill();
292         
293         final int delta = (int) (pos - alignedPos);
294         buffer.position(delta);
295         bufferPos = delta;
296       }
297     }
298
299     @Override
300     public long length() {
301       try {
302         return channel.size();
303       } catch (IOException ioe) {
304         throw new RuntimeException(ioe);
305       }
306     }
307
308     @Override
309     public byte readByte() throws IOException {
310       // NOTE: we don't guard against EOF here... ie the
311       // "final" buffer will typically be filled to less
312       // than bufferSize
313       if (bufferPos == bufferSize) {
314         refill();
315       }
316       assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
317       bufferPos++;
318       return buffer.get();
319     }
320
321     private void refill() throws IOException {
322       buffer.clear();
323       filePos += bufferSize;
324       bufferPos = 0;
325       assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
326       //System.out.println("X refill filePos=" + filePos);
327       int n = channel.read(buffer, filePos);
328       if (n < 0) {
329         throw new IOException("eof");
330       }
331       buffer.rewind();
332     }
333
334     @Override
335     public void readBytes(byte[] dst, int offset, int len) throws IOException {
336       int toRead = len;
337       //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
338       while(true) {
339         final int left = bufferSize - bufferPos;
340         if (left < toRead) {
341           //System.out.println("  copy " + left);
342           buffer.get(dst, offset, left);
343           toRead -= left;
344           offset += left;
345           refill();
346         } else {
347           //System.out.println("  copy " + toRead);
348           buffer.get(dst, offset, toRead);
349           bufferPos += toRead;
350           //System.out.println("  readBytes done");
351           break;
352         }
353       }
354     }
355
356     @Override
357     public Object clone() {
358       try {
359         return new DirectIOLinuxIndexInput(this);
360       } catch (IOException ioe) {
361         throw new RuntimeException(ioe);
362       }
363     }
364   }
365 }