eccdd04a8724360e2eeb55da30d7f49033403087
[wolnelektury.git] / src / api / tests / tests.py
1 # -*- coding: utf-8 -*-
2 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
3 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 #
5 from base64 import b64encode
6 from os import path
7 import hashlib
8 import hmac
9 import json
10 from io import BytesIO
11 from time import time
12 from urllib.parse import quote, urlencode, parse_qs
13
14 from django.contrib.auth.models import User
15 from django.core.files.uploadedfile import SimpleUploadedFile
16 from django.test import TestCase
17 from django.test.utils import override_settings
18 from unittest.mock import patch
19 from api.models import Consumer, Token
20
21 from catalogue.models import Book, Tag
22 from picture.forms import PictureImportForm
23 from picture.models import Picture
24 import picture.tests
25
26
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Base class for API tests with shared response-checking helpers.

    Every derived test runs with ``NO_SEARCH_INDEX=True`` and with the
    default cache swapped for Django's ``DummyCache`` backend.
    """

    def _response_path(self, name):
        """Return the path of the expected-response file called *name*.

        Expected responses live in ``res/responses`` next to this module.
        """
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET *url* with the test client and return the decoded JSON body.

        The test is failed (rather than erroring out) when the body cannot
        be decoded as JSON; the raw body is included in the failure message.
        """
        content = self.client.get(url).content
        try:
            data = json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)
        return data

    def assert_response(self, url, name):
        """Assert that the body of *url* equals the stored file *name*.

        Both sides are compared as text with trailing whitespace stripped.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        # The response is decoded as UTF-8, so read the reference file the
        # same way instead of relying on the platform's default encoding.
        with open(self._response_path(name), encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert that the JSON served at *url* equals the stored file *name*.

        The comparison is done on decoded data, so formatting differences
        between the response and the file do not matter.
        """
        data = self.load_json(url)
        # Read the reference file as UTF-8 regardless of the system locale.
        with open(self._response_path(name), encoding='utf-8') as f:
            good_data = json.load(f)
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert that the list served at *url* has exactly *slugs*, in order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
58
59
class BookTests(ApiTest):
    """Book endpoints checked against two books, one carrying an author tag."""

    def setUp(self):
        """Create an author tag, an untagged book and a tagged book."""
        author = Tag.objects.create(category='author', slug='joe')
        self.tag = author
        self.book = Book.objects.create(title='A Book', slug='a-book')

        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        self.book_tagged = tagged
        tagged.tags = [author]
        tagged.save()

    def test_book_list(self):
        """The book list contains both books."""
        listed = self.load_json('/api/books/')
        self.assertEqual(len(listed), 2, 'Wrong book list.')

    def test_tagged_books(self):
        """The author's book list contains only the tagged book."""
        listed = self.load_json('/api/authors/joe/books/')
        titles = [entry['title'] for entry in listed]
        self.assertEqual(
            titles, [self.book_tagged.title], 'Wrong tagged book list.')

    def test_detail(self):
        """The book detail view reports the book's title."""
        detail = self.load_json('/api/books/a-book/')
        self.assertEqual(
            detail['title'], self.book.title, 'Wrong book details.')
85
86
class TagTests(ApiTest):
    """Tag endpoints checked against a single author tag attached to a book."""

    def setUp(self):
        """Create the author tag and one book that carries it."""
        author = Tag.objects.create(
            category='author', slug='joe', name='Joe')
        self.tag = author

        book = Book.objects.create(title='A Book', slug='a-book')
        self.book = book
        book.tags = [author]
        book.save()

    def test_tag_list(self):
        """The author list contains exactly one entry."""
        listed = self.load_json('/api/authors/')
        self.assertEqual(len(listed), 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The author detail view reports the tag's name."""
        detail = self.load_json('/api/authors/joe/')
        self.assertEqual(
            detail['name'], self.tag.name, 'Wrong tag details.')
105
106
class PictureTests(ApiTest):
    """Import of a picture through ``PictureImportForm``."""

    def test_publish(self):
        """Import the sample picture and check that it lands in the database.

        The XML and PNG sources are taken from the ``files`` directory of
        the ``picture.tests`` package.
        """
        slug = "kandinsky-composition-viii"
        files_dir = path.join(picture.tests.__path__[0], "files")

        # Read the fixtures inside ``with`` blocks so the file handles are
        # closed deterministically instead of being left to the GC.
        with open(path.join(files_dir, slug + ".xml"), 'rb') as f:
            xml = SimpleUploadedFile('composition8.xml', f.read())
        with open(path.join(files_dir, slug + ".png"), 'rb') as f:
            img = SimpleUploadedFile('kompozycja-8.png', f.read())

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img
            })

        # A unittest assertion is not stripped under ``python -O`` and it
        # reports the form errors when validation fails.
        self.assertTrue(import_form.is_valid(), import_form.errors)
        import_form.save()

        # Raises Picture.DoesNotExist if the import did not create the row.
        Picture.objects.get(slug=slug)
