log parallel job exceptions
[wolnelektury.git] / apps / search / index.py
1 # -*- coding: utf-8 -*-
2
3 from django.conf import settings
4 from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
5     NumericField, Version, Document, JavaError, IndexSearcher, \
6     QueryParser, Term, PerFieldAnalyzerWrapper, \
7     SimpleAnalyzer, PolishAnalyzer, ArrayList, \
8     KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
9     BlockJoinQuery, BlockJoinCollector, TermsFilter, \
10     HashSet, BooleanClause, Term, CharTermAttribute, \
11     PhraseQuery, StringReader, TermQuery, BlockJoinQuery, \
12     Sort, Integer, \
13     initVM, CLASSPATH
14     # KeywordAnalyzer
15 JVM = initVM(CLASSPATH)
16 import sys
17 import os
18 import errno
19 from librarian import dcparser
20 from librarian.parser import WLDocument
21 import catalogue.models
22 from multiprocessing.pool import ThreadPool
23 from threading import current_thread
24 import atexit
25 import traceback
26
27 class WLAnalyzer(PerFieldAnalyzerWrapper):
28     def __init__(self):
29         polish = PolishAnalyzer(Version.LUCENE_34)
30         simple = SimpleAnalyzer(Version.LUCENE_34)
31         keyword = KeywordAnalyzer(Version.LUCENE_34)
32         # not sure if needed: there's NOT_ANALYZED meaning basically the same
33
34         PerFieldAnalyzerWrapper.__init__(self, polish)
35
36         self.addAnalyzer("tags", simple)
37         self.addAnalyzer("technical_editors", simple)
38         self.addAnalyzer("editors", simple)
39         self.addAnalyzer("url", keyword)
40         self.addAnalyzer("source_url", keyword)
41         self.addAnalyzer("source_name", simple)
42         self.addAnalyzer("publisher", simple)
43         self.addAnalyzer("author", simple)
44         self.addAnalyzer("is_book", keyword)
45
46         #self.addanalyzer("fragment_anchor", keyword)
47
48
49 class IndexStore(object):
50     def __init__(self):
51         self.make_index_dir()
52         self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))
53
54     def make_index_dir(self):
55         try:
56             os.makedirs(settings.SEARCH_INDEX)
57         except OSError as exc:
58             if exc.errno == errno.EEXIST:
59                 pass
60             else: raise
61
62
63 class Index(IndexStore):
64     def __init__(self, analyzer=None):
65         IndexStore.__init__(self)
66         self.index = None
67         if not analyzer:
68             analyzer = WLAnalyzer()
69         self.analyzer = analyzer
70
71     def open(self, analyzer=None):
72         if self.index:
73             raise Exception("Index is already opened")
74         self.index = IndexWriter(self.store, self.analyzer,\
75                                  IndexWriter.MaxFieldLength.LIMITED)
76         return self.index
77
78     def close(self):
79         self.index.optimize()
80         self.index.close()
81         self.index = None
82
83     def remove_book(self, book):
84         q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True,True)
85         self.index.deleteDocuments(q)
86
87     def index_book(self, book, overwrite=True):
88         if overwrite:
89             self.remove_book(book)
90
91         doc = self.extract_metadata(book)
92         parts = self.extract_content(book)
93         block = ArrayList().of_(Document)
94
95         for p in parts:
96             block.add(p)
97         block.add(doc)
98         self.index.addDocuments(block)
99
100     master_tags = [
101         'opowiadanie',
102         'powiesc',
103         'dramat_wierszowany_l',
104         'dramat_wierszowany_lp',
105         'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
106         'wywiad'
107         ]
108
109     skip_header_tags = ['autor_utworu', 'nazwa_utworu', 'dzielo_nadrzedne']
110
111     def create_book_doc(self, book):
112         """
113         Create a lucene document connected to the book
114         """
115         doc = Document()
116         doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
117         if book.parent is not None:
118             doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
119         return doc
120
121     def extract_metadata(self, book):
122         book_info = dcparser.parse(book.xml_file)
123
124         print("extract metadata for book %s id=%d, thread%d" % (book.slug, book.id, current_thread().ident))
125         
126         doc = self.create_book_doc(book)
127         doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
128         doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
129         doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))
130
131         # validator, name
132         for field in dcparser.BookInfo.FIELDS:
133             if hasattr(book_info, field.name):
134                 if not getattr(book_info, field.name):
135                     continue
136                 # since no type information is available, we use validator
137                 type_indicator = field.validator
138                 if type_indicator == dcparser.as_unicode:
139                     s = getattr(book_info, field.name)
140                     if field.multiple:
141                         s = ', '.join(s)
142                     try:
143                         doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
144                     except JavaError as je:
145                         raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
146                 elif type_indicator == dcparser.as_person:
147                     p = getattr(book_info, field.name)
148                     if isinstance(p, dcparser.Person):
149                         persons = unicode(p)
150                     else:
151                         persons = ', '.join(map(unicode, p))
152                     doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
153                 elif type_indicator == dcparser.as_date:
154                     dt = getattr(book_info, field.name)
155                     doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
156         return doc
157
158     def get_master(self, root):
159         for master in root.iter():
160             if master.tag in self.master_tags:
161                 return master
162
163     
164     def extract_content(self, book):
165         wld = WLDocument.from_file(book.xml_file.path)
166         root = wld.edoc.getroot()
167
168         # first we build a sequence of top-level items.
169         # book_id
170         # header_index - the 0-indexed position of header element.
171         # content
172         master = self.get_master(root)
173         if master is None:
174             return []
175         
176         header_docs = []
177         for header, position in zip(list(master), range(len(master))):
178             if header.tag in self.skip_header_tags:
179                 continue
180             doc = self.create_book_doc(book)
181             doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
182             doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
183             content = u' '.join([t for t in header.itertext()])
184             doc.add(Field("content", content, Field.Store.YES, Field.Index.ANALYZED))
185             header_docs.append(doc)
186
187         def walker(node):
188             yield node, None
189             for child in list(node):
190                 for b, e in walker(child):
191                     yield b, e
192             yield None, node
193             return
194
195         # Then we create a document for each fragments
196         # fragment_anchor - the anchor
197         # themes - list of themes [not indexed]
198         fragment_docs = []
199         # will contain (framgent id -> { content: [], themes: [] }
200         fragments = {}
201         for start, end in walker(master):
202             if start is not None and start.tag == 'begin':
203                 fid = start.attrib['id'][1:]
204                 fragments[fid] = {'content': [], 'themes': []}
205                 fragments[fid]['content'].append(start.tail)
206             elif start is not None and start.tag == 'motyw':
207                 fid = start.attrib['id'][1:]
208                 fragments[fid]['themes'].append(start.text)
209                 fragments[fid]['content'].append(start.tail)
210             elif start is not None and start.tag == 'end':
211                 fid = start.attrib['id'][1:]
212                 if fid not in fragments:
213                     continue  # a broken <end> node, skip it
214                 frag = fragments[fid]
215                 del fragments[fid]
216
217                 def jstr(l):
218                     return u' '.join(map(
219                         lambda x: x == None and u'(none)' or unicode(x),
220                         l))
221
222                 doc = self.create_book_doc(book)
223                 doc.add(Field("fragment_anchor", fid,
224                               Field.Store.YES, Field.Index.NOT_ANALYZED))
225                 doc.add(Field("content",
226                               u' '.join(filter(lambda s: s is not None, frag['content'])),
227                               Field.Store.YES, Field.Index.ANALYZED))
228                 doc.add(Field("themes",
229                               u' '.join(filter(lambda s: s is not None, frag['themes'])),
230                               Field.Store.NO, Field.Index.ANALYZED))
231
232                 fragment_docs.append(doc)
233             elif start is not None:
234                 for frag in fragments.values():
235                     frag['content'].append(start.text)
236             elif end is not None:
237                 for frag in fragments.values():
238                     frag['content'].append(end.tail)
239
240         return header_docs + fragment_docs
241
242     def __enter__(self):
243         self.open()
244         return self
245
246     def __exit__(self, type, value, tb):
247         self.close()
248
249
250 def log_exception_wrapper(f):
251     def _wrap(*a):
252         try:
253             f(*a)
254         except Exception, e:
255             print("Error in indexing thread: %s" % e)
256             traceback.print_exc()
257             raise e
258     return _wrap
259
260
261 class ReusableIndex(Index):
262     """
263     Works like index, but does not close/optimize Lucene index
264     until program exit (uses atexit hook).
265     This is usefull for importbooks command.
266
267     if you cannot rely on atexit, use ReusableIndex.close_reusable() yourself.
268     """
269     index = None
270     pool = None
271     pool_jobs = None
272
273     def open(self, analyzer=None, threads=4):
274         if ReusableIndex.index is not None:
275             self.index = ReusableIndex.index
276         else:
277             print("opening index")
278             ReusableIndex.pool = ThreadPool(threads, initializer=lambda: JVM.attachCurrentThread() )
279             ReusableIndex.pool_jobs = []
280             Index.open(self, analyzer)
281             ReusableIndex.index = self.index
282             atexit.register(ReusableIndex.close_reusable)
283
284     def index_book(self, *args, **kw):
285         job = ReusableIndex.pool.apply_async(log_exception_wrapper(Index.index_book), (self,) + args, kw)
286         ReusableIndex.pool_jobs.append(job)
287
288     @staticmethod
289     def close_reusable():
290         if ReusableIndex.index is not None:
291             print("closing index")
292             for job in ReusableIndex.pool_jobs:
293                 job.get()
294                 sys.stdout.write('.')
295                 sys.stdout.flush()
296             ReusableIndex.pool.close()
297
298             ReusableIndex.index.optimize()
299             ReusableIndex.index.close()
300             ReusableIndex.index = None
301
302     def close(self):
303         pass
304
305
306 class Search(IndexStore):
307     def __init__(self, default_field="content"):
308         IndexStore.__init__(self)
309         self.analyzer = PolishAnalyzer(Version.LUCENE_34)
310         ## self.analyzer = WLAnalyzer()
311         self.searcher = IndexSearcher(self.store, True)
312         self.parser = QueryParser(Version.LUCENE_34, default_field,
313                                   self.analyzer)
314
315         self.parent_filter = TermsFilter()
316         self.parent_filter.addTerm(Term("is_book", "true"))
317
318     def query(self, query):
319         return self.parser.parse(query)
320
321     def wrapjoins(self, query, fields=[]):
322         """
323         This functions modifies the query in a recursive way,
324         so Term and Phrase Queries contained, which match
325         provided fields are wrapped in a BlockJoinQuery,
326         and so delegated to children documents.
327         """
328         if BooleanQuery.instance_(query):
329             qs = BooleanQuery.cast_(query)
330             for clause in qs:
331                 clause = BooleanClause.cast_(clause)
332                 clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
333             return qs
334         else:
335             termset = HashSet()
336             query.extractTerms(termset)
337             for t in termset:
338                 t = Term.cast_(t)
339                 if t.field() not in fields:
340                     return query
341             return BlockJoinQuery(query, self.parent_filter,
342                                   BlockJoinQuery.ScoreMode.Total)
343
344     def simple_search(self, query, max_results=50):
345         """Returns (books, total_hits)
346         """
347
348         tops = self.searcher.search(self.query(query), max_results)
349         bks = []
350         for found in tops.scoreDocs:
351             doc = self.searcher.doc(found.doc)
352             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
353         return (bks, tops.totalHits)
354
355     def search(self, query, max_results=50):
356         query = self.query(query)
357         query = self.wrapjoins(query, ["content", "themes"])
358
359         tops = self.searcher.search(query, max_results)
360         bks = []
361         for found in tops.scoreDocs:
362             doc = self.searcher.doc(found.doc)
363             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
364         return (bks, tops.totalHits)
365
366     def bsearch(self, query, max_results=50):
367         q = self.query(query)
368         bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)
369
370         tops = self.searcher.search(bjq, max_results)
371         bks = []
372         for found in tops.scoreDocs:
373             doc = self.searcher.doc(found.doc)
374             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
375         return (bks, tops.totalHits)
376
377 # TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
378 # OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
379 # CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
380
381 # while (tokenStream.incrementToken()) {
382 #     int startOffset = offsetAttribute.startOffset();
383 #     int endOffset = offsetAttribute.endOffset();
384 #     String term = charTermAttribute.toString();
385 # }
386
387
388 class SearchResult(object):
389     def __init__(self, searcher, scoreDocs, score=None):
390         if score:
391             self.score = score
392         else:
393             self.score = scoreDocs.score
394
395         self.fragments = []
396         self.scores = {}
397         self.sections = []
398
399         stored = searcher.doc(scoreDocs.doc)
400         self.book_id = int(stored.get("book_id"))
401
402         fragment = stored.get("fragment_anchor")
403         if fragment:
404             self.fragments.append(fragment)
405             self.scores[fragment] = scoreDocs.score
406
407         header_type = stored.get("header_type")
408         if header_type:
409             sec = (header_type, int(stored.get("header_index")))
410             self.sections.append(sec)
411             self.scores[sec] = scoreDocs.score
412
413     def get_book(self):
414         return catalogue.models.Book.objects.get(id=self.book_id)
415
416     book = property(get_book)
417
418     def get_parts(self):
419         book = self.book
420         parts = [{"header": s[0], "position": s[1], '_score_key': s} for s in self.sections] \
421             + [{"fragment": book.fragments.get(anchor=f), '_score_key':f} for f in self.fragments]
422
423         parts.sort(lambda a, b: cmp(self.scores[a['_score_key']], self.scores[b['_score_key']]))
424         print("bookid: %d parts: %s" % (self.book_id, parts))
425         return parts
426
427     parts = property(get_parts)
428
429     def merge(self, other):
430         if self.book_id != other.book_id:
431             raise ValueError("this search result is or book %d; tried to merge with %d" % (self.book_id, other.book_id))
432         self.fragments += other.fragments
433         self.sections += other.sections
434         self.scores.update(other.scores)
435         if other.score > self.score:
436             self.score = other.score
437         return self
438
439     def __unicode__(self):
440         return u'SearchResult(book_id=%d, score=%d)' % (self.book_id, self.score)
441
442     @staticmethod
443     def aggregate(*result_lists):
444         books = {}
445         for rl in result_lists:
446             for r in rl:
447                 if r.book_id in books:
448                     books[r.book_id].merge(r)
449                     #print(u"already have one with score %f, and this one has score %f" % (books[book.id][0], found.score))
450                 else:
451                     books[r.book_id] = r
452         return books.values()
453
454     def __cmp__(self, other):
455         return cmp(self.score, other.score)
456
457
458 class MultiSearch(Search):
459     """Class capable of IMDb-like searching"""
460     def get_tokens(self, queryreader):
461         if isinstance(queryreader, str) or isinstance(queryreader, unicode):
462             queryreader = StringReader(queryreader)
463         queryreader.reset()
464         tokens = self.analyzer.reusableTokenStream('content', queryreader)
465         toks = []
466         while tokens.incrementToken():
467             cta = tokens.getAttribute(CharTermAttribute.class_)
468             toks.append(cta.toString())
469         return toks
470
471     def make_phrase(self, tokens, field='content', slop=2):
472         phrase = PhraseQuery()
473         phrase.setSlop(slop)
474         for t in tokens:
475             term = Term(field, t)
476             phrase.add(term)
477         return phrase
478
479     def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD):
480         q = BooleanQuery()
481         for t in tokens:
482             term = Term(field, t)
483             q.add(BooleanClause(TermQuery(term), modal))
484         return q
485
486     def content_query(self, query):
487         return BlockJoinQuery(query, self.parent_filter,
488                               BlockJoinQuery.ScoreMode.Total)
489
490     def search_perfect_book(self, tokens, max_results=20):
491         qrys = [self.make_phrase(tokens, field=fld) for fld in ['author', 'title']]
492
493         books = []
494         for q in qrys:
495             top = self.searcher.search(q, max_results)
496             for found in top.scoreDocs:
497                 books.append(SearchResult(self.searcher, found))
498         return books
499
500     def search_perfect_parts(self, tokens, max_results=20):
501         qrys = [self.make_phrase(tokens, field=fld) for fld in ['content']]
502
503         books = []
504         for q in qrys:
505             top = self.searcher.search(q, max_results)
506             for found in top.scoreDocs:
507                 books.append(SearchResult(self.searcher, found))
508
509         return books
510
511     def search_everywhere(self, tokens, max_results=20):
512         q = BooleanQuery()
513         in_meta = BooleanQuery()
514         in_content = BooleanQuery()
515
516         for fld in ['themes', 'content']:
517             in_content.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))
518
519         for fld in ['author', 'title', 'epochs', 'genres', 'kinds']:
520             in_meta.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))
521
522         q.add(BooleanClause(in_meta, BooleanClause.Occur.MUST))
523         in_content_join = self.content_query(in_content)
524         q.add(BooleanClause(in_content_join, BooleanClause.Occur.MUST))
525
526         collector = BlockJoinCollector(Sort.RELEVANCE, 100, True, True)
527
528         self.searcher.search(q, collector)
529
530         books = []
531
532         top_groups = collector.getTopGroups(in_content_join, Sort.RELEVANCE, 0, max_results, 0, True)
533         if top_groups:
534             for grp in top_groups.groups:
535                 for part in grp.scoreDocs:
536                     books.append(SearchResult(self.searcher, part, score=grp.maxScore))
537         return books
538
539     def multisearch(self, query, max_results=50):
540         """
541         Search strategy:
542         - (phrase) OR -> content
543                       -> title
544                       -> author
545         - (keywords)  -> author
546                       -> motyw
547                       -> tags
548                       -> content
549         """
550         # queryreader = StringReader(query)
551         # tokens = self.get_tokens(queryreader)
552
553         # top_level = BooleanQuery()
554         # Should = BooleanClause.Occur.SHOULD
555
556         # phrase_level = BooleanQuery()
557         # phrase_level.setBoost(1.3)
558
559         # p_content = self.make_phrase(tokens, joined=True)
560         # p_title = self.make_phrase(tokens, 'title')
561         # p_author = self.make_phrase(tokens, 'author')
562
563         # phrase_level.add(BooleanClause(p_content, Should))
564         # phrase_level.add(BooleanClause(p_title, Should))
565         # phrase_level.add(BooleanClause(p_author, Should))
566
567         # kw_level = BooleanQuery()
568
569         # kw_level.add(self.make_term_query(tokens, 'author'), Should)
570         # j_themes = self.make_term_query(tokens, 'themes', joined=True)
571         # kw_level.add(j_themes, Should)
572         # kw_level.add(self.make_term_query(tokens, 'tags'), Should)
573         # j_con = self.make_term_query(tokens, joined=True)
574         # kw_level.add(j_con, Should)
575
576         # top_level.add(BooleanClause(phrase_level, Should))
577         # top_level.add(BooleanClause(kw_level, Should))
578
579         return None
580
581     
582     def do_search(self, query, max_results=50, collector=None):
583         tops = self.searcher.search(query, max_results)
584         #tops = self.searcher.search(p_content, max_results)
585
586         bks = []
587         for found in tops.scoreDocs:
588             doc = self.searcher.doc(found.doc)
589             b = catalogue.models.Book.objects.get(id=doc.get("book_id"))
590             bks.append(b)
591             print "%s (%d) -> %f" % (b, b.id, found.score)
592         return (bks, tops.totalHits)