Messaging.
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 import hashlib
6 import hmac
7 from io import BytesIO
8 import json
9 from os import path
10 from time import time
11 from unittest.mock import patch
12 from urllib.parse import quote, urlencode, parse_qs
13
14 from django.contrib.auth.models import User
15 from django.core.files.uploadedfile import SimpleUploadedFile
16 from django.test import TestCase
17 from django.test.utils import override_settings
18
19 from catalogue.models import Book, Tag
20 from picture.forms import PictureImportForm
21 from picture.models import Picture
22 import picture.tests
23 from api.models import Consumer, Token
24
25
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Base class for API tests, with fetch-and-compare helpers.

    Tests run with ``NO_SEARCH_INDEX=True`` and with the default cache
    replaced by Django's dummy cache backend.
    """

    @staticmethod
    def _response_path(name):
        """Return the path of the expected-response fixture called `name`."""
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET `url` with the test client and return its body parsed as JSON.

        The test is failed (with the raw body in the message) when the body
        cannot be decoded as JSON.
        """
        content = self.client.get(url).content
        try:
            return json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)

    def assert_response(self, url, name):
        """Assert the body of `url` equals the text fixture `name`.

        Trailing whitespace is ignored on both sides.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        # The response is decoded as UTF-8, so read the fixture the same way
        # instead of relying on the platform's default encoding.
        with open(self._response_path(name), encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert the JSON returned by `url` equals the JSON fixture `name`."""
        data = self.load_json(url)
        with open(self._response_path(name), encoding='utf-8') as f:
            good_data = json.load(f)
        # The actual data is pretty-printed into the failure message.
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert the list at `url` has exactly `slugs`, in that order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
57
58
class BookTests(ApiTest):
    """Book list, by-author list and detail endpoints on a tiny catalogue."""

    def setUp(self):
        """Create an author tag, one untagged book and one tagged book."""
        self.tag = Tag.objects.create(category='author', slug='joe')
        self.book = Book.objects.create(title='A Book', slug='a-book')

        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [self.tag]
        tagged.save()
        self.book_tagged = tagged

    def test_book_list(self):
        """Both books are listed."""
        listed = self.load_json('/api/books/')
        self.assertEqual(len(listed), 2, 'Wrong book list.')

    def test_tagged_books(self):
        """Only the tagged book appears under its author."""
        titles = [
            entry['title']
            for entry in self.load_json('/api/authors/joe/books/')
        ]
        self.assertEqual(
            titles, [self.book_tagged.title], 'Wrong tagged book list.')

    def test_detail(self):
        """The detail view reports the book's title."""
        details = self.load_json('/api/books/a-book/')
        self.assertEqual(
            details['title'], self.book.title, 'Wrong book details.')
84
85
class TagTests(ApiTest):
    """Author tag list and detail endpoints."""

    def setUp(self):
        """Create one named author tag and attach it to a single book."""
        self.tag = Tag.objects.create(
            category='author', slug='joe', name='Joe')

        book = Book.objects.create(title='A Book', slug='a-book')
        book.tags = [self.tag]
        book.save()
        self.book = book

    def test_tag_list(self):
        """Exactly one author is listed."""
        authors = self.load_json('/api/authors/')
        self.assertEqual(len(authors), 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The detail view reports the tag's name."""
        details = self.load_json('/api/authors/joe/')
        self.assertEqual(
            details['name'], self.tag.name, 'Wrong tag details.')
104
105
class PictureTests(ApiTest):
    """Importing a picture through PictureImportForm."""

    def test_publish(self):
        """A valid XML + image pair can be saved and found by its slug."""
        slug = "kandinsky-composition-viii"
        files_dir = path.join(picture.tests.__path__[0], "files")

        def upload(upload_name, extension):
            # Read the sample file inside a `with` block so the handle is
            # closed instead of being left to the garbage collector.
            with open(path.join(files_dir, slug + extension), 'rb') as f:
                return SimpleUploadedFile(upload_name, f.read())

        xml = upload('composition8.xml', '.xml')
        img = upload('kompozycja-8.png', '.png')

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img
            })

        # A unittest assertion is not stripped under `python -O`, and it
        # reports the validation errors when the form is rejected.
        self.assertTrue(import_form.is_valid(), import_form.errors)
        import_form.save()

        # .get() raises unless exactly one picture with this slug exists.
        Picture.objects.get(slug=slug)
