--- /dev/null
+# Generated by Django 3.0.6 on 2020-06-03 00:11
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('archive', '0016_auto_20200529_1528'),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name='audiobook',
+ name='youtube_volume',
+ field=models.CharField(blank=True, help_text='If set, audiobooks with the save value will be published as single YouTube video.', max_length=100, verbose_name='Volume name for YouTube'),
+ ),
+ ]
part_name = models.CharField(max_length=255, verbose_name=_('part name'), help_text=_('eg. chapter in a novel'),
default='', blank=True)
index = models.IntegerField(verbose_name=_('index'), default=0, help_text=_('Ordering of parts of a book.'))
- youtube_volume = models.CharField(_('Volume name for YouTube'), max_length=1000, blank=True, help_text=_('If set, audiobooks with the save value will be published as single YouTube video.'))
+ youtube_volume = models.CharField(
+ _("Volume name for YouTube"),
+ max_length=100,
+ blank=True,
+ help_text=_(
+ "If set, audiobooks with the save value will be published as single YouTube video."
+ ),
+ )
artist = models.CharField(max_length=255, verbose_name=_('artist'))
conductor = models.CharField(max_length=255, verbose_name=_('conductor'))
encoded_by = models.CharField(max_length=255, verbose_name=_('encoded by'))
prev_volume = a.youtube_volume
return index
+ @property
+ def is_youtube_publishable(self):
+ return (
+ not self.youtube_volume
+ or not type(self)
+ .objects.filter(youtube_volume=self.youtube_volume, index__lt=self.index)
+ .exists()
+ )
+
def get_mp3_tags(self): return json.loads(self.mp3_tags) if self.mp3_tags else None
def get_ogg_tags(self): return json.loads(self.ogg_tags) if self.ogg_tags else None
def get_mp3_published_tags(self): return json.loads(self.mp3_published_tags) if self.mp3_published_tags else None
def book(self):
apidata = requests.get(f'https://wolnelektury.pl/api/books/{self.slug}/').json()
return apidata
-
out_file = NamedTemporaryFile(delete=False, prefix='%d-' % aid, suffix='.%s' % self.ext)
out_file.close()
- self.encode(audiobook.source_file.path, out_file.name)
+ self.encode(self.get_source_file_paths(audiobook), out_file.name)
self.set_status(aid, status.TAGGING)
self.set_tags(audiobook, out_file.name)
self.set_status(aid, status.SENDING)
self.save(audiobook, out_file.name)
+ def get_source_file_paths(self, audiobook):
+ return [audiobook.source_file.path]
+
def on_failure(self, exc, task_id, args, kwargs, einfo):
aid = (args[0], kwargs.get('aid'))[0]
self.set_status(aid, None)
}
@staticmethod
- def encode(in_path, out_path):
+ def encode(in_paths, out_path):
+ assert len(in_paths) == 1
+ in_path = in_paths[0]
# 44.1kHz 64kbps mono MP3
subprocess.check_call(['ffmpeg',
'-i', in_path.encode('utf-8'),
prefix = ext = 'ogg'
@staticmethod
- def encode(in_path, out_path):
+ def encode(in_paths, out_path):
+ assert len(in_paths) == 1
+ in_path = in_paths[0]
# 44.1kHz 64kbps mono Ogg Vorbis
subprocess.check_call(['ffmpeg',
'-i', in_path.encode('utf-8'),
audiobook.mp3_status = None
audiobook.ogg_status = None
audiobook.youtube_status = None
+ audiobook.youtube_queued = None
audiobook.save()
return redirect(file_managed, aid)
def list_publishing(request):
division = 'publishing'
- objects = models.Audiobook.objects.exclude(mp3_status=None, ogg_status=None, youtube_status=None)
+ objects = models.Audiobook.objects.exclude(
+ mp3_status=None, ogg_status=None, youtube_status=None
+ ).order_by("youtube_queued", "title")
objects_by_status = {}
for o in objects:
statuses = set()
--- /dev/null
+- model: archive.Project
+ pk: 1
+ fields:
+ name: A project
+- model: youtube.YouTube
+ pk: 1
+ fields:
+ loop_video: loop.mkv
+ thumbnail_template: "template.jpg"
+
+- model: archive.Audiobook
+ pk: 1
+ fields:
+ title: A title, part 1
+ slug: a-slug
+ source_file: part1.flac
+ project_id: 1
+ index: 1
+- model: archive.Audiobook
+ pk: 2
+ fields:
+ title: A title, part 2
+ slug: a-slug
+ source_file: part2.flac
+ project_id: 1
+ index: 2
+ youtube_volume: parts 2, 3
+- model: archive.Audiobook
+ pk: 3
+ fields:
+ title: A title, part 3
+ slug: a-slug
+ source_file: part3.flac
+ project_id: 1
+ index: 3
+ youtube_volume: parts 2, 3
+- model: archive.Audiobook
+ pk: 4
+ fields:
+ title: A title, part 4
+ slug: a-slug
+ source_file: part4.flac
+ project_id: 1
+ index: 4
--- /dev/null
+from django.core.management.base import BaseCommand
+from archive.constants import status
+from archive.models import Audiobook
+from youtube import tasks
+
+
+class Command(BaseCommand):
+ help = "Schedules some audiobooks for uploading to YouTube."
+
+ def add_arguments(self, parser):
+ parser.add_argument("--limit", type=int, default=6)
+
+ def handle(self, *args, **options):
+ for audiobook in Audiobook.objects.exclude(youtube_queued=None).order_by(
+ "youtube_queued"
+ )[: options["limit"]]:
+ audiobook.youtube_task = tasks.YouTubeTask.delay(
+ None, audiobook.id, True
+ ).task_id
+ audiobook.youtube_status = status.WAITING
+ audiobook.save(update_fields=["youtube_task", "youtube_status"])
"https://www.googleapis.com/youtube/v3/videos",
params={"part": part},
json=data
- )
+ )
- def prepare_file(self, input_path, output_path=None):
- audio = self.prepare_audio(input_path)
- duration = self.get_duration(input_path)
+ def prepare_file(self, input_paths, output_path=None):
+ audio = self.prepare_audio(input_paths)
+ duration = self.get_duration(input_paths)
video = self.prepare_video(duration)
output = mux([video, audio], output_path=output_path)
unlink(audio)
unlink(video)
return output
- def get_duration(self, input_path):
- d = get_duration(input_path)
+ def get_duration(self, input_paths):
+ d = 0
+ for input_path in input_paths:
+ d += get_duration(input_path)
if self.intro_flac:
d += get_duration(self.intro_flac.path)
if self.outro_flac:
d += get_duration(self.outro_flac.path)
return d
-
- def prepare_audio(self, input_path):
+
+ def prepare_audio(self, input_paths):
files = []
if self.intro_flac:
files.append(standardize_audio(self.intro_flac.path))
- files.append(standardize_audio(input_path, cache=False))
+ for input_path in input_paths:
+ files.append(standardize_audio(input_path, cache=False))
if self.outro_flac:
files.append(standardize_audio(self.outro_flac.path))
output = concat_audio(files)
unlink(d)
return output
-
def prepare_video(self, duration):
concat = []
outro = []
buf = io.BytesIO()
img.save(buf, format='PNG')
return buf
-
+
class Card(models.Model):
youtube = models.ForeignKey(YouTube, models.CASCADE)
ext = 'mkv'
prefix = 'youtube'
- def encode(self, in_path, out_path):
- YouTube.objects.first().prepare_file(in_path, out_path)
+ def encode(self, in_paths, out_path):
+ YouTube.objects.first().prepare_file(in_paths, out_path)
def set_tags(self, audiobook, filename):
pass
def put(self, user, audiobook, filename):
YouTube.objects.first().publish(audiobook, filename)
+
+ def get_source_file_paths(self, audiobook):
+ if not audiobook.youtube_volume:
+ return [audiobook.source_file.path]
+ return [
+ a.source_file.path
+ for a in type(audiobook)
+ .objects.filter(
+ slug=audiobook.slug, youtube_volume=audiobook.youtube_volume
+ )
+ .order_by("index")
+ ]
+from unittest import mock
from django.test import TestCase
+from archive.models import Audiobook
+from . import tasks
-# Create your tests here.
+
+@mock.patch("youtube.models.youtube_call")
+@mock.patch("youtube.utils.subprocess")
+@mock.patch("youtube.models.concat_audio")
+@mock.patch("youtube.models.unlink")
+@mock.patch("youtube.models.YouTube.prepare_thumbnail")
+class YouTubeTests(TestCase):
+ fixtures = ["tests.yaml"]
+
+ def test_youtube_volumes(
+ self, prepare_thumbnail, unlink, concat_audio, subprocess, youtube_call
+ ):
+ youtube_call.return_value = mock.Mock(
+ json=mock.Mock(return_value={"id": "deadbeef"})
+ )
+ audiobooks = Audiobook.objects.all().order_by("index")
+
+ self.assertEqual(audiobooks[0].youtube_volume_count, 3)
+ self.assertEqual([a.youtube_volume_index for a in audiobooks], [1, 2, 2, 3])
+ self.assertEqual(
+ [a.is_youtube_publishable for a in audiobooks], [True, True, False, True]
+ )
+
+ tasks.YouTubeTask().run(None, 2, True)
+
+ # In creating a volume of two audiobooks, we should've called concat with a list of two files.
+ self.assertEqual(len(concat_audio.call_args[0][0]), 2)
@permission_required('archive.change_audiobook')
def publish(request, aid, publish=True):
audiobook = get_object_or_404(Audiobook, id=aid)
- audiobook.youtube_status = status.QUEUED
- audiobook.youtube_queued = now()
- audiobook.save(update_fields=['youtube_status', 'youtube_queued'])
+ if audiobook.is_youtube_publishable:
+ audiobook.youtube_status = status.QUEUED
+ audiobook.youtube_queued = now()
+ audiobook.save(update_fields=['youtube_status', 'youtube_queued'])
return redirect(reverse('file', args=[aid]))