# apps/search/index.py
# -*- coding: utf-8 -*-

from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader, TermQuery, \
    Sort, Integer, \
    initVM, CLASSPATH
JVM = initVM(CLASSPATH)
import sys
import os
import errno
from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
from multiprocessing.pool import ThreadPool
from threading import current_thread
import atexit
import traceback

class WLAnalyzer(PerFieldAnalyzerWrapper):
    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        self.addAnalyzer("tags", simple)
        self.addAnalyzer("technical_editors", simple)
        self.addAnalyzer("editors", simple)
        self.addAnalyzer("url", keyword)
        self.addAnalyzer("source_url", keyword)
        self.addAnalyzer("source_name", simple)
        self.addAnalyzer("publisher", simple)
        self.addAnalyzer("author", simple)
        self.addAnalyzer("is_book", keyword)

        #self.addAnalyzer("fragment_anchor", keyword)


class IndexStore(object):
    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                pass
            else:
                raise


class Index(IndexStore):
    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, analyzer or self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        self.index.optimize()
        self.index.close()
        self.index = None

    def remove_book(self, book):
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

    def index_book(self, book, overwrite=True):
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        for p in parts:
            block.add(p)
        # the parent (book) document must be added last in the block,
        # so that block-join queries can match child documents to it
        block.add(doc)
        self.index.addDocuments(block)

    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
        'wywiad'
        ]

    skip_header_tags = ['autor_utworu', 'nazwa_utworu', 'dzielo_nadrzedne']

    def create_book_doc(self, book):
        """
        Create a Lucene document referring to the book.
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        book_info = dcparser.parse(book.xml_file)

        print("extract metadata for book %s id=%d, thread %d" % (book.slug, book.id, current_thread().ident))

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        #doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # each metadata field carries a name and a validator
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                # since no type information is available, we use the validator
                # to decide how to index the value
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        # First we build a document for each top-level item, with fields:
        # book_id
        # header_index - the 0-indexed position of the header element
        # content
        master = self.get_master(root)
        if master is None:
            return []

        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join([t for t in header.itertext()])
            doc.add(Field("content", content, Field.Store.YES, Field.Index.ANALYZED))
            header_docs.append(doc)

        def walker(node):
            # depth-first traversal yielding (node, None) on entering a node
            # and (None, node) on leaving it
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

        # Then we create a document for each fragment, with fields:
        # fragment_anchor - the anchor
        # themes - list of themes [not indexed]
        fragment_docs = []
        # will contain: fragment id -> {'content': [], 'themes': []}
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments[fid]
                del fragments[fid]

                def jstr(l):
                    return u' '.join(map(
                        lambda x: u'(none)' if x is None else unicode(x),
                        l))

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join(filter(lambda s: s is not None, frag['content'])),
                              Field.Store.YES, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join(filter(lambda s: s is not None, frag['themes'])),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()

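# A minimal usage sketch (not part of the original module): Index implements the
# context-manager protocol defined above, so a one-off reindex could look like
# the commented example below; the queryset is an assumption for illustration.
#
#     with Index() as index:
#         for book in catalogue.models.Book.objects.all():
#             index.index_book(book)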

def log_exception_wrapper(f):
    def _wrap(*a):
        try:
            f(*a)
        except Exception as e:
            print("Error in indexing thread: %s" % e)
            traceback.print_exc()
            raise
    return _wrap


class ReusableIndex(Index):
    """
    Works like Index, but does not close/optimize the Lucene index
    until program exit (uses an atexit hook).
    This is useful for the importbooks command.

    If you cannot rely on atexit, call ReusableIndex.close_reusable() yourself.
    """
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            print("opening index")
            ReusableIndex.pool = ThreadPool(threads, initializer=lambda: JVM.attachCurrentThread())
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        job = ReusableIndex.pool.apply_async(log_exception_wrapper(Index.index_book), (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        if ReusableIndex.index is not None:
            print("wait for indexing to finish")
            for job in ReusableIndex.pool_jobs:
                job.get()
                sys.stdout.write('.')
                sys.stdout.flush()
            print("done.")
            ReusableIndex.pool.close()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None

    def close(self):
        pass

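# A rough sketch (assumption, not original code) of how the importbooks command
# can reuse one writer across many books and let the atexit hook clean up:
#
#     index = ReusableIndex()
#     index.open()
#     for book in books_to_import:          # hypothetical iterable
#         index.index_book(book)            # queued on the thread pool
#     # ReusableIndex.close_reusable() runs at interpreter exit,
#     # or can be called explicitly when atexit cannot be relied on.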

class Search(IndexStore):
    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=[]):
        """
        This function modifies the query recursively: contained Term and
        Phrase queries that match the provided fields are wrapped in a
        BlockJoinQuery and thus delegated to child documents.
        """
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)

    def simple_search(self, query, max_results=50):
        """Returns (books, total_hits)
        """

        tops = self.searcher.search(self.query(query), max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def search(self, query, max_results=50):
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def bsearch(self, query, max_results=50):
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

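# A hedged usage sketch (illustration only, not in the original code):
#
#     search = Search()
#     books, total = search.simple_search(u"ballada")   # plain query on "content"
#     # search() additionally pushes "content"/"themes" terms down to child
#     # documents via wrapjoins(); bsearch() wraps the whole parsed query
#     # in a single BlockJoinQuery instead.
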
# TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
# OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
# CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);

# while (tokenStream.incrementToken()) {
#     int startOffset = offsetAttribute.startOffset();
#     int endOffset = offsetAttribute.endOffset();
#     String term = charTermAttribute.toString();
# }


class SearchResult(object):
    def __init__(self, searcher, scoreDocs, score=None):
        if score:
            self.score = score
        else:
            self.score = scoreDocs.score

        self.fragments = []
        self.scores = {}
        self.sections = []

        stored = searcher.doc(scoreDocs.doc)
        self.book_id = int(stored.get("book_id"))

        fragment = stored.get("fragment_anchor")
        if fragment:
            self.fragments.append(fragment)
            self.scores[fragment] = scoreDocs.score

        header_type = stored.get("header_type")
        if header_type:
            sec = (header_type, int(stored.get("header_index")))
            self.sections.append(sec)
            self.scores[sec] = scoreDocs.score

    def get_book(self):
        return catalogue.models.Book.objects.get(id=self.book_id)

    book = property(get_book)

    def get_parts(self):
        book = self.book
        parts = [{"header": s[0], "position": s[1], '_score_key': s} for s in self.sections] \
            + [{"fragment": book.fragments.get(anchor=f), '_score_key': f} for f in self.fragments]

        parts.sort(lambda a, b: cmp(self.scores[a['_score_key']], self.scores[b['_score_key']]))
        print("bookid: %d parts: %s" % (self.book_id, parts))
        return parts

    parts = property(get_parts)

    def merge(self, other):
        if self.book_id != other.book_id:
            raise ValueError("this search result is for book %d; tried to merge with %d" % (self.book_id, other.book_id))
        self.fragments += other.fragments
        self.sections += other.sections
        self.scores.update(other.scores)
        if other.score > self.score:
            self.score = other.score
        return self

    def __unicode__(self):
        return u'SearchResult(book_id=%d, score=%f)' % (self.book_id, self.score)

    @staticmethod
    def aggregate(*result_lists):
        books = {}
        for rl in result_lists:
            for r in rl:
                if r.book_id in books:
                    books[r.book_id].merge(r)
                    #print(u"already have one with score %f, and this one has score %f" % (books[book.id][0], found.score))
                else:
                    books[r.book_id] = r
        return books.values()

    def __cmp__(self, other):
        return cmp(self.score, other.score)


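# A minimal sketch (illustration only, not in the original code) of combining
# results from several queries per book: merge() accumulates fragments,
# sections and scores, and __cmp__ orders results by score.
#
#     merged = SearchResult.aggregate(title_hits, content_hits)   # hypothetical lists
#     best_first = sorted(merged, reverse=True)
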
class MultiSearch(Search):
    """Class capable of IMDb-like searching"""
    def get_tokens(self, queryreader):
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            toks.append(cta.toString())
        return toks

    def make_phrase(self, tokens, field='content', slop=2):
        phrase = PhraseQuery()
        phrase.setSlop(slop)
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD):
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            q.add(BooleanClause(TermQuery(term), modal))
        return q

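    # A rough illustration (assumption, not original code) of how the helpers
    # above compose: analyze the query text once, then reuse the tokens to
    # build phrase and term queries against different fields.
    #
    #     tokens = self.get_tokens(u'lokomotywa staje i sapie')
    #     title_phrase = self.make_phrase(tokens, field='title')
    #     content_terms = self.make_term_query(tokens, field='content')
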
    def content_query(self, query):
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def search_perfect_book(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['author', 'title']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))
        return books

    def search_perfect_parts(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['content']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))

        return books

    def search_everywhere(self, tokens, max_results=20):
        q = BooleanQuery()
        in_meta = BooleanQuery()
        in_content = BooleanQuery()

        for fld in ['themes', 'content']:
            in_content.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        for fld in ['author', 'title', 'epochs', 'genres', 'kinds']:
            in_meta.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        # metadata is matched on the parent (book) document; content/themes
        # are matched on child documents and joined back to their parent
        q.add(BooleanClause(in_meta, BooleanClause.Occur.MUST))
        in_content_join = self.content_query(in_content)
        q.add(BooleanClause(in_content_join, BooleanClause.Occur.MUST))

        collector = BlockJoinCollector(Sort.RELEVANCE, 100, True, True)

        self.searcher.search(q, collector)

        books = []

        top_groups = collector.getTopGroups(in_content_join, Sort.RELEVANCE, 0, max_results, 0, True)
        if top_groups:
            for grp in top_groups.groups:
                for part in grp.scoreDocs:
                    books.append(SearchResult(self.searcher, part, score=grp.maxScore))
        return books

    def multisearch(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        # queryreader = StringReader(query)
        # tokens = self.get_tokens(queryreader)

        # top_level = BooleanQuery()
        # Should = BooleanClause.Occur.SHOULD

        # phrase_level = BooleanQuery()
        # phrase_level.setBoost(1.3)

        # p_content = self.make_phrase(tokens, joined=True)
        # p_title = self.make_phrase(tokens, 'title')
        # p_author = self.make_phrase(tokens, 'author')

        # phrase_level.add(BooleanClause(p_content, Should))
        # phrase_level.add(BooleanClause(p_title, Should))
        # phrase_level.add(BooleanClause(p_author, Should))

        # kw_level = BooleanQuery()

        # kw_level.add(self.make_term_query(tokens, 'author'), Should)
        # j_themes = self.make_term_query(tokens, 'themes', joined=True)
        # kw_level.add(j_themes, Should)
        # kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        # j_con = self.make_term_query(tokens, joined=True)
        # kw_level.add(j_con, Should)

        # top_level.add(BooleanClause(phrase_level, Should))
        # top_level.add(BooleanClause(kw_level, Should))

        return None

    def do_search(self, query, max_results=50, collector=None):
        tops = self.searcher.search(query, max_results)
        #tops = self.searcher.search(p_content, max_results)

        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            b = catalogue.models.Book.objects.get(id=doc.get("book_id"))
            bks.append(b)
            print("%s (%d) -> %f" % (b, b.id, found.score))
        return (bks, tops.totalHits)
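
# A hedged end-to-end sketch (assumption, not part of the original module):
#
#     search = MultiSearch()
#     tokens = search.get_tokens(u'pan tadeusz')
#     hits = SearchResult.aggregate(search.search_perfect_book(tokens),
#                                   search.search_everywhere(tokens))
#     for result in sorted(hits, reverse=True):
#         print(result.book, result.parts)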