Fixed multithreading bug with attaching to the JVM
[wolnelektury.git] / apps / search / index.py
# -*- coding: utf-8 -*-

from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader, TermQuery, \
    Sort, Integer, \
    initVM, CLASSPATH
JVM = initVM(CLASSPATH)
import sys
import os
import errno
from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
from multiprocessing.pool import ThreadPool
from threading import current_thread
import atexit


class WLAnalyzer(PerFieldAnalyzerWrapper):
    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        self.addAnalyzer("tags", simple)
        self.addAnalyzer("technical_editors", simple)
        self.addAnalyzer("editors", simple)
        self.addAnalyzer("url", keyword)
        self.addAnalyzer("source_url", keyword)
        self.addAnalyzer("source_name", simple)
        self.addAnalyzer("publisher", simple)
        self.addAnalyzer("author", simple)
        self.addAnalyzer("is_book", keyword)

        #self.addAnalyzer("fragment_anchor", keyword)


class IndexStore(object):
    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                pass
            else:
                raise


class Index(IndexStore):
    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        self.index.optimize()
        self.index.close()
        self.index = None

    def remove_book(self, book):
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

    def index_book(self, book, overwrite=True):
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        for p in parts:
            block.add(p)
        block.add(doc)
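        # Lucene's addDocuments() writes the whole block contiguously and the
        # parent document has to come last in the block, which is why the book
        # document is added after all of its header/fragment documents. This
        # ordering is what BlockJoinQuery in the Search classes below relies on.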
        self.index.addDocuments(block)

    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
        'wywiad'
        ]

    skip_header_tags = ['autor_utworu', 'nazwa_utworu', 'dzielo_nadrzedne']

    def create_book_doc(self, book):
        """
        Create a Lucene document connected to the book.
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        book_info = dcparser.parse(book.xml_file)

        print("extract metadata for book %s id=%d, thread %d" % (book.slug, book.id, current_thread().ident))

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # validator, name
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                # since no type information is available, we use the validator
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        # First we build a sequence of top-level items:
        # book_id
        # header_index - the 0-indexed position of the header element
        # content
        master = self.get_master(root)
        if master is None:
            return []

        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join([t for t in header.itertext()])
            doc.add(Field("content", content, Field.Store.YES, Field.Index.ANALYZED))
            header_docs.append(doc)

        def walker(node):
            # Depth-first walk: yields (node, None) when entering a node
            # and (None, node) when leaving it.
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

        # Then we create a document for each fragment:
        # fragment_anchor - the anchor
        # themes - list of themes [not indexed]
        fragment_docs = []
        # will contain: fragment id -> {'content': [], 'themes': []}
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments[fid]
                del fragments[fid]

                def jstr(l):
                    return u' '.join(map(
                        lambda x: u'(none)' if x is None else unicode(x),
                        l))

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join(filter(lambda s: s is not None, frag['content'])),
                              Field.Store.YES, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join(filter(lambda s: s is not None, frag['themes'])),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()
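
# A minimal usage sketch (assuming `book` is any catalogue.models.Book):
#
#     with Index() as index:
#         index.index_book(book)
#
# Leaving the `with` block calls close(), which optimizes and closes the
# underlying IndexWriter.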


class ReusableIndex(Index):
    """
    Works like Index, but does not close/optimize the Lucene index
    until program exit (uses an atexit hook).
    This is useful for the importbooks command.

    If you cannot rely on atexit, call ReusableIndex.close_reusable() yourself.
    """
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            print("opening index")
            # Worker threads must be attached to the JVM before they touch any
            # Lucene objects, hence the ThreadPool initializer.
            ReusableIndex.pool = ThreadPool(threads, initializer=lambda: JVM.attachCurrentThread())
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        # Indexing is delegated to the thread pool; remember the job so that
        # close_reusable() can wait for it to finish.
        job = ReusableIndex.pool.apply_async(Index.index_book, (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        if ReusableIndex.index is not None:
            print("closing index")
            for job in ReusableIndex.pool_jobs:
                job.get()
                sys.stdout.write('.')
                sys.stdout.flush()
            ReusableIndex.pool.close()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None

    def close(self):
        pass
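
# Usage sketch for ReusableIndex (hypothetical; mirrors how a management
# command such as importbooks might drive it):
#
#     index = ReusableIndex()
#     index.open()
#     for book in catalogue.models.Book.objects.all():
#         index.index_book(book)
#     # the index is flushed and closed at interpreter exit via atexit,
#     # or explicitly with ReusableIndex.close_reusable()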


class Search(IndexStore):
    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=[]):
        """
        Modify the query recursively: Term and Phrase queries contained in it
        that only match the provided fields are wrapped in a BlockJoinQuery,
        and so are delegated to child documents.
        """
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)
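
    # Illustration (a sketch, not taken from the original code): for a parsed
    # query like `author:mickiewicz AND content:dusza`,
    # wrapjoins(query, ["content", "themes"]) leaves the author clause as-is,
    # while the content clause is wrapped in a BlockJoinQuery so it matches
    # child (header/fragment) documents and is joined up to the parent book.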

    def simple_search(self, query, max_results=50):
        """Returns (books, total_hits)
        """

        tops = self.searcher.search(self.query(query), max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def search(self, query, max_results=50):
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def bsearch(self, query, max_results=50):
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)
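
# Usage sketch for Search (hypothetical query string; assumes the index has
# already been built):
#
#     search = Search()
#     books, total_hits = search.simple_search(u"Pan Tadeusz")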

# Java reference snippet for token iteration (see MultiSearch.get_tokens below):
#
# TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
# OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
# CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
#
# while (tokenStream.incrementToken()) {
#     int startOffset = offsetAttribute.startOffset();
#     int endOffset = offsetAttribute.endOffset();
#     String term = charTermAttribute.toString();
# }


class SearchResult(object):
    def __init__(self, searcher, scoreDocs, score=None):
        if score:
            self.score = score
        else:
            self.score = scoreDocs.score

        self.fragments = []
        self.scores = {}
        self.sections = []

        stored = searcher.doc(scoreDocs.doc)
        self.book_id = int(stored.get("book_id"))

        fragment = stored.get("fragment_anchor")
        if fragment:
            self.fragments.append(fragment)
            self.scores[fragment] = scoreDocs.score

        header_type = stored.get("header_type")
        if header_type:
            sec = (header_type, int(stored.get("header_index")))
            self.sections.append(sec)
            self.scores[sec] = scoreDocs.score

    def get_book(self):
        return catalogue.models.Book.objects.get(id=self.book_id)

    book = property(get_book)

    def get_parts(self):
        book = self.book
        parts = [{"header": s[0], "position": s[1], '_score_key': s} for s in self.sections] \
            + [{"fragment": book.fragments.get(anchor=f), '_score_key': f} for f in self.fragments]

        parts.sort(lambda a, b: cmp(self.scores[a['_score_key']], self.scores[b['_score_key']]))
        print("bookid: %d parts: %s" % (self.book_id, parts))
        return parts

    parts = property(get_parts)

    def merge(self, other):
        if self.book_id != other.book_id:
            raise ValueError("this search result is for book %d; tried to merge with %d" % (self.book_id, other.book_id))
        self.fragments += other.fragments
        self.sections += other.sections
        self.scores.update(other.scores)
        if other.score > self.score:
            self.score = other.score
        return self

    def __unicode__(self):
        return u'SearchResult(book_id=%d, score=%f)' % (self.book_id, self.score)

    @staticmethod
    def aggregate(*result_lists):
        books = {}
        for rl in result_lists:
            for r in rl:
                if r.book_id in books:
                    books[r.book_id].merge(r)
                    #print(u"already have one with score %f, and this one has score %f" % (books[book.id][0], found.score))
                else:
                    books[r.book_id] = r
        return books.values()

    def __cmp__(self, other):
        return cmp(self.score, other.score)


class MultiSearch(Search):
    """Class capable of IMDb-like searching"""
    def get_tokens(self, queryreader):
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            toks.append(cta.toString())
        return toks

    def make_phrase(self, tokens, field='content', slop=2):
        phrase = PhraseQuery()
        phrase.setSlop(slop)
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD):
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            q.add(BooleanClause(TermQuery(term), modal))
        return q

    def content_query(self, query):
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def search_perfect_book(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['author', 'title']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))
        return books

    def search_perfect_parts(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['content']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))

        return books

    def search_everywhere(self, tokens, max_results=20):
        q = BooleanQuery()
        in_meta = BooleanQuery()
        in_content = BooleanQuery()

        for fld in ['themes', 'content']:
            in_content.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        for fld in ['author', 'title', 'epochs', 'genres', 'kinds']:
            in_meta.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        # metadata is matched on the parent (book) document, content on the
        # child documents joined back to their book
        q.add(BooleanClause(in_meta, BooleanClause.Occur.MUST))
        in_content_join = self.content_query(in_content)
        q.add(BooleanClause(in_content_join, BooleanClause.Occur.MUST))

        collector = BlockJoinCollector(Sort.RELEVANCE, 100, True, True)

        self.searcher.search(q, collector)

        books = []

        top_groups = collector.getTopGroups(in_content_join, Sort.RELEVANCE, 0, max_results, 0, True)
        if top_groups:
            for grp in top_groups.groups:
                for part in grp.scoreDocs:
                    books.append(SearchResult(self.searcher, part, score=grp.maxScore))
        return books

    def multisearch(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        # queryreader = StringReader(query)
        # tokens = self.get_tokens(queryreader)

        # top_level = BooleanQuery()
        # Should = BooleanClause.Occur.SHOULD

        # phrase_level = BooleanQuery()
        # phrase_level.setBoost(1.3)

        # p_content = self.make_phrase(tokens, joined=True)
        # p_title = self.make_phrase(tokens, 'title')
        # p_author = self.make_phrase(tokens, 'author')

        # phrase_level.add(BooleanClause(p_content, Should))
        # phrase_level.add(BooleanClause(p_title, Should))
        # phrase_level.add(BooleanClause(p_author, Should))

        # kw_level = BooleanQuery()

        # kw_level.add(self.make_term_query(tokens, 'author'), Should)
        # j_themes = self.make_term_query(tokens, 'themes', joined=True)
        # kw_level.add(j_themes, Should)
        # kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        # j_con = self.make_term_query(tokens, joined=True)
        # kw_level.add(j_con, Should)

        # top_level.add(BooleanClause(phrase_level, Should))
        # top_level.add(BooleanClause(kw_level, Should))

        return None

    def do_search(self, query, max_results=50, collector=None):
        tops = self.searcher.search(query, max_results)
        #tops = self.searcher.search(p_content, max_results)

        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            b = catalogue.models.Book.objects.get(id=doc.get("book_id"))
            bks.append(b)
            print("%s (%d) -> %f" % (b, b.id, found.score))
        return (bks, tops.totalHits)
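
# Usage sketch for MultiSearch (hypothetical query string; assumes an existing
# index, as above):
#
#     ms = MultiSearch()
#     tokens = ms.get_tokens(u"lilije mickiewicz")
#     perfect = ms.search_perfect_book(tokens)
#     everywhere = ms.search_everywhere(tokens)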