+class OAuth1Tests(ApiTest):
+ @classmethod
+ def setUpClass(cls):
+ cls.user = User.objects.create(username='test')
+ cls.user.set_password('test')
+ cls.user.save()
+ cls.consumer_secret = 'len(quote(consumer secret))>=32'
+ Consumer.objects.create(
+ key='client',
+ secret=cls.consumer_secret
+ )
+
+ @classmethod
+ def tearDownClass(cls):
+ User.objects.all().delete()
+
+ def test_create_token(self):
+ # Fetch request token.
+ base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
+ "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
+ "oauth_version=1.0".format(int(time())))
+ raw = '&'.join([
+ 'GET',
+ quote('http://testserver/api/oauth/request_token/', safe=''),
+ quote(base_query, safe='')
+ ])
+ h = hmac.new(
+ (quote(self.consumer_secret) + '&').encode('latin1'),
+ raw.encode('latin1'),
+ hashlib.sha1
+ ).digest()
+ h = b64encode(h).rstrip(b'\n')
+ sign = quote(h)
+ query = "{}&oauth_signature={}".format(base_query, sign)
+ response = self.client.get('/api/oauth/request_token/?' + query)
+ request_token_data = parse_qs(response.content.decode('latin1'))
+ request_token = request_token_data['oauth_token'][0]
+ request_token_secret = request_token_data['oauth_token_secret'][0]
+
+ # Request token authorization.
+ self.client.login(username='test', password='test')
+ response = self.client.get('/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % request_token)
+ post_data = response.context['form'].initial
+
+ response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
+ self.assertEqual(
+ response['Location'],
+ 'test://oauth.callback/?oauth_token=' + request_token
+ )
+
+ # Fetch access token.
+ base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
+ "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
+ "oauth_token={}&oauth_version=1.0".format(
+ int(time()), request_token))
+ raw = '&'.join([
+ 'GET',
+ quote('http://testserver/api/oauth/access_token/', safe=''),
+ quote(base_query, safe='')
+ ])
+ h = hmac.new(
+ (quote(self.consumer_secret) + '&' +
+ quote(request_token_secret, safe='')).encode('latin1'),
+ raw.encode('latin1'),
+ hashlib.sha1
+ ).digest()
+ h = b64encode(h).rstrip(b'\n')
+ sign = quote(h)
+ query = u"{}&oauth_signature={}".format(base_query, sign)
+ response = self.client.get(u'/api/oauth/access_token/?' + query)
+ access_token_data = parse_qs(response.content.decode('latin1'))
+ access_token = access_token_data['oauth_token'][0]
+
+ self.assertTrue(
+ Token.objects.filter(
+ key=access_token,
+ token_type=Token.ACCESS,
+ user=self.user
+ ).exists())
+
+
+class AuthorizedTests(ApiTest):
+ fixtures = ['test-books.yaml']
+
+ @classmethod
+ def setUpClass(cls):
+ super(AuthorizedTests, cls).setUpClass()
+ cls.user = User.objects.create(username='test')
+ cls.consumer = Consumer.objects.create(
+ key='client', secret='12345678901234567890123456789012')
+ cls.token = Token.objects.create(
+ key='123456789012345678',
+ secret='12345678901234567890123456789012',
+ user=cls.user,
+ consumer=cls.consumer,
+ token_type=Token.ACCESS,
+ timestamp=time())
+ cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.user.delete()
+ cls.consumer.delete()
+ super(AuthorizedTests, cls).tearDownClass()
+
+ def signed(self, url, method='GET', params=None, data=None):
+ auth_params = {
+ "oauth_consumer_key": self.consumer.key,
+ "oauth_nonce": ("%f" % time()).replace('.', ''),
+ "oauth_signature_method": "HMAC-SHA1",
+ "oauth_timestamp": int(time()),
+ "oauth_token": self.token.key,
+ "oauth_version": "1.0",
+ }
+
+ sign_params = {}
+ if params:
+ sign_params.update(params)
+ if data:
+ sign_params.update(data)
+ sign_params.update(auth_params)
+ raw = "&".join([
+ method.upper(),
+ quote('http://testserver' + url, safe=''),
+ quote("&".join(
+ quote(str(k), safe='') + "=" + quote(str(v), safe='')
+ for (k, v) in sorted(sign_params.items())))
+ ])
+ auth_params["oauth_signature"] = quote(b64encode(hmac.new(
+ self.key,
+ raw.encode('latin1'),
+ hashlib.sha1
+ ).digest()).rstrip(b'\n'))
+ auth = 'OAuth realm="API", ' + ', '.join(
+ '{}="{}"'.format(k, v) for (k, v) in auth_params.items())
+
+ if params:
+ url = url + '?' + urlencode(params)
+ return getattr(self.client, method.lower())(
+ url,
+ data=urlencode(data) if data else None,
+ content_type='application/x-www-form-urlencoded',
+ HTTP_AUTHORIZATION=auth,
+ )
+
+ def signed_json(self, url, method='GET', params=None, data=None):
+ return json.loads(self.signed(url, method, params, data).content)
+
+ def test_books(self):
+ self.assertEqual(
+ [b['liked'] for b in self.signed_json('/api/books/')],
+ [False, False, False]
+ )
+ data = self.signed_json('/api/books/child/')
+ self.assertFalse(data['parent']['liked'])
+ self.assertFalse(data['children'][0]['liked'])
+
+ self.assertEqual(
+ self.signed_json('/api/like/parent/'),
+ {"likes": False}
+ )
+ self.signed('/api/like/parent/', 'POST')
+ self.assertEqual(
+ self.signed_json('/api/like/parent/'),
+ {"likes": True}
+ )
+ # There are several endpoints where 'liked' appears.
+ self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
+ self.assertTrue(self.signed_json(
+ '/api/filter-books/', params={"search": "parent"})[0]['liked'])
+
+ self.assertTrue(self.signed_json(
+ '/api/books/child/')['parent']['liked'])
+ # Liked books go on shelf.
+ self.assertEqual(
+ [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
+ ['parent'])
+
+ self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
+ self.assertEqual(
+ self.signed_json('/api/like/parent/'),
+ {"likes": False}
+ )
+ self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])
+
+ def test_reading(self):
+ self.assertEqual(
+ self.signed_json('/api/reading/parent/'),
+ {"state": "not_started"}
+ )
+ self.signed('/api/reading/parent/reading/', 'post')
+ self.assertEqual(
+ self.signed_json('/api/reading/parent/'),
+ {"state": "reading"}
+ )
+ self.assertEqual(
+ [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
+ ['parent'])
+
+ def test_subscription(self):
+ self.assert_slugs('/api/preview/', ['grandchild'])
+ self.assertEqual(
+ self.signed_json('/api/username/'),
+ {"username": "test", "premium": False})
+ self.assertEqual(
+ self.signed('/api/epub/grandchild/').status_code,
+ 403)
+
+ with patch('club.models.Membership.is_active_for', return_value=True):
+ self.assertEqual(
+ self.signed_json('/api/username/'),
+ {"username": "test", "premium": True})
+ with patch('django.core.files.storage.Storage.open',
+ return_value=BytesIO(b"<epub>")):
+ self.assertEqual(
+ self.signed('/api/epub/grandchild/').content,
+ b"<epub>")
+
+ def test_publish(self):
+ response = self.signed('/api/books/',
+ method='POST',
+ data={"data": json.dumps({})})
+ self.assertEqual(response.status_code, 403)
+
+ response = self.signed('/api/pictures/',
+ method='POST',
+ data={"data": json.dumps({})})
+ self.assertEqual(response.status_code, 403)
+
+ self.user.is_superuser = True
+ self.user.save()
+
+ with patch('catalogue.models.Book.from_xml_file') as mock:
+ response = self.signed('/api/books/',
+ method='POST',
+ data={"data": json.dumps({
+ "book_xml": "<utwor/>"
+ })})
+ self.assertTrue(mock.called)
+ self.assertEqual(response.status_code, 201)
+
+ with patch('picture.models.Picture.from_xml_file') as mock:
+ response = self.signed('/api/pictures/',
+ method='POST',
+ data={"data": json.dumps({
+ "picture_xml": "<utwor/>",
+ "picture_image_data": "Kg==",
+ })})
+ self.assertTrue(mock.called)
+ self.assertEqual(response.status_code, 201)
+
+ self.user.is_superuser = False
+ self.user.save()