pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / src / java / org / apache / lucene / store / NRTCachingDirectory.java
diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/store/NRTCachingDirectory.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/store/NRTCachingDirectory.java
new file mode 100644 (file)
index 0000000..6dfb2b9
--- /dev/null
@@ -0,0 +1,329 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.index.ConcurrentMergeScheduler;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;       // javadocs
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.store.RAMDirectory;      // javadocs
+import org.apache.lucene.util.IOUtils;
+
+// TODO
+//   - let subclass dictate policy...?
+//   - rename to MergeCacheingDir?  NRTCachingDir
+
+/**
+ * Wraps a {@link RAMDirectory}
+ * around any provided delegate directory, to
+ * be used during NRT search.  Make sure you pull the merge
+ * scheduler using {@link #getMergeScheduler} and pass that to your
+ * {@link IndexWriter}; this class uses that to keep track of which
+ * merges are being done by which threads, to decide when to
+ * cache each written file.
+ *
+ * <p>This class is likely only useful in a near-real-time
+ * context, where indexing rate is lowish but reopen
+ * rate is highish, resulting in many tiny files being
+ * written.  This directory keeps such segments (as well as
+ * the segments produced by merging them, as long as they
+ * are small enough), in RAM.</p>
+ *
+ * <p>This is safe to use: when your app calls {IndexWriter#commit},
+ * all cached files will be flushed from the cached and sync'd.</p>
+ *
+ * <p><b>NOTE</b>: this class is somewhat sneaky in its
+ * approach for spying on merges to determine the size of a
+ * merge: it records which threads are running which merges
+ * by watching ConcurrentMergeScheduler's doMerge method.
+ * While this works correctly, likely future versions of
+ * this class will take a more general approach.
+ *
+ * <p>Here's a simple example usage:
+ *
+ * <pre>
+ *   Directory fsDir = FSDirectory.open(new File("/path/to/index"));
+ *   NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
+ *   IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
+ *   conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
+ *   IndexWriter writer = new IndexWriter(cachedFSDir, conf);
+ * </pre>
+ *
+ * <p>This will cache all newly flushed segments, all merges
+ * whose expected segment size is <= 5 MB, unless the net
+ * cached bytes exceeds 60 MB at which point all writes will
+ * not be cached (until the net bytes falls below 60 MB).</p>
+ *
+ * @lucene.experimental
+ */
+
+public class NRTCachingDirectory extends Directory {
+
+  private final RAMDirectory cache = new RAMDirectory();
+
+  private final Directory delegate;
+
+  private final long maxMergeSizeBytes;
+  private final long maxCachedBytes;
+
+  private static final boolean VERBOSE = false;
+
+  /**
+   *  We will cache a newly created output if 1) it's a
+   *  flush or a merge and the estimated size of the merged segment is <=
+   *  maxMergeSizeMB, and 2) the total cached bytes is <=
+   *  maxCachedMB */
+  public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
+    this.delegate = delegate;
+    maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
+    maxCachedBytes = (long) (maxCachedMB*1024*1024);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return delegate.getLockFactory();
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lf) throws IOException {
+    delegate.setLockFactory(lf);
+  }
+
+  @Override
+  public String getLockID() {
+    return delegate.getLockID();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return delegate.makeLock(name);
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    delegate.clearLock(name);
+  }
+
+  @Override
+  public String toString() {
+    return "NRTCachingDirectory(" + delegate + "; maxCacheMB=" + (maxCachedBytes/1024/1024.) + " maxMergeSizeMB=" + (maxMergeSizeBytes/1024/1024.) + ")";
+  }
+
+  @Override
+  public synchronized String[] listAll() throws IOException {
+    final Set<String> files = new HashSet<String>();
+    for(String f : cache.listAll()) {
+      files.add(f);
+    }
+    // LUCENE-1468: our NRTCachingDirectory will actually exist (RAMDir!),
+    // but if the underlying delegate is an FSDir and mkdirs() has not
+    // yet been called, because so far everything is a cached write,
+    // in this case, we don't want to throw a NoSuchDirectoryException
+    try {
+      for(String f : delegate.listAll()) {
+        // Cannot do this -- if lucene calls createOutput but
+        // file already exists then this falsely trips:
+        //assert !files.contains(f): "file \"" + f + "\" is in both dirs";
+        files.add(f);
+      }
+    } catch (NoSuchDirectoryException ex) {
+      // however, if there are no cached files, then the directory truly
+      // does not "exist"
+      if (files.isEmpty()) {
+        throw ex;
+      }
+    }
+    return files.toArray(new String[files.size()]);
+  }
+
+  /** Returns how many bytes are being used by the
+   *  RAMDirectory cache */
+  public long sizeInBytes()  {
+    return cache.sizeInBytes();
+  }
+
+  @Override
+  public synchronized boolean fileExists(String name) throws IOException {
+    return cache.fileExists(name) || delegate.fileExists(name);
+  }
+
+  @Override
+  public synchronized long fileModified(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.fileModified(name);
+    } else {
+      return delegate.fileModified(name);
+    }
+  }
+
+  @Override
+  @Deprecated
+  /*  @deprecated Lucene never uses this API; it will be
+   *  removed in 4.0. */
+  public synchronized void touchFile(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      cache.touchFile(name);
+    } else {
+      delegate.touchFile(name);
+    }
+  }
+
+  @Override
+  public synchronized void deleteFile(String name) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.deleteFile name=" + name);
+    }
+    if (cache.fileExists(name)) {
+      assert !delegate.fileExists(name);
+      cache.deleteFile(name);
+    } else {
+      delegate.deleteFile(name);
+    }
+  }
+
+  @Override
+  public synchronized long fileLength(String name) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.fileLength(name);
+    } else {
+      return delegate.fileLength(name);
+    }
+  }
+
+  public String[] listCachedFiles() {
+    return cache.listAll();
+  }
+
+  @Override
+  public IndexOutput createOutput(String name) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.createOutput name=" + name);
+    }
+    if (doCacheWrite(name)) {
+      if (VERBOSE) {
+        System.out.println("  to cache");
+      }
+      return cache.createOutput(name);
+    } else {
+      return delegate.createOutput(name);
+    }
+  }
+
+  @Override
+  public void sync(Collection<String> fileNames) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.sync files=" + fileNames);
+    }
+    for(String fileName : fileNames) {
+      unCache(fileName);
+    }
+    delegate.sync(fileNames);
+  }
+
+  @Override
+  public synchronized IndexInput openInput(String name) throws IOException {
+    if (VERBOSE) {
+      System.out.println("nrtdir.openInput name=" + name);
+    }
+    if (cache.fileExists(name)) {
+      if (VERBOSE) {
+        System.out.println("  from cache");
+      }
+      return cache.openInput(name);
+    } else {
+      return delegate.openInput(name);
+    }
+  }
+
+  @Override
+  public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
+    if (cache.fileExists(name)) {
+      return cache.openInput(name, bufferSize);
+    } else {
+      return delegate.openInput(name, bufferSize);
+    }
+  }
+
+  /** Close thius directory, which flushes any cached files
+   *  to the delegate and then closes the delegate. */
+  @Override
+  public void close() throws IOException {
+    for(String fileName : cache.listAll()) {
+      unCache(fileName);
+    }
+    cache.close();
+    delegate.close();
+  }
+
+  private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>();
+
+  public MergeScheduler getMergeScheduler() {
+    return new ConcurrentMergeScheduler() {
+      @Override
+      protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+        try {
+          merges.put(Thread.currentThread(), merge);
+          super.doMerge(merge);
+        } finally {
+          merges.remove(Thread.currentThread());
+        }
+      }
+    };
+  }
+
+  /** Subclass can override this to customize logic; return
+   *  true if this file should be written to the RAMDirectory. */
+  protected boolean doCacheWrite(String name) {
+    final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
+    //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
+    return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
+  }
+
+  private void unCache(String fileName) throws IOException {
+    final IndexOutput out;
+    synchronized(this) {
+      if (!delegate.fileExists(fileName)) {
+        assert cache.fileExists(fileName);
+        out = delegate.createOutput(fileName);
+      } else {
+        out = null;
+      }
+    }
+
+    if (out != null) {
+      IndexInput in = null;
+      try {
+        in = cache.openInput(fileName);
+        in.copyBytes(out, in.length());
+      } finally {
+        IOUtils.close(in, out);
+      }
+      synchronized(this) {
+        cache.deleteFile(fileName);
+      }
+    }
+  }
+}
+