2 # $Id: sphinxapi.py 1216 2008-03-14 23:25:39Z shodan $
4 # Python version of Sphinx searchd client (Python API)
6 # Copyright (c) 2006-2008, Andrew Aksyonoff
7 # Copyright (c) 2006, Mike Osadnik
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License. You should have
12 # received a copy of the GPL license along with this program; if you
13 # did not, you can find it at http://www.gnu.org/
# known searchd commands
SEARCHD_COMMAND_SEARCH		= 0
SEARCHD_COMMAND_EXCERPT		= 1
SEARCHD_COMMAND_UPDATE		= 2
SEARCHD_COMMAND_KEYWORDS	= 3

# current client-side command implementation versions
VER_COMMAND_SEARCH		= 0x113
VER_COMMAND_EXCERPT		= 0x100
VER_COMMAND_UPDATE		= 0x101
VER_COMMAND_KEYWORDS	= 0x100

# known searchd status codes
SEARCHD_OK				= 0
SEARCHD_ERROR			= 1
SEARCHD_RETRY			= 2
SEARCHD_WARNING			= 3

# known match modes
SPH_MATCH_ALL			= 0
SPH_MATCH_ANY			= 1
SPH_MATCH_PHRASE		= 2
SPH_MATCH_BOOLEAN		= 3
SPH_MATCH_EXTENDED		= 4
SPH_MATCH_FULLSCAN		= 5
SPH_MATCH_EXTENDED2		= 6

# known ranking modes (extended2 mode only)
SPH_RANK_PROXIMITY_BM25	= 0 # default mode, phrase proximity major factor and BM25 minor one
SPH_RANK_BM25			= 1 # statistical mode, BM25 ranking only (faster but worse quality)
SPH_RANK_NONE			= 2 # no ranking, all matches get a weight of 1
SPH_RANK_WORDCOUNT		= 3 # simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts

# known sort modes
SPH_SORT_RELEVANCE		= 0
SPH_SORT_ATTR_DESC		= 1
SPH_SORT_ATTR_ASC		= 2
SPH_SORT_TIME_SEGMENTS	= 3
SPH_SORT_EXTENDED		= 4
SPH_SORT_EXPR			= 5

# known filter types
SPH_FILTER_VALUES		= 0
SPH_FILTER_RANGE		= 1
SPH_FILTER_FLOATRANGE	= 2

# known attribute types
SPH_ATTR_INTEGER		= 1
SPH_ATTR_TIMESTAMP		= 2
SPH_ATTR_ORDINAL		= 3
SPH_ATTR_BOOL			= 4
SPH_ATTR_FLOAT			= 5
# high bit flags a multi-value attribute (MVA); 0x40000000 (no py2 'L' suffix needed)
SPH_ATTR_MULTI			= 0x40000000

# known grouping functions
SPH_GROUPBY_DAY			= 0
SPH_GROUPBY_WEEK		= 1
SPH_GROUPBY_MONTH		= 2
SPH_GROUPBY_YEAR		= 3
SPH_GROUPBY_ATTR		= 4
SPH_GROUPBY_ATTRPAIR	= 5
def __init__ (self):
    """
    Create a new client object, and fill defaults.
    """
    self._host			= 'localhost'			# searchd host (default is "localhost")
    self._port			= 3312					# searchd port (default is 3312)
    self._offset		= 0						# how much records to seek from result-set start (default is 0)
    self._limit			= 20					# how much records to return from result-set starting at offset (default is 20)
    self._mode			= SPH_MATCH_ALL			# query matching mode (default is SPH_MATCH_ALL)
    self._weights		= []					# per-field weights (default is 1 for all fields)
    self._sort			= SPH_SORT_RELEVANCE	# match sorting mode (default is SPH_SORT_RELEVANCE)
    self._sortby		= ''					# attribute to sort by (default is "")
    self._min_id		= 0						# min ID to match (default is 0)
    self._max_id		= 0xFFFFFFFF			# max ID to match (default is UINT_MAX)
    self._filters		= []					# search filters
    self._groupby		= ''					# group-by attribute name
    self._groupfunc		= SPH_GROUPBY_DAY		# group-by function (to pre-process group-by attribute value with)
    self._groupsort		= '@group desc'			# group-by sorting clause (to sort groups in result set with)
    self._groupdistinct	= ''					# group-by count-distinct attribute
    self._maxmatches	= 1000					# max matches to retrieve
    self._cutoff		= 0						# cutoff to stop searching at
    self._retrycount	= 0						# distributed retry count
    self._retrydelay	= 0						# distributed retry delay
    self._anchor		= {}					# geographical anchor point
    self._indexweights	= {}					# per-index weights
    self._ranker		= SPH_RANK_PROXIMITY_BM25	# ranking mode
    self._maxquerytime	= 0						# max query time, milliseconds (default is 0, do not limit)
    self._fieldweights	= {}					# per-field-name weights
    self._error			= ''					# last error message
    self._warning		= ''					# last warning message
    self._reqs			= []					# requests array for multi-query