131
132
class BooksTests(ApiTest):
    """Read-only catalogue endpoints checked against ``test-books.yaml``."""

    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book listings: full dumps, special lists, paging and tag filters."""
        for url in ('/api/books/', '/api/books/?new_api=true'):
            self.assert_json_response(url, 'books.json')
        self.assert_response('/api/books/?format=xml', 'books.xml')

        only_parent_urls = (
            '/api/audiobooks/',
            '/api/daisy/',
            '/api/newest/',
            '/api/parent_books/',
            '/api/recommended/',
            # Book paging.
            '/api/books/after/grandchild/count/1/',
            '/api/books/?new_api=true&after=$grandchild$3&count=1',
            # By tag.
            '/api/authors/john-doe/books/',
            '/api/genres/sonet/books/?authors=john-doe',
        )
        for url in only_parent_urls:
            self.assert_slugs(url, ['parent'])

        # It is probably a mistake that this doesn't filter:
        everything = ['child', 'grandchild', 'parent']
        self.assert_slugs('/api/books/?authors=john-doe', everything)

        # Parent books by tag.
        # Notice this contains a grandchild, if a child doesn't have the tag.
        # This probably isn't really intended behavior and should be redefined.
        with_grandchild = ['grandchild', 'parent']
        self.assert_slugs('/api/genres/sonet/parent_books/', with_grandchild)

    def test_ebooks(self):
        """The ebook list matches the stored response."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """The filter endpoint honours each of its query parameters."""
        self.assert_json_response('/api/filter-books/', 'filter-books.json')

        expectations = [
            ('/api/filter-books/?lektura=false',
             ['child', 'grandchild', 'parent']),
            ('/api/filter-books/?lektura=true', []),
            ('/api/filter-books/?preview=true', ['grandchild']),
            ('/api/filter-books/?preview=false', ['child', 'parent']),
            ('/api/filter-books/?audiobook=true', ['parent']),
            ('/api/filter-books/?audiobook=false', ['child', 'grandchild']),
            ('/api/filter-books/?genres=wiersz', ['child']),
            ('/api/filter-books/?search=parent', ['parent']),
        ]
        for url, expected_slugs in expectations:
            self.assert_slugs(url, expected_slugs)

    def test_collections(self):
        """Collection list and collection detail match the stored responses."""
        pairs = (
            ('/api/collections/', 'collections.json'),
            ('/api/collections/a-collection/', 'collection.json'),
        )
        for url, response_name in pairs:
            self.assert_json_response(url, response_name)

    def test_book(self):
        """Detail views of all three fixture books match the stored responses."""
        pairs = (
            ('/api/books/parent/', 'books-parent.json'),
            ('/api/books/child/', 'books-child.json'),
            ('/api/books/grandchild/', 'books-grandchild.json'),
        )
        for url, response_name in pairs:
            self.assert_json_response(url, response_name)

    def test_tags(self):
        """The list of tags in a category matches the stored response."""
        # List of tags by category.
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragment list by tag and single fragment match the stored responses."""
        # Listing fragments of a book ('/api/books/child/fragments/' against
        # 'fragments.json') is not supported, though it probably should be.
        pairs = (
            ('/api/genres/wiersz/fragments/', 'fragments.json'),
            ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        )
        for url, response_name in pairs:
            self.assert_json_response(url, response_name)
228
229
class BlogTests(ApiTest):
    """Blog endpoint with no data loaded."""

    def test_get(self):
        """The blog endpoint returns an empty JSON list."""
        posts = self.load_json('/api/blog')
        self.assertEqual(posts, [])
233
234
class PreviewTests(ApiTest):
    """Preview endpoint as seen by an unauthenticated client."""

    # NOTE(review): the name lacks the ``test`` prefix, so the default
    # unittest/Django loader does not collect this method -- confirm whether
    # it was meant to run before renaming it.
    def unauth(self):
        """Compare the preview list with the stored ``preview.json``."""
        url = '/api/preview/'
        response_name = 'preview.json'
        self.assert_json_response(url, response_name)
