3 from django.db import models
4 from django.contrib.auth.models import User
5 from requests_oauthlib import OAuth2Session
6 from requests_toolbelt.streaming_iterator import StreamingIterator
7 from .settings import YOUTUBE_CLIENT_ID, YOUTUBE_CLIENT_SECRET, YOUTUBE_TOKEN_URL
10 class OAuthConnection(models.Model):
11 user = models.ForeignKey(User, models.CASCADE)
12 access = models.BooleanField(default=False)
13 token = models.CharField(max_length=64, null=True, blank=True)
14 token_secret = models.CharField(max_length=64, null=True, blank=True)
19 return cls.objects.get(user=user)
20 except cls.DoesNotExist:
26 class YouTubeToken(models.Model):
27 token = models.TextField()
29 def token_updater(self, token):
30 self.token = json.dumps(token)
33 def get_session(self):
35 client_id=YOUTUBE_CLIENT_ID,
36 auto_refresh_url=YOUTUBE_TOKEN_URL,
37 token=json.loads(self.token),
38 auto_refresh_kwargs={'client_id':YOUTUBE_CLIENT_ID,'client_secret':YOUTUBE_CLIENT_SECRET},
39 token_updater=self.token_updater
42 def call(self, method, url, params=None, json=None, data=None, resumable_file_path=None):
44 if resumable_file_path:
45 params['uploadType'] = 'resumable'
46 file_size = os.stat(resumable_file_path).st_size
48 session = self.get_session()
49 response = session.request(
56 'X-Upload-Content-Length': str(file_size),
57 'x-upload-content-type': 'application/octet-stream',
58 } if resumable_file_path else {}
60 response.raise_for_status()
61 if resumable_file_path:
62 location = response.headers['Location']
63 with open(resumable_file_path, 'rb') as f:
64 response = session.put(
66 data=StreamingIterator(file_size, f),
67 headers={"Content-Type": "application/octet-stream"},
69 response.raise_for_status()