From: Marek Stępniowski Date: Sun, 11 Oct 2009 17:57:14 +0000 (+0200) Subject: Usunięcie biblioteki z repozytorium i dodanie jej za to do requirements.txt. X-Git-Url: https://git.mdrn.pl/wolnelektury.git/commitdiff_plain/1477a9281c6c552b0cb00cf967c1cf1bc8739a54?ds=sidebyside;hp=e8390b9f10338ff5ced700f5dbd87ba30a3566bd Usunięcie biblioteki z repozytorium i dodanie jej za to do requirements.txt. --- diff --git a/lib/mutagen/__init__.py b/lib/mutagen/__init__.py deleted file mode 100644 index 94717275d..000000000 --- a/lib/mutagen/__init__.py +++ /dev/null @@ -1,201 +0,0 @@ -#! /usr/bin/env python -# -# mutagen aims to be an all purpose media tagging library -# Copyright (C) 2005 Michael Urman -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of version 2 of the GNU General Public License as -# published by the Free Software Foundation. -# -# $Id: __init__.py 4275 2008-06-01 06:32:37Z piman $ -# - -"""Mutagen aims to be an all purpose tagging library. - - import mutagen.[format] - metadata = mutagen.[format].Open(filename) - -metadata acts like a dictionary of tags in the file. Tags are generally a -list of string-like values, but may have additional methods available -depending on tag or format. They may also be entirely different objects -for certain keys, again depending on format. -""" - -version = (1, 14) -version_string = ".".join(map(str, version)) - -import warnings - -import mutagen._util - -class Metadata(object): - """An abstract dict-like object. - - Metadata is the base class for many of the tag objects in Mutagen. - """ - - def __init__(self, *args, **kwargs): - if args or kwargs: - self.load(*args, **kwargs) - - def load(self, *args, **kwargs): - raise NotImplementedError - - def save(self, filename=None): - raise NotImplementedError - - def delete(self, filename=None): - raise NotImplementedError - -class FileType(mutagen._util.DictMixin): - """An abstract object wrapping tags and audio stream information. - - Attributes: - info -- stream information (length, bitrate, sample rate) - tags -- metadata tags, if any - - Each file format has different potential tags and stream - information. - - FileTypes implement an interface very similar to Metadata; the - dict interface, save, load, and delete calls on a FileType call - the appropriate methods on its tag data. - """ - - info = None - tags = None - filename = None - _mimes = ["application/octet-stream"] - - def __init__(self, filename=None, *args, **kwargs): - if filename is None: - warnings.warn("FileType constructor requires a filename", - DeprecationWarning) - else: - self.load(filename, *args, **kwargs) - - def load(self, filename, *args, **kwargs): - raise NotImplementedError - - def __getitem__(self, key): - """Look up a metadata tag key. - - If the file has no tags at all, a KeyError is raised. - """ - if self.tags is None: raise KeyError, key - else: return self.tags[key] - - def __setitem__(self, key, value): - """Set a metadata tag. - - If the file has no tags, an appropriate format is added (but - not written until save is called). - """ - if self.tags is None: - self.add_tags() - self.tags[key] = value - - def __delitem__(self, key): - """Delete a metadata tag key. - - If the file has no tags at all, a KeyError is raised. - """ - if self.tags is None: raise KeyError, key - else: del(self.tags[key]) - - def keys(self): - """Return a list of keys in the metadata tag. - - If the file has no tags at all, an empty list is returned. - """ - if self.tags is None: return [] - else: return self.tags.keys() - - def delete(self, filename=None): - """Remove tags from a file.""" - if self.tags is not None: - if filename is None: - filename = self.filename - else: - warnings.warn( - "delete(filename=...) is deprecated, reload the file", - DeprecationWarning) - return self.tags.delete(filename) - - def save(self, filename=None, **kwargs): - """Save metadata tags.""" - if filename is None: - filename = self.filename - else: - warnings.warn( - "save(filename=...) is deprecated, reload the file", - DeprecationWarning) - if self.tags is not None: - return self.tags.save(filename, **kwargs) - else: raise ValueError("no tags in file") - - def pprint(self): - """Print stream information and comment key=value pairs.""" - stream = "%s (%s)" % (self.info.pprint(), self.mime[0]) - try: tags = self.tags.pprint() - except AttributeError: - return stream - else: return stream + ((tags and "\n" + tags) or "") - - def add_tags(self): - raise NotImplementedError - - def __get_mime(self): - mimes = [] - for Kind in type(self).__mro__: - for mime in getattr(Kind, '_mimes', []): - if mime not in mimes: - mimes.append(mime) - return mimes - - mime = property(__get_mime) - -def File(filename, options=None): - """Guess the type of the file and try to open it. - - The file type is decided by several things, such as the first 128 - bytes (which usually contains a file type identifier), the - filename extension, and the presence of existing tags. - - If no appropriate type could be found, None is returned. - """ - - if options is None: - from mutagen.asf import ASF - from mutagen.apev2 import APEv2File - from mutagen.flac import FLAC - from mutagen.id3 import ID3FileType - from mutagen.mp3 import MP3 - from mutagen.oggflac import OggFLAC - from mutagen.oggspeex import OggSpeex - from mutagen.oggtheora import OggTheora - from mutagen.oggvorbis import OggVorbis - from mutagen.trueaudio import TrueAudio - from mutagen.wavpack import WavPack - from mutagen.mp4 import MP4 - from mutagen.musepack import Musepack - from mutagen.monkeysaudio import MonkeysAudio - from mutagen.optimfrog import OptimFROG - options = [MP3, TrueAudio, OggTheora, OggSpeex, OggVorbis, OggFLAC, - FLAC, APEv2File, MP4, ID3FileType, WavPack, Musepack, - MonkeysAudio, OptimFROG, ASF] - - if not options: - return None - - fileobj = file(filename, "rb") - try: - header = fileobj.read(128) - results = [Kind.score(filename, fileobj, header) for Kind in options] - finally: - fileobj.close() - results = zip(results, options) - results.sort() - score, Kind = results[-1] - if score > 0: return Kind(filename) - else: return None diff --git a/lib/mutagen/_constants.py b/lib/mutagen/_constants.py deleted file mode 100644 index 2381e9790..000000000 --- a/lib/mutagen/_constants.py +++ /dev/null @@ -1,153 +0,0 @@ -"""Constants used by Mutagen.""" - -GENRES = [ - u"Blues", - u"Classic Rock", - u"Country", - u"Dance", - u"Disco", - u"Funk", - u"Grunge", - u"Hip-Hop", - u"Jazz", - u"Metal", - u"New Age", - u"Oldies", - u"Other", - u"Pop", - u"R&B", - u"Rap", - u"Reggae", - u"Rock", - u"Techno", - u"Industrial", - u"Alternative", - u"Ska", - u"Death Metal", - u"Pranks", - u"Soundtrack", - u"Euro-Techno", - u"Ambient", - u"Trip-Hop", - u"Vocal", - u"Jazz+Funk", - u"Fusion", - u"Trance", - u"Classical", - u"Instrumental", - u"Acid", - u"House", - u"Game", - u"Sound Clip", - u"Gospel", - u"Noise", - u"Alt. Rock", - u"Bass", - u"Soul", - u"Punk", - u"Space", - u"Meditative", - u"Instrumental Pop", - u"Instrumental Rock", - u"Ethnic", - u"Gothic", - u"Darkwave", - u"Techno-Industrial", - u"Electronic", - u"Pop-Folk", - u"Eurodance", - u"Dream", - u"Southern Rock", - u"Comedy", - u"Cult", - u"Gangsta", - u"Top 40", - u"Christian Rap", - u"Pop/Funk", - u"Jungle", - u"Native American", - u"Cabaret", - u"New Wave", - u"Psychadelic", - u"Rave", - u"Showtunes", - u"Trailer", - u"Lo-Fi", - u"Tribal", - u"Acid Punk", - u"Acid Jazz", - u"Polka", - u"Retro", - u"Musical", - u"Rock & Roll", - u"Hard Rock", - u"Folk", - u"Folk/Rock", - u"National Folk", - u"Swing", - u"Fusion", - u"Bebob", - u"Latin", - u"Revival", - u"Celtic", - u"Bluegrass", - u"Avantgarde", - u"Gothic Rock", - u"Progressive Rock", - u"Psychadelic Rock", - u"Symphonic Rock", - u"Slow Rock", - u"Big Band", - u"Chorus", - u"Easy Listening", - u"Acoustic", - u"Humour", - u"Speech", - u"Chanson", - u"Opera", - u"Chamber Music", - u"Sonata", - u"Symphony", - u"Booty Bass", - u"Primus", - u"Porn Groove", - u"Satire", - u"Slow Jam", - u"Club", - u"Tango", - u"Samba", - u"Folklore", - u"Ballad", - u"Power Ballad", - u"Rhythmic Soul", - u"Freestyle", - u"Duet", - u"Punk Rock", - u"Drum Solo", - u"A Capella", - u"Euro-House", - u"Dance Hall", - u"Goa", - u"Drum & Bass", - u"Club-House", - u"Hardcore", - u"Terror", - u"Indie", - u"BritPop", - u"Negerpunk", - u"Polsk Punk", - u"Beat", - u"Christian Gangsta Rap", - u"Heavy Metal", - u"Black Metal", - u"Crossover", - u"Contemporary Christian", - u"Christian Rock", - u"Merengue", - u"Salsa", - u"Thrash Metal", - u"Anime", - u"Jpop", - u"Synthpop" - ] -"""The ID3v1 genre list.""" diff --git a/lib/mutagen/_util.py b/lib/mutagen/_util.py deleted file mode 100644 index 390679a62..000000000 --- a/lib/mutagen/_util.py +++ /dev/null @@ -1,303 +0,0 @@ -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: _util.py 4275 2008-06-01 06:32:37Z piman $ - -"""Utility classes for Mutagen. - -You should not rely on the interfaces here being stable. They are -intended for internal use in Mutagen only. -""" - -import struct - -class DictMixin(object): - """Implement the dict API using keys() and __*item__ methods. - - Similar to UserDict.DictMixin, this takes a class that defines - __getitem__, __setitem__, __delitem__, and keys(), and turns it - into a full dict-like object. - - UserDict.DictMixin is not suitable for this purpose because it's - an old-style class. - - This class is not optimized for very large dictionaries; many - functions have linear memory requirements. I recommend you - override some of these functions if speed is required. - """ - - def __iter__(self): - return iter(self.keys()) - - def has_key(self, key): - try: self[key] - except KeyError: return False - else: return True - __contains__ = has_key - - iterkeys = lambda self: iter(self.keys()) - - def values(self): - return map(self.__getitem__, self.keys()) - itervalues = lambda self: iter(self.values()) - - def items(self): - return zip(self.keys(), self.values()) - iteritems = lambda s: iter(s.items()) - - def clear(self): - map(self.__delitem__, self.keys()) - - def pop(self, key, *args): - if len(args) > 1: - raise TypeError("pop takes at most two arguments") - try: value = self[key] - except KeyError: - if args: return args[0] - else: raise - del(self[key]) - return value - - def popitem(self): - try: - key = self.keys()[0] - return key, self.pop(key) - except IndexError: raise KeyError("dictionary is empty") - - def update(self, other=None, **kwargs): - if other is None: - self.update(kwargs) - other = {} - - try: map(self.__setitem__, other.keys(), other.values()) - except AttributeError: - for key, value in other: - self[key] = value - - def setdefault(self, key, default=None): - try: return self[key] - except KeyError: - self[key] = default - return default - - def get(self, key, default=None): - try: return self[key] - except KeyError: return default - - def __repr__(self): - return repr(dict(self.items())) - - def __cmp__(self, other): - if other is None: return 1 - else: return cmp(dict(self.items()), other) - - def __len__(self): - return len(self.keys()) - -class DictProxy(DictMixin): - def __init__(self, *args, **kwargs): - self.__dict = {} - super(DictProxy, self).__init__(*args, **kwargs) - - def __getitem__(self, key): - return self.__dict[key] - - def __setitem__(self, key, value): - self.__dict[key] = value - - def __delitem__(self, key): - del(self.__dict[key]) - - def keys(self): - return self.__dict.keys() - -class cdata(object): - """C character buffer to Python numeric type conversions.""" - - from struct import error - - short_le = staticmethod(lambda data: struct.unpack('h', data)[0]) - ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0]) - - int_le = staticmethod(lambda data: struct.unpack('i', data)[0]) - uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0]) - - longlong_le = staticmethod(lambda data: struct.unpack('q', data)[0]) - ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0]) - - to_short_le = staticmethod(lambda data: struct.pack('h', data)) - to_ushort_be = staticmethod(lambda data: struct.pack('>H', data)) - - to_int_le = staticmethod(lambda data: struct.pack('i', data)) - to_uint_be = staticmethod(lambda data: struct.pack('>I', data)) - - to_longlong_le = staticmethod(lambda data: struct.pack('q', data)) - to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data)) - - bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)])) - for val in range(256)]) - del(i) - del(val) - - test_bit = staticmethod(lambda value, n: bool((value >> n) & 1)) - -def lock(fileobj): - """Lock a file object 'safely'. - - That means a failure to lock because the platform doesn't - support fcntl or filesystem locks is not considered a - failure. This call does block. - - Returns whether or not the lock was successful, or - raises an exception in more extreme circumstances (full - lock table, invalid file). - """ - try: import fcntl - except ImportError: - return False - else: - try: fcntl.lockf(fileobj, fcntl.LOCK_EX) - except IOError: - # FIXME: There's possibly a lot of complicated - # logic that needs to go here in case the IOError - # is EACCES or EAGAIN. - return False - else: - return True - -def unlock(fileobj): - """Unlock a file object. - - Don't call this on a file object unless a call to lock() - returned true. - """ - # If this fails there's a mismatched lock/unlock pair, - # so we definitely don't want to ignore errors. - import fcntl - fcntl.lockf(fileobj, fcntl.LOCK_UN) - -def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16): - """Insert size bytes of empty space starting at offset. - - fobj must be an open file object, open rb+ or - equivalent. Mutagen tries to use mmap to resize the file, but - falls back to a significantly slower method if mmap fails. - """ - assert 0 < size - assert 0 <= offset - locked = False - fobj.seek(0, 2) - filesize = fobj.tell() - movesize = filesize - offset - fobj.write('\x00' * size) - fobj.flush() - try: - try: - import mmap - map = mmap.mmap(fobj.fileno(), filesize + size) - try: map.move(offset + size, offset, movesize) - finally: map.close() - except (ValueError, EnvironmentError, ImportError): - # handle broken mmap scenarios - locked = lock(fobj) - fobj.truncate(filesize) - - fobj.seek(0, 2) - padsize = size - # Don't generate an enormous string if we need to pad - # the file out several megs. - while padsize: - addsize = min(BUFFER_SIZE, padsize) - fobj.write("\x00" * addsize) - padsize -= addsize - - fobj.seek(filesize, 0) - while movesize: - # At the start of this loop, fobj is pointing at the end - # of the data we need to move, which is of movesize length. - thismove = min(BUFFER_SIZE, movesize) - # Seek back however much we're going to read this frame. - fobj.seek(-thismove, 1) - nextpos = fobj.tell() - # Read it, so we're back at the end. - data = fobj.read(thismove) - # Seek back to where we need to write it. - fobj.seek(-thismove + size, 1) - # Write it. - fobj.write(data) - # And seek back to the end of the unmoved data. - fobj.seek(nextpos) - movesize -= thismove - - fobj.flush() - finally: - if locked: - unlock(fobj) - -def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16): - """Delete size bytes of empty space starting at offset. - - fobj must be an open file object, open rb+ or - equivalent. Mutagen tries to use mmap to resize the file, but - falls back to a significantly slower method if mmap fails. - """ - locked = False - assert 0 < size - assert 0 <= offset - fobj.seek(0, 2) - filesize = fobj.tell() - movesize = filesize - offset - size - assert 0 <= movesize - try: - if movesize > 0: - fobj.flush() - try: - import mmap - map = mmap.mmap(fobj.fileno(), filesize) - try: map.move(offset, offset + size, movesize) - finally: map.close() - except (ValueError, EnvironmentError, ImportError): - # handle broken mmap scenarios - locked = lock(fobj) - fobj.seek(offset + size) - buf = fobj.read(BUFFER_SIZE) - while buf: - fobj.seek(offset) - fobj.write(buf) - offset += len(buf) - fobj.seek(offset + size) - buf = fobj.read(BUFFER_SIZE) - fobj.truncate(filesize - size) - fobj.flush() - finally: - if locked: - unlock(fobj) - -def utf8(data): - """Convert a basestring to a valid UTF-8 str.""" - if isinstance(data, str): - return data.decode("utf-8", "replace").encode("utf-8") - elif isinstance(data, unicode): - return data.encode("utf-8") - else: raise TypeError("only unicode/str types can be converted to UTF-8") diff --git a/lib/mutagen/_vorbis.py b/lib/mutagen/_vorbis.py deleted file mode 100644 index eb0e0b9a0..000000000 --- a/lib/mutagen/_vorbis.py +++ /dev/null @@ -1,226 +0,0 @@ -# Vorbis comment support for Mutagen -# Copyright 2005-2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of version 2 of the GNU General Public License as -# published by the Free Software Foundation. - -"""Read and write Vorbis comment data. - -Vorbis comments are freeform key/value pairs; keys are -case-insensitive ASCII and values are Unicode strings. A key may have -multiple values. - -The specification is at http://www.xiph.org/vorbis/doc/v-comment.html. -""" - -import sys - -from cStringIO import StringIO - -import mutagen -from mutagen._util import DictMixin, cdata - -try: set -except NameError: - from sets import Set as set - -def is_valid_key(key): - """Return true if a string is a valid Vorbis comment key. - - Valid Vorbis comment keys are printable ASCII between 0x20 (space) - and 0x7D ('}'), excluding '='. - """ - for c in key: - if c < " " or c > "}" or c == "=": return False - else: return bool(key) -istag = is_valid_key - -class error(IOError): pass -class VorbisUnsetFrameError(error): pass -class VorbisEncodingError(error): pass - -class VComment(mutagen.Metadata, list): - """A Vorbis comment parser, accessor, and renderer. - - All comment ordering is preserved. A VComment is a list of - key/value pairs, and so any Python list method can be used on it. - - Vorbis comments are always wrapped in something like an Ogg Vorbis - bitstream or a FLAC metadata block, so this loads string data or a - file-like object, not a filename. - - Attributes: - vendor -- the stream 'vendor' (i.e. writer); default 'Mutagen' - """ - - vendor = u"Mutagen " + mutagen.version_string - - def __init__(self, data=None, *args, **kwargs): - # Collect the args to pass to load, this lets child classes - # override just load and get equivalent magic for the - # constructor. - if data is not None: - if isinstance(data, str): - data = StringIO(data) - elif not hasattr(data, 'read'): - raise TypeError("VComment requires string data or a file-like") - self.load(data, *args, **kwargs) - - def load(self, fileobj, errors='replace', framing=True): - """Parse a Vorbis comment from a file-like object. - - Keyword arguments: - errors: - 'strict', 'replace', or 'ignore'. This affects Unicode decoding - and how other malformed content is interpreted. - framing -- if true, fail if a framing bit is not present - - Framing bits are required by the Vorbis comment specification, - but are not used in FLAC Vorbis comment blocks. - - """ - try: - vendor_length = cdata.uint_le(fileobj.read(4)) - self.vendor = fileobj.read(vendor_length).decode('utf-8', errors) - count = cdata.uint_le(fileobj.read(4)) - for i in range(count): - length = cdata.uint_le(fileobj.read(4)) - try: string = fileobj.read(length).decode('utf-8', errors) - except (OverflowError, MemoryError): - raise error("cannot read %d bytes, too large" % length) - try: tag, value = string.split('=', 1) - except ValueError, err: - if errors == "ignore": - continue - elif errors == "replace": - tag, value = u"unknown%d" % i, string - else: - raise VorbisEncodingError, str(err), sys.exc_info()[2] - try: tag = tag.encode('ascii', errors) - except UnicodeEncodeError: - raise VorbisEncodingError, "invalid tag name %r" % tag - else: - if is_valid_key(tag): self.append((tag, value)) - if framing and not ord(fileobj.read(1)) & 0x01: - raise VorbisUnsetFrameError("framing bit was unset") - except (cdata.error, TypeError): - raise error("file is not a valid Vorbis comment") - - def validate(self): - """Validate keys and values. - - Check to make sure every key used is a valid Vorbis key, and - that every value used is a valid Unicode or UTF-8 string. If - any invalid keys or values are found, a ValueError is raised. - """ - - if not isinstance(self.vendor, unicode): - try: self.vendor.decode('utf-8') - except UnicodeDecodeError: raise ValueError - - for key, value in self: - try: - if not is_valid_key(key): raise ValueError - except: raise ValueError("%r is not a valid key" % key) - if not isinstance(value, unicode): - try: value.encode("utf-8") - except: raise ValueError("%r is not a valid value" % value) - else: return True - - def clear(self): - """Clear all keys from the comment.""" - del(self[:]) - - def write(self, framing=True): - """Return a string representation of the data. - - Validation is always performed, so calling this function on - invalid data may raise a ValueError. - - Keyword arguments: - framing -- if true, append a framing bit (see load) - """ - - self.validate() - - f = StringIO() - f.write(cdata.to_uint_le(len(self.vendor.encode('utf-8')))) - f.write(self.vendor.encode('utf-8')) - f.write(cdata.to_uint_le(len(self))) - for tag, value in self: - comment = "%s=%s" % (tag, value.encode('utf-8')) - f.write(cdata.to_uint_le(len(comment))) - f.write(comment) - if framing: f.write("\x01") - return f.getvalue() - - def pprint(self): - return "\n".join(["%s=%s" % (k.lower(), v) for k, v in self]) - -class VCommentDict(VComment, DictMixin): - """A VComment that looks like a dictionary. - - This object differs from a dictionary in two ways. First, - len(comment) will still return the number of values, not the - number of keys. Secondly, iterating through the object will - iterate over (key, value) pairs, not keys. Since a key may have - multiple values, the same value may appear multiple times while - iterating. - - Since Vorbis comment keys are case-insensitive, all keys are - normalized to lowercase ASCII. - """ - - def __getitem__(self, key): - """A list of values for the key. - - This is a copy, so comment['title'].append('a title') will not - work. - - """ - key = key.lower().encode('ascii') - values = [value for (k, value) in self if k.lower() == key] - if not values: raise KeyError, key - else: return values - - def __delitem__(self, key): - """Delete all values associated with the key.""" - key = key.lower().encode('ascii') - to_delete = filter(lambda x: x[0].lower() == key, self) - if not to_delete:raise KeyError, key - else: map(self.remove, to_delete) - - def __contains__(self, key): - """Return true if the key has any values.""" - key = key.lower().encode('ascii') - for k, value in self: - if k.lower() == key: return True - else: return False - - def __setitem__(self, key, values): - """Set a key's value or values. - - Setting a value overwrites all old ones. The value may be a - list of Unicode or UTF-8 strings, or a single Unicode or UTF-8 - string. - - """ - key = key.lower().encode('ascii') - if not isinstance(values, list): - values = [values] - try: del(self[key]) - except KeyError: pass - for value in values: - self.append((key, value)) - - def keys(self): - """Return all keys in the comment.""" - return self and map(str.lower, set(zip(*self)[0])) - - def as_dict(self): - """Return a copy of the comment data in a real dict.""" - d = {} - for key, value in self: - d.setdefault(key, []).append(value) - return d diff --git a/lib/mutagen/apev2.py b/lib/mutagen/apev2.py deleted file mode 100644 index fcc9a1ec2..000000000 --- a/lib/mutagen/apev2.py +++ /dev/null @@ -1,465 +0,0 @@ -# An APEv2 tag reader -# -# Copyright 2005 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: apev2.py 4275 2008-06-01 06:32:37Z piman $ - -"""APEv2 reading and writing. - -The APEv2 format is most commonly used with Musepack files, but is -also the format of choice for WavPack and other formats. Some MP3s -also have APEv2 tags, but this can cause problems with many MP3 -decoders and taggers. - -APEv2 tags, like Vorbis comments, are freeform key=value pairs. APEv2 -keys can be any ASCII string with characters from 0x20 to 0x7E, -between 2 and 255 characters long. Keys are case-sensitive, but -readers are recommended to be case insensitive, and it is forbidden to -multiple keys which differ only in case. Keys are usually stored -title-cased (e.g. 'Artist' rather than 'artist'). - -APEv2 values are slightly more structured than Vorbis comments; values -are flagged as one of text, binary, or an external reference (usually -a URI). - -Based off the format specification found at -http://wiki.hydrogenaudio.org/index.php?title=APEv2_specification. -""" - -__all__ = ["APEv2", "APEv2File", "Open", "delete"] - -import struct -from cStringIO import StringIO - -def is_valid_apev2_key(key): - return (2 <= len(key) <= 255 and min(key) >= ' ' and max(key) <= '~' and - key not in ["OggS", "TAG", "ID3", "MP+"]) - -# There are three different kinds of APE tag values. -# "0: Item contains text information coded in UTF-8 -# 1: Item contains binary information -# 2: Item is a locator of external stored information [e.g. URL] -# 3: reserved" -TEXT, BINARY, EXTERNAL = range(3) - -HAS_HEADER = 1L << 31 -HAS_NO_FOOTER = 1L << 30 -IS_HEADER = 1L << 29 - -class error(IOError): pass -class APENoHeaderError(error, ValueError): pass -class APEUnsupportedVersionError(error, ValueError): pass -class APEBadItemError(error, ValueError): pass - -from mutagen import Metadata, FileType -from mutagen._util import DictMixin, cdata, utf8, delete_bytes - -class _APEv2Data(object): - # Store offsets of the important parts of the file. - start = header = data = footer = end = None - # Footer or header; seek here and read 32 to get version/size/items/flags - metadata = None - # Actual tag data - tag = None - - version = None - size = None - items = None - flags = 0 - - # The tag is at the start rather than the end. A tag at both - # the start and end of the file (i.e. the tag is the whole file) - # is not considered to be at the start. - is_at_start = False - - def __init__(self, fileobj): - self.__find_metadata(fileobj) - self.metadata = max(self.header, self.footer) - if self.metadata is None: return - self.__fill_missing(fileobj) - self.__fix_brokenness(fileobj) - if self.data is not None: - fileobj.seek(self.data) - self.tag = fileobj.read(self.size) - - def __find_metadata(self, fileobj): - # Try to find a header or footer. - - # Check for a simple footer. - try: fileobj.seek(-32, 2) - except IOError: - fileobj.seek(0, 2) - return - if fileobj.read(8) == "APETAGEX": - fileobj.seek(-8, 1) - self.footer = self.metadata = fileobj.tell() - return - - # Check for an APEv2 tag followed by an ID3v1 tag at the end. - try: - fileobj.seek(-128, 2) - if fileobj.read(3) == "TAG": - - fileobj.seek(-35, 1) # "TAG" + header length - if fileobj.read(8) == "APETAGEX": - fileobj.seek(-8, 1) - self.footer = fileobj.tell() - return - - # ID3v1 tag at the end, maybe preceded by Lyrics3v2. - # (http://www.id3.org/lyrics3200.html) - # (header length - "APETAGEX") - "LYRICS200" - fileobj.seek(15, 1) - if fileobj.read(9) == 'LYRICS200': - fileobj.seek(-15, 1) # "LYRICS200" + size tag - try: offset = int(fileobj.read(6)) - except ValueError: - raise IOError - - fileobj.seek(-32 - offset - 6, 1) - if fileobj.read(8) == "APETAGEX": - fileobj.seek(-8, 1) - self.footer = fileobj.tell() - return - - except IOError: - pass - - # Check for a tag at the start. - fileobj.seek(0, 0) - if fileobj.read(8) == "APETAGEX": - self.is_at_start = True - self.header = 0 - - def __fill_missing(self, fileobj): - fileobj.seek(self.metadata + 8) - self.version = fileobj.read(4) - self.size = cdata.uint_le(fileobj.read(4)) - self.items = cdata.uint_le(fileobj.read(4)) - self.flags = cdata.uint_le(fileobj.read(4)) - - if self.header is not None: - self.data = self.header + 32 - # If we're reading the header, the size is the header - # offset + the size, which includes the footer. - self.end = self.data + self.size - fileobj.seek(self.end - 32, 0) - if fileobj.read(8) == "APETAGEX": - self.footer = self.end - 32 - elif self.footer is not None: - self.end = self.footer + 32 - self.data = self.end - self.size - if self.flags & HAS_HEADER: - self.header = self.data - 32 - else: - self.header = self.data - else: raise APENoHeaderError("No APE tag found") - - def __fix_brokenness(self, fileobj): - # Fix broken tags written with PyMusepack. - if self.header is not None: start = self.header - else: start = self.data - fileobj.seek(start) - - while start > 0: - # Clean up broken writing from pre-Mutagen PyMusepack. - # It didn't remove the first 24 bytes of header. - try: fileobj.seek(-24, 1) - except IOError: - break - else: - if fileobj.read(8) == "APETAGEX": - fileobj.seek(-8, 1) - start = fileobj.tell() - else: break - self.start = start - -class APEv2(DictMixin, Metadata): - """A file with an APEv2 tag. - - ID3v1 tags are silently ignored and overwritten. - """ - - filename = None - - def __init__(self, *args, **kwargs): - self.__casemap = {} - self.__dict = {} - super(APEv2, self).__init__(*args, **kwargs) - # Internally all names are stored as lowercase, but the case - # they were set with is remembered and used when saving. This - # is roughly in line with the standard, which says that keys - # are case-sensitive but two keys differing only in case are - # not allowed, and recommends case-insensitive - # implementations. - - def pprint(self): - """Return tag key=value pairs in a human-readable format.""" - items = self.items() - items.sort() - return "\n".join(["%s=%s" % (k, v.pprint()) for k, v in items]) - - def load(self, filename): - """Load tags from a filename.""" - self.filename = filename - fileobj = file(filename, "rb") - try: - data = _APEv2Data(fileobj) - finally: - fileobj.close() - if data.tag: - self.clear() - self.__casemap.clear() - self.__parse_tag(data.tag, data.items) - else: - raise APENoHeaderError("No APE tag found") - - def __parse_tag(self, tag, count): - fileobj = StringIO(tag) - - for i in range(count): - size = cdata.uint_le(fileobj.read(4)) - flags = cdata.uint_le(fileobj.read(4)) - - # Bits 1 and 2 bits are flags, 0-3 - # Bit 0 is read/write flag, ignored - kind = (flags & 6) >> 1 - if kind == 3: - raise APEBadItemError("value type must be 0, 1, or 2") - key = value = fileobj.read(1) - while key[-1:] != '\x00' and value: - value = fileobj.read(1) - key += value - if key[-1:] == "\x00": - key = key[:-1] - value = fileobj.read(size) - self[key] = APEValue(value, kind) - - def __getitem__(self, key): - if not is_valid_apev2_key(key): - raise KeyError("%r is not a valid APEv2 key" % key) - return self.__dict[key.lower()] - - def __delitem__(self, key): - if not is_valid_apev2_key(key): - raise KeyError("%r is not a valid APEv2 key" % key) - del(self.__dict[key.lower()]) - - def __setitem__(self, key, value): - """'Magic' value setter. - - This function tries to guess at what kind of value you want to - store. If you pass in a valid UTF-8 or Unicode string, it - treats it as a text value. If you pass in a list, it treats it - as a list of string/Unicode values. If you pass in a string - that is not valid UTF-8, it assumes it is a binary value. - - If you need to force a specific type of value (e.g. binary - data that also happens to be valid UTF-8, or an external - reference), use the APEValue factory and set the value to the - result of that: - from mutagen.apev2 import APEValue, EXTERNAL - tag['Website'] = APEValue('http://example.org', EXTERNAL) - """ - - if not is_valid_apev2_key(key): - raise KeyError("%r is not a valid APEv2 key" % key) - - if not isinstance(value, _APEValue): - # let's guess at the content if we're not already a value... - if isinstance(value, unicode): - # unicode? we've got to be text. - value = APEValue(utf8(value), TEXT) - elif isinstance(value, list): - # list? text. - value = APEValue("\0".join(map(utf8, value)), TEXT) - else: - try: dummy = value.decode("utf-8") - except UnicodeError: - # invalid UTF8 text, probably binary - value = APEValue(value, BINARY) - else: - # valid UTF8, probably text - value = APEValue(value, TEXT) - self.__casemap[key.lower()] = key - self.__dict[key.lower()] = value - - def keys(self): - return [self.__casemap.get(key, key) for key in self.__dict.keys()] - - def save(self, filename=None): - """Save changes to a file. - - If no filename is given, the one most recently loaded is used. - - Tags are always written at the end of the file, and include - a header and a footer. - """ - - filename = filename or self.filename - try: - fileobj = file(filename, "r+b") - except IOError: - fileobj = file(filename, "w+b") - data = _APEv2Data(fileobj) - - if data.is_at_start: - delete_bytes(fileobj, data.end - data.start, data.start) - elif data.start is not None: - fileobj.seek(data.start) - # Delete an ID3v1 tag if present, too. - fileobj.truncate() - fileobj.seek(0, 2) - - # "APE tags items should be sorted ascending by size... This is - # not a MUST, but STRONGLY recommended. Actually the items should - # be sorted by importance/byte, but this is not feasible." - tags = [v._internal(k) for k, v in self.items()] - tags.sort(lambda a, b: cmp(len(a), len(b))) - num_tags = len(tags) - tags = "".join(tags) - - header = "APETAGEX%s%s" %( - # version, tag size, item count, flags - struct.pack("<4I", 2000, len(tags) + 32, num_tags, - HAS_HEADER | IS_HEADER), - "\0" * 8) - fileobj.write(header) - - fileobj.write(tags) - - footer = "APETAGEX%s%s" %( - # version, tag size, item count, flags - struct.pack("<4I", 2000, len(tags) + 32, num_tags, - HAS_HEADER), - "\0" * 8) - fileobj.write(footer) - fileobj.close() - - def delete(self, filename=None): - """Remove tags from a file.""" - filename = filename or self.filename - fileobj = file(filename, "r+b") - try: - data = _APEv2Data(fileobj) - if data.start is not None and data.size is not None: - delete_bytes(fileobj, data.end - data.start, data.start) - finally: - fileobj.close() - self.clear() - -Open = APEv2 - -def delete(filename): - """Remove tags from a file.""" - try: APEv2(filename).delete() - except APENoHeaderError: pass - -def APEValue(value, kind): - """APEv2 tag value factory. - - Use this if you need to specify the value's type manually. Binary - and text data are automatically detected by APEv2.__setitem__. - """ - if kind == TEXT: return APETextValue(value, kind) - elif kind == BINARY: return APEBinaryValue(value, kind) - elif kind == EXTERNAL: return APEExtValue(value, kind) - else: raise ValueError("kind must be TEXT, BINARY, or EXTERNAL") - -class _APEValue(object): - def __init__(self, value, kind): - self.kind = kind - self.value = value - - def __len__(self): - return len(self.value) - def __str__(self): - return self.value - - # Packed format for an item: - # 4B: Value length - # 4B: Value type - # Key name - # 1B: Null - # Key value - def _internal(self, key): - return "%s%s\0%s" %( - struct.pack("<2I", len(self.value), self.kind << 1), - key, self.value) - - def __repr__(self): - return "%s(%r, %d)" % (type(self).__name__, self.value, self.kind) - -class APETextValue(_APEValue): - """An APEv2 text value. - - Text values are Unicode/UTF-8 strings. They can be accessed like - strings (with a null seperating the values), or arrays of strings.""" - - def __unicode__(self): - return unicode(str(self), "utf-8") - - def __iter__(self): - """Iterate over the strings of the value (not the characters)""" - return iter(unicode(self).split("\0")) - - def __getitem__(self, index): - return unicode(self).split("\0")[index] - - def __len__(self): - return self.value.count("\0") + 1 - - def __cmp__(self, other): - return cmp(unicode(self), other) - - def __setitem__(self, index, value): - values = list(self) - values[index] = value.encode("utf-8") - self.value = "\0".join(values).encode("utf-8") - - def pprint(self): - return " / ".join(self) - -class APEBinaryValue(_APEValue): - """An APEv2 binary value.""" - - def pprint(self): return "[%d bytes]" % len(self) - -class APEExtValue(_APEValue): - """An APEv2 external value. - - External values are usually URI or IRI strings. - """ - def pprint(self): return "[External] %s" % unicode(self) - -class APEv2File(FileType): - class _Info(object): - length = 0 - bitrate = 0 - def __init__(self, fileobj): pass - pprint = staticmethod(lambda: "Unknown format with APEv2 tag.") - - def load(self, filename): - self.filename = filename - self.info = self._Info(file(filename, "rb")) - try: self.tags = APEv2(filename) - except error: self.tags = None - - def add_tags(self): - if self.tags is None: - self.tags = APEv2() - else: - raise ValueError("%r already has tags: %r" % (self, self.tags)) - - def score(filename, fileobj, header): - try: fileobj.seek(-160, 2) - except IOError: - fileobj.seek(0) - footer = fileobj.read() - filename = filename.lower() - return (("APETAGEX" in footer) - header.startswith("ID3")) - score = staticmethod(score) diff --git a/lib/mutagen/asf.py b/lib/mutagen/asf.py deleted file mode 100644 index 1d3d9568c..000000000 --- a/lib/mutagen/asf.py +++ /dev/null @@ -1,634 +0,0 @@ -# Copyright 2006-2007 Lukas Lalinsky -# Copyright 2005-2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: asf.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write ASF (Window Media Audio) files.""" - -__all__ = ["ASF", "Open"] - -import struct -from mutagen import FileType, Metadata -from mutagen._util import insert_bytes, delete_bytes, DictMixin - -class error(IOError): pass -class ASFError(error): pass -class ASFHeaderError(error): pass - - -class ASFInfo(object): - """ASF stream information.""" - - def __init__(self): - self.length = 0.0 - self.sample_rate = 0 - self.bitrate = 0 - self.channels = 0 - - def pprint(self): - s = "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % ( - self.bitrate, self.sample_rate, self.channels, self.length) - return s - - -class ASFTags(list, DictMixin, Metadata): - """Dictionary containing ASF attributes.""" - - def pprint(self): - return "\n".join(["%s=%s" % (k, v) for k, v in self]) - - def __getitem__(self, key): - """A list of values for the key. - - This is a copy, so comment['title'].append('a title') will not - work. - - """ - values = [value for (k, value) in self if k == key] - if not values: raise KeyError, key - else: return values - - def __delitem__(self, key): - """Delete all values associated with the key.""" - to_delete = filter(lambda x: x[0] == key, self) - if not to_delete: raise KeyError, key - else: map(self.remove, to_delete) - - def __contains__(self, key): - """Return true if the key has any values.""" - for k, value in self: - if k == key: return True - else: return False - - def __setitem__(self, key, values): - """Set a key's value or values. - - Setting a value overwrites all old ones. The value may be a - list of Unicode or UTF-8 strings, or a single Unicode or UTF-8 - string. - - """ - if not isinstance(values, list): - values = [values] - try: del(self[key]) - except KeyError: pass - for value in values: - if key in _standard_attribute_names: - value = unicode(value) - elif not isinstance(value, ASFBaseAttribute): - if isinstance(value, basestring): - value = ASFUnicodeAttribute(value) - elif isinstance(value, bool): - value = ASFBoolAttribute(value) - elif isinstance(value, int): - value = ASFDWordAttribute(value) - elif isinstance(value, long): - value = ASFQWordAttribute(value) - self.append((key, value)) - - def keys(self): - """Return all keys in the comment.""" - return self and set(zip(*self)[0]) - - def as_dict(self): - """Return a copy of the comment data in a real dict.""" - d = {} - for key, value in self: - d.setdefault(key, []).append(value) - return d - - -class ASFBaseAttribute(object): - """Generic attribute.""" - TYPE = None - - def __init__(self, value=None, data=None, language=None, - stream=None, **kwargs): - self.language = language - self.stream = stream - if data: - self.value = self.parse(data, **kwargs) - else: - self.value = value - - def __repr__(self): - name = "%s(%r" % (type(self).__name__, self.value) - if self.language: - name += ", language=%d" % self.language - if self.stream: - name += ", stream=%d" % self.stream - name += ")" - return name - - def render(self, name): - name = name.encode("utf-16-le") + "\x00\x00" - data = self._render() - return (struct.pack(" self.size: - insert_bytes(fileobj, size - self.size, self.size) - if size < self.size: - delete_bytes(fileobj, self.size - size, 0) - fileobj.seek(0) - fileobj.write(data) - finally: - fileobj.close() - - def __read_file(self, fileobj): - header = fileobj.read(30) - if len(header) != 30 or header[:16] != HeaderObject.GUID: - raise ASFHeaderError, "Not an ASF file." - - self.extended_content_description_obj = None - self.content_description_obj = None - self.header_extension_obj = None - self.metadata_obj = None - self.metadata_library_obj = None - - self.size, self.num_objects = struct.unpack(" 2**24: - raise error("block is too long to write") - length = struct.pack(">I", len(datum))[-3:] - data.append(byte + length + datum) - return "".join(data) - writeblocks = staticmethod(writeblocks) - - def group_padding(blocks): - """Consolidate FLAC padding metadata blocks. - - The overall size of the rendered blocks does not change, so - this adds several bytes of padding for each merged block.""" - paddings = filter(lambda x: isinstance(x, Padding), blocks) - map(blocks.remove, paddings) - padding = Padding() - # total padding size is the sum of padding sizes plus 4 bytes - # per removed header. - size = sum([padding.length for padding in paddings]) - padding.length = size + 4 * (len(paddings) - 1) - blocks.append(padding) - group_padding = staticmethod(group_padding) - -class StreamInfo(MetadataBlock): - """FLAC stream information. - - This contains information about the audio data in the FLAC file. - Unlike most stream information objects in Mutagen, changes to this - one will rewritten to the file when it is saved. Unless you are - actually changing the audio stream itself, don't change any - attributes of this block. - - Attributes: - min_blocksize -- minimum audio block size - max_blocksize -- maximum audio block size - sample_rate -- audio sample rate in Hz - channels -- audio channels (1 for mono, 2 for stereo) - bits_per_sample -- bits per sample - total_samples -- total samples in file - length -- audio length in seconds - """ - - code = 0 - - def __eq__(self, other): - try: return (self.min_blocksize == other.min_blocksize and - self.max_blocksize == other.max_blocksize and - self.sample_rate == other.sample_rate and - self.channels == other.channels and - self.bits_per_sample == other.bits_per_sample and - self.total_samples == other.total_samples) - except: return False - - def load(self, data): - self.min_blocksize = int(to_int_be(data.read(2))) - self.max_blocksize = int(to_int_be(data.read(2))) - self.min_framesize = int(to_int_be(data.read(3))) - self.max_framesize = int(to_int_be(data.read(3))) - # first 16 bits of sample rate - sample_first = to_int_be(data.read(2)) - # last 4 bits of sample rate, 3 of channels, first 1 of bits/sample - sample_channels_bps = to_int_be(data.read(1)) - # last 4 of bits/sample, 36 of total samples - bps_total = to_int_be(data.read(5)) - - sample_tail = sample_channels_bps >> 4 - self.sample_rate = int((sample_first << 4) + sample_tail) - self.channels = int(((sample_channels_bps >> 1) & 7) + 1) - bps_tail = bps_total >> 36 - bps_head = (sample_channels_bps & 1) << 4 - self.bits_per_sample = int(bps_head + bps_tail + 1) - self.total_samples = bps_total & 0xFFFFFFFFFL - self.length = self.total_samples / float(self.sample_rate) - - self.md5_signature = to_int_be(data.read(16)) - - def write(self): - f = StringIO() - f.write(struct.pack(">I", self.min_blocksize)[-2:]) - f.write(struct.pack(">I", self.max_blocksize)[-2:]) - f.write(struct.pack(">I", self.min_framesize)[-3:]) - f.write(struct.pack(">I", self.max_framesize)[-3:]) - - # first 16 bits of sample rate - f.write(struct.pack(">I", self.sample_rate >> 4)[-2:]) - # 4 bits sample, 3 channel, 1 bps - byte = (self.sample_rate & 0xF) << 4 - byte += ((self.channels - 1) & 3) << 1 - byte += ((self.bits_per_sample - 1) >> 4) & 1 - f.write(chr(byte)) - # 4 bits of bps, 4 of sample count - byte = ((self.bits_per_sample - 1) & 0xF) << 4 - byte += (self.total_samples >> 32) & 0xF - f.write(chr(byte)) - # last 32 of sample count - f.write(struct.pack(">I", self.total_samples & 0xFFFFFFFFL)) - # MD5 signature - sig = self.md5_signature - f.write(struct.pack( - ">4I", (sig >> 96) & 0xFFFFFFFFL, (sig >> 64) & 0xFFFFFFFFL, - (sig >> 32) & 0xFFFFFFFFL, sig & 0xFFFFFFFFL)) - return f.getvalue() - - def pprint(self): - return "FLAC, %.2f seconds, %d Hz" % (self.length, self.sample_rate) - -class SeekPoint(tuple): - """A single seek point in a FLAC file. - - Placeholder seek points have first_sample of 0xFFFFFFFFFFFFFFFFL, - and byte_offset and num_samples undefined. Seek points must be - sorted in ascending order by first_sample number. Seek points must - be unique by first_sample number, except for placeholder - points. Placeholder points must occur last in the table and there - may be any number of them. - - Attributes: - first_sample -- sample number of first sample in the target frame - byte_offset -- offset from first frame to target frame - num_samples -- number of samples in target frame - """ - - def __new__(cls, first_sample, byte_offset, num_samples): - return super(cls, SeekPoint).__new__(cls, (first_sample, - byte_offset, num_samples)) - first_sample = property(lambda self: self[0]) - byte_offset = property(lambda self: self[1]) - num_samples = property(lambda self: self[2]) - -class SeekTable(MetadataBlock): - """Read and write FLAC seek tables. - - Attributes: - seekpoints -- list of SeekPoint objects - """ - - __SEEKPOINT_FORMAT = '>QQH' - __SEEKPOINT_SIZE = struct.calcsize(__SEEKPOINT_FORMAT) - - code = 3 - - def __init__(self, data): - self.seekpoints = [] - super(SeekTable, self).__init__(data) - - def __eq__(self, other): - try: return (self.seekpoints == other.seekpoints) - except (AttributeError, TypeError): return False - - def load(self, data): - self.seekpoints = [] - sp = data.read(self.__SEEKPOINT_SIZE) - while len(sp) == self.__SEEKPOINT_SIZE: - self.seekpoints.append(SeekPoint( - *struct.unpack(self.__SEEKPOINT_FORMAT, sp))) - sp = data.read(self.__SEEKPOINT_SIZE) - - def write(self): - f = StringIO() - for seekpoint in self.seekpoints: - packed = struct.pack(self.__SEEKPOINT_FORMAT, - seekpoint.first_sample, seekpoint.byte_offset, - seekpoint.num_samples) - f.write(packed) - return f.getvalue() - - def __repr__(self): - return "<%s seekpoints=%r>" % (type(self).__name__, self.seekpoints) - -class VCFLACDict(VCommentDict): - """Read and write FLAC Vorbis comments. - - FLACs don't use the framing bit at the end of the comment block. - So this extends VCommentDict to not use the framing bit. - """ - - code = 4 - - def load(self, data, errors='replace', framing=False): - super(VCFLACDict, self).load(data, errors=errors, framing=framing) - - def write(self, framing=False): - return super(VCFLACDict, self).write(framing=framing) - -class CueSheetTrackIndex(tuple): - """Index for a track in a cuesheet. - - For CD-DA, an index_number of 0 corresponds to the track - pre-gap. The first index in a track must have a number of 0 or 1, - and subsequently, index_numbers must increase by 1. Index_numbers - must be unique within a track. And index_offset must be evenly - divisible by 588 samples. - - Attributes: - index_number -- index point number - index_offset -- offset in samples from track start - """ - - def __new__(cls, index_number, index_offset): - return super(cls, CueSheetTrackIndex).__new__(cls, - (index_number, index_offset)) - index_number = property(lambda self: self[0]) - index_offset = property(lambda self: self[1]) - -class CueSheetTrack(object): - """A track in a cuesheet. - - For CD-DA, track_numbers must be 1-99, or 170 for the - lead-out. Track_numbers must be unique within a cue sheet. There - must be atleast one index in every track except the lead-out track - which must have none. - - Attributes: - track_number -- track number - start_offset -- track offset in samples from start of FLAC stream - isrc -- ISRC code - type -- 0 for audio, 1 for digital data - pre_emphasis -- true if the track is recorded with pre-emphasis - indexes -- list of CueSheetTrackIndex objects - """ - - def __init__(self, track_number, start_offset, isrc='', type_=0, - pre_emphasis=False): - self.track_number = track_number - self.start_offset = start_offset - self.isrc = isrc - self.type = type_ - self.pre_emphasis = pre_emphasis - self.indexes = [] - - def __eq__(self, other): - try: return (self.track_number == other.track_number and - self.start_offset == other.start_offset and - self.isrc == other.isrc and - self.type == other.type and - self.pre_emphasis == other.pre_emphasis and - self.indexes == other.indexes) - except (AttributeError, TypeError): return False - - def __repr__(self): - return ("<%s number=%r, offset=%d, isrc=%r, type=%r, " - "pre_emphasis=%r, indexes=%r)>") % ( - type(self).__name__, self.track_number, self.start_offset, - self.isrc, self.type, self.pre_emphasis, self.indexes) - -class CueSheet(MetadataBlock): - """Read and write FLAC embedded cue sheets. - - Number of tracks should be from 1 to 100. There should always be - exactly one lead-out track and that track must be the last track - in the cue sheet. - - Attributes: - media_catalog_number -- media catalog number in ASCII - lead_in_samples -- number of lead-in samples - compact_disc -- true if the cuesheet corresponds to a compact disc - tracks -- list of CueSheetTrack objects - lead_out -- lead-out as CueSheetTrack or None if lead-out was not found - """ - - __CUESHEET_FORMAT = '>128sQB258xB' - __CUESHEET_SIZE = struct.calcsize(__CUESHEET_FORMAT) - __CUESHEET_TRACK_FORMAT = '>QB12sB13xB' - __CUESHEET_TRACK_SIZE = struct.calcsize(__CUESHEET_TRACK_FORMAT) - __CUESHEET_TRACKINDEX_FORMAT = '>QB3x' - __CUESHEET_TRACKINDEX_SIZE = struct.calcsize(__CUESHEET_TRACKINDEX_FORMAT) - - code = 5 - - media_catalog_number = '' - lead_in_samples = 88200 - compact_disc = True - - def __init__(self, data): - self.tracks = [] - super(CueSheet, self).__init__(data) - - def __eq__(self, other): - try: - return (self.media_catalog_number == other.media_catalog_number and - self.lead_in_samples == other.lead_in_samples and - self.compact_disc == other.compact_disc and - self.tracks == other.tracks) - except (AttributeError, TypeError): return False - - def load(self, data): - header = data.read(self.__CUESHEET_SIZE) - media_catalog_number, lead_in_samples, flags, num_tracks = \ - struct.unpack(self.__CUESHEET_FORMAT, header) - self.media_catalog_number = media_catalog_number.rstrip('\0') - self.lead_in_samples = lead_in_samples - self.compact_disc = bool(flags & 0x80) - self.tracks = [] - for i in range(num_tracks): - track = data.read(self.__CUESHEET_TRACK_SIZE) - start_offset, track_number, isrc_padded, flags, num_indexes = \ - struct.unpack(self.__CUESHEET_TRACK_FORMAT, track) - isrc = isrc_padded.rstrip('\0') - type_ = (flags & 0x80) >> 7 - pre_emphasis = bool(flags & 0x40) - val = CueSheetTrack( - track_number, start_offset, isrc, type_, pre_emphasis) - for j in range(num_indexes): - index = data.read(self.__CUESHEET_TRACKINDEX_SIZE) - index_offset, index_number = struct.unpack( - self.__CUESHEET_TRACKINDEX_FORMAT, index) - val.indexes.append( - CueSheetTrackIndex(index_number, index_offset)) - self.tracks.append(val) - - def write(self): - f = StringIO() - flags = 0 - if self.compact_disc: flags |= 0x80 - packed = struct.pack( - self.__CUESHEET_FORMAT, self.media_catalog_number, - self.lead_in_samples, flags, len(self.tracks)) - f.write(packed) - for track in self.tracks: - track_flags = 0 - track_flags |= (track.type & 1) << 7 - if track.pre_emphasis: track_flags |= 0x40 - track_packed = struct.pack( - self.__CUESHEET_TRACK_FORMAT, track.start_offset, - track.track_number, track.isrc, track_flags, - len(track.indexes)) - f.write(track_packed) - for index in track.indexes: - index_packed = struct.pack( - self.__CUESHEET_TRACKINDEX_FORMAT, - index.index_offset, index.index_number) - f.write(index_packed) - return f.getvalue() - - def __repr__(self): - return ("<%s media_catalog_number=%r, lead_in=%r, compact_disc=%r, " - "tracks=%r>") % ( - type(self).__name__, self.media_catalog_number, - self.lead_in_samples, self.compact_disc, self.tracks) - -class Picture(MetadataBlock): - """Read and write FLAC embed pictures. - - Attributes: - type -- picture type (same as types for ID3 APIC frames) - mime -- MIME type of the picture - desc -- picture's description - width -- width in pixels - height -- height in pixels - depth -- color depth in bits-per-pixel - colors -- number of colors for indexed palettes (like GIF), - 0 for non-indexed - data -- picture data - """ - - code = 6 - - def __init__(self, data=None): - self.type = 0 - self.mime = u'' - self.desc = u'' - self.width = 0 - self.height = 0 - self.depth = 0 - self.colors = 0 - self.data = '' - super(Picture, self).__init__(data) - - def __eq__(self, other): - try: return (self.type == other.type and - self.mime == other.mime and - self.desc == other.desc and - self.width == other.width and - self.height == other.height and - self.depth == other.depth and - self.colors == other.colors and - self.data == other.data) - except (AttributeError, TypeError): return False - - def load(self, data): - self.type, length = struct.unpack('>2I', data.read(8)) - self.mime = data.read(length).decode('UTF-8', 'replace') - length, = struct.unpack('>I', data.read(4)) - self.desc = data.read(length).decode('UTF-8', 'replace') - (self.width, self.height, self.depth, - self.colors, length) = struct.unpack('>5I', data.read(20)) - self.data = data.read(length) - - def write(self): - f = StringIO() - mime = self.mime.encode('UTF-8') - f.write(struct.pack('>2I', self.type, len(mime))) - f.write(mime) - desc = self.desc.encode('UTF-8') - f.write(struct.pack('>I', len(desc))) - f.write(desc) - f.write(struct.pack('>5I', self.width, self.height, self.depth, - self.colors, len(self.data))) - f.write(self.data) - return f.getvalue() - - def __repr__(self): - return "<%s '%s' (%d bytes)>" % (type(self).__name__, self.mime, - len(self.data)) - -class Padding(MetadataBlock): - """Empty padding space for metadata blocks. - - To avoid rewriting the entire FLAC file when editing comments, - metadata is often padded. Padding should occur at the end, and no - more than one padding block should be in any FLAC file. Mutagen - handles this with MetadataBlock.group_padding. - """ - - code = 1 - - def __init__(self, data=""): super(Padding, self).__init__(data) - def load(self, data): self.length = len(data.read()) - def write(self): - try: return "\x00" * self.length - # On some 64 bit platforms this won't generate a MemoryError - # or OverflowError since you might have enough RAM, but it - # still generates a ValueError. On other 64 bit platforms, - # this will still succeed for extremely large values. - # Those should never happen in the real world, and if they - # do, writeblocks will catch it. - except (OverflowError, ValueError, MemoryError): - raise error("cannot write %d bytes" % self.length) - def __eq__(self, other): - return isinstance(other, Padding) and self.length == other.length - def __repr__(self): - return "<%s (%d bytes)>" % (type(self).__name__, self.length) - -class FLAC(FileType): - """A FLAC audio file. - - Attributes: - info -- stream information (length, bitrate, sample rate) - tags -- metadata tags, if any - cuesheet -- CueSheet object, if any - seektable -- SeekTable object, if any - pictures -- list of embedded pictures - """ - - _mimes = ["audio/x-flac", "application/x-flac"] - - METADATA_BLOCKS = [StreamInfo, Padding, None, SeekTable, VCFLACDict, - CueSheet, Picture] - """Known metadata block types, indexed by ID.""" - - def score(filename, fileobj, header): - return header.startswith("fLaC") - score = staticmethod(score) - - def __read_metadata_block(self, file): - byte = ord(file.read(1)) - size = to_int_be(file.read(3)) - try: - data = file.read(size) - if len(data) != size: - raise error( - "file said %d bytes, read %d bytes" % (size, len(data))) - block = self.METADATA_BLOCKS[byte & 0x7F](data) - except (IndexError, TypeError): - block = MetadataBlock(data) - block.code = byte & 0x7F - self.metadata_blocks.append(block) - else: - self.metadata_blocks.append(block) - if block.code == VCFLACDict.code: - if self.tags is None: self.tags = block - else: raise FLACVorbisError("> 1 Vorbis comment block found") - elif block.code == CueSheet.code: - if self.cuesheet is None: self.cuesheet = block - else: raise error("> 1 CueSheet block found") - elif block.code == SeekTable.code: - if self.seektable is None: self.seektable = block - else: raise error("> 1 SeekTable block found") - return (byte >> 7) ^ 1 - - def add_tags(self): - """Add a Vorbis comment block to the file.""" - if self.tags is None: - self.tags = VCFLACDict() - self.metadata_blocks.append(self.tags) - else: raise FLACVorbisError("a Vorbis comment already exists") - add_vorbiscomment = add_tags - - def delete(self, filename=None): - """Remove Vorbis comments from a file. - - If no filename is given, the one most recently loaded is used. - """ - if filename is None: filename = self.filename - for s in list(self.metadata_blocks): - if isinstance(s, VCFLACDict): - self.metadata_blocks.remove(s) - self.tags = None - self.save() - break - - vc = property(lambda s: s.tags, doc="Alias for tags; don't use this.") - - def load(self, filename): - """Load file information from a filename.""" - - self.metadata_blocks = [] - self.tags = None - self.cuesheet = None - self.seektable = None - self.filename = filename - fileobj = file(filename, "rb") - try: - self.__check_header(fileobj) - while self.__read_metadata_block(fileobj): pass - finally: - fileobj.close() - - try: self.metadata_blocks[0].length - except (AttributeError, IndexError): - raise FLACNoHeaderError("Stream info block not found") - - info = property(lambda s: s.metadata_blocks[0]) - - def add_picture(self, picture): - """Add a new picture to the file.""" - self.metadata_blocks.append(picture) - - def clear_pictures(self): - """Delete all pictures from the file.""" - self.metadata_blocks = filter(lambda b: b.code != Picture.code, - self.metadata_blocks) - - def __get_pictures(self): - return filter(lambda b: b.code == Picture.code, self.metadata_blocks) - pictures = property(__get_pictures, doc="List of embedded pictures") - - def save(self, filename=None, deleteid3=False): - """Save metadata blocks to a file. - - If no filename is given, the one most recently loaded is used. - """ - - if filename is None: filename = self.filename - f = open(filename, 'rb+') - - # Ensure we've got padding at the end, and only at the end. - # If adding makes it too large, we'll scale it down later. - self.metadata_blocks.append(Padding('\x00' * 1020)) - MetadataBlock.group_padding(self.metadata_blocks) - - header = self.__check_header(f) - available = self.__find_audio_offset(f) - header # "fLaC" and maybe ID3 - data = MetadataBlock.writeblocks(self.metadata_blocks) - - # Delete ID3v2 - if deleteid3 and header > 4: - available += header - 4 - header = 4 - - if len(data) > available: - # If we have too much data, see if we can reduce padding. - padding = self.metadata_blocks[-1] - newlength = padding.length - (len(data) - available) - if newlength > 0: - padding.length = newlength - data = MetadataBlock.writeblocks(self.metadata_blocks) - assert len(data) == available - - elif len(data) < available: - # If we have too little data, increase padding. - self.metadata_blocks[-1].length += (available - len(data)) - data = MetadataBlock.writeblocks(self.metadata_blocks) - assert len(data) == available - - if len(data) != available: - # We couldn't reduce the padding enough. - diff = (len(data) - available) - insert_bytes(f, diff, header) - - f.seek(header - 4) - f.write("fLaC" + data) - - # Delete ID3v1 - if deleteid3: - try: f.seek(-128, 2) - except IOError: pass - else: - if f.read(3) == "TAG": - f.seek(-128, 2) - f.truncate() - - def __find_audio_offset(self, fileobj): - byte = 0x00 - while not (byte >> 7) & 1: - byte = ord(fileobj.read(1)) - size = to_int_be(fileobj.read(3)) - fileobj.read(size) - return fileobj.tell() - - def __check_header(self, fileobj): - size = 4 - header = fileobj.read(4) - if header != "fLaC": - size = None - if header[:3] == "ID3": - size = 14 + BitPaddedInt(fileobj.read(6)[2:]) - fileobj.seek(size - 4) - if fileobj.read(4) != "fLaC": size = None - if size is None: - raise FLACNoHeaderError( - "%r is not a valid FLAC file" % fileobj.name) - return size - -Open = FLAC - -def delete(filename): - """Remove tags from a file.""" - FLAC(filename).delete() diff --git a/lib/mutagen/id3.py b/lib/mutagen/id3.py deleted file mode 100644 index fb1357e43..000000000 --- a/lib/mutagen/id3.py +++ /dev/null @@ -1,1956 +0,0 @@ -# id3 support for mutagen -# Copyright (C) 2005 Michael Urman -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of version 2 of the GNU General Public License as -# published by the Free Software Foundation. -# -# $Id: id3.py 4275 2008-06-01 06:32:37Z piman $ - -"""ID3v2 reading and writing. - -This is based off of the following references: - http://www.id3.org/id3v2.4.0-structure.txt - http://www.id3.org/id3v2.4.0-frames.txt - http://www.id3.org/id3v2.3.0.html - http://www.id3.org/id3v2-00.txt - http://www.id3.org/id3v1.html - -Its largest deviation from the above (versions 2.3 and 2.2) is that it -will not interpret the / characters as a separator, and will almost -always accept null separators to generate multi-valued text frames. - -Because ID3 frame structure differs between frame types, each frame is -implemented as a different class (e.g. TIT2 as mutagen.id3.TIT2). Each -frame's documentation contains a list of its attributes. - -Since this file's documentation is a little unwieldy, you are probably -interested in the 'ID3' class to start with. -""" - -__all__ = ['ID3', 'ID3FileType', 'Frames', 'Open', 'delete'] - -import struct; from struct import unpack, pack -from zlib import error as zlibError -from warnings import warn - -import mutagen -from mutagen._util import insert_bytes, delete_bytes, DictProxy - -class error(Exception): pass -class ID3NoHeaderError(error, ValueError): pass -class ID3BadUnsynchData(error, ValueError): pass -class ID3BadCompressedData(error, ValueError): pass -class ID3TagError(error, ValueError): pass -class ID3UnsupportedVersionError(error, NotImplementedError): pass -class ID3EncryptionUnsupportedError(error, NotImplementedError): pass -class ID3JunkFrameError(error, ValueError): pass - -class ID3Warning(error, UserWarning): pass - -def is_valid_frame_id(frame_id): - return frame_id.isalnum() and frame_id.isupper() - -class ID3(DictProxy, mutagen.Metadata): - """A file with an ID3v2 tag. - - Attributes: - version -- ID3 tag version as a tuple - unknown_frames -- raw frame data of any unknown frames found - size -- the total size of the ID3 tag, including the header - """ - - PEDANTIC = True - version = (2, 4, 0) - - filename = None - size = 0 - __flags = 0 - __readbytes = 0 - __crc = None - - def __init__(self, *args, **kwargs): - self.unknown_frames = [] - super(ID3, self).__init__(*args, **kwargs) - - def __fullread(self, size): - try: - if size < 0: - raise ValueError('Requested bytes (%s) less than zero' % size) - if size > self.__filesize: - raise EOFError('Requested %#x of %#x (%s)' % - (long(size), long(self.__filesize), self.filename)) - except AttributeError: pass - data = self.__fileobj.read(size) - if len(data) != size: raise EOFError - self.__readbytes += size - return data - - def load(self, filename, known_frames=None, translate=True): - """Load tags from a filename. - - Keyword arguments: - filename -- filename to load tag data from - known_frames -- dict mapping frame IDs to Frame objects - translate -- Update all tags to ID3v2.4 internally. Mutagen is - only capable of writing ID3v2.4 tags, so if you - intend to save, this must be true. - - Example of loading a custom frame: - my_frames = dict(mutagen.id3.Frames) - class XMYF(Frame): ... - my_frames["XMYF"] = XMYF - mutagen.id3.ID3(filename, known_frames=my_frames) - """ - - from os.path import getsize - self.filename = filename - self.__known_frames = known_frames - self.__fileobj = file(filename, 'rb') - self.__filesize = getsize(filename) - try: - try: - self.__load_header() - except EOFError: - self.size = 0 - raise ID3NoHeaderError("%s: too small (%d bytes)" %( - filename, self.__filesize)) - except (ID3NoHeaderError, ID3UnsupportedVersionError), err: - self.size = 0 - import sys - stack = sys.exc_info()[2] - try: self.__fileobj.seek(-128, 2) - except EnvironmentError: raise err, None, stack - else: - frames = ParseID3v1(self.__fileobj.read(128)) - if frames is not None: - self.version = (1, 1) - map(self.add, frames.values()) - else: raise err, None, stack - else: - frames = self.__known_frames - if frames is None: - if (2,3,0) <= self.version: frames = Frames - elif (2,2,0) <= self.version: frames = Frames_2_2 - data = self.__fullread(self.size - 10) - for frame in self.__read_frames(data, frames=frames): - if isinstance(frame, Frame): self.add(frame) - else: self.unknown_frames.append(frame) - finally: - self.__fileobj.close() - del self.__fileobj - del self.__filesize - if translate: - self.update_to_v24() - - def getall(self, key): - """Return all frames with a given name (the list may be empty). - - This is best explained by examples: - id3.getall('TIT2') == [id3['TIT2']] - id3.getall('TTTT') == [] - id3.getall('TXXX') == [TXXX(desc='woo', text='bar'), - TXXX(desc='baz', text='quuuux'), ...] - - Since this is based on the frame's HashKey, which is - colon-separated, you can use it to do things like - getall('COMM:MusicMatch') or getall('TXXX:QuodLibet:'). - """ - if key in self: return [self[key]] - else: - key = key + ":" - return [v for s,v in self.items() if s.startswith(key)] - - def delall(self, key): - """Delete all tags of a given kind; see getall.""" - if key in self: del(self[key]) - else: - key = key + ":" - for k in filter(lambda s: s.startswith(key), self.keys()): - del(self[k]) - - def setall(self, key, values): - """Delete frames of the given type and add frames in 'values'.""" - self.delall(key) - for tag in values: - self[tag.HashKey] = tag - - def pprint(self): - """Return tags in a human-readable format. - - "Human-readable" is used loosely here. The format is intended - to mirror that used for Vorbis or APEv2 output, e.g. - TIT2=My Title - However, ID3 frames can have multiple keys: - POPM=user@example.org=3 128/255 - """ - return "\n".join(map(Frame.pprint, self.values())) - - def loaded_frame(self, tag): - """Deprecated; use the add method.""" - # turn 2.2 into 2.3/2.4 tags - if len(type(tag).__name__) == 3: tag = type(tag).__base__(tag) - self[tag.HashKey] = tag - - # add = loaded_frame (and vice versa) break applications that - # expect to be able to override loaded_frame (e.g. Quod Libet), - # as does making loaded_frame call add. - def add(self, frame): - """Add a frame to the tag.""" - return self.loaded_frame(frame) - - def __load_header(self): - fn = self.filename - data = self.__fullread(10) - id3, vmaj, vrev, flags, size = unpack('>3sBBB4s', data) - self.__flags = flags - self.size = BitPaddedInt(size) + 10 - self.version = (2, vmaj, vrev) - - if id3 != 'ID3': - raise ID3NoHeaderError("'%s' doesn't start with an ID3 tag" % fn) - if vmaj not in [2, 3, 4]: - raise ID3UnsupportedVersionError("'%s' ID3v2.%d not supported" - % (fn, vmaj)) - - if self.PEDANTIC: - if (2,4,0) <= self.version and (flags & 0x0f): - raise ValueError("'%s' has invalid flags %#02x" % (fn, flags)) - elif (2,3,0) <= self.version and (flags & 0x1f): - raise ValueError("'%s' has invalid flags %#02x" % (fn, flags)) - - if self.f_extended: - if self.version >= (2,4,0): - # "Where the 'Extended header size' is the size of the whole - # extended header, stored as a 32 bit synchsafe integer." - self.__extsize = BitPaddedInt(self.__fullread(4)) - 4 - else: - # "Where the 'Extended header size', currently 6 or 10 bytes, - # excludes itself." - self.__extsize = unpack('>L', self.__fullread(4))[0] - self.__extdata = self.__fullread(self.__extsize) - - def __determine_bpi(self, data, frames): - if self.version < (2,4,0): return int - # have to special case whether to use bitpaddedints here - # spec says to use them, but iTunes has it wrong - - # count number of tags found as BitPaddedInt and how far past - o = 0 - asbpi = 0 - while o < len(data)-10: - name, size, flags = unpack('>4sLH', data[o:o+10]) - size = BitPaddedInt(size) - o += 10+size - if name in frames: asbpi += 1 - bpioff = o - len(data) - - # count number of tags found as int and how far past - o = 0 - asint = 0 - while o < len(data)-10: - name, size, flags = unpack('>4sLH', data[o:o+10]) - o += 10+size - if name in frames: asint += 1 - intoff = o - len(data) - - # if more tags as int, or equal and bpi is past and int is not - if asint > asbpi or (asint == asbpi and (bpioff >= 1 and intoff <= 1)): - return int - return BitPaddedInt - - def __read_frames(self, data, frames): - if self.version < (2,4,0) and self.f_unsynch: - try: data = unsynch.decode(data) - except ValueError: pass - - if (2,3,0) <= self.version: - bpi = self.__determine_bpi(data, frames) - while data: - header = data[:10] - try: name, size, flags = unpack('>4sLH', header) - except struct.error: return # not enough header - if name.strip('\x00') == '': return - size = bpi(size) - framedata = data[10:10+size] - data = data[10+size:] - if size == 0: continue # drop empty frames - try: tag = frames[name] - except KeyError: - if is_valid_frame_id(name): yield header + framedata - else: - try: yield self.__load_framedata(tag, flags, framedata) - except NotImplementedError: yield header + framedata - except ID3JunkFrameError: pass - - elif (2,2,0) <= self.version: - while data: - header = data[0:6] - try: name, size = unpack('>3s3s', header) - except struct.error: return # not enough header - size, = struct.unpack('>L', '\x00'+size) - if name.strip('\x00') == '': return - framedata = data[6:6+size] - data = data[6+size:] - if size == 0: continue # drop empty frames - try: tag = frames[name] - except KeyError: - if is_valid_frame_id(name): yield header + framedata - else: - try: yield self.__load_framedata(tag, 0, framedata) - except NotImplementedError: yield header + framedata - except ID3JunkFrameError: pass - - def __load_framedata(self, tag, flags, framedata): - return tag.fromData(self, flags, framedata) - - f_unsynch = property(lambda s: bool(s.__flags & 0x80)) - f_extended = property(lambda s: bool(s.__flags & 0x40)) - f_experimental = property(lambda s: bool(s.__flags & 0x20)) - f_footer = property(lambda s: bool(s.__flags & 0x10)) - - #f_crc = property(lambda s: bool(s.__extflags & 0x8000)) - - def save(self, filename=None, v1=1): - """Save changes to a file. - - If no filename is given, the one most recently loaded is used. - - Keyword arguments: - v1 -- if 0, ID3v1 tags will be removed - if 1, ID3v1 tags will be updated but not added - if 2, ID3v1 tags will be created and/or updated - - The lack of a way to update only an ID3v1 tag is intentional. - """ - - # Sort frames by 'importance' - order = ["TIT2", "TPE1", "TRCK", "TALB", "TPOS", "TDRC", "TCON"] - order = dict(zip(order, range(len(order)))) - last = len(order) - frames = self.items() - frames.sort(lambda a, b: cmp(order.get(a[0][:4], last), - order.get(b[0][:4], last))) - - framedata = [self.__save_frame(frame) for (key, frame) in frames] - framedata.extend([data for data in self.unknown_frames - if len(data) > 10]) - if not framedata: - try: - self.delete(filename) - except EnvironmentError, err: - from errno import ENOENT - if err.errno != ENOENT: raise - return - - framedata = ''.join(framedata) - framesize = len(framedata) - - if filename is None: filename = self.filename - try: f = open(filename, 'rb+') - except IOError, err: - from errno import ENOENT - if err.errno != ENOENT: raise - f = open(filename, 'ab') # create, then reopen - f = open(filename, 'rb+') - try: - idata = f.read(10) - try: id3, vmaj, vrev, flags, insize = unpack('>3sBBB4s', idata) - except struct.error: id3, insize = '', 0 - insize = BitPaddedInt(insize) - if id3 != 'ID3': insize = -10 - - if insize >= framesize: outsize = insize - else: outsize = (framesize + 1023) & ~0x3FF - framedata += '\x00' * (outsize - framesize) - - framesize = BitPaddedInt.to_str(outsize, width=4) - flags = 0 - header = pack('>3sBBB4s', 'ID3', 4, 0, flags, framesize) - data = header + framedata - - if (insize < outsize): - insert_bytes(f, outsize-insize, insize+10) - f.seek(0) - f.write(data) - - try: - f.seek(-128, 2) - except IOError, err: - from errno import EINVAL - if err.errno != EINVAL: raise - f.seek(0, 2) # ensure read won't get "TAG" - - if f.read(3) == "TAG": - f.seek(-128, 2) - if v1 > 0: f.write(MakeID3v1(self)) - else: f.truncate() - elif v1 == 2: - f.seek(0, 2) - f.write(MakeID3v1(self)) - - finally: - f.close() - - def delete(self, filename=None, delete_v1=True, delete_v2=True): - """Remove tags from a file. - - If no filename is given, the one most recently loaded is used. - - Keyword arguments: - delete_v1 -- delete any ID3v1 tag - delete_v2 -- delete any ID3v2 tag - """ - if filename is None: - filename = self.filename - delete(filename, delete_v1, delete_v2) - self.clear() - - def __save_frame(self, frame): - flags = 0 - if self.PEDANTIC and isinstance(frame, TextFrame): - if len(str(frame)) == 0: return '' - framedata = frame._writeData() - usize = len(framedata) - if usize > 2048: - framedata = BitPaddedInt.to_str(usize) + framedata.encode('zlib') - flags |= Frame.FLAG24_COMPRESS | Frame.FLAG24_DATALEN - datasize = BitPaddedInt.to_str(len(framedata), width=4) - header = pack('>4s4sH', type(frame).__name__, datasize, flags) - return header + framedata - - def update_to_v24(self): - """Convert older tags into an ID3v2.4 tag. - - This updates old ID3v2 frames to ID3v2.4 ones (e.g. TYER to - TDRC). If you intend to save tags, you must call this function - at some point; it is called by default when loading the tag. - """ - - if self.version < (2,3,0): del self.unknown_frames[:] - # unsafe to write - - # TDAT, TYER, and TIME have been turned into TDRC. - try: - if str(self.get("TYER", "")).strip("\x00"): - date = str(self.pop("TYER")) - if str(self.get("TDAT", "")).strip("\x00"): - dat = str(self.pop("TDAT")) - date = "%s-%s-%s" % (date, dat[2:], dat[:2]) - if str(self.get("TIME", "")).strip("\x00"): - time = str(self.pop("TIME")) - date += "T%s:%s:00" % (time[:2], time[2:]) - if "TDRC" not in self: - self.add(TDRC(encoding=0, text=date)) - except UnicodeDecodeError: - # Old ID3 tags have *lots* of Unicode problems, so if TYER - # is bad, just chuck the frames. - pass - - # TORY can be the first part of a TDOR. - if "TORY" in self: - f = self.pop("TORY") - if "TDOR" not in self: - try: - self.add(TDOR(encoding=0, text=str(f))) - except UnicodeDecodeError: - pass - - # IPLS is now TIPL. - if "IPLS" in self: - f = self.pop("IPLS") - if "TIPL" not in self: - self.add(TIPL(encoding=f.encoding, people=f.people)) - - if "TCON" in self: - # Get rid of "(xx)Foobr" format. - self["TCON"].genres = self["TCON"].genres - - if self.version < (2, 3): - # ID3v2.2 PIC frames are slightly different. - pics = self.getall("APIC") - mimes = { "PNG": "image/png", "JPG": "image/jpeg" } - self.delall("APIC") - for pic in pics: - newpic = APIC( - encoding=pic.encoding, mime=mimes.get(pic.mime, pic.mime), - type=pic.type, desc=pic.desc, data=pic.data) - self.add(newpic) - - # ID3v2.2 LNK frames are just way too different to upgrade. - self.delall("LINK") - - # These can't be trivially translated to any ID3v2.4 tags, or - # should have been removed already. - for key in ["RVAD", "EQUA", "TRDA", "TSIZ", "TDAT", "TIME", "CRM"]: - if key in self: del(self[key]) - -def delete(filename, delete_v1=True, delete_v2=True): - """Remove tags from a file. - - Keyword arguments: - delete_v1 -- delete any ID3v1 tag - delete_v2 -- delete any ID3v2 tag - """ - - f = open(filename, 'rb+') - - if delete_v1: - try: - f.seek(-128, 2) - except IOError: pass - else: - if f.read(3) == "TAG": - f.seek(-128, 2) - f.truncate() - - # technically an insize=0 tag is invalid, but we delete it anyway - # (primarily because we used to write it) - if delete_v2: - f.seek(0, 0) - idata = f.read(10) - try: id3, vmaj, vrev, flags, insize = unpack('>3sBBB4s', idata) - except struct.error: id3, insize = '', -1 - insize = BitPaddedInt(insize) - if id3 == 'ID3' and insize >= 0: - delete_bytes(f, insize + 10, 0) - -class BitPaddedInt(int): - def __new__(cls, value, bits=7, bigendian=True): - "Strips 8-bits bits out of every byte" - mask = (1<<(bits))-1 - if isinstance(value, (int, long)): - bytes = [] - while value: - bytes.append(value & ((1<> 8 - if isinstance(value, str): - bytes = [ord(byte) & mask for byte in value] - if bigendian: bytes.reverse() - numeric_value = 0 - for shift, byte in zip(range(0, len(bytes)*bits, bits), bytes): - numeric_value += byte << shift - if isinstance(numeric_value, long): - self = long.__new__(BitPaddedLong, numeric_value) - else: - self = int.__new__(BitPaddedInt, numeric_value) - self.bits = bits - self.bigendian = bigendian - return self - - def as_str(value, bits=7, bigendian=True, width=4): - bits = getattr(value, 'bits', bits) - bigendian = getattr(value, 'bigendian', bigendian) - value = int(value) - mask = (1<> bits - # PCNT and POPM use growing integers of at least 4 bytes as counters. - if width == -1: width = max(4, len(bytes)) - if len(bytes) > width: - raise ValueError, 'Value too wide (%d bytes)' % len(bytes) - else: bytes.extend([0] * (width-len(bytes))) - if bigendian: bytes.reverse() - return ''.join(map(chr, bytes)) - to_str = staticmethod(as_str) - -class BitPaddedLong(long): - def as_str(value, bits=7, bigendian=True, width=4): - return BitPaddedInt.to_str(value, bits, bigendian, width) - to_str = staticmethod(as_str) - -class unsynch(object): - def decode(value): - output = [] - safe = True - append = output.append - for val in value: - if safe: - append(val) - safe = val != '\xFF' - else: - if val >= '\xE0': raise ValueError('invalid sync-safe string') - elif val != '\x00': append(val) - safe = True - if not safe: raise ValueError('string ended unsafe') - return ''.join(output) - decode = staticmethod(decode) - - def encode(value): - output = [] - safe = True - append = output.append - for val in value: - if safe: - append(val) - if val == '\xFF': safe = False - elif val == '\x00' or val >= '\xE0': - append('\x00') - append(val) - safe = val != '\xFF' - else: - append(val) - safe = True - if not safe: append('\x00') - return ''.join(output) - encode = staticmethod(encode) - -class Spec(object): - def __init__(self, name): self.name = name - def __hash__(self): raise TypeError("Spec objects are unhashable") - -class ByteSpec(Spec): - def read(self, frame, data): return ord(data[0]), data[1:] - def write(self, frame, value): return chr(value) - def validate(self, frame, value): return value - -class IntegerSpec(Spec): - def read(self, frame, data): - return int(BitPaddedInt(data, bits=8)), '' - def write(self, frame, value): - return BitPaddedInt.to_str(value, bits=8, width=-1) - def validate(self, frame, value): - return value - -class SizedIntegerSpec(Spec): - def __init__(self, name, size): - self.name, self.__sz = name, size - def read(self, frame, data): - return int(BitPaddedInt(data[:self.__sz], bits=8)), data[self.__sz:] - def write(self, frame, value): - return BitPaddedInt.to_str(value, bits=8, width=self.__sz) - def validate(self, frame, value): - return value - -class EncodingSpec(ByteSpec): - def read(self, frame, data): - enc, data = super(EncodingSpec, self).read(frame, data) - if enc < 16: return enc, data - else: return 0, chr(enc)+data - - def validate(self, frame, value): - if 0 <= value <= 3: return value - if value is None: return None - raise ValueError, 'Invalid Encoding: %r' % value - -class StringSpec(Spec): - def __init__(self, name, length): - super(StringSpec, self).__init__(name) - self.len = length - def read(s, frame, data): return data[:s.len], data[s.len:] - def write(s, frame, value): - if value is None: return '\x00' * s.len - else: return (str(value) + '\x00' * s.len)[:s.len] - def validate(s, frame, value): - if value is None: return None - if isinstance(value, basestring) and len(value) == s.len: return value - raise ValueError, 'Invalid StringSpec[%d] data: %r' % (s.len, value) - -class BinaryDataSpec(Spec): - def read(self, frame, data): return data, '' - def write(self, frame, value): return str(value) - def validate(self, frame, value): return str(value) - -class EncodedTextSpec(Spec): - # Okay, seriously. This is private and defined explicitly and - # completely by the ID3 specification. You can't just add - # encodings here however you want. - _encodings = ( ('latin1', '\x00'), ('utf16', '\x00\x00'), - ('utf_16_be', '\x00\x00'), ('utf8', '\x00') ) - - def read(self, frame, data): - enc, term = self._encodings[frame.encoding] - ret = '' - if len(term) == 1: - if term in data: - data, ret = data.split(term, 1) - else: - offset = -1 - try: - while True: - offset = data.index(term, offset+1) - if offset & 1: continue - data, ret = data[0:offset], data[offset+2:]; break - except ValueError: pass - - if len(data) < len(term): return u'', ret - return data.decode(enc), ret - - def write(self, frame, value): - enc, term = self._encodings[frame.encoding] - return value.encode(enc) + term - - def validate(self, frame, value): return unicode(value) - -class MultiSpec(Spec): - def __init__(self, name, *specs, **kw): - super(MultiSpec, self).__init__(name) - self.specs = specs - self.sep = kw.get('sep') - - def read(self, frame, data): - values = [] - while data: - record = [] - for spec in self.specs: - value, data = spec.read(frame, data) - record.append(value) - if len(self.specs) != 1: values.append(record) - else: values.append(record[0]) - return values, data - - def write(self, frame, value): - data = [] - if len(self.specs) == 1: - for v in value: - data.append(self.specs[0].write(frame, v)) - else: - for record in value: - for v, s in zip(record, self.specs): - data.append(s.write(frame, v)) - return ''.join(data) - - def validate(self, frame, value): - if value is None: return [] - if self.sep and isinstance(value, basestring): - value = value.split(self.sep) - if isinstance(value, list): - if len(self.specs) == 1: - return [self.specs[0].validate(frame, v) for v in value] - else: - return [ - [s.validate(frame, v) for (v,s) in zip(val, self.specs)] - for val in value ] - raise ValueError, 'Invalid MultiSpec data: %r' % value - -class EncodedNumericTextSpec(EncodedTextSpec): pass -class EncodedNumericPartTextSpec(EncodedTextSpec): pass - -class Latin1TextSpec(EncodedTextSpec): - def read(self, frame, data): - if '\x00' in data: data, ret = data.split('\x00',1) - else: ret = '' - return data.decode('latin1'), ret - - def write(self, data, value): - return value.encode('latin1') + '\x00' - - def validate(self, frame, value): return unicode(value) - -class ID3TimeStamp(object): - """A time stamp in ID3v2 format. - - This is a restricted form of the ISO 8601 standard; time stamps - take the form of: - YYYY-MM-DD HH:MM:SS - Or some partial form (YYYY-MM-DD HH, YYYY, etc.). - - The 'text' attribute contains the raw text data of the time stamp. - """ - - import re - def __init__(self, text): - if isinstance(text, ID3TimeStamp): text = text.text - self.text = text - - __formats = ['%04d'] + ['%02d'] * 5 - __seps = ['-', '-', ' ', ':', ':', 'x'] - def get_text(self): - parts = [self.year, self.month, self.day, - self.hour, self.minute, self.second] - pieces = [] - for i, part in enumerate(iter(iter(parts).next, None)): - pieces.append(self.__formats[i]%part + self.__seps[i]) - return u''.join(pieces)[:-1] - - def set_text(self, text, splitre=re.compile('[-T:/.]|\s+')): - year, month, day, hour, minute, second = \ - splitre.split(text + ':::::')[:6] - for a in 'year month day hour minute second'.split(): - try: v = int(locals()[a]) - except ValueError: v = None - setattr(self, a, v) - - text = property(get_text, set_text, doc="ID3v2.4 date and time.") - - def __str__(self): return self.text - def __repr__(self): return repr(self.text) - def __cmp__(self, other): return cmp(self.text, other.text) - def encode(self, *args): return self.text.encode(*args) - -class TimeStampSpec(EncodedTextSpec): - def read(self, frame, data): - value, data = super(TimeStampSpec, self).read(frame, data) - return self.validate(frame, value), data - - def write(self, frame, data): - return super(TimeStampSpec, self).write(frame, - data.text.replace(' ', 'T')) - - def validate(self, frame, value): - try: return ID3TimeStamp(value) - except TypeError: raise ValueError, "Invalid ID3TimeStamp: %r" % value - -class ChannelSpec(ByteSpec): - (OTHER, MASTER, FRONTRIGHT, FRONTLEFT, BACKRIGHT, BACKLEFT, FRONTCENTRE, - BACKCENTRE, SUBWOOFER) = range(9) - -class VolumeAdjustmentSpec(Spec): - def read(self, frame, data): - value, = unpack('>h', data[0:2]) - return value/512.0, data[2:] - - def write(self, frame, value): - return pack('>h', int(round(value * 512))) - - def validate(self, frame, value): return value - -class VolumePeakSpec(Spec): - def read(self, frame, data): - # http://bugs.xmms.org/attachment.cgi?id=113&action=view - peak = 0 - bits = ord(data[0]) - bytes = min(4, (bits + 7) >> 3) - # not enough frame data - if bytes + 1 > len(data): raise ID3JunkFrameError - shift = ((8 - (bits & 7)) & 7) + (4 - bytes) * 8 - for i in range(1, bytes+1): - peak *= 256 - peak += ord(data[i]) - peak *= 2**shift - return (float(peak) / (2**31-1)), data[1+bytes:] - - def write(self, frame, value): - # always write as 16 bits for sanity. - return "\x10" + pack('>H', int(round(value * 32768))) - - def validate(self, frame, value): return value - -class SynchronizedTextSpec(EncodedTextSpec): - def read(self, frame, data): - texts = [] - encoding, term = self._encodings[frame.encoding] - while data: - l = len(term) - value_idx = data.index(term) - value = data[:value_idx].decode(encoding) - time, = struct.unpack(">I", data[value_idx+l:value_idx+l+4]) - texts.append((value, time)) - data = data[value_idx+l+4:] - return texts, "" - - def write(self, frame, value): - data = [] - encoding, term = self._encodings[frame.encoding] - for text, time in frame.text: - text = text.encode(encoding) + term - data.append(text + struct.pack(">I", time)) - return "".join(data) - - def validate(self, frame, value): - return value - -class KeyEventSpec(Spec): - def read(self, frame, data): - events = [] - while len(data) >= 5: - events.append(struct.unpack(">bI", data[:5])) - data = data[5:] - return events, data - - def write(self, frame, value): - return "".join([struct.pack(">bI", *event) for event in value]) - - def validate(self, frame, value): - return value - -class VolumeAdjustmentsSpec(Spec): - # Not to be confused with VolumeAdjustmentSpec. - def read(self, frame, data): - adjustments = {} - while len(data) >= 4: - freq, adj = struct.unpack(">Hh", data[:4]) - data = data[4:] - freq /= 2.0 - adj /= 512.0 - adjustments[freq] = adj - adjustments = adjustments.items() - adjustments.sort() - return adjustments, data - - def write(self, frame, value): - value.sort() - return "".join([struct.pack(">Hh", int(freq * 2), int(adj * 512)) - for (freq, adj) in value]) - - def validate(self, frame, value): - return value - -class ASPIIndexSpec(Spec): - def read(self, frame, data): - if frame.b == 16: - format = "H" - size = 2 - elif frame.b == 8: - format = "B" - size = 1 - else: - warn("invalid bit count in ASPI (%d)" % frame.b, ID3Warning) - return [], data - - indexes = data[:frame.N * size] - data = data[frame.N * size:] - return list(struct.unpack(">" + format * frame.N, indexes)), data - - def write(self, frame, values): - if frame.b == 16: format = "H" - elif frame.b == 8: format = "B" - else: raise ValueError("frame.b must be 8 or 16") - return struct.pack(">" + format * frame.N, *values) - - def validate(self, frame, values): - return values - -class Frame(object): - """Fundamental unit of ID3 data. - - ID3 tags are split into frames. Each frame has a potentially - different structure, and so this base class is not very featureful. - """ - - FLAG23_ALTERTAG = 0x8000 - FLAG23_ALTERFILE = 0x4000 - FLAG23_READONLY = 0x2000 - FLAG23_COMPRESS = 0x0080 - FLAG23_ENCRYPT = 0x0040 - FLAG23_GROUP = 0x0020 - - FLAG24_ALTERTAG = 0x4000 - FLAG24_ALTERFILE = 0x2000 - FLAG24_READONLY = 0x1000 - FLAG24_GROUPID = 0x0040 - FLAG24_COMPRESS = 0x0008 - FLAG24_ENCRYPT = 0x0004 - FLAG24_UNSYNCH = 0x0002 - FLAG24_DATALEN = 0x0001 - - _framespec = [] - def __init__(self, *args, **kwargs): - if len(args)==1 and len(kwargs)==0 and isinstance(args[0], type(self)): - other = args[0] - for checker in self._framespec: - val = checker.validate(self, getattr(other, checker.name)) - setattr(self, checker.name, val) - else: - for checker, val in zip(self._framespec, args): - setattr(self, checker.name, checker.validate(self, val)) - for checker in self._framespec[len(args):]: - validated = checker.validate( - self, kwargs.get(checker.name, None)) - setattr(self, checker.name, validated) - - HashKey = property( - lambda s: s.FrameID, - doc="an internal key used to ensure frame uniqueness in a tag") - FrameID = property( - lambda s: type(s).__name__, - doc="ID3v2 three or four character frame ID") - - def __repr__(self): - """Python representation of a frame. - - The string returned is a valid Python expression to construct - a copy of this frame. - """ - kw = [] - for attr in self._framespec: - kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) - return '%s(%s)' % (type(self).__name__, ', '.join(kw)) - - def _readData(self, data): - odata = data - for reader in self._framespec: - if len(data): - try: value, data = reader.read(self, data) - except UnicodeDecodeError: - raise ID3JunkFrameError - else: raise ID3JunkFrameError - setattr(self, reader.name, value) - if data.strip('\x00'): - warn('Leftover data: %s: %r (from %r)' % ( - type(self).__name__, data, odata), - ID3Warning) - - def _writeData(self): - data = [] - for writer in self._framespec: - data.append(writer.write(self, getattr(self, writer.name))) - return ''.join(data) - - def pprint(self): - """Return a human-readable representation of the frame.""" - return "%s=%s" % (type(self).__name__, self._pprint()) - - def _pprint(self): - return "[unrepresentable data]" - - def fromData(cls, id3, tflags, data): - """Construct this ID3 frame from raw string data.""" - - if (2,4,0) <= id3.version: - if tflags & (Frame.FLAG24_COMPRESS | Frame.FLAG24_DATALEN): - # The data length int is syncsafe in 2.4 (but not 2.3). - # However, we don't actually need the data length int, - # except to work around a QL 0.12 bug, and in that case - # all we need are the raw bytes. - datalen_bytes = data[:4] - data = data[4:] - if tflags & Frame.FLAG24_UNSYNCH or id3.f_unsynch: - try: data = unsynch.decode(data) - except ValueError, err: - if id3.PEDANTIC: - raise ID3BadUnsynchData, '%s: %r' % (err, data) - if tflags & Frame.FLAG24_ENCRYPT: - raise ID3EncryptionUnsupportedError - if tflags & Frame.FLAG24_COMPRESS: - try: data = data.decode('zlib') - except zlibError, err: - # the initial mutagen that went out with QL 0.12 did not - # write the 4 bytes of uncompressed size. Compensate. - data = datalen_bytes + data - try: data = data.decode('zlib') - except zlibError, err: - if id3.PEDANTIC: - raise ID3BadCompressedData, '%s: %r' % (err, data) - - elif (2,3,0) <= id3.version: - if tflags & Frame.FLAG23_COMPRESS: - usize, = unpack('>L', data[:4]) - data = data[4:] - if tflags & Frame.FLAG23_ENCRYPT: - raise ID3EncryptionUnsupportedError - if tflags & Frame.FLAG23_COMPRESS: - try: data = data.decode('zlib') - except zlibError, err: - if id3.PEDANTIC: - raise ID3BadCompressedData, '%s: %r' % (err, data) - - frame = cls() - frame._rawdata = data - frame._flags = tflags - frame._readData(data) - return frame - fromData = classmethod(fromData) - - def __hash__(self): - raise TypeError("Frame objects are unhashable") - -class FrameOpt(Frame): - """A frame with optional parts. - - Some ID3 frames have optional data; this class extends Frame to - provide support for those parts. - """ - _optionalspec = [] - - def __init__(self, *args, **kwargs): - super(FrameOpt, self).__init__(*args, **kwargs) - for spec in self._optionalspec: - if spec.name in kwargs: - validated = spec.validate(self, kwargs[spec.name]) - setattr(self, spec.name, validated) - else: break - - def _readData(self, data): - odata = data - for reader in self._framespec: - if len(data): value, data = reader.read(self, data) - else: raise ID3JunkFrameError - setattr(self, reader.name, value) - if data: - for reader in self._optionalspec: - if len(data): value, data = reader.read(self, data) - else: break - setattr(self, reader.name, value) - if data.strip('\x00'): - warn('Leftover data: %s: %r (from %r)' % ( - type(self).__name__, data, odata), - ID3Warning) - - def _writeData(self): - data = [] - for writer in self._framespec: - data.append(writer.write(self, getattr(self, writer.name))) - for writer in self._optionalspec: - try: data.append(writer.write(self, getattr(self, writer.name))) - except AttributeError: break - return ''.join(data) - - def __repr__(self): - kw = [] - for attr in self._framespec: - kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) - for attr in self._optionalspec: - if hasattr(self, attr.name): - kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) - return '%s(%s)' % (type(self).__name__, ', '.join(kw)) - - -class TextFrame(Frame): - """Text strings. - - Text frames support casts to unicode or str objects, as well as - list-like indexing, extend, and append. - - Iterating over a TextFrame iterates over its strings, not its - characters. - - Text frames have a 'text' attribute which is the list of strings, - and an 'encoding' attribute; 0 for ISO-8859 1, 1 UTF-16, 2 for - UTF-16BE, and 3 for UTF-8. If you don't want to worry about - encodings, just set it to 3. - """ - - _framespec = [ EncodingSpec('encoding'), - MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] - def __str__(self): return self.__unicode__().encode('utf-8') - def __unicode__(self): return u'\u0000'.join(self.text) - def __eq__(self, other): - if isinstance(other, str): return str(self) == other - elif isinstance(other, unicode): - return u'\u0000'.join(self.text) == other - return self.text == other - def __getitem__(self, item): return self.text[item] - def __iter__(self): return iter(self.text) - def append(self, value): return self.text.append(value) - def extend(self, value): return self.text.extend(value) - def _pprint(self): return " / ".join(self.text) - -class NumericTextFrame(TextFrame): - """Numerical text strings. - - The numeric value of these frames can be gotten with unary plus, e.g. - frame = TLEN('12345') - length = +frame - """ - - _framespec = [ EncodingSpec('encoding'), - MultiSpec('text', EncodedNumericTextSpec('text'), sep=u'\u0000') ] - - def __pos__(self): - """Return the numerical value of the string.""" - return int(self.text[0]) - -class NumericPartTextFrame(TextFrame): - """Multivalue numerical text strings. - - These strings indicate 'part (e.g. track) X of Y', and unary plus - returns the first value: - frame = TRCK('4/15') - track = +frame # track == 4 - """ - - _framespec = [ EncodingSpec('encoding'), - MultiSpec('text', EncodedNumericPartTextSpec('text'), sep=u'\u0000') ] - def __pos__(self): - return int(self.text[0].split("/")[0]) - -class TimeStampTextFrame(TextFrame): - """A list of time stamps. - - The 'text' attribute in this frame is a list of ID3TimeStamp - objects, not a list of strings. - """ - - _framespec = [ EncodingSpec('encoding'), - MultiSpec('text', TimeStampSpec('stamp'), sep=u',') ] - def __str__(self): return self.__unicode__().encode('utf-8') - def __unicode__(self): return ','.join([stamp.text for stamp in self.text]) - def _pprint(self): - return " / ".join([stamp.text for stamp in self.text]) - -class UrlFrame(Frame): - """A frame containing a URL string. - - The ID3 specification is silent about IRIs and normalized URL - forms. Mutagen assumes all URLs in files are encoded as Latin 1, - but string conversion of this frame returns a UTF-8 representation - for compatibility with other string conversions. - - The only sane way to handle URLs in MP3s is to restrict them to - ASCII. - """ - - _framespec = [ Latin1TextSpec('url') ] - def __str__(self): return self.url.encode('utf-8') - def __unicode__(self): return self.url - def __eq__(self, other): return self.url == other - def _pprint(self): return self.url - -class UrlFrameU(UrlFrame): - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.url)) - -class TALB(TextFrame): "Album" -class TBPM(NumericTextFrame): "Beats per minute" -class TCOM(TextFrame): "Composer" - -class TCON(TextFrame): - """Content type (Genre) - - ID3 has several ways genres can be represented; for convenience, - use the 'genres' property rather than the 'text' attribute. - """ - - from mutagen._constants import GENRES - - def __get_genres(self): - genres = [] - import re - genre_re = re.compile(r"((?:\((?P[0-9]+|RX|CR)\))*)(?P.+)?") - for value in self.text: - if value.isdigit(): - try: genres.append(self.GENRES[int(value)]) - except IndexError: genres.append(u"Unknown") - elif value == "CR": genres.append(u"Cover") - elif value == "RX": genres.append(u"Remix") - elif value: - newgenres = [] - genreid, dummy, genrename = genre_re.match(value).groups() - - if genreid: - for gid in genreid[1:-1].split(")("): - if gid.isdigit() and int(gid) < len(self.GENRES): - gid = unicode(self.GENRES[int(gid)]) - newgenres.append(gid) - elif gid == "CR": newgenres.append(u"Cover") - elif gid == "RX": newgenres.append(u"Remix") - else: newgenres.append(u"Unknown") - - if genrename: - # "Unescaping" the first parenthesis - if genrename.startswith("(("): genrename = genrename[1:] - if genrename not in newgenres: newgenres.append(genrename) - - genres.extend(newgenres) - - return genres - - def __set_genres(self, genres): - if isinstance(genres, basestring): genres = [genres] - self.text = map(self.__decode, genres) - - def __decode(self, value): - if isinstance(value, str): - enc = EncodedTextSpec._encodings[self.encoding][0] - return value.decode(enc) - else: return value - - genres = property(__get_genres, __set_genres, None, - "A list of genres parsed from the raw text data.") - - def _pprint(self): - return " / ".join(self.genres) - -class TCOP(TextFrame): "Copyright (c)" -class TCMP(NumericTextFrame): "iTunes Compilation Flag" -class TDAT(TextFrame): "Date of recording (DDMM)" -class TDEN(TimeStampTextFrame): "Encoding Time" -class TDOR(TimeStampTextFrame): "Original Release Time" -class TDLY(NumericTextFrame): "Audio Delay (ms)" -class TDRC(TimeStampTextFrame): "Recording Time" -class TDRL(TimeStampTextFrame): "Release Time" -class TDTG(TimeStampTextFrame): "Tagging Time" -class TENC(TextFrame): "Encoder" -class TEXT(TextFrame): "Lyricist" -class TFLT(TextFrame): "File type" -class TIME(TextFrame): "Time of recording (HHMM)" -class TIT1(TextFrame): "Content group description" -class TIT2(TextFrame): "Title" -class TIT3(TextFrame): "Subtitle/Description refinement" -class TKEY(TextFrame): "Starting Key" -class TLAN(TextFrame): "Audio Languages" -class TLEN(NumericTextFrame): "Audio Length (ms)" -class TMED(TextFrame): "Source Media Type" -class TMOO(TextFrame): "Mood" -class TOAL(TextFrame): "Original Album" -class TOFN(TextFrame): "Original Filename" -class TOLY(TextFrame): "Original Lyricist" -class TOPE(TextFrame): "Original Artist/Performer" -class TORY(NumericTextFrame): "Original Release Year" -class TOWN(TextFrame): "Owner/Licensee" -class TPE1(TextFrame): "Lead Artist/Performer/Soloist/Group" -class TPE2(TextFrame): "Band/Orchestra/Accompaniment" -class TPE3(TextFrame): "Conductor" -class TPE4(TextFrame): "Interpreter/Remixer/Modifier" -class TPOS(NumericPartTextFrame): "Part of set" -class TPRO(TextFrame): "Produced (P)" -class TPUB(TextFrame): "Publisher" -class TRCK(NumericPartTextFrame): "Track Number" -class TRDA(TextFrame): "Recording Dates" -class TRSN(TextFrame): "Internet Radio Station Name" -class TRSO(TextFrame): "Internet Radio Station Owner" -class TSIZ(NumericTextFrame): "Size of audio data (bytes)" -class TSOA(TextFrame): "Album Sort Order key" -class TSOP(TextFrame): "Perfomer Sort Order key" -class TSOT(TextFrame): "Title Sort Order key" -class TSRC(TextFrame): "International Standard Recording Code (ISRC)" -class TSSE(TextFrame): "Encoder settings" -class TSST(TextFrame): "Set Subtitle" -class TYER(NumericTextFrame): "Year of recording" - -class TXXX(TextFrame): - """User-defined text data. - - TXXX frames have a 'desc' attribute which is set to any Unicode - value (though the encoding of the text and the description must be - the same). Many taggers use this frame to store freeform keys. - """ - _framespec = [ EncodingSpec('encoding'), EncodedTextSpec('desc'), - MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - def _pprint(self): return "%s=%s" % (self.desc, " / ".join(self.text)) - -class WCOM(UrlFrameU): "Commercial Information" -class WCOP(UrlFrame): "Copyright Information" -class WOAF(UrlFrame): "Official File Information" -class WOAR(UrlFrameU): "Official Artist/Performer Information" -class WOAS(UrlFrame): "Official Source Information" -class WORS(UrlFrame): "Official Internet Radio Information" -class WPAY(UrlFrame): "Payment Information" -class WPUB(UrlFrame): "Official Publisher Information" - -class WXXX(UrlFrame): - """User-defined URL data. - - Like TXXX, this has a freeform description associated with it. - """ - _framespec = [ EncodingSpec('encoding'), EncodedTextSpec('desc'), - Latin1TextSpec('url') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - -class PairedTextFrame(Frame): - """Paired text strings. - - Some ID3 frames pair text strings, to associate names with a more - specific involvement in the song. The 'people' attribute of these - frames contains a list of pairs: - [['trumpet', 'Miles Davis'], ['bass', 'Paul Chambers']] - - Like text frames, these frames also have an encoding attribute. - """ - - _framespec = [ EncodingSpec('encoding'), MultiSpec('people', - EncodedTextSpec('involvement'), EncodedTextSpec('person')) ] - def __eq__(self, other): - return self.people == other - -class TIPL(PairedTextFrame): "Involved People List" -class TMCL(PairedTextFrame): "Musicians Credits List" -class IPLS(TIPL): "Involved People List" - -class MCDI(Frame): - """Binary dump of CD's TOC. - - The 'data' attribute contains the raw byte string. - """ - _framespec = [ BinaryDataSpec('data') ] - def __eq__(self, other): return self.data == other - -class ETCO(Frame): - """Event timing codes.""" - _framespec = [ ByteSpec("format"), KeyEventSpec("events") ] - def __eq__(self, other): return self.events == other - -class MLLT(Frame): - """MPEG location lookup table. - - This frame's attributes may be changed in the future based on - feedback from real-world use. - """ - _framespec = [ SizedIntegerSpec('frames', 2), - SizedIntegerSpec('bytes', 3), - SizedIntegerSpec('milliseconds', 3), - ByteSpec('bits_for_bytes'), - ByteSpec('bits_for_milliseconds'), - BinaryDataSpec('data') ] - def __eq__(self, other): return self.data == other - -class SYTC(Frame): - """Synchronised tempo codes. - - This frame's attributes may be changed in the future based on - feedback from real-world use. - """ - _framespec = [ ByteSpec("format"), BinaryDataSpec("data") ] - def __eq__(self, other): return self.data == other - -class USLT(Frame): - """Unsynchronised lyrics/text transcription. - - Lyrics have a three letter ISO language code ('lang'), a - description ('desc'), and a block of plain text ('text'). - """ - - _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), - EncodedTextSpec('desc'), EncodedTextSpec('text') ] - HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) - - def __str__(self): return self.text.encode('utf-8') - def __unicode__(self): return self.text - def __eq__(self, other): return self.text == other - -class SYLT(Frame): - """Synchronised lyrics/text.""" - - _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), - ByteSpec('format'), ByteSpec('type'), EncodedTextSpec('desc'), - SynchronizedTextSpec('text') ] - HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) - - def __eq__(self, other): - return str(self) == other - - def __str__(self): - return "".join([text for (text, time) in self.text]).encode('utf-8') - -class COMM(TextFrame): - """User comment. - - User comment frames have a descrption, like TXXX, and also a three - letter ISO language code in the 'lang' attribute. - """ - _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), - EncodedTextSpec('desc'), - MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] - HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) - def _pprint(self): return "%s=%r=%s" % ( - self.desc, self.lang, " / ".join(self.text)) - -class RVA2(Frame): - """Relative volume adjustment (2). - - This frame is used to implemented volume scaling, and in - particular, normalization using ReplayGain. - - Attributes: - desc -- description or context of this adjustment - channel -- audio channel to adjust (master is 1) - gain -- a + or - dB gain relative to some reference level - peak -- peak of the audio as a floating point number, [0, 1] - - When storing ReplayGain tags, use descriptions of 'album' and - 'track' on channel 1. - """ - - _framespec = [ Latin1TextSpec('desc'), ChannelSpec('channel'), - VolumeAdjustmentSpec('gain'), VolumePeakSpec('peak') ] - _channels = ["Other", "Master volume", "Front right", "Front left", - "Back right", "Back left", "Front centre", "Back centre", - "Subwoofer"] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - - def __eq__(self, other): - return ((str(self) == other) or - (self.desc == other.desc and - self.channel == other.channel and - self.gain == other.gain and - self.peak == other.peak)) - - def __str__(self): - return "%s: %+0.4f dB/%0.4f" % ( - self._channels[self.channel], self.gain, self.peak) - -class EQU2(Frame): - """Equalisation (2). - - Attributes: - method -- interpolation method (0 = band, 1 = linear) - desc -- identifying description - adjustments -- list of (frequency, vol_adjustment) pairs - """ - _framespec = [ ByteSpec("method"), Latin1TextSpec("desc"), - VolumeAdjustmentsSpec("adjustments") ] - def __eq__(self, other): return self.adjustments == other - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - -# class RVAD: unsupported -# class EQUA: unsupported - -class RVRB(Frame): - """Reverb.""" - _framespec = [ SizedIntegerSpec('left', 2), SizedIntegerSpec('right', 2), - ByteSpec('bounce_left'), ByteSpec('bounce_right'), - ByteSpec('feedback_ltl'), ByteSpec('feedback_ltr'), - ByteSpec('feedback_rtr'), ByteSpec('feedback_rtl'), - ByteSpec('premix_ltr'), ByteSpec('premix_rtl') ] - - def __eq__(self, other): return (self.left, self.right) == other - -class APIC(Frame): - """Attached (or linked) Picture. - - Attributes: - encoding -- text encoding for the description - mime -- a MIME type (e.g. image/jpeg) or '-->' if the data is a URI - type -- the source of the image (3 is the album front cover) - desc -- a text description of the image - data -- raw image data, as a byte string - - Mutagen will automatically compress large images when saving tags. - """ - _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('mime'), - ByteSpec('type'), EncodedTextSpec('desc'), BinaryDataSpec('data') ] - def __eq__(self, other): return self.data == other - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - def _pprint(self): - return "%s (%s, %d bytes)" % ( - self.desc, self.mime, len(self.data)) - -class PCNT(Frame): - """Play counter. - - The 'count' attribute contains the (recorded) number of times this - file has been played. - - This frame is basically obsoleted by POPM. - """ - _framespec = [ IntegerSpec('count') ] - - def __eq__(self, other): return self.count == other - def __pos__(self): return self.count - def _pprint(self): return unicode(self.count) - -class POPM(Frame): - """Popularimeter. - - This frame keys a rating (out of 255) and a play count to an email - address. - - Attributes: - email -- email this POPM frame is for - rating -- rating from 0 to 255 - count -- number of times the files has been played - """ - _framespec = [ Latin1TextSpec('email'), ByteSpec('rating'), - IntegerSpec('count') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.email)) - - def __eq__(self, other): return self.rating == other - def __pos__(self): return self.rating - def _pprint(self): return "%s=%s %s/255" % ( - self.email, self.count, self.rating) - -class GEOB(Frame): - """General Encapsulated Object. - - A blob of binary data, that is not a picture (those go in APIC). - - Attributes: - encoding -- encoding of the description - mime -- MIME type of the data or '-->' if the data is a URI - filename -- suggested filename if extracted - desc -- text description of the data - data -- raw data, as a byte string - """ - _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('mime'), - EncodedTextSpec('filename'), EncodedTextSpec('desc'), - BinaryDataSpec('data') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) - - def __eq__(self, other): return self.data == other - -class RBUF(FrameOpt): - """Recommended buffer size. - - Attributes: - size -- recommended buffer size in bytes - info -- if ID3 tags may be elsewhere in the file (optional) - offset -- the location of the next ID3 tag, if any - - Mutagen will not find the next tag itself. - """ - _framespec = [ SizedIntegerSpec('size', 3) ] - _optionalspec = [ ByteSpec('info'), SizedIntegerSpec('offset', 4) ] - - def __eq__(self, other): return self.size == other - def __pos__(self): return self.size - -class AENC(FrameOpt): - """Audio encryption. - - Attributes: - owner -- key identifying this encryption type - preview_start -- unencrypted data block offset - preview_length -- number of unencrypted blocks - data -- data required for decryption (optional) - - Mutagen cannot decrypt files. - """ - _framespec = [ Latin1TextSpec('owner'), - SizedIntegerSpec('preview_start', 2), - SizedIntegerSpec('preview_length', 2) ] - _optionalspec = [ BinaryDataSpec('data') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.owner)) - - def __str__(self): return self.owner.encode('utf-8') - def __unicode__(self): return self.owner - def __eq__(self, other): return self.owner == other - -class LINK(FrameOpt): - """Linked information. - - Attributes: - frameid -- the ID of the linked frame - url -- the location of the linked frame - data -- further ID information for the frame - """ - - _framespec = [ StringSpec('frameid', 4), Latin1TextSpec('url') ] - _optionalspec = [ BinaryDataSpec('data') ] - def __HashKey(self): - try: - return "%s:%s:%s:%r" % ( - self.FrameID, self.frameid, self.url, self.data) - except AttributeError: - return "%s:%s:%s" % (self.FrameID, self.frameid, self.url) - HashKey = property(__HashKey) - def __eq__(self, other): - try: return (self.frameid, self.url, self.data) == other - except AttributeError: return (self.frameid, self.url) == other - -class POSS(Frame): - """Position synchronisation frame - - Attribute: - format -- format of the position attribute (frames or milliseconds) - position -- current position of the file - """ - _framespec = [ ByteSpec('format'), IntegerSpec('position') ] - - def __pos__(self): return self.position - def __eq__(self, other): return self.position == other - -class UFID(Frame): - """Unique file identifier. - - Attributes: - owner -- format/type of identifier - data -- identifier - """ - - _framespec = [ Latin1TextSpec('owner'), BinaryDataSpec('data') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.owner)) - def __eq__(s, o): - if isinstance(o, UFI): return s.owner == o.owner and s.data == o.data - else: return s.data == o - def _pprint(self): - isascii = ord(max(self.data)) < 128 - if isascii: return "%s=%s" % (self.owner, self.data) - else: return "%s (%d bytes)" % (self.owner, len(self.data)) - -class USER(Frame): - """Terms of use. - - Attributes: - encoding -- text encoding - lang -- ISO three letter language code - text -- licensing terms for the audio - """ - _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), - EncodedTextSpec('text') ] - HashKey = property(lambda s: '%s:%r' % (s.FrameID, s.lang)) - - def __str__(self): return self.text.encode('utf-8') - def __unicode__(self): return self.text - def __eq__(self, other): return self.text == other - def _pprint(self): return "%r=%s" % (self.lang, self.text) - -class OWNE(Frame): - """Ownership frame.""" - _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('price'), - StringSpec('date', 8), EncodedTextSpec('seller') ] - - def __str__(self): return self.seller.encode('utf-8') - def __unicode__(self): return self.seller - def __eq__(self, other): return self.seller == other - -class COMR(FrameOpt): - """Commercial frame.""" - _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('price'), - StringSpec('valid_until', 8), Latin1TextSpec('contact'), - ByteSpec('format'), EncodedTextSpec('seller'), - EncodedTextSpec('desc')] - _optionalspec = [ Latin1TextSpec('mime'), BinaryDataSpec('logo') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s._writeData())) - def __eq__(self, other): return self._writeData() == other._writeData() - -class ENCR(Frame): - """Encryption method registration. - - The standard does not allow multiple ENCR frames with the same owner - or the same method. Mutagen only verifies that the owner is unique. - """ - _framespec = [ Latin1TextSpec('owner'), ByteSpec('method'), - BinaryDataSpec('data') ] - HashKey = property(lambda s: "%s:%s" % (s.FrameID, s.owner)) - def __str__(self): return self.data - def __eq__(self, other): return self.data == other - -class GRID(FrameOpt): - """Group identification registration.""" - _framespec = [ Latin1TextSpec('owner'), ByteSpec('group') ] - _optionalspec = [ BinaryDataSpec('data') ] - HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.group)) - def __pos__(self): return self.group - def __str__(self): return self.owner.encode('utf-8') - def __unicode__(self): return self.owner - def __eq__(self, other): return self.owner == other or self.group == other - - -class PRIV(Frame): - """Private frame.""" - _framespec = [ Latin1TextSpec('owner'), BinaryDataSpec('data') ] - HashKey = property(lambda s: '%s:%s:%s' % ( - s.FrameID, s.owner, s.data.decode('latin1'))) - def __str__(self): return self.data - def __eq__(self, other): return self.data == other - def _pprint(self): - isascii = ord(max(self.data)) < 128 - if isascii: return "%s=%s" % (self.owner, self.data) - else: return "%s (%d bytes)" % (self.owner, len(self.data)) - -class SIGN(Frame): - """Signature frame.""" - _framespec = [ ByteSpec('group'), BinaryDataSpec('sig') ] - HashKey = property(lambda s: '%s:%c:%s' % (s.FrameID, s.group, s.sig)) - def __str__(self): return self.sig - def __eq__(self, other): return self.sig == other - -class SEEK(Frame): - """Seek frame. - - Mutagen does not find tags at seek offsets. - """ - _framespec = [ IntegerSpec('offset') ] - def __pos__(self): return self.offset - def __eq__(self, other): return self.offset == other - -class ASPI(Frame): - """Audio seek point index. - - Attributes: S, L, N, b, and Fi. For the meaning of these, see - the ID3v2.4 specification. Fi is a list of integers. - """ - _framespec = [ SizedIntegerSpec("S", 4), SizedIntegerSpec("L", 4), - SizedIntegerSpec("N", 2), ByteSpec("b"), - ASPIIndexSpec("Fi") ] - def __eq__(self, other): return self.Fi == other - -Frames = dict([(k,v) for (k,v) in globals().items() - if len(k)==4 and isinstance(v, type) and issubclass(v, Frame)]) -"""All supported ID3v2 frames, keyed by frame name.""" -del(k); del(v) - -# ID3v2.2 frames -class UFI(UFID): "Unique File Identifier" - -class TT1(TIT1): "Content group description" -class TT2(TIT2): "Title" -class TT3(TIT3): "Subtitle/Description refinement" -class TP1(TPE1): "Lead Artist/Performer/Soloist/Group" -class TP2(TPE2): "Band/Orchestra/Accompaniment" -class TP3(TPE3): "Conductor" -class TP4(TPE4): "Interpreter/Remixer/Modifier" -class TCM(TCOM): "Composer" -class TXT(TEXT): "Lyricist" -class TLA(TLAN): "Audio Language(s)" -class TCO(TCON): "Content Type (Genre)" -class TAL(TALB): "Album" -class TPA(TPOS): "Part of set" -class TRK(TRCK): "Track Number" -class TRC(TSRC): "International Standard Recording Code (ISRC)" -class TYE(TYER): "Year of recording" -class TDA(TDAT): "Date of recording (DDMM)" -class TIM(TIME): "Time of recording (HHMM)" -class TRD(TRDA): "Recording Dates" -class TMT(TMED): "Source Media Type" -class TFT(TFLT): "File Type" -class TBP(TBPM): "Beats per minute" -class TCP(TCMP): "iTunes Compilation Flag" -class TCR(TCOP): "Copyright (C)" -class TPB(TPUB): "Publisher" -class TEN(TENC): "Encoder" -class TSS(TSSE): "Encoder settings" -class TOF(TOFN): "Original Filename" -class TLE(TLEN): "Audio Length (ms)" -class TSI(TSIZ): "Audio Data size (bytes)" -class TDY(TDLY): "Audio Delay (ms)" -class TKE(TKEY): "Starting Key" -class TOT(TOAL): "Original Album" -class TOA(TOPE): "Original Artist/Perfomer" -class TOL(TOLY): "Original Lyricist" -class TOR(TORY): "Original Release Year" - -class TXX(TXXX): "User-defined Text" - -class WAF(WOAF): "Official File Information" -class WAR(WOAR): "Official Artist/Performer Information" -class WAS(WOAS): "Official Source Information" -class WCM(WCOM): "Commercial Information" -class WCP(WCOP): "Copyright Information" -class WPB(WPUB): "Official Publisher Information" - -class WXX(WXXX): "User-defined URL" - -class IPL(IPLS): "Involved people list" -class MCI(MCDI): "Binary dump of CD's TOC" -class ETC(ETCO): "Event timing codes" -class MLL(MLLT): "MPEG location lookup table" -class STC(SYTC): "Synced tempo codes" -class ULT(USLT): "Unsychronised lyrics/text transcription" -class SLT(SYLT): "Synchronised lyrics/text" -class COM(COMM): "Comment" -#class RVA(RVAD) -#class EQU(EQUA) -class REV(RVRB): "Reverb" -class PIC(APIC): - """Attached Picture. - - The 'mime' attribute of an ID3v2.2 attached picture must be either - 'PNG' or 'JPG'. - """ - _framespec = [ EncodingSpec('encoding'), StringSpec('mime', 3), - ByteSpec('type'), EncodedTextSpec('desc'), BinaryDataSpec('data') ] -class GEO(GEOB): "General Encapsulated Object" -class CNT(PCNT): "Play counter" -class POP(POPM): "Popularimeter" -class BUF(RBUF): "Recommended buffer size" - -class CRM(Frame): - """Encrypted meta frame""" - _framespec = [ Latin1TextSpec('owner'), Latin1TextSpec('desc'), - BinaryDataSpec('data') ] - def __eq__(self, other): return self.data == other - -class CRA(AENC): "Audio encryption" - -class LNK(LINK): - """Linked information""" - _framespec = [ StringSpec('frameid', 3), Latin1TextSpec('url') ] - _optionalspec = [ BinaryDataSpec('data') ] - -Frames_2_2 = dict([(k,v) for (k,v) in globals().items() - if len(k)==3 and isinstance(v, type) and issubclass(v, Frame)]) - -# support open(filename) as interface -Open = ID3 - -# ID3v1.1 support. -def ParseID3v1(string): - """Parse an ID3v1 tag, returning a list of ID3v2.4 frames.""" - from struct import error as StructError - frames = {} - try: - tag, title, artist, album, year, comment, track, genre = unpack( - "3s30s30s30s4s29sBB", string) - except StructError: return None - - if tag != "TAG": return None - def fix(string): - return string.split("\x00")[0].strip().decode('latin1') - title, artist, album, year, comment = map( - fix, [title, artist, album, year, comment]) - - if title: frames["TIT2"] = TIT2(encoding=0, text=title) - if artist: frames["TPE1"] = TPE1(encoding=0, text=[artist]) - if album: frames["TALB"] = TALB(encoding=0, text=album) - if year: frames["TDRC"] = TDRC(encoding=0, text=year) - if comment: frames["COMM"] = COMM( - encoding=0, lang="eng", desc="ID3v1 Comment", text=comment) - # Don't read a track number if it looks like the comment was - # padded with spaces instead of nulls (thanks, WinAmp). - if track and (track != 32 or string[-3] == '\x00'): - frames["TRCK"] = TRCK(encoding=0, text=str(track)) - if genre != 255: frames["TCON"] = TCON(encoding=0, text=str(genre)) - return frames - -def MakeID3v1(id3): - """Return an ID3v1.1 tag string from a dict of ID3v2.4 frames.""" - - v1 = {} - - for v2id, name in {"TIT2": "title", "TPE1": "artist", - "TALB": "album"}.items(): - if v2id in id3: - text = id3[v2id].text[0].encode('latin1', 'replace')[:30] - else: text = "" - v1[name] = text + ("\x00" * (30 - len(text))) - - if "COMM" in id3: - cmnt = id3["COMM"].text[0].encode('latin1', 'replace')[:28] - else: cmnt = "" - v1["comment"] = cmnt + ("\x00" * (29 - len(cmnt))) - - if "TRCK" in id3: - try: v1["track"] = chr(+id3["TRCK"]) - except ValueError: v1["track"] = "\x00" - else: v1["track"] = "\x00" - - if "TCON" in id3: - try: genre = id3["TCON"].genres[0] - except IndexError: pass - else: - if genre in TCON.GENRES: - v1["genre"] = chr(TCON.GENRES.index(genre)) - if "genre" not in v1: v1["genre"] = "\xff" - - if "TDRC" in id3: v1["year"] = str(id3["TDRC"])[:4] - else: v1["year"] = "\x00\x00\x00\x00" - - return ("TAG%(title)s%(artist)s%(album)s%(year)s%(comment)s" - "%(track)s%(genre)s") % v1 - -class ID3FileType(mutagen.FileType): - """An unknown type of file with ID3 tags.""" - - class _Info(object): - length = 0 - def __init__(self, fileobj, offset): pass - pprint = staticmethod(lambda: "Unknown format with ID3 tag") - - def score(filename, fileobj, header): - return header.startswith("ID3") - score = staticmethod(score) - - def add_tags(self, ID3=ID3): - """Add an empty ID3 tag to the file. - - A custom tag reader may be used in instead of the default - mutagen.id3.ID3 object, e.g. an EasyID3 reader. - """ - if self.tags is None: - self.tags = ID3() - else: - raise error("an ID3 tag already exists") - - def load(self, filename, ID3=ID3, **kwargs): - """Load stream and tag information from a file. - - A custom tag reader may be used in instead of the default - mutagen.id3.ID3 object, e.g. an EasyID3 reader. - """ - self.filename = filename - try: self.tags = ID3(filename, **kwargs) - except error: self.tags = None - if self.tags is not None: - try: offset = self.tags.size - except AttributeError: offset = None - else: offset = None - try: - fileobj = file(filename, "rb") - self.info = self._Info(fileobj, offset) - finally: - fileobj.close() - diff --git a/lib/mutagen/m4a.py b/lib/mutagen/m4a.py deleted file mode 100644 index b1786f2b2..000000000 --- a/lib/mutagen/m4a.py +++ /dev/null @@ -1,496 +0,0 @@ -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: m4a.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write MPEG-4 audio files with iTunes metadata. - -This module will read MPEG-4 audio information and metadata, -as found in Apple's M4A (aka MP4, M4B, M4P) files. - -There is no official specification for this format. The source code -for TagLib, FAAD, and various MPEG specifications at -http://developer.apple.com/documentation/QuickTime/QTFF/, -http://www.geocities.com/xhelmboyx/quicktime/formats/mp4-layout.txt, -and http://wiki.multimedia.cx/index.php?title=Apple_QuickTime were all -consulted. - -This module does not support 64 bit atom sizes, and so will not -work on metadata over 4GB. -""" - -import struct -import sys - -from cStringIO import StringIO - -from mutagen import FileType, Metadata -from mutagen._constants import GENRES -from mutagen._util import cdata, insert_bytes, delete_bytes, DictProxy - -class error(IOError): pass -class M4AMetadataError(error): pass -class M4AStreamInfoError(error): pass -class M4AMetadataValueError(ValueError, M4AMetadataError): pass - -import warnings -warnings.warn( - "mutagen.m4a is deprecated; use mutagen.mp4 instead.", DeprecationWarning) - -# This is not an exhaustive list of container atoms, but just the -# ones this module needs to peek inside. -_CONTAINERS = ["moov", "udta", "trak", "mdia", "meta", "ilst", - "stbl", "minf", "stsd"] -_SKIP_SIZE = { "meta": 4 } - -__all__ = ['M4A', 'Open', 'delete', 'M4ACover'] - -class M4ACover(str): - """A cover artwork. - - Attributes: - format -- format of the image (either FORMAT_JPEG or FORMAT_PNG) - """ - FORMAT_JPEG = 0x0D - FORMAT_PNG = 0x0E - - def __new__(cls, data, format=None): - self = str.__new__(cls, data) - if format is None: format= M4ACover.FORMAT_JPEG - self.format = format - return self - -class Atom(object): - """An individual atom. - - Attributes: - children -- list child atoms (or None for non-container atoms) - length -- length of this atom, including length and name - name -- four byte name of the atom, as a str - offset -- location in the constructor-given fileobj of this atom - - This structure should only be used internally by Mutagen. - """ - - children = None - - def __init__(self, fileobj): - self.offset = fileobj.tell() - self.length, self.name = struct.unpack(">I4s", fileobj.read(8)) - if self.length == 1: - raise error("64 bit atom sizes are not supported") - elif self.length < 8: - return - - if self.name in _CONTAINERS: - self.children = [] - fileobj.seek(_SKIP_SIZE.get(self.name, 0), 1) - while fileobj.tell() < self.offset + self.length: - self.children.append(Atom(fileobj)) - else: - fileobj.seek(self.offset + self.length, 0) - - def render(name, data): - """Render raw atom data.""" - # this raises OverflowError if Py_ssize_t can't handle the atom data - size = len(data) + 8 - if size <= 0xFFFFFFFF: - return struct.pack(">I4s", size, name) + data - else: - return struct.pack(">I4sQ", 1, name, size + 8) + data - render = staticmethod(render) - - def __getitem__(self, remaining): - """Look up a child atom, potentially recursively. - - e.g. atom['udta', 'meta'] => - """ - if not remaining: - return self - elif self.children is None: - raise KeyError("%r is not a container" % self.name) - for child in self.children: - if child.name == remaining[0]: - return child[remaining[1:]] - else: - raise KeyError, "%r not found" % remaining[0] - - def __repr__(self): - klass = self.__class__.__name__ - if self.children is None: - return "<%s name=%r length=%r offset=%r>" % ( - klass, self.name, self.length, self.offset) - else: - children = "\n".join([" " + line for child in self.children - for line in repr(child).splitlines()]) - return "<%s name=%r length=%r offset=%r\n%s>" % ( - klass, self.name, self.length, self.offset, children) - -class Atoms(object): - """Root atoms in a given file. - - Attributes: - atoms -- a list of top-level atoms as Atom objects - - This structure should only be used internally by Mutagen. - """ - def __init__(self, fileobj): - self.atoms = [] - fileobj.seek(0, 2) - end = fileobj.tell() - fileobj.seek(0) - while fileobj.tell() < end: - self.atoms.append(Atom(fileobj)) - - def path(self, *names): - """Look up and return the complete path of an atom. - - For example, atoms.path('moov', 'udta', 'meta') will return a - list of three atoms, corresponding to the moov, udta, and meta - atoms. - """ - path = [self] - for name in names: - path.append(path[-1][name,]) - return path[1:] - - def __getitem__(self, names): - """Look up a child atom. - - 'names' may be a list of atoms (['moov', 'udta']) or a string - specifying the complete path ('moov.udta'). - """ - if isinstance(names, basestring): - names = names.split(".") - for child in self.atoms: - if child.name == names[0]: - return child[names[1:]] - else: - raise KeyError, "%s not found" % names[0] - - def __repr__(self): - return "\n".join([repr(child) for child in self.atoms]) - -class M4ATags(DictProxy, Metadata): - """Dictionary containing Apple iTunes metadata list key/values. - - Keys are four byte identifiers, except for freeform ('----') - keys. Values are usually unicode strings, but some atoms have a - special structure: - cpil -- boolean - trkn, disk -- tuple of 16 bit ints (current, total) - tmpo -- 16 bit int - covr -- list of M4ACover objects (which are tagged strs) - gnre -- not supported. Use '\\xa9gen' instead. - - The freeform '----' frames use a key in the format '----:mean:name' - where 'mean' is usually 'com.apple.iTunes' and 'name' is a unique - identifier for this frame. The value is a str, but is probably - text that can be decoded as UTF-8. - - M4A tag data cannot exist outside of the structure of an M4A file, - so this class should not be manually instantiated. - - Unknown non-text tags are removed. - """ - - def load(self, atoms, fileobj): - try: ilst = atoms["moov.udta.meta.ilst"] - except KeyError, key: - raise M4AMetadataError(key) - for atom in ilst.children: - fileobj.seek(atom.offset + 8) - data = fileobj.read(atom.length - 8) - parse = self.__atoms.get(atom.name, (M4ATags.__parse_text,))[0] - parse(self, atom, data) - - def __key_sort((key1, v1), (key2, v2)): - # iTunes always writes the tags in order of "relevance", try - # to copy it as closely as possible. - order = ["\xa9nam", "\xa9ART", "\xa9wrt", "\xa9alb", - "\xa9gen", "gnre", "trkn", "disk", - "\xa9day", "cpil", "tmpo", "\xa9too", - "----", "covr", "\xa9lyr"] - order = dict(zip(order, range(len(order)))) - last = len(order) - # If there's no key-based way to distinguish, order by length. - # If there's still no way, go by string comparison on the - # values, so we at least have something determinstic. - return (cmp(order.get(key1[:4], last), order.get(key2[:4], last)) or - cmp(len(v1), len(v2)) or cmp(v1, v2)) - __key_sort = staticmethod(__key_sort) - - def save(self, filename): - """Save the metadata to the given filename.""" - values = [] - items = self.items() - items.sort(self.__key_sort) - for key, value in items: - render = self.__atoms.get( - key[:4], (None, M4ATags.__render_text))[1] - values.append(render(self, key, value)) - data = Atom.render("ilst", "".join(values)) - - # Find the old atoms. - fileobj = file(filename, "rb+") - try: - atoms = Atoms(fileobj) - - moov = atoms["moov"] - - if moov != atoms.atoms[-1]: - # "Free" the old moov block. Something in the mdat - # block is not happy when its offset changes and it - # won't play back. So, rather than try to figure that - # out, just move the moov atom to the end of the file. - offset = self.__move_moov(fileobj, moov) - else: - offset = 0 - - try: - path = atoms.path("moov", "udta", "meta", "ilst") - except KeyError: - self.__save_new(fileobj, atoms, data, offset) - else: - self.__save_existing(fileobj, atoms, path, data, offset) - finally: - fileobj.close() - - def __move_moov(self, fileobj, moov): - fileobj.seek(moov.offset) - data = fileobj.read(moov.length) - fileobj.seek(moov.offset) - free = Atom.render("free", "\x00" * (moov.length - 8)) - fileobj.write(free) - fileobj.seek(0, 2) - # Figure out how far we have to shift all our successive - # seek calls, relative to what the atoms say. - old_end = fileobj.tell() - fileobj.write(data) - return old_end - moov.offset - - def __save_new(self, fileobj, atoms, ilst, offset): - hdlr = Atom.render("hdlr", "\x00" * 8 + "mdirappl" + "\x00" * 9) - meta = Atom.render("meta", "\x00\x00\x00\x00" + hdlr + ilst) - moov, udta = atoms.path("moov", "udta") - insert_bytes(fileobj, len(meta), udta.offset + offset + 8) - fileobj.seek(udta.offset + offset + 8) - fileobj.write(meta) - self.__update_parents(fileobj, [moov, udta], len(meta), offset) - - def __save_existing(self, fileobj, atoms, path, data, offset): - # Replace the old ilst atom. - ilst = path.pop() - delta = len(data) - ilst.length - fileobj.seek(ilst.offset + offset) - if delta > 0: - insert_bytes(fileobj, delta, ilst.offset + offset) - elif delta < 0: - delete_bytes(fileobj, -delta, ilst.offset + offset) - fileobj.seek(ilst.offset + offset) - fileobj.write(data) - self.__update_parents(fileobj, path, delta, offset) - - def __update_parents(self, fileobj, path, delta, offset): - # Update all parent atoms with the new size. - for atom in path: - fileobj.seek(atom.offset + offset) - size = cdata.uint_be(fileobj.read(4)) + delta - fileobj.seek(atom.offset + offset) - fileobj.write(cdata.to_uint_be(size)) - - def __render_data(self, key, flags, data): - data = struct.pack(">2I", flags, 0) + data - return Atom.render(key, Atom.render("data", data)) - - def __parse_freeform(self, atom, data): - try: - fileobj = StringIO(data) - mean_length = cdata.uint_be(fileobj.read(4)) - # skip over 8 bytes of atom name, flags - mean = fileobj.read(mean_length - 4)[8:] - name_length = cdata.uint_be(fileobj.read(4)) - name = fileobj.read(name_length - 4)[8:] - value_length = cdata.uint_be(fileobj.read(4)) - # Name, flags, and reserved bytes - value = fileobj.read(value_length - 4)[12:] - except struct.error: - # Some ---- atoms have no data atom, I have no clue why - # they actually end up in the file. - pass - else: - self["%s:%s:%s" % (atom.name, mean, name)] = value - def __render_freeform(self, key, value): - dummy, mean, name = key.split(":", 2) - mean = struct.pack(">I4sI", len(mean) + 12, "mean", 0) + mean - name = struct.pack(">I4sI", len(name) + 12, "name", 0) + name - value = struct.pack(">I4s2I", len(value) + 16, "data", 0x1, 0) + value - final = mean + name + value - return Atom.render("----", mean + name + value) - - def __parse_pair(self, atom, data): - self[atom.name] = struct.unpack(">2H", data[18:22]) - def __render_pair(self, key, value): - track, total = value - if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: - data = struct.pack(">4H", 0, track, total, 0) - return self.__render_data(key, 0, data) - else: - raise M4AMetadataValueError("invalid numeric pair %r" % (value,)) - - def __render_pair_no_trailing(self, key, value): - track, total = value - if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: - data = struct.pack(">3H", 0, track, total) - return self.__render_data(key, 0, data) - else: - raise M4AMetadataValueError("invalid numeric pair %r" % (value,)) - - def __parse_genre(self, atom, data): - # Translate to a freeform genre. - genre = cdata.short_be(data[16:18]) - if "\xa9gen" not in self: - try: self["\xa9gen"] = GENRES[genre - 1] - except IndexError: pass - - def __parse_tempo(self, atom, data): - self[atom.name] = cdata.short_be(data[16:18]) - def __render_tempo(self, key, value): - if 0 <= value < 1 << 16: - return self.__render_data(key, 0x15, cdata.to_ushort_be(value)) - else: - raise M4AMetadataValueError("invalid short integer %r" % value) - - def __parse_compilation(self, atom, data): - try: self[atom.name] = bool(ord(data[16:17])) - except TypeError: self[atom.name] = False - - def __render_compilation(self, key, value): - return self.__render_data(key, 0x15, chr(bool(value))) - - def __parse_cover(self, atom, data): - length, name, format = struct.unpack(">I4sI", data[:12]) - if name != "data": - raise M4AMetadataError( - "unexpected atom %r inside 'covr'" % name) - if format not in (M4ACover.FORMAT_JPEG, M4ACover.FORMAT_PNG): - format = M4ACover.FORMAT_JPEG - self[atom.name]= M4ACover(data[16:length], format) - def __render_cover(self, key, value): - try: format = value.format - except AttributeError: format = M4ACover.FORMAT_JPEG - data = Atom.render("data", struct.pack(">2I", format, 0) + value) - return Atom.render(key, data) - - def __parse_text(self, atom, data): - flags = cdata.uint_be(data[8:12]) - if flags == 1: - self[atom.name] = data[16:].decode('utf-8', 'replace') - def __render_text(self, key, value): - return self.__render_data(key, 0x1, value.encode('utf-8')) - - def delete(self, filename): - self.clear() - self.save(filename) - - __atoms = { - "----": (__parse_freeform, __render_freeform), - "trkn": (__parse_pair, __render_pair), - "disk": (__parse_pair, __render_pair_no_trailing), - "gnre": (__parse_genre, None), - "tmpo": (__parse_tempo, __render_tempo), - "cpil": (__parse_compilation, __render_compilation), - "covr": (__parse_cover, __render_cover), - } - - def pprint(self): - values = [] - for key, value in self.iteritems(): - key = key.decode('latin1') - try: values.append("%s=%s" % (key, value)) - except UnicodeDecodeError: - values.append("%s=[%d bytes of data]" % (key, len(value))) - return "\n".join(values) - -class M4AInfo(object): - """MPEG-4 stream information. - - Attributes: - bitrate -- bitrate in bits per second, as an int - length -- file length in seconds, as a float - """ - - bitrate = 0 - - def __init__(self, atoms, fileobj): - hdlr = atoms["moov.trak.mdia.hdlr"] - fileobj.seek(hdlr.offset) - if "soun" not in fileobj.read(hdlr.length): - raise M4AStreamInfoError("track has no audio data") - - mdhd = atoms["moov.trak.mdia.mdhd"] - fileobj.seek(mdhd.offset) - data = fileobj.read(mdhd.length) - if ord(data[8]) == 0: - offset = 20 - format = ">2I" - else: - offset = 28 - format = ">IQ" - end = offset + struct.calcsize(format) - unit, length = struct.unpack(format, data[offset:end]) - self.length = float(length) / unit - - try: - atom = atoms["moov.trak.mdia.minf.stbl.stsd"] - fileobj.seek(atom.offset) - data = fileobj.read(atom.length) - self.bitrate = cdata.uint_be(data[-17:-13]) - except (ValueError, KeyError): - # Bitrate values are optional. - pass - - def pprint(self): - return "MPEG-4 audio, %.2f seconds, %d bps" % ( - self.length, self.bitrate) - -class M4A(FileType): - """An MPEG-4 audio file, probably containing AAC. - - If more than one track is present in the file, the first is used. - Only audio ('soun') tracks will be read. - """ - - _mimes = ["audio/mp4", "audio/x-m4a", "audio/mpeg4", "audio/aac"] - - def load(self, filename): - self.filename = filename - fileobj = file(filename, "rb") - try: - atoms = Atoms(fileobj) - try: self.info = M4AInfo(atoms, fileobj) - except StandardError, err: - raise M4AStreamInfoError, err, sys.exc_info()[2] - try: self.tags = M4ATags(atoms, fileobj) - except M4AMetadataError: - self.tags = None - except StandardError, err: - raise M4AMetadataError, err, sys.exc_info()[2] - finally: - fileobj.close() - - def add_tags(self): - self.tags = M4ATags() - - def score(filename, fileobj, header): - return ("ftyp" in header) + ("mp4" in header) - score = staticmethod(score) - -Open = M4A - -def delete(filename): - """Remove tags from a file.""" - M4A(filename).delete() diff --git a/lib/mutagen/monkeysaudio.py b/lib/mutagen/monkeysaudio.py deleted file mode 100644 index 08d26387b..000000000 --- a/lib/mutagen/monkeysaudio.py +++ /dev/null @@ -1,80 +0,0 @@ -# A Monkey's Audio (APE) reader/tagger -# -# Copyright 2006 Lukas Lalinsky -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: monkeysaudio.py 4275 2008-06-01 06:32:37Z piman $ - -"""Monkey's Audio streams with APEv2 tags. - -Monkey's Audio is a very efficient lossless audio compressor developed -by Matt Ashland. - -For more information, see http://www.monkeysaudio.com/. -""" - -__all__ = ["MonkeysAudio", "Open", "delete"] - -import struct - -from mutagen.apev2 import APEv2File, error, delete -from mutagen._util import cdata - -class MonkeysAudioHeaderError(error): pass - -class MonkeysAudioInfo(object): - """Monkey's Audio stream information. - - Attributes: - channels -- number of audio channels - length -- file length in seconds, as a float - sample_rate -- audio sampling rate in Hz - bits_per_sample -- bits per sample - version -- Monkey's Audio stream version, as a float (eg: 3.99) - """ - - def __init__(self, fileobj): - header = fileobj.read(76) - if len(header) != 76 or not header.startswith("MAC "): - raise MonkeysAudioHeaderError("not a Monkey's Audio file") - self.version = cdata.ushort_le(header[4:6]) - if self.version >= 3980: - (blocks_per_frame, final_frame_blocks, total_frames, - self.bits_per_sample, self.channels, - self.sample_rate) = struct.unpack("= 3950: - blocks_per_frame = 73728 * 4 - elif self.version >= 3900 or (self.version >= 3800 and - compression_level == 4): - blocks_per_frame = 73728 - else: - blocks_per_frame = 9216 - self.version /= 1000.0 - self.length = 0.0 - if self.sample_rate != 0 and total_frames > 0: - total_blocks = ((total_frames - 1) * blocks_per_frame + - final_frame_blocks) - self.length = float(total_blocks) / self.sample_rate - - def pprint(self): - return "Monkey's Audio %.2f, %.2f seconds, %d Hz" % ( - self.version, self.length, self.sample_rate) - -class MonkeysAudio(APEv2File): - _Info = MonkeysAudioInfo - _mimes = ["audio/ape", "audio/x-ape"] - - def score(filename, fileobj, header): - return header.startswith("MAC ") + filename.lower().endswith(".ape") - score = staticmethod(score) - -Open = MonkeysAudio diff --git a/lib/mutagen/mp3.py b/lib/mutagen/mp3.py deleted file mode 100644 index 41c4887ee..000000000 --- a/lib/mutagen/mp3.py +++ /dev/null @@ -1,223 +0,0 @@ -# MP3 stream header information support for Mutagen. -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of version 2 of the GNU General Public License as -# published by the Free Software Foundation. - -"""MPEG audio stream information and tags.""" - -import os -import struct - -from mutagen.id3 import ID3FileType, BitPaddedInt, delete - -class error(RuntimeError): pass -class HeaderNotFoundError(error, IOError): pass -class InvalidMPEGHeader(error, IOError): pass - -# Mode values. -STEREO, JOINTSTEREO, DUALCHANNEL, MONO = range(4) - -class MPEGInfo(object): - """MPEG audio stream information - - Parse information about an MPEG audio file. This also reads the - Xing VBR header format. - - This code was implemented based on the format documentation at - http://www.dv.co.yu/mpgscript/mpeghdr.htm. - - Useful attributes: - length -- audio length, in seconds - bitrate -- audio bitrate, in bits per second - sketchy -- if true, the file may not be valid MPEG audio - - Useless attributes: - version -- MPEG version (1, 2, 2.5) - layer -- 1, 2, or 3 - mode -- One of STEREO, JOINTSTEREO, DUALCHANNEL, or MONO (0-3) - protected -- whether or not the file is "protected" - padding -- whether or not audio frames are padded - sample_rate -- audio sample rate, in Hz - """ - - # Map (version, layer) tuples to bitrates. - __BITRATE = { - (1, 1): range(0, 480, 32), - (1, 2): [0, 32, 48, 56, 64, 80, 96, 112,128,160,192,224,256,320,384], - (1, 3): [0, 32, 40, 48, 56, 64, 80, 96, 112,128,160,192,224,256,320], - (2, 1): [0, 32, 48, 56, 64, 80, 96, 112,128,144,160,176,192,224,256], - (2, 2): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96,112,128,144,160], - } - - __BITRATE[(2, 3)] = __BITRATE[(2, 2)] - for i in range(1, 4): __BITRATE[(2.5, i)] = __BITRATE[(2, i)] - - # Map version to sample rates. - __RATES = { - 1: [44100, 48000, 32000], - 2: [22050, 24000, 16000], - 2.5: [11025, 12000, 8000] - } - - sketchy = False - - def __init__(self, fileobj, offset=None): - """Parse MPEG stream information from a file-like object. - - If an offset argument is given, it is used to start looking - for stream information and Xing headers; otherwise, ID3v2 tags - will be skipped automatically. A correct offset can make - loading files significantly faster. - """ - - try: size = os.path.getsize(fileobj.name) - except (IOError, OSError, AttributeError): - fileobj.seek(0, 2) - size = fileobj.tell() - - # If we don't get an offset, try to skip an ID3v2 tag. - if offset is None: - fileobj.seek(0, 0) - idata = fileobj.read(10) - try: id3, insize = struct.unpack('>3sxxx4s', idata) - except struct.error: id3, insize = '', 0 - insize = BitPaddedInt(insize) - if id3 == 'ID3' and insize > 0: - offset = insize - else: offset = 0 - - # Try to find two valid headers (meaning, very likely MPEG data) - # at the given offset, 30% through the file, 60% through the file, - # and 90% through the file. - for i in [offset, 0.3 * size, 0.6 * size, 0.9 * size]: - try: self.__try(fileobj, int(i), size - offset) - except error, e: pass - else: break - # If we can't find any two consecutive frames, try to find just - # one frame back at the original offset given. - else: - self.__try(fileobj, offset, size - offset, False) - self.sketchy = True - - def __try(self, fileobj, offset, real_size, check_second=True): - # This is going to be one really long function; bear with it, - # because there's not really a sane point to cut it up. - fileobj.seek(offset, 0) - - # We "know" we have an MPEG file if we find two frames that look like - # valid MPEG data. If we can't find them in 32k of reads, something - # is horribly wrong (the longest frame can only be about 4k). This - # is assuming the offset didn't lie. - data = fileobj.read(32768) - - frame_1 = data.find("\xff") - while 0 <= frame_1 <= len(data) - 4: - frame_data = struct.unpack(">I", data[frame_1:frame_1 + 4])[0] - if (frame_data >> 16) & 0xE0 != 0xE0: - frame_1 = data.find("\xff", frame_1 + 2) - else: - version = (frame_data >> 19) & 0x3 - layer = (frame_data >> 17) & 0x3 - protection = (frame_data >> 16) & 0x1 - bitrate = (frame_data >> 12) & 0xF - sample_rate = (frame_data >> 10) & 0x3 - padding = (frame_data >> 9) & 0x1 - private = (frame_data >> 8) & 0x1 - self.mode = (frame_data >> 6) & 0x3 - mode_extension = (frame_data >> 4) & 0x3 - copyright = (frame_data >> 3) & 0x1 - original = (frame_data >> 2) & 0x1 - emphasis = (frame_data >> 0) & 0x3 - if (version == 1 or layer == 0 or sample_rate == 0x3 or - bitrate == 0 or bitrate == 0xF): - frame_1 = data.find("\xff", frame_1 + 2) - else: break - else: - raise HeaderNotFoundError("can't sync to an MPEG frame") - - # There is a serious problem here, which is that many flags - # in an MPEG header are backwards. - self.version = [2.5, None, 2, 1][version] - self.layer = 4 - layer - self.protected = not protection - self.padding = bool(padding) - - self.bitrate = self.__BITRATE[(self.version, self.layer)][bitrate] - self.bitrate *= 1000 - self.sample_rate = self.__RATES[self.version][sample_rate] - - if self.layer == 1: - frame_length = (12 * self.bitrate / self.sample_rate + padding) * 4 - frame_size = 384 - else: - frame_length = 144 * self.bitrate / self.sample_rate + padding - frame_size = 1152 - - if check_second: - possible = frame_1 + frame_length - if possible > len(data) + 4: - raise HeaderNotFoundError("can't sync to second MPEG frame") - frame_data = struct.unpack(">H", data[possible:possible + 2])[0] - if frame_data & 0xFFE0 != 0xFFE0: - raise HeaderNotFoundError("can't sync to second MPEG frame") - - frame_count = real_size / float(frame_length) - samples = frame_size * frame_count - self.length = samples / self.sample_rate - - # Try to find/parse the Xing header, which trumps the above length - # and bitrate calculation. - fileobj.seek(offset, 0) - data = fileobj.read(32768) - try: - xing = data[:-4].index("Xing") - except ValueError: - # Try to find/parse the VBRI header, which trumps the above length - # calculation. - try: - vbri = data[:-24].index("VBRI") - except ValueError: pass - else: - # If a VBRI header was found, this is definitely MPEG audio. - self.sketchy = False - vbri_version = struct.unpack('>H', data[vbri + 4:vbri + 6])[0] - if vbri_version == 1: - frame_count = struct.unpack('>I', data[vbri + 14:vbri + 18])[0] - samples = frame_size * frame_count - self.length = (samples / self.sample_rate) or self.length - else: - # If a Xing header was found, this is definitely MPEG audio. - self.sketchy = False - flags = struct.unpack('>I', data[xing + 4:xing + 8])[0] - if flags & 0x1: - frame_count = struct.unpack('>I', data[xing + 8:xing + 12])[0] - samples = frame_size * frame_count - self.length = (samples / self.sample_rate) or self.length - if flags & 0x2: - bytes = struct.unpack('>I', data[xing + 12:xing + 16])[0] - self.bitrate = int((bytes * 8) // self.length) - - def pprint(self): - s = "MPEG %s layer %d, %d bps, %s Hz, %.2f seconds" % ( - self.version, self.layer, self.bitrate, self.sample_rate, - self.length) - if self.sketchy: s += " (sketchy)" - return s - -class MP3(ID3FileType): - """An MPEG audio (usually MPEG-1 Layer 3) file.""" - - _Info = MPEGInfo - _mimes = ["audio/mp3", "audio/x-mp3", "audio/mpeg", "audio/mpg", - "audio/x-mpeg"] - - def score(filename, fileobj, header): - filename = filename.lower() - return (header.startswith("ID3") * 2 + filename.endswith(".mp3") + - filename.endswith(".mp2") + filename.endswith(".mpg") + - filename.endswith(".mpeg")) - score = staticmethod(score) - -Open = MP3 diff --git a/lib/mutagen/mp4.py b/lib/mutagen/mp4.py deleted file mode 100644 index 122b75308..000000000 --- a/lib/mutagen/mp4.py +++ /dev/null @@ -1,670 +0,0 @@ -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: mp4.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write MPEG-4 audio files with iTunes metadata. - -This module will read MPEG-4 audio information and metadata, -as found in Apple's MP4 (aka M4A, M4B, M4P) files. - -There is no official specification for this format. The source code -for TagLib, FAAD, and various MPEG specifications at -http://developer.apple.com/documentation/QuickTime/QTFF/, -http://www.geocities.com/xhelmboyx/quicktime/formats/mp4-layout.txt, -http://standards.iso.org/ittf/PubliclyAvailableStandards/c041828_ISO_IEC_14496-12_2005(E).zip, -and http://wiki.multimedia.cx/index.php?title=Apple_QuickTime were all -consulted. -""" - -import struct -import sys - -from mutagen import FileType, Metadata -from mutagen._constants import GENRES -from mutagen._util import cdata, insert_bytes, delete_bytes, DictProxy - -class error(IOError): pass -class MP4MetadataError(error): pass -class MP4StreamInfoError(error): pass -class MP4MetadataValueError(ValueError, MP4MetadataError): pass - -# This is not an exhaustive list of container atoms, but just the -# ones this module needs to peek inside. -_CONTAINERS = ["moov", "udta", "trak", "mdia", "meta", "ilst", - "stbl", "minf", "moof", "traf"] -_SKIP_SIZE = { "meta": 4 } - -__all__ = ['MP4', 'Open', 'delete', 'MP4Cover'] - -class MP4Cover(str): - """A cover artwork. - - Attributes: - format -- format of the image (either FORMAT_JPEG or FORMAT_PNG) - """ - FORMAT_JPEG = 0x0D - FORMAT_PNG = 0x0E - - def __new__(cls, data, format=None): - self = str.__new__(cls, data) - if format is None: format= MP4Cover.FORMAT_JPEG - self.format = format - return self - -class Atom(object): - """An individual atom. - - Attributes: - children -- list child atoms (or None for non-container atoms) - length -- length of this atom, including length and name - name -- four byte name of the atom, as a str - offset -- location in the constructor-given fileobj of this atom - - This structure should only be used internally by Mutagen. - """ - - children = None - - def __init__(self, fileobj): - self.offset = fileobj.tell() - self.length, self.name = struct.unpack(">I4s", fileobj.read(8)) - if self.length == 1: - self.length, = struct.unpack(">Q", fileobj.read(8)) - elif self.length < 8: - return - - if self.name in _CONTAINERS: - self.children = [] - fileobj.seek(_SKIP_SIZE.get(self.name, 0), 1) - while fileobj.tell() < self.offset + self.length: - self.children.append(Atom(fileobj)) - else: - fileobj.seek(self.offset + self.length, 0) - - def render(name, data): - """Render raw atom data.""" - # this raises OverflowError if Py_ssize_t can't handle the atom data - size = len(data) + 8 - if size <= 0xFFFFFFFF: - return struct.pack(">I4s", size, name) + data - else: - return struct.pack(">I4sQ", 1, name, size + 8) + data - render = staticmethod(render) - - def findall(self, name, recursive=False): - """Recursively find all child atoms by specified name.""" - if self.children is not None: - for child in self.children: - if child.name == name: - yield child - if recursive: - for atom in child.findall(name, True): - yield atom - - def __getitem__(self, remaining): - """Look up a child atom, potentially recursively. - - e.g. atom['udta', 'meta'] => - """ - if not remaining: - return self - elif self.children is None: - raise KeyError("%r is not a container" % self.name) - for child in self.children: - if child.name == remaining[0]: - return child[remaining[1:]] - else: - raise KeyError, "%r not found" % remaining[0] - - def __repr__(self): - klass = self.__class__.__name__ - if self.children is None: - return "<%s name=%r length=%r offset=%r>" % ( - klass, self.name, self.length, self.offset) - else: - children = "\n".join([" " + line for child in self.children - for line in repr(child).splitlines()]) - return "<%s name=%r length=%r offset=%r\n%s>" % ( - klass, self.name, self.length, self.offset, children) - -class Atoms(object): - """Root atoms in a given file. - - Attributes: - atoms -- a list of top-level atoms as Atom objects - - This structure should only be used internally by Mutagen. - """ - def __init__(self, fileobj): - self.atoms = [] - fileobj.seek(0, 2) - end = fileobj.tell() - fileobj.seek(0) - while fileobj.tell() + 8 <= end: - self.atoms.append(Atom(fileobj)) - - def path(self, *names): - """Look up and return the complete path of an atom. - - For example, atoms.path('moov', 'udta', 'meta') will return a - list of three atoms, corresponding to the moov, udta, and meta - atoms. - """ - path = [self] - for name in names: - path.append(path[-1][name,]) - return path[1:] - - def __getitem__(self, names): - """Look up a child atom. - - 'names' may be a list of atoms (['moov', 'udta']) or a string - specifying the complete path ('moov.udta'). - """ - if isinstance(names, basestring): - names = names.split(".") - for child in self.atoms: - if child.name == names[0]: - return child[names[1:]] - else: - raise KeyError, "%s not found" % names[0] - - def __repr__(self): - return "\n".join([repr(child) for child in self.atoms]) - -class MP4Tags(DictProxy, Metadata): - """Dictionary containing Apple iTunes metadata list key/values. - - Keys are four byte identifiers, except for freeform ('----') - keys. Values are usually unicode strings, but some atoms have a - special structure: - - Text values (multiple values per key are supported): - '\xa9nam' -- track title - '\xa9alb' -- album - '\xa9ART' -- artist - 'aART' -- album artist - '\xa9wrt' -- composer - '\xa9day' -- year - '\xa9cmt' -- comment - 'desc' -- description (usually used in podcasts) - 'purd' -- purchase date - '\xa9grp' -- grouping - '\xa9gen' -- genre - '\xa9lyr' -- lyrics - 'purl' -- podcast URL - 'egid' -- podcast episode GUID - 'catg' -- podcast category - 'keyw' -- podcast keywords - '\xa9too' -- encoded by - 'cprt' -- copyright - 'soal' -- album sort order - 'soaa' -- album artist sort order - 'soar' -- artist sort order - 'sonm' -- title sort order - 'soco' -- composer sort order - 'sosn' -- show sort order - 'tvsh' -- show name - - Boolean values: - 'cpil' -- part of a compilation - 'pgap' -- part of a gapless album - 'pcst' -- podcast (iTunes reads this only on import) - - Tuples of ints (multiple values per key are supported): - 'trkn' -- track number, total tracks - 'disk' -- disc number, total discs - - Others: - 'tmpo' -- tempo/BPM, 16 bit int - 'covr' -- cover artwork, list of MP4Cover objects (which are - tagged strs) - 'gnre' -- ID3v1 genre. Not supported, use '\xa9gen' instead. - - The freeform '----' frames use a key in the format '----:mean:name' - where 'mean' is usually 'com.apple.iTunes' and 'name' is a unique - identifier for this frame. The value is a str, but is probably - text that can be decoded as UTF-8. Multiple values per key are - supported. - - MP4 tag data cannot exist outside of the structure of an MP4 file, - so this class should not be manually instantiated. - - Unknown non-text tags are removed. - """ - - def load(self, atoms, fileobj): - try: ilst = atoms["moov.udta.meta.ilst"] - except KeyError, key: - raise MP4MetadataError(key) - for atom in ilst.children: - fileobj.seek(atom.offset + 8) - data = fileobj.read(atom.length - 8) - info = self.__atoms.get(atom.name, (MP4Tags.__parse_text, None)) - info[0](self, atom, data, *info[2:]) - - def __key_sort((key1, v1), (key2, v2)): - # iTunes always writes the tags in order of "relevance", try - # to copy it as closely as possible. - order = ["\xa9nam", "\xa9ART", "\xa9wrt", "\xa9alb", - "\xa9gen", "gnre", "trkn", "disk", - "\xa9day", "cpil", "pgap", "pcst", "tmpo", - "\xa9too", "----", "covr", "\xa9lyr"] - order = dict(zip(order, range(len(order)))) - last = len(order) - # If there's no key-based way to distinguish, order by length. - # If there's still no way, go by string comparison on the - # values, so we at least have something determinstic. - return (cmp(order.get(key1[:4], last), order.get(key2[:4], last)) or - cmp(len(v1), len(v2)) or cmp(v1, v2)) - __key_sort = staticmethod(__key_sort) - - def save(self, filename): - """Save the metadata to the given filename.""" - values = [] - items = self.items() - items.sort(self.__key_sort) - for key, value in items: - info = self.__atoms.get(key[:4], (None, MP4Tags.__render_text)) - try: - values.append(info[1](self, key, value, *info[2:])) - except (TypeError, ValueError), s: - raise MP4MetadataValueError, s, sys.exc_info()[2] - data = Atom.render("ilst", "".join(values)) - - # Find the old atoms. - fileobj = file(filename, "rb+") - try: - atoms = Atoms(fileobj) - try: - path = atoms.path("moov", "udta", "meta", "ilst") - except KeyError: - self.__save_new(fileobj, atoms, data) - else: - self.__save_existing(fileobj, atoms, path, data) - finally: - fileobj.close() - - def __pad_ilst(self, data, length=None): - if length is None: - length = ((len(data) + 1023) & ~1023) - len(data) - return Atom.render("free", "\x00" * length) - - def __save_new(self, fileobj, atoms, ilst): - hdlr = Atom.render("hdlr", "\x00" * 8 + "mdirappl" + "\x00" * 9) - meta = Atom.render( - "meta", "\x00\x00\x00\x00" + hdlr + ilst + self.__pad_ilst(ilst)) - try: - path = atoms.path("moov", "udta") - except KeyError: - # moov.udta not found -- create one - path = atoms.path("moov") - meta = Atom.render("udta", meta) - offset = path[-1].offset + 8 - insert_bytes(fileobj, len(meta), offset) - fileobj.seek(offset) - fileobj.write(meta) - self.__update_parents(fileobj, path, len(meta)) - self.__update_offsets(fileobj, atoms, len(meta), offset) - - def __save_existing(self, fileobj, atoms, path, data): - # Replace the old ilst atom. - ilst = path.pop() - offset = ilst.offset - length = ilst.length - - # Check for padding "free" atoms - meta = path[-1] - index = meta.children.index(ilst) - try: - prev = meta.children[index-1] - if prev.name == "free": - offset = prev.offset - length += prev.length - except IndexError: - pass - try: - next = meta.children[index+1] - if next.name == "free": - length += next.length - except IndexError: - pass - - delta = len(data) - length - if delta > 0 or (delta < 0 and delta > -8): - data += self.__pad_ilst(data) - delta = len(data) - length - insert_bytes(fileobj, delta, offset) - elif delta < 0: - data += self.__pad_ilst(data, -delta - 8) - delta = 0 - - fileobj.seek(offset) - fileobj.write(data) - self.__update_parents(fileobj, path, delta) - self.__update_offsets(fileobj, atoms, delta, offset) - - def __update_parents(self, fileobj, path, delta): - """Update all parent atoms with the new size.""" - for atom in path: - fileobj.seek(atom.offset) - size = cdata.uint_be(fileobj.read(4)) + delta - fileobj.seek(atom.offset) - fileobj.write(cdata.to_uint_be(size)) - - def __update_offset_table(self, fileobj, fmt, atom, delta, offset): - """Update offset table in the specified atom.""" - if atom.offset > offset: - atom.offset += delta - fileobj.seek(atom.offset + 12) - data = fileobj.read(atom.length - 12) - fmt = fmt % cdata.uint_be(data[:4]) - offsets = struct.unpack(fmt, data[4:]) - offsets = [o + (0, delta)[offset < o] for o in offsets] - fileobj.seek(atom.offset + 16) - fileobj.write(struct.pack(fmt, *offsets)) - - def __update_tfhd(self, fileobj, atom, delta, offset): - if atom.offset > offset: - atom.offset += delta - fileobj.seek(atom.offset + 9) - data = fileobj.read(atom.length - 9) - flags = cdata.uint_be("\x00" + data[:3]) - if flags & 1: - o = cdata.ulonglong_be(data[7:15]) - if o > offset: - o += delta - fileobj.seek(atom.offset + 16) - fileobj.write(cdata.to_ulonglong_be(o)) - - def __update_offsets(self, fileobj, atoms, delta, offset): - """Update offset tables in all 'stco' and 'co64' atoms.""" - if delta == 0: - return - moov = atoms["moov"] - for atom in moov.findall('stco', True): - self.__update_offset_table(fileobj, ">%dI", atom, delta, offset) - for atom in moov.findall('co64', True): - self.__update_offset_table(fileobj, ">%dQ", atom, delta, offset) - try: - for atom in atoms["moof"].findall('tfhd', True): - self.__update_tfhd(fileobj, atom, delta, offset) - except KeyError: - pass - - def __parse_data(self, atom, data): - pos = 0 - while pos < atom.length - 8: - length, name, flags = struct.unpack(">I4sI", data[pos:pos+12]) - if name != "data": - raise MP4MetadataError( - "unexpected atom %r inside %r" % (name, atom.name)) - yield flags, data[pos+16:pos+length] - pos += length - def __render_data(self, key, flags, value): - return Atom.render(key, "".join([ - Atom.render("data", struct.pack(">2I", flags, 0) + data) - for data in value])) - - def __parse_freeform(self, atom, data): - length = cdata.uint_be(data[:4]) - mean = data[12:length] - pos = length - length = cdata.uint_be(data[pos:pos+4]) - name = data[pos+12:pos+length] - pos += length - value = [] - while pos < atom.length - 8: - length, atom_name = struct.unpack(">I4s", data[pos:pos+8]) - if atom_name != "data": - raise MP4MetadataError( - "unexpected atom %r inside %r" % (atom_name, atom.name)) - value.append(data[pos+16:pos+length]) - pos += length - if value: - self["%s:%s:%s" % (atom.name, mean, name)] = value - def __render_freeform(self, key, value): - dummy, mean, name = key.split(":", 2) - mean = struct.pack(">I4sI", len(mean) + 12, "mean", 0) + mean - name = struct.pack(">I4sI", len(name) + 12, "name", 0) + name - if isinstance(value, basestring): - value = [value] - return Atom.render("----", mean + name + "".join([ - struct.pack(">I4s2I", len(data) + 16, "data", 1, 0) + data - for data in value])) - - def __parse_pair(self, atom, data): - self[atom.name] = [struct.unpack(">2H", data[2:6]) for - flags, data in self.__parse_data(atom, data)] - def __render_pair(self, key, value): - data = [] - for (track, total) in value: - if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: - data.append(struct.pack(">4H", 0, track, total, 0)) - else: - raise MP4MetadataValueError( - "invalid numeric pair %r" % ((track, total),)) - return self.__render_data(key, 0, data) - - def __render_pair_no_trailing(self, key, value): - data = [] - for (track, total) in value: - if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: - data.append(struct.pack(">3H", 0, track, total)) - else: - raise MP4MetadataValueError( - "invalid numeric pair %r" % ((track, total),)) - return self.__render_data(key, 0, data) - - def __parse_genre(self, atom, data): - # Translate to a freeform genre. - genre = cdata.short_be(data[16:18]) - if "\xa9gen" not in self: - try: self["\xa9gen"] = [GENRES[genre - 1]] - except IndexError: pass - - def __parse_tempo(self, atom, data): - self[atom.name] = [cdata.ushort_be(value[1]) for - value in self.__parse_data(atom, data)] - - def __render_tempo(self, key, value): - try: - if len(value) == 0: - return self.__render_data(key, 0x15, "") - - if min(value) < 0 or max(value) >= 2**16: - raise MP4MetadataValueError( - "invalid 16 bit integers: %r" % value) - except TypeError: - raise MP4MetadataValueError( - "tmpo must be a list of 16 bit integers") - - values = map(cdata.to_ushort_be, value) - return self.__render_data(key, 0x15, values) - - def __parse_bool(self, atom, data): - try: self[atom.name] = bool(ord(data[16:17])) - except TypeError: self[atom.name] = False - def __render_bool(self, key, value): - return self.__render_data(key, 0x15, [chr(bool(value))]) - - def __parse_cover(self, atom, data): - self[atom.name] = [] - pos = 0 - while pos < atom.length - 8: - length, name, format = struct.unpack(">I4sI", data[pos:pos+12]) - if name != "data": - raise MP4MetadataError( - "unexpected atom %r inside 'covr'" % name) - if format not in (MP4Cover.FORMAT_JPEG, MP4Cover.FORMAT_PNG): - format = MP4Cover.FORMAT_JPEG - cover = MP4Cover(data[pos+16:pos+length], format) - self[atom.name].append(MP4Cover(data[pos+16:pos+length], format)) - pos += length - def __render_cover(self, key, value): - atom_data = [] - for cover in value: - try: format = cover.format - except AttributeError: format = MP4Cover.FORMAT_JPEG - atom_data.append( - Atom.render("data", struct.pack(">2I", format, 0) + cover)) - return Atom.render(key, "".join(atom_data)) - - def __parse_text(self, atom, data, expected_flags=1): - value = [text.decode('utf-8', 'replace') for flags, text - in self.__parse_data(atom, data) - if flags == expected_flags] - if value: - self[atom.name] = value - def __render_text(self, key, value, flags=1): - if isinstance(value, basestring): - value = [value] - return self.__render_data( - key, flags, [text.encode('utf-8') for text in value]) - - def delete(self, filename): - self.clear() - self.save(filename) - - __atoms = { - "----": (__parse_freeform, __render_freeform), - "trkn": (__parse_pair, __render_pair), - "disk": (__parse_pair, __render_pair_no_trailing), - "gnre": (__parse_genre, None), - "tmpo": (__parse_tempo, __render_tempo), - "cpil": (__parse_bool, __render_bool), - "pgap": (__parse_bool, __render_bool), - "pcst": (__parse_bool, __render_bool), - "covr": (__parse_cover, __render_cover), - "purl": (__parse_text, __render_text, 0), - "egid": (__parse_text, __render_text, 0), - } - - def pprint(self): - values = [] - for key, value in self.iteritems(): - key = key.decode('latin1') - if key == "covr": - values.append("%s=%s" % (key, ", ".join( - ["[%d bytes of data]" % len(data) for data in value]))) - elif isinstance(value, list): - values.append("%s=%s" % (key, " / ".join(map(unicode, value)))) - else: - values.append("%s=%s" % (key, value)) - return "\n".join(values) - -class MP4Info(object): - """MPEG-4 stream information. - - Attributes: - bitrate -- bitrate in bits per second, as an int - length -- file length in seconds, as a float - channels -- number of audio channels - sample_rate -- audio sampling rate in Hz - bits_per_sample -- bits per sample - """ - - bitrate = 0 - channels = 0 - sample_rate = 0 - bits_per_sample = 0 - - def __init__(self, atoms, fileobj): - for trak in list(atoms["moov"].findall("trak")): - hdlr = trak["mdia", "hdlr"] - fileobj.seek(hdlr.offset) - data = fileobj.read(hdlr.length) - if data[16:20] == "soun": - break - else: - raise MP4StreamInfoError("track has no audio data") - - mdhd = trak["mdia", "mdhd"] - fileobj.seek(mdhd.offset) - data = fileobj.read(mdhd.length) - if ord(data[8]) == 0: - offset = 20 - format = ">2I" - else: - offset = 28 - format = ">IQ" - end = offset + struct.calcsize(format) - unit, length = struct.unpack(format, data[offset:end]) - self.length = float(length) / unit - - try: - atom = trak["mdia", "minf", "stbl", "stsd"] - fileobj.seek(atom.offset) - data = fileobj.read(atom.length) - if data[20:24] == "mp4a": - length = cdata.uint_be(data[16:20]) - (self.channels, self.bits_per_sample, _, - self.sample_rate) = struct.unpack(">3HI", data[40:50]) - # ES descriptor type - if data[56:60] == "esds" and ord(data[64:65]) == 0x03: - pos = 65 - # skip extended descriptor type tag, length, ES ID - # and stream priority - if data[pos:pos+3] == "\x80\x80\x80": - pos += 3 - pos += 4 - # decoder config descriptor type - if ord(data[pos]) == 0x04: - pos += 1 - # skip extended descriptor type tag, length, - # object type ID, stream type, buffer size - # and maximum bitrate - if data[pos:pos+3] == "\x80\x80\x80": - pos += 3 - pos += 10 - # average bitrate - self.bitrate = cdata.uint_be(data[pos:pos+4]) - except (ValueError, KeyError): - # stsd atoms are optional - pass - - def pprint(self): - return "MPEG-4 audio, %.2f seconds, %d bps" % ( - self.length, self.bitrate) - -class MP4(FileType): - """An MPEG-4 audio file, probably containing AAC. - - If more than one track is present in the file, the first is used. - Only audio ('soun') tracks will be read. - """ - - _mimes = ["audio/mp4", "audio/x-m4a", "audio/mpeg4", "audio/aac"] - - def load(self, filename): - self.filename = filename - fileobj = file(filename, "rb") - try: - atoms = Atoms(fileobj) - try: self.info = MP4Info(atoms, fileobj) - except StandardError, err: - raise MP4StreamInfoError, err, sys.exc_info()[2] - try: self.tags = MP4Tags(atoms, fileobj) - except MP4MetadataError: - self.tags = None - except StandardError, err: - raise MP4MetadataError, err, sys.exc_info()[2] - finally: - fileobj.close() - - def add_tags(self): - self.tags = MP4Tags() - - def score(filename, fileobj, header): - return ("ftyp" in header) + ("mp4" in header) - score = staticmethod(score) - -Open = MP4 - -def delete(filename): - """Remove tags from a file.""" - MP4(filename).delete() diff --git a/lib/mutagen/musepack.py b/lib/mutagen/musepack.py deleted file mode 100644 index 7b13745fe..000000000 --- a/lib/mutagen/musepack.py +++ /dev/null @@ -1,118 +0,0 @@ -# A Musepack reader/tagger -# -# Copyright 2006 Lukas Lalinsky -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: musepack.py 4275 2008-06-01 06:32:37Z piman $ - -"""Musepack audio streams with APEv2 tags. - -Musepack is an audio format originally based on the MPEG-1 Layer-2 -algorithms. Stream versions 4 through 7 are supported. - -For more information, see http://www.musepack.net/. -""" - -__all__ = ["Musepack", "Open", "delete"] - -import struct - -from mutagen.apev2 import APEv2File, error, delete -from mutagen.id3 import BitPaddedInt -from mutagen._util import cdata - -class MusepackHeaderError(error): pass - -RATES = [44100, 48000, 37800, 32000] - -class MusepackInfo(object): - """Musepack stream information. - - Attributes: - channels -- number of audio channels - length -- file length in seconds, as a float - sample_rate -- audio sampling rate in Hz - bitrate -- audio bitrate, in bits per second - version -- Musepack stream version - - Optional Attributes: - title_gain, title_peak -- Replay Gain and peak data for this song - album_gain, album_peak -- Replay Gain and peak data for this album - - These attributes are only available in stream version 7. The - gains are a float, +/- some dB. The peaks are a percentage [0..1] of - the maximum amplitude. This means to get a number comparable to - VorbisGain, you must multiply the peak by 2. - """ - - def __init__(self, fileobj): - header = fileobj.read(32) - if len(header) != 32: - raise MusepackHeaderError("not a Musepack file") - # Skip ID3v2 tags - if header[:3] == "ID3": - size = 10 + BitPaddedInt(header[6:10]) - fileobj.seek(size) - header = fileobj.read(32) - if len(header) != 32: - raise MusepackHeaderError("not a Musepack file") - # SV7 - if header.startswith("MP+"): - self.version = ord(header[3]) & 0xF - if self.version < 7: - raise MusepackHeaderError("not a Musepack file") - frames = cdata.uint_le(header[4:8]) - flags = cdata.uint_le(header[8:12]) - - self.title_peak, self.title_gain = struct.unpack( - "> 16) & 0x0003] - self.bitrate = 0 - # SV4-SV6 - else: - header_dword = cdata.uint_le(header[0:4]) - self.version = (header_dword >> 11) & 0x03FF; - if self.version < 4 or self.version > 6: - raise MusepackHeaderError("not a Musepack file") - self.bitrate = (header_dword >> 23) & 0x01FF; - self.sample_rate = 44100 - if self.version >= 5: - frames = cdata.uint_le(header[4:8]) - else: - frames = cdata.ushort_le(header[6:8]) - if self.version < 6: - frames -= 1 - self.channels = 2 - self.length = float(frames * 1152 - 576) / self.sample_rate - if not self.bitrate and self.length != 0: - fileobj.seek(0, 2) - self.bitrate = int(fileobj.tell() * 8 / (self.length * 1000) + 0.5) - - def pprint(self): - if self.version >= 7: - rg_data = ", Gain: %+0.2f (title), %+0.2f (album)" %( - self.title_gain, self.album_gain) - else: - rg_data = "" - return "Musepack, %.2f seconds, %d Hz%s" % ( - self.length, self.sample_rate, rg_data) - -class Musepack(APEv2File): - _Info = MusepackInfo - _mimes = ["audio/x-musepack", "audio/x-mpc"] - - def score(filename, fileobj, header): - return header.startswith("MP+") + filename.endswith(".mpc") - score = staticmethod(score) - -Open = Musepack diff --git a/lib/mutagen/ogg.py b/lib/mutagen/ogg.py deleted file mode 100644 index c81add640..000000000 --- a/lib/mutagen/ogg.py +++ /dev/null @@ -1,498 +0,0 @@ -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: ogg.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write Ogg bitstreams and pages. - -This module reads and writes a subset of the Ogg bitstream format -version 0. It does *not* read or write Ogg Vorbis files! For that, -you should use mutagen.oggvorbis. - -This implementation is based on the RFC 3533 standard found at -http://www.xiph.org/ogg/doc/rfc3533.txt. -""" - -import struct -import sys -import zlib - -from cStringIO import StringIO - -from mutagen import FileType -from mutagen._util import cdata, insert_bytes, delete_bytes - -class error(IOError): - """Ogg stream parsing errors.""" - pass - -class OggPage(object): - """A single Ogg page (not necessarily a single encoded packet). - - A page is a header of 26 bytes, followed by the length of the - data, followed by the data. - - The constructor is givin a file-like object pointing to the start - of an Ogg page. After the constructor is finished it is pointing - to the start of the next page. - - Attributes: - version -- stream structure version (currently always 0) - position -- absolute stream position (default -1) - serial -- logical stream serial number (default 0) - sequence -- page sequence number within logical stream (default 0) - offset -- offset this page was read from (default None) - complete -- if the last packet on this page is complete (default True) - packets -- list of raw packet data (default []) - - Note that if 'complete' is false, the next page's 'continued' - property must be true (so set both when constructing pages). - - If a file-like object is supplied to the constructor, the above - attributes will be filled in based on it. - """ - - version = 0 - __type_flags = 0 - position = 0L - serial = 0 - sequence = 0 - offset = None - complete = True - - def __init__(self, fileobj=None): - self.packets = [] - - if fileobj is None: - return - - self.offset = fileobj.tell() - - header = fileobj.read(27) - if len(header) == 0: - raise EOFError - - try: - (oggs, self.version, self.__type_flags, self.position, - self.serial, self.sequence, crc, segments) = struct.unpack( - "<4sBBqIIiB", header) - except struct.error: - raise error("unable to read full header; got %r" % header) - - if oggs != "OggS": - raise error("read %r, expected %r, at 0x%x" % ( - oggs, "OggS", fileobj.tell() - 27)) - - if self.version != 0: - raise error("version %r unsupported" % self.version) - - total = 0 - lacings = [] - lacing_bytes = fileobj.read(segments) - if len(lacing_bytes) != segments: - raise error("unable to read %r lacing bytes" % segments) - for c in map(ord, lacing_bytes): - total += c - if c < 255: - lacings.append(total) - total = 0 - if total: - lacings.append(total) - self.complete = False - - self.packets = map(fileobj.read, lacings) - if map(len, self.packets) != lacings: - raise error("unable to read full data") - - def __eq__(self, other): - """Two Ogg pages are the same if they write the same data.""" - try: - return (self.write() == other.write()) - except AttributeError: - return False - - def __repr__(self): - attrs = ['version', 'position', 'serial', 'sequence', 'offset', - 'complete', 'continued', 'first', 'last'] - values = ["%s=%r" % (attr, getattr(self, attr)) for attr in attrs] - return "<%s %s, %d bytes in %d packets>" % ( - type(self).__name__, " ".join(values), sum(map(len, self.packets)), - len(self.packets)) - - def write(self): - """Return a string encoding of the page header and data. - - A ValueError is raised if the data is too big to fit in a - single page. - """ - - data = [ - struct.pack("<4sBBqIIi", "OggS", self.version, self.__type_flags, - self.position, self.serial, self.sequence, 0) - ] - - lacing_data = [] - for datum in self.packets: - quot, rem = divmod(len(datum), 255) - lacing_data.append("\xff" * quot + chr(rem)) - lacing_data = "".join(lacing_data) - if not self.complete and lacing_data.endswith("\x00"): - lacing_data = lacing_data[:-1] - data.append(chr(len(lacing_data))) - data.append(lacing_data) - data.extend(self.packets) - data = "".join(data) - - # Python's CRC is swapped relative to Ogg's needs. - crc = ~zlib.crc32(data.translate(cdata.bitswap), -1) - # Although we're using to_int_be, this actually makes the CRC - # a proper le integer, since Python's CRC is byteswapped. - crc = cdata.to_int_be(crc).translate(cdata.bitswap) - data = data[:22] + crc + data[26:] - return data - - def __size(self): - size = 27 # Initial header size - for datum in self.packets: - quot, rem = divmod(len(datum), 255) - size += quot + 1 - if not self.complete and rem == 0: - # Packet contains a multiple of 255 bytes and is not - # terminated, so we don't have a \x00 at the end. - size -= 1 - size += sum(map(len, self.packets)) - return size - - size = property(__size, doc="Total frame size.") - - def __set_flag(self, bit, val): - mask = 1 << bit - if val: self.__type_flags |= mask - else: self.__type_flags &= ~mask - - continued = property( - lambda self: cdata.test_bit(self.__type_flags, 0), - lambda self, v: self.__set_flag(0, v), - doc="The first packet is continued from the previous page.") - - first = property( - lambda self: cdata.test_bit(self.__type_flags, 1), - lambda self, v: self.__set_flag(1, v), - doc="This is the first page of a logical bitstream.") - - last = property( - lambda self: cdata.test_bit(self.__type_flags, 2), - lambda self, v: self.__set_flag(2, v), - doc="This is the last page of a logical bitstream.") - - def renumber(klass, fileobj, serial, start): - """Renumber pages belonging to a specified logical stream. - - fileobj must be opened with mode r+b or w+b. - - Starting at page number 'start', renumber all pages belonging - to logical stream 'serial'. Other pages will be ignored. - - fileobj must point to the start of a valid Ogg page; any - occuring after it and part of the specified logical stream - will be numbered. No adjustment will be made to the data in - the pages nor the granule position; only the page number, and - so also the CRC. - - If an error occurs (e.g. non-Ogg data is found), fileobj will - be left pointing to the place in the stream the error occured, - but the invalid data will be left intact (since this function - does not change the total file size). - """ - - number = start - while True: - try: page = OggPage(fileobj) - except EOFError: - break - else: - if page.serial != serial: - # Wrong stream, skip this page. - continue - # Changing the number can't change the page size, - # so seeking back based on the current size is safe. - fileobj.seek(-page.size, 1) - page.sequence = number - fileobj.write(page.write()) - fileobj.seek(page.offset + page.size, 0) - number += 1 - renumber = classmethod(renumber) - - def to_packets(klass, pages, strict=False): - """Construct a list of packet data from a list of Ogg pages. - - If strict is true, the first page must start a new packet, - and the last page must end the last packet. - """ - - serial = pages[0].serial - sequence = pages[0].sequence - packets = [] - - if strict: - if pages[0].continued: - raise ValueError("first packet is continued") - if not pages[-1].complete: - raise ValueError("last packet does not complete") - elif pages and pages[0].continued: - packets.append("") - - for page in pages: - if serial != page.serial: - raise ValueError("invalid serial number in %r" % page) - elif sequence != page.sequence: - raise ValueError("bad sequence number in %r" % page) - else: sequence += 1 - - if page.continued: packets[-1] += page.packets[0] - else: packets.append(page.packets[0]) - packets.extend(page.packets[1:]) - - return packets - to_packets = classmethod(to_packets) - - def from_packets(klass, packets, sequence=0, - default_size=4096, wiggle_room=2048): - """Construct a list of Ogg pages from a list of packet data. - - The algorithm will generate pages of approximately - default_size in size (rounded down to the nearest multiple of - 255). However, it will also allow pages to increase to - approximately default_size + wiggle_room if allowing the - wiggle room would finish a packet (only one packet will be - finished in this way per page; if the next packet would fit - into the wiggle room, it still starts on a new page). - - This method reduces packet fragmentation when packet sizes are - slightly larger than the default page size, while still - ensuring most pages are of the average size. - - Pages are numbered started at 'sequence'; other information is - uninitialized. - """ - - chunk_size = (default_size // 255) * 255 - - pages = [] - - page = OggPage() - page.sequence = sequence - - for packet in packets: - page.packets.append("") - while packet: - data, packet = packet[:chunk_size], packet[chunk_size:] - if page.size < default_size and len(page.packets) < 255: - page.packets[-1] += data - else: - # If we've put any packet data into this page yet, - # we need to mark it incomplete. However, we can - # also have just started this packet on an already - # full page, in which case, just start the new - # page with this packet. - if page.packets[-1]: - page.complete = False - if len(page.packets) == 1: - page.position = -1L - else: - page.packets.pop(-1) - pages.append(page) - page = OggPage() - page.continued = not pages[-1].complete - page.sequence = pages[-1].sequence + 1 - page.packets.append(data) - - if len(packet) < wiggle_room: - page.packets[-1] += packet - packet = "" - - if page.packets: - pages.append(page) - - return pages - from_packets = classmethod(from_packets) - - def replace(klass, fileobj, old_pages, new_pages): - """Replace old_pages with new_pages within fileobj. - - old_pages must have come from reading fileobj originally. - new_pages are assumed to have the 'same' data as old_pages, - and so the serial and sequence numbers will be copied, as will - the flags for the first and last pages. - - fileobj will be resized and pages renumbered as necessary. As - such, it must be opened r+b or w+b. - """ - - # Number the new pages starting from the first old page. - first = old_pages[0].sequence - for page, seq in zip(new_pages, range(first, first + len(new_pages))): - page.sequence = seq - page.serial = old_pages[0].serial - - new_pages[0].first = old_pages[0].first - new_pages[0].last = old_pages[0].last - new_pages[0].continued = old_pages[0].continued - - new_pages[-1].first = old_pages[-1].first - new_pages[-1].last = old_pages[-1].last - new_pages[-1].complete = old_pages[-1].complete - if not new_pages[-1].complete and len(new_pages[-1].packets) == 1: - new_pages[-1].position = -1L - - new_data = "".join(map(klass.write, new_pages)) - - # Make room in the file for the new data. - delta = len(new_data) - fileobj.seek(old_pages[0].offset, 0) - insert_bytes(fileobj, delta, old_pages[0].offset) - fileobj.seek(old_pages[0].offset, 0) - fileobj.write(new_data) - new_data_end = old_pages[0].offset + delta - - # Go through the old pages and delete them. Since we shifted - # the data down the file, we need to adjust their offsets. We - # also need to go backwards, so we don't adjust the deltas of - # the other pages. - old_pages.reverse() - for old_page in old_pages: - adj_offset = old_page.offset + delta - delete_bytes(fileobj, old_page.size, adj_offset) - - # Finally, if there's any discrepency in length, we need to - # renumber the pages for the logical stream. - if len(old_pages) != len(new_pages): - fileobj.seek(new_data_end, 0) - serial = new_pages[-1].serial - sequence = new_pages[-1].sequence + 1 - klass.renumber(fileobj, serial, sequence) - replace = classmethod(replace) - - def find_last(klass, fileobj, serial): - """Find the last page of the stream 'serial'. - - If the file is not multiplexed this function is fast. If it is, - it must read the whole the stream. - - This finds the last page in the actual file object, or the last - page in the stream (with eos set), whichever comes first. - """ - - # For non-muxed streams, look at the last page. - try: fileobj.seek(-256*256, 2) - except IOError: - # The file is less than 64k in length. - fileobj.seek(0) - data = fileobj.read() - try: index = data.rindex("OggS") - except ValueError: - raise error("unable to find final Ogg header") - stringobj = StringIO(data[index:]) - best_page = None - try: - page = OggPage(stringobj) - except error: - pass - else: - if page.serial == serial: - if page.last: return page - else: best_page = page - else: best_page = None - - # The stream is muxed, so use the slow way. - fileobj.seek(0) - try: - page = OggPage(fileobj) - while not page.last: - page = OggPage(fileobj) - while page.serial != serial: - page = OggPage(fileobj) - best_page = page - return page - except error: - return best_page - except EOFError: - return best_page - find_last = classmethod(find_last) - -class OggFileType(FileType): - """An generic Ogg file.""" - - _Info = None - _Tags = None - _Error = None - _mimes = ["application/ogg", "application/x-ogg"] - - def load(self, filename): - """Load file information from a filename.""" - - self.filename = filename - fileobj = file(filename, "rb") - try: - try: - self.info = self._Info(fileobj) - self.tags = self._Tags(fileobj, self.info) - - if self.info.length: - # The streaminfo gave us real length information, - # don't waste time scanning the Ogg. - return - - last_page = OggPage.find_last(fileobj, self.info.serial) - samples = last_page.position - try: - denom = self.info.sample_rate - except AttributeError: - denom = self.info.fps - self.info.length = samples / float(denom) - - except error, e: - raise self._Error, e, sys.exc_info()[2] - except EOFError: - raise self._Error, "no appropriate stream found" - finally: - fileobj.close() - - def delete(self, filename=None): - """Remove tags from a file. - - If no filename is given, the one most recently loaded is used. - """ - if filename is None: - filename = self.filename - - self.tags.clear() - fileobj = file(filename, "rb+") - try: - try: self.tags._inject(fileobj) - except error, e: - raise self._Error, e, sys.exc_info()[2] - except EOFError: - raise self._Error, "no appropriate stream found" - finally: - fileobj.close() - - def save(self, filename=None): - """Save a tag to a file. - - If no filename is given, the one most recently loaded is used. - """ - if filename is None: - filename = self.filename - fileobj = file(filename, "rb+") - try: - try: self.tags._inject(fileobj) - except error, e: - raise self._Error, e, sys.exc_info()[2] - except EOFError: - raise self._Error, "no appropriate stream found" - finally: - fileobj.close() diff --git a/lib/mutagen/oggflac.py b/lib/mutagen/oggflac.py deleted file mode 100644 index 05daf9a08..000000000 --- a/lib/mutagen/oggflac.py +++ /dev/null @@ -1,127 +0,0 @@ -# Ogg FLAC support. -# -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: oggflac.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write Ogg FLAC comments. - -This module handles FLAC files wrapped in an Ogg bitstream. The first -FLAC stream found is used. For 'naked' FLACs, see mutagen.flac. - -This module is bsaed off the specification at -http://flac.sourceforge.net/ogg_mapping.html. -""" - -__all__ = ["OggFLAC", "Open", "delete"] - -import struct - -from cStringIO import StringIO - -from mutagen.flac import StreamInfo, VCFLACDict -from mutagen.ogg import OggPage, OggFileType, error as OggError - -class error(OggError): pass -class OggFLACHeaderError(error): pass - -class OggFLACStreamInfo(StreamInfo): - """Ogg FLAC general header and stream info. - - This encompasses the Ogg wrapper for the FLAC STREAMINFO metadata - block, as well as the Ogg codec setup that precedes it. - - Attributes (in addition to StreamInfo's): - packets -- number of metadata packets - serial -- Ogg logical stream serial number - """ - - packets = 0 - serial = 0 - - def load(self, data): - page = OggPage(data) - while not page.packets[0].startswith("\x7FFLAC"): - page = OggPage(data) - major, minor, self.packets, flac = struct.unpack( - ">BBH4s", page.packets[0][5:13]) - if flac != "fLaC": - raise OggFLACHeaderError("invalid FLAC marker (%r)" % flac) - elif (major, minor) != (1, 0): - raise OggFLACHeaderError( - "unknown mapping version: %d.%d" % (major, minor)) - self.serial = page.serial - - # Skip over the block header. - stringobj = StringIO(page.packets[0][17:]) - super(OggFLACStreamInfo, self).load(StringIO(page.packets[0][17:])) - - def pprint(self): - return "Ogg " + super(OggFLACStreamInfo, self).pprint() - -class OggFLACVComment(VCFLACDict): - def load(self, data, info, errors='replace'): - # data should be pointing at the start of an Ogg page, after - # the first FLAC page. - pages = [] - complete = False - while not complete: - page = OggPage(data) - if page.serial == info.serial: - pages.append(page) - complete = page.complete or (len(page.packets) > 1) - comment = StringIO(OggPage.to_packets(pages)[0][4:]) - super(OggFLACVComment, self).load(comment, errors=errors) - - def _inject(self, fileobj): - """Write tag data into the FLAC Vorbis comment packet/page.""" - - # Ogg FLAC has no convenient data marker like Vorbis, but the - # second packet - and second page - must be the comment data. - fileobj.seek(0) - page = OggPage(fileobj) - while not page.packets[0].startswith("\x7FFLAC"): - page = OggPage(fileobj) - - first_page = page - while not (page.sequence == 1 and page.serial == first_page.serial): - page = OggPage(fileobj) - - old_pages = [page] - while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): - page = OggPage(fileobj) - if page.serial == first_page.serial: - old_pages.append(page) - - packets = OggPage.to_packets(old_pages, strict=False) - - # Set the new comment block. - data = self.write() - data = packets[0][0] + struct.pack(">I", len(data))[-3:] + data - packets[0] = data - - new_pages = OggPage.from_packets(packets, old_pages[0].sequence) - OggPage.replace(fileobj, old_pages, new_pages) - -class OggFLAC(OggFileType): - """An Ogg FLAC file.""" - - _Info = OggFLACStreamInfo - _Tags = OggFLACVComment - _Error = OggFLACHeaderError - _mimes = ["audio/x-oggflac"] - - def score(filename, fileobj, header): - return (header.startswith("OggS") * ( - ("FLAC" in header) + ("fLaC" in header))) - score = staticmethod(score) - -Open = OggFLAC - -def delete(filename): - """Remove tags from a file.""" - OggFLAC(filename).delete() diff --git a/lib/mutagen/oggspeex.py b/lib/mutagen/oggspeex.py deleted file mode 100644 index 86a5adfe2..000000000 --- a/lib/mutagen/oggspeex.py +++ /dev/null @@ -1,125 +0,0 @@ -# Ogg Speex support. -# -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: oggspeex.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write Ogg Speex comments. - -This module handles Speex files wrapped in an Ogg bitstream. The -first Speex stream found is used. - -Read more about Ogg Speex at http://www.speex.org/. This module is -based on the specification at http://www.speex.org/manual2/node7.html -and clarifications after personal communication with Jean-Marc, -http://lists.xiph.org/pipermail/speex-dev/2006-July/004676.html. -""" - -__all__ = ["OggSpeex", "Open", "delete"] - -from mutagen._vorbis import VCommentDict -from mutagen.ogg import OggPage, OggFileType, error as OggError -from mutagen._util import cdata - -class error(OggError): pass -class OggSpeexHeaderError(error): pass - -class OggSpeexInfo(object): - """Ogg Speex stream information. - - Attributes: - bitrate - nominal bitrate in bits per second - channels - number of channels - length - file length in seconds, as a float - - The reference encoder does not set the bitrate; in this case, - the bitrate will be 0. - """ - - length = 0 - - def __init__(self, fileobj): - page = OggPage(fileobj) - while not page.packets[0].startswith("Speex "): - page = OggPage(fileobj) - if not page.first: - raise OggSpeexHeaderError( - "page has ID header, but doesn't start a stream") - self.sample_rate = cdata.uint_le(page.packets[0][36:40]) - self.channels = cdata.uint_le(page.packets[0][48:52]) - self.bitrate = cdata.int_le(page.packets[0][52:56]) - if self.bitrate == -1: - self.bitrate = 0 - self.serial = page.serial - - def pprint(self): - return "Ogg Speex, %.2f seconds" % self.length - -class OggSpeexVComment(VCommentDict): - """Speex comments embedded in an Ogg bitstream.""" - - def __init__(self, fileobj, info): - pages = [] - complete = False - while not complete: - page = OggPage(fileobj) - if page.serial == info.serial: - pages.append(page) - complete = page.complete or (len(page.packets) > 1) - data = OggPage.to_packets(pages)[0] + "\x01" - super(OggSpeexVComment, self).__init__(data, framing=False) - - def _inject(self, fileobj): - """Write tag data into the Speex comment packet/page.""" - - fileobj.seek(0) - - # Find the first header page, with the stream info. - # Use it to get the serial number. - page = OggPage(fileobj) - while not page.packets[0].startswith("Speex "): - page = OggPage(fileobj) - - # Look for the next page with that serial number, it'll start - # the comment packet. - serial = page.serial - page = OggPage(fileobj) - while page.serial != serial: - page = OggPage(fileobj) - - # Then find all the pages with the comment packet. - old_pages = [page] - while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): - page = OggPage(fileobj) - if page.serial == old_pages[0].serial: - old_pages.append(page) - - packets = OggPage.to_packets(old_pages, strict=False) - - # Set the new comment packet. - packets[0] = self.write(framing=False) - - new_pages = OggPage.from_packets(packets, old_pages[0].sequence) - OggPage.replace(fileobj, old_pages, new_pages) - -class OggSpeex(OggFileType): - """An Ogg Speex file.""" - - _Info = OggSpeexInfo - _Tags = OggSpeexVComment - _Error = OggSpeexHeaderError - _mimes = ["audio/x-speex"] - - def score(filename, fileobj, header): - return (header.startswith("OggS") * ("Speex " in header)) - score = staticmethod(score) - -Open = OggSpeex - -def delete(filename): - """Remove tags from a file.""" - OggSpeex(filename).delete() diff --git a/lib/mutagen/oggtheora.py b/lib/mutagen/oggtheora.py deleted file mode 100644 index e2620414f..000000000 --- a/lib/mutagen/oggtheora.py +++ /dev/null @@ -1,111 +0,0 @@ -# Ogg Theora support. -# -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: oggtheora.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write Ogg Theora comments. - -This module handles Theora files wrapped in an Ogg bitstream. The -first Theora stream found is used. - -Based on the specification at http://theora.org/doc/Theora_I_spec.pdf. -""" - -__all__ = ["OggTheora", "Open", "delete"] - -import struct - -from mutagen._vorbis import VCommentDict -from mutagen.ogg import OggPage, OggFileType, error as OggError - -class error(OggError): pass -class OggTheoraHeaderError(error): pass - -class OggTheoraInfo(object): - """Ogg Theora stream information. - - Attributes: - length - file length in seconds, as a float - fps - video frames per second, as a float - """ - - length = 0 - - def __init__(self, fileobj): - page = OggPage(fileobj) - while not page.packets[0].startswith("\x80theora"): - page = OggPage(fileobj) - if not page.first: - raise OggTheoraHeaderError( - "page has ID header, but doesn't start a stream") - data = page.packets[0] - vmaj, vmin = struct.unpack("2B", data[7:9]) - if (vmaj, vmin) != (3, 2): - raise OggTheoraHeaderError( - "found Theora version %d.%d != 3.2" % (vmaj, vmin)) - fps_num, fps_den = struct.unpack(">2I", data[22:30]) - self.fps = fps_num / float(fps_den) - self.bitrate, = struct.unpack(">I", data[37:40] + "\x00") - self.serial = page.serial - - def pprint(self): - return "Ogg Theora, %.2f seconds, %d bps" % (self.length, self.bitrate) - -class OggTheoraCommentDict(VCommentDict): - """Theora comments embedded in an Ogg bitstream.""" - - def __init__(self, fileobj, info): - pages = [] - complete = False - while not complete: - page = OggPage(fileobj) - if page.serial == info.serial: - pages.append(page) - complete = page.complete or (len(page.packets) > 1) - data = OggPage.to_packets(pages)[0][7:] - super(OggTheoraCommentDict, self).__init__(data + "\x01") - - def _inject(self, fileobj): - """Write tag data into the Theora comment packet/page.""" - - fileobj.seek(0) - page = OggPage(fileobj) - while not page.packets[0].startswith("\x81theora"): - page = OggPage(fileobj) - - old_pages = [page] - while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): - page = OggPage(fileobj) - if page.serial == old_pages[0].serial: - old_pages.append(page) - - packets = OggPage.to_packets(old_pages, strict=False) - - packets[0] = "\x81theora" + self.write(framing=False) - - new_pages = OggPage.from_packets(packets, old_pages[0].sequence) - OggPage.replace(fileobj, old_pages, new_pages) - -class OggTheora(OggFileType): - """An Ogg Theora file.""" - - _Info = OggTheoraInfo - _Tags = OggTheoraCommentDict - _Error = OggTheoraHeaderError - _mimes = ["video/x-theora"] - - def score(filename, fileobj, header): - return (header.startswith("OggS") * - (("\x80theora" in header) + ("\x81theora" in header))) - score = staticmethod(score) - -Open = OggTheora - -def delete(filename): - """Remove tags from a file.""" - OggTheora(filename).delete() diff --git a/lib/mutagen/oggvorbis.py b/lib/mutagen/oggvorbis.py deleted file mode 100644 index e1de1bd0d..000000000 --- a/lib/mutagen/oggvorbis.py +++ /dev/null @@ -1,119 +0,0 @@ -# Ogg Vorbis support. -# -# Copyright 2006 Joe Wreschnig -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: oggvorbis.py 4275 2008-06-01 06:32:37Z piman $ - -"""Read and write Ogg Vorbis comments. - -This module handles Vorbis files wrapped in an Ogg bitstream. The -first Vorbis stream found is used. - -Read more about Ogg Vorbis at http://vorbis.com/. This module is based -on the specification at http://www.xiph.org/vorbis/doc/Vorbis_I_spec.html. -""" - -__all__ = ["OggVorbis", "Open", "delete"] - -import struct - -from mutagen._vorbis import VCommentDict -from mutagen.ogg import OggPage, OggFileType, error as OggError - -class error(OggError): pass -class OggVorbisHeaderError(error): pass - -class OggVorbisInfo(object): - """Ogg Vorbis stream information. - - Attributes: - length - file length in seconds, as a float - bitrate - nominal ('average') bitrate in bits per second, as an int - """ - - length = 0 - - def __init__(self, fileobj): - page = OggPage(fileobj) - while not page.packets[0].startswith("\x01vorbis"): - page = OggPage(fileobj) - if not page.first: - raise OggVorbisHeaderError( - "page has ID header, but doesn't start a stream") - (self.channels, self.sample_rate, max_bitrate, nominal_bitrate, - min_bitrate) = struct.unpack(" nominal_bitrate: - self.bitrate = min_bitrate - else: - self.bitrate = nominal_bitrate - - def pprint(self): - return "Ogg Vorbis, %.2f seconds, %d bps" % (self.length, self.bitrate) - -class OggVCommentDict(VCommentDict): - """Vorbis comments embedded in an Ogg bitstream.""" - - def __init__(self, fileobj, info): - pages = [] - complete = False - while not complete: - page = OggPage(fileobj) - if page.serial == info.serial: - pages.append(page) - complete = page.complete or (len(page.packets) > 1) - data = OggPage.to_packets(pages)[0][7:] # Strip off "\x03vorbis". - super(OggVCommentDict, self).__init__(data) - - def _inject(self, fileobj): - """Write tag data into the Vorbis comment packet/page.""" - - # Find the old pages in the file; we'll need to remove them, - # plus grab any stray setup packet data out of them. - fileobj.seek(0) - page = OggPage(fileobj) - while not page.packets[0].startswith("\x03vorbis"): - page = OggPage(fileobj) - - old_pages = [page] - while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): - page = OggPage(fileobj) - if page.serial == old_pages[0].serial: - old_pages.append(page) - - packets = OggPage.to_packets(old_pages, strict=False) - - # Set the new comment packet. - packets[0] = "\x03vorbis" + self.write() - - new_pages = OggPage.from_packets(packets, old_pages[0].sequence) - OggPage.replace(fileobj, old_pages, new_pages) - -class OggVorbis(OggFileType): - """An Ogg Vorbis file.""" - - _Info = OggVorbisInfo - _Tags = OggVCommentDict - _Error = OggVorbisHeaderError - _mimes = ["audio/vorbis", "audio/x-vorbis"] - - def score(filename, fileobj, header): - return (header.startswith("OggS") * ("\x01vorbis" in header)) - score = staticmethod(score) - -Open = OggVorbis - -def delete(filename): - """Remove tags from a file.""" - OggVorbis(filename).delete() diff --git a/lib/mutagen/optimfrog.py b/lib/mutagen/optimfrog.py deleted file mode 100644 index 902c05723..000000000 --- a/lib/mutagen/optimfrog.py +++ /dev/null @@ -1,64 +0,0 @@ -# OptimFROG reader/tagger -# -# Copyright 2006 Lukas Lalinsky -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: optimfrog.py 4275 2008-06-01 06:32:37Z piman $ - -"""OptimFROG audio streams with APEv2 tags. - -OptimFROG is a lossless audio compression program. Its main goal is to -reduce at maximum the size of audio files, while permitting bit -identical restoration for all input. It is similar with the ZIP -compression, but it is highly specialized to compress audio data. - -Only versions 4.5 and higher are supported. - -For more information, see http://www.losslessaudio.org/ -""" - -__all__ = ["OptimFROG", "Open", "delete"] - -import struct -from mutagen.apev2 import APEv2File, error, delete - -class OptimFROGHeaderError(error): pass - -class OptimFROGInfo(object): - """OptimFROG stream information. - - Attributes: - channels - number of audio channels - length - file length in seconds, as a float - sample_rate - audio sampling rate in Hz - """ - - def __init__(self, fileobj): - header = fileobj.read(76) - if (len(header) != 76 or not header.startswith("OFR ") or - struct.unpack(" -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 as -# published by the Free Software Foundation. -# -# $Id: wavpack.py 4275 2008-06-01 06:32:37Z piman $ - -"""WavPack reading and writing. - -WavPack is a lossless format that uses APEv2 tags. Read -http://www.wavpack.com/ for more information. -""" - -__all__ = ["WavPack", "Open", "delete"] - -from mutagen.apev2 import APEv2File, error, delete -from mutagen._util import cdata - -class WavPackHeaderError(error): pass - -RATES = [6000, 8000, 9600, 11025, 12000, 16000, 22050, 24000, 32000, 44100, - 48000, 64000, 88200, 96000, 192000] - -class WavPackInfo(object): - """WavPack stream information. - - Attributes: - channels - number of audio channels (1 or 2) - length - file length in seconds, as a float - sample_rate - audio sampling rate in Hz - version - WavPack stream version - """ - - def __init__(self, fileobj): - header = fileobj.read(28) - if len(header) != 28 or not header.startswith("wvpk"): - raise WavPackHeaderError("not a WavPack file") - samples = cdata.uint_le(header[12:16]) - flags = cdata.uint_le(header[24:28]) - self.version = cdata.short_le(header[8:10]) - self.channels = bool(flags & 4) or 2 - self.sample_rate = RATES[(flags >> 23) & 0xF] - self.length = float(samples) / self.sample_rate - - def pprint(self): - return "WavPack, %.2f seconds, %d Hz" % (self.length, self.sample_rate) - -class WavPack(APEv2File): - _Info = WavPackInfo - _mimes = ["audio/x-wavpack"] - - def score(filename, fileobj, header): - return header.startswith("wvpk") * 2 - score = staticmethod(score) diff --git a/requirements.txt b/requirements.txt index cd8a0f98d..f52c8fd27 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ Django==1.1 librarian==1.2.6 lxml==2.2.2 Imaging==1.1.6 +mutagen==1.17