reusable index. first attempt at rich search query
[wolnelektury.git] / apps / search / index.py
# -*- coding: utf-8 -*-
from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, TermQuery, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader
import os
import errno
from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
from multiprocessing.pool import ThreadPool
import atexit


class WLAnalyzer(PerFieldAnalyzerWrapper):
    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        self.addAnalyzer("tags", simple)
        self.addAnalyzer("technical_editors", simple)
        self.addAnalyzer("editors", simple)
        self.addAnalyzer("url", keyword)
        self.addAnalyzer("source_url", keyword)
        self.addAnalyzer("source_name", simple)
        self.addAnalyzer("publisher", simple)
        self.addAnalyzer("author", simple)
        self.addAnalyzer("is_book", keyword)

        # self.addAnalyzer("fragment_anchor", keyword)
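        # Note: KeywordAnalyzer keeps the whole field value as a single token,
        # so "url", "source_url" and "is_book" only match exact values, while
        # the default PolishAnalyzer (set above) stems free text in all
        # remaining fields.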


class IndexStore(object):
    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                pass
            else:
                raise


class Index(IndexStore):
    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        self.index.optimize()
        self.index.close()
        self.index = None

    def remove_book(self, book):
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

    def index_book(self, book, overwrite=True):
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        for p in parts:
            block.add(p)
        block.add(doc)
        self.index.addDocuments(block)

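    # Note: addDocuments() indexes the block contiguously, and Lucene's block
    # join (BlockJoinQuery) expects the parent document to come last in the
    # block, which is why the book (parent) doc is added after all part docs.
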
    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
        'wywiad'
        ]

    skip_header_tags = ['autor_utworu', 'nazwa_utworu']

    def create_book_doc(self, book):
        """
        Create a Lucene document connected to the book.
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        book_info = dcparser.parse(book.xml_file)

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # validator, name
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                # since no type information is available, we use the validator
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        # First we build a sequence of top-level items:
        # book_id
        # header_index - the 0-indexed position of the header element
        # content
        master = self.get_master(root)
        if master is None:
            return []

        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join([t for t in header.itertext()])
            doc.add(Field("content", content, Field.Store.NO, Field.Index.ANALYZED))
            header_docs.append(doc)

        def walker(node):
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

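        # walker() yields (node, None) when entering a node and (None, node)
        # when leaving it, i.e. a pre-order/post-order event stream over the
        # subtree; the loop below uses it to track which fragments are open.
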
        # Then we create a document for each fragment:
        # fragment_anchor - the anchor
        # themes - list of themes [indexed, not stored]
        fragment_docs = []
        # will contain (fragment id -> {'content': [], 'themes': []})
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments[fid]
                del fragments[fid]

                def jstr(l):
                    return u' '.join(map(
                        lambda x: x is None and u'(none)' or unicode(x),
                        l))

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join(filter(lambda s: s is not None, frag['content'])),
                              Field.Store.NO, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join(filter(lambda s: s is not None, frag['themes'])),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()

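# Example usage (sketch): indexing a single book with the context manager;
# assumes `book` is a catalogue.models.Book with its XML file attached.
#
# with Index() as index:
#     index.index_book(book)
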

class ReusableIndex(Index):
    """
    Works like Index, but does not close/optimize the Lucene index
    until program exit (uses an atexit hook).
    This is useful for the importbooks command.

    If you cannot rely on atexit, call ReusableIndex.close_reusable() yourself.
    """
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            ReusableIndex.pool = ThreadPool(threads)
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        # pass self explicitly: Index.index_book is called as an unbound method
        job = ReusableIndex.pool.apply_async(Index.index_book, (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        if ReusableIndex.index is not None:
            for job in ReusableIndex.pool_jobs:
                job.wait()
            ReusableIndex.pool.close()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None

    def close(self):
        pass

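# Example usage (sketch): bulk indexing in an import loop; assumes `books` is
# an iterable of catalogue.models.Book objects. The actual writing happens on
# the thread pool and the index is closed by the atexit hook (or an explicit
# ReusableIndex.close_reusable() call).
#
# index = ReusableIndex()
# index.open()
# for book in books:
#     index.index_book(book)
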

class Search(IndexStore):
    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=[]):
        """
        This function modifies the query recursively, so that contained
        Term and Phrase queries which match the provided fields are wrapped
        in a BlockJoinQuery and thus delegated to child documents.
        """
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)

    def simple_search(self, query, max_results=50):
        """Returns (books, total_hits)."""

        tops = self.searcher.search(self.query(query), max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def search(self, query, max_results=50):
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    def bsearch(self, query, max_results=50):
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

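# Example usage (sketch): querying an existing index; `query_string` is any
# user-supplied text understood by Lucene's QueryParser.
#
# search = Search()
# books, total_hits = search.simple_search(query_string)
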
# TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
# OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
# CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);

# while (tokenStream.incrementToken()) {
#     int startOffset = offsetAttribute.startOffset();
#     int endOffset = offsetAttribute.endOffset();
#     String term = charTermAttribute.toString();
# }


class MultiSearch(Search):
    """Class capable of IMDb-like searching."""
    def get_tokens(self, queryreader):
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            # copy the term out: the attribute object is reused between tokens
            toks.append(cta.toString())
        return toks

    def make_phrase(self, tokens, field='content', joined=False):
        phrase = PhraseQuery()
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        if joined:
            phrase = self.content_query(phrase)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD, joined=False):
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            q.add(BooleanClause(TermQuery(term), modal))
        if joined:
            q = self.content_query(q)
        return q

    def content_query(self, query):
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def multisearch(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        queryreader = StringReader(query)
        tokens = self.get_tokens(queryreader)

        top_level = BooleanQuery()
        Should = BooleanClause.Occur.SHOULD

        phrase_level = BooleanQuery()

        p_content = self.make_phrase(tokens, joined=True)
        p_title = self.make_phrase(tokens, 'title')
        p_author = self.make_phrase(tokens, 'author')

        phrase_level.add(BooleanClause(p_content, Should))
        phrase_level.add(BooleanClause(p_title, Should))
        phrase_level.add(BooleanClause(p_author, Should))

        kw_level = BooleanQuery()

        kw_level.add(self.make_term_query(tokens, 'author'), Should)
        kw_level.add(self.make_term_query(tokens, 'themes', joined=True), Should)
        kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        kw_level.add(self.make_term_query(tokens, joined=True), Should)

        top_level.add(BooleanClause(phrase_level, Should))
        top_level.add(BooleanClause(kw_level, Should))

        tops = self.searcher.search(top_level, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)
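

# Example usage (sketch): combined phrase/keyword query over books and their
# fragments; the phrase and keyword "content" branches join fragment-level hits
# back to their parent book documents via BlockJoinQuery.
#
# multi = MultiSearch()
# books, total_hits = multi.multisearch(query_string)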