add --shared
[pylucene.git] / samples / LuceneInAction / lia / advsearching / SpanQueryTest.py
1 # ====================================================================
2 #   Licensed under the Apache License, Version 2.0 (the "License");
3 #   you may not use this file except in compliance with the License.
4 #   You may obtain a copy of the License at
5 #
6 #       http://www.apache.org/licenses/LICENSE-2.0
7 #
8 #   Unless required by applicable law or agreed to in writing, software
9 #   distributed under the License is distributed on an "AS IS" BASIS,
10 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 #   See the License for the specific language governing permissions and
12 #   limitations under the License.
13 # ====================================================================
14
15 from unittest import TestCase
16 from cStringIO import StringIO
17
18 from lucene import \
19      WhitespaceAnalyzer, Document, Field, IndexReader, IndexWriter, Term, \
20      IndexSearcher, PhraseQuery, SpanFirstQuery, SpanNearQuery, SpanNotQuery, \
21      SpanOrQuery, SpanTermQuery, RAMDirectory, TermAttribute, StringReader
22
23 from lia.analysis.AnalyzerUtils import AnalyzerUtils
24
25
class SpanQueryTest(TestCase):
    """Exercise the Lucene span query family against a tiny in-memory index.

    The index built in setUp() holds two documents with a single field "f":

        doc 0: "the quick brown fox jumps over the lazy dog"
        doc 1: "the quick red fox jumps over the sleepy cat"

    Besides asserting hit counts, most tests call dumpSpans() to print the
    matched spans so the behaviour of each query can be inspected by eye.
    """

    def setUp(self):
        """Build the two-document index and the SpanTermQuery building blocks."""

        self.directory = RAMDirectory()
        self.analyzer = WhitespaceAnalyzer()

        writer = IndexWriter(self.directory, self.analyzer, True,
                             IndexWriter.MaxFieldLength.UNLIMITED)
        try:
            for text in ("the quick brown fox jumps over the lazy dog",
                         "the quick red fox jumps over the sleepy cat"):
                doc = Document()
                doc.add(Field("f", text,
                              Field.Store.YES, Field.Index.ANALYZED))
                writer.addDocument(doc)
        finally:
            # release the writer even when indexing fails
            writer.close()

        self.searcher = IndexSearcher(self.directory, True)
        self.reader = IndexReader.open(self.directory, True)

        self.quick = SpanTermQuery(Term("f", "quick"))
        self.brown = SpanTermQuery(Term("f", "brown"))
        self.red = SpanTermQuery(Term("f", "red"))
        self.fox = SpanTermQuery(Term("f", "fox"))
        self.lazy = SpanTermQuery(Term("f", "lazy"))
        self.sleepy = SpanTermQuery(Term("f", "sleepy"))
        self.dog = SpanTermQuery(Term("f", "dog"))
        self.cat = SpanTermQuery(Term("f", "cat"))

    def tearDown(self):
        """Close the searcher, reader and directory opened by setUp()."""

        self.searcher.close()
        self.reader.close()
        self.directory.close()

    def assertOnlyBrownFox(self, query):
        """Assert that query hits exactly one document, the "brown fox" one."""

        topDocs = self.searcher.search(query, 50)
        self.assertEqual(1, topDocs.totalHits)
        self.assertEqual(0, topDocs.scoreDocs[0].doc, "wrong doc")

    def assertBothFoxes(self, query):
        """Assert that query hits both indexed documents."""

        topDocs = self.searcher.search(query, 50)
        self.assertEqual(2, topDocs.totalHits)

    def assertNoMatches(self, query):
        """Assert that query hits no document at all."""

        topDocs = self.searcher.search(query, 50)
        self.assertEqual(0, topDocs.totalHits)

    def testSpanTermQuery(self):
        # a bare span term query on "brown" only matches doc 0

        self.assertOnlyBrownFox(self.brown)
        self.dumpSpans(self.brown)

    def testSpanFirstQuery(self):
        # "brown" is not matched with an end limit of 2 ...

        sfq = SpanFirstQuery(self.brown, 2)
        self.assertNoMatches(sfq)

        self.dumpSpans(sfq)

        # ... but is matched once the limit is raised to 3
        sfq = SpanFirstQuery(self.brown, 3)
        self.dumpSpans(sfq)
        self.assertOnlyBrownFox(sfq)

    def testSpanNearQuery(self):
        # in-order "quick brown dog" needs a slop of at least 5 to match

        quick_brown_dog = [self.quick, self.brown, self.dog]
        snq = SpanNearQuery(quick_brown_dog, 0, True)
        self.assertNoMatches(snq)
        self.dumpSpans(snq)

        snq = SpanNearQuery(quick_brown_dog, 4, True)
        self.assertNoMatches(snq)
        self.dumpSpans(snq)

        snq = SpanNearQuery(quick_brown_dog, 5, True)
        self.assertOnlyBrownFox(snq)
        self.dumpSpans(snq)

        # interesting - even a sloppy phrase query would require
        # more slop to match
        snq = SpanNearQuery([self.lazy, self.fox], 3, False)
        self.assertOnlyBrownFox(snq)
        self.dumpSpans(snq)

        pq = PhraseQuery()
        pq.add(Term("f", "lazy"))
        pq.add(Term("f", "fox"))
        pq.setSlop(4)
        self.assertNoMatches(pq)

        pq.setSlop(5)
        self.assertOnlyBrownFox(pq)

    def testSpanNotQuery(self):
        # "quick ... fox" with one intervening term matches both documents

        quick_fox = SpanNearQuery([self.quick, self.fox], 1, True)
        self.assertBothFoxes(quick_fox)
        self.dumpSpans(quick_fox)

        # excluding "dog" changes nothing: both documents still match
        quick_fox_dog = SpanNotQuery(quick_fox, self.dog)
        self.assertBothFoxes(quick_fox_dog)
        self.dumpSpans(quick_fox_dog)

        # excluding "red" leaves only the brown fox document
        no_quick_red_fox = SpanNotQuery(quick_fox, self.red)
        self.assertOnlyBrownFox(no_quick_red_fox)
        self.dumpSpans(no_quick_red_fox)

    def testSpanOrQuery(self):
        # span queries nest: near-queries of near-queries, OR-ed together

        quick_fox = SpanNearQuery([self.quick, self.fox], 1, True)
        lazy_dog = SpanNearQuery([self.lazy, self.dog], 0, True)
        sleepy_cat = SpanNearQuery([self.sleepy, self.cat], 0, True)
        qf_near_ld = SpanNearQuery([quick_fox, lazy_dog], 3, True)

        self.assertOnlyBrownFox(qf_near_ld)
        self.dumpSpans(qf_near_ld)

        qf_near_sc = SpanNearQuery([quick_fox, sleepy_cat], 3, True)
        self.dumpSpans(qf_near_sc)

        orQ = SpanOrQuery([qf_near_ld, qf_near_sc])
        self.assertBothFoxes(orQ)
        self.dumpSpans(orQ)

    def testPlay(self):
        # no assertions here: only prints the spans of a few sample queries

        orQ = SpanOrQuery([self.quick, self.fox])
        self.dumpSpans(orQ)

        quick_fox = SpanNearQuery([self.quick, self.fox], 1, True)
        sfq = SpanFirstQuery(quick_fox, 4)
        self.dumpSpans(sfq)

        self.dumpSpans(SpanTermQuery(Term("f", "the")))

        quick_brown = SpanNearQuery([self.quick, self.brown], 0, False)
        self.dumpSpans(quick_brown)

    def dumpSpans(self, query):
        """Print every span matched by query, highlighted in its document text.

        The output starts with the query followed by a colon.  Each span then
        gets one line showing the stored text of field "f" with the span
        wrapped in "<" and ">", followed by the document's score in
        parentheses.  "   No spans" is printed when nothing matches, and a
        blank line always ends the dump.
        """

        spans = query.getSpans(self.reader)
        print("%s:" % query)

        # score of each hit keyed by document number; a dict rather than a
        # fixed-size list so any number of indexed documents is supported
        scores = {}
        for scoreDoc in self.searcher.search(query, 50).scoreDocs:
            scores[scoreDoc.doc] = scoreDoc.score

        numSpans = 0
        while spans.next():
            numSpans += 1

            docId = spans.doc()
            doc = self.reader.document(docId)

            # the span boundaries do not change while the tokens are walked
            start = spans.start()
            end = spans.end()

            # for simplicity - assume tokens are in sequential
            # positions, starting from 0
            stream = self.analyzer.tokenStream("contents",
                                               StringReader(doc.get("f")))
            term = stream.addAttribute(TermAttribute.class_)

            parts = ["   "]
            position = 0
            while stream.incrementToken():
                if position == start:
                    parts.append("<")

                parts.append(term.term())
                # spans.end() is one past the last position of the span
                if position + 1 == end:
                    parts.append(">")

                parts.append(" ")
                position += 1

            parts.append("(")
            parts.append(str(scores.get(docId, 0)))
            parts.append(") ")

            print("".join(parts))

        if numSpans == 0:
            print("   No spans")

        print('')