--- /dev/null
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.lucene.store.Directory; // javadoc
+import org.apache.lucene.store.NativeFSLockFactory; // javadoc
+
+/**
+ * A {@link Directory} implementation that uses the
+ * Linux-specific O_DIRECT flag to bypass all OS level
+ * caching. To use this you must compile
+ * NativePosixUtil.cpp (exposes Linux-specific APIs through
+ * JNI) for your platform.
+ *
+ * <p><b>WARNING</b>: this code is very new and quite easily
+ * could contain horrible bugs. For example, here's one
+ * known issue: if you use seek in IndexOutput, and then
+ * write more than one buffer's worth of bytes, then the
+ * file will be wrong. Lucene does not do this (it only writes
+ * a small number of bytes after seek).
+ *
+ * @lucene.experimental
+ */
+public class DirectIOLinuxDirectory extends FSDirectory {
+
+ /** Alignment, in bytes, that file positions and write lengths in this class are rounded to. */
+ private static final long ALIGN = 512;
+
+ /** Mask that clears the low bits of a value, rounding it down to a multiple of {@link #ALIGN}. */
+ private static final long ALIGN_NOT_MASK = ~(ALIGN - 1);
+
+ /** Buffer size applied to every input and output opened here; 0 means no size is forced. */
+ private final int forcedBufferSize;
+
+ /**
+  * Create a new DirectIOLinuxDirectory for the named location.
+  *
+  * @param path the path of the directory
+  * @param lockFactory the lock factory to use, or null for the default
+  * ({@link NativeFSLockFactory});
+  * @param forcedBufferSize if this is 0, just use Lucene's
+  * default buffer size; else, force this buffer size, which
+  * must be a positive multiple of 512.
+  * For best performance, force the buffer size to
+  * something fairly large (eg 1 MB), but note that this
+  * will eat up the JRE's direct buffer storage space
+  * @throws IOException if the superclass constructor fails
+  * @throws IllegalArgumentException if forcedBufferSize is negative
+  * or is not a multiple of 512
+  */
+ public DirectIOLinuxDirectory(File path, LockFactory lockFactory, int forcedBufferSize) throws IOException {
+   super(path, lockFactory);
+   // Fail fast: buffers are written in ALIGN-rounded chunks and file positions advance by
+   // whole buffers, so a size that is not a multiple of ALIGN would only blow up later,
+   // in the middle of reading or writing an index file.
+   if (forcedBufferSize < 0 || (forcedBufferSize & (ALIGN - 1)) != 0) {
+     throw new IllegalArgumentException("forcedBufferSize must be 0 or a positive multiple of " + ALIGN + " (got: " + forcedBufferSize + ")");
+   }
+   this.forcedBufferSize = forcedBufferSize;
+ }
+
+ /**
+  * Opens the named file for reading through a direct-IO index input.
+  *
+  * @param name file name, resolved against this directory's path
+  * @param bufferSize buffer size to use when no buffer size was forced on this directory
+  * @return a new input positioned at the start of the file
+  * @throws IOException if the file cannot be opened
+  */
+ @Override
+ public IndexInput openInput(String name, int bufferSize) throws IOException {
+   ensureOpen();
+   final File file = new File(getDirectory(), name);
+   // A forced size of 0 means "no override": honor the caller's request.
+   final int effectiveBufferSize;
+   if (forcedBufferSize == 0) {
+     effectiveBufferSize = bufferSize;
+   } else {
+     effectiveBufferSize = forcedBufferSize;
+   }
+   return new DirectIOLinuxIndexInput(file, effectiveBufferSize);
+ }
+
+ /**
+  * Creates the named file for writing through a direct-IO index output.
+  *
+  * @param name file name, resolved against this directory's path
+  * @return a new output for the file
+  * @throws IOException if the file cannot be created or opened
+  */
+ @Override
+ public IndexOutput createOutput(String name) throws IOException {
+   ensureOpen();
+   ensureCanWrite(name);
+   final File file = new File(getDirectory(), name);
+   // A forced size of 0 means "no override": fall back to the buffered-output default.
+   final int effectiveBufferSize;
+   if (forcedBufferSize == 0) {
+     effectiveBufferSize = BufferedIndexOutput.BUFFER_SIZE;
+   } else {
+     effectiveBufferSize = forcedBufferSize;
+   }
+   return new DirectIOLinuxIndexOutput(file, effectiveBufferSize);
+ }
+
+ /**
+  * {@link IndexOutput} that stages bytes in a direct {@link ByteBuffer} and writes them, in
+  * chunks rounded up to {@code ALIGN}, through a channel over the descriptor returned by
+  * {@code NativePosixUtil.open_direct}. The padding written by the final chunk is removed
+  * again on {@link #close()} by truncating the file to its logical length.
+  */
+ private final static class DirectIOLinuxIndexOutput extends IndexOutput {
+   private final ByteBuffer buffer;
+   private final FileOutputStream fos;
+   private final FileChannel channel;
+   private final int bufferSize;
+
+   /** Number of bytes staged in {@link #buffer}; asserted to equal {@code buffer.position()}. */
+   private int bufferPos;
+   /** File offset that the start of {@link #buffer} will be written to. */
+   private long filePos;
+   /** Logical file length: the highest offset dumped so far, excluding alignment padding. */
+   private long fileLength;
+   private boolean isOpen;
+
+   /**
+    * Opens {@code path} for direct writing.
+    *
+    * @param path file to write
+    * @param bufferSize capacity, in bytes, of the staging buffer
+    * @throws IOException if the file cannot be opened
+    */
+   public DirectIOLinuxIndexOutput(File path, int bufferSize) throws IOException {
+     // Allocate the direct buffer BEFORE opening the file: if the allocation fails (direct
+     // memory is a limited resource) no descriptor has been opened yet, so nothing leaks.
+     this.bufferSize = bufferSize;
+     buffer = ByteBuffer.allocateDirect(bufferSize);
+     FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
+     fos = new FileOutputStream(fd);
+     channel = fos.getChannel();
+     isOpen = true;
+   }
+
+   /** Stages one byte, dumping the buffer to the file once it is full. */
+   @Override
+   public void writeByte(byte b) throws IOException {
+     assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
+     buffer.put(b);
+     if (++bufferPos == bufferSize) {
+       dump();
+     }
+   }
+
+   /**
+    * Stages {@code len} bytes from {@code src} starting at {@code offset}, dumping the buffer
+    * each time it fills up.
+    */
+   @Override
+   public void writeBytes(byte[] src, int offset, int len) throws IOException {
+     int toWrite = len;
+     while(true) {
+       final int left = bufferSize - bufferPos;
+       if (left <= toWrite) {
+         // Fill the buffer to the brim and write it out, then keep going.
+         buffer.put(src, offset, left);
+         toWrite -= left;
+         offset += left;
+         bufferPos = bufferSize;
+         dump();
+       } else {
+         // Remainder fits without filling the buffer; leave it staged.
+         buffer.put(src, offset, toWrite);
+         bufferPos += toWrite;
+         break;
+       }
+     }
+   }
+
+   // TODO -- how to impl setLength()? neither FOS nor
+   // FileChannel provides an API?
+
+   /** No-op: staged bytes are only written by {@link #dump()}. */
+   @Override
+   public void flush() throws IOException {
+     // TODO -- I don't think this method is necessary?
+   }
+
+   /**
+    * Writes the staged bytes at {@link #filePos}, padding the write length up to the next
+    * multiple of {@code ALIGN}, then advances {@link #filePos} by the number of real
+    * (unpadded) bytes and empties the buffer.
+    */
+   private void dump() throws IOException {
+     buffer.flip();
+     final long limit = filePos + buffer.limit();
+     if (limit > fileLength) {
+       // this dump extends the file
+       fileLength = limit;
+     }
+     // otherwise we had seek'd back & wrote some changes inside the existing length
+
+     // must always round to next block
+     buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
+
+     assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
+     assert (filePos & ALIGN_NOT_MASK) == filePos;
+     channel.write(buffer, filePos);
+     // Advance by the real byte count, not the padded one.
+     filePos += bufferPos;
+     bufferPos = 0;
+     buffer.clear();
+
+     // TODO: the case where we'd seek'd back, wrote an
+     // entire buffer, we must here read the next buffer;
+     // likely Lucene won't trip on this since we only
+     // write smallish amounts on seeking back
+   }
+
+   /** Returns the logical write position: dumped bytes plus bytes still staged. */
+   @Override
+   public long getFilePointer() {
+     return filePos + bufferPos;
+   }
+
+   // TODO: seek is fragile at best; it can only properly
+   // handle seek & then change bytes that fit entirely
+   // within one buffer
+   /**
+    * Moves the write position to {@code pos}. Staged bytes are dumped first; the buffer is
+    * then re-read from the aligned offset at or below {@code pos} so that the bytes before
+    * {@code pos} in that block are preserved when the block is written back.
+    */
+   @Override
+   public void seek(long pos) throws IOException {
+     if (pos != getFilePointer()) {
+       dump();
+       final long alignedPos = pos & ALIGN_NOT_MASK;
+       filePos = alignedPos;
+       int n = (int) NativePosixUtil.pread(fos.getFD(), filePos, buffer);
+       // NOTE(review): lowering the limit caps how many bytes can be put() after the seek;
+       // this looks like part of the "fits within one buffer" restriction -- confirm against
+       // the native pread contract before changing it.
+       if (n < bufferSize) {
+         buffer.limit(n);
+       }
+       final int delta = (int) (pos - alignedPos);
+       buffer.position(delta);
+       bufferPos = delta;
+     }
+   }
+
+   /** Returns the logical length recorded by {@link #dump()}; staged bytes are not counted. */
+   @Override
+   public long length() throws IOException {
+     return fileLength;
+   }
+
+   /**
+    * Dumps any staged bytes, truncates the file to its logical length (dropping alignment
+    * padding) and closes the channel and stream. Each step runs even if an earlier one
+    * throws. Calling this more than once has no further effect.
+    */
+   @Override
+   public void close() throws IOException {
+     if (isOpen) {
+       isOpen = false;
+       try {
+         dump();
+       } finally {
+         try {
+           channel.truncate(fileLength);
+         } finally {
+           try {
+             channel.close();
+           } finally {
+             fos.close();
+           }
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * {@link IndexInput} that reads whole buffers, at offsets that are multiples of
+  * {@code ALIGN}, through a channel over the descriptor returned by
+  * {@code NativePosixUtil.open_direct}. Clones share the channel of the input they were
+  * cloned from but own a private buffer and position.
+  */
+ private final static class DirectIOLinuxIndexInput extends IndexInput {
+   private final ByteBuffer buffer;
+   /** Stream that owns the descriptor; null for clones. */
+   private final FileInputStream fis;
+   private final FileChannel channel;
+   private final int bufferSize;
+
+   private boolean isOpen;
+   private boolean isClone;
+   /** File offset that the start of {@link #buffer} was read from. */
+   private long filePos;
+   /** Read position inside {@link #buffer}; asserted to equal {@code buffer.position()}. */
+   private int bufferPos;
+
+   /**
+    * Opens {@code path} for direct reading.
+    *
+    * @param path file to read
+    * @param bufferSize capacity, in bytes, of the read buffer
+    * @throws IOException if the file cannot be opened
+    */
+   public DirectIOLinuxIndexInput(File path, int bufferSize) throws IOException {
+     super("DirectIOLinuxIndexInput(path=\"" + path.getPath() + "\")");
+     // Allocate the direct buffer BEFORE opening the file: if the allocation fails (direct
+     // memory is a limited resource) no descriptor has been opened yet, so nothing leaks.
+     this.bufferSize = bufferSize;
+     buffer = ByteBuffer.allocateDirect(bufferSize);
+     FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
+     fis = new FileInputStream(fd);
+     channel = fis.getChannel();
+     isOpen = true;
+     isClone = false;
+     // Start "one buffer before the file" with the buffer marked exhausted: the file
+     // pointer is 0 and the first read triggers a refill at offset 0.
+     filePos = -bufferSize;
+     bufferPos = bufferSize;
+   }
+
+   /**
+    * Clone constructor: shares {@code other}'s channel, allocates a private buffer and seeks
+    * to {@code other}'s current file pointer.
+    */
+   public DirectIOLinuxIndexInput(DirectIOLinuxIndexInput other) throws IOException {
+     super(other.toString());
+     this.fis = null;
+     channel = other.channel;
+     this.bufferSize = other.bufferSize;
+     buffer = ByteBuffer.allocateDirect(bufferSize);
+     filePos = -bufferSize;
+     bufferPos = bufferSize;
+     isOpen = true;
+     isClone = true;
+     seek(other.getFilePointer());
+   }
+
+   /**
+    * Closes the channel and the underlying stream. Clones do not own either, so closing a
+    * clone releases nothing. Calling this more than once has no further effect.
+    */
+   @Override
+   public void close() throws IOException {
+     if (isOpen) {
+       // Record the close so repeated calls do not run the close path again.
+       isOpen = false;
+       if (!isClone) {
+         try {
+           channel.close();
+         } finally {
+           fis.close();
+         }
+       }
+     }
+   }
+
+   /** Returns the logical read position: buffer start offset plus offset inside the buffer. */
+   @Override
+   public long getFilePointer() {
+     return filePos + bufferPos;
+   }
+
+   /**
+    * Moves the read position to {@code pos} by refilling the buffer from the aligned offset
+    * at or below {@code pos} and then skipping to {@code pos} inside it.
+    */
+   @Override
+   public void seek(long pos) throws IOException {
+     if (pos != getFilePointer()) {
+       final long alignedPos = pos & ALIGN_NOT_MASK;
+       // refill() advances filePos by one buffer before reading, so back up by one buffer.
+       filePos = alignedPos-bufferSize;
+       refill();
+
+       final int delta = (int) (pos - alignedPos);
+       buffer.position(delta);
+       bufferPos = delta;
+     }
+   }
+
+   /**
+    * Returns the channel's current size.
+    *
+    * @throws RuntimeException wrapping any IOException raised by the channel
+    */
+   @Override
+   public long length() {
+     try {
+       return channel.size();
+     } catch (IOException ioe) {
+       throw new RuntimeException("IOException during length(): " + this, ioe);
+     }
+   }
+
+   /** Reads one byte, refilling the buffer first when it is exhausted. */
+   @Override
+   public byte readByte() throws IOException {
+     // NOTE: we don't guard against EOF here... ie the
+     // "final" buffer will typically be filled to less
+     // than bufferSize
+     if (bufferPos == bufferSize) {
+       refill();
+     }
+     assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
+     bufferPos++;
+     return buffer.get();
+   }
+
+   /**
+    * Advances {@link #filePos} by one buffer and reads the buffer at the new offset.
+    *
+    * @throws IOException if the read fails or reports end of file
+    */
+   private void refill() throws IOException {
+     buffer.clear();
+     filePos += bufferSize;
+     bufferPos = 0;
+     assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
+     int n;
+     try {
+       n = channel.read(buffer, filePos);
+     } catch (IOException ioe) {
+       // Re-throw with this input's description appended, keeping the original as cause.
+       IOException newIOE = new IOException(ioe.getMessage() + ": " + this);
+       newIOE.initCause(ioe);
+       throw newIOE;
+     }
+     if (n < 0) {
+       throw new IOException("eof: " + this);
+     }
+     // The read advanced the buffer position; reset it so gets start at the buffer's start.
+     buffer.rewind();
+   }
+
+   /**
+    * Copies {@code len} bytes into {@code dst} starting at {@code offset}, refilling the
+    * buffer whenever the remaining request is larger than what is left in it.
+    */
+   @Override
+   public void readBytes(byte[] dst, int offset, int len) throws IOException {
+     int toRead = len;
+     while(true) {
+       final int left = bufferSize - bufferPos;
+       if (left < toRead) {
+         // Drain what is left in the buffer, then fetch the next one.
+         buffer.get(dst, offset, left);
+         toRead -= left;
+         offset += left;
+         refill();
+       } else {
+         // The rest of the request is already buffered.
+         buffer.get(dst, offset, toRead);
+         bufferPos += toRead;
+         break;
+       }
+     }
+   }
+
+   /**
+    * Returns a new input over the same channel, positioned at this input's file pointer.
+    *
+    * @throws RuntimeException wrapping any IOException raised while creating the clone
+    */
+   @Override
+   public Object clone() {
+     try {
+       return new DirectIOLinuxIndexInput(this);
+     } catch (IOException ioe) {
+       throw new RuntimeException("IOException during clone: " + this, ioe);
+     }
+   }
+ }
+}