238
239
class OAuth1Tests(ApiTest):
    """OAuth 1.0 flow: request token, user authorization, access token."""

    @classmethod
    def setUpClass(cls):
        """Create the user and the API consumer shared by the tests.

        The parent ``setUpClass`` is called first so that Django's own
        class-level setup still runs, as it does in ``AuthorizedTests``.
        """
        super(OAuth1Tests, cls).setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level objects and run the parent teardown."""
        User.objects.all().delete()
        # The consumer used to be left behind for later test classes.
        cls.consumer.delete()
        super(OAuth1Tests, cls).tearDownClass()

    def _signed_query(self, endpoint, base_query, token_secret=''):
        """Return *base_query* with an HMAC-SHA1 ``oauth_signature`` appended.

        The signature base string is built for a GET request to *endpoint*
        on ``http://testserver``.  The signing key is the quoted consumer
        secret, ``&``, and the quoted *token_secret* (empty when no token
        has been issued yet).
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + endpoint, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        sign = quote(b64encode(digest).rstrip(b'\n'))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        """Walk through the whole flow and check an access token is stored."""
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/?oauth_token=%s'
            '&oauth_callback=test://oauth.callback/' % request_token)
        post_data = response.context['form'].initial

        response = self.client.post(
            '/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
319
320
class AuthorizedTests(ApiTest):
    """Endpoints that need an OAuth 1.0 signed request from a known user."""

    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        """Create the user, consumer and access token used to sign requests."""
        super(AuthorizedTests, cls).setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC signing key: consumer secret and token secret joined by '&'.
        cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level objects and run the parent teardown."""
        cls.user.delete()
        cls.consumer.delete()
        super(AuthorizedTests, cls).tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send an OAuth-signed request with the test client.

        :param url: path of the endpoint, without a query string
        :param method: HTTP method name; case does not matter
        :param params: optional dict sent as the query string
        :param data: optional dict sent form-encoded as the request body
        :return: the test client's response

        Both *params* and *data* take part in the signature, which is
        passed in the ``Authorization`` header.
        """
        auth_params = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        sign_params = {}
        if params:
            sign_params.update(params)
        if data:
            sign_params.update(data)
        sign_params.update(auth_params)
        # Signature base string: METHOD & quoted URL & quoted sorted params.
        # ``safe=''`` matches the other quote() calls; the joined parameter
        # string never contains '/', so the result is unchanged.
        raw = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            quote("&".join(
                quote(str(k), safe='') + "=" + quote(str(v), safe='')
                for (k, v) in sorted(sign_params.items())), safe='')
        ])
        auth_params["oauth_signature"] = quote(b64encode(hmac.new(
            self.key,
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()).rstrip(b'\n'))
        auth = 'OAuth realm="API", ' + ', '.join(
            '{}="{}"'.format(k, v) for (k, v) in auth_params.items())

        if params:
            url = url + '?' + urlencode(params)
        return getattr(self.client, method.lower())(
            url,
            data=urlencode(data) if data else None,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Like :meth:`signed`, but return the response body decoded as JSON."""
        return json.loads(self.signed(url, method, params, data).content)

    def test_books(self):
        """The 'liked' flag follows like/unlike actions on every endpoint."""
        self.assertEqual(
            [b['liked'] for b in self.signed_json('/api/books/')],
            [False, False, False]
        )
        data = self.signed_json('/api/books/child/')
        self.assertFalse(data['parent']['liked'])
        self.assertFalse(data['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": True}
        )
        # There are several endpoints where 'liked' appears.
        self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
        self.assertTrue(self.signed_json(
            '/api/filter-books/', params={"search": "parent"})[0]['liked'])

        self.assertTrue(self.signed_json(
            '/api/books/child/')['parent']['liked'])
        # Liked books go on shelf.
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
            ['parent'])

        # The third positional argument is ``params`` (the query string).
        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])

    def test_reading(self):
        """Marking a book as being read changes its state and the shelf."""
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "not_started"}
        )
        self.signed('/api/reading/parent/reading/', 'post')
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "reading"}
        )
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
            ['parent'])

    def test_subscription(self):
        """Preview books are served only when the user counts as subscribed."""
        self.assert_slugs('/api/preview/', ['grandchild'])
        self.assertEqual(
            self.signed_json('/api/username/'),
            {"username": "test", "premium": False})
        self.assertEqual(
            self.signed('/api/epub/grandchild/').status_code,
            403)

        with patch('api.fields.user_is_subscribed', return_value=True):
            self.assertEqual(
                self.signed_json('/api/username/'),
                {"username": "test", "premium": True})
        with patch('paypal.permissions.user_is_subscribed', return_value=True):
            # Storage.open is patched so no real EPUB file is needed.
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                self.assertEqual(
                    self.signed('/api/epub/grandchild/').content,
                    b"<epub>")

    def test_publish(self):
        """Publishing is refused for a regular user, accepted for a superuser."""
        response = self.signed('/api/books/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        response = self.signed('/api/pictures/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()

        # ``self.user`` is shared by every test of the class, so the
        # superuser flag is reset even when an assertion below fails.
        try:
            with patch('catalogue.models.Book.from_xml_file') as mock:
                response = self.signed('/api/books/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "book_xml": "<utwor/>"
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)

            with patch('picture.models.Picture.from_xml_file') as mock:
                response = self.signed('/api/pictures/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "picture_xml": "<utwor/>",
                                           "picture_image_data": "Kg==",
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)
        finally:
            self.user.is_superuser = False
            self.user.save()