add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / contrib / misc / src / java / org / apache / lucene / store / NRTCachingDirectory.java
1 package org.apache.lucene.store;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import org.apache.lucene.index.ConcurrentMergeScheduler;
27 import org.apache.lucene.index.IndexFileNames;
28 import org.apache.lucene.index.IndexWriter;       // javadocs
29 import org.apache.lucene.index.MergePolicy;
30 import org.apache.lucene.index.MergeScheduler;
31 import org.apache.lucene.store.RAMDirectory;      // javadocs
32 import org.apache.lucene.util.IOUtils;
33
34 // TODO
35 //   - let subclass dictate policy...?
36 //   - rename to MergeCacheingDir?  NRTCachingDir
37
38 /**
39  * Wraps a {@link RAMDirectory}
40  * around any provided delegate directory, to
41  * be used during NRT search.  Make sure you pull the merge
42  * scheduler using {@link #getMergeScheduler} and pass that to your
43  * {@link IndexWriter}; this class uses that to keep track of which
44  * merges are being done by which threads, to decide when to
45  * cache each written file.
46  *
47  * <p>This class is likely only useful in a near-real-time
48  * context, where indexing rate is lowish but reopen
49  * rate is highish, resulting in many tiny files being
50  * written.  This directory keeps such segments (as well as
51  * the segments produced by merging them, as long as they
52  * are small enough), in RAM.</p>
53  *
54  * <p>This is safe to use: when your app calls {IndexWriter#commit},
55  * all cached files will be flushed from the cached and sync'd.</p>
56  *
57  * <p><b>NOTE</b>: this class is somewhat sneaky in its
58  * approach for spying on merges to determine the size of a
59  * merge: it records which threads are running which merges
60  * by watching ConcurrentMergeScheduler's doMerge method.
61  * While this works correctly, likely future versions of
62  * this class will take a more general approach.
63  *
64  * <p>Here's a simple example usage:
65  *
66  * <pre>
67  *   Directory fsDir = FSDirectory.open(new File("/path/to/index"));
68  *   NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
69  *   IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_32, analyzer);
70  *   conf.setMergeScheduler(cachedFSDir.getMergeScheduler());
71  *   IndexWriter writer = new IndexWriter(cachedFSDir, conf);
72  * </pre>
73  *
74  * <p>This will cache all newly flushed segments, all merges
75  * whose expected segment size is <= 5 MB, unless the net
76  * cached bytes exceeds 60 MB at which point all writes will
77  * not be cached (until the net bytes falls below 60 MB).</p>
78  *
79  * @lucene.experimental
80  */
81
82 public class NRTCachingDirectory extends Directory {
83
84   private final RAMDirectory cache = new RAMDirectory();
85
86   private final Directory delegate;
87
88   private final long maxMergeSizeBytes;
89   private final long maxCachedBytes;
90
91   private static final boolean VERBOSE = false;
92
93   /**
94    *  We will cache a newly created output if 1) it's a
95    *  flush or a merge and the estimated size of the merged segment is <=
96    *  maxMergeSizeMB, and 2) the total cached bytes is <=
97    *  maxCachedMB */
98   public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) {
99     this.delegate = delegate;
100     maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024);
101     maxCachedBytes = (long) (maxCachedMB*1024*1024);
102   }
103
104   @Override
105   public LockFactory getLockFactory() {
106     return delegate.getLockFactory();
107   }
108
109   @Override
110   public void setLockFactory(LockFactory lf) throws IOException {
111     delegate.setLockFactory(lf);
112   }
113
114   @Override
115   public String getLockID() {
116     return delegate.getLockID();
117   }
118
119   @Override
120   public Lock makeLock(String name) {
121     return delegate.makeLock(name);
122   }
123
124   @Override
125   public void clearLock(String name) throws IOException {
126     delegate.clearLock(name);
127   }
128
129   @Override
130   public String toString() {
131     return "NRTCachingDirectory(" + delegate + "; maxCacheMB=" + (maxCachedBytes/1024/1024.) + " maxMergeSizeMB=" + (maxMergeSizeBytes/1024/1024.) + ")";
132   }
133
134   @Override
135   public synchronized String[] listAll() throws IOException {
136     final Set<String> files = new HashSet<String>();
137     for(String f : cache.listAll()) {
138       files.add(f);
139     }
140     // LUCENE-1468: our NRTCachingDirectory will actually exist (RAMDir!),
141     // but if the underlying delegate is an FSDir and mkdirs() has not
142     // yet been called, because so far everything is a cached write,
143     // in this case, we don't want to throw a NoSuchDirectoryException
144     try {
145       for(String f : delegate.listAll()) {
146         // Cannot do this -- if lucene calls createOutput but
147         // file already exists then this falsely trips:
148         //assert !files.contains(f): "file \"" + f + "\" is in both dirs";
149         files.add(f);
150       }
151     } catch (NoSuchDirectoryException ex) {
152       // however, if there are no cached files, then the directory truly
153       // does not "exist"
154       if (files.isEmpty()) {
155         throw ex;
156       }
157     }
158     return files.toArray(new String[files.size()]);
159   }
160
161   /** Returns how many bytes are being used by the
162    *  RAMDirectory cache */
163   public long sizeInBytes()  {
164     return cache.sizeInBytes();
165   }
166
167   @Override
168   public synchronized boolean fileExists(String name) throws IOException {
169     return cache.fileExists(name) || delegate.fileExists(name);
170   }
171
172   @Override
173   public synchronized long fileModified(String name) throws IOException {
174     if (cache.fileExists(name)) {
175       return cache.fileModified(name);
176     } else {
177       return delegate.fileModified(name);
178     }
179   }
180
181   @Override
182   @Deprecated
183   /*  @deprecated Lucene never uses this API; it will be
184    *  removed in 4.0. */
185   public synchronized void touchFile(String name) throws IOException {
186     if (cache.fileExists(name)) {
187       cache.touchFile(name);
188     } else {
189       delegate.touchFile(name);
190     }
191   }
192
193   @Override
194   public synchronized void deleteFile(String name) throws IOException {
195     if (VERBOSE) {
196       System.out.println("nrtdir.deleteFile name=" + name);
197     }
198     if (cache.fileExists(name)) {
199       assert !delegate.fileExists(name);
200       cache.deleteFile(name);
201     } else {
202       delegate.deleteFile(name);
203     }
204   }
205
206   @Override
207   public synchronized long fileLength(String name) throws IOException {
208     if (cache.fileExists(name)) {
209       return cache.fileLength(name);
210     } else {
211       return delegate.fileLength(name);
212     }
213   }
214
215   public String[] listCachedFiles() {
216     return cache.listAll();
217   }
218
219   @Override
220   public IndexOutput createOutput(String name) throws IOException {
221     if (VERBOSE) {
222       System.out.println("nrtdir.createOutput name=" + name);
223     }
224     if (doCacheWrite(name)) {
225       if (VERBOSE) {
226         System.out.println("  to cache");
227       }
228       return cache.createOutput(name);
229     } else {
230       return delegate.createOutput(name);
231     }
232   }
233
234   @Override
235   public void sync(Collection<String> fileNames) throws IOException {
236     if (VERBOSE) {
237       System.out.println("nrtdir.sync files=" + fileNames);
238     }
239     for(String fileName : fileNames) {
240       unCache(fileName);
241     }
242     delegate.sync(fileNames);
243   }
244
245   @Override
246   public synchronized IndexInput openInput(String name) throws IOException {
247     if (VERBOSE) {
248       System.out.println("nrtdir.openInput name=" + name);
249     }
250     if (cache.fileExists(name)) {
251       if (VERBOSE) {
252         System.out.println("  from cache");
253       }
254       return cache.openInput(name);
255     } else {
256       return delegate.openInput(name);
257     }
258   }
259
260   @Override
261   public synchronized IndexInput openInput(String name, int bufferSize) throws IOException {
262     if (cache.fileExists(name)) {
263       return cache.openInput(name, bufferSize);
264     } else {
265       return delegate.openInput(name, bufferSize);
266     }
267   }
268
269   /** Close thius directory, which flushes any cached files
270    *  to the delegate and then closes the delegate. */
271   @Override
272   public void close() throws IOException {
273     for(String fileName : cache.listAll()) {
274       unCache(fileName);
275     }
276     cache.close();
277     delegate.close();
278   }
279
280   private final ConcurrentHashMap<Thread,MergePolicy.OneMerge> merges = new ConcurrentHashMap<Thread,MergePolicy.OneMerge>();
281
282   public MergeScheduler getMergeScheduler() {
283     return new ConcurrentMergeScheduler() {
284       @Override
285       protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
286         try {
287           merges.put(Thread.currentThread(), merge);
288           super.doMerge(merge);
289         } finally {
290           merges.remove(Thread.currentThread());
291         }
292       }
293     };
294   }
295
296   /** Subclass can override this to customize logic; return
297    *  true if this file should be written to the RAMDirectory. */
298   protected boolean doCacheWrite(String name) {
299     final MergePolicy.OneMerge merge = merges.get(Thread.currentThread());
300     //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes));
301     return !name.equals(IndexFileNames.SEGMENTS_GEN) && (merge == null || merge.estimatedMergeBytes <= maxMergeSizeBytes) && cache.sizeInBytes() <= maxCachedBytes;
302   }
303
304   private void unCache(String fileName) throws IOException {
305     final IndexOutput out;
306     synchronized(this) {
307       if (!delegate.fileExists(fileName)) {
308         assert cache.fileExists(fileName);
309         out = delegate.createOutput(fileName);
310       } else {
311         out = null;
312       }
313     }
314
315     if (out != null) {
316       IndexInput in = null;
317       try {
318         in = cache.openInput(fileName);
319         in.copyBytes(out, in.length());
320       } finally {
321         IOUtils.close(in, out);
322       }
323       synchronized(this) {
324         cache.deleteFile(fileName);
325       }
326     }
327   }
328 }
329