130
131
class BooksTests(ApiTest):
    """Catalogue endpoints checked against the 'test-books.yaml' fixture."""

    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book lists in all formats, special lists, paging and tag filters."""
        for url in ('/api/books/', '/api/books/?new_api=true'):
            self.assert_json_response(url, 'books.json')
        self.assert_response('/api/books/?format=xml', 'books.xml')

        check = self.assert_slugs
        parent_only_urls = (
            '/api/audiobooks/',
            '/api/daisy/',
            '/api/newest/',
            '/api/parent_books/',
            '/api/recommended/',
            # Book paging.
            '/api/books/after/grandchild/count/1/',
            '/api/books/?new_api=true&after=$grandchild$3&count=1',
            # By tag.
            '/api/authors/john-doe/books/',
            '/api/genres/sonet/books/?authors=john-doe',
        )
        for url in parent_only_urls:
            check(url, ['parent'])

        # It is probably a mistake that this doesn't filter:
        check(
            '/api/books/?authors=john-doe',
            ['child', 'grandchild', 'parent'])

        # Parent books by tag.
        # Notice this contains a grandchild, if a child doesn't have the tag.
        # This probably isn't really intended behavior and should be redefined.
        check(
            '/api/genres/sonet/parent_books/',
            ['grandchild', 'parent'])

    def test_ebooks(self):
        """The ebook list matches its stored response."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """Each filter-books query parameter narrows the list as expected."""
        check = self.assert_slugs
        grandchild = Book.objects.filter(slug='grandchild')

        self.assert_json_response('/api/filter-books/', 'filter-books.json')
        check(
            '/api/filter-books/?lektura=false',
            ['child', 'grandchild', 'parent'])
        check('/api/filter-books/?lektura=true', [])

        grandchild.update(preview=True)
        # '/api/filter-books/?preview=true' (expected: ['grandchild']) is
        # skipped: we don't allow previewed books in filtered list.
        check('/api/filter-books/?preview=false', ['child', 'parent'])
        grandchild.update(preview=False)

        remaining_cases = (
            ('/api/filter-books/?audiobook=true', ['parent']),
            ('/api/filter-books/?audiobook=false', ['child', 'grandchild']),
            ('/api/filter-books/?genres=wiersz', ['child']),
            ('/api/filter-books/?search=parent', ['parent']),
        )
        for url, expected in remaining_cases:
            check(url, expected)

    def test_collections(self):
        """Collection list and detail match their stored responses."""
        stored = (
            ('/api/collections/', 'collections.json'),
            ('/api/collections/a-collection/', 'collection.json'),
        )
        for url, name in stored:
            self.assert_json_response(url, name)

    def test_book(self):
        """Detail views of all three fixture books match stored responses."""
        stored = (
            ('/api/books/parent/', 'books-parent.json'),
            ('/api/books/child/', 'books-child.json'),
            ('/api/books/grandchild/', 'books-grandchild.json'),
        )
        for url, name in stored:
            self.assert_json_response(url, name)

    def test_tags(self):
        """List of tags by category."""
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragments listed by tag, and a single fragment by its anchor."""
        # '/api/books/child/fragments/' (expected: 'fragments.json') is not
        # checked: this is not supported, though it probably should be.
        stored = (
            ('/api/genres/wiersz/fragments/', 'fragments.json'),
            ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        )
        for url, name in stored:
            self.assert_json_response(url, name)
230
231
class BlogTests(ApiTest):
    """Blog endpoint."""

    def test_get(self):
        """With no data loaded, the blog endpoint returns an empty list."""
        posts = self.load_json('/api/blog')
        self.assertEqual(posts, [])
