log parallel job exceptions
[wolnelektury.git] / apps / search / index.py
index 1c79ea2..2ae909a 100644 (file)
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+
 from django.conf import settings
 from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
     NumericField, Version, Document, JavaError, IndexSearcher, \
@@ -8,8 +9,10 @@ from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
     BlockJoinQuery, BlockJoinCollector, TermsFilter, \
     HashSet, BooleanClause, Term, CharTermAttribute, \
     PhraseQuery, StringReader, TermQuery, BlockJoinQuery, \
-    Sort, Integer
+    Sort, Integer, \
+    initVM, CLASSPATH
     # KeywordAnalyzer
+JVM = initVM(CLASSPATH)
 import sys
 import os
 import errno
@@ -19,7 +22,7 @@ import catalogue.models
 from multiprocessing.pool import ThreadPool
 from threading import current_thread
 import atexit
-
+import traceback
 
 class WLAnalyzer(PerFieldAnalyzerWrapper):
     def __init__(self):
@@ -85,7 +88,6 @@ class Index(IndexStore):
         if overwrite:
             self.remove_book(book)
 
-
         doc = self.extract_metadata(book)
         parts = self.extract_content(book)
         block = ArrayList().of_(Document)
@@ -245,6 +247,17 @@ class Index(IndexStore):
         self.close()
 
 
+def log_exception_wrapper(f):  # wrap f so a failure inside a pool thread is printed, then re-raised
+    def _wrap(*a, **kw):  # accept keyword args too: index_book submits its kw dict via apply_async
+        try:
+            return f(*a, **kw)  # pass f's result through so job.get() can return it
+        except Exception as e:
+            print("Error in indexing thread: %s" % e)
+            traceback.print_exc()
+            raise  # bare raise keeps the original traceback (raise e would reset it)
+    return _wrap
+
+
 class ReusableIndex(Index):
     """
     Works like index, but does not close/optimize Lucene index
@@ -262,14 +275,14 @@ class ReusableIndex(Index):
             self.index = ReusableIndex.index
         else:
             print("opening index")
-            ReusableIndex.pool = ThreadPool(threads)
+            ReusableIndex.pool = ThreadPool(threads, initializer=lambda: JVM.attachCurrentThread() )
             ReusableIndex.pool_jobs = []
             Index.open(self, analyzer)
             ReusableIndex.index = self.index
             atexit.register(ReusableIndex.close_reusable)
 
     def index_book(self, *args, **kw):
-        job = ReusableIndex.pool.apply_async(Index.index_book, (self,)+ args, kw)
+        job = ReusableIndex.pool.apply_async(log_exception_wrapper(Index.index_book), (self,) + args, kw)  # queue Index.index_book on the shared pool; the wrapper prints any exception raised in the worker
         ReusableIndex.pool_jobs.append(job)
 
     @staticmethod
@@ -277,7 +290,9 @@ class ReusableIndex(Index):
         if ReusableIndex.index is not None:
             print("closing index")
             for job in ReusableIndex.pool_jobs:
-                job.wait()
+                job.get()
+                sys.stdout.write('.')
+                sys.stdout.flush()
             ReusableIndex.pool.close()
 
             ReusableIndex.index.optimize()