Fixes #3934: Error on OAI-PMH list request.
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 from os import path
6 import hashlib
7 import hmac
8 import json
9 from io import BytesIO
10 from time import time
11 from urllib.parse import quote, urlencode, parse_qs
12
13 from django.contrib.auth.models import User
14 from django.core.files.uploadedfile import SimpleUploadedFile
15 from django.test import TestCase
16 from django.test.utils import override_settings
17 from unittest.mock import patch
18 from api.models import Consumer, Token
19
20 from catalogue.models import Book, Tag
21 from picture.forms import PictureImportForm
22 from picture.models import Picture
23 import picture.tests
24
25
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Base class for API tests: shared request and comparison helpers.

    The settings override above sets NO_SEARCH_INDEX and swaps the default
    cache for Django's dummy backend for tests derived from this class.
    """

    def load_json(self, url):
        """GET `url` with the test client and return the decoded JSON body.

        Fails the test (instead of erroring) when the body is not valid JSON.
        """
        content = self.client.get(url).content
        try:
            data = json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)
        return data

    def assert_response(self, url, name):
        """Assert the body of `url` equals the file res/responses/`name`.

        Both sides are compared as text with trailing whitespace stripped.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        filename = path.join(path.dirname(__file__), 'res', 'responses', name)
        # The response is decoded as UTF-8, so read the expected file as
        # UTF-8 too rather than relying on the locale's default encoding.
        with open(filename, encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert the JSON served at `url` equals res/responses/`name`.

        On mismatch the received data is pretty-printed in the failure
        message.
        """
        data = self.load_json(url)
        filename = path.join(path.dirname(__file__), 'res', 'responses', name)
        # Read explicitly as UTF-8 so the result does not depend on locale.
        with open(filename, encoding='utf-8') as f:
            good_data = json.load(f)
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert the JSON list at `url` has exactly `slugs`, in order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
57
58
class BookTests(ApiTest):
    # Covers the book list, the per-author book list and the book detail
    # endpoints against a two-book catalogue built in setUp.

    def setUp(self):
        # One author tag and two books; only the second book carries the tag.
        self.tag = Tag.objects.create(category='author', slug='joe')
        self.book = Book.objects.create(title='A Book', slug='a-book')
        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [self.tag]
        tagged.save()
        self.book_tagged = tagged

    def test_book_list(self):
        # Both books created in setUp are expected in the list.
        book_count = len(self.load_json('/api/books/'))
        self.assertEqual(book_count, 2, 'Wrong book list.')

    def test_tagged_books(self):
        # Only the tagged book is expected under the author's listing.
        listing = self.load_json('/api/authors/joe/books/')
        titles = [entry['title'] for entry in listing]
        expected_titles = [self.book_tagged.title]
        self.assertEqual(titles, expected_titles, 'Wrong tagged book list.')

    def test_detail(self):
        # The detail view must report the title of the stored book.
        details = self.load_json('/api/books/a-book/')
        self.assertEqual(
            details['title'], self.book.title, 'Wrong book details.')
84
85
class TagTests(ApiTest):
    # Covers the author tag list and the author tag detail endpoints.

    def setUp(self):
        # A single author tag attached to a single book.
        author = Tag.objects.create(category='author', slug='joe', name='Joe')
        book = Book.objects.create(title='A Book', slug='a-book')
        self.tag = author
        self.book = book
        book.tags = [author]
        book.save()

    def test_tag_list(self):
        # Exactly one author is expected in the list.
        tag_count = len(self.load_json('/api/authors/'))
        self.assertEqual(tag_count, 1, 'Wrong tag list.')

    def test_tag_detail(self):
        # The detail view must report the name of the stored tag.
        details = self.load_json('/api/authors/joe/')
        self.assertEqual(
            details['name'], self.tag.name, 'Wrong tag details.')
104
105
class PictureTests(ApiTest):
    # Imports a sample picture through PictureImportForm and checks that a
    # Picture with the expected slug can be fetched afterwards.

    def test_publish(self):
        slug = "kandinsky-composition-viii"
        files_dir = path.join(picture.tests.__path__[0], "files")

        # Read the sample files inside `with` blocks so the handles are
        # closed deterministically instead of being leaked.
        with open(path.join(files_dir, slug + ".xml"), 'rb') as xml_file:
            xml = SimpleUploadedFile('composition8.xml', xml_file.read())
        with open(path.join(files_dir, slug + ".png"), 'rb') as img_file:
            img = SimpleUploadedFile('kompozycja-8.png', img_file.read())

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img
            })

        # assertTrue is not stripped under `python -O` (a bare `assert` is)
        # and reports the form errors when validation fails.
        self.assertTrue(import_form.is_valid(), import_form.errors)
        import_form.save()

        # The lookup itself is the check: it is expected to raise when no
        # picture with this slug was stored.
        Picture.objects.get(slug=slug)
130
131
class BooksTests(ApiTest):
    # Compares API responses for the 'test-books.yaml' fixture against the
    # stored expected responses and expected slug lists.
    fixtures = ['test-books.yaml']

    def test_books(self):
        # The JSON list is checked with and without `new_api`, then as XML.
        for url in ('/api/books/', '/api/books/?new_api=true'):
            self.assert_json_response(url, 'books.json')
        self.assert_response('/api/books/?format=xml', 'books.xml')

        # Listings expected to contain just the 'parent' book.
        parent_only_urls = (
            '/api/audiobooks/',
            '/api/daisy/',
            '/api/newest/',
            '/api/parent_books/',
            '/api/recommended/',
        )
        for url in parent_only_urls:
            self.assert_slugs(url, ['parent'])

        expectations = (
            # Book paging.
            ('/api/books/after/grandchild/count/1/', ['parent']),
            ('/api/books/?new_api=true&after=$grandchild$3&count=1',
             ['parent']),

            # By tag.
            ('/api/authors/john-doe/books/', ['parent']),
            ('/api/genres/sonet/books/?authors=john-doe', ['parent']),
            # It is probably a mistake that this doesn't filter:
            ('/api/books/?authors=john-doe',
             ['child', 'grandchild', 'parent']),

            # Parent books by tag.
            # Notice this contains a grandchild, if a child doesn't have
            # the tag. This probably isn't really intended behavior and
            # should be redefined.
            ('/api/genres/sonet/parent_books/', ['grandchild', 'parent']),
        )
        for url, expected_slugs in expectations:
            self.assert_slugs(url, expected_slugs)

    def test_ebooks(self):
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        self.assert_json_response('/api/filter-books/', 'filter-books.json')

        # Each query string paired with the slugs it is expected to return.
        expectations = (
            ('/api/filter-books/?lektura=false',
             ['child', 'grandchild', 'parent']),
            ('/api/filter-books/?lektura=true', []),

            ('/api/filter-books/?preview=true', ['grandchild']),
            ('/api/filter-books/?preview=false', ['child', 'parent']),

            ('/api/filter-books/?audiobook=true', ['parent']),
            ('/api/filter-books/?audiobook=false', ['child', 'grandchild']),

            ('/api/filter-books/?genres=wiersz', ['child']),

            ('/api/filter-books/?search=parent', ['parent']),
        )
        for url, expected_slugs in expectations:
            self.assert_slugs(url, expected_slugs)

    def test_collections(self):
        # The collection list first, then a single collection.
        responses = (
            ('/api/collections/', 'collections.json'),
            ('/api/collections/a-collection/', 'collection.json'),
        )
        for url, name in responses:
            self.assert_json_response(url, name)

    def test_book(self):
        # Detail view of every book in the fixture hierarchy.
        responses = (
            ('/api/books/parent/', 'books-parent.json'),
            ('/api/books/child/', 'books-child.json'),
            ('/api/books/grandchild/', 'books-grandchild.json'),
        )
        for url, name in responses:
            self.assert_json_response(url, name)

    def test_tags(self):
        # List of tags by category.
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        # Listing fragments per book ('/api/books/child/fragments/' against
        # 'fragments.json') is not supported, though it probably should be,
        # so only the per-genre list and a single fragment are checked.
        responses = (
            ('/api/genres/wiersz/fragments/', 'fragments.json'),
            ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        )
        for url, name in responses:
            self.assert_json_response(url, name)
227
228
class BlogTests(ApiTest):
    # Covers the blog endpoint.

    def test_get(self):
        # An empty JSON list is expected from the blog endpoint.
        entries = self.load_json('/api/blog')
        self.assertEqual(entries, [])
232
233
class PreviewTests(ApiTest):
    # Compares the unauthenticated preview listing with a stored response.

    # NOTE(review): this method has no `test_` prefix, so it does not match
    # unittest's default test-method prefix and is presumably never run --
    # confirm whether that is intentional before renaming it.
    def unauth(self):
        url, expected_file = '/api/preview/', 'preview.json'
        self.assert_json_response(url, expected_file)
237
238
class OAuth1Tests(ApiTest):
    # Walks through the OAuth 1.0 flow: request token, user authorization,
    # and exchange for an access token.

    @classmethod
    def setUpClass(cls):
        # Django performs class-wide initialization (e.g. applying the
        # settings overridden on ApiTest) in setUpClass, so the parent
        # implementation must run before any data is created.
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        User.objects.all().delete()
        # Remove the consumer as well, so that nothing created in
        # setUpClass is left behind.
        cls.consumer.delete()
        super().tearDownClass()

    def _signed_query(self, url, base_query, token_secret=''):
        """Return `base_query` with its HMAC-SHA1 oauth_signature appended.

        The signature base string is built for a GET request to `url` on
        'http://testserver'. The signing key is the quoted consumer secret
        and the quoted `token_secret` (empty when there is no token yet),
        joined with '&'.
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + url, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        sign = quote(b64encode(digest).rstrip(b'\n'))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/'
            '?oauth_token=%s&oauth_callback=test://oauth.callback/'
            % request_token)
        # Post the form's initial values back to confirm the authorization.
        post_data = response.context['form'].initial

        response = self.client.post(
            '/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        # The returned key must be stored as an access token for the user.
        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
318
319
class AuthorizedTests(ApiTest):
    # Exercises endpoints with requests signed by a pre-made OAuth 1.0
    # access token belonging to the 'test' user.
    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC signing key: consumer secret and token secret joined by '&'.
        cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')

    @classmethod
    def tearDownClass(cls):
        cls.user.delete()
        cls.consumer.delete()
        super().tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send an OAuth-signed request with the test client.

        `params` go into the query string and `data` into the
        form-urlencoded body; both take part in the signature. The OAuth
        parameters are sent in the Authorization header. Returns the test
        client's response.
        """
        auth_params = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        # Everything that gets signed: query params, body data and the
        # OAuth parameters (later updates win on key clashes).
        sign_params = {}
        if params:
            sign_params.update(params)
        if data:
            sign_params.update(data)
        sign_params.update(auth_params)
        # Signature base string: method, quoted URL and the quoted,
        # key-sorted parameter list, joined with '&'.
        raw = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            quote("&".join(
                quote(str(k), safe='') + "=" + quote(str(v), safe='')
                for (k, v) in sorted(sign_params.items())))
        ])
        auth_params["oauth_signature"] = quote(b64encode(hmac.new(
            self.key,
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()).rstrip(b'\n'))
        auth = 'OAuth realm="API", ' + ', '.join(
            '{}="{}"'.format(k, v) for (k, v) in auth_params.items())

        if params:
            url = url + '?' + urlencode(params)
        # Dispatch to self.client.get / self.client.post / ... by name.
        return getattr(self.client, method.lower())(
            url,
            data=urlencode(data) if data else None,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Send a signed request and return its body decoded from JSON."""
        return json.loads(self.signed(url, method, params, data).content)

    def test_books(self):
        # Nothing is liked initially.
        self.assertEqual(
            [b['liked'] for b in self.signed_json('/api/books/')],
            [False, False, False]
        )
        data = self.signed_json('/api/books/child/')
        self.assertFalse(data['parent']['liked'])
        self.assertFalse(data['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        # A POST to the like endpoint marks the book as liked.
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": True}
        )
        # There are several endpoints where 'liked' appears.
        self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
        self.assertTrue(self.signed_json(
            '/api/filter-books/', params={"search": "parent"})[0]['liked'])

        self.assertTrue(self.signed_json(
            '/api/books/child/')['parent']['liked'])
        # Liked books go on shelf.
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
            ['parent'])

        # A POST with action=unlike in the query string reverts the like.
        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])

    def test_reading(self):
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "not_started"}
        )
        # Posting to .../reading/ switches the state and shelves the book.
        self.signed('/api/reading/parent/reading/', 'post')
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "reading"}
        )
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
            ['parent'])

    def test_subscription(self):
        # Without an active membership the preview book's EPUB is refused.
        self.assert_slugs('/api/preview/', ['grandchild'])
        self.assertEqual(
            self.signed_json('/api/username/'),
            {"username": "test", "premium": False})
        self.assertEqual(
            self.signed('/api/epub/grandchild/').status_code,
            403)

        # With membership reported as active the user is premium and the
        # (stubbed) file content is served.
        with patch('club.models.Membership.is_active_for', return_value=True):
            self.assertEqual(
                self.signed_json('/api/username/'),
                {"username": "test", "premium": True})
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                self.assertEqual(
                    self.signed('/api/epub/grandchild/').content,
                    b"<epub>")

    def test_publish(self):
        # Publishing is forbidden for a regular user.
        response = self.signed('/api/books/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        response = self.signed('/api/pictures/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()

        # self.user is shared by the whole class, so restore the flag in
        # `finally`: a failing assertion must not leave it set.
        try:
            with patch('catalogue.models.Book.from_xml_file') as mock:
                response = self.signed('/api/books/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "book_xml": "<utwor/>"
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)

            with patch('picture.models.Picture.from_xml_file') as mock:
                response = self.signed('/api/pictures/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "picture_xml": "<utwor/>",
                                           "picture_image_data": "Kg==",
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)
        finally:
            self.user.is_superuser = False
            self.user.save()
490         self.user.is_superuser = False
491         self.user.save()