fix race in filters
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 import hashlib
6 import hmac
7 from io import BytesIO
8 import json
9 from os import path
10 from time import time
11 from unittest.mock import patch
12 from urllib.parse import quote, urlencode, parse_qs
13
14 from django.contrib.auth.models import User
15 from django.core.files.uploadedfile import SimpleUploadedFile
16 from django.test import TestCase
17 from django.test.utils import override_settings
18
19 from catalogue.models import Book, Tag
20 from picture.forms import PictureImportForm
21 from picture.models import Picture
22 import picture.tests
23 from api.models import Consumer, Token
24
25
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Common base class for the API tests.

    Runs with ``NO_SEARCH_INDEX`` set and a dummy cache backend, and offers
    helpers that fetch a URL with the test client and compare the result
    with a reference file stored under ``res/responses/``.
    """

    # Never truncate diffs in assertion failure messages.
    maxDiff = None

    @staticmethod
    def _reference_path(name):
        """Return the path of the reference response file called ``name``."""
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET ``url`` and return its body parsed as JSON.

        Fails the test (with the raw body in the message) if the body
        cannot be decoded as JSON.
        """
        content = self.client.get(url).content
        try:
            data = json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)
        return data

    def assert_response(self, url, name):
        """Assert that the body of ``url`` equals the reference file ``name``.

        Both sides are compared as text with trailing whitespace stripped.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        # The response is decoded as UTF-8, so read the reference file the
        # same way instead of relying on the platform's default encoding.
        with open(self._reference_path(name), encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert that the JSON at ``url`` equals the JSON in file ``name``."""
        data = self.load_json(url)
        with open(self._reference_path(name), encoding='utf-8') as f:
            good_data = json.load(f)
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert that the JSON list at ``url`` has exactly ``slugs``, in order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
59
60
class BookTests(ApiTest):
    """Book endpoints checked against two hand-made books, one of them tagged."""

    def setUp(self):
        # An author tag, an untagged book, and a book carrying that tag.
        self.tag = Tag.objects.create(category='author', slug='joe')
        self.book = Book.objects.create(title='A Book', slug='a-book')
        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [self.tag]
        tagged.save()
        self.book_tagged = tagged

    def test_book_list(self):
        """The book list contains both books."""
        count = len(self.load_json('/api/books/'))
        self.assertEqual(count, 2, 'Wrong book list.')

    def test_tagged_books(self):
        """Listing books by author returns only the tagged book."""
        titles = [
            entry['title']
            for entry in self.load_json('/api/authors/joe/books/')
        ]
        expected = [self.book_tagged.title]
        self.assertEqual(titles, expected, 'Wrong tagged book list.')

    def test_detail(self):
        """The detail view reports the book's title."""
        details = self.load_json('/api/books/a-book/')
        self.assertEqual(
            details['title'], self.book.title, 'Wrong book details.')
86
87
class TagTests(ApiTest):
    """Tag endpoints checked against a single author tag attached to one book."""

    def setUp(self):
        # One named author tag, attached to one book.
        author = Tag.objects.create(category='author', slug='joe', name='Joe')
        book = Book.objects.create(title='A Book', slug='a-book')
        self.tag = author
        self.book = book
        book.tags = [author]
        book.save()

    def test_tag_list(self):
        """The author list contains exactly one tag."""
        count = len(self.load_json('/api/authors/'))
        self.assertEqual(count, 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The author detail view reports the tag's name."""
        details = self.load_json('/api/authors/joe/')
        self.assertEqual(
            details['name'], self.tag.name, 'Wrong tag details.')
106
107
class PictureTests(ApiTest):
    """Import of a picture through ``PictureImportForm``."""

    def test_publish(self):
        """Importing the sample XML and PNG creates a Picture with the slug."""
        slug = "kandinsky-composition-viii"
        files_dir = path.join(picture.tests.__path__[0], "files")

        # Wrap the sample files as uploads, the way the form receives them.
        with open(path.join(files_dir, slug + ".xml"), 'rb') as f:
            xml = SimpleUploadedFile('composition8.xml', f.read())
        with open(path.join(files_dir, slug + ".png"), 'rb') as f:
            img = SimpleUploadedFile('kompozycja-8.png', f.read())

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img,
        })

        # Use a unittest assertion rather than a bare ``assert`` (which is
        # stripped under ``python -O``), and save unconditionally afterwards:
        # an invalid form must fail the test, not silently skip the import.
        self.assertTrue(
            import_form.is_valid(),
            'Picture import form is invalid: %r' % (import_form.errors,))
        import_form.save()

        self.assertTrue(
            Picture.objects.filter(slug=slug).exists(),
            'Picture %r was not created by the import.' % slug)
