Club renewal procedure fixes.
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 import hashlib
6 import hmac
7 from io import BytesIO
8 import json
9 from os import path
10 from time import time
11 from unittest.mock import patch
12 from urllib.parse import quote, urlencode, parse_qs
13
14 from django.contrib.auth.models import User
15 from django.core.files.uploadedfile import SimpleUploadedFile
16 from django.test import TestCase
17 from django.test.utils import override_settings
18
19 from catalogue.models import Book, Tag
20 from picture.forms import PictureImportForm
21 from picture.models import Picture
22 import picture.tests
23 from api.models import Consumer, Token
24
25
26 @override_settings(
27     NO_SEARCH_INDEX=True,
28     CACHES={'default': {
29         'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
30 )
31 class ApiTest(TestCase):
32     maxDiff = None
33
34     def load_json(self, url):
35         content = self.client.get(url).content
36         try:
37             data = json.loads(content)
38         except ValueError:
39             self.fail('No JSON could be decoded: %s' % content)
40         return data
41
42     def assert_response(self, url, name):
43         content = self.client.get(url).content.decode('utf-8').rstrip()
44         filename = path.join(path.dirname(__file__), 'res', 'responses', name)
45         with open(filename) as f:
46             good_content = f.read().rstrip()
47         self.assertEqual(content, good_content, content)
48
49     def assert_json_response(self, url, name):
50         data = self.load_json(url)
51         filename = path.join(path.dirname(__file__), 'res', 'responses', name)
52         with open(filename) as f:
53             good_data = json.load(f)
54         self.assertEqual(data, good_data, json.dumps(data, indent=4))
55
56     def assert_slugs(self, url, slugs):
57         have_slugs = [x['slug'] for x in self.load_json(url)]
58         self.assertEqual(have_slugs, slugs, have_slugs)
59
60
61 class BookTests(ApiTest):
62
63     def setUp(self):
64         self.tag = Tag.objects.create(category='author', slug='joe')
65         self.book = Book.objects.create(title='A Book', slug='a-book')
66         self.book_tagged = Book.objects.create(
67             title='Tagged Book', slug='tagged-book')
68         self.book_tagged.tags = [self.tag]
69         self.book_tagged.save()
70
71     def test_book_list(self):
72         books = self.load_json('/api/books/')
73         self.assertEqual(len(books), 2,
74                          'Wrong book list.')
75
76     def test_tagged_books(self):
77         books = self.load_json('/api/authors/joe/books/')
78
79         self.assertEqual([b['title'] for b in books], [self.book_tagged.title],
80                          'Wrong tagged book list.')
81
82     def test_detail(self):
83         book = self.load_json('/api/books/a-book/')
84         self.assertEqual(book['title'], self.book.title,
85                          'Wrong book details.')
86
87
88 class TagTests(ApiTest):
89
90     def setUp(self):
91         self.tag = Tag.objects.create(
92             category='author', slug='joe', name='Joe')
93         self.book = Book.objects.create(title='A Book', slug='a-book')
94         self.book.tags = [self.tag]
95         self.book.save()
96
97     def test_tag_list(self):
98         tags = self.load_json('/api/authors/')
99         self.assertEqual(len(tags), 1,
100                          'Wrong tag list.')
101
102     def test_tag_detail(self):
103         tag = self.load_json('/api/authors/joe/')
104         self.assertEqual(tag['name'], self.tag.name,
105                          'Wrong tag details.')
106
107
108 class PictureTests(ApiTest):
109     def test_publish(self):
110         slug = "kandinsky-composition-viii"
111         xml = SimpleUploadedFile(
112             'composition8.xml',
113             open(path.join(
114                 picture.tests.__path__[0], "files", slug + ".xml"
115             ), 'rb').read())
116         img = SimpleUploadedFile(
117             'kompozycja-8.png',
118             open(path.join(
119                 picture.tests.__path__[0], "files", slug + ".png"
120             ), 'rb').read())
121
122         import_form = PictureImportForm({}, {
123             'picture_xml_file': xml,
124             'picture_image_file': img
125             })
126
127         assert import_form.is_valid()
128         if import_form.is_valid():
129             import_form.save()
130
131         Picture.objects.get(slug=slug)
132
133
134 class BooksTests(ApiTest):
135     fixtures = ['test-books.yaml']
136
137     def test_books(self):
138         self.assert_json_response('/api/books/', 'books.json')
139         self.assert_json_response('/api/books/?new_api=true', 'books.json')
140         self.assert_response('/api/books/?format=xml', 'books.xml')
141
142         self.assert_slugs('/api/audiobooks/', ['parent'])
143         self.assert_slugs('/api/daisy/', ['parent'])
144         self.assert_slugs('/api/newest/', ['parent'])
145         self.assert_slugs('/api/parent_books/', ['parent'])
146         self.assert_slugs('/api/recommended/', ['parent'])
147
148         # Book paging.
149         self.assert_slugs('/api/books/after/grandchild/count/1/', ['parent'])
150         self.assert_slugs(
151             '/api/books/?new_api=true&after=$grandchild$3&count=1', ['parent'])
152
153         # By tag.
154         self.assert_slugs('/api/authors/john-doe/books/', ['parent'])
155         self.assert_slugs(
156             '/api/genres/sonet/books/?authors=john-doe',
157             ['parent'])
158         # It is probably a mistake that this doesn't filter:
159         self.assert_slugs(
160             '/api/books/?authors=john-doe',
161             ['child', 'grandchild', 'parent'])
162
163         # Parent books by tag.
164         # Notice this contains a grandchild, if a child doesn't have the tag.
165         # This probably isn't really intended behavior and should be redefined.
166         self.assert_slugs(
167             '/api/genres/sonet/parent_books/',
168             ['grandchild', 'parent'])
169
170     def test_ebooks(self):
171         self.assert_json_response('/api/ebooks/', 'ebooks.json')
172
173     def test_filter_books(self):
174         self.assert_json_response('/api/filter-books/', 'filter-books.json')
175         self.assert_slugs(
176             '/api/filter-books/?lektura=false',
177             ['child', 'grandchild', 'parent'])
178         self.assert_slugs(
179             '/api/filter-books/?lektura=true',
180             [])
181
182         Book.objects.filter(slug='grandchild').update(preview=True)
183         # Skipping: we don't allow previewed books in filtered list.
184         #self.assert_slugs(
185         #    '/api/filter-books/?preview=true',
186         #    ['grandchild'])
187         self.assert_slugs(
188             '/api/filter-books/?preview=false',
189             ['child', 'parent'])
190         Book.objects.filter(slug='grandchild').update(preview=False)
191
192         self.assert_slugs(
193             '/api/filter-books/?audiobook=true',
194             ['parent'])
195         self.assert_slugs(
196             '/api/filter-books/?audiobook=false',
197             ['child', 'grandchild'])
198
199         self.assert_slugs('/api/filter-books/?genres=wiersz', ['child'])
200
201         self.assert_slugs('/api/filter-books/?search=parent', ['parent'])
202
203     def test_collections(self):
204         self.assert_json_response('/api/collections/', 'collections.json')
205         self.assert_json_response(
206             '/api/collections/a-collection/',
207             'collection.json')
208
209     def test_book(self):
210         self.assert_json_response('/api/books/parent/', 'books-parent.json')
211         self.assert_json_response('/api/books/child/', 'books-child.json')
212         self.assert_json_response(
213             '/api/books/grandchild/',
214             'books-grandchild.json')
215
216     def test_tags(self):
217         # List of tags by category.
218         self.assert_json_response('/api/genres/', 'tags.json')
219
220     def test_fragments(self):
221         # This is not supported, though it probably should be.
222         # self.assert_json_response(
223         #     '/api/books/child/fragments/',
224         #     'fragments.json')
225
226         self.assert_json_response(
227             '/api/genres/wiersz/fragments/',
228             'fragments.json')
229         self.assert_json_response(
230             '/api/books/child/fragments/an-anchor/',
231             'fragment.json')
232
233
234 class BlogTests(ApiTest):
235     def test_get(self):
236         self.assertEqual(self.load_json('/api/blog'), [])
237
238
239 class OAuth1Tests(ApiTest):
240     @classmethod
241     def setUpClass(cls):
242         cls.user = User.objects.create(username='test')
243         cls.user.set_password('test')
244         cls.user.save()
245         cls.consumer_secret = 'len(quote(consumer secret))>=32'
246         Consumer.objects.create(
247             key='client',
248             secret=cls.consumer_secret
249         )
250
251     @classmethod
252     def tearDownClass(cls):
253         User.objects.all().delete()
254
255     def test_create_token(self):
256         # Fetch request token.
257         base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
258                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
259                       "oauth_version=1.0".format(int(time())))
260         raw = '&'.join([
261             'GET',
262             quote('http://testserver/api/oauth/request_token/', safe=''),
263             quote(base_query, safe='')
264         ])
265         h = hmac.new(
266             (quote(self.consumer_secret) + '&').encode('latin1'),
267             raw.encode('latin1'),
268             hashlib.sha1
269         ).digest()
270         h = b64encode(h).rstrip(b'\n')
271         sign = quote(h)
272         query = "{}&oauth_signature={}".format(base_query, sign)
273         response = self.client.get('/api/oauth/request_token/?' + query)
274         request_token_data = parse_qs(response.content.decode('latin1'))
275         request_token = request_token_data['oauth_token'][0]
276         request_token_secret = request_token_data['oauth_token_secret'][0]
277
278         # Request token authorization.
279         self.client.login(username='test', password='test')
280         response = self.client.get(
281             '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % (
282                 request_token,
283             )
284         )
285         post_data = response.context['form'].initial
286
287         response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
288         self.assertEqual(
289             response['Location'],
290             'test://oauth.callback/?oauth_token=' + request_token
291         )
292
293         # Fetch access token.
294         base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
295                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
296                       "oauth_token={}&oauth_version=1.0".format(
297                           int(time()), request_token))
298         raw = '&'.join([
299             'GET',
300             quote('http://testserver/api/oauth/access_token/', safe=''),
301             quote(base_query, safe='')
302         ])
303         h = hmac.new(
304             (quote(self.consumer_secret) + '&' +
305              quote(request_token_secret, safe='')).encode('latin1'),
306             raw.encode('latin1'),
307             hashlib.sha1
308         ).digest()
309         h = b64encode(h).rstrip(b'\n')
310         sign = quote(h)
311         query = "{}&oauth_signature={}".format(base_query, sign)
312         response = self.client.get('/api/oauth/access_token/?' + query)
313         access_token_data = parse_qs(response.content.decode('latin1'))
314         access_token = access_token_data['oauth_token'][0]
315
316         self.assertTrue(
317             Token.objects.filter(
318                 key=access_token,
319                 token_type=Token.ACCESS,
320                 user=self.user
321             ).exists())
322
323
324 class AuthorizedTests(ApiTest):
325     fixtures = ['test-books.yaml']
326
327     @classmethod
328     def setUpClass(cls):
329         super(AuthorizedTests, cls).setUpClass()
330         cls.user = User.objects.create(username='test')
331         cls.consumer = Consumer.objects.create(
332             key='client', secret='12345678901234567890123456789012')
333         cls.token = Token.objects.create(
334             key='123456789012345678',
335             secret='12345678901234567890123456789012',
336             user=cls.user,
337             consumer=cls.consumer,
338             token_type=Token.ACCESS,
339             timestamp=time())
340         cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')
341
342     @classmethod
343     def tearDownClass(cls):
344         cls.user.delete()
345         cls.consumer.delete()
346         super(AuthorizedTests, cls).tearDownClass()
347
348     def signed(self, url, method='GET', params=None, data=None):
349         auth_params = {
350             "oauth_consumer_key": self.consumer.key,
351             "oauth_nonce": ("%f" % time()).replace('.', ''),
352             "oauth_signature_method": "HMAC-SHA1",
353             "oauth_timestamp": int(time()),
354             "oauth_token": self.token.key,
355             "oauth_version": "1.0",
356         }
357
358         sign_params = {}
359         if params:
360             sign_params.update(params)
361         if data:
362             sign_params.update(data)
363         sign_params.update(auth_params)
364         raw = "&".join([
365             method.upper(),
366             quote('http://testserver' + url, safe=''),
367             quote("&".join(
368                 quote(str(k), safe='') + "=" + quote(str(v), safe='')
369                 for (k, v) in sorted(sign_params.items())))
370         ])
371         auth_params["oauth_signature"] = quote(b64encode(hmac.new(
372             self.key,
373             raw.encode('latin1'),
374             hashlib.sha1
375         ).digest()).rstrip(b'\n'))
376         auth = 'OAuth realm="API", ' + ', '.join(
377             '{}="{}"'.format(k, v) for (k, v) in auth_params.items())
378
379         if params:
380             url = url + '?' + urlencode(params)
381         return getattr(self.client, method.lower())(
382             url,
383             data=urlencode(data) if data else None,
384             content_type='application/x-www-form-urlencoded',
385             HTTP_AUTHORIZATION=auth,
386         )
387
388     def signed_json(self, url, method='GET', params=None, data=None):
389         return json.loads(self.signed(url, method, params, data).content)
390
391     def test_books(self):
392         self.assertEqual(
393             [b['liked'] for b in self.signed_json('/api/books/')],
394             [False, False, False]
395         )
396         data = self.signed_json('/api/books/child/')
397         self.assertFalse(data['parent']['liked'])
398         self.assertFalse(data['children'][0]['liked'])
399
400         self.assertEqual(
401             self.signed_json('/api/like/parent/'),
402             {"likes": False}
403         )
404         self.signed('/api/like/parent/', 'POST')
405         self.assertEqual(
406             self.signed_json('/api/like/parent/'),
407             {"likes": True}
408         )
409         # There are several endpoints where 'liked' appears.
410         self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
411         self.assertTrue(self.signed_json(
412             '/api/filter-books/', params={"search": "parent"})[0]['liked'])
413
414         self.assertTrue(self.signed_json(
415             '/api/books/child/')['parent']['liked'])
416         # Liked books go on shelf.
417         self.assertEqual(
418             [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
419             ['parent'])
420
421         self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
422         self.assertEqual(
423             self.signed_json('/api/like/parent/'),
424             {"likes": False}
425         )
426         self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])
427
428     def test_reading(self):
429         self.assertEqual(
430             self.signed_json('/api/reading/parent/'),
431             {"state": "not_started"}
432         )
433         self.signed('/api/reading/parent/reading/', 'post')
434         self.assertEqual(
435             self.signed_json('/api/reading/parent/'),
436             {"state": "reading"}
437         )
438         self.assertEqual(
439             [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
440             ['parent'])
441
442     def test_subscription(self):
443         Book.objects.filter(slug='grandchild').update(preview=True)
444
445         self.assert_slugs('/api/preview/', ['grandchild'])
446         self.assertEqual(
447             self.signed_json('/api/username/'),
448             {"username": "test", "premium": False})
449         self.assertEqual(
450             self.signed('/api/epub/grandchild/').status_code,
451             403)
452
453         with patch('club.models.Membership.is_active_for', return_value=True):
454             self.assertEqual(
455                 self.signed_json('/api/username/'),
456                 {"username": "test", "premium": True})
457             with patch('django.core.files.storage.Storage.open',
458                        return_value=BytesIO(b"<epub>")):
459                 self.assertEqual(
460                     self.signed('/api/epub/grandchild/').content,
461                     b"<epub>")
462
463         Book.objects.filter(slug='grandchild').update(preview=False)
464
465     def test_publish(self):
466         response = self.signed('/api/books/',
467                                method='POST',
468                                data={"data": json.dumps({})})
469         self.assertEqual(response.status_code, 403)
470
471         response = self.signed('/api/pictures/',
472                                method='POST',
473                                data={"data": json.dumps({})})
474         self.assertEqual(response.status_code, 403)
475
476         self.user.is_superuser = True
477         self.user.save()
478
479         with patch('catalogue.models.Book.from_xml_file') as mock:
480             response = self.signed('/api/books/',
481                                    method='POST',
482                                    data={"data": json.dumps({
483                                        "book_xml": "<utwor/>"
484                                    })})
485             self.assertTrue(mock.called)
486         self.assertEqual(response.status_code, 201)
487
488         with patch('picture.models.Picture.from_xml_file') as mock:
489             response = self.signed('/api/pictures/',
490                                    method='POST',
491                                    data={"data": json.dumps({
492                                        "picture_xml": "<utwor/>",
493                                        "picture_image_data": "Kg==",
494                                    })})
495             self.assertTrue(mock.called)
496         self.assertEqual(response.status_code, 201)
497
498         self.user.is_superuser = False
499         self.user.save()