1 # Copyright 2006 Joe Wreschnig <piman@sacredchao.net>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4275 2008-06-01 06:32:37Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
17 class DictMixin(object):
18 """Implement the dict API using keys() and __*item__ methods.
20 Similar to UserDict.DictMixin, this takes a class that defines
21 __getitem__, __setitem__, __delitem__, and keys(), and turns it
22 into a full dict-like object.
24 UserDict.DictMixin is not suitable for this purpose because it's
27 This class is not optimized for very large dictionaries; many
28 functions have linear memory requirements. I recommend you
29 override some of these functions if speed is required.
33 return iter(self.keys())
35 def has_key(self, key):
37 except KeyError: return False
39 __contains__ = has_key
41 iterkeys = lambda self: iter(self.keys())
44 return map(self.__getitem__, self.keys())
45 itervalues = lambda self: iter(self.values())
48 return zip(self.keys(), self.values())
49 iteritems = lambda s: iter(s.items())
52 map(self.__delitem__, self.keys())
54 def pop(self, key, *args):
56 raise TypeError("pop takes at most two arguments")
57 try: value = self[key]
59 if args: return args[0]
67 return key, self.pop(key)
68 except IndexError: raise KeyError("dictionary is empty")
70 def update(self, other=None, **kwargs):
75 try: map(self.__setitem__, other.keys(), other.values())
76 except AttributeError:
77 for key, value in other:
80 def setdefault(self, key, default=None):
86 def get(self, key, default=None):
88 except KeyError: return default
91 return repr(dict(self.items()))
93 def __cmp__(self, other):
94 if other is None: return 1
95 else: return cmp(dict(self.items()), other)
98 return len(self.keys())
100 class DictProxy(DictMixin):
101 def __init__(self, *args, **kwargs):
103 super(DictProxy, self).__init__(*args, **kwargs)
105 def __getitem__(self, key):
106 return self.__dict[key]
108 def __setitem__(self, key, value):
109 self.__dict[key] = value
111 def __delitem__(self, key):
112 del(self.__dict[key])
115 return self.__dict.keys()
118 """C character buffer to Python numeric type conversions."""
120 from struct import error
122 short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
123 ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])
125 short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
126 ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])
128 int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
129 uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])
131 int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
132 uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])
134 longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
135 ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])
137 longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
138 ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])
140 to_short_le = staticmethod(lambda data: struct.pack('<h', data))
141 to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))
143 to_short_be = staticmethod(lambda data: struct.pack('>h', data))
144 to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))
146 to_int_le = staticmethod(lambda data: struct.pack('<i', data))
147 to_uint_le = staticmethod(lambda data: struct.pack('<I', data))
149 to_int_be = staticmethod(lambda data: struct.pack('>i', data))
150 to_uint_be = staticmethod(lambda data: struct.pack('>I', data))
152 to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
153 to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))
155 to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
156 to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))
158 bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)]))
159 for val in range(256)])
163 test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
166 """Lock a file object 'safely'.
168 That means a failure to lock because the platform doesn't
169 support fcntl or filesystem locks is not considered a
170 failure. This call does block.
172 Returns whether or not the lock was successful, or
173 raises an exception in more extreme circumstances (full
174 lock table, invalid file).
180 try: fcntl.lockf(fileobj, fcntl.LOCK_EX)
182 # FIXME: There's possibly a lot of complicated
183 # logic that needs to go here in case the IOError
184 # is EACCES or EAGAIN.
190 """Unlock a file object.
192 Don't call this on a file object unless a call to lock()
195 # If this fails there's a mismatched lock/unlock pair,
196 # so we definitely don't want to ignore errors.
198 fcntl.lockf(fileobj, fcntl.LOCK_UN)
200 def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
201 """Insert size bytes of empty space starting at offset.
203 fobj must be an open file object, open rb+ or
204 equivalent. Mutagen tries to use mmap to resize the file, but
205 falls back to a significantly slower method if mmap fails.
211 filesize = fobj.tell()
212 movesize = filesize - offset
213 fobj.write('\x00' * size)
218 map = mmap.mmap(fobj.fileno(), filesize + size)
219 try: map.move(offset + size, offset, movesize)
221 except (ValueError, EnvironmentError, ImportError):
222 # handle broken mmap scenarios
224 fobj.truncate(filesize)
228 # Don't generate an enormous string if we need to pad
229 # the file out several megs.
231 addsize = min(BUFFER_SIZE, padsize)
232 fobj.write("\x00" * addsize)
235 fobj.seek(filesize, 0)
237 # At the start of this loop, fobj is pointing at the end
238 # of the data we need to move, which is of movesize length.
239 thismove = min(BUFFER_SIZE, movesize)
240 # Seek back however much we're going to read this frame.
241 fobj.seek(-thismove, 1)
242 nextpos = fobj.tell()
243 # Read it, so we're back at the end.
244 data = fobj.read(thismove)
245 # Seek back to where we need to write it.
246 fobj.seek(-thismove + size, 1)
249 # And seek back to the end of the unmoved data.
258 def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
259 """Delete size bytes of empty space starting at offset.
261 fobj must be an open file object, open rb+ or
262 equivalent. Mutagen tries to use mmap to resize the file, but
263 falls back to a significantly slower method if mmap fails.
269 filesize = fobj.tell()
270 movesize = filesize - offset - size
277 map = mmap.mmap(fobj.fileno(), filesize)
278 try: map.move(offset, offset + size, movesize)
280 except (ValueError, EnvironmentError, ImportError):
281 # handle broken mmap scenarios
283 fobj.seek(offset + size)
284 buf = fobj.read(BUFFER_SIZE)
289 fobj.seek(offset + size)
290 buf = fobj.read(BUFFER_SIZE)
291 fobj.truncate(filesize - size)
298 """Convert a basestring to a valid UTF-8 str."""
299 if isinstance(data, str):
300 return data.decode("utf-8", "replace").encode("utf-8")
301 elif isinstance(data, unicode):
302 return data.encode("utf-8")
303 else: raise TypeError("only unicode/str types can be converted to UTF-8")