Update Celery to 4.3
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 from os import path
6 import hashlib
7 import hmac
8 import json
9 from io import BytesIO
10 from time import time
11 from urllib.parse import quote, urlencode, parse_qs
12
13 from django.contrib.auth.models import User
14 from django.core.files.uploadedfile import SimpleUploadedFile
15 from django.test import TestCase
16 from django.test.utils import override_settings
17 from unittest.mock import patch
18 from api.models import Consumer, Token
19
20 from catalogue.models import Book, Tag
21 from picture.forms import PictureImportForm
22 from picture.models import Picture
23 import picture.tests
24
25
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Base class for API tests.

    Provides helpers that fetch a URL with the test client and compare the
    result with an expected value, either given inline or stored in a file
    under ``res/responses`` next to this module.

    The class runs with ``NO_SEARCH_INDEX`` enabled and the dummy cache
    backend.
    """

    @staticmethod
    def _response_path(name):
        """Return the path of the stored expected response called `name`."""
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET `url` and return its body parsed as JSON.

        Fails the test (instead of raising) when the body is not valid JSON.
        """
        content = self.client.get(url).content
        try:
            data = json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)
        return data

    def assert_response(self, url, name):
        """Assert that the text body of `url` equals the stored file `name`.

        Trailing whitespace is ignored on both sides.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        # The response is decoded as UTF-8, so read the expected file the
        # same way rather than relying on the platform's default encoding.
        with open(self._response_path(name), encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert that the JSON body of `url` equals the stored JSON `name`."""
        data = self.load_json(url)
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(self._response_path(name), encoding='utf-8') as f:
            good_data = json.load(f)
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert that the JSON list at `url` has exactly `slugs`, in order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
57
58
class BookTests(ApiTest):
    """Basic checks of the book list, tagged book list and book detail."""

    def setUp(self):
        # One author tag, one untagged book and one book carrying the tag.
        self.tag = Tag.objects.create(category='author', slug='joe')
        self.book = Book.objects.create(title='A Book', slug='a-book')
        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [self.tag]
        tagged.save()
        self.book_tagged = tagged

    def test_book_list(self):
        """The book list contains both books created in setUp."""
        listed = self.load_json('/api/books/')
        self.assertEqual(len(listed), 2, 'Wrong book list.')

    def test_tagged_books(self):
        """The author's book list contains only the tagged book."""
        listed = self.load_json('/api/authors/joe/books/')
        titles = [entry['title'] for entry in listed]
        self.assertEqual(
            titles, [self.book_tagged.title], 'Wrong tagged book list.')

    def test_detail(self):
        """The book detail reports the book's title."""
        detail = self.load_json('/api/books/a-book/')
        self.assertEqual(
            detail['title'], self.book.title, 'Wrong book details.')
84
85
class TagTests(ApiTest):
    """Basic checks of the author tag list and tag detail."""

    def setUp(self):
        # A single named author tag attached to a single book.
        self.tag = Tag.objects.create(
            category='author', slug='joe', name='Joe')
        book = Book.objects.create(title='A Book', slug='a-book')
        book.tags = [self.tag]
        book.save()
        self.book = book

    def test_tag_list(self):
        """The author list contains exactly one tag."""
        listed = self.load_json('/api/authors/')
        self.assertEqual(len(listed), 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The author detail reports the tag's name."""
        detail = self.load_json('/api/authors/joe/')
        self.assertEqual(
            detail['name'], self.tag.name, 'Wrong tag details.')
104
105
class PictureTests(ApiTest):
    """Checks that a picture can be imported through PictureImportForm."""

    @staticmethod
    def _uploaded(upload_name, source_path):
        """Return a SimpleUploadedFile named `upload_name` with the bytes of `source_path`."""
        # Read inside a context manager so the file handle is always closed.
        with open(source_path, 'rb') as source:
            return SimpleUploadedFile(upload_name, source.read())

    def test_publish(self):
        """Import the sample picture and check that it lands in the database."""
        slug = "kandinsky-composition-viii"
        # Sample files are shipped with the picture app's test package.
        files_dir = path.join(picture.tests.__path__[0], "files")
        xml = self._uploaded(
            'composition8.xml', path.join(files_dir, slug + ".xml"))
        img = self._uploaded(
            'kompozycja-8.png', path.join(files_dir, slug + ".png"))

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img
            })

        # Use a unittest assertion: a bare `assert` is removed under -O and
        # gives no hint about why validation failed.
        self.assertTrue(import_form.is_valid(), import_form.errors)
        import_form.save()

        # Raises if the picture was not created by the import.
        Picture.objects.get(slug=slug)