134
135
class BooksTests(ApiTest):
    """Anonymous API responses for the books in the ``test-books.yaml`` fixture."""

    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book lists: full list, special lists, paging and tag filtering."""
        # The full list is the same with and without the new_api flag.
        for url in ('/api/books/', '/api/books/?new_api=true'):
            self.assert_json_response(url, 'books.json')
        self.assert_response('/api/books/?format=xml', 'books.xml')

        # (url, expected slugs) pairs, checked in this order.
        expectations = [
            ('/api/audiobooks/', ['parent']),
            ('/api/daisy/', ['parent']),
            ('/api/newest/', ['parent']),
            ('/api/parent_books/', ['parent']),
            ('/api/recommended/', ['parent']),

            # Book paging.
            ('/api/books/after/grandchild/count/1/', ['parent']),
            ('/api/books/?new_api=true&after=$grandchild$3&count=1',
             ['parent']),

            # By tag.
            ('/api/authors/john-doe/books/', ['parent']),
            ('/api/genres/sonet/books/?authors=john-doe', ['parent']),
            # It is probably a mistake that this doesn't filter:
            ('/api/books/?authors=john-doe',
             ['child', 'grandchild', 'parent']),

            # Parent books by tag.
            # Notice this contains a grandchild, if a child doesn't have
            # the tag. This probably isn't really intended behavior and
            # should be redefined.
            ('/api/genres/sonet/parent_books/', ['grandchild', 'parent']),
        ]
        for url, slugs in expectations:
            self.assert_slugs(url, slugs)

    def test_ebooks(self):
        """The ebook list matches the stored reference."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """The filter-books endpoint honours each of its query filters."""
        self.assert_json_response('/api/filter-books/', 'filter-books.json')
        for url, slugs in (
                ('/api/filter-books/?lektura=false',
                 ['child', 'grandchild', 'parent']),
                ('/api/filter-books/?lektura=true', []),
        ):
            self.assert_slugs(url, slugs)

        # Temporarily mark the grandchild as a preview book.
        grandchild = Book.objects.filter(slug='grandchild')
        grandchild.update(preview=True)
        # Not checked: '/api/filter-books/?preview=true' (would give
        # ['grandchild']) - we don't allow previewed books in filtered list.
        self.assert_slugs(
            '/api/filter-books/?preview=false', ['child', 'parent'])
        grandchild.update(preview=False)

        for url, slugs in (
                ('/api/filter-books/?audiobook=true', ['parent']),
                ('/api/filter-books/?audiobook=false',
                 ['child', 'grandchild']),
                ('/api/filter-books/?genres=wiersz', ['child']),
                ('/api/filter-books/?search=parent', ['parent']),
        ):
            self.assert_slugs(url, slugs)

    def test_collections(self):
        """Collection list and collection detail match the stored references."""
        for url, reference in (
                ('/api/collections/', 'collections.json'),
                ('/api/collections/a-collection/', 'collection.json'),
        ):
            self.assert_json_response(url, reference)

    def test_book(self):
        """Detail views of all three fixture books match the references."""
        for url, reference in (
                ('/api/books/parent/', 'books-parent.json'),
                ('/api/books/child/', 'books-child.json'),
                ('/api/books/grandchild/', 'books-grandchild.json'),
        ):
            self.assert_json_response(url, reference)

    def test_tags(self):
        """The list of tags in a category matches the stored reference."""
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragment list by tag and single-fragment detail match the references."""
        # Listing fragments of a single book ('/api/books/child/fragments/')
        # is not supported, though it probably should be.
        for url, reference in (
                ('/api/genres/wiersz/fragments/', 'fragments.json'),
                ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        ):
            self.assert_json_response(url, reference)