def GetLastError (self):
    """
    Get last error message (string).
    """
    return self._error
def GetLastWarning (self):
    """
    Get last warning message (string).
    """
    return self._warning
def SetServer (self, host, port):
    """
    Set searchd server host and port.
    """
    assert(isinstance(host, str))
    assert(isinstance(port, int))
    self._host = host
    self._port = port
def _Connect (self):
    """
    INTERNAL METHOD, DO NOT CALL. Connects to searchd server.

    Returns a connected socket on success, or 0 on failure (with
    self._error set).
    """
    sock = None
    try:
        sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
        sock.connect ( ( self._host, self._port ) )
    except socket.error as msg:
        # guard: sock stays None if socket() itself failed
        if sock:
            sock.close()
        self._error = 'connection to %s:%s failed (%s)' % ( self._host, self._port, msg )
        return 0

    # check the server's protocol version handshake.
    # NOTE: original code compared the raw unpack() tuple against 1, which
    # could never trigger; take element [0] so the check actually works.
    v = unpack('>L', sock.recv(4))[0]
    if v<1:
        sock.close()
        self._error = 'expected searchd protocol version, got %s' % v
        return 0

    # all ok, send my version
    sock.send(pack('>L', 1))
    return sock
def _GetResponse (self, sock, client_ver):
    """
    INTERNAL METHOD, DO NOT CALL. Gets and checks response packet from searchd server.

    Reads the 8-byte header (status, version, body length), then the body;
    closes the socket. Returns the body (sans warning prefix, if any) on
    success, or None on error (with self._error set).
    """
    (status, ver, length) = unpack('>2HL', sock.recv(8))
    response = ''
    left = length
    # recv() may return short reads; loop until the full body arrives or EOF
    while left>0:
        chunk = sock.recv(left)
        if chunk:
            response += chunk
            left -= len(chunk)
        else:
            break
    sock.close()

    # check response
    read = len(response)
    if not response or read!=length:
        if length:
            self._error = 'failed to read searchd response (status=%s, ver=%s, len=%s, read=%s)' \
                % (status, ver, length, read)
        else:
            self._error = 'received zero-sized searchd response'
        return None

    # check status; warning payload is prefixed by a 4-byte message length
    if status==SEARCHD_WARNING:
        wend = 4 + unpack ( '>L', response[0:4] )[0]
        self._warning = response[4:wend]
        return response[wend:]

    if status==SEARCHD_ERROR:
        self._error = 'searchd error: '+response[4:]
        return None

    if status==SEARCHD_RETRY:
        self._error = 'temporary searchd error: '+response[4:]
        return None

    if status!=SEARCHD_OK:
        self._error = 'unknown status code %d' % status
        return None

    # server may speak an older command version; warn but proceed
    if ver<client_ver:
        self._warning = 'searchd command v.%d.%d older than client\'s v.%d.%d, some options might not work' \
            % (ver>>8, ver&0xff, client_ver>>8, client_ver&0xff)

    return response
def SetLimits (self, offset, limit, maxmatches=0, cutoff=0):
    """
    Set offset and count into result set, and optionally set max-matches and cutoff limits.
    """
    assert(isinstance(offset, int) and offset>=0)
    assert(isinstance(limit, int) and limit>0)
    assert(maxmatches>=0)
    self._offset = offset
    self._limit = limit
    # 0 means "keep current value" for maxmatches
    if maxmatches>0:
        self._maxmatches = maxmatches
    if cutoff>=0:
        self._cutoff = cutoff
