1 package org.apache.lucene.store;
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter; // javadocs
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.store.RAMDirectory; // javadocs
import org.apache.lucene.util.IOUtils;

// TODO
//   - let subclass dictate policy...?
//   - rename to MergeCacheingDir?  NRTCachingDir

39 * Wraps a {@link RAMDirectory}
40 * around any provided delegate directory, to
41 * be used during NRT search. Make sure you pull the merge
42 * scheduler using {@link #getMergeScheduler} and pass that to your
43 * {@link IndexWriter}; this class uses that to keep track of which
44 * merges are being done by which threads, to decide when to
45 * cache each written file.
47 * <p>This class is likely only useful in a near-real-time
48 * context, where indexing rate is lowish but reopen
49 * rate is highish, resulting in many tiny files being
50 * written. This directory keeps such segments (as well as
51 * the segments produced by merging them, as long as they
52 * are small enough), in RAM.</p>
54 * <p>This is safe to use: when your app calls {IndexWriter#commit},
55 * all cached files will be flushed from the cached and sync'd.</p>
57 * <p><b>NOTE</b>: this class is somewhat sneaky in its
58 * approach for spying on merges to determine the size of a
59 * merge: it records which threads are running which merges
60 * by watching ConcurrentMergeScheduler's doMerge method.
61 * While this works correctly, likely future versions of
62 * this class will take a more general approach.
64 * <p>Here's a simple example usage:
67 * Directory fsDir = FSDirectory.open(new File("/path/to/index"));
68 * NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
69 * IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
70 * conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
71 * IndexWriter writer = new IndexWriter(cachedFSDir, conf);
74 * <p>This will cache all newly flushed segments, all merges
75 * whose expected segment size is <= 5 MB, unless the net
76 * cached bytes exceeds 60 MB at which point all writes will
77 * not be cached (until the net bytes falls below 60 MB).</p>
79 * @lucene.experimental
82 public class NRTCachingDirectory extends Directory {
84 private final RAMDirectory cache = new RAMDirectory();
86 private final Directory delegate;
88 private final long maxMergeSizeBytes;
89 private final long maxCachedBytes;
91 private static final boolean VERBOSE = false;
94 * We will cache a newly created output if 1) it's a
95 * flush or a merge and the estimated size of the merged segment is <=
96 * maxMergeSizeMB, and 2) the total cached bytes is <=
98 public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
99 this.delegate = delegate;
100 maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
101 maxCachedBytes = (long) (maxCachedMB*1024*1024);
105 public LockFactory getLockFactory() {
106 return delegate.getLockFactory();
110 public void setLockFactory(LockFactory lf) throws IOException {
111 delegate.setLockFactory(lf);
115 public String getLockID() {
116 return delegate.getLockID();
120 public Lock makeLock(String name) {
121 return delegate.makeLock(name);
125 public void clearLock(String name) throws IOException {
126 delegate.clearLock(name);
130 public String toString() {
131 return "NRTCachingDirectory(" + delegate + "; maxCacheMB=" + (maxCachedBytes/1024/1024.) + " maxMergeSizeMB=" + (maxMergeSizeBytes/1024/1024.) + ")";
135 public synchronized String[] listAll() throws IOException {
136 final Set<String> files = new HashSet<String>();
137 for(String f : cache.listAll()) {
140 // LUCENE-1468: our NRTCachingDirectory will actually exist (RAMDir!),
141 // but if the underlying delegate is an FSDir and mkdirs() has not
142 // yet been called, because so far everything is a cached write,
143 // in this case, we don't want to throw a NoSuchDirectoryException
145 for(String f : delegate.listAll()) {
146 // Cannot do this -- if lucene calls createOutput but
147 // file already exists then this falsely trips:
148 //assert !files.contains(f): "file \"" + f + "\" is in both dirs";
151 } catch (NoSuchDirectoryException ex) {
152 // however, if there are no cached files, then the directory truly
154 if (files.isEmpty()) {
158 return files.toArray(new String[files.size()]);
161 /** Returns how many bytes are being used by the
162 * RAMDirectory cache */
163 public long sizeInBytes() {
164 return cache.sizeInBytes();
168 public synchronized boolean fileExists(String name) throws IOException {
169 return cache.fileExists(name) || delegate.fileExists(name);
173 public synchronized long fileModified(String name) throws IOException {
174 if (cache.fileExists(name)) {
175 return cache.fileModified(name);
177 return delegate.fileModified(name);
183 /* @deprecated Lucene never uses this API; it will be
185 public synchronized void touchFile(String name) throws IOException {
186 if (cache.fileExists(name)) {
187 cache.touchFile(name);
189 delegate.touchFile(name);
194 public synchronized void deleteFile(String name) throws IOException {
196 System.out.println("nrtdir.deleteFile name=" + name);
198 if (cache.fileExists(name)) {
199 assert !delegate.fileExists(name);
200 cache.deleteFile(name);
202 delegate.deleteFile(name);
207 public synchronized long fileLength(String name) throws IOException {
208 if (cache.fileExists(name)) {
209 return cache.fileLength(name);
211 return delegate.fileLength(name);
215 public String[] listCachedFiles() {
216 return cache.listAll();
220 public IndexOutput createOutput(String name) throws IOException {
222 System.out.println("nrtdir.createOutput name=" + name);
224 if (doCacheWrite(name)) {
226 System.out.println(" to cache");
228 return cache.createOutput(name);
230 return delegate.createOutput(name);
235 public void sync(Collection<String> fileNames) throws IOException {
237 System.out.println("nrtdir.sync files=" + fileNames);
239 for(String fileName : fileNames) {
242 delegate.sync(fileNames);
246 public synchronized IndexInput openInput(String name) throws IOException {
248 System.out.println("nrtdir.openInput name=" + name);
250 if (cache.fileExists(name)) {
252 System.out.println(" from cache");
254 return cache.openInput(name);
256 return delegate.openInput(name);
261 public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
262 if (cache.fileExists(name)) {
263 return cache.openInput(name, bufferSize);
265 return delegate.openInput(name, bufferSize);
269 /** Close thius directory, which flushes any cached files
270 * to the delegate and then closes the delegate. */
272 public void close() throws IOException {
273 for(String fileName : cache.listAll()) {
280 private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>();
282 public MergeScheduler getMergeScheduler() {
283 return new ConcurrentMergeScheduler() {
285 protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
287 merges.put(Thread.currentThread(), merge);
288 super.doMerge(merge);
290 merges.remove(Thread.currentThread());
296 /** Subclass can override this to customize logic; return
297 * true if this file should be written to the RAMDirectory. */
298 protected boolean doCacheWrite(String name) {
299 final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
300 //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
301 return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
304 private void unCache(String fileName) throws IOException {
305 final IndexOutput out;
307 if (!delegate.fileExists(fileName)) {
308 assert cache.fileExists(fileName);
309 out = delegate.createOutput(fileName);
316 IndexInput in = null;
318 in = cache.openInput(fileName);
319 in.copyBytes(out, in.length());
321 IOUtils.close(in, out);
324 cache.deleteFile(fileName);