# async indexing, fixes
# [wolnelektury.git] / apps / search / index.py
# -*- coding: utf-8 -*-
import atexit
import errno
import os
import sys
from multiprocessing.pool import ThreadPool

from django.conf import settings
from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
    NumericField, Version, Document, JavaError, IndexSearcher, \
    QueryParser, Term, TermQuery, PerFieldAnalyzerWrapper, \
    SimpleAnalyzer, PolishAnalyzer, ArrayList, \
    KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
    BlockJoinQuery, BlockJoinCollector, TermsFilter, \
    HashSet, BooleanClause, CharTermAttribute, \
    PhraseQuery, StringReader
    # KeywordAnalyzer

from librarian import dcparser
from librarian.parser import WLDocument
import catalogue.models
20
21
class WLAnalyzer(PerFieldAnalyzerWrapper):
    """Per-field analyzer: Polish by default, with simple/keyword
    analyzers registered for selected metadata fields."""

    def __init__(self):
        polish = PolishAnalyzer(Version.LUCENE_34)
        simple = SimpleAnalyzer(Version.LUCENE_34)
        keyword = KeywordAnalyzer(Version.LUCENE_34)
        # not sure if needed: there's NOT_ANALYZED meaning basically the same

        PerFieldAnalyzerWrapper.__init__(self, polish)

        # Fields tokenized on non-letters only (no Polish stemming).
        for field_name in ("tags", "technical_editors", "editors",
                           "source_name", "publisher", "author"):
            self.addAnalyzer(field_name, simple)

        # Fields indexed verbatim as a single token.
        for field_name in ("url", "source_url", "is_book"):
            self.addAnalyzer(field_name, keyword)

        #self.addanalyzer("fragment_anchor", keyword)
42
43
class IndexStore(object):
    """Opens the Lucene directory at settings.SEARCH_INDEX, creating
    the directory on disk first if it does not exist yet."""

    def __init__(self):
        self.make_index_dir()
        self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))

    def make_index_dir(self):
        # Create the index directory; an already-existing one is fine.
        try:
            os.makedirs(settings.SEARCH_INDEX)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