def SetMaxQueryTime (self, maxquerytime):
    """
    Set maximum query time, in milliseconds, per-index. 0 means 'do not limit'.
    """
    # the docstring promises that 0 disables the limit, but the original
    # assert demanded maxquerytime>0; relax to >=0 so 0 is accepted
    assert(isinstance(maxquerytime,int) and maxquerytime>=0)
    self._maxquerytime = maxquerytime
def SetMatchMode (self, mode):
    """
    Set matching mode.
    """
    assert(mode in [SPH_MATCH_ALL, SPH_MATCH_ANY, SPH_MATCH_PHRASE, SPH_MATCH_BOOLEAN, SPH_MATCH_EXTENDED, SPH_MATCH_FULLSCAN, SPH_MATCH_EXTENDED2])
    self._mode = mode
def SetRankingMode (self, ranker):
    """
    Set ranking mode (only used in SPH_MATCH_EXTENDED2 matching mode).
    """
    assert(ranker in [SPH_RANK_PROXIMITY_BM25, SPH_RANK_BM25, SPH_RANK_NONE, SPH_RANK_WORDCOUNT])
    self._ranker = ranker
def SetSortMode ( self, mode, clause='' ):
    """
    Set sorting mode; 'clause' is the sort-by attribute/expression string.
    """
    assert ( mode in [SPH_SORT_RELEVANCE, SPH_SORT_ATTR_DESC, SPH_SORT_ATTR_ASC, SPH_SORT_TIME_SEGMENTS, SPH_SORT_EXTENDED, SPH_SORT_EXPR] )
    assert ( isinstance ( clause, str ) )
    self._sort = mode
    self._sortby = clause
def SetWeights (self, weights):
    """
    Set per-field weights.
    WARNING, DEPRECATED; do not use it! use SetFieldWeights() instead
    """
    assert(isinstance(weights, list))
    for w in weights:
        assert(isinstance(w, int))
    self._weights = weights
def SetFieldWeights (self, weights):
    """
    Bind per-field weights by name; expects (name,field_weight) dictionary as argument.
    """
    assert(isinstance(weights,dict))
    for key,val in weights.items():
        assert(isinstance(key,str))
        assert(isinstance(val,int))
    self._fieldweights = weights
def SetIndexWeights (self, weights):
    """
    Bind per-index weights by name; expects (name,index_weight) dictionary as argument.
    """
    assert(isinstance(weights,dict))
    for key,val in weights.items():
        assert(isinstance(key,str))
        assert(isinstance(val,int))
    self._indexweights = weights
def SetIDRange (self, minid, maxid):
    """
    Set IDs range to match.
    Only match records if document ID is between minid and maxid (inclusive).
    """
    assert(isinstance(minid, int))
    assert(isinstance(maxid, int))
    self._min_id = minid
    self._max_id = maxid
def SetFilter ( self, attribute, values, exclude=0 ):
    """
    Set values set filter.
    Only match records where 'attribute' value is in given 'values' set.
    """
    assert(isinstance(attribute, str))
    assert(isinstance(values, list))
    for value in values:
        assert(isinstance(value, int))
    self._filters.append ( { 'type':SPH_FILTER_VALUES, 'attr':attribute, 'exclude':exclude, 'values':values } )
def SetFilterRange (self, attribute, min_, max_, exclude=0 ):
    """
    Set integer range filter.
    Only match records if 'attribute' value is between 'min_' and 'max_' (inclusive).
    """
    assert(isinstance(attribute, str))
    assert(isinstance(min_, int))
    assert(isinstance(max_, int))
    assert(min_<=max_)
    self._filters.append ( { 'type':SPH_FILTER_RANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_ } )
