# This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
# Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
#
-from collections import defaultdict
import hashlib
+import os.path
import random
import re
import time
from base64 import urlsafe_b64encode
-
-from django.http import HttpResponse
-from django.core.files.uploadedfile import UploadedFile
-from django.core.files.storage import DefaultStorage
-from django.utils.encoding import force_unicode
-from django.conf import settings
-from os import mkdir, path, unlink
+from collections import defaultdict
from errno import EEXIST, ENOENT
from fcntl import flock, LOCK_EX
+from os import mkdir, path, unlink
from zipfile import ZipFile
+from django.conf import settings
+from django.core.files.storage import DefaultStorage
+from django.core.files.uploadedfile import UploadedFile
+from django.http import HttpResponse
+from django.utils.encoding import force_unicode
+
from reporting.utils import read_chunks
# Use the system (hardware-based) random number generator if it exists.
def get_random_hash(seed):
    """Return a lowercase, URL-safe token derived from ``seed``.

    A SHA-1 digest is computed over the concatenation of a random number
    below ``MAX_SESSION_KEY``, the current time, the UTF-8 encoded seed and
    ``settings.SECRET_KEY``. The digest is base64-encoded with the URL-safe
    alphabet, ``=`` padding is removed, ``_`` is replaced by ``-`` and the
    result is lowercased.
    """
    # Evaluated in the same order as the format arguments are consumed.
    random_part = randrange(0, MAX_SESSION_KEY)
    timestamp = time.time()
    encoded_seed = unicode(seed).encode('utf-8', 'replace')
    material = '%s%s%s%s' % (random_part, timestamp, encoded_seed, settings.SECRET_KEY)

    encoded_digest = urlsafe_b64encode(hashlib.sha1(material).digest())
    return encoded_digest.replace('=', '').replace('_', '-').lower()
try:
unlink(self.lockname)
except OSError as oe:
- if oe.errno != EEXIST:
+ if oe.errno != ENOENT:
raise oe
self.lock.close()
-#@task
+# @task
def create_zip(paths, zip_slug):
"""
Creates a zip in MEDIA_ROOT/zip directory containing files from path.
for chunk in read_chunks(f):
self.write(chunk)
+
class MultiQuerySet(object):
def __init__(self, *args, **kwargs):
self.querysets = args
(offset, stop, step) = item.indices(self.count())
except AttributeError:
# it's not a slice - make it one
- return self[item : item + 1][0]
+ return self[item:item + 1][0]
items = []
total_len = stop - offset
for qs in self.querysets:
stop = total_len - len(items)
continue
+
class SortedMultiQuerySet(MultiQuerySet):
def __init__(self, *args, **kwargs):
self.order_by = kwargs.pop('order_by', None)
(offset, stop, step) = item.indices(self.count())
except AttributeError:
# it's not a slice - make it one
- return self[item : item + 1][0]
+ return self[item:item + 1][0]
items = []
total_len = stop - offset
skipped = 0
candidate = competitor
candidate_i = i
except IndexError:
- continue # continue next sort_head
+ continue # continue next sort_head
# we have no more elements:
if candidate is None:
break
sort_heads[candidate_i] += 1
if skipped < offset:
skipped += 1
- continue # continue next item
+ continue # continue next item
items.append(candidate)
return items
except ValueError:
pass
else:
- # SGML: An end tag closes, back to the matching start tag, all unclosed intervening start tags with omitted end tags
+ # SGML: An end tag closes, back to the matching start tag,
+ # all unclosed intervening start tags with omitted end tags
open_tags = open_tags[i+1:]
else:
# Add it to the start of the open tags list
def __getattribute__(self, name):
if name.startswith('_'):
return object.__getattribute__(self, name)
- value = getattr(settings,
- "%s_%s" % (self._prefix, name),
- object.__getattribute__(self, name))
+ value = getattr(settings, "%s_%s" % (self._prefix, name), object.__getattribute__(self, name))
more = "_more_%s" % name
if hasattr(self, more):
value = getattr(self, more)(value)
def delete_from_cache_by_language(cache, key_template):
    """Delete one cache entry for every language in ``settings.LANGUAGES``.

    ``key_template`` is %-formatted with each language code in turn, and the
    resulting keys are removed with a single ``cache.delete_many`` call.
    """
    keys = []
    for language_code, _language_name in settings.LANGUAGES:
        keys.append(key_template % language_code)
    cache.delete_many(keys)
+
+
def gallery_path(slug):
    """Return the filesystem path built from MEDIA_ROOT, IMAGE_DIR and ``slug``."""
    images_root = os.path.join(settings.MEDIA_ROOT, settings.IMAGE_DIR)
    return os.path.join(images_root, slug)
+
+
def gallery_url(slug):
    """Return the URL for ``slug``: MEDIA_URL + IMAGE_DIR + slug + a trailing slash."""
    url_parts = (settings.MEDIA_URL, settings.IMAGE_DIR, slug)
    return '%s%s%s/' % url_parts
+
+
def get_mp3_length(path):
    """Return the length of the MP3 file at ``path``, truncated to an int.

    The value comes from mutagen's ``info.length`` (seconds, according to
    mutagen's documentation).
    """
    # Imported inside the function, as in the original, so mutagen is only
    # needed when this helper is actually called.
    from mutagen.mp3 import MP3

    audio = MP3(path)
    return int(audio.info.length)