42ebc9118890a15b14b6545b91c7f28b29908c81
[audio.git] / src / youtube / utils.py
1 import os
2 import shutil
3 import subprocess
4 from tempfile import NamedTemporaryFile
5 from django.conf import settings
6
7
# String prefix that process_to_file() prepends to a sanitized cache key to
# build a cache file path. Read from the Django setting of the same name and
# falls back to 'file_cache/' when that setting is absent.
FILE_CACHE = getattr(settings, 'FILE_CACHE', 'file_cache/')
9
10
def link_or_copy(src, dst):
    """Make `dst` a hard link to `src`, falling back to a plain copy.

    Missing parent directories of `dst` are created and any existing entry
    at `dst` is replaced.

    Args:
        src: Path of the existing source file.
        dst: Path to create; may be a bare file name in the current
            directory.

    Raises:
        OSError: If `dst` cannot be removed (for instance because it is a
            directory) or if both linking and copying fail.
    """
    dstdir = os.path.dirname(dst)
    # dirname() is '' for a bare file name, and os.makedirs('') raises.
    if dstdir:
        # exist_ok avoids the check-then-create race between processes.
        os.makedirs(dstdir, exist_ok=True)

    try:
        if os.path.samefile(src, dst):
            # Already the same file: unlinking dst first could destroy the
            # only remaining name of the data.
            return
    except FileNotFoundError:
        # One of the paths does not exist yet, so they cannot be the same.
        pass

    # Remove unconditionally instead of testing os.path.exists() first: this
    # also clears dangling symlinks and has no check-then-act race.
    try:
        os.unlink(dst)
    except FileNotFoundError:
        pass
    # FIXME: tiny window here when the destination path is not taken.
    try:
        os.link(src, dst)
    except OSError:
        # Hard links can fail (e.g. across filesystems); copy the contents.
        shutil.copyfile(src, dst)