56
57
class Index(IndexStore):
    """Write-side access to the search index: adds and removes
    per-book document blocks."""

    # Top-level (master) tags of WL documents whose children get indexed.
    master_tags = [
        'opowiadanie',
        'powiesc',
        'dramat_wierszowany_l',
        'dramat_wierszowany_lp',
        'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
        'wywiad'
        ]

    # Header tags that duplicate book metadata; skipped as content.
    skip_header_tags = ['autor_utworu', 'nazwa_utworu']

    def __init__(self, analyzer=None):
        IndexStore.__init__(self)
        self.index = None
        if not analyzer:
            analyzer = WLAnalyzer()
        self.analyzer = analyzer

    def open(self, analyzer=None):
        """Open the index for writing; raises if it is already open.

        NOTE(review): the `analyzer` argument is accepted but ignored —
        self.analyzer from __init__ is always used.
        """
        if self.index:
            raise Exception("Index is already opened")
        self.index = IndexWriter(self.store, self.analyzer,
                                 IndexWriter.MaxFieldLength.LIMITED)
        return self.index

    def close(self):
        """Optimize and close the index writer."""
        self.index.optimize()
        self.index.close()
        self.index = None

    def remove_book(self, book):
        """Delete every document belonging to `book` from the index."""
        q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True, True)
        self.index.deleteDocuments(q)

    def index_book(self, book, overwrite=True):
        """Index one book: its content documents plus the metadata document.

        Documents are added as one block with the book (parent) document
        last, as block-join queries require.
        """
        if overwrite:
            self.remove_book(book)

        doc = self.extract_metadata(book)
        parts = self.extract_content(book)
        block = ArrayList().of_(Document)

        for p in parts:
            block.add(p)
        block.add(doc)
        self.index.addDocuments(block)

    def create_book_doc(self, book):
        """
        Create a lucene document connected to the book
        """
        doc = Document()
        doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
        if book.parent is not None:
            doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
        return doc

    def extract_metadata(self, book):
        """Build the book-level document from Dublin Core metadata."""
        book_info = dcparser.parse(book.xml_file)

        doc = self.create_book_doc(book)
        doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
        doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
        doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))

        # BookInfo carries no type information, so dispatch on each
        # field's validator instead.
        for field in dcparser.BookInfo.FIELDS:
            if hasattr(book_info, field.name):
                if not getattr(book_info, field.name):
                    continue
                type_indicator = field.validator
                if type_indicator == dcparser.as_unicode:
                    s = getattr(book_info, field.name)
                    if field.multiple:
                        s = ', '.join(s)
                    try:
                        doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
                    except JavaError as je:
                        raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
                elif type_indicator == dcparser.as_person:
                    p = getattr(book_info, field.name)
                    if isinstance(p, dcparser.Person):
                        persons = unicode(p)
                    else:
                        persons = ', '.join(map(unicode, p))
                    doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
                elif type_indicator == dcparser.as_date:
                    dt = getattr(book_info, field.name)
                    doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
        return doc

    def get_master(self, root):
        """Return the first master (top-level) element of the tree, if any."""
        for master in root.iter():
            if master.tag in self.master_tags:
                return master

    def extract_content(self, book):
        """Create the child documents for a book: one per top-level header
        and one per themed fragment. Returns a list of Lucene documents."""
        wld = WLDocument.from_file(book.xml_file.path)
        root = wld.edoc.getroot()

        master = self.get_master(root)
        if master is None:
            return []

        # One document per top-level item:
        # book_id, header_index (0-based position of the header element),
        # header_type, content.
        header_docs = []
        for position, header in enumerate(master):
            if header.tag in self.skip_header_tags:
                continue
            doc = self.create_book_doc(book)
            doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
            doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
            content = u' '.join(header.itertext())
            doc.add(Field("content", content, Field.Store.NO, Field.Index.ANALYZED))
            header_docs.append(doc)

        def walker(node):
            # Depth-first traversal yielding (node, None) on entering a
            # node and (None, node) on leaving it.
            yield node, None
            for child in list(node):
                for b, e in walker(child):
                    yield b, e
            yield None, node
            return

        # Then one document per fragment:
        # fragment_anchor - the anchor, themes, content.
        fragment_docs = []
        # maps an open fragment id -> {'content': [...], 'themes': [...]}
        fragments = {}
        for start, end in walker(master):
            if start is not None and start.tag == 'begin':
                fid = start.attrib['id'][1:]
                fragments[fid] = {'content': [], 'themes': []}
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'motyw':
                fid = start.attrib['id'][1:]
                fragments[fid]['themes'].append(start.text)
                fragments[fid]['content'].append(start.tail)
            elif start is not None and start.tag == 'end':
                fid = start.attrib['id'][1:]
                if fid not in fragments:
                    continue  # a broken <end> node, skip it
                frag = fragments.pop(fid)

                doc = self.create_book_doc(book)
                doc.add(Field("fragment_anchor", fid,
                              Field.Store.YES, Field.Index.NOT_ANALYZED))
                doc.add(Field("content",
                              u' '.join([s for s in frag['content'] if s is not None]),
                              Field.Store.NO, Field.Index.ANALYZED))
                doc.add(Field("themes",
                              u' '.join([s for s in frag['themes'] if s is not None]),
                              Field.Store.NO, Field.Index.ANALYZED))

                fragment_docs.append(doc)
            elif start is not None:
                # Plain text while fragments are open: goes into each of them.
                for frag in fragments.values():
                    frag['content'].append(start.text)
            elif end is not None:
                for frag in fragments.values():
                    frag['content'].append(end.tail)

        return header_docs + fragment_docs

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, tb):
        self.close()
