add a filter
[wolnelektury.git] / src / api / tests / tests.py
1 # This file is part of Wolne Lektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
3 #
4 from base64 import b64encode
5 import hashlib
6 import hmac
7 from io import BytesIO
8 import json
9 from os import path
10 from time import time
11 from unittest.mock import patch
12 from urllib.parse import quote, urlencode, parse_qs
13
14 from django.contrib.auth.models import User
15 from django.core.files.uploadedfile import SimpleUploadedFile
16 from django.test import TestCase
17 from django.test.utils import override_settings
18
19 from catalogue.models import Book, Tag
20 from api.models import Consumer, Token
21
22
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Common base for the API tests.

    Runs with ``NO_SEARCH_INDEX`` set to True and with the default cache
    replaced by Django's dummy backend, and provides helpers for comparing
    API responses with the files stored under ``res/responses``.
    """

    # Let unittest print complete diffs for the (large) response payloads.
    maxDiff = None

    @staticmethod
    def _expected_path(name):
        """Return the path of the stored expected response called *name*."""
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET *url* with the test client and return its body parsed as JSON.

        The test is failed (instead of erroring out) when the body cannot be
        decoded as JSON; the raw body is included in the failure message.
        """
        content = self.client.get(url).content
        try:
            return json.loads(content)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % content)

    def assert_response(self, url, name):
        """Assert that the body of *url* equals the stored file *name*.

        Both sides are compared as text with trailing whitespace stripped.
        """
        content = self.client.get(url).content.decode('utf-8').rstrip()
        # The response is decoded as UTF-8, so read the expected file the
        # same way instead of relying on the locale's default encoding.
        with open(self._expected_path(name), encoding='utf-8') as f:
            good_content = f.read().rstrip()
        self.assertEqual(content, good_content, content)

    def assert_json_response(self, url, name):
        """Assert that the JSON served at *url* equals the stored file *name*.

        On mismatch the received data is dumped (indented) as the message.
        """
        data = self.load_json(url)
        # Explicit encoding: do not depend on the locale's default.
        with open(self._expected_path(name), encoding='utf-8') as f:
            good_data = json.load(f)
        self.assertEqual(data, good_data, json.dumps(data, indent=4))

    def assert_slugs(self, url, slugs):
        """Assert that the list served at *url* has exactly *slugs*, in order."""
        have_slugs = [x['slug'] for x in self.load_json(url)]
        self.assertEqual(have_slugs, slugs, have_slugs)
56
57
class BookTests(ApiTest):
    """Book endpoints checked against two books created in ``setUp``."""

    def setUp(self):
        """Create an author tag, an untagged book and a book with that tag."""
        self.tag = Tag.objects.create(category='author', slug='joe')
        self.book = Book.objects.create(title='A Book', slug='a-book')
        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [self.tag]
        tagged.save()
        self.book_tagged = tagged

    def test_book_list(self):
        """The book list contains both books."""
        listing = self.load_json('/api/books/')
        self.assertEqual(len(listing), 2, 'Wrong book list.')

    def test_tagged_books(self):
        """The author's book list contains only the tagged book."""
        titles = [
            entry['title']
            for entry in self.load_json('/api/authors/joe/books/')
        ]
        self.assertEqual(
            titles, [self.book_tagged.title], 'Wrong tagged book list.')

    def test_detail(self):
        """The detail view reports the book's title."""
        detail = self.load_json('/api/books/a-book/')
        self.assertEqual(
            detail['title'], self.book.title, 'Wrong book details.')