def SetFilterFloatRange (self, attribute, min_, max_, exclude=0 ):
    """
    Set float range filter.
    Only match records if 'attribute' value is between 'min_' and 'max_' (inclusive).
    """
    assert(isinstance(attribute,str))
    assert(isinstance(min_,float))
    assert(isinstance(max_,float))
    assert(min_ <= max_)
    self._filters.append ( {'type':SPH_FILTER_FLOATRANGE, 'attr':attribute, 'exclude':exclude, 'min':min_, 'max':max_} )
def SetGeoAnchor (self, attrlat, attrlong, latitude, longitude):
    """
    Set geographical anchor point; 'attrlat'/'attrlong' name the latitude and
    longitude attributes, 'latitude'/'longitude' give the anchor (in radians).
    """
    assert(isinstance(attrlat,str))
    assert(isinstance(attrlong,str))
    assert(isinstance(latitude,float))
    assert(isinstance(longitude,float))
    self._anchor['attrlat'] = attrlat
    self._anchor['attrlong'] = attrlong
    self._anchor['lat'] = latitude
    self._anchor['long'] = longitude
def SetGroupBy ( self, attribute, func, groupsort='@group desc' ):
    """
    Set grouping attribute and function.
    """
    assert(isinstance(attribute, str))
    assert(func in [SPH_GROUPBY_DAY, SPH_GROUPBY_WEEK, SPH_GROUPBY_MONTH, SPH_GROUPBY_YEAR, SPH_GROUPBY_ATTR] )
    assert(isinstance(groupsort, str))
    self._groupby = attribute
    self._groupfunc = func
    self._groupsort = groupsort
def SetGroupDistinct (self, attribute):
    """
    Set count-distinct attribute for group-by queries.
    """
    assert(isinstance(attribute,str))
    self._groupdistinct = attribute
def SetRetries (self, count, delay=0):
    """
    Set distributed retry count and delay.
    """
    assert(isinstance(count,int) and count>=0)
    assert(isinstance(delay,int) and delay>=0)
    self._retrycount = count
    self._retrydelay = delay
def ResetFilters (self):
    """
    Clear all filters (for multi-queries).
    """
    self._filters = []
    self._anchor = {}
def ResetGroupBy (self):
    """
    Clear groupby settings (for multi-queries).
    """
    self._groupby = ''
    self._groupfunc = SPH_GROUPBY_DAY
    self._groupsort = '@group desc'
    self._groupdistinct = ''
def Query (self, query, index='*', comment=''):
    """
    Connect to searchd server and run given search query.
    Returns None on failure; result set hash on success (see documentation for details).
    """
    # single-query shortcut: must not be mixed with a pending multi-query batch
    assert(len(self._reqs)==0)
    self.AddQuery(query,index,comment)
    results = self.RunQueries()

    if not results or len(results)==0:
        # RunQueries() failed; self._error is already set
        return None
    self._error = results[0]['error']
    self._warning = results[0]['warning']
    if results[0]['status'] == SEARCHD_ERROR:
        return None
    return results[0]