22
23
def process_to_file(cmdline, prefix='', suffix='', cache_key=None, output_path=None):
    """Run a command that writes its result to a file, with optional caching.

    The output path is appended as the final argument of `cmdline`. When
    `cache_key` is given and a cached result already exists, the command is
    not run and the cached file is linked or copied to the output path
    instead; otherwise the command runs and its output is stored in the
    cache afterwards.

    Args:
        cmdline: Sequence of command arguments, without the output path.
        prefix: File name prefix for the temporary output file.
        suffix: File name suffix for the temporary output file.
        cache_key: Optional cache identifier. Any '/' is replaced by '__'
            and the result is appended to FILE_CACHE to form the cache path.
        output_path: Optional explicit output path. When falsy, a temporary
            file is created and is not deleted automatically on success.

    Returns:
        The path of the output file.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    owns_output = False
    if not output_path:
        tmp = NamedTemporaryFile(prefix=prefix, suffix=suffix, delete=False)
        tmp.close()
        output_path = tmp.name
        owns_output = True

    cache_path = None
    if cache_key:
        cache_path = FILE_CACHE + cache_key.replace('/', '__')

    try:
        if cache_path and os.path.exists(cache_path):
            link_or_copy(cache_path, output_path)
        else:
            # Actually run the processing.
            subprocess.run([*cmdline, output_path], check=True)
            if cache_path:
                link_or_copy(output_path, cache_path)
    except BaseException:
        # The caller never learns the temporary path when we fail, so remove
        # it here rather than leaking it. A caller-supplied path is left
        # untouched.
        if owns_output:
            try:
                os.unlink(output_path)
            except OSError:
                pass
        raise

    return output_path
42
43
def video_from_image(img_path, duration, fps=25, cache=True):
    """Turn a still image into an H.264 video using ffmpeg.

    The image is looped (`-loop 1`) for `duration`, encoded with libx264 and
    passed through an `fps=<fps>,format=yuv420p` filter chain.

    Args:
        img_path: Path of the input image.
        duration: Value passed to ffmpeg's `-t` option.
        fps: Frame rate used in the video filter.
        cache: When true, the result is cached under a key built from the
            image path, duration and fps.

    Returns:
        The output path returned by process_to_file().
    """
    command = [
        'ffmpeg', '-y',
        '-loop', '1',
        '-t', str(duration),
        '-i', img_path,
        '-c:v', 'libx264',
        '-vf', f'fps={fps},format=yuv420p',
    ]
    cache_key = None
    if cache:
        cache_key = f'video_from_image:{img_path}:{duration}:{fps}.mkv'
    return process_to_file(command, 'image-', '.mkv', cache_key)
51
52
def cut_video(video_path, duration):
    """Produce a copy of a video limited to `duration` via ffmpeg's `-t`.

    `-t` is placed after the input, so it applies to the output file.

    Args:
        video_path: Path of the input video.
        duration: Value passed to ffmpeg's `-t` option.

    Returns:
        The path of a new temporary `.mkv` file, as returned by
        process_to_file(). The result is not cached.
    """
    command = ['ffmpeg', '-y']
    command += ['-i', video_path]
    command += ['-t', str(duration)]
    return process_to_file(command, prefix='cut-', suffix='.mkv')
59
60
def ffmpeg_concat(paths, suffix, copy=False):
    """Concatenate media files with ffmpeg's concat demuxer.

    A temporary list file with one `file '<path>'` line per input is written
    and handed to ffmpeg; it is removed again when this function exits,
    whether or not ffmpeg succeeded.

    Args:
        paths: Iterable of input file paths, in playback order.
        suffix: File name suffix (container extension) for the output file.
        copy: When true, pass `-c copy` so streams are copied rather than
            re-encoded.

    Returns:
        The output path returned by process_to_file().
    """
    with NamedTemporaryFile(prefix='concat-', suffix='.txt') as filelist:
        for path in paths:
            # Inside single quotes the concat list has no escape character;
            # a literal quote must close the quoting, add an escaped quote,
            # and reopen it: ' -> '\''
            quoted = str(path).replace("'", "'\\''")
            filelist.write(f"file '{quoted}'\n".encode('utf-8'))
        # ffmpeg reads the list by name, so the data must be on disk.
        filelist.flush()

        args = ['ffmpeg', '-y', '-safe', '0', '-f', 'concat', '-i', filelist.name]
        if copy:
            args += ['-c', 'copy']
        return process_to_file(args, 'concat-', suffix)
74
75
def concat_videos(paths):
    """Join the given videos into one `.mkv` file, copying streams as-is."""
    joined = ffmpeg_concat(paths, suffix='.mkv', copy=True)
    return joined
78
79
def concat_audio(paths):
    """Join the given audio files into one `.flac` file (no stream copy)."""
    joined = ffmpeg_concat(paths, suffix='.flac')
    return joined
82
83
def standardize_audio(p, cache=True):
    """Convert an audio file to a uniform FLAC format using ffmpeg.

    The output uses the `s16` sample format, the `flac` codec, 2 channels
    and a 44100 Hz sample rate.

    Args:
        p: Path of the input file.
        cache: When true, the result is cached under a key built from `p`.

    Returns:
        The output path returned by process_to_file().
    """
    command = [
        'ffmpeg', '-y',
        '-i', p,
        '-sample_fmt', 's16',
        '-acodec', 'flac',
        '-ac', '2',
        '-ar', '44100',
    ]
    cache_key = None
    if cache:
        cache_key = f'standardize_audio:{p}.flac'
    return process_to_file(command, 'standardize-', '.flac', cache_key)
90
91
def standardize_video(p, cache=True):
    """Convert a video to an `.mkv` file using ffmpeg's default settings.

    No codec or filter options are passed; ffmpeg only receives the input
    and the `.mkv` output path.

    Args:
        p: Path of the input file.
        cache: When true, the result is cached under a key built from `p`.

    Returns:
        The output path returned by process_to_file().
    """
    command = ['ffmpeg', '-y', '-i', p]
    cache_key = None
    if cache:
        cache_key = f'standardize_video:{p}.mkv'
    return process_to_file(command, 'standardize-', '.mkv', cache_key)
98
99
def mux(channels, output_path=None):
    """Combine several input files into a single `.mkv` with ffmpeg.

    Every entry of `channels` is passed as an `-i` input. No `-map` options
    are given, so stream selection is left to ffmpeg's defaults.

    Args:
        channels: Iterable of input file paths.
        output_path: Optional destination path; a temporary file is used
            when omitted.

    Returns:
        The output path returned by process_to_file().
    """
    inputs = [token for channel in channels for token in ('-i', channel)]
    command = ['ffmpeg', '-y'] + inputs
    return process_to_file(command, 'mux-', '.mkv', output_path=output_path)
105
106
def get_duration(path):
    """Return the duration ffprobe reports for the file's container.

    Args:
        path: Path of the media file to inspect.

    Returns:
        The `format=duration` value printed by ffprobe, parsed as a float.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero
            status.
        ValueError: If ffprobe's output is not a valid float.
    """
    probe_cmd = ["ffprobe", "-i", path]
    probe_cmd += ["-show_entries", "format=duration"]
    probe_cmd += ["-v", "quiet"]
    probe_cmd += ["-of", "csv=p=0"]
    completed = subprocess.run(
        probe_cmd, capture_output=True, text=True, check=True
    )
    return float(completed.stdout)
126
127
def get_framerate(path):
    """Return the first whole-number frame rate ffprobe reports for a file.

    ffprobe prints one `r_frame_rate` fraction per stream (e.g. `25/1`).
    Only fractions whose denominator is exactly `1` are accepted; blank or
    malformed lines and other rates such as `30000/1001` or `0/0` are
    skipped.

    Args:
        path: Path of the media file to inspect.

    Returns:
        The frame rate as an int, or None when no stream reports a rate
        with denominator 1.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero
            status.
    """
    output = subprocess.run(
        [
            "ffprobe",
            "-i",
            path,
            "-show_entries",
            "stream=r_frame_rate",
            "-v",
            "quiet",
            "-of",
            "csv=p=0",
        ],
        capture_output=True,
        text=True,
        check=True,
    ).stdout

    for line in output.splitlines():
        # partition() never raises, unlike unpacking the result of split('/')
        # on a blank line or a line without a slash.
        numerator, _, denominator = line.strip().partition('/')
        if denominator != '1':
            continue
        try:
            return int(numerator)
        except ValueError:
            continue
    return None