multisearch fails to see joined queries -> will try to use one top-level query
[wolnelektury.git] / apps / search / index.py
# -*- coding: utf-8 -*-
from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader, TermQuery, \
    Sort
import sys
import os
import errno
from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
from multiprocessing.pool import ThreadPool
from threading import current_thread
import atexit

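# Field-aware analyzer: Polish stemming by default, a simple (letter-splitting,
# lowercasing) analyzer for names and tags, and a keyword analyzer (whole value
# as a single token) for URLs and flags.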
class WLAnalyzer(PerFieldAnalyzerWrapper):
    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        self.addAnalyzer("tags", simple)
        self.addAnalyzer("technical_editors", simple)
        self.addAnalyzer("editors", simple)
        self.addAnalyzer("url", keyword)
        self.addAnalyzer("source_url", keyword)
        self.addAnalyzer("source_name", simple)
        self.addAnalyzer("publisher", simple)
        self.addAnalyzer("author", simple)
        self.addAnalyzer("is_book", keyword)

        # self.addAnalyzer("fragment_anchor", keyword)

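# Opens (and creates on first use) the on-disk Lucene directory configured as
# settings.SEARCH_INDEX; base class for both the writer (Index) and Search.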
class IndexStore(object):
    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                pass
            else:
                raise

class Index(IndexStore):
    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        self.index.optimize()
        self.index.close()
        self.index = None

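    # Deletes the book document together with all its header/fragment child
    # documents: every one of them carries the same numeric book_id field.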
    def remove_book(self, book):
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

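    # Adds the book as one Lucene "block": all child documents (headers and
    # fragments) first and the book document last, so BlockJoinQuery can later
    # resolve children to their parent via the is_book filter.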
    def index_book(self, book, overwrite=True):
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        for p in parts:
            block.add(p)
        block.add(doc)
        self.index.addDocuments(block)

    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
        'wywiad'
        ]

    skip_header_tags = ['autor_utworu', 'nazwa_utworu']

    def create_book_doc(self, book):
        """
        Create a Lucene document connected to the book.
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        book_info = dcparser.parse(book.xml_file)

        print("extract metadata for book %s id=%d, thread %d" % (book.slug, book.id, current_thread().ident))

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # validator, name
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                # since no type information is available, we use the validator
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        # First we build a sequence of top-level items:
        # book_id
        # header_index - the 0-indexed position of the header element
        # content
        master = self.get_master(root)
        if master is None:
            return []

        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join([t for t in header.itertext()])
            doc.add(Field("content", content, Field.Store.NO, Field.Index.ANALYZED))
            header_docs.append(doc)

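        # Depth-first walk that yields (node, None) when a node is entered and
        # (None, node) when it is left, so begin/end fragment markers can be
        # matched with the text in between.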
        def walker(node):
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

        # Then we create a document for each fragment:
        # fragment_anchor - the anchor
        # themes - list of themes [not indexed]
        fragment_docs = []
        # will contain fragment id -> {'content': [], 'themes': []}
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments[fid]
                del fragments[fid]

                def jstr(l):
                    return u' '.join(map(
                        lambda x: u'(none)' if x is None else unicode(x),
                        l))

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join(filter(lambda s: s is not None, frag['content'])),
                              Field.Store.NO, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join(filter(lambda s: s is not None, frag['themes'])),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()

247
248 class ReusableIndex(Index):
249     """
250     Works like index, but does not close/optimize Lucene index
251     until program exit (uses atexit hook).
252     This is usefull for importbooks command.
253
254     if you cannot rely on atexit, use ReusableIndex.close_reusable() yourself.
255     """
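    # Typical usage (sketch): open once, queue books onto the thread pool,
    # and let the atexit hook flush and close the shared writer, e.g.
    #   index = ReusableIndex()
    #   index.open()
    #   for book in catalogue.models.Book.objects.all():
    #       index.index_book(book)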
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            print("opening index")
            ReusableIndex.pool = ThreadPool(threads)
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        job = ReusableIndex.pool.apply_async(Index.index_book, (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        if ReusableIndex.index is not None:
            print("closing index")
            for job in ReusableIndex.pool_jobs:
                job.wait()
            ReusableIndex.pool.close()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None

    def close(self):
        pass

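# Read-only access to the index: parses queries with the Polish analyzer and
# keeps a TermsFilter matching parent (is_book:true) documents for block joins.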
class Search(IndexStore):
    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=[]):
        """
        Modifies the query recursively, so that Term and Phrase queries
        which match only the provided fields are wrapped in a BlockJoinQuery
        and thus delegated to child documents.
        """
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)

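    # For example, wrapjoins(q, ["content", "themes"]) leaves clauses on
    # book-level fields (author, title, ...) untouched, while clauses that
    # only touch content/themes are pushed down to the child documents.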
    def simple_search(self, query, max_results=50):
        """Returns (books, total_hits)."""
        tops = self.searcher.search(self.query(query), max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def search(self, query, max_results=50):
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def bsearch(self, query, max_results=50):
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

# TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
# OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
# CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);

# while (tokenStream.incrementToken()) {
#     int startOffset = offsetAttribute.startOffset();
#     int endOffset = offsetAttribute.endOffset();
#     String term = charTermAttribute.toString();
# }

class MultiSearch(Search):
    """Class capable of IMDb-like searching"""
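    # Runs the analyzer over the query text and returns the resulting token
    # strings; these feed the phrase and term queries built below.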
    def get_tokens(self, queryreader):
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            toks.append(cta.toString())
        return toks

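    # Builds a PhraseQuery over the tokens with the given slop; with
    # joined=True the phrase is wrapped so it matches inside child
    # (header/fragment) documents but is scored against the parent book.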
    def make_phrase(self, tokens, field='content', joined=False, slop=2):
        phrase = PhraseQuery()
        phrase.setSlop(slop)
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        if joined:
            phrase = self.content_query(phrase)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD, joined=False):
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            q.add(BooleanClause(TermQuery(term), modal))
        if joined:
            q = self.content_query(q)
        return q

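    # Wraps a child-level query in a BlockJoinQuery using the is_book parent
    # filter, so hits in headers/fragments surface as their parent book.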
    def content_query(self, query):
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def multisearch(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        queryreader = StringReader(query)
        tokens = self.get_tokens(queryreader)

        top_level = BooleanQuery()
        Should = BooleanClause.Occur.SHOULD

        phrase_level = BooleanQuery()
        phrase_level.setBoost(1.3)

        p_content = self.make_phrase(tokens, joined=True)
        p_title = self.make_phrase(tokens, 'title')
        p_author = self.make_phrase(tokens, 'author')

        phrase_level.add(BooleanClause(p_content, Should))
        phrase_level.add(BooleanClause(p_title, Should))
        phrase_level.add(BooleanClause(p_author, Should))

        kw_level = BooleanQuery()

        kw_level.add(self.make_term_query(tokens, 'author'), Should)
        j_themes = self.make_term_query(tokens, 'themes', joined=True)
        kw_level.add(j_themes, Should)
        kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        j_con = self.make_term_query(tokens, joined=True)
        kw_level.add(j_con, Should)

        top_level.add(BooleanClause(phrase_level, Should))
        top_level.add(BooleanClause(kw_level, Should))

        collector = BlockJoinCollector(Sort.RELEVANCE, 100, True, True)

        self.searcher.search(kw_level, collector)

        # phrases in the content:
        # ph1 = collector.getTopGroups(j_themes, Sort.RELEVANCE,
        #                              0, 10, 0, True)
        # reload(search.index); reload(search); s = search.MultiSearch(); s.multisearch(u'dusiołek')
        # ph2 = collector.getTopGroups(j_con, Sort.RELEVANCE,
        #                              0, 10, 0, True)

        import pdb; pdb.set_trace()

        return None

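    # Plain (non-join) search helper: runs the query, prints each hit's book
    # and score, and returns (books, total_hits). The collector argument is
    # currently unused.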
    def do_search(self, query, max_results=50, collector=None):
        tops = self.searcher.search(query, max_results)
        # tops = self.searcher.search(p_content, max_results)

        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            b = catalogue.models.Book.objects.get(id=doc.get("book_id"))
            bks.append(b)
            print("%s (%d) -> %f" % (b, b.id, found.score))
        return (bks, tops.totalHits)