# -*- coding: utf-8 -*-
from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader, TermQuery, \
    Sort, Integer
import sys
import os
import errno
from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
from multiprocessing.pool import ThreadPool
from threading import current_thread
import atexit


class WLAnalyzer(PerFieldAnalyzerWrapper):
    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        self.addAnalyzer("tags", simple)
        self.addAnalyzer("technical_editors", simple)
        self.addAnalyzer("editors", simple)
        self.addAnalyzer("url", keyword)
        self.addAnalyzer("source_url", keyword)
        self.addAnalyzer("source_name", simple)
        self.addAnalyzer("publisher", simple)
        self.addAnalyzer("author", simple)
        self.addAnalyzer("is_book", keyword)

        # self.addAnalyzer("fragment_anchor", keyword)


class IndexStore(object):
    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                pass
            else:
                raise


class Index(IndexStore):
    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        self.index.optimize()
        self.index.close()
        self.index = None

    def remove_book(self, book):
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

    def index_book(self, book, overwrite=True):
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        # Child (content/fragment) documents go first; the parent book
        # document must be the last one in the block, so that block-join
        # queries can identify it.
        for p in parts:
            block.add(p)
        block.add(doc)
        self.index.addDocuments(block)

    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny',
        'liryka_l',
        'liryka_lp',
        'wywiad',
        ]

    skip_header_tags = ['autor_utworu', 'nazwa_utworu', 'dzielo_nadrzedne']

    def create_book_doc(self, book):
        """
        Create a Lucene document connected to the book.
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        book_info = dcparser.parse(book.xml_file)

        print("extract metadata for book %s id=%d, thread %d" % (book.slug, book.id, current_thread().ident))

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # Each metadata field carries a name and a validator.
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                # Since no type information is available, we use the validator
                # as a type indicator.
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        # First we build a sequence of top-level items; each header document
        # carries:
        # book_id
        # header_index - the 0-indexed position of the header element
        # content
        master = self.get_master(root)
        if master is None:
            return []

        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join(header.itertext())
            doc.add(Field("content", content, Field.Store.YES, Field.Index.ANALYZED))
            header_docs.append(doc)

        def walker(node):
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

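        # walker() does a depth-first traversal, yielding (node, None) when a
        # node is entered and (None, node) when it is left.  For a node A with
        # children B and C the emitted sequence is:
        #   (A, None), (B, None), (None, B), (C, None), (None, C), (None, A)
        # This lets the loop below keep track of which fragments are currently
        # open and append text to all of them.
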
        # Then we create a document for each fragment:
        # fragment_anchor - the anchor
        # themes - list of themes [indexed, but not stored]
        fragment_docs = []
        # will contain: fragment id -> {'content': [], 'themes': []}
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments[fid]
                del fragments[fid]

                def jstr(l):
                    # join helper, currently unused
                    return u' '.join(u'(none)' if x is None else unicode(x)
                                     for x in l)

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join(filter(lambda s: s is not None, frag['content'])),
                              Field.Store.YES, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join(filter(lambda s: s is not None, frag['themes'])),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                # any other node entered: its text belongs to all open fragments
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                # node closed: its tail belongs to all open fragments
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()

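
# Index implements the context-manager protocol, so a typical (re)indexing run
# can look like the sketch below.  This is only an illustration: `some_book`
# stands for any catalogue.models.Book instance with an attached XML file.
#
#     with Index() as index:
#         index.index_book(some_book)
#
# On __exit__ the underlying IndexWriter is optimized and closed.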

class ReusableIndex(Index):
    """
    Works like Index, but does not close/optimize the Lucene index
    until program exit (uses an atexit hook).
    This is useful for the importbooks command.

    If you cannot rely on atexit, call ReusableIndex.close_reusable() yourself.
    """
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            print("opening index")
            ReusableIndex.pool = ThreadPool(threads)
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        # indexing is delegated to the shared thread pool
        job = ReusableIndex.pool.apply_async(Index.index_book, (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        if ReusableIndex.index is not None:
            print("closing index")
            for job in ReusableIndex.pool_jobs:
                job.wait()
            ReusableIndex.pool.close()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None

    def close(self):
        pass

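
# Sketch of bulk indexing with ReusableIndex, e.g. from an import command.
# `books` is a hypothetical iterable of Book objects; everything else is
# defined above.
#
#     index = ReusableIndex()
#     index.open(threads=4)
#     for book in books:
#         index.index_book(book)  # queued on the shared ThreadPool
#     # atexit normally closes the index; if you cannot rely on atexit:
#     ReusableIndex.close_reusable()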

class Search(IndexStore):
    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=[]):
        """
        Recursively modifies the query so that Term and Phrase queries
        which match only the provided fields are wrapped in a
        BlockJoinQuery, and thus delegated to child documents.
        """
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            # leaf query: wrap only if all of its terms are on the given fields
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)

    def simple_search(self, query, max_results=50):
        """Returns (books, total_hits)."""

        tops = self.searcher.search(self.query(query), max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def search(self, query, max_results=50):
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def bsearch(self, query, max_results=50):
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

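
# Minimal usage sketch for Search.  The query string is parsed by the Lucene
# QueryParser against the default "content" field; the query text here is
# just an example.
#
#     search = Search()
#     books, total = search.simple_search(u"Pan Tadeusz", max_results=10)
#
# search() additionally wraps content/themes clauses in a block join, and
# bsearch() joins the whole parsed query up to the parent book documents.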
# Reference (Java) snippet: iterating a Lucene TokenStream and its attributes.
#
# TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
# OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
# CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
#
# while (tokenStream.incrementToken()) {
#     int startOffset = offsetAttribute.startOffset();
#     int endOffset = offsetAttribute.endOffset();
#     String term = charTermAttribute.toString();
# }



class SearchResult(object):
    def __init__(self, searcher, scoreDocs, score=None):
        if score:
            self.score = score
        else:
            self.score = scoreDocs.score

        self.fragments = []
        self.scores = {}
        self.sections = []

        stored = searcher.doc(scoreDocs.doc)
        self.book_id = int(stored.get("book_id"))

        fragment = stored.get("fragment_anchor")
        if fragment:
            self.fragments.append(fragment)
            self.scores[fragment] = scoreDocs.score

        header_type = stored.get("header_type")
        if header_type:
            sec = (header_type, int(stored.get("header_index")))
            self.sections.append(sec)
            self.scores[sec] = scoreDocs.score

    def get_book(self):
        return catalogue.models.Book.objects.get(id=self.book_id)

    book = property(get_book)

    def get_parts(self):
        book = self.book
        parts = [{"header": s[0], "position": s[1], '_score_key': s} for s in self.sections] \
            + [{"fragment": book.fragments.get(anchor=f), '_score_key': f} for f in self.fragments]

        # sort by score (ascending)
        parts.sort(lambda a, b: cmp(self.scores[a['_score_key']], self.scores[b['_score_key']]))
        print("bookid: %d parts: %s" % (self.book_id, parts))
        return parts

    parts = property(get_parts)

    def merge(self, other):
        if self.book_id != other.book_id:
            raise ValueError("this search result is for book %d; tried to merge with %d" % (self.book_id, other.book_id))
        self.fragments += other.fragments
        self.sections += other.sections
        self.scores.update(other.scores)
        if other.score > self.score:
            self.score = other.score
        return self

    def __unicode__(self):
        return u'SearchResult(book_id=%d, score=%f)' % (self.book_id, self.score)

    @staticmethod
    def aggregate(*result_lists):
        books = {}
        for rl in result_lists:
            for r in rl:
                if r.book_id in books:
                    books[r.book_id].merge(r)
                    #print(u"already have one with score %f, and this one has score %f" % (books[book.id][0], found.score))
                else:
                    books[r.book_id] = r
        return books.values()

    def __cmp__(self, other):
        return cmp(self.score, other.score)

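
# Sketch of combining results from several queries.  `list_a` and `list_b`
# are hypothetical lists of SearchResult objects (e.g. as returned by the
# MultiSearch methods below).  aggregate() merges hits that point at the same
# book, and __cmp__ lets the merged results be sorted by score.
#
#     merged = SearchResult.aggregate(list_a, list_b)
#     for result in sorted(merged, reverse=True):
#         print(result.book, result.parts)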

class MultiSearch(Search):
    """Class capable of IMDb-like searching"""
    def get_tokens(self, queryreader):
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            toks.append(cta.toString())
        return toks

    def make_phrase(self, tokens, field='content', slop=2):
        phrase = PhraseQuery()
        phrase.setSlop(slop)
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD):
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            q.add(BooleanClause(TermQuery(term), modal))
        return q

    def content_query(self, query):
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def search_perfect_book(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['author', 'title']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))
        return books

    def search_perfect_parts(self, tokens, max_results=20):
        qrys = [self.make_phrase(tokens, field=fld) for fld in ['content']]

        books = []
        for q in qrys:
            top = self.searcher.search(q, max_results)
            for found in top.scoreDocs:
                books.append(SearchResult(self.searcher, found))

        return books

    def search_everywhere(self, tokens, max_results=20):
        q = BooleanQuery()
        in_meta = BooleanQuery()
        in_content = BooleanQuery()

        for fld in ['themes', 'content']:
            in_content.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        for fld in ['author', 'title', 'epochs', 'genres', 'kinds']:
            in_meta.add(BooleanClause(self.make_term_query(tokens, field=fld), BooleanClause.Occur.SHOULD))

        # metadata clauses match on the parent (book) document; content/themes
        # clauses are joined up from the child documents
        q.add(BooleanClause(in_meta, BooleanClause.Occur.MUST))
        in_content_join = self.content_query(in_content)
        q.add(BooleanClause(in_content_join, BooleanClause.Occur.MUST))

        collector = BlockJoinCollector(Sort.RELEVANCE, 100, True, True)

        self.searcher.search(q, collector)

        books = []

        top_groups = collector.getTopGroups(in_content_join, Sort.RELEVANCE, 0, max_results, 0, True)
        if top_groups:
            for grp in top_groups.groups:
                for part in grp.scoreDocs:
                    books.append(SearchResult(self.searcher, part, score=grp.maxScore))
        return books

    def multisearch(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        # Not implemented yet; the commented-out draft below is kept for reference.

        # queryreader = StringReader(query)
        # tokens = self.get_tokens(queryreader)

        # top_level = BooleanQuery()
        # Should = BooleanClause.Occur.SHOULD

        # phrase_level = BooleanQuery()
        # phrase_level.setBoost(1.3)

        # p_content = self.make_phrase(tokens, joined=True)
        # p_title = self.make_phrase(tokens, 'title')
        # p_author = self.make_phrase(tokens, 'author')

        # phrase_level.add(BooleanClause(p_content, Should))
        # phrase_level.add(BooleanClause(p_title, Should))
        # phrase_level.add(BooleanClause(p_author, Should))

        # kw_level = BooleanQuery()

        # kw_level.add(self.make_term_query(tokens, 'author'), Should)
        # j_themes = self.make_term_query(tokens, 'themes', joined=True)
        # kw_level.add(j_themes, Should)
        # kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        # j_con = self.make_term_query(tokens, joined=True)
        # kw_level.add(j_con, Should)

        # top_level.add(BooleanClause(phrase_level, Should))
        # top_level.add(BooleanClause(kw_level, Should))

        return None

    def do_search(self, query, max_results=50, collector=None):
        tops = self.searcher.search(query, max_results)
        #tops = self.searcher.search(p_content, max_results)

        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            b = catalogue.models.Book.objects.get(id=doc.get("book_id"))
            bks.append(b)
            print("%s (%d) -> %f" % (b, b.id, found.score))
        return (bks, tops.totalHits)
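

# End-to-end sketch of how the pieces above are meant to fit together; only an
# illustration, with u"..." standing in for a user-supplied query string.
#
#     multi = MultiSearch()
#     tokens = multi.get_tokens(u"...")
#     perfect = multi.search_perfect_book(tokens)
#     everywhere = multi.search_everywhere(tokens)
#     results = SearchResult.aggregate(perfect, everywhere)
#     for r in sorted(results, reverse=True):
#         print(r.book, r.parts)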