242
243
class ReusableIndex(Index):
    """
    Works like Index, but does not close/optimize the Lucene index
    until program exit (uses an atexit hook).
    This is useful for the importbooks command.

    If you cannot rely on atexit, call ReusableIndex.close_reusable() yourself.
    """
    # Shared, class-level state: one writer and one thread pool per process.
    index = None
    pool = None
    pool_jobs = None

    def open(self, analyzer=None, threads=4):
        if ReusableIndex.index is not None:
            self.index = ReusableIndex.index
        else:
            ReusableIndex.pool = ThreadPool(threads)
            ReusableIndex.pool_jobs = []
            Index.open(self, analyzer)
            ReusableIndex.index = self.index
            atexit.register(ReusableIndex.close_reusable)

    def index_book(self, *args, **kw):
        # Index asynchronously on the shared thread pool.
        job = ReusableIndex.pool.apply_async(Index.index_book, (self,) + args, kw)
        ReusableIndex.pool_jobs.append(job)

    @staticmethod
    def close_reusable():
        # (fixed: a leftover pdb.set_trace() breakpoint used to sit here)
        if ReusableIndex.index is not None:
            # Wait for every pending indexing job before closing the writer.
            for job in ReusableIndex.pool_jobs:
                job.wait()
            ReusableIndex.pool.close()
            ReusableIndex.pool.join()

            ReusableIndex.index.optimize()
            ReusableIndex.index.close()
            ReusableIndex.index = None
            ReusableIndex.pool = None
            ReusableIndex.pool_jobs = None

    def close(self):
        # Intentionally a no-op: the shared index is only closed by
        # close_reusable(), normally via the atexit hook.
        pass
284
285
class Search(IndexStore):
    """Read-side access to the index: parses queries and runs searches,
    mapping hits back to catalogue Book objects."""

    def __init__(self, default_field="content"):
        IndexStore.__init__(self)
        self.analyzer = PolishAnalyzer(Version.LUCENE_34)
        ## self.analyzer = WLAnalyzer()
        self.searcher = IndexSearcher(self.store, True)
        self.parser = QueryParser(Version.LUCENE_34, default_field,
                                  self.analyzer)

        # Matches only top-level (book) documents; used by block-join
        # queries to tie child-document hits to their parent book.
        self.parent_filter = TermsFilter()
        self.parent_filter.addTerm(Term("is_book", "true"))

    def query(self, query):
        """Parse a query string into a Lucene Query."""
        return self.parser.parse(query)

    def wrapjoins(self, query, fields=None):
        """
        This function modifies the query in a recursive way,
        so Term and Phrase Queries contained, which match
        provided fields are wrapped in a BlockJoinQuery,
        and so delegated to children documents.
        """
        if fields is None:  # avoid the mutable-default-argument pitfall
            fields = []
        if BooleanQuery.instance_(query):
            qs = BooleanQuery.cast_(query)
            for clause in qs:
                clause = BooleanClause.cast_(clause)
                clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
            return qs
        else:
            termset = HashSet()
            query.extractTerms(termset)
            for t in termset:
                t = Term.cast_(t)
                if t.field() not in fields:
                    return query
            return BlockJoinQuery(query, self.parent_filter,
                                  BlockJoinQuery.ScoreMode.Total)

    def _fetch_books(self, tops):
        """Map scored Lucene hits to catalogue Book objects."""
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return bks

    def simple_search(self, query, max_results=50):
        """Plain search on the default field. Returns (books, total_hits)."""
        tops = self.searcher.search(self.query(query), max_results)
        return (self._fetch_books(tops), tops.totalHits)

    def search(self, query, max_results=50):
        """Search with content/themes clauses delegated to child documents
        via BlockJoinQuery. Returns (books, total_hits)."""
        query = self.query(query)
        query = self.wrapjoins(query, ["content", "themes"])

        tops = self.searcher.search(query, max_results)
        return (self._fetch_books(tops), tops.totalHits)

    def bsearch(self, query, max_results=50):
        """Block-join search scoring books by their child documents
        (ScoreMode.Avg). Returns (books, total_hits)."""
        q = self.query(query)
        bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)

        tops = self.searcher.search(bjq, max_results)
        return (self._fetch_books(tops), tops.totalHits)
