pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / contrib / misc / src / java / org / apache / lucene / store / DirectIOLinuxDirectory.java
1 package org.apache.lucene.store;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements. See the NOTICE file distributed with this
6  * work for additional information regarding copyright ownership. The ASF
7  * licenses this file to You under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  * http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16  * License for the specific language governing permissions and limitations under
17  * the License.
18  */
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.io.FileInputStream;
23 import java.io.FileDescriptor;
24 import java.io.FileOutputStream;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27
28 import org.apache.lucene.store.Directory; // javadoc
29 import org.apache.lucene.store.NativeFSLockFactory; // javadoc
30
31 /**
32  * An {@link Directory} implementation that uses the
33  * Linux-specific O_DIRECT flag to bypass all OS level
34  * caching.  To use this you must compile
35  * NativePosixUtil.cpp (exposes Linux-specific APIs through
36  * JNI) for your platform.
37  *
38  * <p><b>WARNING</b>: this code is very new and quite easily
39  * could contain horrible bugs.  For example, here's one
40  * known issue: if you use seek in IndexOutput, and then
41  * write more than one buffer's worth of bytes, then the
42  * file will be wrong.  Lucene does not do this (only writes
43  * small number of bytes after seek).
44
45  * @lucene.experimental
46  */
47 public class DirectIOLinuxDirectory extends FSDirectory {
48
49   private final static long ALIGN = 512;
50   private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
51
52   private final int forcedBufferSize;
53
54   /** Create a new NIOFSDirectory for the named location.
55    * 
56    * @param path the path of the directory
57    * @param lockFactory the lock factory to use, or null for the default
58    * ({@link NativeFSLockFactory});
59    * @param forcedBufferSize if this is 0, just use Lucene's
60    *    default buffer size; else, force this buffer size.
61    *    For best performance, force the buffer size to
62    *    something fairly large (eg 1 MB), but note that this
63    *    will eat up the JRE's direct buffer storage space
64    * @throws IOException
65    */
66   public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
67     super(path, lockFactory);
68     this.forcedBufferSize = forcedBufferSize;
69   }
70
71   @Override
72   public IndexInput openInput(String name, int bufferSize) throws IOException {
73     ensureOpen();
74     return new DirectIOLinuxIndexInput(new File(getDirectory(), name), forcedBufferSize == 0 ? bufferSize : forcedBufferSize);
75   }
76
77   @Override
78   public IndexOutput createOutput(String name) throws IOException {
79     ensureOpen();
80     ensureCanWrite(name);
81     return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), forcedBufferSize == 0 ? BufferedIndexOutput.BUFFER_SIZE : forcedBufferSize);
82   }
83
84   private final static class DirectIOLinuxIndexOutput extends IndexOutput {
85     private final ByteBuffer buffer;
86     private final FileOutputStream fos;
87     private final FileChannel channel;
88     private final int bufferSize;
89
90     //private final File path;
91
92     private int bufferPos;
93     private long filePos;
94     private long fileLength;
95     private boolean isOpen;
96
97     public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
98       //this.path = path;
99       FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
100       fos = new FileOutputStream(fd);
101       //fos = new FileOutputStream(path);
102       channel = fos.getChannel();
103       buffer = ByteBuffer.allocateDirect(bufferSize);
104       this.bufferSize = bufferSize;
105       isOpen = true;
106     }
107
108     @Override
109     public void writeByte(byte b) throws IOException {
110       assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
111       buffer.put(b);
112       if (++bufferPos == bufferSize) {
113         dump();
114       }
115     }
116
117     @Override
118     public void writeBytes(byte[] src, int offset, int len) throws IOException {
119       int toWrite = len;
120       while(true) {
121         final int left = bufferSize - bufferPos;
122         if (left <= toWrite) {
123           buffer.put(src, offset, left);
124           toWrite -= left;
125           offset += left;
126           bufferPos = bufferSize;
127           dump();
128         } else {
129           buffer.put(src, offset, toWrite);
130           bufferPos += toWrite;
131           break;
132         }
133       }
134     }
135
136     //@Override
137     //public void setLength() throws IOException {
138     //   TODO -- how to impl this?  neither FOS nor
139     //   FileChannel provides an API?
140     //}
141
142     @Override
143     public void flush() throws IOException {
144       // TODO -- I don't think this method is necessary?
145     }
146
147     private void dump() throws IOException {
148       buffer.flip();
149       final long limit = filePos + buffer.limit();
150       if (limit > fileLength) {
151         // this dump extends the file
152         fileLength = limit;
153       } else {
154         // we had seek'd back & wrote some changes
155       }
156
157       // must always round to next block
158       buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
159
160       assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
161       assert (filePos & ALIGN_NOT_MASK) == filePos;
162       //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
163       channel.write(buffer, filePos);
164       filePos += bufferPos;
165       bufferPos = 0;
166       buffer.clear();
167       //System.out.println("dump: done");
168
169       // TODO: the case where we'd seek'd back, wrote an
170       // entire buffer, we must here read the next buffer;
171       // likely Lucene won't trip on this since we only
172       // write smallish amounts on seeking back
173     }
174
175     @Override
176     public long getFilePointer() {
177       return filePos + bufferPos;
178     }
179
180     // TODO: seek is fragile at best; it can only properly
181     // handle seek & then change bytes that fit entirely
182     // within one buffer
183     @Override
184     public void seek(long pos) throws IOException {
185       if (pos != getFilePointer()) {
186         dump();
187         final long alignedPos = pos & ALIGN_NOT_MASK;
188         filePos = alignedPos;
189         int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
190         if (n < bufferSize) {
191           buffer.limit(n);
192         }
193         //System.out.println("seek refill=" + n);
194         final int delta = (int) (pos - alignedPos);
195         buffer.position(delta);
196         bufferPos = delta;
197       }
198     }
199
200     @Override
201     public long length() throws IOException {
202       return fileLength;
203     }
204
205     @Override
206     public void close() throws IOException {
207       if (isOpen) {
208         isOpen = false;
209         try {
210           dump();
211         } finally {
212           try {
213             //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
214             channel.truncate(fileLength);
215             //System.out.println("  now: " + channel.size());
216           } finally {
217             try {
218               channel.close();
219             } finally {
220               fos.close();
221               //System.out.println("  final len=" + path.length());
222             }
223           }
224         }
225       }
226     }
227   }
228
229   private final static class DirectIOLinuxIndexInput extends IndexInput {
230     private final ByteBuffer buffer;
231     private final FileInputStream fis;
232     private final FileChannel channel;
233     private final int bufferSize;
234
235     private boolean isOpen;
236     private boolean isClone;
237     private long filePos;
238     private int bufferPos;
239
240     public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
241       super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
242       FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
243       fis = new FileInputStream(fd);
244       channel = fis.getChannel();
245       this.bufferSize = bufferSize;
246       buffer = ByteBuffer.allocateDirect(bufferSize);
247       isOpen = true;
248       isClone = false;
249       filePos = -bufferSize;
250       bufferPos = bufferSize;
251       //System.out.println("D open " + path + " this=" + this);
252     }
253
254     // for clone
255     public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
256       super(other.toString());
257       this.fis = null;
258       channel = other.channel;
259       this.bufferSize = other.bufferSize;
260       buffer = ByteBuffer.allocateDirect(bufferSize);
261       filePos = -bufferSize;
262       bufferPos = bufferSize;
263       isOpen = true;
264       isClone = true;
265       //System.out.println("D clone this=" + this);
266       seek(other.getFilePointer());
267     }
268
269     @Override
270     public void close() throws IOException {
271       if (isOpen && !isClone) {
272         try {
273           channel.close();
274         } finally {
275           if (!isClone) {
276             fis.close();
277           }
278         }
279       }
280     }
281
282     @Override
283     public long getFilePointer() {
284       return filePos + bufferPos;
285     }
286
287     @Override
288     public void seek(long pos) throws IOException {
289       if (pos != getFilePointer()) {
290         final long alignedPos = pos & ALIGN_NOT_MASK;
291         //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
292         filePos = alignedPos-bufferSize;
293         refill();
294         
295         final int delta = (int) (pos - alignedPos);
296         buffer.position(delta);
297         bufferPos = delta;
298       }
299     }
300
301     @Override
302     public long length() {
303       try {
304         return channel.size();
305       } catch (IOException ioe) {
306         throw new RuntimeException("IOException during length(): " + this, ioe);
307       }
308     }
309
310     @Override
311     public byte readByte() throws IOException {
312       // NOTE: we don't guard against EOF here... ie the
313       // "final" buffer will typically be filled to less
314       // than bufferSize
315       if (bufferPos == bufferSize) {
316         refill();
317       }
318       assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
319       bufferPos++;
320       return buffer.get();
321     }
322
323     private void refill() throws IOException {
324       buffer.clear();
325       filePos += bufferSize;
326       bufferPos = 0;
327       assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
328       //System.out.println("X refill filePos=" + filePos);
329       int n;
330       try {
331         n = channel.read(buffer, filePos);
332       } catch (IOException ioe) {
333         IOException newIOE = new IOException(ioe.getMessage() + ": " + this);
334         newIOE.initCause(ioe);
335         throw newIOE;
336       }
337       if (n < 0) {
338         throw new IOException("eof: " + this);
339       }
340       buffer.rewind();
341     }
342
343     @Override
344     public void readBytes(byte[] dst, int offset, int len) throws IOException {
345       int toRead = len;
346       //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
347       while(true) {
348         final int left = bufferSize - bufferPos;
349         if (left < toRead) {
350           //System.out.println("  copy " + left);
351           buffer.get(dst, offset, left);
352           toRead -= left;
353           offset += left;
354           refill();
355         } else {
356           //System.out.println("  copy " + toRead);
357           buffer.get(dst, offset, toRead);
358           bufferPos += toRead;
359           //System.out.println("  readBytes done");
360           break;
361         }
362       }
363     }
364
365     @Override
366     public Object clone() {
367       try {
368         return new DirectIOLinuxIndexInput(this);
369       } catch (IOException ioe) {
370         throw new RuntimeException("IOException during clone: " + this, ioe);
371       }
372     }
373   }
374 }