X-Git-Url: https://git.mdrn.pl/wolnelektury.git/blobdiff_plain/f3e9066b40546163f0df4678702ca4d7d31f9f84..7010b5cb6b4e4380d9e23add9c7c98431b6e3f95:/src/api/tests/tests.py diff --git a/src/api/tests/tests.py b/src/api/tests/tests.py index 32d394e70..c9afad7b0 100644 --- a/src/api/tests/tests.py +++ b/src/api/tests/tests.py @@ -1,10 +1,17 @@ -# -*- coding: utf-8 -*- # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later. # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information. # -from os import path +from base64 import b64encode +import hashlib +import hmac +from io import BytesIO import json +from os import path +from time import time +from unittest.mock import patch +from urllib.parse import quote, urlencode, parse_qs +from django.contrib.auth.models import User from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase from django.test.utils import override_settings @@ -13,6 +20,7 @@ from catalogue.models import Book, Tag from picture.forms import PictureImportForm from picture.models import Picture import picture.tests +from api.models import Consumer, Token @override_settings( @@ -21,6 +29,8 @@ import picture.tests 'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}}, ) class ApiTest(TestCase): + maxDiff = None + def load_json(self, url): content = self.client.get(url).content try: @@ -29,19 +39,32 @@ class ApiTest(TestCase): self.fail('No JSON could be decoded: %s' % content) return data + def assert_response(self, url, name): + content = self.client.get(url).content.decode('utf-8').rstrip() + filename = path.join(path.dirname(__file__), 'res', 'responses', name) + with open(filename) as f: + good_content = f.read().rstrip() + self.assertEqual(content, good_content, content) + def assert_json_response(self, url, name): data = self.load_json(url) - with open(path.join(path.dirname(__file__), 'res', 'responses', name)) as f: + filename = path.join(path.dirname(__file__), 'res', 'responses', name) + with open(filename) as f: good_data = json.load(f) self.assertEqual(data, good_data, json.dumps(data, indent=4)) + def assert_slugs(self, url, slugs): + have_slugs = [x['slug'] for x in self.load_json(url)] + self.assertEqual(have_slugs, slugs, have_slugs) + class BookTests(ApiTest): def setUp(self): self.tag = Tag.objects.create(category='author', slug='joe') self.book = Book.objects.create(title='A Book', slug='a-book') - self.book_tagged = Book.objects.create(title='Tagged Book', slug='tagged-book') + self.book_tagged = Book.objects.create( + title='Tagged Book', slug='tagged-book') self.book_tagged.tags = [self.tag] self.book_tagged.save() @@ -65,7 +88,8 @@ class BookTests(ApiTest): class TagTests(ApiTest): def setUp(self): - self.tag = Tag.objects.create(category='author', slug='joe', name='Joe') + self.tag = Tag.objects.create( + category='author', slug='joe', name='Joe') self.book = Book.objects.create(title='A Book', slug='a-book') self.book.tags = [self.tag] self.book.save() @@ -85,9 +109,15 @@ class PictureTests(ApiTest): def test_publish(self): slug = "kandinsky-composition-viii" xml = SimpleUploadedFile( - 'composition8.xml', open(path.join(picture.tests.__path__[0], "files", slug + ".xml")).read()) + 'composition8.xml', + open(path.join( + picture.tests.__path__[0], "files", slug + ".xml" + ), 'rb').read()) img = SimpleUploadedFile( - 'kompozycja-8.png', open(path.join(picture.tests.__path__[0], "files", slug + ".png")).read()) + 'kompozycja-8.png', + open(path.join( + picture.tests.__path__[0], "files", slug + ".png" + ), 'rb').read()) import_form = PictureImportForm({}, { 'picture_xml_file': xml, @@ -101,19 +131,369 @@ class PictureTests(ApiTest): Picture.objects.get(slug=slug) -class URLTests(ApiTest): +class BooksTests(ApiTest): fixtures = ['test-books.yaml'] - def test_get(self): - # book lists - self.assert_json_response('/api/audiobooks/', 'audiobooks.json') - self.assert_json_response('/api/daisy/', 'daisy.json') + def test_books(self): + self.assert_json_response('/api/books/', 'books.json') + self.assert_json_response('/api/books/?new_api=true', 'books.json') + self.assert_response('/api/books/?format=xml', 'books.xml') + + self.assert_slugs('/api/audiobooks/', ['parent']) + self.assert_slugs('/api/daisy/', ['parent']) + self.assert_slugs('/api/newest/', ['parent']) + self.assert_slugs('/api/parent_books/', ['parent']) + self.assert_slugs('/api/recommended/', ['parent']) + + # Book paging. + self.assert_slugs('/api/books/after/grandchild/count/1/', ['parent']) + self.assert_slugs( + '/api/books/?new_api=true&after=$grandchild$3&count=1', ['parent']) + + # By tag. + self.assert_slugs('/api/authors/john-doe/books/', ['parent']) + self.assert_slugs( + '/api/genres/sonet/books/?authors=john-doe', + ['parent']) + # It is probably a mistake that this doesn't filter: + self.assert_slugs( + '/api/books/?authors=john-doe', + ['child', 'grandchild', 'parent']) + + # Parent books by tag. + # Notice this contains a grandchild, if a child doesn't have the tag. + # This probably isn't really intended behavior and should be redefined. + self.assert_slugs( + '/api/genres/sonet/parent_books/', + ['grandchild', 'parent']) + + def test_ebooks(self): self.assert_json_response('/api/ebooks/', 'ebooks.json') + + def test_filter_books(self): self.assert_json_response('/api/filter-books/', 'filter-books.json') - self.assert_json_response('/api/newest/', 'newest.json') + self.assert_slugs( + '/api/filter-books/?lektura=false', + ['child', 'grandchild', 'parent']) + self.assert_slugs( + '/api/filter-books/?lektura=true', + []) + + Book.objects.filter(slug='grandchild').update(preview=True) + # Skipping: we don't allow previewed books in filtered list. + #self.assert_slugs( + # '/api/filter-books/?preview=true', + # ['grandchild']) + self.assert_slugs( + '/api/filter-books/?preview=false', + ['child', 'parent']) + Book.objects.filter(slug='grandchild').update(preview=False) + + self.assert_slugs( + '/api/filter-books/?audiobook=true', + ['parent']) + self.assert_slugs( + '/api/filter-books/?audiobook=false', + ['child', 'grandchild']) - self.assert_json_response('/api/blog/', 'blog.json') - self.assert_json_response('/api/preview/', 'preview.json') - self.assert_json_response('/api/recommended/', 'recommended.json') + self.assert_slugs('/api/filter-books/?genres=wiersz', ['child']) + self.assert_slugs('/api/filter-books/?search=parent', ['parent']) + + def test_collections(self): self.assert_json_response('/api/collections/', 'collections.json') + self.assert_json_response( + '/api/collections/a-collection/', + 'collection.json') + + def test_book(self): + self.assert_json_response('/api/books/parent/', 'books-parent.json') + self.assert_json_response('/api/books/child/', 'books-child.json') + self.assert_json_response( + '/api/books/grandchild/', + 'books-grandchild.json') + + def test_tags(self): + # List of tags by category. + self.assert_json_response('/api/genres/', 'tags.json') + + def test_fragments(self): + # This is not supported, though it probably should be. + # self.assert_json_response( + # '/api/books/child/fragments/', + # 'fragments.json') + + self.assert_json_response( + '/api/genres/wiersz/fragments/', + 'fragments.json') + self.assert_json_response( + '/api/books/child/fragments/an-anchor/', + 'fragment.json') + + +class BlogTests(ApiTest): + def test_get(self): + self.assertEqual(self.load_json('/api/blog'), []) + + +class OAuth1Tests(ApiTest): + @classmethod + def setUpClass(cls): + cls.user = User.objects.create(username='test') + cls.user.set_password('test') + cls.user.save() + cls.consumer_secret = 'len(quote(consumer secret))>=32' + Consumer.objects.create( + key='client', + secret=cls.consumer_secret + ) + + @classmethod + def tearDownClass(cls): + User.objects.all().delete() + + def test_create_token(self): + # Fetch request token. + base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&" + "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&" + "oauth_version=1.0".format(int(time()))) + raw = '&'.join([ + 'GET', + quote('http://testserver/api/oauth/request_token/', safe=''), + quote(base_query, safe='') + ]) + h = hmac.new( + (quote(self.consumer_secret) + '&').encode('latin1'), + raw.encode('latin1'), + hashlib.sha1 + ).digest() + h = b64encode(h).rstrip(b'\n') + sign = quote(h) + query = "{}&oauth_signature={}".format(base_query, sign) + response = self.client.get('/api/oauth/request_token/?' + query) + request_token_data = parse_qs(response.content.decode('latin1')) + request_token = request_token_data['oauth_token'][0] + request_token_secret = request_token_data['oauth_token_secret'][0] + + # Request token authorization. + self.client.login(username='test', password='test') + response = self.client.get( + '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % ( + request_token, + ) + ) + post_data = response.context['form'].initial + + response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data)) + self.assertEqual( + response['Location'], + 'test://oauth.callback/?oauth_token=' + request_token + ) + + # Fetch access token. + base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&" + "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&" + "oauth_token={}&oauth_version=1.0".format( + int(time()), request_token)) + raw = '&'.join([ + 'GET', + quote('http://testserver/api/oauth/access_token/', safe=''), + quote(base_query, safe='') + ]) + h = hmac.new( + (quote(self.consumer_secret) + '&' + + quote(request_token_secret, safe='')).encode('latin1'), + raw.encode('latin1'), + hashlib.sha1 + ).digest() + h = b64encode(h).rstrip(b'\n') + sign = quote(h) + query = "{}&oauth_signature={}".format(base_query, sign) + response = self.client.get('/api/oauth/access_token/?' + query) + access_token_data = parse_qs(response.content.decode('latin1')) + access_token = access_token_data['oauth_token'][0] + + self.assertTrue( + Token.objects.filter( + key=access_token, + token_type=Token.ACCESS, + user=self.user + ).exists()) + + +class AuthorizedTests(ApiTest): + fixtures = ['test-books.yaml'] + + @classmethod + def setUpClass(cls): + super(AuthorizedTests, cls).setUpClass() + cls.user = User.objects.create(username='test') + cls.consumer = Consumer.objects.create( + key='client', secret='12345678901234567890123456789012') + cls.token = Token.objects.create( + key='123456789012345678', + secret='12345678901234567890123456789012', + user=cls.user, + consumer=cls.consumer, + token_type=Token.ACCESS, + timestamp=time()) + cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1') + + @classmethod + def tearDownClass(cls): + cls.user.delete() + cls.consumer.delete() + super(AuthorizedTests, cls).tearDownClass() + + def signed(self, url, method='GET', params=None, data=None): + auth_params = { + "oauth_consumer_key": self.consumer.key, + "oauth_nonce": ("%f" % time()).replace('.', ''), + "oauth_signature_method": "HMAC-SHA1", + "oauth_timestamp": int(time()), + "oauth_token": self.token.key, + "oauth_version": "1.0", + } + + sign_params = {} + if params: + sign_params.update(params) + if data: + sign_params.update(data) + sign_params.update(auth_params) + raw = "&".join([ + method.upper(), + quote('http://testserver' + url, safe=''), + quote("&".join( + quote(str(k), safe='') + "=" + quote(str(v), safe='') + for (k, v) in sorted(sign_params.items()))) + ]) + auth_params["oauth_signature"] = quote(b64encode(hmac.new( + self.key, + raw.encode('latin1'), + hashlib.sha1 + ).digest()).rstrip(b'\n')) + auth = 'OAuth realm="API", ' + ', '.join( + '{}="{}"'.format(k, v) for (k, v) in auth_params.items()) + + if params: + url = url + '?' + urlencode(params) + return getattr(self.client, method.lower())( + url, + data=urlencode(data) if data else None, + content_type='application/x-www-form-urlencoded', + HTTP_AUTHORIZATION=auth, + ) + + def signed_json(self, url, method='GET', params=None, data=None): + return json.loads(self.signed(url, method, params, data).content) + + def test_books(self): + self.assertEqual( + [b['liked'] for b in self.signed_json('/api/books/')], + [False, False, False] + ) + data = self.signed_json('/api/books/child/') + self.assertFalse(data['parent']['liked']) + self.assertFalse(data['children'][0]['liked']) + + self.assertEqual( + self.signed_json('/api/like/parent/'), + {"likes": False} + ) + self.signed('/api/like/parent/', 'POST') + self.assertEqual( + self.signed_json('/api/like/parent/'), + {"likes": True} + ) + # There are several endpoints where 'liked' appears. + self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked']) + self.assertTrue(self.signed_json( + '/api/filter-books/', params={"search": "parent"})[0]['liked']) + + self.assertTrue(self.signed_json( + '/api/books/child/')['parent']['liked']) + # Liked books go on shelf. + self.assertEqual( + [x['slug'] for x in self.signed_json('/api/shelf/likes/')], + ['parent']) + + self.signed('/api/like/parent/', 'POST', {"action": "unlike"}) + self.assertEqual( + self.signed_json('/api/like/parent/'), + {"likes": False} + ) + self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked']) + + def test_reading(self): + self.assertEqual( + self.signed_json('/api/reading/parent/'), + {"state": "not_started"} + ) + self.signed('/api/reading/parent/reading/', 'post') + self.assertEqual( + self.signed_json('/api/reading/parent/'), + {"state": "reading"} + ) + self.assertEqual( + [x['slug'] for x in self.signed_json('/api/shelf/reading/')], + ['parent']) + + def test_subscription(self): + Book.objects.filter(slug='grandchild').update(preview=True) + + self.assert_slugs('/api/preview/', ['grandchild']) + self.assertEqual( + self.signed_json('/api/username/'), + {"username": "test", "premium": False}) + self.assertEqual( + self.signed('/api/epub/grandchild/').status_code, + 403) + + with patch('club.models.Membership.is_active_for', return_value=True): + self.assertEqual( + self.signed_json('/api/username/'), + {"username": "test", "premium": True}) + with patch('django.core.files.storage.Storage.open', + return_value=BytesIO(b"")): + self.assertEqual( + self.signed('/api/epub/grandchild/').content, + b"") + + Book.objects.filter(slug='grandchild').update(preview=False) + + def test_publish(self): + response = self.signed('/api/books/', + method='POST', + data={"data": json.dumps({})}) + self.assertEqual(response.status_code, 403) + + response = self.signed('/api/pictures/', + method='POST', + data={"data": json.dumps({})}) + self.assertEqual(response.status_code, 403) + + self.user.is_superuser = True + self.user.save() + + with patch('catalogue.models.Book.from_xml_file') as mock: + response = self.signed('/api/books/', + method='POST', + data={"data": json.dumps({ + "book_xml": "" + })}) + self.assertTrue(mock.called) + self.assertEqual(response.status_code, 201) + + with patch('picture.models.Picture.from_xml_file') as mock: + response = self.signed('/api/pictures/', + method='POST', + data={"data": json.dumps({ + "picture_xml": "", + "picture_image_data": "Kg==", + })}) + self.assertTrue(mock.called) + self.assertEqual(response.status_code, 201) + + self.user.is_superuser = False + self.user.save()