390679a626f56cc528afc1c0cb5377b3e37461bb
[wolnelektury.git] / lib / mutagen / _util.py
1 # Copyright 2006 Joe Wreschnig <piman@sacredchao.net>
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
6 #
7 # $Id: _util.py 4275 2008-06-01 06:32:37Z piman $
8
9 """Utility classes for Mutagen.
10
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
13 """
14
15 import struct
16
17 class DictMixin(object):
18     """Implement the dict API using keys() and __*item__ methods.
19
20     Similar to UserDict.DictMixin, this takes a class that defines
21     __getitem__, __setitem__, __delitem__, and keys(), and turns it
22     into a full dict-like object.
23
24     UserDict.DictMixin is not suitable for this purpose because it's
25     an old-style class.
26
27     This class is not optimized for very large dictionaries; many
28     functions have linear memory requirements. I recommend you
29     override some of these functions if speed is required.
30     """
31
32     def __iter__(self):
33         return iter(self.keys())
34
35     def has_key(self, key):
36         try: self[key]
37         except KeyError: return False
38         else: return True
39     __contains__ = has_key
40
41     iterkeys = lambda self: iter(self.keys())
42
43     def values(self):
44         return map(self.__getitem__, self.keys())
45     itervalues = lambda self: iter(self.values())
46
47     def items(self):
48         return zip(self.keys(), self.values())
49     iteritems = lambda s: iter(s.items())
50
51     def clear(self):
52         map(self.__delitem__, self.keys())
53
54     def pop(self, key, *args):
55         if len(args) > 1:
56             raise TypeError("pop takes at most two arguments")
57         try: value = self[key]
58         except KeyError:
59             if args: return args[0]
60             else: raise
61         del(self[key])
62         return value
63
64     def popitem(self):
65         try:
66             key = self.keys()[0]
67             return key, self.pop(key)
68         except IndexError: raise KeyError("dictionary is empty")
69
70     def update(self, other=None, **kwargs):
71         if other is None:
72             self.update(kwargs)
73             other = {}
74
75         try: map(self.__setitem__, other.keys(), other.values())
76         except AttributeError:
77             for key, value in other:
78                 self[key] = value
79
80     def setdefault(self, key, default=None):
81         try: return self[key]
82         except KeyError:
83             self[key] = default
84             return default
85
86     def get(self, key, default=None):
87         try: return self[key]
88         except KeyError: return default
89
90     def __repr__(self):
91         return repr(dict(self.items()))
92
93     def __cmp__(self, other):
94         if other is None: return 1
95         else: return cmp(dict(self.items()), other)
96
97     def __len__(self):
98         return len(self.keys())
99
100 class DictProxy(DictMixin):
101     def __init__(self, *args, **kwargs):
102         self.__dict = {}
103         super(DictProxy, self).__init__(*args, **kwargs)
104
105     def __getitem__(self, key):
106         return self.__dict[key]
107
108     def __setitem__(self, key, value):
109         self.__dict[key] = value
110
111     def __delitem__(self, key):
112         del(self.__dict[key])
113
114     def keys(self):
115         return self.__dict.keys()
116
117 class cdata(object):
118     """C character buffer to Python numeric type conversions."""
119
120     from struct import error
121
122     short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
123     ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])
124
125     short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
126     ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])
127
128     int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
129     uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])
130
131     int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
132     uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])
133
134     longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
135     ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])
136
137     longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
138     ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])
139
140     to_short_le = staticmethod(lambda data: struct.pack('<h', data))
141     to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))
142
143     to_short_be = staticmethod(lambda data: struct.pack('>h', data))
144     to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))
145
146     to_int_le = staticmethod(lambda data: struct.pack('<i', data))
147     to_uint_le = staticmethod(lambda data: struct.pack('<I', data))
148
149     to_int_be = staticmethod(lambda data: struct.pack('>i', data))
150     to_uint_be = staticmethod(lambda data: struct.pack('>I', data))
151
152     to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
153     to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))
154
155     to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
156     to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))
157
158     bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)]))
159                        for val in range(256)])
160     del(i)
161     del(val)
162
163     test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
164
165 def lock(fileobj):
166     """Lock a file object 'safely'.
167
168     That means a failure to lock because the platform doesn't
169     support fcntl or filesystem locks is not considered a
170     failure. This call does block.
171
172     Returns whether or not the lock was successful, or
173     raises an exception in more extreme circumstances (full
174     lock table, invalid file).
175     """
176     try: import fcntl
177     except ImportError:
178         return False
179     else:
180         try: fcntl.lockf(fileobj, fcntl.LOCK_EX)
181         except IOError:
182             # FIXME: There's possibly a lot of complicated
183             # logic that needs to go here in case the IOError
184             # is EACCES or EAGAIN.
185             return False
186         else:
187             return True
188
189 def unlock(fileobj):
190     """Unlock a file object.
191
192     Don't call this on a file object unless a call to lock()
193     returned true.
194     """
195     # If this fails there's a mismatched lock/unlock pair,
196     # so we definitely don't want to ignore errors.
197     import fcntl
198     fcntl.lockf(fileobj, fcntl.LOCK_UN)
199
200 def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
201     """Insert size bytes of empty space starting at offset.
202
203     fobj must be an open file object, open rb+ or
204     equivalent. Mutagen tries to use mmap to resize the file, but
205     falls back to a significantly slower method if mmap fails.
206     """
207     assert 0 < size
208     assert 0 <= offset
209     locked = False
210     fobj.seek(0, 2)
211     filesize = fobj.tell()
212     movesize = filesize - offset
213     fobj.write('\x00' * size)
214     fobj.flush()
215     try:
216         try:
217             import mmap
218             map = mmap.mmap(fobj.fileno(), filesize + size)
219             try: map.move(offset + size, offset, movesize)
220             finally: map.close()
221         except (ValueError, EnvironmentError, ImportError):
222             # handle broken mmap scenarios
223             locked = lock(fobj)
224             fobj.truncate(filesize)
225
226             fobj.seek(0, 2)
227             padsize = size
228             # Don't generate an enormous string if we need to pad
229             # the file out several megs.
230             while padsize:
231                 addsize = min(BUFFER_SIZE, padsize)
232                 fobj.write("\x00" * addsize)
233                 padsize -= addsize
234
235             fobj.seek(filesize, 0)
236             while movesize:
237                 # At the start of this loop, fobj is pointing at the end
238                 # of the data we need to move, which is of movesize length.
239                 thismove = min(BUFFER_SIZE, movesize)
240                 # Seek back however much we're going to read this frame.
241                 fobj.seek(-thismove, 1)
242                 nextpos = fobj.tell()
243                 # Read it, so we're back at the end.
244                 data = fobj.read(thismove)
245                 # Seek back to where we need to write it.
246                 fobj.seek(-thismove + size, 1)
247                 # Write it.
248                 fobj.write(data)
249                 # And seek back to the end of the unmoved data.
250                 fobj.seek(nextpos)
251                 movesize -= thismove
252
253             fobj.flush()
254     finally:
255         if locked:
256             unlock(fobj)
257
258 def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
259     """Delete size bytes of empty space starting at offset.
260
261     fobj must be an open file object, open rb+ or
262     equivalent. Mutagen tries to use mmap to resize the file, but
263     falls back to a significantly slower method if mmap fails.
264     """
265     locked = False
266     assert 0 < size
267     assert 0 <= offset
268     fobj.seek(0, 2)
269     filesize = fobj.tell()
270     movesize = filesize - offset - size
271     assert 0 <= movesize
272     try:
273         if movesize > 0:
274             fobj.flush()
275             try:
276                 import mmap
277                 map = mmap.mmap(fobj.fileno(), filesize)
278                 try: map.move(offset, offset + size, movesize)
279                 finally: map.close()
280             except (ValueError, EnvironmentError, ImportError):
281                 # handle broken mmap scenarios
282                 locked = lock(fobj)
283                 fobj.seek(offset + size)
284                 buf = fobj.read(BUFFER_SIZE)
285                 while buf:
286                     fobj.seek(offset)
287                     fobj.write(buf)
288                     offset += len(buf)
289                     fobj.seek(offset + size)
290                     buf = fobj.read(BUFFER_SIZE)
291         fobj.truncate(filesize - size)
292         fobj.flush()
293     finally:
294         if locked:
295             unlock(fobj)
296
297 def utf8(data):
298     """Convert a basestring to a valid UTF-8 str."""
299     if isinstance(data, str):
300         return data.decode("utf-8", "replace").encode("utf-8")
301     elif isinstance(data, unicode):
302         return data.encode("utf-8")
303     else: raise TypeError("only unicode/str types can be converted to UTF-8")