130
131
class BooksTests(ApiTest):
    """Checks book-related endpoints against the ``test-books.yaml`` fixture."""

    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book lists: full list, special lists, paging and tag filtering."""
        for url, stored in (
                ('/api/books/', 'books.json'),
                ('/api/books/?new_api=true', 'books.json'),
        ):
            self.assert_json_response(url, stored)
        self.assert_response('/api/books/?format=xml', 'books.xml')

        # Each of these lists is expected to contain only the parent book.
        for url in (
                '/api/audiobooks/',
                '/api/daisy/',
                '/api/newest/',
                '/api/parent_books/',
                '/api/recommended/',
        ):
            self.assert_slugs(url, ['parent'])

        # Paging, in both the URL-based and the query-based flavour.
        for url in (
                '/api/books/after/grandchild/count/1/',
                '/api/books/?new_api=true&after=$grandchild$3&count=1',
        ):
            self.assert_slugs(url, ['parent'])

        # Filtering by tag.
        for url in (
                '/api/authors/john-doe/books/',
                '/api/genres/sonet/books/?authors=john-doe',
        ):
            self.assert_slugs(url, ['parent'])
        # The plain book list ignores the filter; probably a mistake.
        everything = ['child', 'grandchild', 'parent']
        self.assert_slugs('/api/books/?authors=john-doe', everything)

        # Parent books by tag. The result includes a grandchild when the
        # child lacks the tag; this probably isn't intended behaviour and
        # should be redefined.
        self.assert_slugs(
            '/api/genres/sonet/parent_books/', ['grandchild', 'parent'])

    def test_ebooks(self):
        """The ebook list matches the stored response."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """The filter-books endpoint honours each supported filter."""
        self.assert_json_response('/api/filter-books/', 'filter-books.json')
        for url, expected in (
                ('/api/filter-books/?lektura=false',
                 ['child', 'grandchild', 'parent']),
                ('/api/filter-books/?lektura=true', []),
        ):
            self.assert_slugs(url, expected)

        grandchild = Book.objects.filter(slug='grandchild')
        grandchild.update(preview=True)
        # '/api/filter-books/?preview=true' is deliberately not checked:
        # previewed books are not allowed in the filtered list.
        self.assert_slugs(
            '/api/filter-books/?preview=false', ['child', 'parent'])
        grandchild.update(preview=False)

        for url, expected in (
                ('/api/filter-books/?audiobook=true', ['parent']),
                ('/api/filter-books/?audiobook=false',
                 ['child', 'grandchild']),
                ('/api/filter-books/?genres=wiersz', ['child']),
                ('/api/filter-books/?search=parent', ['parent']),
        ):
            self.assert_slugs(url, expected)

    def test_collections(self):
        """Collection list and collection detail match the stored responses."""
        for url, stored in (
                ('/api/collections/', 'collections.json'),
                ('/api/collections/a-collection/', 'collection.json'),
        ):
            self.assert_json_response(url, stored)

    def test_book(self):
        """Detail views of all three fixture books match the stored responses."""
        for url, stored in (
                ('/api/books/parent/', 'books-parent.json'),
                ('/api/books/child/', 'books-child.json'),
                ('/api/books/grandchild/', 'books-grandchild.json'),
        ):
            self.assert_json_response(url, stored)

    def test_tags(self):
        """The list of tags in a category matches the stored response."""
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragments by tag and a single fragment match the stored responses."""
        # '/api/books/child/fragments/' is not checked: listing fragments of
        # a book is not supported, though it probably should be.
        for url, stored in (
                ('/api/genres/wiersz/fragments/', 'fragments.json'),
                ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        ):
            self.assert_json_response(url, stored)
230
231
class BlogTests(ApiTest):
    """Checks the blog endpoint."""

    def test_get(self):
        """With no blog entries set up, the endpoint returns an empty list."""
        entries = self.load_json('/api/blog')
        self.assertEqual(entries, [])
