X-Git-Url: https://git.mdrn.pl/pylucene.git/blobdiff_plain/a2e61f0c04805cfcb8706176758d1283c7e3a55c..aaeed5504b982cf3545252ab528713250aa33eed:/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/search/NRTManager.java diff --git a/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/search/NRTManager.java b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/search/NRTManager.java new file mode 100644 index 0000000..5ed8829 --- /dev/null +++ b/lucene-java-3.5.0/lucene/src/java/org/apache/lucene/search/NRTManager.java @@ -0,0 +1,407 @@ +package org.apache.lucene.search; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexReader; // javadocs +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; // javadocs +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * Utility class to manage sharing near-real-time searchers + * across multiple searching threads. + * + *

NOTE: to use this class, you must call {@link #maybeReopen(boolean)} + * periodically. The {@link NRTManagerReopenThread} is a + * simple class to do this on a periodic basis. If you + * implement your own reopener, be sure to call {@link + * #addWaitingListener} so your reopener is notified when a + * caller is waiting for a specific generation searcher.

+ * + * @lucene.experimental + */ + +public class NRTManager implements Closeable { + private static final long MAX_SEARCHER_GEN = Long.MAX_VALUE; + private final IndexWriter writer; + private final SearcherManagerRef withoutDeletes; + private final SearcherManagerRef withDeletes; + private final AtomicLong indexingGen; + private final List waitingListeners = new CopyOnWriteArrayList(); + private final ReentrantLock reopenLock = new ReentrantLock(); + private final Condition newGeneration = reopenLock.newCondition(); + + /** + * Create new NRTManager. + * + * @param writer IndexWriter to open near-real-time + * readers + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to warmed + * before going live. If this is non-null then a + * merged segment warmer is installed on the + * provided IndexWriter's config. + * + *

NOTE: the provided {@link SearcherWarmer} is + * not invoked for the initial searcher; you should + * warm it yourself if necessary. + */ + public NRTManager(IndexWriter writer, SearcherWarmer warmer) throws IOException { + this(writer, null, warmer, true); + } + + /** + * Create new NRTManager. + * + * @param writer IndexWriter to open near-real-time + * readers + * @param es optional ExecutorService so different segments can + * be searched concurrently (see {@link IndexSearcher#IndexSearcher(IndexReader, ExecutorService)}. + * Pass null to search segments sequentially. + * @param warmer optional {@link SearcherWarmer}. Pass + * null if you don't require the searcher to warmed + * before going live. If this is non-null then a + * merged segment warmer is installed on the + * provided IndexWriter's config. + * + *

NOTE: the provided {@link SearcherWarmer} is + * not invoked for the initial searcher; you should + * warm it yourself if necessary. + */ + public NRTManager(IndexWriter writer, ExecutorService es, + SearcherWarmer warmer) throws IOException { + this(writer, es, warmer, true); + } + + /** + * Expert: just like {@link + * #NRTManager(IndexWriter,ExecutorService,SearcherWarmer)}, + * but you can also specify whether every searcher must + * apply deletes. This is useful for cases where certain + * uses can tolerate seeing some deleted docs, since + * reopen time is faster if deletes need not be applied. */ + public NRTManager(IndexWriter writer, ExecutorService es, + SearcherWarmer warmer, boolean alwaysApplyDeletes) throws IOException { + this.writer = writer; + if (alwaysApplyDeletes) { + withoutDeletes = withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es)); + } else { + withDeletes = new SearcherManagerRef(true, 0, new SearcherManager(writer, true, warmer, es)); + withoutDeletes = new SearcherManagerRef(false, 0, new SearcherManager(writer, false, warmer, es)); + } + indexingGen = new AtomicLong(1); + } + + /** NRTManager invokes this interface to notify it when a + * caller is waiting for a specific generation searcher + * to be visible. */ + public static interface WaitingListener { + public void waiting(boolean requiresDeletes, long targetGen); + } + + /** Adds a listener, to be notified when a caller is + * waiting for a specific generation searcher to be + * visible. */ + public void addWaitingListener(WaitingListener l) { + waitingListeners.add(l); + } + + /** Remove a listener added with {@link + * #addWaitingListener}. */ + public void removeWaitingListener(WaitingListener l) { + waitingListeners.remove(l); + } + + public long updateDocument(Term t, Document d, Analyzer a) throws IOException { + writer.updateDocument(t, d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocument(Term t, Document d) throws IOException { + writer.updateDocument(t, d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Collection docs, Analyzer a) throws IOException { + writer.updateDocuments(t, docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long updateDocuments(Term t, Collection docs) throws IOException { + writer.updateDocuments(t, docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Term t) throws IOException { + writer.deleteDocuments(t); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Term... terms) throws IOException { + writer.deleteDocuments(terms); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Query q) throws IOException { + writer.deleteDocuments(q); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteDocuments(Query... queries) throws IOException { + writer.deleteDocuments(queries); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long deleteAll() throws IOException { + writer.deleteAll(); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(Document d, Analyzer a) throws IOException { + writer.addDocument(d, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Collection docs, Analyzer a) throws IOException { + writer.addDocuments(docs, a); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocument(Document d) throws IOException { + writer.addDocument(d); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addDocuments(Collection docs) throws IOException { + writer.addDocuments(docs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addIndexes(Directory... dirs) throws CorruptIndexException, IOException { + writer.addIndexes(dirs); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + public long addIndexes(IndexReader... readers) throws CorruptIndexException, IOException { + writer.addIndexes(readers); + // Return gen as of when indexing finished: + return indexingGen.get(); + } + + /** + * Waits for a given {@link SearcherManager} target generation to be available + * via {@link #getSearcherManager(boolean)}. If the current generation is less + * than the given target generation this method will block until the + * correspondent {@link SearcherManager} is reopened by another thread via + * {@link #maybeReopen(boolean)} or until the {@link NRTManager} is closed. + * + * @param targetGen the generation to wait for + * @param requireDeletes true iff the generation requires deletes to be applied otherwise false + * @return the {@link SearcherManager} with the given target generation + */ + public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes) { + return waitForGeneration(targetGen, requireDeletes, -1, TimeUnit.NANOSECONDS); + } + + /** + * Waits for a given {@link SearcherManager} target generation to be available + * via {@link #getSearcherManager(boolean)}. If the current generation is less + * than the given target generation this method will block until the + * correspondent {@link SearcherManager} is reopened by another thread via + * {@link #maybeReopen(boolean)}, the given waiting time has elapsed, or until + * the {@link NRTManager} is closed. + *

+ * NOTE: if the waiting time elapses before the requested target generation is + * available the latest {@link SearcherManager} is returned instead. + * + * @param targetGen + * the generation to wait for + * @param requireDeletes + * true iff the generation requires deletes to be + * applied otherwise false + * @param time + * the time to wait for the target generation + * @param unit + * the waiting time's time unit + * @return the {@link SearcherManager} with the given target generation or the + * latest {@link SearcherManager} if the waiting time elapsed before + * the requested generation is available. + */ + public SearcherManager waitForGeneration(long targetGen, boolean requireDeletes, long time, TimeUnit unit) { + try { + final long curGen = indexingGen.get(); + if (targetGen > curGen) { + throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by this NRTManager instance (current gen=" + curGen + ")"); + } + reopenLock.lockInterruptibly(); + try { + if (targetGen > getCurrentSearchingGen(requireDeletes)) { + for (WaitingListener listener : waitingListeners) { + listener.waiting(requireDeletes, targetGen); + } + while (targetGen > getCurrentSearchingGen(requireDeletes)) { + if (!waitOnGenCondition(time, unit)) { + return getSearcherManager(requireDeletes); + } + } + } + + } finally { + reopenLock.unlock(); + } + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + return getSearcherManager(requireDeletes); + } + + private boolean waitOnGenCondition(long time, TimeUnit unit) + throws InterruptedException { + assert reopenLock.isHeldByCurrentThread(); + if (time < 0) { + newGeneration.await(); + return true; + } else { + return newGeneration.await(time, unit); + } + } + + /** Returns generation of current searcher. */ + public long getCurrentSearchingGen(boolean applyAllDeletes) { + if (applyAllDeletes) { + return withDeletes.generation; + } else { + return Math.max(withoutDeletes.generation, withDeletes.generation); + } + } + + public boolean maybeReopen(boolean applyAllDeletes) throws IOException { + if (reopenLock.tryLock()) { + try { + final SearcherManagerRef reference = applyAllDeletes ? withDeletes : withoutDeletes; + // Mark gen as of when reopen started: + final long newSearcherGen = indexingGen.getAndIncrement(); + boolean setSearchGen = false; + if (reference.generation == MAX_SEARCHER_GEN) { + newGeneration.signalAll(); // wake up threads if we have a new generation + return false; + } + if (!(setSearchGen = reference.manager.isSearcherCurrent())) { + setSearchGen = reference.manager.maybeReopen(); + } + if (setSearchGen) { + reference.generation = newSearcherGen;// update searcher gen + newGeneration.signalAll(); // wake up threads if we have a new generation + } + return setSearchGen; + } finally { + reopenLock.unlock(); + } + } + return false; + } + + /** + * Close this NRTManager to future searching. Any searches still in process in + * other threads won't be affected, and they should still call + * {@link SearcherManager#release(IndexSearcher)} after they are done. + * + *

+ * NOTE: caller must separately close the writer. + */ + public void close() throws IOException { + reopenLock.lock(); + try { + try { + IOUtils.close(withDeletes, withoutDeletes); + } finally { // make sure we signal even if close throws an exception + newGeneration.signalAll(); + } + } finally { + reopenLock.unlock(); + assert withDeletes.generation == MAX_SEARCHER_GEN && withoutDeletes.generation == MAX_SEARCHER_GEN; + } + } + + /** + * Returns a {@link SearcherManager}. If applyAllDeletes is + * true the returned manager is guaranteed to have all deletes + * applied on the last reopen. Otherwise the latest manager with or without deletes + * is returned. + */ + public SearcherManager getSearcherManager(boolean applyAllDeletes) { + if (applyAllDeletes) { + return withDeletes.manager; + } else { + if (withDeletes.generation > withoutDeletes.generation) { + return withDeletes.manager; + } else { + return withoutDeletes.manager; + } + } + } + + static final class SearcherManagerRef implements Closeable { + final boolean applyDeletes; + volatile long generation; + final SearcherManager manager; + + SearcherManagerRef(boolean applyDeletes, long generation, SearcherManager manager) { + super(); + this.applyDeletes = applyDeletes; + this.generation = generation; + this.manager = manager; + } + + public void close() throws IOException { + generation = MAX_SEARCHER_GEN; // max it out to make sure nobody can wait on another gen + manager.close(); + } + } +}