-# -*- coding: utf-8 -*-
# This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
# Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
#
-from collections import defaultdict
import hashlib
+import os.path
import random
import re
import time
from base64 import urlsafe_b64encode
-
-from django.http import HttpResponse
-from django.core.files.uploadedfile import UploadedFile
-from django.core.files.storage import DefaultStorage
-from django.utils.encoding import force_unicode
-from django.conf import settings
-from os import mkdir, path, unlink
+from collections import defaultdict
from errno import EEXIST, ENOENT
from fcntl import flock, LOCK_EX
+from os import mkdir, path, unlink
+from urllib.parse import urljoin
from zipfile import ZipFile
+from django.apps import apps
+from django.conf import settings
+from django.core.files.storage import DefaultStorage
+from django.core.files.uploadedfile import UploadedFile
+from django.http import HttpResponse
+from django.utils.encoding import force_str
+
from reporting.utils import read_chunks
# Use the system (hardware-based) random number generator if it exists.
randrange = random.SystemRandom().randrange
else:
randrange = random.randrange
-MAX_SESSION_KEY = 18446744073709551616L # 2 << 63
+MAX_SESSION_KEY = 18446744073709551616 # 2 << 63
def get_random_hash(seed):
- sha_digest = hashlib.sha1('%s%s%s%s' %
- (randrange(0, MAX_SESSION_KEY), time.time(), unicode(seed).encode('utf-8', 'replace'),
- settings.SECRET_KEY)).digest()
- return urlsafe_b64encode(sha_digest).replace('=', '').replace('_', '-').lower()
+ sha_digest = hashlib.sha1((
+ '%s%s%s%s' % (
+ randrange(0, MAX_SESSION_KEY),
+ time.time(),
+ str(seed).encode('utf-8', 'replace'),
+ settings.SECRET_KEY
+ )
+ ).encode('utf-8')).digest()
+ return urlsafe_b64encode(sha_digest).decode('latin1').replace('=', '').replace('_', '-').lower()
def split_tags(*tag_lists):
try:
unlink(self.lockname)
except OSError as oe:
- if oe.errno != EEXIST:
+ if oe.errno != ENOENT:
raise oe
self.lock.close()
-#@task
-def create_zip(paths, zip_slug):
+# @task
+def create_zip(paths, zip_slug, file_contents=None):
"""
Creates a zip in MEDIA_ROOT/zip directory containing files from path.
Resulting archive filename is ${zip_slug}.zip
if arcname is None:
arcname = path.basename(p)
zipf.write(p, arcname)
+ if file_contents:
+ for arcname, content in file_contents.items():
+ zipf.writestr(arcname, content)
finally:
zipf.close()
for chunk in read_chunks(f):
self.write(chunk)
+
class MultiQuerySet(object):
def __init__(self, *args, **kwargs):
self.querysets = args
(offset, stop, step) = item.indices(self.count())
except AttributeError:
# it's not a slice - make it one
- return self[item : item + 1][0]
+ return self[item:item + 1][0]
items = []
total_len = stop - offset
for qs in self.querysets:
stop = total_len - len(items)
continue
-class SortedMultiQuerySet(MultiQuerySet):
- def __init__(self, *args, **kwargs):
- self.order_by = kwargs.pop('order_by', None)
- self.sortfn = kwargs.pop('sortfn', None)
- if self.order_by is not None:
- self.sortfn = lambda a, b: cmp((getattr(a, f) for f in self.order_by),
- (getattr(b, f) for f in self.order_by))
- super(SortedMultiQuerySet, self).__init__(*args, **kwargs)
-
- def __getitem__(self, item):
- sort_heads = [0] * len(self.querysets)
- try:
- (offset, stop, step) = item.indices(self.count())
- except AttributeError:
- # it's not a slice - make it one
- return self[item : item + 1][0]
- items = []
- total_len = stop - offset
- skipped = 0
- i_s = range(len(sort_heads))
-
- while len(items) < total_len:
- candidate = None
- candidate_i = None
- for i in i_s:
- def get_next():
- return self.querysets[i][sort_heads[i]]
- try:
- if candidate is None:
- candidate = get_next()
- candidate_i = i
- else:
- competitor = get_next()
- if self.sortfn(candidate, competitor) > 0:
- candidate = competitor
- candidate_i = i
- except IndexError:
- continue # continue next sort_head
- # we have no more elements:
- if candidate is None:
- break
- sort_heads[candidate_i] += 1
- if skipped < offset:
- skipped += 1
- continue # continue next item
- items.append(candidate)
-
- return items
-
def truncate_html_words(s, num, end_text='...'):
"""Truncates HTML to a certain number of words (not counting tags and
This is just a version of django.utils.text.truncate_html_words with no space before the end_text.
"""
- s = force_unicode(s)
+ s = force_str(s)
length = int(num)
if length <= 0:
- return u''
+ return ''
html4_singlets = ('br', 'col', 'link', 'base', 'img', 'param', 'area', 'hr', 'input')
# Set up regular expressions
re_words = re.compile(r'&.*?;|<.*?>|(\w[\w-]*)', re.U)
except ValueError:
pass
else:
- # SGML: An end tag closes, back to the matching start tag, all unclosed intervening start tags with omitted end tags
+ # SGML: An end tag closes, back to the matching start tag,
+ # all unclosed intervening start tags with omitted end tags
open_tags = open_tags[i+1:]
else:
# Add it to the start of the open tags list
def __getattribute__(self, name):
if name.startswith('_'):
return object.__getattribute__(self, name)
- value = getattr(settings,
- "%s_%s" % (self._prefix, name),
- object.__getattribute__(self, name))
+ value = getattr(settings, "%s_%s" % (self._prefix, name), object.__getattribute__(self, name))
more = "_more_%s" % name
if hasattr(self, more):
value = getattr(self, more)(value)
def delete_from_cache_by_language(cache, key_template):
cache.delete_many([key_template % lc for lc, ln in settings.LANGUAGES])
+
+
+def gallery_path(slug):
+ return os.path.join(settings.MEDIA_ROOT, settings.IMAGE_DIR, slug) + '/'
+
+
+def gallery_url(slug):
+ return '%s%s%s/' % (settings.MEDIA_URL, settings.IMAGE_DIR, slug)
+
+
+def absolute_url(url):
+ Site = apps.get_model('sites', 'Site')
+ site = Site.objects.get_current()
+ base_url = '%s://%s' % (
+ 'https' if settings.SESSION_COOKIE_SECURE else 'http',
+ site.domain
+ )
+ return urljoin(base_url, url)
+
+
+def get_mp3_length(path):
+ from mutagen.mp3 import MP3
+ return int(MP3(path).info.length)
+
+
+def set_file_permissions(self, fieldfile):
+ if fieldfile.instance.preview:
+ fieldfile.set_readable(False)
+