235
236
class OAuth1Tests(ApiTest):
    """Walks through the OAuth 1.0 flow: request token, authorization, access token."""

    @classmethod
    def setUpClass(cls):
        # Let the base TestCase do its class-level setup first (settings
        # overrides declared on ApiTest, class-wide transaction), the same
        # way AuthorizedTests does.
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        User.objects.all().delete()
        cls.consumer.delete()
        super().tearDownClass()

    def _signed_query(self, endpoint, base_query, token_secret=''):
        """Return `base_query` with an HMAC-SHA1 ``oauth_signature`` appended.

        The signature covers a GET request to ``http://testserver`` +
        `endpoint`. The signing key is the quoted consumer secret and the
        quoted `token_secret` joined with ``&``; an empty `token_secret`
        leaves the key ending with ``&``.
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + endpoint, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        sign = quote(b64encode(digest))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        """A consumer can obtain an access token bound to the logged-in user."""
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/'
            '?oauth_token=%s&oauth_callback=test://oauth.callback/'
            % request_token)
        # Submit the authorization form back with its initial values.
        post_data = response.context['form'].initial

        response = self.client.post(
            '/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
316
317
class AuthorizedTests(ApiTest):
    """Tests of endpoints called with OAuth 1.0 signed requests.

    A user, a consumer and an access token are created once for the class;
    `signed` uses them to build HMAC-SHA1 signed requests.
    """

    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # Signing key: consumer secret and token secret joined with '&'.
        signing_key = '&'.join([cls.consumer.secret, cls.token.secret])
        cls.key = signing_key.encode('latin1')

    @classmethod
    def tearDownClass(cls):
        cls.user.delete()
        cls.consumer.delete()
        super().tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send a signed request with the test client and return the response.

        `params` go into the query string and `data` into the
        form-urlencoded body; both are covered by the signature. The OAuth
        values are sent in the ``Authorization`` header.
        """
        oauth = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        # Everything that gets signed: query params, body data, OAuth values
        # (later sources override earlier ones).
        to_sign = {}
        for extra in (params, data):
            if extra:
                to_sign.update(extra)
        to_sign.update(oauth)

        encoded_pairs = [
            quote(str(name), safe='') + "=" + quote(str(value), safe='')
            for name, value in sorted(to_sign.items())
        ]
        base_string = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            quote("&".join(encoded_pairs)),
        ])
        digest = hmac.new(
            self.key, base_string.encode('latin1'), hashlib.sha1).digest()
        oauth["oauth_signature"] = quote(b64encode(digest).rstrip(b'\n'))

        header_fields = [
            '{}="{}"'.format(name, value) for name, value in oauth.items()
        ]
        auth = 'OAuth realm="API", ' + ', '.join(header_fields)

        target = (url + '?' + urlencode(params)) if params else url
        body = urlencode(data) if data else None
        send = getattr(self.client, method.lower())
        return send(
            target,
            data=body,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Like `signed`, but return the response body parsed as JSON."""
        response = self.signed(url, method, params, data)
        return json.loads(response.content)

    def test_books(self):
        """Liking and unliking a book shows up wherever 'liked' is reported."""
        liked_flags = [b['liked'] for b in self.signed_json('/api/books/')]
        self.assertEqual(liked_flags, [False, False, False])
        child = self.signed_json('/api/books/child/')
        self.assertFalse(child['parent']['liked'])
        self.assertFalse(child['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'), {"likes": False})
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'), {"likes": True})

        # 'liked' appears in several endpoints.
        parent_books = self.signed_json('/api/parent_books/')
        self.assertTrue(parent_books[0]['liked'])
        found = self.signed_json(
            '/api/filter-books/', params={"search": "parent"})
        self.assertTrue(found[0]['liked'])
        child = self.signed_json('/api/books/child/')
        self.assertTrue(child['parent']['liked'])

        # Liked books go on the shelf.
        shelf = self.signed_json('/api/shelf/likes/')
        self.assertEqual([x['slug'] for x in shelf], ['parent'])

        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'), {"likes": False})
        parent_books = self.signed_json('/api/parent_books/')
        self.assertFalse(parent_books[0]['liked'])

    def test_reading(self):
        """Marking a book as being read changes its state and shelves it."""
        state = self.signed_json('/api/reading/parent/')
        self.assertEqual(state, {"state": "not_started"})

        self.signed('/api/reading/parent/reading/', 'post')

        state = self.signed_json('/api/reading/parent/')
        self.assertEqual(state, {"state": "reading"})
        shelf = self.signed_json('/api/shelf/reading/')
        self.assertEqual([x['slug'] for x in shelf], ['parent'])

    def test_subscription(self):
        """A previewed book's epub is served only with an active membership."""
        grandchild = Book.objects.filter(slug='grandchild')
        grandchild.update(preview=True)

        self.assert_slugs('/api/preview/', ['grandchild'])
        account = self.signed_json('/api/username/')
        self.assertEqual(account, {"username": "test", "premium": False})
        denied = self.signed('/api/epub/grandchild/')
        self.assertEqual(denied.status_code, 403)

        with patch('club.models.Membership.is_active_for', return_value=True):
            account = self.signed_json('/api/username/')
            self.assertEqual(account, {"username": "test", "premium": True})
            # Storage is patched so no real epub file is needed.
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                served = self.signed('/api/epub/grandchild/')
                self.assertEqual(served.content, b"<epub>")

        grandchild.update(preview=False)

    def test_publish(self):
        """Publishing books and pictures requires a superuser."""
        empty_payload = {"data": json.dumps({})}
        response = self.signed('/api/books/', method='POST',
                               data=empty_payload)
        self.assertEqual(response.status_code, 403)

        response = self.signed('/api/pictures/', method='POST',
                               data=empty_payload)
        self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()

        # The actual import is mocked; only the API layer is exercised.
        book_payload = {"data": json.dumps({
            "book_xml": "<utwor/>"
        })}
        with patch('catalogue.models.Book.from_xml_file') as mock:
            response = self.signed('/api/books/', method='POST',
                                   data=book_payload)
            self.assertTrue(mock.called)
        self.assertEqual(response.status_code, 201)

        picture_payload = {"data": json.dumps({
            "picture_xml": "<utwor/>",
            "picture_image_data": "Kg==",
        })}
        with patch('picture.models.Picture.from_xml_file') as mock:
            response = self.signed('/api/pictures/', method='POST',
                                   data=picture_payload)
            self.assertTrue(mock.called)
        self.assertEqual(response.status_code, 201)

        self.user.is_superuser = False
        self.user.save()