From: Marek Stępniowski Date: Sun, 30 Nov 2008 18:22:43 +0000 (+0100) Subject: Added mutagen library. X-Git-Url: https://git.mdrn.pl/wolnelektury.git/commitdiff_plain/14fef303c668733a4ff0458bc468595e4a1629ab?hp=4f5547ad4cd06de77bdf0607e38aab2d45080bfc Added mutagen library. --- diff --git a/lib/mutagen/__init__.py b/lib/mutagen/__init__.py new file mode 100644 index 000000000..94717275d --- /dev/null +++ b/lib/mutagen/__init__.py @@ -0,0 +1,201 @@ +#! /usr/bin/env python +# +# mutagen aims to be an all purpose media tagging library +# Copyright (C) 2005 Michael Urman +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. +# +# $Id: __init__.py 4275 2008-06-01 06:32:37Z piman $ +# + +"""Mutagen aims to be an all purpose tagging library. + + import mutagen.[format] + metadata = mutagen.[format].Open(filename) + +metadata acts like a dictionary of tags in the file. Tags are generally a +list of string-like values, but may have additional methods available +depending on tag or format. They may also be entirely different objects +for certain keys, again depending on format. +""" + +version = (1, 14) +version_string = ".".join(map(str, version)) + +import warnings + +import mutagen._util + +class Metadata(object): + """An abstract dict-like object. + + Metadata is the base class for many of the tag objects in Mutagen. + """ + + def __init__(self, *args, **kwargs): + if args or kwargs: + self.load(*args, **kwargs) + + def load(self, *args, **kwargs): + raise NotImplementedError + + def save(self, filename=None): + raise NotImplementedError + + def delete(self, filename=None): + raise NotImplementedError + +class FileType(mutagen._util.DictMixin): + """An abstract object wrapping tags and audio stream information. 
+ + Attributes: + info -- stream information (length, bitrate, sample rate) + tags -- metadata tags, if any + + Each file format has different potential tags and stream + information. + + FileTypes implement an interface very similar to Metadata; the + dict interface, save, load, and delete calls on a FileType call + the appropriate methods on its tag data. + """ + + info = None + tags = None + filename = None + _mimes = ["application/octet-stream"] + + def __init__(self, filename=None, *args, **kwargs): + if filename is None: + warnings.warn("FileType constructor requires a filename", + DeprecationWarning) + else: + self.load(filename, *args, **kwargs) + + def load(self, filename, *args, **kwargs): + raise NotImplementedError + + def __getitem__(self, key): + """Look up a metadata tag key. + + If the file has no tags at all, a KeyError is raised. + """ + if self.tags is None: raise KeyError, key + else: return self.tags[key] + + def __setitem__(self, key, value): + """Set a metadata tag. + + If the file has no tags, an appropriate format is added (but + not written until save is called). + """ + if self.tags is None: + self.add_tags() + self.tags[key] = value + + def __delitem__(self, key): + """Delete a metadata tag key. + + If the file has no tags at all, a KeyError is raised. + """ + if self.tags is None: raise KeyError, key + else: del(self.tags[key]) + + def keys(self): + """Return a list of keys in the metadata tag. + + If the file has no tags at all, an empty list is returned. + """ + if self.tags is None: return [] + else: return self.tags.keys() + + def delete(self, filename=None): + """Remove tags from a file.""" + if self.tags is not None: + if filename is None: + filename = self.filename + else: + warnings.warn( + "delete(filename=...) 
is deprecated, reload the file", + DeprecationWarning) + return self.tags.delete(filename) + + def save(self, filename=None, **kwargs): + """Save metadata tags.""" + if filename is None: + filename = self.filename + else: + warnings.warn( + "save(filename=...) is deprecated, reload the file", + DeprecationWarning) + if self.tags is not None: + return self.tags.save(filename, **kwargs) + else: raise ValueError("no tags in file") + + def pprint(self): + """Print stream information and comment key=value pairs.""" + stream = "%s (%s)" % (self.info.pprint(), self.mime[0]) + try: tags = self.tags.pprint() + except AttributeError: + return stream + else: return stream + ((tags and "\n" + tags) or "") + + def add_tags(self): + raise NotImplementedError + + def __get_mime(self): + mimes = [] + for Kind in type(self).__mro__: + for mime in getattr(Kind, '_mimes', []): + if mime not in mimes: + mimes.append(mime) + return mimes + + mime = property(__get_mime) + +def File(filename, options=None): + """Guess the type of the file and try to open it. + + The file type is decided by several things, such as the first 128 + bytes (which usually contains a file type identifier), the + filename extension, and the presence of existing tags. + + If no appropriate type could be found, None is returned. 
+ """ + + if options is None: + from mutagen.asf import ASF + from mutagen.apev2 import APEv2File + from mutagen.flac import FLAC + from mutagen.id3 import ID3FileType + from mutagen.mp3 import MP3 + from mutagen.oggflac import OggFLAC + from mutagen.oggspeex import OggSpeex + from mutagen.oggtheora import OggTheora + from mutagen.oggvorbis import OggVorbis + from mutagen.trueaudio import TrueAudio + from mutagen.wavpack import WavPack + from mutagen.mp4 import MP4 + from mutagen.musepack import Musepack + from mutagen.monkeysaudio import MonkeysAudio + from mutagen.optimfrog import OptimFROG + options = [MP3, TrueAudio, OggTheora, OggSpeex, OggVorbis, OggFLAC, + FLAC, APEv2File, MP4, ID3FileType, WavPack, Musepack, + MonkeysAudio, OptimFROG, ASF] + + if not options: + return None + + fileobj = file(filename, "rb") + try: + header = fileobj.read(128) + results = [Kind.score(filename, fileobj, header) for Kind in options] + finally: + fileobj.close() + results = zip(results, options) + results.sort() + score, Kind = results[-1] + if score > 0: return Kind(filename) + else: return None diff --git a/lib/mutagen/_constants.py b/lib/mutagen/_constants.py new file mode 100644 index 000000000..2381e9790 --- /dev/null +++ b/lib/mutagen/_constants.py @@ -0,0 +1,153 @@ +"""Constants used by Mutagen.""" + +GENRES = [ + u"Blues", + u"Classic Rock", + u"Country", + u"Dance", + u"Disco", + u"Funk", + u"Grunge", + u"Hip-Hop", + u"Jazz", + u"Metal", + u"New Age", + u"Oldies", + u"Other", + u"Pop", + u"R&B", + u"Rap", + u"Reggae", + u"Rock", + u"Techno", + u"Industrial", + u"Alternative", + u"Ska", + u"Death Metal", + u"Pranks", + u"Soundtrack", + u"Euro-Techno", + u"Ambient", + u"Trip-Hop", + u"Vocal", + u"Jazz+Funk", + u"Fusion", + u"Trance", + u"Classical", + u"Instrumental", + u"Acid", + u"House", + u"Game", + u"Sound Clip", + u"Gospel", + u"Noise", + u"Alt. 
Rock", + u"Bass", + u"Soul", + u"Punk", + u"Space", + u"Meditative", + u"Instrumental Pop", + u"Instrumental Rock", + u"Ethnic", + u"Gothic", + u"Darkwave", + u"Techno-Industrial", + u"Electronic", + u"Pop-Folk", + u"Eurodance", + u"Dream", + u"Southern Rock", + u"Comedy", + u"Cult", + u"Gangsta", + u"Top 40", + u"Christian Rap", + u"Pop/Funk", + u"Jungle", + u"Native American", + u"Cabaret", + u"New Wave", + u"Psychadelic", + u"Rave", + u"Showtunes", + u"Trailer", + u"Lo-Fi", + u"Tribal", + u"Acid Punk", + u"Acid Jazz", + u"Polka", + u"Retro", + u"Musical", + u"Rock & Roll", + u"Hard Rock", + u"Folk", + u"Folk/Rock", + u"National Folk", + u"Swing", + u"Fusion", + u"Bebob", + u"Latin", + u"Revival", + u"Celtic", + u"Bluegrass", + u"Avantgarde", + u"Gothic Rock", + u"Progressive Rock", + u"Psychadelic Rock", + u"Symphonic Rock", + u"Slow Rock", + u"Big Band", + u"Chorus", + u"Easy Listening", + u"Acoustic", + u"Humour", + u"Speech", + u"Chanson", + u"Opera", + u"Chamber Music", + u"Sonata", + u"Symphony", + u"Booty Bass", + u"Primus", + u"Porn Groove", + u"Satire", + u"Slow Jam", + u"Club", + u"Tango", + u"Samba", + u"Folklore", + u"Ballad", + u"Power Ballad", + u"Rhythmic Soul", + u"Freestyle", + u"Duet", + u"Punk Rock", + u"Drum Solo", + u"A Capella", + u"Euro-House", + u"Dance Hall", + u"Goa", + u"Drum & Bass", + u"Club-House", + u"Hardcore", + u"Terror", + u"Indie", + u"BritPop", + u"Negerpunk", + u"Polsk Punk", + u"Beat", + u"Christian Gangsta Rap", + u"Heavy Metal", + u"Black Metal", + u"Crossover", + u"Contemporary Christian", + u"Christian Rock", + u"Merengue", + u"Salsa", + u"Thrash Metal", + u"Anime", + u"Jpop", + u"Synthpop" + ] +"""The ID3v1 genre list.""" diff --git a/lib/mutagen/_util.py b/lib/mutagen/_util.py new file mode 100644 index 000000000..390679a62 --- /dev/null +++ b/lib/mutagen/_util.py @@ -0,0 +1,303 @@ +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU 
General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: _util.py 4275 2008-06-01 06:32:37Z piman $ + +"""Utility classes for Mutagen. + +You should not rely on the interfaces here being stable. They are +intended for internal use in Mutagen only. +""" + +import struct + +class DictMixin(object): + """Implement the dict API using keys() and __*item__ methods. + + Similar to UserDict.DictMixin, this takes a class that defines + __getitem__, __setitem__, __delitem__, and keys(), and turns it + into a full dict-like object. + + UserDict.DictMixin is not suitable for this purpose because it's + an old-style class. + + This class is not optimized for very large dictionaries; many + functions have linear memory requirements. I recommend you + override some of these functions if speed is required. + """ + + def __iter__(self): + return iter(self.keys()) + + def has_key(self, key): + try: self[key] + except KeyError: return False + else: return True + __contains__ = has_key + + iterkeys = lambda self: iter(self.keys()) + + def values(self): + return map(self.__getitem__, self.keys()) + itervalues = lambda self: iter(self.values()) + + def items(self): + return zip(self.keys(), self.values()) + iteritems = lambda s: iter(s.items()) + + def clear(self): + map(self.__delitem__, self.keys()) + + def pop(self, key, *args): + if len(args) > 1: + raise TypeError("pop takes at most two arguments") + try: value = self[key] + except KeyError: + if args: return args[0] + else: raise + del(self[key]) + return value + + def popitem(self): + try: + key = self.keys()[0] + return key, self.pop(key) + except IndexError: raise KeyError("dictionary is empty") + + def update(self, other=None, **kwargs): + if other is None: + self.update(kwargs) + other = {} + + try: map(self.__setitem__, other.keys(), other.values()) + except AttributeError: + for key, value in other: + self[key] = value + + def setdefault(self, key, default=None): + try: return self[key] + 
except KeyError: + self[key] = default + return default + + def get(self, key, default=None): + try: return self[key] + except KeyError: return default + + def __repr__(self): + return repr(dict(self.items())) + + def __cmp__(self, other): + if other is None: return 1 + else: return cmp(dict(self.items()), other) + + def __len__(self): + return len(self.keys()) + +class DictProxy(DictMixin): + def __init__(self, *args, **kwargs): + self.__dict = {} + super(DictProxy, self).__init__(*args, **kwargs) + + def __getitem__(self, key): + return self.__dict[key] + + def __setitem__(self, key, value): + self.__dict[key] = value + + def __delitem__(self, key): + del(self.__dict[key]) + + def keys(self): + return self.__dict.keys() + +class cdata(object): + """C character buffer to Python numeric type conversions.""" + + from struct import error + + short_le = staticmethod(lambda data: struct.unpack('h', data)[0]) + ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0]) + + int_le = staticmethod(lambda data: struct.unpack('i', data)[0]) + uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0]) + + longlong_le = staticmethod(lambda data: struct.unpack('q', data)[0]) + ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0]) + + to_short_le = staticmethod(lambda data: struct.pack('h', data)) + to_ushort_be = staticmethod(lambda data: struct.pack('>H', data)) + + to_int_le = staticmethod(lambda data: struct.pack('i', data)) + to_uint_be = staticmethod(lambda data: struct.pack('>I', data)) + + to_longlong_le = staticmethod(lambda data: struct.pack('q', data)) + to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data)) + + bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)])) + for val in range(256)]) + del(i) + del(val) + + test_bit = staticmethod(lambda value, n: bool((value >> n) & 1)) + +def lock(fileobj): + """Lock a file object 'safely'. 
+ + That means a failure to lock because the platform doesn't + support fcntl or filesystem locks is not considered a + failure. This call does block. + + Returns whether or not the lock was successful, or + raises an exception in more extreme circumstances (full + lock table, invalid file). + """ + try: import fcntl + except ImportError: + return False + else: + try: fcntl.lockf(fileobj, fcntl.LOCK_EX) + except IOError: + # FIXME: There's possibly a lot of complicated + # logic that needs to go here in case the IOError + # is EACCES or EAGAIN. + return False + else: + return True + +def unlock(fileobj): + """Unlock a file object. + + Don't call this on a file object unless a call to lock() + returned true. + """ + # If this fails there's a mismatched lock/unlock pair, + # so we definitely don't want to ignore errors. + import fcntl + fcntl.lockf(fileobj, fcntl.LOCK_UN) + +def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16): + """Insert size bytes of empty space starting at offset. + + fobj must be an open file object, open rb+ or + equivalent. Mutagen tries to use mmap to resize the file, but + falls back to a significantly slower method if mmap fails. + """ + assert 0 < size + assert 0 <= offset + locked = False + fobj.seek(0, 2) + filesize = fobj.tell() + movesize = filesize - offset + fobj.write('\x00' * size) + fobj.flush() + try: + try: + import mmap + map = mmap.mmap(fobj.fileno(), filesize + size) + try: map.move(offset + size, offset, movesize) + finally: map.close() + except (ValueError, EnvironmentError, ImportError): + # handle broken mmap scenarios + locked = lock(fobj) + fobj.truncate(filesize) + + fobj.seek(0, 2) + padsize = size + # Don't generate an enormous string if we need to pad + # the file out several megs. 
+ while padsize: + addsize = min(BUFFER_SIZE, padsize) + fobj.write("\x00" * addsize) + padsize -= addsize + + fobj.seek(filesize, 0) + while movesize: + # At the start of this loop, fobj is pointing at the end + # of the data we need to move, which is of movesize length. + thismove = min(BUFFER_SIZE, movesize) + # Seek back however much we're going to read this frame. + fobj.seek(-thismove, 1) + nextpos = fobj.tell() + # Read it, so we're back at the end. + data = fobj.read(thismove) + # Seek back to where we need to write it. + fobj.seek(-thismove + size, 1) + # Write it. + fobj.write(data) + # And seek back to the end of the unmoved data. + fobj.seek(nextpos) + movesize -= thismove + + fobj.flush() + finally: + if locked: + unlock(fobj) + +def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16): + """Delete size bytes of empty space starting at offset. + + fobj must be an open file object, open rb+ or + equivalent. Mutagen tries to use mmap to resize the file, but + falls back to a significantly slower method if mmap fails. 
+ """ + locked = False + assert 0 < size + assert 0 <= offset + fobj.seek(0, 2) + filesize = fobj.tell() + movesize = filesize - offset - size + assert 0 <= movesize + try: + if movesize > 0: + fobj.flush() + try: + import mmap + map = mmap.mmap(fobj.fileno(), filesize) + try: map.move(offset, offset + size, movesize) + finally: map.close() + except (ValueError, EnvironmentError, ImportError): + # handle broken mmap scenarios + locked = lock(fobj) + fobj.seek(offset + size) + buf = fobj.read(BUFFER_SIZE) + while buf: + fobj.seek(offset) + fobj.write(buf) + offset += len(buf) + fobj.seek(offset + size) + buf = fobj.read(BUFFER_SIZE) + fobj.truncate(filesize - size) + fobj.flush() + finally: + if locked: + unlock(fobj) + +def utf8(data): + """Convert a basestring to a valid UTF-8 str.""" + if isinstance(data, str): + return data.decode("utf-8", "replace").encode("utf-8") + elif isinstance(data, unicode): + return data.encode("utf-8") + else: raise TypeError("only unicode/str types can be converted to UTF-8") diff --git a/lib/mutagen/_vorbis.py b/lib/mutagen/_vorbis.py new file mode 100644 index 000000000..eb0e0b9a0 --- /dev/null +++ b/lib/mutagen/_vorbis.py @@ -0,0 +1,226 @@ +# Vorbis comment support for Mutagen +# Copyright 2005-2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. + +"""Read and write Vorbis comment data. + +Vorbis comments are freeform key/value pairs; keys are +case-insensitive ASCII and values are Unicode strings. A key may have +multiple values. + +The specification is at http://www.xiph.org/vorbis/doc/v-comment.html. +""" + +import sys + +from cStringIO import StringIO + +import mutagen +from mutagen._util import DictMixin, cdata + +try: set +except NameError: + from sets import Set as set + +def is_valid_key(key): + """Return true if a string is a valid Vorbis comment key. 
+ + Valid Vorbis comment keys are printable ASCII between 0x20 (space) + and 0x7D ('}'), excluding '='. + """ + for c in key: + if c < " " or c > "}" or c == "=": return False + else: return bool(key) +istag = is_valid_key + +class error(IOError): pass +class VorbisUnsetFrameError(error): pass +class VorbisEncodingError(error): pass + +class VComment(mutagen.Metadata, list): + """A Vorbis comment parser, accessor, and renderer. + + All comment ordering is preserved. A VComment is a list of + key/value pairs, and so any Python list method can be used on it. + + Vorbis comments are always wrapped in something like an Ogg Vorbis + bitstream or a FLAC metadata block, so this loads string data or a + file-like object, not a filename. + + Attributes: + vendor -- the stream 'vendor' (i.e. writer); default 'Mutagen' + """ + + vendor = u"Mutagen " + mutagen.version_string + + def __init__(self, data=None, *args, **kwargs): + # Collect the args to pass to load, this lets child classes + # override just load and get equivalent magic for the + # constructor. + if data is not None: + if isinstance(data, str): + data = StringIO(data) + elif not hasattr(data, 'read'): + raise TypeError("VComment requires string data or a file-like") + self.load(data, *args, **kwargs) + + def load(self, fileobj, errors='replace', framing=True): + """Parse a Vorbis comment from a file-like object. + + Keyword arguments: + errors: + 'strict', 'replace', or 'ignore'. This affects Unicode decoding + and how other malformed content is interpreted. + framing -- if true, fail if a framing bit is not present + + Framing bits are required by the Vorbis comment specification, + but are not used in FLAC Vorbis comment blocks. 
+ + """ + try: + vendor_length = cdata.uint_le(fileobj.read(4)) + self.vendor = fileobj.read(vendor_length).decode('utf-8', errors) + count = cdata.uint_le(fileobj.read(4)) + for i in range(count): + length = cdata.uint_le(fileobj.read(4)) + try: string = fileobj.read(length).decode('utf-8', errors) + except (OverflowError, MemoryError): + raise error("cannot read %d bytes, too large" % length) + try: tag, value = string.split('=', 1) + except ValueError, err: + if errors == "ignore": + continue + elif errors == "replace": + tag, value = u"unknown%d" % i, string + else: + raise VorbisEncodingError, str(err), sys.exc_info()[2] + try: tag = tag.encode('ascii', errors) + except UnicodeEncodeError: + raise VorbisEncodingError, "invalid tag name %r" % tag + else: + if is_valid_key(tag): self.append((tag, value)) + if framing and not ord(fileobj.read(1)) & 0x01: + raise VorbisUnsetFrameError("framing bit was unset") + except (cdata.error, TypeError): + raise error("file is not a valid Vorbis comment") + + def validate(self): + """Validate keys and values. + + Check to make sure every key used is a valid Vorbis key, and + that every value used is a valid Unicode or UTF-8 string. If + any invalid keys or values are found, a ValueError is raised. + """ + + if not isinstance(self.vendor, unicode): + try: self.vendor.decode('utf-8') + except UnicodeDecodeError: raise ValueError + + for key, value in self: + try: + if not is_valid_key(key): raise ValueError + except: raise ValueError("%r is not a valid key" % key) + if not isinstance(value, unicode): + try: value.encode("utf-8") + except: raise ValueError("%r is not a valid value" % value) + else: return True + + def clear(self): + """Clear all keys from the comment.""" + del(self[:]) + + def write(self, framing=True): + """Return a string representation of the data. + + Validation is always performed, so calling this function on + invalid data may raise a ValueError. 
+ + Keyword arguments: + framing -- if true, append a framing bit (see load) + """ + + self.validate() + + f = StringIO() + f.write(cdata.to_uint_le(len(self.vendor.encode('utf-8')))) + f.write(self.vendor.encode('utf-8')) + f.write(cdata.to_uint_le(len(self))) + for tag, value in self: + comment = "%s=%s" % (tag, value.encode('utf-8')) + f.write(cdata.to_uint_le(len(comment))) + f.write(comment) + if framing: f.write("\x01") + return f.getvalue() + + def pprint(self): + return "\n".join(["%s=%s" % (k.lower(), v) for k, v in self]) + +class VCommentDict(VComment, DictMixin): + """A VComment that looks like a dictionary. + + This object differs from a dictionary in two ways. First, + len(comment) will still return the number of values, not the + number of keys. Secondly, iterating through the object will + iterate over (key, value) pairs, not keys. Since a key may have + multiple values, the same value may appear multiple times while + iterating. + + Since Vorbis comment keys are case-insensitive, all keys are + normalized to lowercase ASCII. + """ + + def __getitem__(self, key): + """A list of values for the key. + + This is a copy, so comment['title'].append('a title') will not + work. + + """ + key = key.lower().encode('ascii') + values = [value for (k, value) in self if k.lower() == key] + if not values: raise KeyError, key + else: return values + + def __delitem__(self, key): + """Delete all values associated with the key.""" + key = key.lower().encode('ascii') + to_delete = filter(lambda x: x[0].lower() == key, self) + if not to_delete:raise KeyError, key + else: map(self.remove, to_delete) + + def __contains__(self, key): + """Return true if the key has any values.""" + key = key.lower().encode('ascii') + for k, value in self: + if k.lower() == key: return True + else: return False + + def __setitem__(self, key, values): + """Set a key's value or values. + + Setting a value overwrites all old ones. 
The value may be a + list of Unicode or UTF-8 strings, or a single Unicode or UTF-8 + string. + + """ + key = key.lower().encode('ascii') + if not isinstance(values, list): + values = [values] + try: del(self[key]) + except KeyError: pass + for value in values: + self.append((key, value)) + + def keys(self): + """Return all keys in the comment.""" + return self and map(str.lower, set(zip(*self)[0])) + + def as_dict(self): + """Return a copy of the comment data in a real dict.""" + d = {} + for key, value in self: + d.setdefault(key, []).append(value) + return d diff --git a/lib/mutagen/apev2.py b/lib/mutagen/apev2.py new file mode 100644 index 000000000..fcc9a1ec2 --- /dev/null +++ b/lib/mutagen/apev2.py @@ -0,0 +1,465 @@ +# An APEv2 tag reader +# +# Copyright 2005 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: apev2.py 4275 2008-06-01 06:32:37Z piman $ + +"""APEv2 reading and writing. + +The APEv2 format is most commonly used with Musepack files, but is +also the format of choice for WavPack and other formats. Some MP3s +also have APEv2 tags, but this can cause problems with many MP3 +decoders and taggers. + +APEv2 tags, like Vorbis comments, are freeform key=value pairs. APEv2 +keys can be any ASCII string with characters from 0x20 to 0x7E, +between 2 and 255 characters long. Keys are case-sensitive, but +readers are recommended to be case insensitive, and it is forbidden to +multiple keys which differ only in case. Keys are usually stored +title-cased (e.g. 'Artist' rather than 'artist'). + +APEv2 values are slightly more structured than Vorbis comments; values +are flagged as one of text, binary, or an external reference (usually +a URI). + +Based off the format specification found at +http://wiki.hydrogenaudio.org/index.php?title=APEv2_specification. 
+""" + +__all__ = ["APEv2", "APEv2File", "Open", "delete"] + +import struct +from cStringIO import StringIO + +def is_valid_apev2_key(key): + return (2 <= len(key) <= 255 and min(key) >= ' ' and max(key) <= '~' and + key not in ["OggS", "TAG", "ID3", "MP+"]) + +# There are three different kinds of APE tag values. +# "0: Item contains text information coded in UTF-8 +# 1: Item contains binary information +# 2: Item is a locator of external stored information [e.g. URL] +# 3: reserved" +TEXT, BINARY, EXTERNAL = range(3) + +HAS_HEADER = 1L << 31 +HAS_NO_FOOTER = 1L << 30 +IS_HEADER = 1L << 29 + +class error(IOError): pass +class APENoHeaderError(error, ValueError): pass +class APEUnsupportedVersionError(error, ValueError): pass +class APEBadItemError(error, ValueError): pass + +from mutagen import Metadata, FileType +from mutagen._util import DictMixin, cdata, utf8, delete_bytes + +class _APEv2Data(object): + # Store offsets of the important parts of the file. + start = header = data = footer = end = None + # Footer or header; seek here and read 32 to get version/size/items/flags + metadata = None + # Actual tag data + tag = None + + version = None + size = None + items = None + flags = 0 + + # The tag is at the start rather than the end. A tag at both + # the start and end of the file (i.e. the tag is the whole file) + # is not considered to be at the start. + is_at_start = False + + def __init__(self, fileobj): + self.__find_metadata(fileobj) + self.metadata = max(self.header, self.footer) + if self.metadata is None: return + self.__fill_missing(fileobj) + self.__fix_brokenness(fileobj) + if self.data is not None: + fileobj.seek(self.data) + self.tag = fileobj.read(self.size) + + def __find_metadata(self, fileobj): + # Try to find a header or footer. + + # Check for a simple footer. 
+ try: fileobj.seek(-32, 2) + except IOError: + fileobj.seek(0, 2) + return + if fileobj.read(8) == "APETAGEX": + fileobj.seek(-8, 1) + self.footer = self.metadata = fileobj.tell() + return + + # Check for an APEv2 tag followed by an ID3v1 tag at the end. + try: + fileobj.seek(-128, 2) + if fileobj.read(3) == "TAG": + + fileobj.seek(-35, 1) # "TAG" + header length + if fileobj.read(8) == "APETAGEX": + fileobj.seek(-8, 1) + self.footer = fileobj.tell() + return + + # ID3v1 tag at the end, maybe preceded by Lyrics3v2. + # (http://www.id3.org/lyrics3200.html) + # (header length - "APETAGEX") - "LYRICS200" + fileobj.seek(15, 1) + if fileobj.read(9) == 'LYRICS200': + fileobj.seek(-15, 1) # "LYRICS200" + size tag + try: offset = int(fileobj.read(6)) + except ValueError: + raise IOError + + fileobj.seek(-32 - offset - 6, 1) + if fileobj.read(8) == "APETAGEX": + fileobj.seek(-8, 1) + self.footer = fileobj.tell() + return + + except IOError: + pass + + # Check for a tag at the start. + fileobj.seek(0, 0) + if fileobj.read(8) == "APETAGEX": + self.is_at_start = True + self.header = 0 + + def __fill_missing(self, fileobj): + fileobj.seek(self.metadata + 8) + self.version = fileobj.read(4) + self.size = cdata.uint_le(fileobj.read(4)) + self.items = cdata.uint_le(fileobj.read(4)) + self.flags = cdata.uint_le(fileobj.read(4)) + + if self.header is not None: + self.data = self.header + 32 + # If we're reading the header, the size is the header + # offset + the size, which includes the footer. + self.end = self.data + self.size + fileobj.seek(self.end - 32, 0) + if fileobj.read(8) == "APETAGEX": + self.footer = self.end - 32 + elif self.footer is not None: + self.end = self.footer + 32 + self.data = self.end - self.size + if self.flags & HAS_HEADER: + self.header = self.data - 32 + else: + self.header = self.data + else: raise APENoHeaderError("No APE tag found") + + def __fix_brokenness(self, fileobj): + # Fix broken tags written with PyMusepack. 
+ if self.header is not None: start = self.header + else: start = self.data + fileobj.seek(start) + + while start > 0: + # Clean up broken writing from pre-Mutagen PyMusepack. + # It didn't remove the first 24 bytes of header. + try: fileobj.seek(-24, 1) + except IOError: + break + else: + if fileobj.read(8) == "APETAGEX": + fileobj.seek(-8, 1) + start = fileobj.tell() + else: break + self.start = start + +class APEv2(DictMixin, Metadata): + """A file with an APEv2 tag. + + ID3v1 tags are silently ignored and overwritten. + """ + + filename = None + + def __init__(self, *args, **kwargs): + self.__casemap = {} + self.__dict = {} + super(APEv2, self).__init__(*args, **kwargs) + # Internally all names are stored as lowercase, but the case + # they were set with is remembered and used when saving. This + # is roughly in line with the standard, which says that keys + # are case-sensitive but two keys differing only in case are + # not allowed, and recommends case-insensitive + # implementations. 
+ + def pprint(self): + """Return tag key=value pairs in a human-readable format.""" + items = self.items() + items.sort() + return "\n".join(["%s=%s" % (k, v.pprint()) for k, v in items]) + + def load(self, filename): + """Load tags from a filename.""" + self.filename = filename + fileobj = file(filename, "rb") + try: + data = _APEv2Data(fileobj) + finally: + fileobj.close() + if data.tag: + self.clear() + self.__casemap.clear() + self.__parse_tag(data.tag, data.items) + else: + raise APENoHeaderError("No APE tag found") + + def __parse_tag(self, tag, count): + fileobj = StringIO(tag) + + for i in range(count): + size = cdata.uint_le(fileobj.read(4)) + flags = cdata.uint_le(fileobj.read(4)) + + # Bits 1 and 2 bits are flags, 0-3 + # Bit 0 is read/write flag, ignored + kind = (flags & 6) >> 1 + if kind == 3: + raise APEBadItemError("value type must be 0, 1, or 2") + key = value = fileobj.read(1) + while key[-1:] != '\x00' and value: + value = fileobj.read(1) + key += value + if key[-1:] == "\x00": + key = key[:-1] + value = fileobj.read(size) + self[key] = APEValue(value, kind) + + def __getitem__(self, key): + if not is_valid_apev2_key(key): + raise KeyError("%r is not a valid APEv2 key" % key) + return self.__dict[key.lower()] + + def __delitem__(self, key): + if not is_valid_apev2_key(key): + raise KeyError("%r is not a valid APEv2 key" % key) + del(self.__dict[key.lower()]) + + def __setitem__(self, key, value): + """'Magic' value setter. + + This function tries to guess at what kind of value you want to + store. If you pass in a valid UTF-8 or Unicode string, it + treats it as a text value. If you pass in a list, it treats it + as a list of string/Unicode values. If you pass in a string + that is not valid UTF-8, it assumes it is a binary value. + + If you need to force a specific type of value (e.g. 
binary + data that also happens to be valid UTF-8, or an external + reference), use the APEValue factory and set the value to the + result of that: + from mutagen.apev2 import APEValue, EXTERNAL + tag['Website'] = APEValue('http://example.org', EXTERNAL) + """ + + if not is_valid_apev2_key(key): + raise KeyError("%r is not a valid APEv2 key" % key) + + if not isinstance(value, _APEValue): + # let's guess at the content if we're not already a value... + if isinstance(value, unicode): + # unicode? we've got to be text. + value = APEValue(utf8(value), TEXT) + elif isinstance(value, list): + # list? text. + value = APEValue("\0".join(map(utf8, value)), TEXT) + else: + try: dummy = value.decode("utf-8") + except UnicodeError: + # invalid UTF8 text, probably binary + value = APEValue(value, BINARY) + else: + # valid UTF8, probably text + value = APEValue(value, TEXT) + self.__casemap[key.lower()] = key + self.__dict[key.lower()] = value + + def keys(self): + return [self.__casemap.get(key, key) for key in self.__dict.keys()] + + def save(self, filename=None): + """Save changes to a file. + + If no filename is given, the one most recently loaded is used. + + Tags are always written at the end of the file, and include + a header and a footer. + """ + + filename = filename or self.filename + try: + fileobj = file(filename, "r+b") + except IOError: + fileobj = file(filename, "w+b") + data = _APEv2Data(fileobj) + + if data.is_at_start: + delete_bytes(fileobj, data.end - data.start, data.start) + elif data.start is not None: + fileobj.seek(data.start) + # Delete an ID3v1 tag if present, too. + fileobj.truncate() + fileobj.seek(0, 2) + + # "APE tags items should be sorted ascending by size... This is + # not a MUST, but STRONGLY recommended. Actually the items should + # be sorted by importance/byte, but this is not feasible." 
+ tags = [v._internal(k) for k, v in self.items()] + tags.sort(lambda a, b: cmp(len(a), len(b))) + num_tags = len(tags) + tags = "".join(tags) + + header = "APETAGEX%s%s" %( + # version, tag size, item count, flags + struct.pack("<4I", 2000, len(tags) + 32, num_tags, + HAS_HEADER | IS_HEADER), + "\0" * 8) + fileobj.write(header) + + fileobj.write(tags) + + footer = "APETAGEX%s%s" %( + # version, tag size, item count, flags + struct.pack("<4I", 2000, len(tags) + 32, num_tags, + HAS_HEADER), + "\0" * 8) + fileobj.write(footer) + fileobj.close() + + def delete(self, filename=None): + """Remove tags from a file.""" + filename = filename or self.filename + fileobj = file(filename, "r+b") + try: + data = _APEv2Data(fileobj) + if data.start is not None and data.size is not None: + delete_bytes(fileobj, data.end - data.start, data.start) + finally: + fileobj.close() + self.clear() + +Open = APEv2 + +def delete(filename): + """Remove tags from a file.""" + try: APEv2(filename).delete() + except APENoHeaderError: pass + +def APEValue(value, kind): + """APEv2 tag value factory. + + Use this if you need to specify the value's type manually. Binary + and text data are automatically detected by APEv2.__setitem__. 
+ """ + if kind == TEXT: return APETextValue(value, kind) + elif kind == BINARY: return APEBinaryValue(value, kind) + elif kind == EXTERNAL: return APEExtValue(value, kind) + else: raise ValueError("kind must be TEXT, BINARY, or EXTERNAL") + +class _APEValue(object): + def __init__(self, value, kind): + self.kind = kind + self.value = value + + def __len__(self): + return len(self.value) + def __str__(self): + return self.value + + # Packed format for an item: + # 4B: Value length + # 4B: Value type + # Key name + # 1B: Null + # Key value + def _internal(self, key): + return "%s%s\0%s" %( + struct.pack("<2I", len(self.value), self.kind << 1), + key, self.value) + + def __repr__(self): + return "%s(%r, %d)" % (type(self).__name__, self.value, self.kind) + +class APETextValue(_APEValue): + """An APEv2 text value. + + Text values are Unicode/UTF-8 strings. They can be accessed like + strings (with a null seperating the values), or arrays of strings.""" + + def __unicode__(self): + return unicode(str(self), "utf-8") + + def __iter__(self): + """Iterate over the strings of the value (not the characters)""" + return iter(unicode(self).split("\0")) + + def __getitem__(self, index): + return unicode(self).split("\0")[index] + + def __len__(self): + return self.value.count("\0") + 1 + + def __cmp__(self, other): + return cmp(unicode(self), other) + + def __setitem__(self, index, value): + values = list(self) + values[index] = value.encode("utf-8") + self.value = "\0".join(values).encode("utf-8") + + def pprint(self): + return " / ".join(self) + +class APEBinaryValue(_APEValue): + """An APEv2 binary value.""" + + def pprint(self): return "[%d bytes]" % len(self) + +class APEExtValue(_APEValue): + """An APEv2 external value. + + External values are usually URI or IRI strings. 
+ """ + def pprint(self): return "[External] %s" % unicode(self) + +class APEv2File(FileType): + class _Info(object): + length = 0 + bitrate = 0 + def __init__(self, fileobj): pass + pprint = staticmethod(lambda: "Unknown format with APEv2 tag.") + + def load(self, filename): + self.filename = filename + self.info = self._Info(file(filename, "rb")) + try: self.tags = APEv2(filename) + except error: self.tags = None + + def add_tags(self): + if self.tags is None: + self.tags = APEv2() + else: + raise ValueError("%r already has tags: %r" % (self, self.tags)) + + def score(filename, fileobj, header): + try: fileobj.seek(-160, 2) + except IOError: + fileobj.seek(0) + footer = fileobj.read() + filename = filename.lower() + return (("APETAGEX" in footer) - header.startswith("ID3")) + score = staticmethod(score) diff --git a/lib/mutagen/asf.py b/lib/mutagen/asf.py new file mode 100644 index 000000000..1d3d9568c --- /dev/null +++ b/lib/mutagen/asf.py @@ -0,0 +1,634 @@ +# Copyright 2006-2007 Lukas Lalinsky +# Copyright 2005-2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. 
+# +# $Id: asf.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write ASF (Window Media Audio) files.""" + +__all__ = ["ASF", "Open"] + +import struct +from mutagen import FileType, Metadata +from mutagen._util import insert_bytes, delete_bytes, DictMixin + +class error(IOError): pass +class ASFError(error): pass +class ASFHeaderError(error): pass + + +class ASFInfo(object): + """ASF stream information.""" + + def __init__(self): + self.length = 0.0 + self.sample_rate = 0 + self.bitrate = 0 + self.channels = 0 + + def pprint(self): + s = "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % ( + self.bitrate, self.sample_rate, self.channels, self.length) + return s + + +class ASFTags(list, DictMixin, Metadata): + """Dictionary containing ASF attributes.""" + + def pprint(self): + return "\n".join(["%s=%s" % (k, v) for k, v in self]) + + def __getitem__(self, key): + """A list of values for the key. + + This is a copy, so comment['title'].append('a title') will not + work. + + """ + values = [value for (k, value) in self if k == key] + if not values: raise KeyError, key + else: return values + + def __delitem__(self, key): + """Delete all values associated with the key.""" + to_delete = filter(lambda x: x[0] == key, self) + if not to_delete: raise KeyError, key + else: map(self.remove, to_delete) + + def __contains__(self, key): + """Return true if the key has any values.""" + for k, value in self: + if k == key: return True + else: return False + + def __setitem__(self, key, values): + """Set a key's value or values. + + Setting a value overwrites all old ones. The value may be a + list of Unicode or UTF-8 strings, or a single Unicode or UTF-8 + string. 
+ + """ + if not isinstance(values, list): + values = [values] + try: del(self[key]) + except KeyError: pass + for value in values: + if key in _standard_attribute_names: + value = unicode(value) + elif not isinstance(value, ASFBaseAttribute): + if isinstance(value, basestring): + value = ASFUnicodeAttribute(value) + elif isinstance(value, bool): + value = ASFBoolAttribute(value) + elif isinstance(value, int): + value = ASFDWordAttribute(value) + elif isinstance(value, long): + value = ASFQWordAttribute(value) + self.append((key, value)) + + def keys(self): + """Return all keys in the comment.""" + return self and set(zip(*self)[0]) + + def as_dict(self): + """Return a copy of the comment data in a real dict.""" + d = {} + for key, value in self: + d.setdefault(key, []).append(value) + return d + + +class ASFBaseAttribute(object): + """Generic attribute.""" + TYPE = None + + def __init__(self, value=None, data=None, language=None, + stream=None, **kwargs): + self.language = language + self.stream = stream + if data: + self.value = self.parse(data, **kwargs) + else: + self.value = value + + def __repr__(self): + name = "%s(%r" % (type(self).__name__, self.value) + if self.language: + name += ", language=%d" % self.language + if self.stream: + name += ", stream=%d" % self.stream + name += ")" + return name + + def render(self, name): + name = name.encode("utf-16-le") + "\x00\x00" + data = self._render() + return (struct.pack(" self.size: + insert_bytes(fileobj, size - self.size, self.size) + if size < self.size: + delete_bytes(fileobj, self.size - size, 0) + fileobj.seek(0) + fileobj.write(data) + finally: + fileobj.close() + + def __read_file(self, fileobj): + header = fileobj.read(30) + if len(header) != 30 or header[:16] != HeaderObject.GUID: + raise ASFHeaderError, "Not an ASF file." 
+ + self.extended_content_description_obj = None + self.content_description_obj = None + self.header_extension_obj = None + self.metadata_obj = None + self.metadata_library_obj = None + + self.size, self.num_objects = struct.unpack(" 2**24: + raise error("block is too long to write") + length = struct.pack(">I", len(datum))[-3:] + data.append(byte + length + datum) + return "".join(data) + writeblocks = staticmethod(writeblocks) + + def group_padding(blocks): + """Consolidate FLAC padding metadata blocks. + + The overall size of the rendered blocks does not change, so + this adds several bytes of padding for each merged block.""" + paddings = filter(lambda x: isinstance(x, Padding), blocks) + map(blocks.remove, paddings) + padding = Padding() + # total padding size is the sum of padding sizes plus 4 bytes + # per removed header. + size = sum([padding.length for padding in paddings]) + padding.length = size + 4 * (len(paddings) - 1) + blocks.append(padding) + group_padding = staticmethod(group_padding) + +class StreamInfo(MetadataBlock): + """FLAC stream information. + + This contains information about the audio data in the FLAC file. + Unlike most stream information objects in Mutagen, changes to this + one will rewritten to the file when it is saved. Unless you are + actually changing the audio stream itself, don't change any + attributes of this block. 
+ + Attributes: + min_blocksize -- minimum audio block size + max_blocksize -- maximum audio block size + sample_rate -- audio sample rate in Hz + channels -- audio channels (1 for mono, 2 for stereo) + bits_per_sample -- bits per sample + total_samples -- total samples in file + length -- audio length in seconds + """ + + code = 0 + + def __eq__(self, other): + try: return (self.min_blocksize == other.min_blocksize and + self.max_blocksize == other.max_blocksize and + self.sample_rate == other.sample_rate and + self.channels == other.channels and + self.bits_per_sample == other.bits_per_sample and + self.total_samples == other.total_samples) + except: return False + + def load(self, data): + self.min_blocksize = int(to_int_be(data.read(2))) + self.max_blocksize = int(to_int_be(data.read(2))) + self.min_framesize = int(to_int_be(data.read(3))) + self.max_framesize = int(to_int_be(data.read(3))) + # first 16 bits of sample rate + sample_first = to_int_be(data.read(2)) + # last 4 bits of sample rate, 3 of channels, first 1 of bits/sample + sample_channels_bps = to_int_be(data.read(1)) + # last 4 of bits/sample, 36 of total samples + bps_total = to_int_be(data.read(5)) + + sample_tail = sample_channels_bps >> 4 + self.sample_rate = int((sample_first << 4) + sample_tail) + self.channels = int(((sample_channels_bps >> 1) & 7) + 1) + bps_tail = bps_total >> 36 + bps_head = (sample_channels_bps & 1) << 4 + self.bits_per_sample = int(bps_head + bps_tail + 1) + self.total_samples = bps_total & 0xFFFFFFFFFL + self.length = self.total_samples / float(self.sample_rate) + + self.md5_signature = to_int_be(data.read(16)) + + def write(self): + f = StringIO() + f.write(struct.pack(">I", self.min_blocksize)[-2:]) + f.write(struct.pack(">I", self.max_blocksize)[-2:]) + f.write(struct.pack(">I", self.min_framesize)[-3:]) + f.write(struct.pack(">I", self.max_framesize)[-3:]) + + # first 16 bits of sample rate + f.write(struct.pack(">I", self.sample_rate >> 4)[-2:]) + # 4 bits sample, 
3 channel, 1 bps + byte = (self.sample_rate & 0xF) << 4 + byte += ((self.channels - 1) & 3) << 1 + byte += ((self.bits_per_sample - 1) >> 4) & 1 + f.write(chr(byte)) + # 4 bits of bps, 4 of sample count + byte = ((self.bits_per_sample - 1) & 0xF) << 4 + byte += (self.total_samples >> 32) & 0xF + f.write(chr(byte)) + # last 32 of sample count + f.write(struct.pack(">I", self.total_samples & 0xFFFFFFFFL)) + # MD5 signature + sig = self.md5_signature + f.write(struct.pack( + ">4I", (sig >> 96) & 0xFFFFFFFFL, (sig >> 64) & 0xFFFFFFFFL, + (sig >> 32) & 0xFFFFFFFFL, sig & 0xFFFFFFFFL)) + return f.getvalue() + + def pprint(self): + return "FLAC, %.2f seconds, %d Hz" % (self.length, self.sample_rate) + +class SeekPoint(tuple): + """A single seek point in a FLAC file. + + Placeholder seek points have first_sample of 0xFFFFFFFFFFFFFFFFL, + and byte_offset and num_samples undefined. Seek points must be + sorted in ascending order by first_sample number. Seek points must + be unique by first_sample number, except for placeholder + points. Placeholder points must occur last in the table and there + may be any number of them. + + Attributes: + first_sample -- sample number of first sample in the target frame + byte_offset -- offset from first frame to target frame + num_samples -- number of samples in target frame + """ + + def __new__(cls, first_sample, byte_offset, num_samples): + return super(cls, SeekPoint).__new__(cls, (first_sample, + byte_offset, num_samples)) + first_sample = property(lambda self: self[0]) + byte_offset = property(lambda self: self[1]) + num_samples = property(lambda self: self[2]) + +class SeekTable(MetadataBlock): + """Read and write FLAC seek tables. 
+ + Attributes: + seekpoints -- list of SeekPoint objects + """ + + __SEEKPOINT_FORMAT = '>QQH' + __SEEKPOINT_SIZE = struct.calcsize(__SEEKPOINT_FORMAT) + + code = 3 + + def __init__(self, data): + self.seekpoints = [] + super(SeekTable, self).__init__(data) + + def __eq__(self, other): + try: return (self.seekpoints == other.seekpoints) + except (AttributeError, TypeError): return False + + def load(self, data): + self.seekpoints = [] + sp = data.read(self.__SEEKPOINT_SIZE) + while len(sp) == self.__SEEKPOINT_SIZE: + self.seekpoints.append(SeekPoint( + *struct.unpack(self.__SEEKPOINT_FORMAT, sp))) + sp = data.read(self.__SEEKPOINT_SIZE) + + def write(self): + f = StringIO() + for seekpoint in self.seekpoints: + packed = struct.pack(self.__SEEKPOINT_FORMAT, + seekpoint.first_sample, seekpoint.byte_offset, + seekpoint.num_samples) + f.write(packed) + return f.getvalue() + + def __repr__(self): + return "<%s seekpoints=%r>" % (type(self).__name__, self.seekpoints) + +class VCFLACDict(VCommentDict): + """Read and write FLAC Vorbis comments. + + FLACs don't use the framing bit at the end of the comment block. + So this extends VCommentDict to not use the framing bit. + """ + + code = 4 + + def load(self, data, errors='replace', framing=False): + super(VCFLACDict, self).load(data, errors=errors, framing=framing) + + def write(self, framing=False): + return super(VCFLACDict, self).write(framing=framing) + +class CueSheetTrackIndex(tuple): + """Index for a track in a cuesheet. + + For CD-DA, an index_number of 0 corresponds to the track + pre-gap. The first index in a track must have a number of 0 or 1, + and subsequently, index_numbers must increase by 1. Index_numbers + must be unique within a track. And index_offset must be evenly + divisible by 588 samples. 
+ + Attributes: + index_number -- index point number + index_offset -- offset in samples from track start + """ + + def __new__(cls, index_number, index_offset): + return super(cls, CueSheetTrackIndex).__new__(cls, + (index_number, index_offset)) + index_number = property(lambda self: self[0]) + index_offset = property(lambda self: self[1]) + +class CueSheetTrack(object): + """A track in a cuesheet. + + For CD-DA, track_numbers must be 1-99, or 170 for the + lead-out. Track_numbers must be unique within a cue sheet. There + must be atleast one index in every track except the lead-out track + which must have none. + + Attributes: + track_number -- track number + start_offset -- track offset in samples from start of FLAC stream + isrc -- ISRC code + type -- 0 for audio, 1 for digital data + pre_emphasis -- true if the track is recorded with pre-emphasis + indexes -- list of CueSheetTrackIndex objects + """ + + def __init__(self, track_number, start_offset, isrc='', type_=0, + pre_emphasis=False): + self.track_number = track_number + self.start_offset = start_offset + self.isrc = isrc + self.type = type_ + self.pre_emphasis = pre_emphasis + self.indexes = [] + + def __eq__(self, other): + try: return (self.track_number == other.track_number and + self.start_offset == other.start_offset and + self.isrc == other.isrc and + self.type == other.type and + self.pre_emphasis == other.pre_emphasis and + self.indexes == other.indexes) + except (AttributeError, TypeError): return False + + def __repr__(self): + return ("<%s number=%r, offset=%d, isrc=%r, type=%r, " + "pre_emphasis=%r, indexes=%r)>") % ( + type(self).__name__, self.track_number, self.start_offset, + self.isrc, self.type, self.pre_emphasis, self.indexes) + +class CueSheet(MetadataBlock): + """Read and write FLAC embedded cue sheets. + + Number of tracks should be from 1 to 100. There should always be + exactly one lead-out track and that track must be the last track + in the cue sheet. 
+ + Attributes: + media_catalog_number -- media catalog number in ASCII + lead_in_samples -- number of lead-in samples + compact_disc -- true if the cuesheet corresponds to a compact disc + tracks -- list of CueSheetTrack objects + lead_out -- lead-out as CueSheetTrack or None if lead-out was not found + """ + + __CUESHEET_FORMAT = '>128sQB258xB' + __CUESHEET_SIZE = struct.calcsize(__CUESHEET_FORMAT) + __CUESHEET_TRACK_FORMAT = '>QB12sB13xB' + __CUESHEET_TRACK_SIZE = struct.calcsize(__CUESHEET_TRACK_FORMAT) + __CUESHEET_TRACKINDEX_FORMAT = '>QB3x' + __CUESHEET_TRACKINDEX_SIZE = struct.calcsize(__CUESHEET_TRACKINDEX_FORMAT) + + code = 5 + + media_catalog_number = '' + lead_in_samples = 88200 + compact_disc = True + + def __init__(self, data): + self.tracks = [] + super(CueSheet, self).__init__(data) + + def __eq__(self, other): + try: + return (self.media_catalog_number == other.media_catalog_number and + self.lead_in_samples == other.lead_in_samples and + self.compact_disc == other.compact_disc and + self.tracks == other.tracks) + except (AttributeError, TypeError): return False + + def load(self, data): + header = data.read(self.__CUESHEET_SIZE) + media_catalog_number, lead_in_samples, flags, num_tracks = \ + struct.unpack(self.__CUESHEET_FORMAT, header) + self.media_catalog_number = media_catalog_number.rstrip('\0') + self.lead_in_samples = lead_in_samples + self.compact_disc = bool(flags & 0x80) + self.tracks = [] + for i in range(num_tracks): + track = data.read(self.__CUESHEET_TRACK_SIZE) + start_offset, track_number, isrc_padded, flags, num_indexes = \ + struct.unpack(self.__CUESHEET_TRACK_FORMAT, track) + isrc = isrc_padded.rstrip('\0') + type_ = (flags & 0x80) >> 7 + pre_emphasis = bool(flags & 0x40) + val = CueSheetTrack( + track_number, start_offset, isrc, type_, pre_emphasis) + for j in range(num_indexes): + index = data.read(self.__CUESHEET_TRACKINDEX_SIZE) + index_offset, index_number = struct.unpack( + self.__CUESHEET_TRACKINDEX_FORMAT, index) + 
val.indexes.append( + CueSheetTrackIndex(index_number, index_offset)) + self.tracks.append(val) + + def write(self): + f = StringIO() + flags = 0 + if self.compact_disc: flags |= 0x80 + packed = struct.pack( + self.__CUESHEET_FORMAT, self.media_catalog_number, + self.lead_in_samples, flags, len(self.tracks)) + f.write(packed) + for track in self.tracks: + track_flags = 0 + track_flags |= (track.type & 1) << 7 + if track.pre_emphasis: track_flags |= 0x40 + track_packed = struct.pack( + self.__CUESHEET_TRACK_FORMAT, track.start_offset, + track.track_number, track.isrc, track_flags, + len(track.indexes)) + f.write(track_packed) + for index in track.indexes: + index_packed = struct.pack( + self.__CUESHEET_TRACKINDEX_FORMAT, + index.index_offset, index.index_number) + f.write(index_packed) + return f.getvalue() + + def __repr__(self): + return ("<%s media_catalog_number=%r, lead_in=%r, compact_disc=%r, " + "tracks=%r>") % ( + type(self).__name__, self.media_catalog_number, + self.lead_in_samples, self.compact_disc, self.tracks) + +class Picture(MetadataBlock): + """Read and write FLAC embed pictures. 
+ + Attributes: + type -- picture type (same as types for ID3 APIC frames) + mime -- MIME type of the picture + desc -- picture's description + width -- width in pixels + height -- height in pixels + depth -- color depth in bits-per-pixel + colors -- number of colors for indexed palettes (like GIF), + 0 for non-indexed + data -- picture data + """ + + code = 6 + + def __init__(self, data=None): + self.type = 0 + self.mime = u'' + self.desc = u'' + self.width = 0 + self.height = 0 + self.depth = 0 + self.colors = 0 + self.data = '' + super(Picture, self).__init__(data) + + def __eq__(self, other): + try: return (self.type == other.type and + self.mime == other.mime and + self.desc == other.desc and + self.width == other.width and + self.height == other.height and + self.depth == other.depth and + self.colors == other.colors and + self.data == other.data) + except (AttributeError, TypeError): return False + + def load(self, data): + self.type, length = struct.unpack('>2I', data.read(8)) + self.mime = data.read(length).decode('UTF-8', 'replace') + length, = struct.unpack('>I', data.read(4)) + self.desc = data.read(length).decode('UTF-8', 'replace') + (self.width, self.height, self.depth, + self.colors, length) = struct.unpack('>5I', data.read(20)) + self.data = data.read(length) + + def write(self): + f = StringIO() + mime = self.mime.encode('UTF-8') + f.write(struct.pack('>2I', self.type, len(mime))) + f.write(mime) + desc = self.desc.encode('UTF-8') + f.write(struct.pack('>I', len(desc))) + f.write(desc) + f.write(struct.pack('>5I', self.width, self.height, self.depth, + self.colors, len(self.data))) + f.write(self.data) + return f.getvalue() + + def __repr__(self): + return "<%s '%s' (%d bytes)>" % (type(self).__name__, self.mime, + len(self.data)) + +class Padding(MetadataBlock): + """Empty padding space for metadata blocks. + + To avoid rewriting the entire FLAC file when editing comments, + metadata is often padded. 
Padding should occur at the end, and no + more than one padding block should be in any FLAC file. Mutagen + handles this with MetadataBlock.group_padding. + """ + + code = 1 + + def __init__(self, data=""): super(Padding, self).__init__(data) + def load(self, data): self.length = len(data.read()) + def write(self): + try: return "\x00" * self.length + # On some 64 bit platforms this won't generate a MemoryError + # or OverflowError since you might have enough RAM, but it + # still generates a ValueError. On other 64 bit platforms, + # this will still succeed for extremely large values. + # Those should never happen in the real world, and if they + # do, writeblocks will catch it. + except (OverflowError, ValueError, MemoryError): + raise error("cannot write %d bytes" % self.length) + def __eq__(self, other): + return isinstance(other, Padding) and self.length == other.length + def __repr__(self): + return "<%s (%d bytes)>" % (type(self).__name__, self.length) + +class FLAC(FileType): + """A FLAC audio file. 
+ + Attributes: + info -- stream information (length, bitrate, sample rate) + tags -- metadata tags, if any + cuesheet -- CueSheet object, if any + seektable -- SeekTable object, if any + pictures -- list of embedded pictures + """ + + _mimes = ["audio/x-flac", "application/x-flac"] + + METADATA_BLOCKS = [StreamInfo, Padding, None, SeekTable, VCFLACDict, + CueSheet, Picture] + """Known metadata block types, indexed by ID.""" + + def score(filename, fileobj, header): + return header.startswith("fLaC") + score = staticmethod(score) + + def __read_metadata_block(self, file): + byte = ord(file.read(1)) + size = to_int_be(file.read(3)) + try: + data = file.read(size) + if len(data) != size: + raise error( + "file said %d bytes, read %d bytes" % (size, len(data))) + block = self.METADATA_BLOCKS[byte & 0x7F](data) + except (IndexError, TypeError): + block = MetadataBlock(data) + block.code = byte & 0x7F + self.metadata_blocks.append(block) + else: + self.metadata_blocks.append(block) + if block.code == VCFLACDict.code: + if self.tags is None: self.tags = block + else: raise FLACVorbisError("> 1 Vorbis comment block found") + elif block.code == CueSheet.code: + if self.cuesheet is None: self.cuesheet = block + else: raise error("> 1 CueSheet block found") + elif block.code == SeekTable.code: + if self.seektable is None: self.seektable = block + else: raise error("> 1 SeekTable block found") + return (byte >> 7) ^ 1 + + def add_tags(self): + """Add a Vorbis comment block to the file.""" + if self.tags is None: + self.tags = VCFLACDict() + self.metadata_blocks.append(self.tags) + else: raise FLACVorbisError("a Vorbis comment already exists") + add_vorbiscomment = add_tags + + def delete(self, filename=None): + """Remove Vorbis comments from a file. + + If no filename is given, the one most recently loaded is used. 
+ """ + if filename is None: filename = self.filename + for s in list(self.metadata_blocks): + if isinstance(s, VCFLACDict): + self.metadata_blocks.remove(s) + self.tags = None + self.save() + break + + vc = property(lambda s: s.tags, doc="Alias for tags; don't use this.") + + def load(self, filename): + """Load file information from a filename.""" + + self.metadata_blocks = [] + self.tags = None + self.cuesheet = None + self.seektable = None + self.filename = filename + fileobj = file(filename, "rb") + try: + self.__check_header(fileobj) + while self.__read_metadata_block(fileobj): pass + finally: + fileobj.close() + + try: self.metadata_blocks[0].length + except (AttributeError, IndexError): + raise FLACNoHeaderError("Stream info block not found") + + info = property(lambda s: s.metadata_blocks[0]) + + def add_picture(self, picture): + """Add a new picture to the file.""" + self.metadata_blocks.append(picture) + + def clear_pictures(self): + """Delete all pictures from the file.""" + self.metadata_blocks = filter(lambda b: b.code != Picture.code, + self.metadata_blocks) + + def __get_pictures(self): + return filter(lambda b: b.code == Picture.code, self.metadata_blocks) + pictures = property(__get_pictures, doc="List of embedded pictures") + + def save(self, filename=None, deleteid3=False): + """Save metadata blocks to a file. + + If no filename is given, the one most recently loaded is used. + """ + + if filename is None: filename = self.filename + f = open(filename, 'rb+') + + # Ensure we've got padding at the end, and only at the end. + # If adding makes it too large, we'll scale it down later. 
+ self.metadata_blocks.append(Padding('\x00' * 1020)) + MetadataBlock.group_padding(self.metadata_blocks) + + header = self.__check_header(f) + available = self.__find_audio_offset(f) - header # "fLaC" and maybe ID3 + data = MetadataBlock.writeblocks(self.metadata_blocks) + + # Delete ID3v2 + if deleteid3 and header > 4: + available += header - 4 + header = 4 + + if len(data) > available: + # If we have too much data, see if we can reduce padding. + padding = self.metadata_blocks[-1] + newlength = padding.length - (len(data) - available) + if newlength > 0: + padding.length = newlength + data = MetadataBlock.writeblocks(self.metadata_blocks) + assert len(data) == available + + elif len(data) < available: + # If we have too little data, increase padding. + self.metadata_blocks[-1].length += (available - len(data)) + data = MetadataBlock.writeblocks(self.metadata_blocks) + assert len(data) == available + + if len(data) != available: + # We couldn't reduce the padding enough. + diff = (len(data) - available) + insert_bytes(f, diff, header) + + f.seek(header - 4) + f.write("fLaC" + data) + + # Delete ID3v1 + if deleteid3: + try: f.seek(-128, 2) + except IOError: pass + else: + if f.read(3) == "TAG": + f.seek(-128, 2) + f.truncate() + + def __find_audio_offset(self, fileobj): + byte = 0x00 + while not (byte >> 7) & 1: + byte = ord(fileobj.read(1)) + size = to_int_be(fileobj.read(3)) + fileobj.read(size) + return fileobj.tell() + + def __check_header(self, fileobj): + size = 4 + header = fileobj.read(4) + if header != "fLaC": + size = None + if header[:3] == "ID3": + size = 14 + BitPaddedInt(fileobj.read(6)[2:]) + fileobj.seek(size - 4) + if fileobj.read(4) != "fLaC": size = None + if size is None: + raise FLACNoHeaderError( + "%r is not a valid FLAC file" % fileobj.name) + return size + +Open = FLAC + +def delete(filename): + """Remove tags from a file.""" + FLAC(filename).delete() diff --git a/lib/mutagen/id3.py b/lib/mutagen/id3.py new file mode 100644 index 
000000000..fb1357e43 --- /dev/null +++ b/lib/mutagen/id3.py @@ -0,0 +1,1956 @@ +# id3 support for mutagen +# Copyright (C) 2005 Michael Urman +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. +# +# $Id: id3.py 4275 2008-06-01 06:32:37Z piman $ + +"""ID3v2 reading and writing. + +This is based off of the following references: + http://www.id3.org/id3v2.4.0-structure.txt + http://www.id3.org/id3v2.4.0-frames.txt + http://www.id3.org/id3v2.3.0.html + http://www.id3.org/id3v2-00.txt + http://www.id3.org/id3v1.html + +Its largest deviation from the above (versions 2.3 and 2.2) is that it +will not interpret the / characters as a separator, and will almost +always accept null separators to generate multi-valued text frames. + +Because ID3 frame structure differs between frame types, each frame is +implemented as a different class (e.g. TIT2 as mutagen.id3.TIT2). Each +frame's documentation contains a list of its attributes. + +Since this file's documentation is a little unwieldy, you are probably +interested in the 'ID3' class to start with. 
+""" + +__all__ = ['ID3', 'ID3FileType', 'Frames', 'Open', 'delete'] + +import struct; from struct import unpack, pack +from zlib import error as zlibError +from warnings import warn + +import mutagen +from mutagen._util import insert_bytes, delete_bytes, DictProxy + +class error(Exception): pass +class ID3NoHeaderError(error, ValueError): pass +class ID3BadUnsynchData(error, ValueError): pass +class ID3BadCompressedData(error, ValueError): pass +class ID3TagError(error, ValueError): pass +class ID3UnsupportedVersionError(error, NotImplementedError): pass +class ID3EncryptionUnsupportedError(error, NotImplementedError): pass +class ID3JunkFrameError(error, ValueError): pass + +class ID3Warning(error, UserWarning): pass + +def is_valid_frame_id(frame_id): + return frame_id.isalnum() and frame_id.isupper() + +class ID3(DictProxy, mutagen.Metadata): + """A file with an ID3v2 tag. + + Attributes: + version -- ID3 tag version as a tuple + unknown_frames -- raw frame data of any unknown frames found + size -- the total size of the ID3 tag, including the header + """ + + PEDANTIC = True + version = (2, 4, 0) + + filename = None + size = 0 + __flags = 0 + __readbytes = 0 + __crc = None + + def __init__(self, *args, **kwargs): + self.unknown_frames = [] + super(ID3, self).__init__(*args, **kwargs) + + def __fullread(self, size): + try: + if size < 0: + raise ValueError('Requested bytes (%s) less than zero' % size) + if size > self.__filesize: + raise EOFError('Requested %#x of %#x (%s)' % + (long(size), long(self.__filesize), self.filename)) + except AttributeError: pass + data = self.__fileobj.read(size) + if len(data) != size: raise EOFError + self.__readbytes += size + return data + + def load(self, filename, known_frames=None, translate=True): + """Load tags from a filename. + + Keyword arguments: + filename -- filename to load tag data from + known_frames -- dict mapping frame IDs to Frame objects + translate -- Update all tags to ID3v2.4 internally. 
Mutagen is + only capable of writing ID3v2.4 tags, so if you + intend to save, this must be true. + + Example of loading a custom frame: + my_frames = dict(mutagen.id3.Frames) + class XMYF(Frame): ... + my_frames["XMYF"] = XMYF + mutagen.id3.ID3(filename, known_frames=my_frames) + """ + + from os.path import getsize + self.filename = filename + self.__known_frames = known_frames + self.__fileobj = file(filename, 'rb') + self.__filesize = getsize(filename) + try: + try: + self.__load_header() + except EOFError: + self.size = 0 + raise ID3NoHeaderError("%s: too small (%d bytes)" %( + filename, self.__filesize)) + except (ID3NoHeaderError, ID3UnsupportedVersionError), err: + self.size = 0 + import sys + stack = sys.exc_info()[2] + try: self.__fileobj.seek(-128, 2) + except EnvironmentError: raise err, None, stack + else: + frames = ParseID3v1(self.__fileobj.read(128)) + if frames is not None: + self.version = (1, 1) + map(self.add, frames.values()) + else: raise err, None, stack + else: + frames = self.__known_frames + if frames is None: + if (2,3,0) <= self.version: frames = Frames + elif (2,2,0) <= self.version: frames = Frames_2_2 + data = self.__fullread(self.size - 10) + for frame in self.__read_frames(data, frames=frames): + if isinstance(frame, Frame): self.add(frame) + else: self.unknown_frames.append(frame) + finally: + self.__fileobj.close() + del self.__fileobj + del self.__filesize + if translate: + self.update_to_v24() + + def getall(self, key): + """Return all frames with a given name (the list may be empty). + + This is best explained by examples: + id3.getall('TIT2') == [id3['TIT2']] + id3.getall('TTTT') == [] + id3.getall('TXXX') == [TXXX(desc='woo', text='bar'), + TXXX(desc='baz', text='quuuux'), ...] + + Since this is based on the frame's HashKey, which is + colon-separated, you can use it to do things like + getall('COMM:MusicMatch') or getall('TXXX:QuodLibet:'). 
+ """ + if key in self: return [self[key]] + else: + key = key + ":" + return [v for s,v in self.items() if s.startswith(key)] + + def delall(self, key): + """Delete all tags of a given kind; see getall.""" + if key in self: del(self[key]) + else: + key = key + ":" + for k in filter(lambda s: s.startswith(key), self.keys()): + del(self[k]) + + def setall(self, key, values): + """Delete frames of the given type and add frames in 'values'.""" + self.delall(key) + for tag in values: + self[tag.HashKey] = tag + + def pprint(self): + """Return tags in a human-readable format. + + "Human-readable" is used loosely here. The format is intended + to mirror that used for Vorbis or APEv2 output, e.g. + TIT2=My Title + However, ID3 frames can have multiple keys: + POPM=user@example.org=3 128/255 + """ + return "\n".join(map(Frame.pprint, self.values())) + + def loaded_frame(self, tag): + """Deprecated; use the add method.""" + # turn 2.2 into 2.3/2.4 tags + if len(type(tag).__name__) == 3: tag = type(tag).__base__(tag) + self[tag.HashKey] = tag + + # add = loaded_frame (and vice versa) break applications that + # expect to be able to override loaded_frame (e.g. Quod Libet), + # as does making loaded_frame call add. 
+ def add(self, frame): + """Add a frame to the tag.""" + return self.loaded_frame(frame) + + def __load_header(self): + fn = self.filename + data = self.__fullread(10) + id3, vmaj, vrev, flags, size = unpack('>3sBBB4s', data) + self.__flags = flags + self.size = BitPaddedInt(size) + 10 + self.version = (2, vmaj, vrev) + + if id3 != 'ID3': + raise ID3NoHeaderError("'%s' doesn't start with an ID3 tag" % fn) + if vmaj not in [2, 3, 4]: + raise ID3UnsupportedVersionError("'%s' ID3v2.%d not supported" + % (fn, vmaj)) + + if self.PEDANTIC: + if (2,4,0) <= self.version and (flags & 0x0f): + raise ValueError("'%s' has invalid flags %#02x" % (fn, flags)) + elif (2,3,0) <= self.version and (flags & 0x1f): + raise ValueError("'%s' has invalid flags %#02x" % (fn, flags)) + + if self.f_extended: + if self.version >= (2,4,0): + # "Where the 'Extended header size' is the size of the whole + # extended header, stored as a 32 bit synchsafe integer." + self.__extsize = BitPaddedInt(self.__fullread(4)) - 4 + else: + # "Where the 'Extended header size', currently 6 or 10 bytes, + # excludes itself." 
+ self.__extsize = unpack('>L', self.__fullread(4))[0] + self.__extdata = self.__fullread(self.__extsize) + + def __determine_bpi(self, data, frames): + if self.version < (2,4,0): return int + # have to special case whether to use bitpaddedints here + # spec says to use them, but iTunes has it wrong + + # count number of tags found as BitPaddedInt and how far past + o = 0 + asbpi = 0 + while o < len(data)-10: + name, size, flags = unpack('>4sLH', data[o:o+10]) + size = BitPaddedInt(size) + o += 10+size + if name in frames: asbpi += 1 + bpioff = o - len(data) + + # count number of tags found as int and how far past + o = 0 + asint = 0 + while o < len(data)-10: + name, size, flags = unpack('>4sLH', data[o:o+10]) + o += 10+size + if name in frames: asint += 1 + intoff = o - len(data) + + # if more tags as int, or equal and bpi is past and int is not + if asint > asbpi or (asint == asbpi and (bpioff >= 1 and intoff <= 1)): + return int + return BitPaddedInt + + def __read_frames(self, data, frames): + if self.version < (2,4,0) and self.f_unsynch: + try: data = unsynch.decode(data) + except ValueError: pass + + if (2,3,0) <= self.version: + bpi = self.__determine_bpi(data, frames) + while data: + header = data[:10] + try: name, size, flags = unpack('>4sLH', header) + except struct.error: return # not enough header + if name.strip('\x00') == '': return + size = bpi(size) + framedata = data[10:10+size] + data = data[10+size:] + if size == 0: continue # drop empty frames + try: tag = frames[name] + except KeyError: + if is_valid_frame_id(name): yield header + framedata + else: + try: yield self.__load_framedata(tag, flags, framedata) + except NotImplementedError: yield header + framedata + except ID3JunkFrameError: pass + + elif (2,2,0) <= self.version: + while data: + header = data[0:6] + try: name, size = unpack('>3s3s', header) + except struct.error: return # not enough header + size, = struct.unpack('>L', '\x00'+size) + if name.strip('\x00') == '': return + framedata = 
data[6:6+size] + data = data[6+size:] + if size == 0: continue # drop empty frames + try: tag = frames[name] + except KeyError: + if is_valid_frame_id(name): yield header + framedata + else: + try: yield self.__load_framedata(tag, 0, framedata) + except NotImplementedError: yield header + framedata + except ID3JunkFrameError: pass + + def __load_framedata(self, tag, flags, framedata): + return tag.fromData(self, flags, framedata) + + f_unsynch = property(lambda s: bool(s.__flags & 0x80)) + f_extended = property(lambda s: bool(s.__flags & 0x40)) + f_experimental = property(lambda s: bool(s.__flags & 0x20)) + f_footer = property(lambda s: bool(s.__flags & 0x10)) + + #f_crc = property(lambda s: bool(s.__extflags & 0x8000)) + + def save(self, filename=None, v1=1): + """Save changes to a file. + + If no filename is given, the one most recently loaded is used. + + Keyword arguments: + v1 -- if 0, ID3v1 tags will be removed + if 1, ID3v1 tags will be updated but not added + if 2, ID3v1 tags will be created and/or updated + + The lack of a way to update only an ID3v1 tag is intentional. 
+ """ + + # Sort frames by 'importance' + order = ["TIT2", "TPE1", "TRCK", "TALB", "TPOS", "TDRC", "TCON"] + order = dict(zip(order, range(len(order)))) + last = len(order) + frames = self.items() + frames.sort(lambda a, b: cmp(order.get(a[0][:4], last), + order.get(b[0][:4], last))) + + framedata = [self.__save_frame(frame) for (key, frame) in frames] + framedata.extend([data for data in self.unknown_frames + if len(data) > 10]) + if not framedata: + try: + self.delete(filename) + except EnvironmentError, err: + from errno import ENOENT + if err.errno != ENOENT: raise + return + + framedata = ''.join(framedata) + framesize = len(framedata) + + if filename is None: filename = self.filename + try: f = open(filename, 'rb+') + except IOError, err: + from errno import ENOENT + if err.errno != ENOENT: raise + f = open(filename, 'ab') # create, then reopen + f = open(filename, 'rb+') + try: + idata = f.read(10) + try: id3, vmaj, vrev, flags, insize = unpack('>3sBBB4s', idata) + except struct.error: id3, insize = '', 0 + insize = BitPaddedInt(insize) + if id3 != 'ID3': insize = -10 + + if insize >= framesize: outsize = insize + else: outsize = (framesize + 1023) & ~0x3FF + framedata += '\x00' * (outsize - framesize) + + framesize = BitPaddedInt.to_str(outsize, width=4) + flags = 0 + header = pack('>3sBBB4s', 'ID3', 4, 0, flags, framesize) + data = header + framedata + + if (insize < outsize): + insert_bytes(f, outsize-insize, insize+10) + f.seek(0) + f.write(data) + + try: + f.seek(-128, 2) + except IOError, err: + from errno import EINVAL + if err.errno != EINVAL: raise + f.seek(0, 2) # ensure read won't get "TAG" + + if f.read(3) == "TAG": + f.seek(-128, 2) + if v1 > 0: f.write(MakeID3v1(self)) + else: f.truncate() + elif v1 == 2: + f.seek(0, 2) + f.write(MakeID3v1(self)) + + finally: + f.close() + + def delete(self, filename=None, delete_v1=True, delete_v2=True): + """Remove tags from a file. + + If no filename is given, the one most recently loaded is used. 
+ + Keyword arguments: + delete_v1 -- delete any ID3v1 tag + delete_v2 -- delete any ID3v2 tag + """ + if filename is None: + filename = self.filename + delete(filename, delete_v1, delete_v2) + self.clear() + + def __save_frame(self, frame): + flags = 0 + if self.PEDANTIC and isinstance(frame, TextFrame): + if len(str(frame)) == 0: return '' + framedata = frame._writeData() + usize = len(framedata) + if usize > 2048: + framedata = BitPaddedInt.to_str(usize) + framedata.encode('zlib') + flags |= Frame.FLAG24_COMPRESS | Frame.FLAG24_DATALEN + datasize = BitPaddedInt.to_str(len(framedata), width=4) + header = pack('>4s4sH', type(frame).__name__, datasize, flags) + return header + framedata + + def update_to_v24(self): + """Convert older tags into an ID3v2.4 tag. + + This updates old ID3v2 frames to ID3v2.4 ones (e.g. TYER to + TDRC). If you intend to save tags, you must call this function + at some point; it is called by default when loading the tag. + """ + + if self.version < (2,3,0): del self.unknown_frames[:] + # unsafe to write + + # TDAT, TYER, and TIME have been turned into TDRC. + try: + if str(self.get("TYER", "")).strip("\x00"): + date = str(self.pop("TYER")) + if str(self.get("TDAT", "")).strip("\x00"): + dat = str(self.pop("TDAT")) + date = "%s-%s-%s" % (date, dat[2:], dat[:2]) + if str(self.get("TIME", "")).strip("\x00"): + time = str(self.pop("TIME")) + date += "T%s:%s:00" % (time[:2], time[2:]) + if "TDRC" not in self: + self.add(TDRC(encoding=0, text=date)) + except UnicodeDecodeError: + # Old ID3 tags have *lots* of Unicode problems, so if TYER + # is bad, just chuck the frames. + pass + + # TORY can be the first part of a TDOR. + if "TORY" in self: + f = self.pop("TORY") + if "TDOR" not in self: + try: + self.add(TDOR(encoding=0, text=str(f))) + except UnicodeDecodeError: + pass + + # IPLS is now TIPL. 
+ if "IPLS" in self: + f = self.pop("IPLS") + if "TIPL" not in self: + self.add(TIPL(encoding=f.encoding, people=f.people)) + + if "TCON" in self: + # Get rid of "(xx)Foobr" format. + self["TCON"].genres = self["TCON"].genres + + if self.version < (2, 3): + # ID3v2.2 PIC frames are slightly different. + pics = self.getall("APIC") + mimes = { "PNG": "image/png", "JPG": "image/jpeg" } + self.delall("APIC") + for pic in pics: + newpic = APIC( + encoding=pic.encoding, mime=mimes.get(pic.mime, pic.mime), + type=pic.type, desc=pic.desc, data=pic.data) + self.add(newpic) + + # ID3v2.2 LNK frames are just way too different to upgrade. + self.delall("LINK") + + # These can't be trivially translated to any ID3v2.4 tags, or + # should have been removed already. + for key in ["RVAD", "EQUA", "TRDA", "TSIZ", "TDAT", "TIME", "CRM"]: + if key in self: del(self[key]) + +def delete(filename, delete_v1=True, delete_v2=True): + """Remove tags from a file. + + Keyword arguments: + delete_v1 -- delete any ID3v1 tag + delete_v2 -- delete any ID3v2 tag + """ + + f = open(filename, 'rb+') + + if delete_v1: + try: + f.seek(-128, 2) + except IOError: pass + else: + if f.read(3) == "TAG": + f.seek(-128, 2) + f.truncate() + + # technically an insize=0 tag is invalid, but we delete it anyway + # (primarily because we used to write it) + if delete_v2: + f.seek(0, 0) + idata = f.read(10) + try: id3, vmaj, vrev, flags, insize = unpack('>3sBBB4s', idata) + except struct.error: id3, insize = '', -1 + insize = BitPaddedInt(insize) + if id3 == 'ID3' and insize >= 0: + delete_bytes(f, insize + 10, 0) + +class BitPaddedInt(int): + def __new__(cls, value, bits=7, bigendian=True): + "Strips 8-bits bits out of every byte" + mask = (1<<(bits))-1 + if isinstance(value, (int, long)): + bytes = [] + while value: + bytes.append(value & ((1<> 8 + if isinstance(value, str): + bytes = [ord(byte) & mask for byte in value] + if bigendian: bytes.reverse() + numeric_value = 0 + for shift, byte in zip(range(0, 
len(bytes)*bits, bits), bytes): + numeric_value += byte << shift + if isinstance(numeric_value, long): + self = long.__new__(BitPaddedLong, numeric_value) + else: + self = int.__new__(BitPaddedInt, numeric_value) + self.bits = bits + self.bigendian = bigendian + return self + + def as_str(value, bits=7, bigendian=True, width=4): + bits = getattr(value, 'bits', bits) + bigendian = getattr(value, 'bigendian', bigendian) + value = int(value) + mask = (1<> bits + # PCNT and POPM use growing integers of at least 4 bytes as counters. + if width == -1: width = max(4, len(bytes)) + if len(bytes) > width: + raise ValueError, 'Value too wide (%d bytes)' % len(bytes) + else: bytes.extend([0] * (width-len(bytes))) + if bigendian: bytes.reverse() + return ''.join(map(chr, bytes)) + to_str = staticmethod(as_str) + +class BitPaddedLong(long): + def as_str(value, bits=7, bigendian=True, width=4): + return BitPaddedInt.to_str(value, bits, bigendian, width) + to_str = staticmethod(as_str) + +class unsynch(object): + def decode(value): + output = [] + safe = True + append = output.append + for val in value: + if safe: + append(val) + safe = val != '\xFF' + else: + if val >= '\xE0': raise ValueError('invalid sync-safe string') + elif val != '\x00': append(val) + safe = True + if not safe: raise ValueError('string ended unsafe') + return ''.join(output) + decode = staticmethod(decode) + + def encode(value): + output = [] + safe = True + append = output.append + for val in value: + if safe: + append(val) + if val == '\xFF': safe = False + elif val == '\x00' or val >= '\xE0': + append('\x00') + append(val) + safe = val != '\xFF' + else: + append(val) + safe = True + if not safe: append('\x00') + return ''.join(output) + encode = staticmethod(encode) + +class Spec(object): + def __init__(self, name): self.name = name + def __hash__(self): raise TypeError("Spec objects are unhashable") + +class ByteSpec(Spec): + def read(self, frame, data): return ord(data[0]), data[1:] + def write(self, 
frame, value): return chr(value) + def validate(self, frame, value): return value + +class IntegerSpec(Spec): + def read(self, frame, data): + return int(BitPaddedInt(data, bits=8)), '' + def write(self, frame, value): + return BitPaddedInt.to_str(value, bits=8, width=-1) + def validate(self, frame, value): + return value + +class SizedIntegerSpec(Spec): + def __init__(self, name, size): + self.name, self.__sz = name, size + def read(self, frame, data): + return int(BitPaddedInt(data[:self.__sz], bits=8)), data[self.__sz:] + def write(self, frame, value): + return BitPaddedInt.to_str(value, bits=8, width=self.__sz) + def validate(self, frame, value): + return value + +class EncodingSpec(ByteSpec): + def read(self, frame, data): + enc, data = super(EncodingSpec, self).read(frame, data) + if enc < 16: return enc, data + else: return 0, chr(enc)+data + + def validate(self, frame, value): + if 0 <= value <= 3: return value + if value is None: return None + raise ValueError, 'Invalid Encoding: %r' % value + +class StringSpec(Spec): + def __init__(self, name, length): + super(StringSpec, self).__init__(name) + self.len = length + def read(s, frame, data): return data[:s.len], data[s.len:] + def write(s, frame, value): + if value is None: return '\x00' * s.len + else: return (str(value) + '\x00' * s.len)[:s.len] + def validate(s, frame, value): + if value is None: return None + if isinstance(value, basestring) and len(value) == s.len: return value + raise ValueError, 'Invalid StringSpec[%d] data: %r' % (s.len, value) + +class BinaryDataSpec(Spec): + def read(self, frame, data): return data, '' + def write(self, frame, value): return str(value) + def validate(self, frame, value): return str(value) + +class EncodedTextSpec(Spec): + # Okay, seriously. This is private and defined explicitly and + # completely by the ID3 specification. You can't just add + # encodings here however you want. 
+ _encodings = ( ('latin1', '\x00'), ('utf16', '\x00\x00'), + ('utf_16_be', '\x00\x00'), ('utf8', '\x00') ) + + def read(self, frame, data): + enc, term = self._encodings[frame.encoding] + ret = '' + if len(term) == 1: + if term in data: + data, ret = data.split(term, 1) + else: + offset = -1 + try: + while True: + offset = data.index(term, offset+1) + if offset & 1: continue + data, ret = data[0:offset], data[offset+2:]; break + except ValueError: pass + + if len(data) < len(term): return u'', ret + return data.decode(enc), ret + + def write(self, frame, value): + enc, term = self._encodings[frame.encoding] + return value.encode(enc) + term + + def validate(self, frame, value): return unicode(value) + +class MultiSpec(Spec): + def __init__(self, name, *specs, **kw): + super(MultiSpec, self).__init__(name) + self.specs = specs + self.sep = kw.get('sep') + + def read(self, frame, data): + values = [] + while data: + record = [] + for spec in self.specs: + value, data = spec.read(frame, data) + record.append(value) + if len(self.specs) != 1: values.append(record) + else: values.append(record[0]) + return values, data + + def write(self, frame, value): + data = [] + if len(self.specs) == 1: + for v in value: + data.append(self.specs[0].write(frame, v)) + else: + for record in value: + for v, s in zip(record, self.specs): + data.append(s.write(frame, v)) + return ''.join(data) + + def validate(self, frame, value): + if value is None: return [] + if self.sep and isinstance(value, basestring): + value = value.split(self.sep) + if isinstance(value, list): + if len(self.specs) == 1: + return [self.specs[0].validate(frame, v) for v in value] + else: + return [ + [s.validate(frame, v) for (v,s) in zip(val, self.specs)] + for val in value ] + raise ValueError, 'Invalid MultiSpec data: %r' % value + +class EncodedNumericTextSpec(EncodedTextSpec): pass +class EncodedNumericPartTextSpec(EncodedTextSpec): pass + +class Latin1TextSpec(EncodedTextSpec): + def read(self, frame, 
data): + if '\x00' in data: data, ret = data.split('\x00',1) + else: ret = '' + return data.decode('latin1'), ret + + def write(self, data, value): + return value.encode('latin1') + '\x00' + + def validate(self, frame, value): return unicode(value) + +class ID3TimeStamp(object): + """A time stamp in ID3v2 format. + + This is a restricted form of the ISO 8601 standard; time stamps + take the form of: + YYYY-MM-DD HH:MM:SS + Or some partial form (YYYY-MM-DD HH, YYYY, etc.). + + The 'text' attribute contains the raw text data of the time stamp. + """ + + import re + def __init__(self, text): + if isinstance(text, ID3TimeStamp): text = text.text + self.text = text + + __formats = ['%04d'] + ['%02d'] * 5 + __seps = ['-', '-', ' ', ':', ':', 'x'] + def get_text(self): + parts = [self.year, self.month, self.day, + self.hour, self.minute, self.second] + pieces = [] + for i, part in enumerate(iter(iter(parts).next, None)): + pieces.append(self.__formats[i]%part + self.__seps[i]) + return u''.join(pieces)[:-1] + + def set_text(self, text, splitre=re.compile('[-T:/.]|\s+')): + year, month, day, hour, minute, second = \ + splitre.split(text + ':::::')[:6] + for a in 'year month day hour minute second'.split(): + try: v = int(locals()[a]) + except ValueError: v = None + setattr(self, a, v) + + text = property(get_text, set_text, doc="ID3v2.4 date and time.") + + def __str__(self): return self.text + def __repr__(self): return repr(self.text) + def __cmp__(self, other): return cmp(self.text, other.text) + def encode(self, *args): return self.text.encode(*args) + +class TimeStampSpec(EncodedTextSpec): + def read(self, frame, data): + value, data = super(TimeStampSpec, self).read(frame, data) + return self.validate(frame, value), data + + def write(self, frame, data): + return super(TimeStampSpec, self).write(frame, + data.text.replace(' ', 'T')) + + def validate(self, frame, value): + try: return ID3TimeStamp(value) + except TypeError: raise ValueError, "Invalid ID3TimeStamp: %r" 
% value + +class ChannelSpec(ByteSpec): + (OTHER, MASTER, FRONTRIGHT, FRONTLEFT, BACKRIGHT, BACKLEFT, FRONTCENTRE, + BACKCENTRE, SUBWOOFER) = range(9) + +class VolumeAdjustmentSpec(Spec): + def read(self, frame, data): + value, = unpack('>h', data[0:2]) + return value/512.0, data[2:] + + def write(self, frame, value): + return pack('>h', int(round(value * 512))) + + def validate(self, frame, value): return value + +class VolumePeakSpec(Spec): + def read(self, frame, data): + # http://bugs.xmms.org/attachment.cgi?id=113&action=view + peak = 0 + bits = ord(data[0]) + bytes = min(4, (bits + 7) >> 3) + # not enough frame data + if bytes + 1 > len(data): raise ID3JunkFrameError + shift = ((8 - (bits & 7)) & 7) + (4 - bytes) * 8 + for i in range(1, bytes+1): + peak *= 256 + peak += ord(data[i]) + peak *= 2**shift + return (float(peak) / (2**31-1)), data[1+bytes:] + + def write(self, frame, value): + # always write as 16 bits for sanity. + return "\x10" + pack('>H', int(round(value * 32768))) + + def validate(self, frame, value): return value + +class SynchronizedTextSpec(EncodedTextSpec): + def read(self, frame, data): + texts = [] + encoding, term = self._encodings[frame.encoding] + while data: + l = len(term) + value_idx = data.index(term) + value = data[:value_idx].decode(encoding) + time, = struct.unpack(">I", data[value_idx+l:value_idx+l+4]) + texts.append((value, time)) + data = data[value_idx+l+4:] + return texts, "" + + def write(self, frame, value): + data = [] + encoding, term = self._encodings[frame.encoding] + for text, time in frame.text: + text = text.encode(encoding) + term + data.append(text + struct.pack(">I", time)) + return "".join(data) + + def validate(self, frame, value): + return value + +class KeyEventSpec(Spec): + def read(self, frame, data): + events = [] + while len(data) >= 5: + events.append(struct.unpack(">bI", data[:5])) + data = data[5:] + return events, data + + def write(self, frame, value): + return "".join([struct.pack(">bI", *event) 
for event in value]) + + def validate(self, frame, value): + return value + +class VolumeAdjustmentsSpec(Spec): + # Not to be confused with VolumeAdjustmentSpec. + def read(self, frame, data): + adjustments = {} + while len(data) >= 4: + freq, adj = struct.unpack(">Hh", data[:4]) + data = data[4:] + freq /= 2.0 + adj /= 512.0 + adjustments[freq] = adj + adjustments = adjustments.items() + adjustments.sort() + return adjustments, data + + def write(self, frame, value): + value.sort() + return "".join([struct.pack(">Hh", int(freq * 2), int(adj * 512)) + for (freq, adj) in value]) + + def validate(self, frame, value): + return value + +class ASPIIndexSpec(Spec): + def read(self, frame, data): + if frame.b == 16: + format = "H" + size = 2 + elif frame.b == 8: + format = "B" + size = 1 + else: + warn("invalid bit count in ASPI (%d)" % frame.b, ID3Warning) + return [], data + + indexes = data[:frame.N * size] + data = data[frame.N * size:] + return list(struct.unpack(">" + format * frame.N, indexes)), data + + def write(self, frame, values): + if frame.b == 16: format = "H" + elif frame.b == 8: format = "B" + else: raise ValueError("frame.b must be 8 or 16") + return struct.pack(">" + format * frame.N, *values) + + def validate(self, frame, values): + return values + +class Frame(object): + """Fundamental unit of ID3 data. + + ID3 tags are split into frames. Each frame has a potentially + different structure, and so this base class is not very featureful. 
+ """ + + FLAG23_ALTERTAG = 0x8000 + FLAG23_ALTERFILE = 0x4000 + FLAG23_READONLY = 0x2000 + FLAG23_COMPRESS = 0x0080 + FLAG23_ENCRYPT = 0x0040 + FLAG23_GROUP = 0x0020 + + FLAG24_ALTERTAG = 0x4000 + FLAG24_ALTERFILE = 0x2000 + FLAG24_READONLY = 0x1000 + FLAG24_GROUPID = 0x0040 + FLAG24_COMPRESS = 0x0008 + FLAG24_ENCRYPT = 0x0004 + FLAG24_UNSYNCH = 0x0002 + FLAG24_DATALEN = 0x0001 + + _framespec = [] + def __init__(self, *args, **kwargs): + if len(args)==1 and len(kwargs)==0 and isinstance(args[0], type(self)): + other = args[0] + for checker in self._framespec: + val = checker.validate(self, getattr(other, checker.name)) + setattr(self, checker.name, val) + else: + for checker, val in zip(self._framespec, args): + setattr(self, checker.name, checker.validate(self, val)) + for checker in self._framespec[len(args):]: + validated = checker.validate( + self, kwargs.get(checker.name, None)) + setattr(self, checker.name, validated) + + HashKey = property( + lambda s: s.FrameID, + doc="an internal key used to ensure frame uniqueness in a tag") + FrameID = property( + lambda s: type(s).__name__, + doc="ID3v2 three or four character frame ID") + + def __repr__(self): + """Python representation of a frame. + + The string returned is a valid Python expression to construct + a copy of this frame. 
+ """ + kw = [] + for attr in self._framespec: + kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) + return '%s(%s)' % (type(self).__name__, ', '.join(kw)) + + def _readData(self, data): + odata = data + for reader in self._framespec: + if len(data): + try: value, data = reader.read(self, data) + except UnicodeDecodeError: + raise ID3JunkFrameError + else: raise ID3JunkFrameError + setattr(self, reader.name, value) + if data.strip('\x00'): + warn('Leftover data: %s: %r (from %r)' % ( + type(self).__name__, data, odata), + ID3Warning) + + def _writeData(self): + data = [] + for writer in self._framespec: + data.append(writer.write(self, getattr(self, writer.name))) + return ''.join(data) + + def pprint(self): + """Return a human-readable representation of the frame.""" + return "%s=%s" % (type(self).__name__, self._pprint()) + + def _pprint(self): + return "[unrepresentable data]" + + def fromData(cls, id3, tflags, data): + """Construct this ID3 frame from raw string data.""" + + if (2,4,0) <= id3.version: + if tflags & (Frame.FLAG24_COMPRESS | Frame.FLAG24_DATALEN): + # The data length int is syncsafe in 2.4 (but not 2.3). + # However, we don't actually need the data length int, + # except to work around a QL 0.12 bug, and in that case + # all we need are the raw bytes. + datalen_bytes = data[:4] + data = data[4:] + if tflags & Frame.FLAG24_UNSYNCH or id3.f_unsynch: + try: data = unsynch.decode(data) + except ValueError, err: + if id3.PEDANTIC: + raise ID3BadUnsynchData, '%s: %r' % (err, data) + if tflags & Frame.FLAG24_ENCRYPT: + raise ID3EncryptionUnsupportedError + if tflags & Frame.FLAG24_COMPRESS: + try: data = data.decode('zlib') + except zlibError, err: + # the initial mutagen that went out with QL 0.12 did not + # write the 4 bytes of uncompressed size. Compensate. 
+ data = datalen_bytes + data + try: data = data.decode('zlib') + except zlibError, err: + if id3.PEDANTIC: + raise ID3BadCompressedData, '%s: %r' % (err, data) + + elif (2,3,0) <= id3.version: + if tflags & Frame.FLAG23_COMPRESS: + usize, = unpack('>L', data[:4]) + data = data[4:] + if tflags & Frame.FLAG23_ENCRYPT: + raise ID3EncryptionUnsupportedError + if tflags & Frame.FLAG23_COMPRESS: + try: data = data.decode('zlib') + except zlibError, err: + if id3.PEDANTIC: + raise ID3BadCompressedData, '%s: %r' % (err, data) + + frame = cls() + frame._rawdata = data + frame._flags = tflags + frame._readData(data) + return frame + fromData = classmethod(fromData) + + def __hash__(self): + raise TypeError("Frame objects are unhashable") + +class FrameOpt(Frame): + """A frame with optional parts. + + Some ID3 frames have optional data; this class extends Frame to + provide support for those parts. + """ + _optionalspec = [] + + def __init__(self, *args, **kwargs): + super(FrameOpt, self).__init__(*args, **kwargs) + for spec in self._optionalspec: + if spec.name in kwargs: + validated = spec.validate(self, kwargs[spec.name]) + setattr(self, spec.name, validated) + else: break + + def _readData(self, data): + odata = data + for reader in self._framespec: + if len(data): value, data = reader.read(self, data) + else: raise ID3JunkFrameError + setattr(self, reader.name, value) + if data: + for reader in self._optionalspec: + if len(data): value, data = reader.read(self, data) + else: break + setattr(self, reader.name, value) + if data.strip('\x00'): + warn('Leftover data: %s: %r (from %r)' % ( + type(self).__name__, data, odata), + ID3Warning) + + def _writeData(self): + data = [] + for writer in self._framespec: + data.append(writer.write(self, getattr(self, writer.name))) + for writer in self._optionalspec: + try: data.append(writer.write(self, getattr(self, writer.name))) + except AttributeError: break + return ''.join(data) + + def __repr__(self): + kw = [] + for attr in 
self._framespec: + kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) + for attr in self._optionalspec: + if hasattr(self, attr.name): + kw.append('%s=%r' % (attr.name, getattr(self, attr.name))) + return '%s(%s)' % (type(self).__name__, ', '.join(kw)) + + +class TextFrame(Frame): + """Text strings. + + Text frames support casts to unicode or str objects, as well as + list-like indexing, extend, and append. + + Iterating over a TextFrame iterates over its strings, not its + characters. + + Text frames have a 'text' attribute which is the list of strings, + and an 'encoding' attribute; 0 for ISO-8859 1, 1 UTF-16, 2 for + UTF-16BE, and 3 for UTF-8. If you don't want to worry about + encodings, just set it to 3. + """ + + _framespec = [ EncodingSpec('encoding'), + MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] + def __str__(self): return self.__unicode__().encode('utf-8') + def __unicode__(self): return u'\u0000'.join(self.text) + def __eq__(self, other): + if isinstance(other, str): return str(self) == other + elif isinstance(other, unicode): + return u'\u0000'.join(self.text) == other + return self.text == other + def __getitem__(self, item): return self.text[item] + def __iter__(self): return iter(self.text) + def append(self, value): return self.text.append(value) + def extend(self, value): return self.text.extend(value) + def _pprint(self): return " / ".join(self.text) + +class NumericTextFrame(TextFrame): + """Numerical text strings. + + The numeric value of these frames can be gotten with unary plus, e.g. + frame = TLEN('12345') + length = +frame + """ + + _framespec = [ EncodingSpec('encoding'), + MultiSpec('text', EncodedNumericTextSpec('text'), sep=u'\u0000') ] + + def __pos__(self): + """Return the numerical value of the string.""" + return int(self.text[0]) + +class NumericPartTextFrame(TextFrame): + """Multivalue numerical text strings. + + These strings indicate 'part (e.g. 
track) X of Y', and unary plus + returns the first value: + frame = TRCK('4/15') + track = +frame # track == 4 + """ + + _framespec = [ EncodingSpec('encoding'), + MultiSpec('text', EncodedNumericPartTextSpec('text'), sep=u'\u0000') ] + def __pos__(self): + return int(self.text[0].split("/")[0]) + +class TimeStampTextFrame(TextFrame): + """A list of time stamps. + + The 'text' attribute in this frame is a list of ID3TimeStamp + objects, not a list of strings. + """ + + _framespec = [ EncodingSpec('encoding'), + MultiSpec('text', TimeStampSpec('stamp'), sep=u',') ] + def __str__(self): return self.__unicode__().encode('utf-8') + def __unicode__(self): return ','.join([stamp.text for stamp in self.text]) + def _pprint(self): + return " / ".join([stamp.text for stamp in self.text]) + +class UrlFrame(Frame): + """A frame containing a URL string. + + The ID3 specification is silent about IRIs and normalized URL + forms. Mutagen assumes all URLs in files are encoded as Latin 1, + but string conversion of this frame returns a UTF-8 representation + for compatibility with other string conversions. + + The only sane way to handle URLs in MP3s is to restrict them to + ASCII. + """ + + _framespec = [ Latin1TextSpec('url') ] + def __str__(self): return self.url.encode('utf-8') + def __unicode__(self): return self.url + def __eq__(self, other): return self.url == other + def _pprint(self): return self.url + +class UrlFrameU(UrlFrame): + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.url)) + +class TALB(TextFrame): "Album" +class TBPM(NumericTextFrame): "Beats per minute" +class TCOM(TextFrame): "Composer" + +class TCON(TextFrame): + """Content type (Genre) + + ID3 has several ways genres can be represented; for convenience, + use the 'genres' property rather than the 'text' attribute. 
+ """ + + from mutagen._constants import GENRES + + def __get_genres(self): + genres = [] + import re + genre_re = re.compile(r"((?:\((?P[0-9]+|RX|CR)\))*)(?P.+)?") + for value in self.text: + if value.isdigit(): + try: genres.append(self.GENRES[int(value)]) + except IndexError: genres.append(u"Unknown") + elif value == "CR": genres.append(u"Cover") + elif value == "RX": genres.append(u"Remix") + elif value: + newgenres = [] + genreid, dummy, genrename = genre_re.match(value).groups() + + if genreid: + for gid in genreid[1:-1].split(")("): + if gid.isdigit() and int(gid) < len(self.GENRES): + gid = unicode(self.GENRES[int(gid)]) + newgenres.append(gid) + elif gid == "CR": newgenres.append(u"Cover") + elif gid == "RX": newgenres.append(u"Remix") + else: newgenres.append(u"Unknown") + + if genrename: + # "Unescaping" the first parenthesis + if genrename.startswith("(("): genrename = genrename[1:] + if genrename not in newgenres: newgenres.append(genrename) + + genres.extend(newgenres) + + return genres + + def __set_genres(self, genres): + if isinstance(genres, basestring): genres = [genres] + self.text = map(self.__decode, genres) + + def __decode(self, value): + if isinstance(value, str): + enc = EncodedTextSpec._encodings[self.encoding][0] + return value.decode(enc) + else: return value + + genres = property(__get_genres, __set_genres, None, + "A list of genres parsed from the raw text data.") + + def _pprint(self): + return " / ".join(self.genres) + +class TCOP(TextFrame): "Copyright (c)" +class TCMP(NumericTextFrame): "iTunes Compilation Flag" +class TDAT(TextFrame): "Date of recording (DDMM)" +class TDEN(TimeStampTextFrame): "Encoding Time" +class TDOR(TimeStampTextFrame): "Original Release Time" +class TDLY(NumericTextFrame): "Audio Delay (ms)" +class TDRC(TimeStampTextFrame): "Recording Time" +class TDRL(TimeStampTextFrame): "Release Time" +class TDTG(TimeStampTextFrame): "Tagging Time" +class TENC(TextFrame): "Encoder" +class TEXT(TextFrame): "Lyricist" 
+class TFLT(TextFrame): "File type" +class TIME(TextFrame): "Time of recording (HHMM)" +class TIT1(TextFrame): "Content group description" +class TIT2(TextFrame): "Title" +class TIT3(TextFrame): "Subtitle/Description refinement" +class TKEY(TextFrame): "Starting Key" +class TLAN(TextFrame): "Audio Languages" +class TLEN(NumericTextFrame): "Audio Length (ms)" +class TMED(TextFrame): "Source Media Type" +class TMOO(TextFrame): "Mood" +class TOAL(TextFrame): "Original Album" +class TOFN(TextFrame): "Original Filename" +class TOLY(TextFrame): "Original Lyricist" +class TOPE(TextFrame): "Original Artist/Performer" +class TORY(NumericTextFrame): "Original Release Year" +class TOWN(TextFrame): "Owner/Licensee" +class TPE1(TextFrame): "Lead Artist/Performer/Soloist/Group" +class TPE2(TextFrame): "Band/Orchestra/Accompaniment" +class TPE3(TextFrame): "Conductor" +class TPE4(TextFrame): "Interpreter/Remixer/Modifier" +class TPOS(NumericPartTextFrame): "Part of set" +class TPRO(TextFrame): "Produced (P)" +class TPUB(TextFrame): "Publisher" +class TRCK(NumericPartTextFrame): "Track Number" +class TRDA(TextFrame): "Recording Dates" +class TRSN(TextFrame): "Internet Radio Station Name" +class TRSO(TextFrame): "Internet Radio Station Owner" +class TSIZ(NumericTextFrame): "Size of audio data (bytes)" +class TSOA(TextFrame): "Album Sort Order key" +class TSOP(TextFrame): "Performer Sort Order key" +class TSOT(TextFrame): "Title Sort Order key" +class TSRC(TextFrame): "International Standard Recording Code (ISRC)" +class TSSE(TextFrame): "Encoder settings" +class TSST(TextFrame): "Set Subtitle" +class TYER(NumericTextFrame): "Year of recording" + +class TXXX(TextFrame): + """User-defined text data. + + TXXX frames have a 'desc' attribute which is set to any Unicode + value (though the encoding of the text and the description must be + the same). Many taggers use this frame to store freeform keys.
+ """ + _framespec = [ EncodingSpec('encoding'), EncodedTextSpec('desc'), + MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + def _pprint(self): return "%s=%s" % (self.desc, " / ".join(self.text)) + +class WCOM(UrlFrameU): "Commercial Information" +class WCOP(UrlFrame): "Copyright Information" +class WOAF(UrlFrame): "Official File Information" +class WOAR(UrlFrameU): "Official Artist/Performer Information" +class WOAS(UrlFrame): "Official Source Information" +class WORS(UrlFrame): "Official Internet Radio Information" +class WPAY(UrlFrame): "Payment Information" +class WPUB(UrlFrame): "Official Publisher Information" + +class WXXX(UrlFrame): + """User-defined URL data. + + Like TXXX, this has a freeform description associated with it. + """ + _framespec = [ EncodingSpec('encoding'), EncodedTextSpec('desc'), + Latin1TextSpec('url') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + +class PairedTextFrame(Frame): + """Paired text strings. + + Some ID3 frames pair text strings, to associate names with a more + specific involvement in the song. The 'people' attribute of these + frames contains a list of pairs: + [['trumpet', 'Miles Davis'], ['bass', 'Paul Chambers']] + + Like text frames, these frames also have an encoding attribute. + """ + + _framespec = [ EncodingSpec('encoding'), MultiSpec('people', + EncodedTextSpec('involvement'), EncodedTextSpec('person')) ] + def __eq__(self, other): + return self.people == other + +class TIPL(PairedTextFrame): "Involved People List" +class TMCL(PairedTextFrame): "Musicians Credits List" +class IPLS(TIPL): "Involved People List" + +class MCDI(Frame): + """Binary dump of CD's TOC. + + The 'data' attribute contains the raw byte string. 
+ """ + _framespec = [ BinaryDataSpec('data') ] + def __eq__(self, other): return self.data == other + +class ETCO(Frame): + """Event timing codes.""" + _framespec = [ ByteSpec("format"), KeyEventSpec("events") ] + def __eq__(self, other): return self.events == other + +class MLLT(Frame): + """MPEG location lookup table. + + This frame's attributes may be changed in the future based on + feedback from real-world use. + """ + _framespec = [ SizedIntegerSpec('frames', 2), + SizedIntegerSpec('bytes', 3), + SizedIntegerSpec('milliseconds', 3), + ByteSpec('bits_for_bytes'), + ByteSpec('bits_for_milliseconds'), + BinaryDataSpec('data') ] + def __eq__(self, other): return self.data == other + +class SYTC(Frame): + """Synchronised tempo codes. + + This frame's attributes may be changed in the future based on + feedback from real-world use. + """ + _framespec = [ ByteSpec("format"), BinaryDataSpec("data") ] + def __eq__(self, other): return self.data == other + +class USLT(Frame): + """Unsynchronised lyrics/text transcription. + + Lyrics have a three letter ISO language code ('lang'), a + description ('desc'), and a block of plain text ('text'). + """ + + _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), + EncodedTextSpec('desc'), EncodedTextSpec('text') ] + HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) + + def __str__(self): return self.text.encode('utf-8') + def __unicode__(self): return self.text + def __eq__(self, other): return self.text == other + +class SYLT(Frame): + """Synchronised lyrics/text.""" + + _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), + ByteSpec('format'), ByteSpec('type'), EncodedTextSpec('desc'), + SynchronizedTextSpec('text') ] + HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) + + def __eq__(self, other): + return str(self) == other + + def __str__(self): + return "".join([text for (text, time) in self.text]).encode('utf-8') + +class COMM(TextFrame): + """User comment. 
+ + User comment frames have a description, like TXXX, and also a three + letter ISO language code in the 'lang' attribute. + """ + _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), + EncodedTextSpec('desc'), + MultiSpec('text', EncodedTextSpec('text'), sep=u'\u0000') ] + HashKey = property(lambda s: '%s:%s:%r' % (s.FrameID, s.desc, s.lang)) + def _pprint(self): return "%s=%r=%s" % ( + self.desc, self.lang, " / ".join(self.text)) + +class RVA2(Frame): + """Relative volume adjustment (2). + + This frame is used to implement volume scaling, and in + particular, normalization using ReplayGain. + + Attributes: + desc -- description or context of this adjustment + channel -- audio channel to adjust (master is 1) + gain -- a + or - dB gain relative to some reference level + peak -- peak of the audio as a floating point number, [0, 1] + + When storing ReplayGain tags, use descriptions of 'album' and + 'track' on channel 1. + """ + + _framespec = [ Latin1TextSpec('desc'), ChannelSpec('channel'), + VolumeAdjustmentSpec('gain'), VolumePeakSpec('peak') ] + _channels = ["Other", "Master volume", "Front right", "Front left", + "Back right", "Back left", "Front centre", "Back centre", + "Subwoofer"] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + + def __eq__(self, other): + return ((str(self) == other) or + (self.desc == other.desc and + self.channel == other.channel and + self.gain == other.gain and + self.peak == other.peak)) + + def __str__(self): + return "%s: %+0.4f dB/%0.4f" % ( + self._channels[self.channel], self.gain, self.peak) + +class EQU2(Frame): + """Equalisation (2).
+ + Attributes: + method -- interpolation method (0 = band, 1 = linear) + desc -- identifying description + adjustments -- list of (frequency, vol_adjustment) pairs + """ + _framespec = [ ByteSpec("method"), Latin1TextSpec("desc"), + VolumeAdjustmentsSpec("adjustments") ] + def __eq__(self, other): return self.adjustments == other + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + +# class RVAD: unsupported +# class EQUA: unsupported + +class RVRB(Frame): + """Reverb.""" + _framespec = [ SizedIntegerSpec('left', 2), SizedIntegerSpec('right', 2), + ByteSpec('bounce_left'), ByteSpec('bounce_right'), + ByteSpec('feedback_ltl'), ByteSpec('feedback_ltr'), + ByteSpec('feedback_rtr'), ByteSpec('feedback_rtl'), + ByteSpec('premix_ltr'), ByteSpec('premix_rtl') ] + + def __eq__(self, other): return (self.left, self.right) == other + +class APIC(Frame): + """Attached (or linked) Picture. + + Attributes: + encoding -- text encoding for the description + mime -- a MIME type (e.g. image/jpeg) or '-->' if the data is a URI + type -- the source of the image (3 is the album front cover) + desc -- a text description of the image + data -- raw image data, as a byte string + + Mutagen will automatically compress large images when saving tags. + """ + _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('mime'), + ByteSpec('type'), EncodedTextSpec('desc'), BinaryDataSpec('data') ] + def __eq__(self, other): return self.data == other + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + def _pprint(self): + return "%s (%s, %d bytes)" % ( + self.desc, self.mime, len(self.data)) + +class PCNT(Frame): + """Play counter. + + The 'count' attribute contains the (recorded) number of times this + file has been played. + + This frame is basically obsoleted by POPM. 
+ """ + _framespec = [ IntegerSpec('count') ] + + def __eq__(self, other): return self.count == other + def __pos__(self): return self.count + def _pprint(self): return unicode(self.count) + +class POPM(Frame): + """Popularimeter. + + This frame keys a rating (out of 255) and a play count to an email + address. + + Attributes: + email -- email this POPM frame is for + rating -- rating from 0 to 255 + count -- number of times the files has been played + """ + _framespec = [ Latin1TextSpec('email'), ByteSpec('rating'), + IntegerSpec('count') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.email)) + + def __eq__(self, other): return self.rating == other + def __pos__(self): return self.rating + def _pprint(self): return "%s=%s %s/255" % ( + self.email, self.count, self.rating) + +class GEOB(Frame): + """General Encapsulated Object. + + A blob of binary data, that is not a picture (those go in APIC). + + Attributes: + encoding -- encoding of the description + mime -- MIME type of the data or '-->' if the data is a URI + filename -- suggested filename if extracted + desc -- text description of the data + data -- raw data, as a byte string + """ + _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('mime'), + EncodedTextSpec('filename'), EncodedTextSpec('desc'), + BinaryDataSpec('data') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.desc)) + + def __eq__(self, other): return self.data == other + +class RBUF(FrameOpt): + """Recommended buffer size. + + Attributes: + size -- recommended buffer size in bytes + info -- if ID3 tags may be elsewhere in the file (optional) + offset -- the location of the next ID3 tag, if any + + Mutagen will not find the next tag itself. + """ + _framespec = [ SizedIntegerSpec('size', 3) ] + _optionalspec = [ ByteSpec('info'), SizedIntegerSpec('offset', 4) ] + + def __eq__(self, other): return self.size == other + def __pos__(self): return self.size + +class AENC(FrameOpt): + """Audio encryption. 
+ + Attributes: + owner -- key identifying this encryption type + preview_start -- unencrypted data block offset + preview_length -- number of unencrypted blocks + data -- data required for decryption (optional) + + Mutagen cannot decrypt files. + """ + _framespec = [ Latin1TextSpec('owner'), + SizedIntegerSpec('preview_start', 2), + SizedIntegerSpec('preview_length', 2) ] + _optionalspec = [ BinaryDataSpec('data') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.owner)) + + def __str__(self): return self.owner.encode('utf-8') + def __unicode__(self): return self.owner + def __eq__(self, other): return self.owner == other + +class LINK(FrameOpt): + """Linked information. + + Attributes: + frameid -- the ID of the linked frame + url -- the location of the linked frame + data -- further ID information for the frame + """ + + _framespec = [ StringSpec('frameid', 4), Latin1TextSpec('url') ] + _optionalspec = [ BinaryDataSpec('data') ] + def __HashKey(self): + try: + return "%s:%s:%s:%r" % ( + self.FrameID, self.frameid, self.url, self.data) + except AttributeError: + return "%s:%s:%s" % (self.FrameID, self.frameid, self.url) + HashKey = property(__HashKey) + def __eq__(self, other): + try: return (self.frameid, self.url, self.data) == other + except AttributeError: return (self.frameid, self.url) == other + +class POSS(Frame): + """Position synchronisation frame + + Attribute: + format -- format of the position attribute (frames or milliseconds) + position -- current position of the file + """ + _framespec = [ ByteSpec('format'), IntegerSpec('position') ] + + def __pos__(self): return self.position + def __eq__(self, other): return self.position == other + +class UFID(Frame): + """Unique file identifier. 
+ + Attributes: + owner -- format/type of identifier + data -- identifier + """ + + _framespec = [ Latin1TextSpec('owner'), BinaryDataSpec('data') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.owner)) + def __eq__(s, o): + if isinstance(o, UFI): return s.owner == o.owner and s.data == o.data + else: return s.data == o + def _pprint(self): + isascii = ord(max(self.data)) < 128 + if isascii: return "%s=%s" % (self.owner, self.data) + else: return "%s (%d bytes)" % (self.owner, len(self.data)) + +class USER(Frame): + """Terms of use. + + Attributes: + encoding -- text encoding + lang -- ISO three letter language code + text -- licensing terms for the audio + """ + _framespec = [ EncodingSpec('encoding'), StringSpec('lang', 3), + EncodedTextSpec('text') ] + HashKey = property(lambda s: '%s:%r' % (s.FrameID, s.lang)) + + def __str__(self): return self.text.encode('utf-8') + def __unicode__(self): return self.text + def __eq__(self, other): return self.text == other + def _pprint(self): return "%r=%s" % (self.lang, self.text) + +class OWNE(Frame): + """Ownership frame.""" + _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('price'), + StringSpec('date', 8), EncodedTextSpec('seller') ] + + def __str__(self): return self.seller.encode('utf-8') + def __unicode__(self): return self.seller + def __eq__(self, other): return self.seller == other + +class COMR(FrameOpt): + """Commercial frame.""" + _framespec = [ EncodingSpec('encoding'), Latin1TextSpec('price'), + StringSpec('valid_until', 8), Latin1TextSpec('contact'), + ByteSpec('format'), EncodedTextSpec('seller'), + EncodedTextSpec('desc')] + _optionalspec = [ Latin1TextSpec('mime'), BinaryDataSpec('logo') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s._writeData())) + def __eq__(self, other): return self._writeData() == other._writeData() + +class ENCR(Frame): + """Encryption method registration. + + The standard does not allow multiple ENCR frames with the same owner + or the same method. 
Mutagen only verifies that the owner is unique. + """ + _framespec = [ Latin1TextSpec('owner'), ByteSpec('method'), + BinaryDataSpec('data') ] + HashKey = property(lambda s: "%s:%s" % (s.FrameID, s.owner)) + def __str__(self): return self.data + def __eq__(self, other): return self.data == other + +class GRID(FrameOpt): + """Group identification registration.""" + _framespec = [ Latin1TextSpec('owner'), ByteSpec('group') ] + _optionalspec = [ BinaryDataSpec('data') ] + HashKey = property(lambda s: '%s:%s' % (s.FrameID, s.group)) + def __pos__(self): return self.group + def __str__(self): return self.owner.encode('utf-8') + def __unicode__(self): return self.owner + def __eq__(self, other): return self.owner == other or self.group == other + + +class PRIV(Frame): + """Private frame.""" + _framespec = [ Latin1TextSpec('owner'), BinaryDataSpec('data') ] + HashKey = property(lambda s: '%s:%s:%s' % ( + s.FrameID, s.owner, s.data.decode('latin1'))) + def __str__(self): return self.data + def __eq__(self, other): return self.data == other + def _pprint(self): + isascii = ord(max(self.data)) < 128 + if isascii: return "%s=%s" % (self.owner, self.data) + else: return "%s (%d bytes)" % (self.owner, len(self.data)) + +class SIGN(Frame): + """Signature frame.""" + _framespec = [ ByteSpec('group'), BinaryDataSpec('sig') ] + HashKey = property(lambda s: '%s:%c:%s' % (s.FrameID, s.group, s.sig)) + def __str__(self): return self.sig + def __eq__(self, other): return self.sig == other + +class SEEK(Frame): + """Seek frame. + + Mutagen does not find tags at seek offsets. + """ + _framespec = [ IntegerSpec('offset') ] + def __pos__(self): return self.offset + def __eq__(self, other): return self.offset == other + +class ASPI(Frame): + """Audio seek point index. + + Attributes: S, L, N, b, and Fi. For the meaning of these, see + the ID3v2.4 specification. Fi is a list of integers. 
+ """ + _framespec = [ SizedIntegerSpec("S", 4), SizedIntegerSpec("L", 4), + SizedIntegerSpec("N", 2), ByteSpec("b"), + ASPIIndexSpec("Fi") ] + def __eq__(self, other): return self.Fi == other + +Frames = dict([(k,v) for (k,v) in globals().items() + if len(k)==4 and isinstance(v, type) and issubclass(v, Frame)]) +"""All supported ID3v2 frames, keyed by frame name.""" +del(k); del(v) + +# ID3v2.2 frames +class UFI(UFID): "Unique File Identifier" + +class TT1(TIT1): "Content group description" +class TT2(TIT2): "Title" +class TT3(TIT3): "Subtitle/Description refinement" +class TP1(TPE1): "Lead Artist/Performer/Soloist/Group" +class TP2(TPE2): "Band/Orchestra/Accompaniment" +class TP3(TPE3): "Conductor" +class TP4(TPE4): "Interpreter/Remixer/Modifier" +class TCM(TCOM): "Composer" +class TXT(TEXT): "Lyricist" +class TLA(TLAN): "Audio Language(s)" +class TCO(TCON): "Content Type (Genre)" +class TAL(TALB): "Album" +class TPA(TPOS): "Part of set" +class TRK(TRCK): "Track Number" +class TRC(TSRC): "International Standard Recording Code (ISRC)" +class TYE(TYER): "Year of recording" +class TDA(TDAT): "Date of recording (DDMM)" +class TIM(TIME): "Time of recording (HHMM)" +class TRD(TRDA): "Recording Dates" +class TMT(TMED): "Source Media Type" +class TFT(TFLT): "File Type" +class TBP(TBPM): "Beats per minute" +class TCP(TCMP): "iTunes Compilation Flag" +class TCR(TCOP): "Copyright (C)" +class TPB(TPUB): "Publisher" +class TEN(TENC): "Encoder" +class TSS(TSSE): "Encoder settings" +class TOF(TOFN): "Original Filename" +class TLE(TLEN): "Audio Length (ms)" +class TSI(TSIZ): "Audio Data size (bytes)" +class TDY(TDLY): "Audio Delay (ms)" +class TKE(TKEY): "Starting Key" +class TOT(TOAL): "Original Album" +class TOA(TOPE): "Original Artist/Performer" +class TOL(TOLY): "Original Lyricist" +class TOR(TORY): "Original Release Year" + +class TXX(TXXX): "User-defined Text" + +class WAF(WOAF): "Official File Information" +class WAR(WOAR): "Official Artist/Performer Information" +class
WAS(WOAS): "Official Source Information" +class WCM(WCOM): "Commercial Information" +class WCP(WCOP): "Copyright Information" +class WPB(WPUB): "Official Publisher Information" + +class WXX(WXXX): "User-defined URL" + +class IPL(IPLS): "Involved people list" +class MCI(MCDI): "Binary dump of CD's TOC" +class ETC(ETCO): "Event timing codes" +class MLL(MLLT): "MPEG location lookup table" +class STC(SYTC): "Synced tempo codes" +class ULT(USLT): "Unsynchronised lyrics/text transcription" +class SLT(SYLT): "Synchronised lyrics/text" +class COM(COMM): "Comment" +#class RVA(RVAD) +#class EQU(EQUA) +class REV(RVRB): "Reverb" +class PIC(APIC): + """Attached Picture. + + The 'mime' attribute of an ID3v2.2 attached picture must be either + 'PNG' or 'JPG'. + """ + _framespec = [ EncodingSpec('encoding'), StringSpec('mime', 3), + ByteSpec('type'), EncodedTextSpec('desc'), BinaryDataSpec('data') ] +class GEO(GEOB): "General Encapsulated Object" +class CNT(PCNT): "Play counter" +class POP(POPM): "Popularimeter" +class BUF(RBUF): "Recommended buffer size" + +class CRM(Frame): + """Encrypted meta frame""" + _framespec = [ Latin1TextSpec('owner'), Latin1TextSpec('desc'), + BinaryDataSpec('data') ] + def __eq__(self, other): return self.data == other + +class CRA(AENC): "Audio encryption" + +class LNK(LINK): + """Linked information""" + _framespec = [ StringSpec('frameid', 3), Latin1TextSpec('url') ] + _optionalspec = [ BinaryDataSpec('data') ] + +Frames_2_2 = dict([(k,v) for (k,v) in globals().items() + if len(k)==3 and isinstance(v, type) and issubclass(v, Frame)]) + +# support open(filename) as interface +Open = ID3 + +# ID3v1.1 support.
+def ParseID3v1(string): + """Parse an ID3v1 tag, returning a list of ID3v2.4 frames.""" + from struct import error as StructError + frames = {} + try: + tag, title, artist, album, year, comment, track, genre = unpack( + "3s30s30s30s4s29sBB", string) + except StructError: return None + + if tag != "TAG": return None + def fix(string): + return string.split("\x00")[0].strip().decode('latin1') + title, artist, album, year, comment = map( + fix, [title, artist, album, year, comment]) + + if title: frames["TIT2"] = TIT2(encoding=0, text=title) + if artist: frames["TPE1"] = TPE1(encoding=0, text=[artist]) + if album: frames["TALB"] = TALB(encoding=0, text=album) + if year: frames["TDRC"] = TDRC(encoding=0, text=year) + if comment: frames["COMM"] = COMM( + encoding=0, lang="eng", desc="ID3v1 Comment", text=comment) + # Don't read a track number if it looks like the comment was + # padded with spaces instead of nulls (thanks, WinAmp). + if track and (track != 32 or string[-3] == '\x00'): + frames["TRCK"] = TRCK(encoding=0, text=str(track)) + if genre != 255: frames["TCON"] = TCON(encoding=0, text=str(genre)) + return frames + +def MakeID3v1(id3): + """Return an ID3v1.1 tag string from a dict of ID3v2.4 frames.""" + + v1 = {} + + for v2id, name in {"TIT2": "title", "TPE1": "artist", + "TALB": "album"}.items(): + if v2id in id3: + text = id3[v2id].text[0].encode('latin1', 'replace')[:30] + else: text = "" + v1[name] = text + ("\x00" * (30 - len(text))) + + if "COMM" in id3: + cmnt = id3["COMM"].text[0].encode('latin1', 'replace')[:28] + else: cmnt = "" + v1["comment"] = cmnt + ("\x00" * (29 - len(cmnt))) + + if "TRCK" in id3: + try: v1["track"] = chr(+id3["TRCK"]) + except ValueError: v1["track"] = "\x00" + else: v1["track"] = "\x00" + + if "TCON" in id3: + try: genre = id3["TCON"].genres[0] + except IndexError: pass + else: + if genre in TCON.GENRES: + v1["genre"] = chr(TCON.GENRES.index(genre)) + if "genre" not in v1: v1["genre"] = "\xff" + + if "TDRC" in id3: v1["year"] 
= str(id3["TDRC"])[:4] + else: v1["year"] = "\x00\x00\x00\x00" + + return ("TAG%(title)s%(artist)s%(album)s%(year)s%(comment)s" + "%(track)s%(genre)s") % v1 + +class ID3FileType(mutagen.FileType): + """An unknown type of file with ID3 tags.""" + + class _Info(object): + length = 0 + def __init__(self, fileobj, offset): pass + pprint = staticmethod(lambda: "Unknown format with ID3 tag") + + def score(filename, fileobj, header): + return header.startswith("ID3") + score = staticmethod(score) + + def add_tags(self, ID3=ID3): + """Add an empty ID3 tag to the file. + + A custom tag reader may be used instead of the default + mutagen.id3.ID3 object, e.g. an EasyID3 reader. + """ + if self.tags is None: + self.tags = ID3() + else: + raise error("an ID3 tag already exists") + + def load(self, filename, ID3=ID3, **kwargs): + """Load stream and tag information from a file. + + A custom tag reader may be used instead of the default + mutagen.id3.ID3 object, e.g. an EasyID3 reader. + """ + self.filename = filename + try: self.tags = ID3(filename, **kwargs) + except error: self.tags = None + if self.tags is not None: + try: offset = self.tags.size + except AttributeError: offset = None + else: offset = None + try: + fileobj = file(filename, "rb") + self.info = self._Info(fileobj, offset) + finally: + fileobj.close() + diff --git a/lib/mutagen/m4a.py b/lib/mutagen/m4a.py new file mode 100644 index 000000000..b1786f2b2 --- /dev/null +++ b/lib/mutagen/m4a.py @@ -0,0 +1,496 @@ +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: m4a.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write MPEG-4 audio files with iTunes metadata. + +This module will read MPEG-4 audio information and metadata, +as found in Apple's M4A (aka MP4, M4B, M4P) files. + +There is no official specification for this format.
The source code +for TagLib, FAAD, and various MPEG specifications at +http://developer.apple.com/documentation/QuickTime/QTFF/, +http://www.geocities.com/xhelmboyx/quicktime/formats/mp4-layout.txt, +and http://wiki.multimedia.cx/index.php?title=Apple_QuickTime were all +consulted. + +This module does not support 64 bit atom sizes, and so will not +work on metadata over 4GB. +""" + +import struct +import sys + +from cStringIO import StringIO + +from mutagen import FileType, Metadata +from mutagen._constants import GENRES +from mutagen._util import cdata, insert_bytes, delete_bytes, DictProxy + +class error(IOError): pass +class M4AMetadataError(error): pass +class M4AStreamInfoError(error): pass +class M4AMetadataValueError(ValueError, M4AMetadataError): pass + +import warnings +warnings.warn( + "mutagen.m4a is deprecated; use mutagen.mp4 instead.", DeprecationWarning) + +# This is not an exhaustive list of container atoms, but just the +# ones this module needs to peek inside. +_CONTAINERS = ["moov", "udta", "trak", "mdia", "meta", "ilst", + "stbl", "minf", "stsd"] +_SKIP_SIZE = { "meta": 4 } + +__all__ = ['M4A', 'Open', 'delete', 'M4ACover'] + +class M4ACover(str): + """A cover artwork. + + Attributes: + format -- format of the image (either FORMAT_JPEG or FORMAT_PNG) + """ + FORMAT_JPEG = 0x0D + FORMAT_PNG = 0x0E + + def __new__(cls, data, format=None): + self = str.__new__(cls, data) + if format is None: format= M4ACover.FORMAT_JPEG + self.format = format + return self + +class Atom(object): + """An individual atom. + + Attributes: + children -- list child atoms (or None for non-container atoms) + length -- length of this atom, including length and name + name -- four byte name of the atom, as a str + offset -- location in the constructor-given fileobj of this atom + + This structure should only be used internally by Mutagen. 
+ """ + + children = None + + def __init__(self, fileobj): + self.offset = fileobj.tell() + self.length, self.name = struct.unpack(">I4s", fileobj.read(8)) + if self.length == 1: + raise error("64 bit atom sizes are not supported") + elif self.length < 8: + return + + if self.name in _CONTAINERS: + self.children = [] + fileobj.seek(_SKIP_SIZE.get(self.name, 0), 1) + while fileobj.tell() < self.offset + self.length: + self.children.append(Atom(fileobj)) + else: + fileobj.seek(self.offset + self.length, 0) + + def render(name, data): + """Render raw atom data.""" + # this raises OverflowError if Py_ssize_t can't handle the atom data + size = len(data) + 8 + if size <= 0xFFFFFFFF: + return struct.pack(">I4s", size, name) + data + else: + return struct.pack(">I4sQ", 1, name, size + 8) + data + render = staticmethod(render) + + def __getitem__(self, remaining): + """Look up a child atom, potentially recursively. + + e.g. atom['udta', 'meta'] => <Atom name='meta' ...> + """ + if not remaining: + return self + elif self.children is None: + raise KeyError("%r is not a container" % self.name) + for child in self.children: + if child.name == remaining[0]: + return child[remaining[1:]] + else: + raise KeyError, "%r not found" % remaining[0] + + def __repr__(self): + klass = self.__class__.__name__ + if self.children is None: + return "<%s name=%r length=%r offset=%r>" % ( + klass, self.name, self.length, self.offset) + else: + children = "\n".join([" " + line for child in self.children + for line in repr(child).splitlines()]) + return "<%s name=%r length=%r offset=%r\n%s>" % ( + klass, self.name, self.length, self.offset, children) + +class Atoms(object): + """Root atoms in a given file. + + Attributes: + atoms -- a list of top-level atoms as Atom objects + + This structure should only be used internally by Mutagen.
+ """ + def __init__(self, fileobj): + self.atoms = [] + fileobj.seek(0, 2) + end = fileobj.tell() + fileobj.seek(0) + while fileobj.tell() < end: + self.atoms.append(Atom(fileobj)) + + def path(self, *names): + """Look up and return the complete path of an atom. + + For example, atoms.path('moov', 'udta', 'meta') will return a + list of three atoms, corresponding to the moov, udta, and meta + atoms. + """ + path = [self] + for name in names: + path.append(path[-1][name,]) + return path[1:] + + def __getitem__(self, names): + """Look up a child atom. + + 'names' may be a list of atoms (['moov', 'udta']) or a string + specifying the complete path ('moov.udta'). + """ + if isinstance(names, basestring): + names = names.split(".") + for child in self.atoms: + if child.name == names[0]: + return child[names[1:]] + else: + raise KeyError, "%s not found" % names[0] + + def __repr__(self): + return "\n".join([repr(child) for child in self.atoms]) + +class M4ATags(DictProxy, Metadata): + """Dictionary containing Apple iTunes metadata list key/values. + + Keys are four byte identifiers, except for freeform ('----') + keys. Values are usually unicode strings, but some atoms have a + special structure: + cpil -- boolean + trkn, disk -- tuple of 16 bit ints (current, total) + tmpo -- 16 bit int + covr -- list of M4ACover objects (which are tagged strs) + gnre -- not supported. Use '\\xa9gen' instead. + + The freeform '----' frames use a key in the format '----:mean:name' + where 'mean' is usually 'com.apple.iTunes' and 'name' is a unique + identifier for this frame. The value is a str, but is probably + text that can be decoded as UTF-8. + + M4A tag data cannot exist outside of the structure of an M4A file, + so this class should not be manually instantiated. + + Unknown non-text tags are removed. 
+ """ + + def load(self, atoms, fileobj): + try: ilst = atoms["moov.udta.meta.ilst"] + except KeyError, key: + raise M4AMetadataError(key) + for atom in ilst.children: + fileobj.seek(atom.offset + 8) + data = fileobj.read(atom.length - 8) + parse = self.__atoms.get(atom.name, (M4ATags.__parse_text,))[0] + parse(self, atom, data) + + def __key_sort((key1, v1), (key2, v2)): + # iTunes always writes the tags in order of "relevance", try + # to copy it as closely as possible. + order = ["\xa9nam", "\xa9ART", "\xa9wrt", "\xa9alb", + "\xa9gen", "gnre", "trkn", "disk", + "\xa9day", "cpil", "tmpo", "\xa9too", + "----", "covr", "\xa9lyr"] + order = dict(zip(order, range(len(order)))) + last = len(order) + # If there's no key-based way to distinguish, order by length. + # If there's still no way, go by string comparison on the + # values, so we at least have something deterministic. + return (cmp(order.get(key1[:4], last), order.get(key2[:4], last)) or + cmp(len(v1), len(v2)) or cmp(v1, v2)) + __key_sort = staticmethod(__key_sort) + + def save(self, filename): + """Save the metadata to the given filename.""" + values = [] + items = self.items() + items.sort(self.__key_sort) + for key, value in items: + render = self.__atoms.get( + key[:4], (None, M4ATags.__render_text))[1] + values.append(render(self, key, value)) + data = Atom.render("ilst", "".join(values)) + + # Find the old atoms. + fileobj = file(filename, "rb+") + try: + atoms = Atoms(fileobj) + + moov = atoms["moov"] + + if moov != atoms.atoms[-1]: + # "Free" the old moov block. Something in the mdat + # block is not happy when its offset changes and it + # won't play back. So, rather than try to figure that + # out, just move the moov atom to the end of the file.
+ offset = self.__move_moov(fileobj, moov) + else: + offset = 0 + + try: + path = atoms.path("moov", "udta", "meta", "ilst") + except KeyError: + self.__save_new(fileobj, atoms, data, offset) + else: + self.__save_existing(fileobj, atoms, path, data, offset) + finally: + fileobj.close() + + def __move_moov(self, fileobj, moov): + fileobj.seek(moov.offset) + data = fileobj.read(moov.length) + fileobj.seek(moov.offset) + free = Atom.render("free", "\x00" * (moov.length - 8)) + fileobj.write(free) + fileobj.seek(0, 2) + # Figure out how far we have to shift all our successive + # seek calls, relative to what the atoms say. + old_end = fileobj.tell() + fileobj.write(data) + return old_end - moov.offset + + def __save_new(self, fileobj, atoms, ilst, offset): + hdlr = Atom.render("hdlr", "\x00" * 8 + "mdirappl" + "\x00" * 9) + meta = Atom.render("meta", "\x00\x00\x00\x00" + hdlr + ilst) + moov, udta = atoms.path("moov", "udta") + insert_bytes(fileobj, len(meta), udta.offset + offset + 8) + fileobj.seek(udta.offset + offset + 8) + fileobj.write(meta) + self.__update_parents(fileobj, [moov, udta], len(meta), offset) + + def __save_existing(self, fileobj, atoms, path, data, offset): + # Replace the old ilst atom. + ilst = path.pop() + delta = len(data) - ilst.length + fileobj.seek(ilst.offset + offset) + if delta > 0: + insert_bytes(fileobj, delta, ilst.offset + offset) + elif delta < 0: + delete_bytes(fileobj, -delta, ilst.offset + offset) + fileobj.seek(ilst.offset + offset) + fileobj.write(data) + self.__update_parents(fileobj, path, delta, offset) + + def __update_parents(self, fileobj, path, delta, offset): + # Update all parent atoms with the new size. 
+ for atom in path: + fileobj.seek(atom.offset + offset) + size = cdata.uint_be(fileobj.read(4)) + delta + fileobj.seek(atom.offset + offset) + fileobj.write(cdata.to_uint_be(size)) + + def __render_data(self, key, flags, data): + data = struct.pack(">2I", flags, 0) + data + return Atom.render(key, Atom.render("data", data)) + + def __parse_freeform(self, atom, data): + try: + fileobj = StringIO(data) + mean_length = cdata.uint_be(fileobj.read(4)) + # skip over 8 bytes of atom name, flags + mean = fileobj.read(mean_length - 4)[8:] + name_length = cdata.uint_be(fileobj.read(4)) + name = fileobj.read(name_length - 4)[8:] + value_length = cdata.uint_be(fileobj.read(4)) + # Name, flags, and reserved bytes + value = fileobj.read(value_length - 4)[12:] + except struct.error: + # Some ---- atoms have no data atom, I have no clue why + # they actually end up in the file. + pass + else: + self["%s:%s:%s" % (atom.name, mean, name)] = value + def __render_freeform(self, key, value): + dummy, mean, name = key.split(":", 2) + mean = struct.pack(">I4sI", len(mean) + 12, "mean", 0) + mean + name = struct.pack(">I4sI", len(name) + 12, "name", 0) + name + value = struct.pack(">I4s2I", len(value) + 16, "data", 0x1, 0) + value + final = mean + name + value + return Atom.render("----", mean + name + value) + + def __parse_pair(self, atom, data): + self[atom.name] = struct.unpack(">2H", data[18:22]) + def __render_pair(self, key, value): + track, total = value + if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: + data = struct.pack(">4H", 0, track, total, 0) + return self.__render_data(key, 0, data) + else: + raise M4AMetadataValueError("invalid numeric pair %r" % (value,)) + + def __render_pair_no_trailing(self, key, value): + track, total = value + if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: + data = struct.pack(">3H", 0, track, total) + return self.__render_data(key, 0, data) + else: + raise M4AMetadataValueError("invalid numeric pair %r" % (value,)) + + def 
__parse_genre(self, atom, data): + # Translate to a freeform genre. + genre = cdata.short_be(data[16:18]) + if "\xa9gen" not in self: + try: self["\xa9gen"] = GENRES[genre - 1] + except IndexError: pass + + def __parse_tempo(self, atom, data): + self[atom.name] = cdata.short_be(data[16:18]) + def __render_tempo(self, key, value): + if 0 <= value < 1 << 16: + return self.__render_data(key, 0x15, cdata.to_ushort_be(value)) + else: + raise M4AMetadataValueError("invalid short integer %r" % value) + + def __parse_compilation(self, atom, data): + try: self[atom.name] = bool(ord(data[16:17])) + except TypeError: self[atom.name] = False + + def __render_compilation(self, key, value): + return self.__render_data(key, 0x15, chr(bool(value))) + + def __parse_cover(self, atom, data): + length, name, format = struct.unpack(">I4sI", data[:12]) + if name != "data": + raise M4AMetadataError( + "unexpected atom %r inside 'covr'" % name) + if format not in (M4ACover.FORMAT_JPEG, M4ACover.FORMAT_PNG): + format = M4ACover.FORMAT_JPEG + self[atom.name]= M4ACover(data[16:length], format) + def __render_cover(self, key, value): + try: format = value.format + except AttributeError: format = M4ACover.FORMAT_JPEG + data = Atom.render("data", struct.pack(">2I", format, 0) + value) + return Atom.render(key, data) + + def __parse_text(self, atom, data): + flags = cdata.uint_be(data[8:12]) + if flags == 1: + self[atom.name] = data[16:].decode('utf-8', 'replace') + def __render_text(self, key, value): + return self.__render_data(key, 0x1, value.encode('utf-8')) + + def delete(self, filename): + self.clear() + self.save(filename) + + __atoms = { + "----": (__parse_freeform, __render_freeform), + "trkn": (__parse_pair, __render_pair), + "disk": (__parse_pair, __render_pair_no_trailing), + "gnre": (__parse_genre, None), + "tmpo": (__parse_tempo, __render_tempo), + "cpil": (__parse_compilation, __render_compilation), + "covr": (__parse_cover, __render_cover), + } + + def pprint(self): + values = [] 
+ for key, value in self.iteritems(): + key = key.decode('latin1') + try: values.append("%s=%s" % (key, value)) + except UnicodeDecodeError: + values.append("%s=[%d bytes of data]" % (key, len(value))) + return "\n".join(values) + +class M4AInfo(object): + """MPEG-4 stream information. + + Attributes: + bitrate -- bitrate in bits per second, as an int + length -- file length in seconds, as a float + """ + + bitrate = 0 + + def __init__(self, atoms, fileobj): + hdlr = atoms["moov.trak.mdia.hdlr"] + fileobj.seek(hdlr.offset) + if "soun" not in fileobj.read(hdlr.length): + raise M4AStreamInfoError("track has no audio data") + + mdhd = atoms["moov.trak.mdia.mdhd"] + fileobj.seek(mdhd.offset) + data = fileobj.read(mdhd.length) + if ord(data[8]) == 0: + offset = 20 + format = ">2I" + else: + offset = 28 + format = ">IQ" + end = offset + struct.calcsize(format) + unit, length = struct.unpack(format, data[offset:end]) + self.length = float(length) / unit + + try: + atom = atoms["moov.trak.mdia.minf.stbl.stsd"] + fileobj.seek(atom.offset) + data = fileobj.read(atom.length) + self.bitrate = cdata.uint_be(data[-17:-13]) + except (ValueError, KeyError): + # Bitrate values are optional. + pass + + def pprint(self): + return "MPEG-4 audio, %.2f seconds, %d bps" % ( + self.length, self.bitrate) + +class M4A(FileType): + """An MPEG-4 audio file, probably containing AAC. + + If more than one track is present in the file, the first is used. + Only audio ('soun') tracks will be read. 
+ """ + + _mimes = ["audio/mp4", "audio/x-m4a", "audio/mpeg4", "audio/aac"] + + def load(self, filename): + self.filename = filename + fileobj = file(filename, "rb") + try: + atoms = Atoms(fileobj) + try: self.info = M4AInfo(atoms, fileobj) + except StandardError, err: + raise M4AStreamInfoError, err, sys.exc_info()[2] + try: self.tags = M4ATags(atoms, fileobj) + except M4AMetadataError: + self.tags = None + except StandardError, err: + raise M4AMetadataError, err, sys.exc_info()[2] + finally: + fileobj.close() + + def add_tags(self): + self.tags = M4ATags() + + def score(filename, fileobj, header): + return ("ftyp" in header) + ("mp4" in header) + score = staticmethod(score) + +Open = M4A + +def delete(filename): + """Remove tags from a file.""" + M4A(filename).delete() diff --git a/lib/mutagen/monkeysaudio.py b/lib/mutagen/monkeysaudio.py new file mode 100644 index 000000000..08d26387b --- /dev/null +++ b/lib/mutagen/monkeysaudio.py @@ -0,0 +1,80 @@ +# A Monkey's Audio (APE) reader/tagger +# +# Copyright 2006 Lukas Lalinsky +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: monkeysaudio.py 4275 2008-06-01 06:32:37Z piman $ + +"""Monkey's Audio streams with APEv2 tags. + +Monkey's Audio is a very efficient lossless audio compressor developed +by Matt Ashland. + +For more information, see http://www.monkeysaudio.com/. +""" + +__all__ = ["MonkeysAudio", "Open", "delete"] + +import struct + +from mutagen.apev2 import APEv2File, error, delete +from mutagen._util import cdata + +class MonkeysAudioHeaderError(error): pass + +class MonkeysAudioInfo(object): + """Monkey's Audio stream information. 
+ + Attributes: + channels -- number of audio channels + length -- file length in seconds, as a float + sample_rate -- audio sampling rate in Hz + bits_per_sample -- bits per sample + version -- Monkey's Audio stream version, as a float (eg: 3.99) + """ + + def __init__(self, fileobj): + header = fileobj.read(76) + if len(header) != 76 or not header.startswith("MAC "): + raise MonkeysAudioHeaderError("not a Monkey's Audio file") + self.version = cdata.ushort_le(header[4:6]) + if self.version >= 3980: + (blocks_per_frame, final_frame_blocks, total_frames, + self.bits_per_sample, self.channels, + self.sample_rate) = struct.unpack("= 3950: + blocks_per_frame = 73728 * 4 + elif self.version >= 3900 or (self.version >= 3800 and + compression_level == 4): + blocks_per_frame = 73728 + else: + blocks_per_frame = 9216 + self.version /= 1000.0 + self.length = 0.0 + if self.sample_rate != 0 and total_frames > 0: + total_blocks = ((total_frames - 1) * blocks_per_frame + + final_frame_blocks) + self.length = float(total_blocks) / self.sample_rate + + def pprint(self): + return "Monkey's Audio %.2f, %.2f seconds, %d Hz" % ( + self.version, self.length, self.sample_rate) + +class MonkeysAudio(APEv2File): + _Info = MonkeysAudioInfo + _mimes = ["audio/ape", "audio/x-ape"] + + def score(filename, fileobj, header): + return header.startswith("MAC ") + filename.lower().endswith(".ape") + score = staticmethod(score) + +Open = MonkeysAudio diff --git a/lib/mutagen/mp3.py b/lib/mutagen/mp3.py new file mode 100644 index 000000000..41c4887ee --- /dev/null +++ b/lib/mutagen/mp3.py @@ -0,0 +1,223 @@ +# MP3 stream header information support for Mutagen. +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. 
+ +"""MPEG audio stream information and tags.""" + +import os +import struct + +from mutagen.id3 import ID3FileType, BitPaddedInt, delete + +class error(RuntimeError): pass +class HeaderNotFoundError(error, IOError): pass +class InvalidMPEGHeader(error, IOError): pass + +# Mode values. +STEREO, JOINTSTEREO, DUALCHANNEL, MONO = range(4) + +class MPEGInfo(object): + """MPEG audio stream information + + Parse information about an MPEG audio file. This also reads the + Xing VBR header format. + + This code was implemented based on the format documentation at + http://www.dv.co.yu/mpgscript/mpeghdr.htm. + + Useful attributes: + length -- audio length, in seconds + bitrate -- audio bitrate, in bits per second + sketchy -- if true, the file may not be valid MPEG audio + + Useless attributes: + version -- MPEG version (1, 2, 2.5) + layer -- 1, 2, or 3 + mode -- One of STEREO, JOINTSTEREO, DUALCHANNEL, or MONO (0-3) + protected -- whether or not the file is "protected" + padding -- whether or not audio frames are padded + sample_rate -- audio sample rate, in Hz + """ + + # Map (version, layer) tuples to bitrates. + __BITRATE = { + (1, 1): range(0, 480, 32), + (1, 2): [0, 32, 48, 56, 64, 80, 96, 112,128,160,192,224,256,320,384], + (1, 3): [0, 32, 40, 48, 56, 64, 80, 96, 112,128,160,192,224,256,320], + (2, 1): [0, 32, 48, 56, 64, 80, 96, 112,128,144,160,176,192,224,256], + (2, 2): [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96,112,128,144,160], + } + + __BITRATE[(2, 3)] = __BITRATE[(2, 2)] + for i in range(1, 4): __BITRATE[(2.5, i)] = __BITRATE[(2, i)] + + # Map version to sample rates. + __RATES = { + 1: [44100, 48000, 32000], + 2: [22050, 24000, 16000], + 2.5: [11025, 12000, 8000] + } + + sketchy = False + + def __init__(self, fileobj, offset=None): + """Parse MPEG stream information from a file-like object. + + If an offset argument is given, it is used to start looking + for stream information and Xing headers; otherwise, ID3v2 tags + will be skipped automatically. 
A correct offset can make + loading files significantly faster. + """ + + try: size = os.path.getsize(fileobj.name) + except (IOError, OSError, AttributeError): + fileobj.seek(0, 2) + size = fileobj.tell() + + # If we don't get an offset, try to skip an ID3v2 tag. + if offset is None: + fileobj.seek(0, 0) + idata = fileobj.read(10) + try: id3, insize = struct.unpack('>3sxxx4s', idata) + except struct.error: id3, insize = '', 0 + insize = BitPaddedInt(insize) + if id3 == 'ID3' and insize > 0: + offset = insize + else: offset = 0 + + # Try to find two valid headers (meaning, very likely MPEG data) + # at the given offset, 30% through the file, 60% through the file, + # and 90% through the file. + for i in [offset, 0.3 * size, 0.6 * size, 0.9 * size]: + try: self.__try(fileobj, int(i), size - offset) + except error, e: pass + else: break + # If we can't find any two consecutive frames, try to find just + # one frame back at the original offset given. + else: + self.__try(fileobj, offset, size - offset, False) + self.sketchy = True + + def __try(self, fileobj, offset, real_size, check_second=True): + # This is going to be one really long function; bear with it, + # because there's not really a sane point to cut it up. + fileobj.seek(offset, 0) + + # We "know" we have an MPEG file if we find two frames that look like + # valid MPEG data. If we can't find them in 32k of reads, something + # is horribly wrong (the longest frame can only be about 4k). This + # is assuming the offset didn't lie. 
+ data = fileobj.read(32768) + + frame_1 = data.find("\xff") + while 0 <= frame_1 <= len(data) - 4: + frame_data = struct.unpack(">I", data[frame_1:frame_1 + 4])[0] + if (frame_data >> 16) & 0xE0 != 0xE0: + frame_1 = data.find("\xff", frame_1 + 2) + else: + version = (frame_data >> 19) & 0x3 + layer = (frame_data >> 17) & 0x3 + protection = (frame_data >> 16) & 0x1 + bitrate = (frame_data >> 12) & 0xF + sample_rate = (frame_data >> 10) & 0x3 + padding = (frame_data >> 9) & 0x1 + private = (frame_data >> 8) & 0x1 + self.mode = (frame_data >> 6) & 0x3 + mode_extension = (frame_data >> 4) & 0x3 + copyright = (frame_data >> 3) & 0x1 + original = (frame_data >> 2) & 0x1 + emphasis = (frame_data >> 0) & 0x3 + if (version == 1 or layer == 0 or sample_rate == 0x3 or + bitrate == 0 or bitrate == 0xF): + frame_1 = data.find("\xff", frame_1 + 2) + else: break + else: + raise HeaderNotFoundError("can't sync to an MPEG frame") + + # There is a serious problem here, which is that many flags + # in an MPEG header are backwards. 
+ self.version = [2.5, None, 2, 1][version] + self.layer = 4 - layer + self.protected = not protection + self.padding = bool(padding) + + self.bitrate = self.__BITRATE[(self.version, self.layer)][bitrate] + self.bitrate *= 1000 + self.sample_rate = self.__RATES[self.version][sample_rate] + + if self.layer == 1: + frame_length = (12 * self.bitrate / self.sample_rate + padding) * 4 + frame_size = 384 + else: + frame_length = 144 * self.bitrate / self.sample_rate + padding + frame_size = 1152 + + if check_second: + possible = frame_1 + frame_length + if possible > len(data) + 4: + raise HeaderNotFoundError("can't sync to second MPEG frame") + frame_data = struct.unpack(">H", data[possible:possible + 2])[0] + if frame_data & 0xFFE0 != 0xFFE0: + raise HeaderNotFoundError("can't sync to second MPEG frame") + + frame_count = real_size / float(frame_length) + samples = frame_size * frame_count + self.length = samples / self.sample_rate + + # Try to find/parse the Xing header, which trumps the above length + # and bitrate calculation. + fileobj.seek(offset, 0) + data = fileobj.read(32768) + try: + xing = data[:-4].index("Xing") + except ValueError: + # Try to find/parse the VBRI header, which trumps the above length + # calculation. + try: + vbri = data[:-24].index("VBRI") + except ValueError: pass + else: + # If a VBRI header was found, this is definitely MPEG audio. + self.sketchy = False + vbri_version = struct.unpack('>H', data[vbri + 4:vbri + 6])[0] + if vbri_version == 1: + frame_count = struct.unpack('>I', data[vbri + 14:vbri + 18])[0] + samples = frame_size * frame_count + self.length = (samples / self.sample_rate) or self.length + else: + # If a Xing header was found, this is definitely MPEG audio. 
+ self.sketchy = False + flags = struct.unpack('>I', data[xing + 4:xing + 8])[0] + if flags & 0x1: + frame_count = struct.unpack('>I', data[xing + 8:xing + 12])[0] + samples = frame_size * frame_count + self.length = (samples / self.sample_rate) or self.length + if flags & 0x2: + bytes = struct.unpack('>I', data[xing + 12:xing + 16])[0] + self.bitrate = int((bytes * 8) // self.length) + + def pprint(self): + s = "MPEG %s layer %d, %d bps, %s Hz, %.2f seconds" % ( + self.version, self.layer, self.bitrate, self.sample_rate, + self.length) + if self.sketchy: s += " (sketchy)" + return s + +class MP3(ID3FileType): + """An MPEG audio (usually MPEG-1 Layer 3) file.""" + + _Info = MPEGInfo + _mimes = ["audio/mp3", "audio/x-mp3", "audio/mpeg", "audio/mpg", + "audio/x-mpeg"] + + def score(filename, fileobj, header): + filename = filename.lower() + return (header.startswith("ID3") * 2 + filename.endswith(".mp3") + + filename.endswith(".mp2") + filename.endswith(".mpg") + + filename.endswith(".mpeg")) + score = staticmethod(score) + +Open = MP3 diff --git a/lib/mutagen/mp4.py b/lib/mutagen/mp4.py new file mode 100644 index 000000000..122b75308 --- /dev/null +++ b/lib/mutagen/mp4.py @@ -0,0 +1,670 @@ +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: mp4.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write MPEG-4 audio files with iTunes metadata. + +This module will read MPEG-4 audio information and metadata, +as found in Apple's MP4 (aka M4A, M4B, M4P) files. + +There is no official specification for this format. 
The source code +for TagLib, FAAD, and various MPEG specifications at +http://developer.apple.com/documentation/QuickTime/QTFF/, +http://www.geocities.com/xhelmboyx/quicktime/formats/mp4-layout.txt, +http://standards.iso.org/ittf/PubliclyAvailableStandards/c041828_ISO_IEC_14496-12_2005(E).zip, +and http://wiki.multimedia.cx/index.php?title=Apple_QuickTime were all +consulted. +""" + +import struct +import sys + +from mutagen import FileType, Metadata +from mutagen._constants import GENRES +from mutagen._util import cdata, insert_bytes, delete_bytes, DictProxy + +class error(IOError): pass +class MP4MetadataError(error): pass +class MP4StreamInfoError(error): pass +class MP4MetadataValueError(ValueError, MP4MetadataError): pass + +# This is not an exhaustive list of container atoms, but just the +# ones this module needs to peek inside. +_CONTAINERS = ["moov", "udta", "trak", "mdia", "meta", "ilst", + "stbl", "minf", "moof", "traf"] +_SKIP_SIZE = { "meta": 4 } + +__all__ = ['MP4', 'Open', 'delete', 'MP4Cover'] + +class MP4Cover(str): + """A cover artwork. + + Attributes: + format -- format of the image (either FORMAT_JPEG or FORMAT_PNG) + """ + FORMAT_JPEG = 0x0D + FORMAT_PNG = 0x0E + + def __new__(cls, data, format=None): + self = str.__new__(cls, data) + if format is None: format= MP4Cover.FORMAT_JPEG + self.format = format + return self + +class Atom(object): + """An individual atom. + + Attributes: + children -- list child atoms (or None for non-container atoms) + length -- length of this atom, including length and name + name -- four byte name of the atom, as a str + offset -- location in the constructor-given fileobj of this atom + + This structure should only be used internally by Mutagen. 
+ """ + + children = None + + def __init__(self, fileobj): + self.offset = fileobj.tell() + self.length, self.name = struct.unpack(">I4s", fileobj.read(8)) + if self.length == 1: + self.length, = struct.unpack(">Q", fileobj.read(8)) + elif self.length < 8: + return + + if self.name in _CONTAINERS: + self.children = [] + fileobj.seek(_SKIP_SIZE.get(self.name, 0), 1) + while fileobj.tell() < self.offset + self.length: + self.children.append(Atom(fileobj)) + else: + fileobj.seek(self.offset + self.length, 0) + + def render(name, data): + """Render raw atom data.""" + # this raises OverflowError if Py_ssize_t can't handle the atom data + size = len(data) + 8 + if size <= 0xFFFFFFFF: + return struct.pack(">I4s", size, name) + data + else: + return struct.pack(">I4sQ", 1, name, size + 8) + data + render = staticmethod(render) + + def findall(self, name, recursive=False): + """Recursively find all child atoms by specified name.""" + if self.children is not None: + for child in self.children: + if child.name == name: + yield child + if recursive: + for atom in child.findall(name, True): + yield atom + + def __getitem__(self, remaining): + """Look up a child atom, potentially recursively. + + e.g. atom['udta', 'meta'] => + """ + if not remaining: + return self + elif self.children is None: + raise KeyError("%r is not a container" % self.name) + for child in self.children: + if child.name == remaining[0]: + return child[remaining[1:]] + else: + raise KeyError, "%r not found" % remaining[0] + + def __repr__(self): + klass = self.__class__.__name__ + if self.children is None: + return "<%s name=%r length=%r offset=%r>" % ( + klass, self.name, self.length, self.offset) + else: + children = "\n".join([" " + line for child in self.children + for line in repr(child).splitlines()]) + return "<%s name=%r length=%r offset=%r\n%s>" % ( + klass, self.name, self.length, self.offset, children) + +class Atoms(object): + """Root atoms in a given file. 
+ + Attributes: + atoms -- a list of top-level atoms as Atom objects + + This structure should only be used internally by Mutagen. + """ + def __init__(self, fileobj): + self.atoms = [] + fileobj.seek(0, 2) + end = fileobj.tell() + fileobj.seek(0) + while fileobj.tell() + 8 <= end: + self.atoms.append(Atom(fileobj)) + + def path(self, *names): + """Look up and return the complete path of an atom. + + For example, atoms.path('moov', 'udta', 'meta') will return a + list of three atoms, corresponding to the moov, udta, and meta + atoms. + """ + path = [self] + for name in names: + path.append(path[-1][name,]) + return path[1:] + + def __getitem__(self, names): + """Look up a child atom. + + 'names' may be a list of atoms (['moov', 'udta']) or a string + specifying the complete path ('moov.udta'). + """ + if isinstance(names, basestring): + names = names.split(".") + for child in self.atoms: + if child.name == names[0]: + return child[names[1:]] + else: + raise KeyError, "%s not found" % names[0] + + def __repr__(self): + return "\n".join([repr(child) for child in self.atoms]) + +class MP4Tags(DictProxy, Metadata): + """Dictionary containing Apple iTunes metadata list key/values. + + Keys are four byte identifiers, except for freeform ('----') + keys. 
Values are usually unicode strings, but some atoms have a + special structure: + + Text values (multiple values per key are supported): + '\xa9nam' -- track title + '\xa9alb' -- album + '\xa9ART' -- artist + 'aART' -- album artist + '\xa9wrt' -- composer + '\xa9day' -- year + '\xa9cmt' -- comment + 'desc' -- description (usually used in podcasts) + 'purd' -- purchase date + '\xa9grp' -- grouping + '\xa9gen' -- genre + '\xa9lyr' -- lyrics + 'purl' -- podcast URL + 'egid' -- podcast episode GUID + 'catg' -- podcast category + 'keyw' -- podcast keywords + '\xa9too' -- encoded by + 'cprt' -- copyright + 'soal' -- album sort order + 'soaa' -- album artist sort order + 'soar' -- artist sort order + 'sonm' -- title sort order + 'soco' -- composer sort order + 'sosn' -- show sort order + 'tvsh' -- show name + + Boolean values: + 'cpil' -- part of a compilation + 'pgap' -- part of a gapless album + 'pcst' -- podcast (iTunes reads this only on import) + + Tuples of ints (multiple values per key are supported): + 'trkn' -- track number, total tracks + 'disk' -- disc number, total discs + + Others: + 'tmpo' -- tempo/BPM, 16 bit int + 'covr' -- cover artwork, list of MP4Cover objects (which are + tagged strs) + 'gnre' -- ID3v1 genre. Not supported, use '\xa9gen' instead. + + The freeform '----' frames use a key in the format '----:mean:name' + where 'mean' is usually 'com.apple.iTunes' and 'name' is a unique + identifier for this frame. The value is a str, but is probably + text that can be decoded as UTF-8. Multiple values per key are + supported. + + MP4 tag data cannot exist outside of the structure of an MP4 file, + so this class should not be manually instantiated. + + Unknown non-text tags are removed. 
+ """ + + def load(self, atoms, fileobj): + try: ilst = atoms["moov.udta.meta.ilst"] + except KeyError, key: + raise MP4MetadataError(key) + for atom in ilst.children: + fileobj.seek(atom.offset + 8) + data = fileobj.read(atom.length - 8) + info = self.__atoms.get(atom.name, (MP4Tags.__parse_text, None)) + info[0](self, atom, data, *info[2:]) + + def __key_sort((key1, v1), (key2, v2)): + # iTunes always writes the tags in order of "relevance", try + # to copy it as closely as possible. + order = ["\xa9nam", "\xa9ART", "\xa9wrt", "\xa9alb", + "\xa9gen", "gnre", "trkn", "disk", + "\xa9day", "cpil", "pgap", "pcst", "tmpo", + "\xa9too", "----", "covr", "\xa9lyr"] + order = dict(zip(order, range(len(order)))) + last = len(order) + # If there's no key-based way to distinguish, order by length. + # If there's still no way, go by string comparison on the + # values, so we at least have something determinstic. + return (cmp(order.get(key1[:4], last), order.get(key2[:4], last)) or + cmp(len(v1), len(v2)) or cmp(v1, v2)) + __key_sort = staticmethod(__key_sort) + + def save(self, filename): + """Save the metadata to the given filename.""" + values = [] + items = self.items() + items.sort(self.__key_sort) + for key, value in items: + info = self.__atoms.get(key[:4], (None, MP4Tags.__render_text)) + try: + values.append(info[1](self, key, value, *info[2:])) + except (TypeError, ValueError), s: + raise MP4MetadataValueError, s, sys.exc_info()[2] + data = Atom.render("ilst", "".join(values)) + + # Find the old atoms. 
+ fileobj = file(filename, "rb+") + try: + atoms = Atoms(fileobj) + try: + path = atoms.path("moov", "udta", "meta", "ilst") + except KeyError: + self.__save_new(fileobj, atoms, data) + else: + self.__save_existing(fileobj, atoms, path, data) + finally: + fileobj.close() + + def __pad_ilst(self, data, length=None): + if length is None: + length = ((len(data) + 1023) & ~1023) - len(data) + return Atom.render("free", "\x00" * length) + + def __save_new(self, fileobj, atoms, ilst): + hdlr = Atom.render("hdlr", "\x00" * 8 + "mdirappl" + "\x00" * 9) + meta = Atom.render( + "meta", "\x00\x00\x00\x00" + hdlr + ilst + self.__pad_ilst(ilst)) + try: + path = atoms.path("moov", "udta") + except KeyError: + # moov.udta not found -- create one + path = atoms.path("moov") + meta = Atom.render("udta", meta) + offset = path[-1].offset + 8 + insert_bytes(fileobj, len(meta), offset) + fileobj.seek(offset) + fileobj.write(meta) + self.__update_parents(fileobj, path, len(meta)) + self.__update_offsets(fileobj, atoms, len(meta), offset) + + def __save_existing(self, fileobj, atoms, path, data): + # Replace the old ilst atom. 
+ ilst = path.pop() + offset = ilst.offset + length = ilst.length + + # Check for padding "free" atoms + meta = path[-1] + index = meta.children.index(ilst) + try: + prev = meta.children[index-1] + if prev.name == "free": + offset = prev.offset + length += prev.length + except IndexError: + pass + try: + next = meta.children[index+1] + if next.name == "free": + length += next.length + except IndexError: + pass + + delta = len(data) - length + if delta > 0 or (delta < 0 and delta > -8): + data += self.__pad_ilst(data) + delta = len(data) - length + insert_bytes(fileobj, delta, offset) + elif delta < 0: + data += self.__pad_ilst(data, -delta - 8) + delta = 0 + + fileobj.seek(offset) + fileobj.write(data) + self.__update_parents(fileobj, path, delta) + self.__update_offsets(fileobj, atoms, delta, offset) + + def __update_parents(self, fileobj, path, delta): + """Update all parent atoms with the new size.""" + for atom in path: + fileobj.seek(atom.offset) + size = cdata.uint_be(fileobj.read(4)) + delta + fileobj.seek(atom.offset) + fileobj.write(cdata.to_uint_be(size)) + + def __update_offset_table(self, fileobj, fmt, atom, delta, offset): + """Update offset table in the specified atom.""" + if atom.offset > offset: + atom.offset += delta + fileobj.seek(atom.offset + 12) + data = fileobj.read(atom.length - 12) + fmt = fmt % cdata.uint_be(data[:4]) + offsets = struct.unpack(fmt, data[4:]) + offsets = [o + (0, delta)[offset < o] for o in offsets] + fileobj.seek(atom.offset + 16) + fileobj.write(struct.pack(fmt, *offsets)) + + def __update_tfhd(self, fileobj, atom, delta, offset): + if atom.offset > offset: + atom.offset += delta + fileobj.seek(atom.offset + 9) + data = fileobj.read(atom.length - 9) + flags = cdata.uint_be("\x00" + data[:3]) + if flags & 1: + o = cdata.ulonglong_be(data[7:15]) + if o > offset: + o += delta + fileobj.seek(atom.offset + 16) + fileobj.write(cdata.to_ulonglong_be(o)) + + def __update_offsets(self, fileobj, atoms, delta, offset): + """Update 
offset tables in all 'stco' and 'co64' atoms.""" + if delta == 0: + return + moov = atoms["moov"] + for atom in moov.findall('stco', True): + self.__update_offset_table(fileobj, ">%dI", atom, delta, offset) + for atom in moov.findall('co64', True): + self.__update_offset_table(fileobj, ">%dQ", atom, delta, offset) + try: + for atom in atoms["moof"].findall('tfhd', True): + self.__update_tfhd(fileobj, atom, delta, offset) + except KeyError: + pass + + def __parse_data(self, atom, data): + pos = 0 + while pos < atom.length - 8: + length, name, flags = struct.unpack(">I4sI", data[pos:pos+12]) + if name != "data": + raise MP4MetadataError( + "unexpected atom %r inside %r" % (name, atom.name)) + yield flags, data[pos+16:pos+length] + pos += length + def __render_data(self, key, flags, value): + return Atom.render(key, "".join([ + Atom.render("data", struct.pack(">2I", flags, 0) + data) + for data in value])) + + def __parse_freeform(self, atom, data): + length = cdata.uint_be(data[:4]) + mean = data[12:length] + pos = length + length = cdata.uint_be(data[pos:pos+4]) + name = data[pos+12:pos+length] + pos += length + value = [] + while pos < atom.length - 8: + length, atom_name = struct.unpack(">I4s", data[pos:pos+8]) + if atom_name != "data": + raise MP4MetadataError( + "unexpected atom %r inside %r" % (atom_name, atom.name)) + value.append(data[pos+16:pos+length]) + pos += length + if value: + self["%s:%s:%s" % (atom.name, mean, name)] = value + def __render_freeform(self, key, value): + dummy, mean, name = key.split(":", 2) + mean = struct.pack(">I4sI", len(mean) + 12, "mean", 0) + mean + name = struct.pack(">I4sI", len(name) + 12, "name", 0) + name + if isinstance(value, basestring): + value = [value] + return Atom.render("----", mean + name + "".join([ + struct.pack(">I4s2I", len(data) + 16, "data", 1, 0) + data + for data in value])) + + def __parse_pair(self, atom, data): + self[atom.name] = [struct.unpack(">2H", data[2:6]) for + flags, data in 
self.__parse_data(atom, data)] + def __render_pair(self, key, value): + data = [] + for (track, total) in value: + if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: + data.append(struct.pack(">4H", 0, track, total, 0)) + else: + raise MP4MetadataValueError( + "invalid numeric pair %r" % ((track, total),)) + return self.__render_data(key, 0, data) + + def __render_pair_no_trailing(self, key, value): + data = [] + for (track, total) in value: + if 0 <= track < 1 << 16 and 0 <= total < 1 << 16: + data.append(struct.pack(">3H", 0, track, total)) + else: + raise MP4MetadataValueError( + "invalid numeric pair %r" % ((track, total),)) + return self.__render_data(key, 0, data) + + def __parse_genre(self, atom, data): + # Translate to a freeform genre. + genre = cdata.short_be(data[16:18]) + if "\xa9gen" not in self: + try: self["\xa9gen"] = [GENRES[genre - 1]] + except IndexError: pass + + def __parse_tempo(self, atom, data): + self[atom.name] = [cdata.ushort_be(value[1]) for + value in self.__parse_data(atom, data)] + + def __render_tempo(self, key, value): + try: + if len(value) == 0: + return self.__render_data(key, 0x15, "") + + if min(value) < 0 or max(value) >= 2**16: + raise MP4MetadataValueError( + "invalid 16 bit integers: %r" % value) + except TypeError: + raise MP4MetadataValueError( + "tmpo must be a list of 16 bit integers") + + values = map(cdata.to_ushort_be, value) + return self.__render_data(key, 0x15, values) + + def __parse_bool(self, atom, data): + try: self[atom.name] = bool(ord(data[16:17])) + except TypeError: self[atom.name] = False + def __render_bool(self, key, value): + return self.__render_data(key, 0x15, [chr(bool(value))]) + + def __parse_cover(self, atom, data): + self[atom.name] = [] + pos = 0 + while pos < atom.length - 8: + length, name, format = struct.unpack(">I4sI", data[pos:pos+12]) + if name != "data": + raise MP4MetadataError( + "unexpected atom %r inside 'covr'" % name) + if format not in (MP4Cover.FORMAT_JPEG, MP4Cover.FORMAT_PNG): 
+ format = MP4Cover.FORMAT_JPEG + cover = MP4Cover(data[pos+16:pos+length], format) + self[atom.name].append(MP4Cover(data[pos+16:pos+length], format)) + pos += length + def __render_cover(self, key, value): + atom_data = [] + for cover in value: + try: format = cover.format + except AttributeError: format = MP4Cover.FORMAT_JPEG + atom_data.append( + Atom.render("data", struct.pack(">2I", format, 0) + cover)) + return Atom.render(key, "".join(atom_data)) + + def __parse_text(self, atom, data, expected_flags=1): + value = [text.decode('utf-8', 'replace') for flags, text + in self.__parse_data(atom, data) + if flags == expected_flags] + if value: + self[atom.name] = value + def __render_text(self, key, value, flags=1): + if isinstance(value, basestring): + value = [value] + return self.__render_data( + key, flags, [text.encode('utf-8') for text in value]) + + def delete(self, filename): + self.clear() + self.save(filename) + + __atoms = { + "----": (__parse_freeform, __render_freeform), + "trkn": (__parse_pair, __render_pair), + "disk": (__parse_pair, __render_pair_no_trailing), + "gnre": (__parse_genre, None), + "tmpo": (__parse_tempo, __render_tempo), + "cpil": (__parse_bool, __render_bool), + "pgap": (__parse_bool, __render_bool), + "pcst": (__parse_bool, __render_bool), + "covr": (__parse_cover, __render_cover), + "purl": (__parse_text, __render_text, 0), + "egid": (__parse_text, __render_text, 0), + } + + def pprint(self): + values = [] + for key, value in self.iteritems(): + key = key.decode('latin1') + if key == "covr": + values.append("%s=%s" % (key, ", ".join( + ["[%d bytes of data]" % len(data) for data in value]))) + elif isinstance(value, list): + values.append("%s=%s" % (key, " / ".join(map(unicode, value)))) + else: + values.append("%s=%s" % (key, value)) + return "\n".join(values) + +class MP4Info(object): + """MPEG-4 stream information. 
+ + Attributes: + bitrate -- bitrate in bits per second, as an int + length -- file length in seconds, as a float + channels -- number of audio channels + sample_rate -- audio sampling rate in Hz + bits_per_sample -- bits per sample + """ + + bitrate = 0 + channels = 0 + sample_rate = 0 + bits_per_sample = 0 + + def __init__(self, atoms, fileobj): + for trak in list(atoms["moov"].findall("trak")): + hdlr = trak["mdia", "hdlr"] + fileobj.seek(hdlr.offset) + data = fileobj.read(hdlr.length) + if data[16:20] == "soun": + break + else: + raise MP4StreamInfoError("track has no audio data") + + mdhd = trak["mdia", "mdhd"] + fileobj.seek(mdhd.offset) + data = fileobj.read(mdhd.length) + if ord(data[8]) == 0: + offset = 20 + format = ">2I" + else: + offset = 28 + format = ">IQ" + end = offset + struct.calcsize(format) + unit, length = struct.unpack(format, data[offset:end]) + self.length = float(length) / unit + + try: + atom = trak["mdia", "minf", "stbl", "stsd"] + fileobj.seek(atom.offset) + data = fileobj.read(atom.length) + if data[20:24] == "mp4a": + length = cdata.uint_be(data[16:20]) + (self.channels, self.bits_per_sample, _, + self.sample_rate) = struct.unpack(">3HI", data[40:50]) + # ES descriptor type + if data[56:60] == "esds" and ord(data[64:65]) == 0x03: + pos = 65 + # skip extended descriptor type tag, length, ES ID + # and stream priority + if data[pos:pos+3] == "\x80\x80\x80": + pos += 3 + pos += 4 + # decoder config descriptor type + if ord(data[pos]) == 0x04: + pos += 1 + # skip extended descriptor type tag, length, + # object type ID, stream type, buffer size + # and maximum bitrate + if data[pos:pos+3] == "\x80\x80\x80": + pos += 3 + pos += 10 + # average bitrate + self.bitrate = cdata.uint_be(data[pos:pos+4]) + except (ValueError, KeyError): + # stsd atoms are optional + pass + + def pprint(self): + return "MPEG-4 audio, %.2f seconds, %d bps" % ( + self.length, self.bitrate) + +class MP4(FileType): + """An MPEG-4 audio file, probably containing AAC. 
+ + If more than one track is present in the file, the first is used. + Only audio ('soun') tracks will be read. + """ + + _mimes = ["audio/mp4", "audio/x-m4a", "audio/mpeg4", "audio/aac"] + + def load(self, filename): + self.filename = filename + fileobj = file(filename, "rb") + try: + atoms = Atoms(fileobj) + try: self.info = MP4Info(atoms, fileobj) + except StandardError, err: + raise MP4StreamInfoError, err, sys.exc_info()[2] + try: self.tags = MP4Tags(atoms, fileobj) + except MP4MetadataError: + self.tags = None + except StandardError, err: + raise MP4MetadataError, err, sys.exc_info()[2] + finally: + fileobj.close() + + def add_tags(self): + self.tags = MP4Tags() + + def score(filename, fileobj, header): + return ("ftyp" in header) + ("mp4" in header) + score = staticmethod(score) + +Open = MP4 + +def delete(filename): + """Remove tags from a file.""" + MP4(filename).delete() diff --git a/lib/mutagen/musepack.py b/lib/mutagen/musepack.py new file mode 100644 index 000000000..7b13745fe --- /dev/null +++ b/lib/mutagen/musepack.py @@ -0,0 +1,118 @@ +# A Musepack reader/tagger +# +# Copyright 2006 Lukas Lalinsky +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: musepack.py 4275 2008-06-01 06:32:37Z piman $ + +"""Musepack audio streams with APEv2 tags. + +Musepack is an audio format originally based on the MPEG-1 Layer-2 +algorithms. Stream versions 4 through 7 are supported. + +For more information, see http://www.musepack.net/. +""" + +__all__ = ["Musepack", "Open", "delete"] + +import struct + +from mutagen.apev2 import APEv2File, error, delete +from mutagen.id3 import BitPaddedInt +from mutagen._util import cdata + +class MusepackHeaderError(error): pass + +RATES = [44100, 48000, 37800, 32000] + +class MusepackInfo(object): + """Musepack stream information. 
+ + Attributes: + channels -- number of audio channels + length -- file length in seconds, as a float + sample_rate -- audio sampling rate in Hz + bitrate -- audio bitrate, in bits per second + version -- Musepack stream version + + Optional Attributes: + title_gain, title_peak -- Replay Gain and peak data for this song + album_gain, album_peak -- Replay Gain and peak data for this album + + These attributes are only available in stream version 7. The + gains are a float, +/- some dB. The peaks are a percentage [0..1] of + the maximum amplitude. This means to get a number comparable to + VorbisGain, you must multiply the peak by 2. + """ + + def __init__(self, fileobj): + header = fileobj.read(32) + if len(header) != 32: + raise MusepackHeaderError("not a Musepack file") + # Skip ID3v2 tags + if header[:3] == "ID3": + size = 10 + BitPaddedInt(header[6:10]) + fileobj.seek(size) + header = fileobj.read(32) + if len(header) != 32: + raise MusepackHeaderError("not a Musepack file") + # SV7 + if header.startswith("MP+"): + self.version = ord(header[3]) & 0xF + if self.version < 7: + raise MusepackHeaderError("not a Musepack file") + frames = cdata.uint_le(header[4:8]) + flags = cdata.uint_le(header[8:12]) + + self.title_peak, self.title_gain = struct.unpack( + "> 16) & 0x0003] + self.bitrate = 0 + # SV4-SV6 + else: + header_dword = cdata.uint_le(header[0:4]) + self.version = (header_dword >> 11) & 0x03FF; + if self.version < 4 or self.version > 6: + raise MusepackHeaderError("not a Musepack file") + self.bitrate = (header_dword >> 23) & 0x01FF; + self.sample_rate = 44100 + if self.version >= 5: + frames = cdata.uint_le(header[4:8]) + else: + frames = cdata.ushort_le(header[6:8]) + if self.version < 6: + frames -= 1 + self.channels = 2 + self.length = float(frames * 1152 - 576) / self.sample_rate + if not self.bitrate and self.length != 0: + fileobj.seek(0, 2) + self.bitrate = int(fileobj.tell() * 8 / (self.length * 1000) + 0.5) + + def pprint(self): + if self.version >= 
7: + rg_data = ", Gain: %+0.2f (title), %+0.2f (album)" %( + self.title_gain, self.album_gain) + else: + rg_data = "" + return "Musepack, %.2f seconds, %d Hz%s" % ( + self.length, self.sample_rate, rg_data) + +class Musepack(APEv2File): + _Info = MusepackInfo + _mimes = ["audio/x-musepack", "audio/x-mpc"] + + def score(filename, fileobj, header): + return header.startswith("MP+") + filename.endswith(".mpc") + score = staticmethod(score) + +Open = Musepack diff --git a/lib/mutagen/ogg.py b/lib/mutagen/ogg.py new file mode 100644 index 000000000..c81add640 --- /dev/null +++ b/lib/mutagen/ogg.py @@ -0,0 +1,498 @@ +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: ogg.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write Ogg bitstreams and pages. + +This module reads and writes a subset of the Ogg bitstream format +version 0. It does *not* read or write Ogg Vorbis files! For that, +you should use mutagen.oggvorbis. + +This implementation is based on the RFC 3533 standard found at +http://www.xiph.org/ogg/doc/rfc3533.txt. +""" + +import struct +import sys +import zlib + +from cStringIO import StringIO + +from mutagen import FileType +from mutagen._util import cdata, insert_bytes, delete_bytes + +class error(IOError): + """Ogg stream parsing errors.""" + pass + +class OggPage(object): + """A single Ogg page (not necessarily a single encoded packet). + + A page is a header of 26 bytes, followed by the length of the + data, followed by the data. + + The constructor is givin a file-like object pointing to the start + of an Ogg page. After the constructor is finished it is pointing + to the start of the next page. 
+ + Attributes: + version -- stream structure version (currently always 0) + position -- absolute stream position (default -1) + serial -- logical stream serial number (default 0) + sequence -- page sequence number within logical stream (default 0) + offset -- offset this page was read from (default None) + complete -- if the last packet on this page is complete (default True) + packets -- list of raw packet data (default []) + + Note that if 'complete' is false, the next page's 'continued' + property must be true (so set both when constructing pages). + + If a file-like object is supplied to the constructor, the above + attributes will be filled in based on it. + """ + + version = 0 + __type_flags = 0 + position = 0L + serial = 0 + sequence = 0 + offset = None + complete = True + + def __init__(self, fileobj=None): + self.packets = [] + + if fileobj is None: + return + + self.offset = fileobj.tell() + + header = fileobj.read(27) + if len(header) == 0: + raise EOFError + + try: + (oggs, self.version, self.__type_flags, self.position, + self.serial, self.sequence, crc, segments) = struct.unpack( + "<4sBBqIIiB", header) + except struct.error: + raise error("unable to read full header; got %r" % header) + + if oggs != "OggS": + raise error("read %r, expected %r, at 0x%x" % ( + oggs, "OggS", fileobj.tell() - 27)) + + if self.version != 0: + raise error("version %r unsupported" % self.version) + + total = 0 + lacings = [] + lacing_bytes = fileobj.read(segments) + if len(lacing_bytes) != segments: + raise error("unable to read %r lacing bytes" % segments) + for c in map(ord, lacing_bytes): + total += c + if c < 255: + lacings.append(total) + total = 0 + if total: + lacings.append(total) + self.complete = False + + self.packets = map(fileobj.read, lacings) + if map(len, self.packets) != lacings: + raise error("unable to read full data") + + def __eq__(self, other): + """Two Ogg pages are the same if they write the same data.""" + try: + return (self.write() == 
other.write()) + except AttributeError: + return False + + def __repr__(self): + attrs = ['version', 'position', 'serial', 'sequence', 'offset', + 'complete', 'continued', 'first', 'last'] + values = ["%s=%r" % (attr, getattr(self, attr)) for attr in attrs] + return "<%s %s, %d bytes in %d packets>" % ( + type(self).__name__, " ".join(values), sum(map(len, self.packets)), + len(self.packets)) + + def write(self): + """Return a string encoding of the page header and data. + + A ValueError is raised if the data is too big to fit in a + single page. + """ + + data = [ + struct.pack("<4sBBqIIi", "OggS", self.version, self.__type_flags, + self.position, self.serial, self.sequence, 0) + ] + + lacing_data = [] + for datum in self.packets: + quot, rem = divmod(len(datum), 255) + lacing_data.append("\xff" * quot + chr(rem)) + lacing_data = "".join(lacing_data) + if not self.complete and lacing_data.endswith("\x00"): + lacing_data = lacing_data[:-1] + data.append(chr(len(lacing_data))) + data.append(lacing_data) + data.extend(self.packets) + data = "".join(data) + + # Python's CRC is swapped relative to Ogg's needs. + crc = ~zlib.crc32(data.translate(cdata.bitswap), -1) + # Although we're using to_int_be, this actually makes the CRC + # a proper le integer, since Python's CRC is byteswapped. + crc = cdata.to_int_be(crc).translate(cdata.bitswap) + data = data[:22] + crc + data[26:] + return data + + def __size(self): + size = 27 # Initial header size + for datum in self.packets: + quot, rem = divmod(len(datum), 255) + size += quot + 1 + if not self.complete and rem == 0: + # Packet contains a multiple of 255 bytes and is not + # terminated, so we don't have a \x00 at the end. 
+ size -= 1 + size += sum(map(len, self.packets)) + return size + + size = property(__size, doc="Total frame size.") + + def __set_flag(self, bit, val): + mask = 1 << bit + if val: self.__type_flags |= mask + else: self.__type_flags &= ~mask + + continued = property( + lambda self: cdata.test_bit(self.__type_flags, 0), + lambda self, v: self.__set_flag(0, v), + doc="The first packet is continued from the previous page.") + + first = property( + lambda self: cdata.test_bit(self.__type_flags, 1), + lambda self, v: self.__set_flag(1, v), + doc="This is the first page of a logical bitstream.") + + last = property( + lambda self: cdata.test_bit(self.__type_flags, 2), + lambda self, v: self.__set_flag(2, v), + doc="This is the last page of a logical bitstream.") + + def renumber(klass, fileobj, serial, start): + """Renumber pages belonging to a specified logical stream. + + fileobj must be opened with mode r+b or w+b. + + Starting at page number 'start', renumber all pages belonging + to logical stream 'serial'. Other pages will be ignored. + + fileobj must point to the start of a valid Ogg page; any + occuring after it and part of the specified logical stream + will be numbered. No adjustment will be made to the data in + the pages nor the granule position; only the page number, and + so also the CRC. + + If an error occurs (e.g. non-Ogg data is found), fileobj will + be left pointing to the place in the stream the error occured, + but the invalid data will be left intact (since this function + does not change the total file size). + """ + + number = start + while True: + try: page = OggPage(fileobj) + except EOFError: + break + else: + if page.serial != serial: + # Wrong stream, skip this page. + continue + # Changing the number can't change the page size, + # so seeking back based on the current size is safe. 
+ fileobj.seek(-page.size, 1) + page.sequence = number + fileobj.write(page.write()) + fileobj.seek(page.offset + page.size, 0) + number += 1 + renumber = classmethod(renumber) + + def to_packets(klass, pages, strict=False): + """Construct a list of packet data from a list of Ogg pages. + + If strict is true, the first page must start a new packet, + and the last page must end the last packet. + """ + + serial = pages[0].serial + sequence = pages[0].sequence + packets = [] + + if strict: + if pages[0].continued: + raise ValueError("first packet is continued") + if not pages[-1].complete: + raise ValueError("last packet does not complete") + elif pages and pages[0].continued: + packets.append("") + + for page in pages: + if serial != page.serial: + raise ValueError("invalid serial number in %r" % page) + elif sequence != page.sequence: + raise ValueError("bad sequence number in %r" % page) + else: sequence += 1 + + if page.continued: packets[-1] += page.packets[0] + else: packets.append(page.packets[0]) + packets.extend(page.packets[1:]) + + return packets + to_packets = classmethod(to_packets) + + def from_packets(klass, packets, sequence=0, + default_size=4096, wiggle_room=2048): + """Construct a list of Ogg pages from a list of packet data. + + The algorithm will generate pages of approximately + default_size in size (rounded down to the nearest multiple of + 255). However, it will also allow pages to increase to + approximately default_size + wiggle_room if allowing the + wiggle room would finish a packet (only one packet will be + finished in this way per page; if the next packet would fit + into the wiggle room, it still starts on a new page). + + This method reduces packet fragmentation when packet sizes are + slightly larger than the default page size, while still + ensuring most pages are of the average size. + + Pages are numbered started at 'sequence'; other information is + uninitialized. 
+ """ + + chunk_size = (default_size // 255) * 255 + + pages = [] + + page = OggPage() + page.sequence = sequence + + for packet in packets: + page.packets.append("") + while packet: + data, packet = packet[:chunk_size], packet[chunk_size:] + if page.size < default_size and len(page.packets) < 255: + page.packets[-1] += data + else: + # If we've put any packet data into this page yet, + # we need to mark it incomplete. However, we can + # also have just started this packet on an already + # full page, in which case, just start the new + # page with this packet. + if page.packets[-1]: + page.complete = False + if len(page.packets) == 1: + page.position = -1L + else: + page.packets.pop(-1) + pages.append(page) + page = OggPage() + page.continued = not pages[-1].complete + page.sequence = pages[-1].sequence + 1 + page.packets.append(data) + + if len(packet) < wiggle_room: + page.packets[-1] += packet + packet = "" + + if page.packets: + pages.append(page) + + return pages + from_packets = classmethod(from_packets) + + def replace(klass, fileobj, old_pages, new_pages): + """Replace old_pages with new_pages within fileobj. + + old_pages must have come from reading fileobj originally. + new_pages are assumed to have the 'same' data as old_pages, + and so the serial and sequence numbers will be copied, as will + the flags for the first and last pages. + + fileobj will be resized and pages renumbered as necessary. As + such, it must be opened r+b or w+b. + """ + + # Number the new pages starting from the first old page. 
+ first = old_pages[0].sequence + for page, seq in zip(new_pages, range(first, first + len(new_pages))): + page.sequence = seq + page.serial = old_pages[0].serial + + new_pages[0].first = old_pages[0].first + new_pages[0].last = old_pages[0].last + new_pages[0].continued = old_pages[0].continued + + new_pages[-1].first = old_pages[-1].first + new_pages[-1].last = old_pages[-1].last + new_pages[-1].complete = old_pages[-1].complete + if not new_pages[-1].complete and len(new_pages[-1].packets) == 1: + new_pages[-1].position = -1L + + new_data = "".join(map(klass.write, new_pages)) + + # Make room in the file for the new data. + delta = len(new_data) + fileobj.seek(old_pages[0].offset, 0) + insert_bytes(fileobj, delta, old_pages[0].offset) + fileobj.seek(old_pages[0].offset, 0) + fileobj.write(new_data) + new_data_end = old_pages[0].offset + delta + + # Go through the old pages and delete them. Since we shifted + # the data down the file, we need to adjust their offsets. We + # also need to go backwards, so we don't adjust the deltas of + # the other pages. + old_pages.reverse() + for old_page in old_pages: + adj_offset = old_page.offset + delta + delete_bytes(fileobj, old_page.size, adj_offset) + + # Finally, if there's any discrepency in length, we need to + # renumber the pages for the logical stream. + if len(old_pages) != len(new_pages): + fileobj.seek(new_data_end, 0) + serial = new_pages[-1].serial + sequence = new_pages[-1].sequence + 1 + klass.renumber(fileobj, serial, sequence) + replace = classmethod(replace) + + def find_last(klass, fileobj, serial): + """Find the last page of the stream 'serial'. + + If the file is not multiplexed this function is fast. If it is, + it must read the whole the stream. + + This finds the last page in the actual file object, or the last + page in the stream (with eos set), whichever comes first. + """ + + # For non-muxed streams, look at the last page. 
+ try: fileobj.seek(-256*256, 2) + except IOError: + # The file is less than 64k in length. + fileobj.seek(0) + data = fileobj.read() + try: index = data.rindex("OggS") + except ValueError: + raise error("unable to find final Ogg header") + stringobj = StringIO(data[index:]) + best_page = None + try: + page = OggPage(stringobj) + except error: + pass + else: + if page.serial == serial: + if page.last: return page + else: best_page = page + else: best_page = None + + # The stream is muxed, so use the slow way. + fileobj.seek(0) + try: + page = OggPage(fileobj) + while not page.last: + page = OggPage(fileobj) + while page.serial != serial: + page = OggPage(fileobj) + best_page = page + return page + except error: + return best_page + except EOFError: + return best_page + find_last = classmethod(find_last) + +class OggFileType(FileType): + """An generic Ogg file.""" + + _Info = None + _Tags = None + _Error = None + _mimes = ["application/ogg", "application/x-ogg"] + + def load(self, filename): + """Load file information from a filename.""" + + self.filename = filename + fileobj = file(filename, "rb") + try: + try: + self.info = self._Info(fileobj) + self.tags = self._Tags(fileobj, self.info) + + if self.info.length: + # The streaminfo gave us real length information, + # don't waste time scanning the Ogg. + return + + last_page = OggPage.find_last(fileobj, self.info.serial) + samples = last_page.position + try: + denom = self.info.sample_rate + except AttributeError: + denom = self.info.fps + self.info.length = samples / float(denom) + + except error, e: + raise self._Error, e, sys.exc_info()[2] + except EOFError: + raise self._Error, "no appropriate stream found" + finally: + fileobj.close() + + def delete(self, filename=None): + """Remove tags from a file. + + If no filename is given, the one most recently loaded is used. 
+ """ + if filename is None: + filename = self.filename + + self.tags.clear() + fileobj = file(filename, "rb+") + try: + try: self.tags._inject(fileobj) + except error, e: + raise self._Error, e, sys.exc_info()[2] + except EOFError: + raise self._Error, "no appropriate stream found" + finally: + fileobj.close() + + def save(self, filename=None): + """Save a tag to a file. + + If no filename is given, the one most recently loaded is used. + """ + if filename is None: + filename = self.filename + fileobj = file(filename, "rb+") + try: + try: self.tags._inject(fileobj) + except error, e: + raise self._Error, e, sys.exc_info()[2] + except EOFError: + raise self._Error, "no appropriate stream found" + finally: + fileobj.close() diff --git a/lib/mutagen/oggflac.py b/lib/mutagen/oggflac.py new file mode 100644 index 000000000..05daf9a08 --- /dev/null +++ b/lib/mutagen/oggflac.py @@ -0,0 +1,127 @@ +# Ogg FLAC support. +# +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: oggflac.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write Ogg FLAC comments. + +This module handles FLAC files wrapped in an Ogg bitstream. The first +FLAC stream found is used. For 'naked' FLACs, see mutagen.flac. + +This module is bsaed off the specification at +http://flac.sourceforge.net/ogg_mapping.html. +""" + +__all__ = ["OggFLAC", "Open", "delete"] + +import struct + +from cStringIO import StringIO + +from mutagen.flac import StreamInfo, VCFLACDict +from mutagen.ogg import OggPage, OggFileType, error as OggError + +class error(OggError): pass +class OggFLACHeaderError(error): pass + +class OggFLACStreamInfo(StreamInfo): + """Ogg FLAC general header and stream info. + + This encompasses the Ogg wrapper for the FLAC STREAMINFO metadata + block, as well as the Ogg codec setup that precedes it. 
+ + Attributes (in addition to StreamInfo's): + packets -- number of metadata packets + serial -- Ogg logical stream serial number + """ + + packets = 0 + serial = 0 + + def load(self, data): + page = OggPage(data) + while not page.packets[0].startswith("\x7FFLAC"): + page = OggPage(data) + major, minor, self.packets, flac = struct.unpack( + ">BBH4s", page.packets[0][5:13]) + if flac != "fLaC": + raise OggFLACHeaderError("invalid FLAC marker (%r)" % flac) + elif (major, minor) != (1, 0): + raise OggFLACHeaderError( + "unknown mapping version: %d.%d" % (major, minor)) + self.serial = page.serial + + # Skip over the block header. + stringobj = StringIO(page.packets[0][17:]) + super(OggFLACStreamInfo, self).load(StringIO(page.packets[0][17:])) + + def pprint(self): + return "Ogg " + super(OggFLACStreamInfo, self).pprint() + +class OggFLACVComment(VCFLACDict): + def load(self, data, info, errors='replace'): + # data should be pointing at the start of an Ogg page, after + # the first FLAC page. + pages = [] + complete = False + while not complete: + page = OggPage(data) + if page.serial == info.serial: + pages.append(page) + complete = page.complete or (len(page.packets) > 1) + comment = StringIO(OggPage.to_packets(pages)[0][4:]) + super(OggFLACVComment, self).load(comment, errors=errors) + + def _inject(self, fileobj): + """Write tag data into the FLAC Vorbis comment packet/page.""" + + # Ogg FLAC has no convenient data marker like Vorbis, but the + # second packet - and second page - must be the comment data. 
+ fileobj.seek(0) + page = OggPage(fileobj) + while not page.packets[0].startswith("\x7FFLAC"): + page = OggPage(fileobj) + + first_page = page + while not (page.sequence == 1 and page.serial == first_page.serial): + page = OggPage(fileobj) + + old_pages = [page] + while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): + page = OggPage(fileobj) + if page.serial == first_page.serial: + old_pages.append(page) + + packets = OggPage.to_packets(old_pages, strict=False) + + # Set the new comment block. + data = self.write() + data = packets[0][0] + struct.pack(">I", len(data))[-3:] + data + packets[0] = data + + new_pages = OggPage.from_packets(packets, old_pages[0].sequence) + OggPage.replace(fileobj, old_pages, new_pages) + +class OggFLAC(OggFileType): + """An Ogg FLAC file.""" + + _Info = OggFLACStreamInfo + _Tags = OggFLACVComment + _Error = OggFLACHeaderError + _mimes = ["audio/x-oggflac"] + + def score(filename, fileobj, header): + return (header.startswith("OggS") * ( + ("FLAC" in header) + ("fLaC" in header))) + score = staticmethod(score) + +Open = OggFLAC + +def delete(filename): + """Remove tags from a file.""" + OggFLAC(filename).delete() diff --git a/lib/mutagen/oggspeex.py b/lib/mutagen/oggspeex.py new file mode 100644 index 000000000..86a5adfe2 --- /dev/null +++ b/lib/mutagen/oggspeex.py @@ -0,0 +1,125 @@ +# Ogg Speex support. +# +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: oggspeex.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write Ogg Speex comments. + +This module handles Speex files wrapped in an Ogg bitstream. The +first Speex stream found is used. + +Read more about Ogg Speex at http://www.speex.org/. 
This module is +based on the specification at http://www.speex.org/manual2/node7.html +and clarifications after personal communication with Jean-Marc, +http://lists.xiph.org/pipermail/speex-dev/2006-July/004676.html. +""" + +__all__ = ["OggSpeex", "Open", "delete"] + +from mutagen._vorbis import VCommentDict +from mutagen.ogg import OggPage, OggFileType, error as OggError +from mutagen._util import cdata + +class error(OggError): pass +class OggSpeexHeaderError(error): pass + +class OggSpeexInfo(object): + """Ogg Speex stream information. + + Attributes: + bitrate - nominal bitrate in bits per second + channels - number of channels + length - file length in seconds, as a float + + The reference encoder does not set the bitrate; in this case, + the bitrate will be 0. + """ + + length = 0 + + def __init__(self, fileobj): + page = OggPage(fileobj) + while not page.packets[0].startswith("Speex "): + page = OggPage(fileobj) + if not page.first: + raise OggSpeexHeaderError( + "page has ID header, but doesn't start a stream") + self.sample_rate = cdata.uint_le(page.packets[0][36:40]) + self.channels = cdata.uint_le(page.packets[0][48:52]) + self.bitrate = cdata.int_le(page.packets[0][52:56]) + if self.bitrate == -1: + self.bitrate = 0 + self.serial = page.serial + + def pprint(self): + return "Ogg Speex, %.2f seconds" % self.length + +class OggSpeexVComment(VCommentDict): + """Speex comments embedded in an Ogg bitstream.""" + + def __init__(self, fileobj, info): + pages = [] + complete = False + while not complete: + page = OggPage(fileobj) + if page.serial == info.serial: + pages.append(page) + complete = page.complete or (len(page.packets) > 1) + data = OggPage.to_packets(pages)[0] + "\x01" + super(OggSpeexVComment, self).__init__(data, framing=False) + + def _inject(self, fileobj): + """Write tag data into the Speex comment packet/page.""" + + fileobj.seek(0) + + # Find the first header page, with the stream info. + # Use it to get the serial number. 
+ page = OggPage(fileobj) + while not page.packets[0].startswith("Speex "): + page = OggPage(fileobj) + + # Look for the next page with that serial number, it'll start + # the comment packet. + serial = page.serial + page = OggPage(fileobj) + while page.serial != serial: + page = OggPage(fileobj) + + # Then find all the pages with the comment packet. + old_pages = [page] + while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): + page = OggPage(fileobj) + if page.serial == old_pages[0].serial: + old_pages.append(page) + + packets = OggPage.to_packets(old_pages, strict=False) + + # Set the new comment packet. + packets[0] = self.write(framing=False) + + new_pages = OggPage.from_packets(packets, old_pages[0].sequence) + OggPage.replace(fileobj, old_pages, new_pages) + +class OggSpeex(OggFileType): + """An Ogg Speex file.""" + + _Info = OggSpeexInfo + _Tags = OggSpeexVComment + _Error = OggSpeexHeaderError + _mimes = ["audio/x-speex"] + + def score(filename, fileobj, header): + return (header.startswith("OggS") * ("Speex " in header)) + score = staticmethod(score) + +Open = OggSpeex + +def delete(filename): + """Remove tags from a file.""" + OggSpeex(filename).delete() diff --git a/lib/mutagen/oggtheora.py b/lib/mutagen/oggtheora.py new file mode 100644 index 000000000..e2620414f --- /dev/null +++ b/lib/mutagen/oggtheora.py @@ -0,0 +1,111 @@ +# Ogg Theora support. +# +# Copyright 2006 Joe Wreschnig +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# $Id: oggtheora.py 4275 2008-06-01 06:32:37Z piman $ + +"""Read and write Ogg Theora comments. + +This module handles Theora files wrapped in an Ogg bitstream. The +first Theora stream found is used. + +Based on the specification at http://theora.org/doc/Theora_I_spec.pdf. 
+""" + +__all__ = ["OggTheora", "Open", "delete"] + +import struct + +from mutagen._vorbis import VCommentDict +from mutagen.ogg import OggPage, OggFileType, error as OggError + +class error(OggError): pass +class OggTheoraHeaderError(error): pass + +class OggTheoraInfo(object): + """Ogg Theora stream information. + + Attributes: + length - file length in seconds, as a float + fps - video frames per second, as a float + """ + + length = 0 + + def __init__(self, fileobj): + page = OggPage(fileobj) + while not page.packets[0].startswith("\x80theora"): + page = OggPage(fileobj) + if not page.first: + raise OggTheoraHeaderError( + "page has ID header, but doesn't start a stream") + data = page.packets[0] + vmaj, vmin = struct.unpack("2B", data[7:9]) + if (vmaj, vmin) != (3, 2): + raise OggTheoraHeaderError( + "found Theora version %d.%d != 3.2" % (vmaj, vmin)) + fps_num, fps_den = struct.unpack(">2I", data[22:30]) + self.fps = fps_num / float(fps_den) + self.bitrate, = struct.unpack(">I", data[37:40] + "\x00") + self.serial = page.serial + + def pprint(self): + return "Ogg Theora, %.2f seconds, %d bps" % (self.length, self.bitrate) + +class OggTheoraCommentDict(VCommentDict): + """Theora comments embedded in an Ogg bitstream.""" + + def __init__(self, fileobj, info): + pages = [] + complete = False + while not complete: + page = OggPage(fileobj) + if page.serial == info.serial: + pages.append(page) + complete = page.complete or (len(page.packets) > 1) + data = OggPage.to_packets(pages)[0][7:] + super(OggTheoraCommentDict, self).__init__(data + "\x01") + + def _inject(self, fileobj): + """Write tag data into the Theora comment packet/page.""" + + fileobj.seek(0) + page = OggPage(fileobj) + while not page.packets[0].startswith("\x81theora"): + page = OggPage(fileobj) + + old_pages = [page] + while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1): + page = OggPage(fileobj) + if page.serial == old_pages[0].serial: + old_pages.append(page) + + packets = 
class OggTheora(OggFileType):
    """An Ogg Theora file.

    The class attributes below name the Theora-specific info, tag and
    error classes and the MIME types for this format; presumably they
    are consumed by OggFileType, which is not visible here.
    """

    _Info = OggTheoraInfo
    _Tags = OggTheoraCommentDict
    _Error = OggTheoraHeaderError
    _mimes = ["video/x-theora"]

    def score(filename, fileobj, header):
        """Return 0, 1 or 2 rating how much header looks like Ogg Theora.

        A header that does not start with the Ogg capture pattern scores
        0.  Otherwise one point is given for each of the two Theora
        packet markers found in it.  filename and fileobj are accepted
        but not consulted.
        """
        if not header.startswith("OggS"):
            return 0
        has_id_header = "\x80theora" in header
        has_comment_header = "\x81theora" in header
        return has_id_header + has_comment_header
    score = staticmethod(score)
class OggVCommentDict(VCommentDict):
    """Vorbis comments embedded in an Ogg bitstream."""

    def __init__(self, fileobj, info):
        """Read the comment packet of the stream identified by info.serial.

        Pages are pulled from fileobj, ignoring those that carry a
        different serial number, until a matching page is complete or
        holds more than one packet.
        """
        comment_pages = []
        while True:
            candidate = OggPage(fileobj)
            if candidate.serial != info.serial:
                continue
            comment_pages.append(candidate)
            if candidate.complete or len(candidate.packets) > 1:
                break
        first_packet = OggPage.to_packets(comment_pages)[0]
        # The first seven bytes are the "\x03vorbis" packet marker.
        super(OggVCommentDict, self).__init__(first_packet[7:])

    def _inject(self, fileobj):
        """Write tag data into the Vorbis comment packet/page."""

        # Locate the first page of the existing comment packet.  Its pages
        # are replaced below, and any other packet data sharing them is
        # carried over unchanged.
        fileobj.seek(0)
        current = OggPage(fileobj)
        while not current.packets[0].startswith("\x03vorbis"):
            current = OggPage(fileobj)

        # Collect the rest of the pages that make up the comment packet.
        stale_pages = [current]
        stream_serial = current.serial
        while (not stale_pages[-1].complete
               and len(stale_pages[-1].packets) <= 1):
            current = OggPage(fileobj)
            if current.serial == stream_serial:
                stale_pages.append(current)

        contents = OggPage.to_packets(stale_pages, strict=False)

        # Swap in the freshly rendered comment packet.
        contents[0] = "\x03vorbis" + self.write()

        fresh_pages = OggPage.from_packets(contents, stale_pages[0].sequence)
        OggPage.replace(fileobj, stale_pages, fresh_pages)
class WavPackInfo(object):
    """WavPack stream information.

    Attributes:
    channels - number of audio channels (1 or 2)
    length - file length in seconds, as a float
    sample_rate - audio sampling rate in Hz
    version - WavPack stream version
    """

    def __init__(self, fileobj):
        """Parse the 28-byte block header at fileobj's current position.

        Raises WavPackHeaderError if fewer than 28 bytes are available,
        if the data does not start with the "wvpk" marker, or if the
        sample rate index in the flags has no entry in RATES.
        """
        header = fileobj.read(28)
        if len(header) != 28 or not header.startswith("wvpk"):
            raise WavPackHeaderError("not a WavPack file")
        samples = cdata.uint_le(header[12:16])
        flags = cdata.uint_le(header[24:28])
        self.version = cdata.short_le(header[8:10])
        # Flag bit 2 selects a single channel; anything else is reported
        # as two.  Store a real int rather than the boolean True.
        if flags & 4:
            self.channels = 1
        else:
            self.channels = 2
        # The index is four bits wide (0-15) but RATES is shorter, so an
        # out-of-table index must be reported as a header problem instead
        # of escaping as a bare IndexError.
        rate_index = (flags >> 23) & 0xF
        if rate_index >= len(RATES):
            raise WavPackHeaderError(
                "unsupported sample rate index %d" % rate_index)
        self.sample_rate = RATES[rate_index]
        # NOTE(review): a sample count of 0xFFFFFFFF presumably means
        # "unknown" in this format; it is still converted as-is -- confirm.
        self.length = float(samples) / self.sample_rate

    def pprint(self):
        """Return a one-line, human-readable summary of the stream."""
        return "WavPack, %.2f seconds, %d Hz" % (self.length, self.sample_rate)