356
357 # TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
358 # OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
359 # CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
360
361 # while (tokenStream.incrementToken()) {
362 #     int startOffset = offsetAttribute.startOffset();
363 #     int endOffset = offsetAttribute.endOffset();
364 #     String term = charTermAttribute.toString();
365 # }
366
367
class MultiSearch(Search):
    """Class capable of IMDb-like searching"""

    def get_tokens(self, queryreader):
        """Tokenize a query (string or Java Reader) with the analyzer.
        Returns a list of token strings."""
        if isinstance(queryreader, (str, unicode)):
            queryreader = StringReader(queryreader)
        queryreader.reset()
        tokens = self.analyzer.reusableTokenStream('content', queryreader)
        toks = []
        while tokens.incrementToken():
            cta = tokens.getAttribute(CharTermAttribute.class_)
            # CharTermAttribute is a mutable buffer reused for every token;
            # copy its text out instead of storing references to one object.
            toks.append(cta.toString())
        return toks

    def make_phrase(self, tokens, field='content', joined=False):
        """Build a PhraseQuery from tokens; optionally join to parent docs."""
        phrase = PhraseQuery()
        for t in tokens:
            term = Term(field, t)
            phrase.add(term)
        if joined:
            phrase = self.content_query(phrase)
        return phrase

    def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD, joined=False):
        """Build a BooleanQuery of per-token TermQueries; optionally join
        the whole query to parent documents."""
        q = BooleanQuery()
        for t in tokens:
            term = Term(field, t)
            # BooleanClause wraps a Query, not a bare Term.
            q.add(BooleanClause(TermQuery(term), modal))
        if joined:
            # Keep the wrapped query (the original discarded this result).
            q = self.content_query(q)
        return q

    def content_query(self, query):
        """Join a child-document query back to parent (book) documents."""
        return BlockJoinQuery(query, self.parent_filter,
                              BlockJoinQuery.ScoreMode.Total)

    def multiseach(self, query, max_results=50):
        """
        Search strategy:
        - (phrase) OR -> content
                      -> title
                      -> author
        - (keywords)  -> author
                      -> motyw
                      -> tags
                      -> content
        """
        queryreader = StringReader(query)
        tokens = self.get_tokens(queryreader)

        top_level = BooleanQuery()
        Should = BooleanClause.Occur.SHOULD

        phrase_level = BooleanQuery()

        p_content = self.make_phrase(tokens, joined=True)
        p_title = self.make_phrase(tokens, 'title')
        p_author = self.make_phrase(tokens, 'author')

        phrase_level.add(BooleanClause(p_content, Should))
        phrase_level.add(BooleanClause(p_title, Should))
        phrase_level.add(BooleanClause(p_author, Should))

        kw_level = BooleanQuery()

        kw_level.add(self.make_term_query(tokens, 'author'), Should)
        kw_level.add(self.make_term_query(tokens, 'themes', joined=True), Should)
        kw_level.add(self.make_term_query(tokens, 'tags'), Should)
        kw_level.add(self.make_term_query(tokens, joined=True), Should)

        top_level.add(BooleanClause(phrase_level, Should))
        top_level.add(BooleanClause(kw_level, Should))

        tops = self.searcher.search(top_level, max_results)
        bks = []
        for found in tops.scoreDocs:
            doc = self.searcher.doc(found.doc)
            bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
        return (bks, tops.totalHits)

    # Correctly-spelled alias; the old name is kept for existing callers.
    multisearch = multiseach