1 package org.apache.lucene.store;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to You under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations under
21 import java.io.IOException;
22 import java.io.FileInputStream;
23 import java.io.FileDescriptor;
24 import java.io.FileOutputStream;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
28 import org.apache.lucene.store.Directory; // javadoc
29 import org.apache.lucene.store.NativeFSLockFactory; // javadoc
32 * An {@link Directory} implementation that uses the
33 * Linux-specific O_DIRECT flag to bypass all OS level
34 * caching. To use this you must compile
35 * NativePosixUtil.cpp (exposes Linux-specific APIs through
36 * JNI) for your platform.
38 * <p><b>WARNING</b>: this code is very new and quite easily
39 * could contain horrible bugs. For example, here's one
40 * known issue: if you use seek in IndexOutput, and then
41 * write more than one buffer's worth of bytes, then the
42 * file will be wrong. Lucene does not do this (only writes
43 * small number of bytes after seek).
45 * @lucene.experimental
47 public class DirectIOLinuxDirectory extends FSDirectory {
49 private final static long ALIGN = 512;
50 private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
52 private final int forcedBufferSize;
54 /** Create a new NIOFSDirectory for the named location.
56 * @param path the path of the directory
57 * @param lockFactory the lock factory to use, or null for the default
58 * ({@link NativeFSLockFactory});
59 * @param forcedBufferSize if this is 0, just use Lucene's
60 * default buffer size; else, force this buffer size.
61 * For best performance, force the buffer size to
62 * something fairly large (eg 1 MB), but note that this
63 * will eat up the JRE's direct buffer storage space
66 public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
67 super(path, lockFactory);
68 this.forcedBufferSize = forcedBufferSize;
72 public IndexInput openInput(String name, int bufferSize) throws IOException {
74 return new DirectIOLinuxIndexInput(new File(getDirectory(), name), forcedBufferSize == 0 ? bufferSize : forcedBufferSize);
78 public IndexOutput createOutput(String name) throws IOException {
81 return new DirectIOLinuxIndexOutput(new File(getDirectory(), name), forcedBufferSize == 0 ? BufferedIndexOutput.BUFFER_SIZE : forcedBufferSize);
84 private final static class DirectIOLinuxIndexOutput extends IndexOutput {
85 private final ByteBuffer buffer;
86 private final FileOutputStream fos;
87 private final FileChannel channel;
88 private final int bufferSize;
90 //private final File path;
92 private int bufferPos;
94 private long fileLength;
95 private boolean isOpen;
97 public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
99 FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
100 fos = new FileOutputStream(fd);
101 //fos = new FileOutputStream(path);
102 channel = fos.getChannel();
103 buffer = ByteBuffer.allocateDirect(bufferSize);
104 this.bufferSize = bufferSize;
109 public void writeByte(byte b) throws IOException {
110 assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
112 if (++bufferPos == bufferSize) {
118 public void writeBytes(byte[] src, int offset, int len) throws IOException {
121 final int left = bufferSize - bufferPos;
122 if (left <= toWrite) {
123 buffer.put(src, offset, left);
126 bufferPos = bufferSize;
129 buffer.put(src, offset, toWrite);
130 bufferPos += toWrite;
137 //public void setLength() throws IOException {
138 // TODO -- how to impl this? neither FOS nor
139 // FileChannel provides an API?
143 public void flush() throws IOException {
144 // TODO -- I don't think this method is necessary?
147 private void dump() throws IOException {
149 final long limit = filePos + buffer.limit();
150 if (limit > fileLength) {
151 // this dump extends the file
154 // we had seek'd back & wrote some changes
157 // must always round to next block
158 buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
160 assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
161 assert (filePos & ALIGN_NOT_MASK) == filePos;
162 //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
163 channel.write(buffer, filePos);
164 filePos += bufferPos;
167 //System.out.println("dump: done");
169 // TODO: the case where we'd seek'd back, wrote an
170 // entire buffer, we must here read the next buffer;
171 // likely Lucene won't trip on this since we only
172 // write smallish amounts on seeking back
176 public long getFilePointer() {
177 return filePos + bufferPos;
180 // TODO: seek is fragile at best; it can only properly
181 // handle seek & then change bytes that fit entirely
184 public void seek(long pos) throws IOException {
185 if (pos != getFilePointer()) {
187 final long alignedPos = pos & ALIGN_NOT_MASK;
188 filePos = alignedPos;
189 int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
190 if (n < bufferSize) {
193 //System.out.println("seek refill=" + n);
194 final int delta = (int) (pos - alignedPos);
195 buffer.position(delta);
201 public long length() throws IOException {
206 public void close() throws IOException {
213 //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
214 channel.truncate(fileLength);
215 //System.out.println(" now: " + channel.size());
221 //System.out.println(" final len=" + path.length());
229 private final static class DirectIOLinuxIndexInput extends IndexInput {
230 private final ByteBuffer buffer;
231 private final FileInputStream fis;
232 private final FileChannel channel;
233 private final int bufferSize;
235 private boolean isOpen;
236 private boolean isClone;
237 private long filePos;
238 private int bufferPos;
240 public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
241 super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
242 FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
243 fis = new FileInputStream(fd);
244 channel = fis.getChannel();
245 this.bufferSize = bufferSize;
246 buffer = ByteBuffer.allocateDirect(bufferSize);
249 filePos = -bufferSize;
250 bufferPos = bufferSize;
251 //System.out.println("D open " + path + " this=" + this);
255 public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
256 super(other.toString());
258 channel = other.channel;
259 this.bufferSize = other.bufferSize;
260 buffer = ByteBuffer.allocateDirect(bufferSize);
261 filePos = -bufferSize;
262 bufferPos = bufferSize;
265 //System.out.println("D clone this=" + this);
266 seek(other.getFilePointer());
270 public void close() throws IOException {
271 if (isOpen && !isClone) {
283 public long getFilePointer() {
284 return filePos + bufferPos;
288 public void seek(long pos) throws IOException {
289 if (pos != getFilePointer()) {
290 final long alignedPos = pos & ALIGN_NOT_MASK;
291 //System.out.println("seek pos=" + pos + " aligned=" + alignedPos + " bufferSize=" + bufferSize + " this=" + this);
292 filePos = alignedPos-bufferSize;
295 final int delta = (int) (pos - alignedPos);
296 buffer.position(delta);
302 public long length() {
304 return channel.size();
305 } catch (IOException ioe) {
306 throw new RuntimeException("IOException during length(): " + this, ioe);
311 public byte readByte() throws IOException {
312 // NOTE: we don't guard against EOF here... ie the
313 // "final" buffer will typically be filled to less
315 if (bufferPos == bufferSize) {
318 assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
323 private void refill() throws IOException {
325 filePos += bufferSize;
327 assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
328 //System.out.println("X refill filePos=" + filePos);
331 n = channel.read(buffer, filePos);
332 } catch (IOException ioe) {
333 IOException newIOE = new IOException(ioe.getMessage() + ": " + this);
334 newIOE.initCause(ioe);
338 throw new IOException("eof: " + this);
344 public void readBytes(byte[] dst, int offset, int len) throws IOException {
346 //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
348 final int left = bufferSize - bufferPos;
350 //System.out.println(" copy " + left);
351 buffer.get(dst, offset, left);
356 //System.out.println(" copy " + toRead);
357 buffer.get(dst, offset, toRead);
359 //System.out.println(" readBytes done");
366 public Object clone() {
368 return new DirectIOLinuxIndexInput(this);
369 } catch (IOException ioe) {
370 throw new RuntimeException("IOException during clone: " + this, ioe);