def AddQuery (self, query, index='*', comment=''):
    """
    Add query to batch (for running multiple queries with RunQueries()).
    Returns the index into the returned results array.
    """
    # mode/limits/sorting header
    req = [pack('>5L', self._offset, self._limit, self._mode, self._ranker, self._sort)]
    req.append(pack('>L', len(self._sortby)))
    req.append(self._sortby)

    if isinstance(query,unicode):
        query = query.encode('utf-8')
    assert(isinstance(query,str))

    req.append(pack('>L', len(query)))
    req.append(query)

    req.append(pack('>L', len(self._weights)))
    for w in self._weights:
        req.append(pack('>L', w))
    req.append(pack('>L', len(index)))
    req.append(index)
    req.append(pack('>L',0)) # id64 range marker FIXME! IMPLEMENT!
    req.append(pack('>L', self._min_id))
    req.append(pack('>L', self._max_id))

    # filters
    req.append ( pack ( '>L', len(self._filters) ) )
    for f in self._filters:
        req.append ( pack ( '>L', len(f['attr'])) + f['attr'])
        filtertype = f['type']
        req.append ( pack ( '>L', filtertype))
        if filtertype == SPH_FILTER_VALUES:
            req.append ( pack ('>L', len(f['values'])))
            for val in f['values']:
                req.append ( pack ('>L', val))
        elif filtertype == SPH_FILTER_RANGE:
            req.append ( pack ('>2L', f['min'], f['max']))
        elif filtertype == SPH_FILTER_FLOATRANGE:
            req.append ( pack ('>2f', f['min'], f['max']))
        req.append ( pack ( '>L', f['exclude'] ) )

    # group-by, max-matches, group-sort
    req.append ( pack ( '>2L', self._groupfunc, len(self._groupby) ) )
    req.append ( self._groupby )
    req.append ( pack ( '>2L', self._maxmatches, len(self._groupsort) ) )
    req.append ( self._groupsort )
    req.append ( pack ( '>LLL', self._cutoff, self._retrycount, self._retrydelay))
    req.append ( pack ( '>L', len(self._groupdistinct)))
    req.append ( self._groupdistinct)

    # anchor point (0 entries when unset, 1 entry otherwise)
    if len(self._anchor) == 0:
        req.append ( pack ('>L', 0))
    else:
        attrlat, attrlong = self._anchor['attrlat'], self._anchor['attrlong']
        latitude, longitude = self._anchor['lat'], self._anchor['long']
        req.append ( pack ('>L', 1))
        req.append ( pack ('>L', len(attrlat)) + attrlat)
        req.append ( pack ('>L', len(attrlong)) + attrlong)
        req.append ( pack ('>f', latitude) + pack ('>f', longitude))

    # per-index weights
    req.append ( pack ('>L',len(self._indexweights)))
    for indx,weight in self._indexweights.items():
        req.append ( pack ('>L',len(indx)) + indx + pack ('>L',weight))

    # max query time
    req.append ( pack ('>L', self._maxquerytime) )

    # per-field weights
    req.append ( pack ('>L',len(self._fieldweights) ) )
    for field,weight in self._fieldweights.items():
        req.append ( pack ('>L',len(field)) + field + pack ('>L',weight) )

    # comment
    req.append ( pack('>L',len(comment)) + comment )

    # store request to be sent by RunQueries()
    req = ''.join(req)
    self._reqs.append(req)
    return len(self._reqs)-1