234
235
class BlogTests(ApiTest):
    """Blog endpoint with no data loaded."""

    def test_get(self):
        """The blog endpoint returns an empty JSON list."""
        posts = self.load_json('/api/blog')
        self.assertEqual(posts, [])
239
240
class OAuth1Tests(ApiTest):
    """Walks through the OAuth 1.0 request/authorize/access token flow."""

    @classmethod
    def setUpClass(cls):
        # Django's own class-level setup must run first: it is what applies
        # the class-level ``override_settings`` and the per-class database
        # handling of ``TestCase``.
        super(OAuth1Tests, cls).setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        # Remove exactly what setUpClass created, then let Django undo its
        # own class-level setup.
        cls.user.delete()
        cls.consumer.delete()
        super(OAuth1Tests, cls).tearDownClass()

    def _signed_query(self, url_path, base_query, token_secret=''):
        """Return ``base_query`` with its HMAC-SHA1 ``oauth_signature`` appended.

        The signature base string is built for a GET request to
        ``http://testserver`` + ``url_path``. The signing key is the quoted
        consumer secret, an ``&``, and the quoted ``token_secret`` (empty
        when no token has been issued yet).
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + url_path, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        sign = quote(b64encode(digest))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        """A consumer can obtain an access token bound to the logged-in user."""
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % (
                request_token,
            )
        )
        # Submit the authorization form back with its initial values.
        post_data = response.context['form'].initial

        response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token; this time the request token secret is part of
        # the signing key.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
324
325
class AuthorizedTests(ApiTest):
    """API calls made as an authenticated user through OAuth1-signed requests."""

    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        super(AuthorizedTests, cls).setUpClass()
        # A user, a consumer and a ready-made access token linking the two.
        shared_secret = '12345678901234567890123456789012'
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret=shared_secret,
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC signing key: consumer secret and token secret joined by '&'.
        signing_key = cls.consumer.secret + '&' + cls.token.secret
        cls.key = signing_key.encode('latin1')

    @classmethod
    def tearDownClass(cls):
        cls.user.delete()
        cls.consumer.delete()
        super(AuthorizedTests, cls).tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send an OAuth1-signed request with the test client.

        ``params`` go into the query string and ``data`` is sent as a
        form-encoded body; both take part in the HMAC-SHA1 signature, which
        is passed in the ``Authorization`` header. Returns the response.
        """
        oauth = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        # Everything that gets signed: query params, body data, OAuth fields.
        to_sign = {}
        for part in (params, data):
            if part:
                to_sign.update(part)
        to_sign.update(oauth)

        encoded_pairs = [
            quote(str(name), safe='') + "=" + quote(str(value), safe='')
            for name, value in sorted(to_sign.items())
        ]
        base_string = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            quote("&".join(encoded_pairs)),
        ])
        digest = hmac.new(
            self.key, base_string.encode('latin1'), hashlib.sha1).digest()
        oauth["oauth_signature"] = quote(b64encode(digest).rstrip(b'\n'))

        header_fields = ', '.join(
            '{}="{}"'.format(name, value) for name, value in oauth.items())
        auth = 'OAuth realm="API", ' + header_fields

        if params:
            url = url + '?' + urlencode(params)
        body = urlencode(data) if data else None
        send = getattr(self.client, method.lower())
        return send(
            url,
            data=body,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Send a signed request and return the response body parsed as JSON."""
        response = self.signed(url, method, params, data)
        return json.loads(response.content)

    def test_books(self):
        """Liking and unliking a book is reflected wherever 'liked' appears."""
        liked_flags = [b['liked'] for b in self.signed_json('/api/books/')]
        self.assertEqual(liked_flags, [False, False, False])

        child = self.signed_json('/api/books/child/')
        self.assertFalse(child['parent']['liked'])
        self.assertFalse(child['children'][0]['liked'])

        status = self.signed_json('/api/like/parent/')
        self.assertEqual(status, {"likes": False})

        self.signed('/api/like/parent/', 'POST')
        status = self.signed_json('/api/like/parent/')
        self.assertEqual(status, {"likes": True})

        # There are several endpoints where 'liked' appears.
        parent_books = self.signed_json('/api/parent_books/')
        self.assertTrue(parent_books[0]['liked'])
        found = self.signed_json(
            '/api/filter-books/', params={"search": "parent"})
        self.assertTrue(found[0]['liked'])

        child = self.signed_json('/api/books/child/')
        self.assertTrue(child['parent']['liked'])

        # Liked books go on shelf.
        shelf = [x['slug'] for x in self.signed_json('/api/shelf/likes/')]
        self.assertEqual(shelf, ['parent'])

        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        status = self.signed_json('/api/like/parent/')
        self.assertEqual(status, {"likes": False})
        parent_books = self.signed_json('/api/parent_books/')
        self.assertFalse(parent_books[0]['liked'])

    def test_reading(self):
        """Marking a book as being read changes its state and shelves it."""
        state = self.signed_json('/api/reading/parent/')
        self.assertEqual(state, {"state": "not_started"})

        self.signed('/api/reading/parent/reading/', 'post')
        state = self.signed_json('/api/reading/parent/')
        self.assertEqual(state, {"state": "reading"})

        shelf = [x['slug'] for x in self.signed_json('/api/shelf/reading/')]
        self.assertEqual(shelf, ['parent'])

    def test_subscription(self):
        """A preview book's EPUB is refused until membership is active."""
        grandchild = Book.objects.filter(slug='grandchild')
        grandchild.update(preview=True)

        self.assert_slugs('/api/preview/', ['grandchild'])
        profile = self.signed_json('/api/username/')
        self.assertEqual(profile, {"username": "test", "premium": False})
        refused = self.signed('/api/epub/grandchild/')
        self.assertEqual(refused.status_code, 403)

        # Pretend the membership is active for the duration of this block.
        with patch('club.models.Membership.is_active_for', return_value=True):
            profile = self.signed_json('/api/username/')
            self.assertEqual(profile, {"username": "test", "premium": True})
            # Serve a fake EPUB payload from storage.
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                served = self.signed('/api/epub/grandchild/')
                self.assertEqual(served.content, b"<epub>")

        grandchild.update(preview=False)

    def test_publish(self):
        """Publishing is refused for a regular user and accepted for a superuser."""
        empty_payload = {"data": json.dumps({})}
        for endpoint in ('/api/books/', '/api/pictures/'):
            response = self.signed(
                endpoint, method='POST', data=empty_payload)
            self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()

        # The importers are mocked out; only the API layer is exercised.
        book_payload = {"data": json.dumps({
            "book_xml": "<utwor/>"
        })}
        with patch('catalogue.models.Book.from_xml_file') as mock:
            response = self.signed(
                '/api/books/', method='POST', data=book_payload)
            self.assertTrue(mock.called)
        self.assertEqual(response.status_code, 201)

        picture_payload = {"data": json.dumps({
            "picture_xml": "<utwor/>",
            "picture_image_data": "Kg==",
        })}
        with patch('picture.models.Picture.from_xml_file') as mock:
            response = self.signed(
                '/api/pictures/', method='POST', data=picture_payload)
            self.assertTrue(mock.called)
        self.assertEqual(response.status_code, 201)

        self.user.is_superuser = False
        self.user.save()