index indictor
[wolnelektury.git] / apps / search / index.py
1 # -*- coding: utf-8 -*-
2 from django.conf import settings
3 from lucene import SimpleFSDirectory, IndexWriter, File, Field, \
4     NumericField, Version, Document, JavaError, IndexSearcher, \
5     QueryParser, Term, PerFieldAnalyzerWrapper, \
6     SimpleAnalyzer, PolishAnalyzer, ArrayList, \
7     KeywordAnalyzer, NumericRangeQuery, BooleanQuery, \
8     BlockJoinQuery, BlockJoinCollector, TermsFilter, \
9     HashSet, BooleanClause, Term, CharTermAttribute, \
10     PhraseQuery, StringReader
11     # KeywordAnalyzer
12 import sys
13 import os
14 import errno
15 from librarian import dcparser
16 from librarian.parser import WLDocument
17 import catalogue.models
18 from multiprocessing.pool import ThreadPool
19 import atexit
20
21
22 class WLAnalyzer(PerFieldAnalyzerWrapper):
23     def __init__(self):
24         polish = PolishAnalyzer(Version.LUCENE_34)
25         simple = SimpleAnalyzer(Version.LUCENE_34)
26         keyword = KeywordAnalyzer(Version.LUCENE_34)
27         # not sure if needed: there's NOT_ANALYZED meaning basically the same
28
29         PerFieldAnalyzerWrapper.__init__(self, polish)
30
31         self.addAnalyzer("tags", simple)
32         self.addAnalyzer("technical_editors", simple)
33         self.addAnalyzer("editors", simple)
34         self.addAnalyzer("url", keyword)
35         self.addAnalyzer("source_url", keyword)
36         self.addAnalyzer("source_name", simple)
37         self.addAnalyzer("publisher", simple)
38         self.addAnalyzer("author", simple)
39         self.addAnalyzer("is_book", keyword)
40
41         #self.addanalyzer("fragment_anchor", keyword)
42
43
44 class IndexStore(object):
45     def __init__(self):
46         self.make_index_dir()
47         self.store = SimpleFSDirectory(File(settings.SEARCH_INDEX))
48
49     def make_index_dir(self):
50         try:
51             os.makedirs(settings.SEARCH_INDEX)
52         except OSError as exc:
53             if exc.errno == errno.EEXIST:
54                 pass
55             else: raise
56
57
58 class Index(IndexStore):
59     def __init__(self, analyzer=None):
60         IndexStore.__init__(self)
61         self.index = None
62         if not analyzer:
63             analyzer = WLAnalyzer()
64         self.analyzer = analyzer
65
66     def open(self, analyzer=None):
67         if self.index:
68             raise Exception("Index is already opened")
69         self.index = IndexWriter(self.store, self.analyzer,\
70                                  IndexWriter.MaxFieldLength.LIMITED)
71         return self.index
72
73     def close(self):
74         self.index.optimize()
75         self.index.close()
76         self.index = None
77
78     def remove_book(self, book):
79         q = NumericRangeQuery.newIntRange("book_id", book.id, book.id, True,True)
80         self.index.deleteDocuments(q)
81
82     def index_book(self, book, overwrite=True):
83         if overwrite:
84             self.remove_book(book)
85
86         doc = self.extract_metadata(book)
87         parts = self.extract_content(book)
88         block = ArrayList().of_(Document)
89
90         for p in parts:
91             block.add(p)
92         block.add(doc)
93         self.index.addDocuments(block)
94
95     master_tags = [
96         'opowiadanie',
97         'powiesc',
98         'dramat_wierszowany_l',
99         'dramat_wierszowany_lp',
100         'dramat_wspolczesny', 'liryka_l', 'liryka_lp',
101         'wywiad'
102         ]
103
104     skip_header_tags = ['autor_utworu', 'nazwa_utworu']
105
106     def create_book_doc(self, book):
107         """
108         Create a lucene document connected to the book
109         """
110         doc = Document()
111         doc.add(NumericField("book_id", Field.Store.YES, True).setIntValue(book.id))
112         if book.parent is not None:
113             doc.add(NumericField("parent_id", Field.Store.YES, True).setIntValue(book.parent.id))
114         return doc
115
116     def extract_metadata(self, book):
117         book_info = dcparser.parse(book.xml_file)
118
119         doc = self.create_book_doc(book)
120         doc.add(Field("slug", book.slug, Field.Store.NO, Field.Index.ANALYZED_NO_NORMS))
121         doc.add(Field("tags", ','.join([t.name for t in book.tags]), Field.Store.NO, Field.Index.ANALYZED))
122         doc.add(Field("is_book", 'true', Field.Store.NO, Field.Index.NOT_ANALYZED))
123
124         # validator, name
125         for field in dcparser.BookInfo.FIELDS:
126             if hasattr(book_info, field.name):
127                 if not getattr(book_info, field.name):
128                     continue
129                 # since no type information is available, we use validator
130                 type_indicator = field.validator
131                 if type_indicator == dcparser.as_unicode:
132                     s = getattr(book_info, field.name)
133                     if field.multiple:
134                         s = ', '.join(s)
135                     try:
136                         doc.add(Field(field.name, s, Field.Store.NO, Field.Index.ANALYZED))
137                     except JavaError as je:
138                         raise Exception("failed to add field: %s = '%s', %s(%s)" % (field.name, s, je.message, je.args))
139                 elif type_indicator == dcparser.as_person:
140                     p = getattr(book_info, field.name)
141                     if isinstance(p, dcparser.Person):
142                         persons = unicode(p)
143                     else:
144                         persons = ', '.join(map(unicode, p))
145                     doc.add(Field(field.name, persons, Field.Store.NO, Field.Index.ANALYZED))
146                 elif type_indicator == dcparser.as_date:
147                     dt = getattr(book_info, field.name)
148                     doc.add(Field(field.name, "%04d%02d%02d" % (dt.year, dt.month, dt.day), Field.Store.NO, Field.Index.NOT_ANALYZED))
149         return doc
150
151     def get_master(self, root):
152         for master in root.iter():
153             if master.tag in self.master_tags:
154                 return master
155
156     
157     def extract_content(self, book):
158         wld = WLDocument.from_file(book.xml_file.path)
159         root = wld.edoc.getroot()
160
161         # first we build a sequence of top-level items.
162         # book_id
163         # header_index - the 0-indexed position of header element.
164         # content
165         master = self.get_master(root)
166         if master is None:
167             return []
168         
169         header_docs = []
170         for header, position in zip(list(master), range(len(master))):
171             if header.tag in self.skip_header_tags:
172                 continue
173             doc = self.create_book_doc(book)
174             doc.add(NumericField("header_index", Field.Store.YES, True).setIntValue(position))
175             doc.add(Field("header_type", header.tag, Field.Store.YES, Field.Index.NOT_ANALYZED))
176             content = u' '.join([t for t in header.itertext()])
177             doc.add(Field("content", content, Field.Store.NO, Field.Index.ANALYZED))
178             header_docs.append(doc)
179
180         def walker(node):
181             yield node, None
182             for child in list(node):
183                 for b, e in walker(child):
184                     yield b, e
185             yield None, node
186             return
187
188         # Then we create a document for each fragments
189         # fragment_anchor - the anchor
190         # themes - list of themes [not indexed]
191         fragment_docs = []
192         # will contain (framgent id -> { content: [], themes: [] }
193         fragments = {}
194         for start, end in walker(master):
195             if start is not None and start.tag == 'begin':
196                 fid = start.attrib['id'][1:]
197                 fragments[fid] = {'content': [], 'themes': []}
198                 fragments[fid]['content'].append(start.tail)
199             elif start is not None and start.tag == 'motyw':
200                 fid = start.attrib['id'][1:]
201                 fragments[fid]['themes'].append(start.text)
202                 fragments[fid]['content'].append(start.tail)
203             elif start is not None and start.tag == 'end':
204                 fid = start.attrib['id'][1:]
205                 if fid not in fragments:
206                     continue  # a broken <end> node, skip it
207                 frag = fragments[fid]
208                 del fragments[fid]
209
210                 def jstr(l):
211                     return u' '.join(map(
212                         lambda x: x == None and u'(none)' or unicode(x),
213                         l))
214
215                 doc = self.create_book_doc(book)
216                 doc.add(Field("fragment_anchor", fid,
217                               Field.Store.YES, Field.Index.NOT_ANALYZED))
218                 doc.add(Field("content",
219                               u' '.join(filter(lambda s: s is not None, frag['content'])),
220                               Field.Store.NO, Field.Index.ANALYZED))
221                 doc.add(Field("themes",
222                               u' '.join(filter(lambda s: s is not None, frag['themes'])),
223                               Field.Store.NO, Field.Index.ANALYZED))
224
225                 fragment_docs.append(doc)
226             elif start is not None:
227                 for frag in fragments.values():
228                     frag['content'].append(start.text)
229             elif end is not None:
230                 for frag in fragments.values():
231                     frag['content'].append(end.tail)
232
233         return header_docs + fragment_docs
234
235     def __enter__(self):
236         self.open()
237         return self
238
239     def __exit__(self, type, value, tb):
240         self.close()
241
242
243 class ReusableIndex(Index):
244     """
245     Works like index, but does not close/optimize Lucene index
246     until program exit (uses atexit hook).
247     This is usefull for importbooks command.
248
249     if you cannot rely on atexit, use ReusableIndex.close_reusable() yourself.
250     """
251     index = None
252     pool = None
253     pool_jobs = None
254
255     def open(self, analyzer=None, threads=4):
256         if ReusableIndex.index is not None:
257             self.index = ReusableIndex.index
258         else:
259             ReusableIndex.pool = ThreadPool(threads)
260             ReusableIndex.pool_jobs = []
261             Index.open(self, analyzer)
262             ReusableIndex.index = self.index
263             atexit.register(ReusableIndex.close_reusable)
264
265     def index_book(self, *args, **kw):
266         job = ReusableIndex.pool.apply_async(Index.index_book, args, kw)
267         ReusableIndex.pool_jobs.append(job)
268
269     @staticmethod
270     def close_reusable():
271         if ReusableIndex.index is not None:
272             all_jobs = len(ReusableIndex.pool_jobs)
273             waited=1
274             for job in ReusableIndex.pool_jobs:
275                 sys.stdout.write("\rWaiting for search index job: %d/%d..." % 
276                 job.wait()
277                 waited+=1
278             print("Indexing done.")
279             ReusableIndex.pool.close()
280
281             ReusableIndex.index.optimize()
282             ReusableIndex.index.close()
283             ReusableIndex.index = None
284
285     def close(self):
286         pass
287
288
289 class Search(IndexStore):
290     def __init__(self, default_field="content"):
291         IndexStore.__init__(self)
292         self.analyzer = PolishAnalyzer(Version.LUCENE_34)
293         ## self.analyzer = WLAnalyzer()
294         self.searcher = IndexSearcher(self.store, True)
295         self.parser = QueryParser(Version.LUCENE_34, default_field,
296                                   self.analyzer)
297
298         self.parent_filter = TermsFilter()
299         self.parent_filter.addTerm(Term("is_book", "true"))
300
301     def query(self, query):
302         return self.parser.parse(query)
303
304     def wrapjoins(self, query, fields=[]):
305         """
306         This functions modifies the query in a recursive way,
307         so Term and Phrase Queries contained, which match
308         provided fields are wrapped in a BlockJoinQuery,
309         and so delegated to children documents.
310         """
311         if BooleanQuery.instance_(query):
312             qs = BooleanQuery.cast_(query)
313             for clause in qs:
314                 clause = BooleanClause.cast_(clause)
315                 clause.setQuery(self.wrapjoins(clause.getQuery(), fields))
316             return qs
317         else:
318             termset = HashSet()
319             query.extractTerms(termset)
320             for t in termset:
321                 t = Term.cast_(t)
322                 if t.field() not in fields:
323                     return query
324             return BlockJoinQuery(query, self.parent_filter,
325                                   BlockJoinQuery.ScoreMode.Total)
326
327     def simple_search(self, query, max_results=50):
328         """Returns (books, total_hits)
329         """
330
331         tops = self.searcher.search(self.query(query), max_results)
332         bks = []
333         for found in tops.scoreDocs:
334             doc = self.searcher.doc(found.doc)
335             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
336         return (bks, tops.totalHits)
337
338     def search(self, query, max_results=50):
339         query = self.query(query)
340         query = self.wrapjoins(query, ["content", "themes"])
341
342         tops = self.searcher.search(query, max_results)
343         bks = []
344         for found in tops.scoreDocs:
345             doc = self.searcher.doc(found.doc)
346             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
347         return (bks, tops.totalHits)
348
349     def bsearch(self, query, max_results=50):
350         q = self.query(query)
351         bjq = BlockJoinQuery(q, self.parent_filter, BlockJoinQuery.ScoreMode.Avg)
352
353         tops = self.searcher.search(bjq, max_results)
354         bks = []
355         for found in tops.scoreDocs:
356             doc = self.searcher.doc(found.doc)
357             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
358         return (bks, tops.totalHits)
359
360 # TokenStream tokenStream = analyzer.tokenStream(fieldName, reader);
361 # OffsetAttribute offsetAttribute = tokenStream.getAttribute(OffsetAttribute.class);
362 # CharTermAttribute charTermAttribute = tokenStream.getAttribute(CharTermAttribute.class);
363
364 # while (tokenStream.incrementToken()) {
365 #     int startOffset = offsetAttribute.startOffset();
366 #     int endOffset = offsetAttribute.endOffset();
367 #     String term = charTermAttribute.toString();
368 # }
369
370
371 class MultiSearch(Search):
372     """Class capable of IMDb-like searching"""
373     def get_tokens(self, queryreader):
374         if isinstance(queryreader, str):
375             queryreader = StringReader(queryreader)
376         queryreader.reset()
377         tokens = self.analyzer.reusableTokenStream('content', queryreader)
378         toks = []
379         while tokens.incrementToken():
380             cta = tokens.getAttribute(CharTermAttribute.class_)
381             toks.append(cta)
382         return toks
383
384     def make_phrase(self, tokens, field='content', joined=False):
385         phrase = PhraseQuery()
386         for t in tokens:
387             term = Term(field, t)
388             phrase.add(term)
389         if joined:
390             phrase = self.content_query(phrase)
391         return phrase
392
393     def make_term_query(self, tokens, field='content', modal=BooleanClause.Occur.SHOULD, joined=False):
394         q = BooleanQuery()
395         for t in tokens:
396             term = Term(field, t)
397             q.add(BooleanClause(term, modal))
398         if joined:
399             self.content_query(q)
400         return q
401
402     def content_query(self, query):
403         return BlockJoinQuery(query, self.parent_filter,
404                               BlockJoinQuery.ScoreMode.Total)
405
406     def multiseach(self, query, max_results=50):
407         """
408         Search strategy:
409         - (phrase) OR -> content
410                       -> title
411                       -> author
412         - (keywords)  -> author
413                       -> motyw
414                       -> tags
415                       -> content
416         """
417         queryreader = StringReader(query)
418         tokens = self.get_tokens(queryreader)
419
420         top_level = BooleanQuery()
421         Should = BooleanClause.Occur.SHOULD
422
423         phrase_level = BooleanQuery()
424
425         p_content = self.make_phrase(tokens, joined=True)
426         p_title = self.make_phrase(tokens, 'title')
427         p_author = self.make_phrase(tokens, 'author')
428
429         phrase_level.add(BooleanClause(p_content, Should))
430         phrase_level.add(BooleanClause(p_title, Should))
431         phrase_level.add(BooleanClause(p_author, Should))
432
433         kw_level = BooleanQuery()
434
435         kw_level.add(self.make_term_query(tokens, 'author'), Should)
436         kw_level.add(self.make_term_query(tokens, 'themes', joined=True), Should)
437         kw_level.add(self.make_term_query(tokens, 'tags'), Should)
438         kw_level.add(self.make_term_query(tokens, joined=True), Should)
439
440         top_level.add(BooleanClause(phrase_level, Should))
441         top_level.add(BooleanClause(kw_level, Should))
442
443         tops = self.searcher.search(top_level, max_results)
444         bks = []
445         for found in tops.scoreDocs:
446             doc = self.searcher.doc(found.doc)
447             bks.append(catalogue.models.Book.objects.get(id=doc.get("book_id")))
448         return (bks, tops.totalHits)