235
236
class OAuth1Tests(ApiTest):
    """Full OAuth 1.0 flow: request token, authorization, access token."""

    @classmethod
    def setUpClass(cls):
        """Create the user and the API consumer shared by the class."""
        # Call the parent first: without it Django neither applies the
        # class-level override_settings nor opens its class-level
        # transaction. Same pattern as AuthorizedTests.
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        """Remove only the objects this class created."""
        cls.user.delete()
        cls.consumer.delete()
        super().tearDownClass()

    def _signed_query(self, url, base_query, token_secret=''):
        """Return `base_query` with its HMAC-SHA1 `oauth_signature` appended.

        `url` is the path of a GET request against ``http://testserver``.
        `base_query` must already be the sorted, encoded parameter string.
        The signing key is the quoted consumer secret, '&', and the quoted
        `token_secret` (empty while no token has been issued yet).
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + url, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        # b64encode never emits a trailing newline, so nothing to strip.
        sign = quote(b64encode(digest))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        """A logged-in user can authorize a consumer and get an access token."""
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % (
                request_token,
            )
        )
        # Send the form's initial values back to confirm the authorization.
        post_data = response.context['form'].initial

        response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
320
321
class AuthorizedTests(ApiTest):
    """Endpoints called with OAuth 1.0 signed requests from a known user."""

    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        """Create the user, consumer and access token used for signing."""
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC signing key: consumer secret and token secret joined by '&'.
        cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')

    @classmethod
    def tearDownClass(cls):
        """Delete the shared user and consumer, then let Django clean up."""
        cls.user.delete()
        cls.consumer.delete()
        super().tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send an OAuth-signed request with the test client.

        `url` is the request path (no query string). `method` is the HTTP
        method, case-insensitive. `params` is an optional dict of query
        parameters and `data` an optional dict sent as a form-encoded body.
        Returns the test client's response.
        """
        auth_params = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        # The signature covers query parameters, body data and the OAuth
        # parameters. OAuth parameters are applied last, so they win on any
        # key clash.
        sign_params = {}
        if params:
            sign_params.update(params)
        if data:
            sign_params.update(data)
        sign_params.update(auth_params)

        normalized = "&".join(
            quote(str(k), safe='') + "=" + quote(str(v), safe='')
            for (k, v) in sorted(sign_params.items()))
        raw = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            # safe='' matches the other components. `normalized` is already
            # percent-encoded, so the output is the same as before.
            quote(normalized, safe=''),
        ])
        digest = hmac.new(
            self.key,
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        # b64encode never emits a trailing newline, so nothing to strip.
        auth_params["oauth_signature"] = quote(b64encode(digest))
        auth = 'OAuth realm="API", ' + ', '.join(
            '{}="{}"'.format(k, v) for (k, v) in auth_params.items())

        if params:
            url = url + '?' + urlencode(params)
        return getattr(self.client, method.lower())(
            url,
            data=urlencode(data) if data else None,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Like `signed`, but return the response body parsed as JSON."""
        return json.loads(self.signed(url, method, params, data).content)

    def test_books(self):
        """Liking and unliking a book shows up in every 'liked' field."""
        self.assertEqual(
            [b['liked'] for b in self.signed_json('/api/books/')],
            [False, False, False]
        )
        data = self.signed_json('/api/books/child/')
        self.assertFalse(data['parent']['liked'])
        self.assertFalse(data['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": True}
        )
        # There are several endpoints where 'liked' appears.
        self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
        self.assertTrue(self.signed_json(
            '/api/filter-books/', params={"search": "parent"})[0]['liked'])

        self.assertTrue(self.signed_json(
            '/api/books/child/')['parent']['liked'])
        # Liked books go on shelf.
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
            ['parent'])

        # The third positional argument is `params`: "unlike" travels in the
        # query string.
        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])

    def test_reading(self):
        """Marking a book as being read changes its state and the shelf."""
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "not_started"}
        )
        self.signed('/api/reading/parent/reading/', 'post')
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "reading"}
        )
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
            ['parent'])

    def test_subscription(self):
        """A preview book's EPUB is served only with an active membership."""
        Book.objects.filter(slug='grandchild').update(preview=True)

        self.assert_slugs('/api/preview/', ['grandchild'])
        self.assertEqual(
            self.signed_json('/api/username/'),
            {"username": "test", "premium": False})
        self.assertEqual(
            self.signed('/api/epub/grandchild/').status_code,
            403)

        # Pretend the membership is active for the rest of the checks.
        with patch('club.models.Membership.is_active_for', return_value=True):
            self.assertEqual(
                self.signed_json('/api/username/'),
                {"username": "test", "premium": True})
            # Stub storage so that opening a file yields this fake payload.
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                self.assertEqual(
                    self.signed('/api/epub/grandchild/').content,
                    b"<epub>")

        Book.objects.filter(slug='grandchild').update(preview=False)

    def test_publish(self):
        """Publishing books and pictures is refused unless user is superuser."""
        response = self.signed('/api/books/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        response = self.signed('/api/pictures/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()
        # self.user is the object shared by the whole class, so restore it
        # even when an assertion below fails.
        try:
            with patch('catalogue.models.Book.from_xml_file') as mock:
                response = self.signed('/api/books/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "book_xml": "<utwor/>"
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)

            with patch('picture.models.Picture.from_xml_file') as mock:
                response = self.signed('/api/pictures/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "picture_xml": "<utwor/>",
                                           "picture_image_data": "Kg==",
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)
        finally:
            self.user.is_superuser = False
            self.user.save()