def RunQueries (self):
    """
    Run queries batch.
    Returns None on network IO failure; or an array of result set hashes on success.
    """
    if len(self._reqs)==0:
        self._error = 'no queries defined, issue AddQuery() first'
        return None

    sock = self._Connect()
    if not sock:
        return None

    # build and send the SEARCH packet (header + request count + requests)
    req = ''.join(self._reqs)
    length = len(req)+4
    req = pack('>HHLL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH, length, len(self._reqs))+req
    sock.send(req)

    response = self._GetResponse(sock, VER_COMMAND_SEARCH)
    if not response:
        return None

    nreqs = len(self._reqs)
    self._reqs = []

    # parse response; p walks the byte offset, max_ bounds all loops
    max_ = len(response)
    p = 0

    results = []
    for i in range(0,nreqs,1):
        result = {}
        result['error'] = ''
        result['warning'] = ''
        status = unpack('>L', response[p:p+4])[0]
        p += 4
        result['status'] = status
        if status != SEARCHD_OK:
            length = unpack('>L', response[p:p+4])[0]
            p += 4
            message = response[p:p+length]
            p += length

            if status == SEARCHD_WARNING:
                result['warning'] = message
            else:
                result['error'] = message
                results.append(result)
                continue

        # read schema: field names, then (attr name, attr type) pairs
        fields = []
        attrs = []

        nfields = unpack('>L', response[p:p+4])[0]
        p += 4
        while nfields>0 and p<max_:
            nfields -= 1
            length = unpack('>L', response[p:p+4])[0]
            p += 4
            fields.append(response[p:p+length])
            p += length

        result['fields'] = fields

        nattrs = unpack('>L', response[p:p+4])[0]
        p += 4
        while nattrs>0 and p<max_:
            nattrs -= 1
            length = unpack('>L', response[p:p+4])[0]
            p += 4
            attr = response[p:p+length]
            p += length
            type_ = unpack('>L', response[p:p+4])[0]
            p += 4
            attrs.append([attr,type_])

        result['attrs'] = attrs

        # read match count and id64 flag
        count = unpack('>L', response[p:p+4])[0]
        p += 4
        id64 = unpack('>L', response[p:p+4])[0]
        p += 4

        # read matches
        result['matches'] = []
        while count>0 and p<max_:
            count -= 1
            if id64:
                # 64-bit doc id: high word, low word, weight
                dochi, doc, weight = unpack('>3L', response[p:p+12])
                doc += (dochi<<32)
                p += 12
            else:
                doc, weight = unpack('>2L', response[p:p+8])
                p += 8

            match = { 'id':doc, 'weight':weight, 'attrs':{} }
            for j in range(len(attrs)):
                if attrs[j][1] == SPH_ATTR_FLOAT:
                    match['attrs'][attrs[j][0]] = unpack('>f', response[p:p+4])[0]
                elif attrs[j][1] == (SPH_ATTR_MULTI | SPH_ATTR_INTEGER):
                    match['attrs'][attrs[j][0]] = []
                    nvals = unpack('>L', response[p:p+4])[0]
                    p += 4
                    for n in range(0,nvals,1):
                        match['attrs'][attrs[j][0]].append(unpack('>L', response[p:p+4])[0])
                        p += 4
                    # compensate for the unconditional p+=4 below
                    p -= 4
                else:
                    match['attrs'][attrs[j][0]] = unpack('>L', response[p:p+4])[0]
                p += 4

            result['matches'].append ( match )

        # totals, elapsed time, per-word statistics
        result['total'], result['total_found'], result['time'], words = unpack('>4L', response[p:p+16])
        result['time'] = '%.3f' % (result['time']/1000.0)
        p += 16

        result['words'] = []
        while words>0:
            words -= 1
            length = unpack('>L', response[p:p+4])[0]
            p += 4
            word = response[p:p+length]
            p += length
            docs, hits = unpack('>2L', response[p:p+8])
            p += 8

            result['words'].append({'word':word, 'docs':docs, 'hits':hits})

        results.append(result)

    return results
def BuildExcerpts (self, docs, index, words, opts=None):
    """
    Connect to searchd server and generate excerpts from given documents.
    Returns None on network failure, [] on protocol failure, or a list of
    excerpt strings (one per input document) on success.
    """
    # opts defaults to None (not {}) to avoid the shared-mutable-default trap
    if not opts:
        opts = {}
    if isinstance(words,unicode):
        words = words.encode('utf-8')

    assert(isinstance(docs, list))
    assert(isinstance(index, str))
    assert(isinstance(words, str))
    assert(isinstance(opts, dict))

    sock = self._Connect()
    if not sock:
        return None

    # fixup options
    opts.setdefault('before_match', '<b>')
    opts.setdefault('after_match', '</b>')
    opts.setdefault('chunk_separator', ' ... ')
    opts.setdefault('limit', 256)
    opts.setdefault('around', 5)

    # build request
    # v.1.0 req

    # mode=0, flags=1 (remove spaces)
    req = [pack('>2L', 0, 1)]

    # req index
    req.append(pack('>L', len(index)))
    req.append(index)

    # req words
    req.append(pack('>L', len(words)))
    req.append(words)

    # options
    req.append(pack('>L', len(opts['before_match'])))
    req.append(opts['before_match'])

    req.append(pack('>L', len(opts['after_match'])))
    req.append(opts['after_match'])

    req.append(pack('>L', len(opts['chunk_separator'])))
    req.append(opts['chunk_separator'])

    req.append(pack('>L', int(opts['limit'])))
    req.append(pack('>L', int(opts['around'])))

    # documents
    req.append(pack('>L', len(docs)))
    for doc in docs:
        if isinstance(doc,unicode):
            doc = doc.encode('utf-8')
        assert(isinstance(doc, str))
        req.append(pack('>L', len(doc)))
        req.append(doc)

    req = ''.join(req)

    # send query, get response
    length = len(req)
    req = pack('>2HL', SEARCHD_COMMAND_EXCERPT, VER_COMMAND_EXCERPT, length)+req
    wrote = sock.send(req)

    response = self._GetResponse(sock, VER_COMMAND_EXCERPT )
    if not response:
        return []

    # parse response: one length-prefixed excerpt per input document
    pos = 0
    res = []
    rlen = len(response)

    for i in range(len(docs)):
        length = unpack('>L', response[pos:pos+4])[0]
        pos += 4

        if pos+length > rlen:
            self._error = 'incomplete reply'
            return []

        res.append(response[pos:pos+length])
        pos += length

    return res
def UpdateAttributes ( self, index, attrs, values ):
    """
    Update given attribute values on given documents in given indexes.
    Returns amount of updated documents (0 or more) on success, or -1 on failure.

    'attrs' must be a list of strings.
    'values' must be a dict with int key (document ID) and list of int values (new attribute values).

    Example:
        res = cl.UpdateAttributes ( 'test1', [ 'group_id', 'date_added' ], { 2:[123,1000000000], 4:[456,1234567890] } )
    """
    assert ( isinstance ( index, str ) )
    assert ( isinstance ( attrs, list ) )
    assert ( isinstance ( values, dict ) )
    for attr in attrs:
        assert ( isinstance ( attr, str ) )
    for docid, entry in values.items():
        assert ( isinstance ( docid, int ) )
        assert ( isinstance ( entry, list ) )
        # each document must supply exactly one value per updated attribute
        assert ( len(attrs)==len(entry) )
        for val in entry:
            assert ( isinstance ( val, int ) )

    # build request
    req = [ pack('>L',len(index)), index ]

    req.append ( pack('>L',len(attrs)) )
    for attr in attrs:
        req.append ( pack('>L',len(attr)) + attr )

    req.append ( pack('>L',len(values)) )
    for docid, entry in values.items():
        req.append ( pack('>q',docid) ) # document ids are 64-bit
        for val in entry:
            req.append ( pack('>L',val) )

    # connect, send query, get response
    sock = self._Connect()
    if not sock:
        return -1

    req = ''.join(req)
    length = len(req)
    req = pack ( '>2HL', SEARCHD_COMMAND_UPDATE, VER_COMMAND_UPDATE, length ) + req
    wrote = sock.send ( req )

    response = self._GetResponse ( sock, VER_COMMAND_UPDATE )
    if not response:
        return -1

    # parse response: 4-byte count of updated documents
    updated = unpack ( '>L', response[0:4] )[0]
    return updated
def BuildKeywords ( self, query, index, hits ):
    """
    Connect to searchd server, and generate keywords list for a given query.
    Returns None on failure, or a list of keywords on success.
    """
    assert ( isinstance ( query, str ) )
    assert ( isinstance ( index, str ) )
    assert ( isinstance ( hits, int ) )

    # build request
    req = [ pack ( '>L', len(query) ) + query ]
    req.append ( pack ( '>L', len(index) ) + index )
    req.append ( pack ( '>L', hits ) ) # non-zero: also return per-keyword stats

    # connect, send query, get response
    sock = self._Connect()
    if not sock:
        return None

    req = ''.join(req)
    length = len(req)
    req = pack ( '>2HL', SEARCHD_COMMAND_KEYWORDS, VER_COMMAND_KEYWORDS, length ) + req
    wrote = sock.send ( req )

    response = self._GetResponse ( sock, VER_COMMAND_KEYWORDS )
    if not response:
        return None

    # parse response
    res = []

    nwords = unpack ( '>L', response[0:4] )[0]
    p = 4
    max_ = len(response)

    while nwords>0 and p<max_:
        nwords -= 1

        length = unpack ( '>L', response[p:p+4] )[0]
        p += 4
        tokenized = response[p:p+length]
        p += length

        length = unpack ( '>L', response[p:p+4] )[0]
        p += 4
        normalized = response[p:p+length]
        p += length

        entry = { 'tokenized':tokenized, 'normalized':normalized }
        if hits:
            entry['docs'], entry['hits'] = unpack ( '>2L', response[p:p+8] )
            p += 8

        res.append ( entry )

    # loop must consume exactly nwords entries within the buffer
    if nwords>0 or p>max_:
        self._error = 'incomplete reply'
        return None

    return res
854 # $Id: sphinxapi.py 1216 2008-03-14 23:25:39Z shodan $