83
84
class TagTests(ApiTest):
    """Tag endpoints checked against one author tag attached to one book."""

    def setUp(self):
        """Create the author tag 'joe' and a book carrying it."""
        self.tag = Tag.objects.create(
            category='author', slug='joe', name='Joe')
        book = Book.objects.create(title='A Book', slug='a-book')
        book.tags = [self.tag]
        book.save()
        self.book = book

    def test_tag_list(self):
        """The author list contains exactly one entry."""
        listing = self.load_json('/api/authors/')
        self.assertEqual(len(listing), 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The author detail view reports the tag's name."""
        detail = self.load_json('/api/authors/joe/')
        self.assertEqual(detail['name'], self.tag.name, 'Wrong tag details.')
103
104
class BooksTests(ApiTest):
    """Catalogue endpoints checked against the ``test-books.yaml`` fixture.

    The assertions below expect the fixture to provide books with the slugs
    'parent', 'child' and 'grandchild'.
    """
    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book listings: formats, special lists, paging and tag filtering."""
        # The same stored JSON is expected with and without ``new_api``.
        for url in ('/api/books/', '/api/books/?new_api=true'):
            self.assert_json_response(url, 'books.json')
        self.assert_response('/api/books/?format=xml', 'books.xml')

        # Each of these special lists is expected to hold just 'parent'.
        for url in (
                '/api/audiobooks/',
                '/api/daisy/',
                '/api/newest/',
                '/api/parent_books/',
                '/api/recommended/',
        ):
            self.assert_slugs(url, ['parent'])

        # Paging, in both URL styles.
        self.assert_slugs('/api/books/after/grandchild/count/1/', ['parent'])
        self.assert_slugs(
            '/api/books/?new_api=true&after=$grandchild$3&count=1',
            ['parent'])

        # Listings narrowed by tag.
        self.assert_slugs('/api/authors/john-doe/books/', ['parent'])
        self.assert_slugs(
            '/api/genres/sonet/books/?authors=john-doe', ['parent'])
        # The plain book list ignores the ``authors`` parameter; this is
        # probably a mistake, but it is the behaviour pinned here.
        self.assert_slugs(
            '/api/books/?authors=john-doe',
            ['child', 'grandchild', 'parent'])

        # Parent books narrowed by tag.
        # Note that a grandchild shows up when the child lacks the tag.
        # That is probably not intended and should be redefined.
        self.assert_slugs(
            '/api/genres/sonet/parent_books/', ['grandchild', 'parent'])

    def test_ebooks(self):
        """The e-book list matches the stored response."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """The filter-books endpoint honours each supported filter."""
        self.assert_json_response('/api/filter-books/', 'filter-books.json')

        self.assert_slugs(
            '/api/filter-books/?lektura=false',
            ['child', 'grandchild', 'parent'])
        self.assert_slugs('/api/filter-books/?lektura=true', [])

        # Temporarily mark the grandchild as a preview book.
        grandchild = Book.objects.filter(slug='grandchild')
        grandchild.update(preview=True)
        # Skipping: we don't allow previewed books in filtered list.
        #self.assert_slugs(
        #    '/api/filter-books/?preview=true',
        #    ['grandchild'])
        self.assert_slugs(
            '/api/filter-books/?preview=false', ['child', 'parent'])
        grandchild.update(preview=False)

        self.assert_slugs('/api/filter-books/?audiobook=true', ['parent'])
        self.assert_slugs(
            '/api/filter-books/?audiobook=false', ['child', 'grandchild'])

        self.assert_slugs('/api/filter-books/?genres=wiersz', ['child'])

        self.assert_slugs('/api/filter-books/?search=parent', ['parent'])

    def test_collections(self):
        """Collection list and collection detail match stored responses."""
        for url, expected in (
                ('/api/collections/', 'collections.json'),
                ('/api/collections/a-collection/', 'collection.json'),
        ):
            self.assert_json_response(url, expected)

    def test_book(self):
        """Detail views of all three books match stored responses."""
        for slug in ('parent', 'child', 'grandchild'):
            self.assert_json_response(
                '/api/books/%s/' % slug, 'books-%s.json' % slug)

    def test_tags(self):
        """The list of tags in a category matches the stored response."""
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragment list by tag and single fragment match stored responses."""
        # This is not supported, though it probably should be.
        # self.assert_json_response(
        #     '/api/books/child/fragments/',
        #     'fragments.json')

        self.assert_json_response(
            '/api/genres/wiersz/fragments/', 'fragments.json')
        self.assert_json_response(
            '/api/books/child/fragments/an-anchor/', 'fragment.json')
203
204
class BlogTests(ApiTest):
    """Blog endpoint checks."""

    def test_get(self):
        """The blog endpoint serves an empty JSON list here."""
        posts = self.load_json('/api/blog')
        self.assertEqual(posts, [])
208
209
class OAuth1Tests(ApiTest):
    """Walks through the OAuth 1 flow: request token, authorization, access token."""

    @classmethod
    def setUpClass(cls):
        """Create the user and the API consumer shared by the tests."""
        # Django's test case classes do their class-wide initialisation
        # (e.g. applying overridden settings) in setUpClass, so overriding
        # it requires calling the parent implementation.
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.user.set_password('test')
        cls.user.save()
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        """Remove the users, then let Django undo its class-wide setup."""
        User.objects.all().delete()
        super().tearDownClass()

    def _signed_query(self, url_path, base_query, token_secret=''):
        """Return *base_query* with an HMAC-SHA1 ``oauth_signature`` appended.

        The signature covers a GET request to ``http://testserver`` +
        *url_path*. The signing key is the quoted consumer secret, '&', and
        the quoted *token_secret* (empty when no token has been issued yet).
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + url_path, safe=''),
            quote(base_query, safe='')
        ])
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(
            key.encode('latin1'),
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()
        sign = quote(b64encode(digest).rstrip(b'\n'))
        return "{}&oauth_signature={}".format(base_query, sign)

    def test_create_token(self):
        """A consumer can obtain an access token bound to the logged-in user."""
        # Fetch request token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        query = self._signed_query('/api/oauth/request_token/', base_query)
        response = self.client.get('/api/oauth/request_token/?' + query)
        request_token_data = parse_qs(response.content.decode('latin1'))
        request_token = request_token_data['oauth_token'][0]
        request_token_secret = request_token_data['oauth_token_secret'][0]

        # Request token authorization.
        self.client.login(username='test', password='test')
        response = self.client.get(
            '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % (
                request_token,
            )
        )
        # Send the authorization form's initial data back unchanged.
        post_data = response.context['form'].initial

        response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
        self.assertEqual(
            response['Location'],
            'test://oauth.callback/?oauth_token=' + request_token
        )

        # Fetch access token.
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token))
        query = self._signed_query(
            '/api/oauth/access_token/', base_query, request_token_secret)
        response = self.client.get('/api/oauth/access_token/?' + query)
        access_token_data = parse_qs(response.content.decode('latin1'))
        access_token = access_token_data['oauth_token'][0]

        self.assertTrue(
            Token.objects.filter(
                key=access_token,
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
293
294
class AuthorizedTests(ApiTest):
    """Endpoints exercised with OAuth 1 signed requests made as user 'test'."""
    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        """Create the user, consumer and access token used for signing."""
        super().setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC key: consumer secret and token secret joined with '&'.
        cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')

    @classmethod
    def tearDownClass(cls):
        """Delete the user and consumer, then run the parent teardown."""
        cls.user.delete()
        cls.consumer.delete()
        super().tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send an OAuth 1 (HMAC-SHA1) signed request with the test client.

        :param url: path of the endpoint, without a query string
        :param method: HTTP method name; case-insensitive
        :param params: optional dict sent as the query string
        :param data: optional dict sent as the form-encoded request body
        :return: the test client's response

        Both *params* and *data* are included in the signature.
        """
        auth_params = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        # Everything that gets signed; the OAuth parameters are applied last.
        sign_params = {}
        if params:
            sign_params.update(params)
        if data:
            sign_params.update(data)
        sign_params.update(auth_params)
        raw = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            # The joined string has no '/' left (every key and value was
            # quoted with safe=''), so safe='' changes nothing here; it is
            # spelled out to match the other components.
            quote("&".join(
                quote(str(k), safe='') + "=" + quote(str(v), safe='')
                for (k, v) in sorted(sign_params.items())), safe='')
        ])
        auth_params["oauth_signature"] = quote(b64encode(hmac.new(
            self.key,
            raw.encode('latin1'),
            hashlib.sha1
        ).digest()).rstrip(b'\n'))
        auth = 'OAuth realm="API", ' + ', '.join(
            '{}="{}"'.format(k, v) for (k, v) in auth_params.items())

        if params:
            url = url + '?' + urlencode(params)
        return getattr(self.client, method.lower())(
            url,
            data=urlencode(data) if data else None,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Like :meth:`signed`, but return the response body parsed as JSON."""
        return json.loads(self.signed(url, method, params, data).content)

    def test_books(self):
        """Liking and unliking a book is reflected wherever 'liked' appears."""
        self.assertEqual(
            [b['liked'] for b in self.signed_json('/api/books/')],
            [False, False, False]
        )
        data = self.signed_json('/api/books/child/')
        self.assertFalse(data['parent']['liked'])
        self.assertFalse(data['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": True}
        )
        # There are several endpoints where 'liked' appears.
        self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
        self.assertTrue(self.signed_json(
            '/api/filter-books/', params={"search": "parent"})[0]['liked'])

        self.assertTrue(self.signed_json(
            '/api/books/child/')['parent']['liked'])
        # Liked books go on shelf.
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
            ['parent'])

        # The third positional argument is ``params`` (query string).
        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])

    def test_reading(self):
        """Posting a reading state changes it and puts the book on the shelf."""
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "not_started"}
        )
        self.signed('/api/reading/parent/reading/', 'post')
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "reading"}
        )
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
            ['parent'])

    def test_subscription(self):
        """A preview book's EPUB is served only while membership is active."""
        Book.objects.filter(slug='grandchild').update(preview=True)

        self.assert_slugs('/api/preview/', ['grandchild'])
        self.assertEqual(
            self.signed_json('/api/username/'),
            {"username": "test", "premium": False})
        self.assertEqual(
            self.signed('/api/epub/grandchild/').status_code,
            403)

        # Pretend the membership is active and stub out file storage.
        with patch('club.models.Membership.is_active_for', return_value=True):
            self.assertEqual(
                self.signed_json('/api/username/'),
                {"username": "test", "premium": True})
            with patch('django.core.files.storage.Storage.open',
                       return_value=BytesIO(b"<epub>")):
                self.assertEqual(
                    self.signed('/api/epub/grandchild/').content,
                    b"<epub>")

        Book.objects.filter(slug='grandchild').update(preview=False)

    def test_publish(self):
        """Posting a book is refused for a regular user, accepted for a superuser."""
        response = self.signed('/api/books/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        # ``self.user`` is the class-level object shared by all tests, so
        # the elevated flag must be reset even when an assertion fails.
        self.user.is_superuser = True
        self.user.save()
        try:
            with patch('catalogue.models.Book.from_xml_file') as mock:
                response = self.signed('/api/books/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "book_xml": "<utwor/>"
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)
        finally:
            self.user.is_superuser = False
            self.user.save()