Start replacing Piston in OAuth flow with OAuthLib.
[wolnelektury.git] / src / api / tests / tests.py
1 # -*- coding: utf-8 -*-
2 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
3 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 #
5 from base64 import b64encode
6 from os import path
7 import hashlib
8 import hmac
9 import json
10 from StringIO import StringIO
11 from time import time
12 from urllib import quote, urlencode
13 from urlparse import parse_qs
14
15 from django.contrib.auth.models import User
16 from django.core.files.uploadedfile import SimpleUploadedFile
17 from django.test import TestCase
18 from django.test.utils import override_settings
19 from mock import patch
20 from piston.models import Consumer, Token
21
22 from catalogue.models import Book, Tag
23 from picture.forms import PictureImportForm
24 from picture.models import Picture
25 import picture.tests
26
27
@override_settings(
    NO_SEARCH_INDEX=True,
    CACHES={'default': {
        'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
)
class ApiTest(TestCase):
    """Common base for the API tests.

    Offers helpers that fetch a URL with the test client and compare the
    body with a literal value or with a file kept in ``res/responses``
    next to this module.
    """

    @staticmethod
    def _response_file(name):
        """Return the path of the stored reference response ``name``."""
        return path.join(path.dirname(__file__), 'res', 'responses', name)

    def load_json(self, url):
        """GET ``url`` and return its body decoded from JSON.

        The test fails when the body cannot be decoded.
        """
        body = self.client.get(url).content
        try:
            return json.loads(body)
        except ValueError:
            self.fail('No JSON could be decoded: %s' % body)

    def assert_response(self, url, name):
        """Check that the raw body of ``url`` equals the stored file ``name``.

        Trailing whitespace is stripped from both sides before comparing.
        """
        received = self.client.get(url).content.rstrip()
        with open(self._response_file(name)) as reference:
            expected = reference.read().rstrip()
        self.assertEqual(received, expected, received)

    def assert_json_response(self, url, name):
        """Check that the JSON served at ``url`` equals the JSON file ``name``."""
        received = self.load_json(url)
        with open(self._response_file(name)) as reference:
            expected = json.load(reference)
        # The pretty-printed payload is used as the failure message.
        self.assertEqual(received, expected, json.dumps(received, indent=4))

    def assert_slugs(self, url, slugs):
        """Check that the items listed at ``url`` have exactly ``slugs``, in order."""
        found = []
        for item in self.load_json(url):
            found.append(item['slug'])
        self.assertEqual(found, slugs, found)
59
60
class BookTests(ApiTest):
    """Book list, per-author list and detail endpoints on a tiny data set."""

    def setUp(self):
        """Create one author tag, one untagged book and one tagged book."""
        author = Tag.objects.create(category='author', slug='joe')
        untagged = Book.objects.create(title='A Book', slug='a-book')
        tagged = Book.objects.create(title='Tagged Book', slug='tagged-book')
        tagged.tags = [author]
        tagged.save()

        self.tag = author
        self.book = untagged
        self.book_tagged = tagged

    def test_book_list(self):
        """Both books are listed."""
        listed = self.load_json('/api/books/')
        self.assertEqual(len(listed), 2, 'Wrong book list.')

    def test_tagged_books(self):
        """Only the tagged book shows up under its author."""
        titles = []
        for entry in self.load_json('/api/authors/joe/books/'):
            titles.append(entry['title'])
        self.assertEqual(
            titles, [self.book_tagged.title], 'Wrong tagged book list.')

    def test_detail(self):
        """The detail view reports the book's title."""
        details = self.load_json('/api/books/a-book/')
        self.assertEqual(
            details['title'], self.book.title, 'Wrong book details.')
86
87
class TagTests(ApiTest):
    """Author tag list and detail endpoints."""

    def setUp(self):
        """Create a named author tag attached to a single book."""
        author = Tag.objects.create(category='author', slug='joe', name='Joe')
        tagged = Book.objects.create(title='A Book', slug='a-book')
        tagged.tags = [author]
        tagged.save()

        self.tag = author
        self.book = tagged

    def test_tag_list(self):
        """Exactly one author is listed."""
        listed = self.load_json('/api/authors/')
        self.assertEqual(len(listed), 1, 'Wrong tag list.')

    def test_tag_detail(self):
        """The detail view reports the tag's name."""
        details = self.load_json('/api/authors/joe/')
        self.assertEqual(
            details['name'], self.tag.name, 'Wrong tag details.')
106
107
class PictureTests(ApiTest):
    """Imports a sample picture through ``PictureImportForm``."""

    def test_publish(self):
        """Import the sample picture and look it up by its slug.

        The XML and PNG samples are read from the ``files`` directory of
        the ``picture.tests`` package.
        """
        slug = "kandinsky-composition-viii"
        samples_dir = path.join(picture.tests.__path__[0], "files")

        # Read the samples as bytes and close the handles right away; the
        # PNG in particular must not go through text-mode translation.
        with open(path.join(samples_dir, slug + ".xml"), 'rb') as xml_file:
            xml = SimpleUploadedFile('composition8.xml', xml_file.read())
        with open(path.join(samples_dir, slug + ".png"), 'rb') as img_file:
            img = SimpleUploadedFile('kompozycja-8.png', img_file.read())

        import_form = PictureImportForm({}, {
            'picture_xml_file': xml,
            'picture_image_file': img,
        })

        # A unittest assertion (instead of a bare ``assert``) is not
        # stripped under ``python -O``.
        self.assertTrue(import_form.is_valid())
        import_form.save()

        # NOTE(review): presumably raises Picture.DoesNotExist when the
        # import did not store the picture -- that is the actual check here.
        Picture.objects.get(slug=slug)
132
133
class BooksTests(ApiTest):
    """Read-only catalogue endpoints checked against the books fixture."""

    fixtures = ['test-books.yaml']

    def test_books(self):
        """Book listings: formats, special lists, paging and tag filters."""
        listing = 'books.json'
        self.assert_json_response('/api/books/', listing)
        self.assert_json_response('/api/books/?new_api=true', listing)
        self.assert_response('/api/books/?format=xml', 'books.xml')

        only_parent = ['parent']
        parent_only_urls = (
            '/api/audiobooks/',
            '/api/daisy/',
            '/api/newest/',
            '/api/parent_books/',
            '/api/recommended/',
            # Book paging.
            '/api/books/after/grandchild/count/1/',
            '/api/books/?new_api=true&after=$grandchild$3&count=1',
            # By tag.
            '/api/authors/john-doe/books/',
            '/api/genres/sonet/books/?authors=john-doe',
        )
        for url in parent_only_urls:
            self.assert_slugs(url, only_parent)

        # It is probably a mistake that this doesn't filter:
        unfiltered = ['child', 'grandchild', 'parent']
        self.assert_slugs('/api/books/?authors=john-doe', unfiltered)

        # Parent books by tag.
        # Notice this contains a grandchild, if a child doesn't have the tag.
        # This probably isn't really intended behavior and should be redefined.
        with_grandchild = ['grandchild', 'parent']
        self.assert_slugs('/api/genres/sonet/parent_books/', with_grandchild)

    def test_ebooks(self):
        """The e-book listing matches the stored response."""
        self.assert_json_response('/api/ebooks/', 'ebooks.json')

    def test_filter_books(self):
        """Each filter-books query parameter narrows the list as expected."""
        self.assert_json_response('/api/filter-books/', 'filter-books.json')

        expectations = (
            ('/api/filter-books/?lektura=false',
             ['child', 'grandchild', 'parent']),
            ('/api/filter-books/?lektura=true', []),
            ('/api/filter-books/?preview=true', ['grandchild']),
            ('/api/filter-books/?preview=false', ['child', 'parent']),
            ('/api/filter-books/?audiobook=true', ['parent']),
            ('/api/filter-books/?audiobook=false', ['child', 'grandchild']),
            ('/api/filter-books/?genres=wiersz', ['child']),
            ('/api/filter-books/?search=parent', ['parent']),
        )
        for url, expected_slugs in expectations:
            self.assert_slugs(url, expected_slugs)

    def test_collections(self):
        """Collection list and detail match the stored responses."""
        stored = (
            ('/api/collections/', 'collections.json'),
            ('/api/collections/a-collection/', 'collection.json'),
        )
        for url, filename in stored:
            self.assert_json_response(url, filename)

    def test_book(self):
        """Detail views of all three fixture books match the stored responses."""
        stored = (
            ('/api/books/parent/', 'books-parent.json'),
            ('/api/books/child/', 'books-child.json'),
            ('/api/books/grandchild/', 'books-grandchild.json'),
        )
        for url, filename in stored:
            self.assert_json_response(url, filename)

    def test_tags(self):
        """The genre listing matches the stored response."""
        # List of tags by category.
        self.assert_json_response('/api/genres/', 'tags.json')

    def test_fragments(self):
        """Fragment list by genre and a single fragment match stored responses."""
        # This is not supported, though it probably should be.
        # self.assert_json_response(
        #     '/api/books/child/fragments/',
        #     'fragments.json')

        stored = (
            ('/api/genres/wiersz/fragments/', 'fragments.json'),
            ('/api/books/child/fragments/an-anchor/', 'fragment.json'),
        )
        for url, filename in stored:
            self.assert_json_response(url, filename)
229
230
class BlogTests(ApiTest):
    """Blog endpoint with no data loaded."""

    def test_get(self):
        """The blog listing decodes to an empty list."""
        posts = self.load_json('/api/blog/')
        self.assertEqual(posts, [])
234
235
class PreviewTests(ApiTest):
    """Comparison of the preview endpoint with a stored response."""

    # NOTE(review): the method name lacks the ``test`` prefix, so the
    # default unittest loader does not collect it -- confirm whether the
    # check is disabled on purpose before renaming.
    def unauth(self):
        """Compare the body of /api/preview/ with ``preview.json``."""
        endpoint, reference = '/api/preview/', 'preview.json'
        self.assert_json_response(endpoint, reference)
239
240
class OAuth1Tests(ApiTest):
    """Walks through the OAuth 1.0 request-token / access-token exchange."""

    @classmethod
    def setUpClass(cls):
        """Create the user and the API consumer shared by the tests."""
        # The base-class hook must run so that the framework's class-level
        # setup (including the inherited ``override_settings``) is applied,
        # the same way AuthorizedTests does it.
        super(OAuth1Tests, cls).setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer_secret = 'len(quote(consumer secret))>=32'
        cls.consumer = Consumer.objects.create(
            key='client',
            secret=cls.consumer_secret
        )

    @classmethod
    def tearDownClass(cls):
        """Remove everything created in ``setUpClass``."""
        User.objects.all().delete()
        # The consumer used to be left behind for later test classes.
        cls.consumer.delete()
        super(OAuth1Tests, cls).tearDownClass()

    def _signed_query(self, url, base_query, token_secret=''):
        """Return ``base_query`` with its HMAC-SHA1 ``oauth_signature`` added.

        ``url`` is the path of a GET request to ``http://testserver``;
        ``base_query`` is the already sorted and encoded parameter string;
        ``token_secret`` is empty until a request token has been issued.
        """
        raw = '&'.join([
            'GET',
            quote('http://testserver' + url, safe=''),
            quote(base_query, safe=''),
        ])
        # Signing key: consumer secret and token secret joined with '&'.
        key = quote(self.consumer_secret) + '&' + quote(token_secret, safe='')
        digest = hmac.new(key, raw, hashlib.sha1).digest()
        signature = quote(b64encode(digest))
        return "{}&oauth_signature={}".format(base_query, signature)

    def test_create_token(self):
        """Obtain a request token, approve it, and trade it for an access token."""
        request_url = '/api/oauth/request_token/'
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_version=1.0".format(int(time())))
        response = self.client.get(
            request_url + '?' + self._signed_query(request_url, base_query))
        request_token = parse_qs(response.content)
        # Fail with the server's answer instead of a bare KeyError below.
        self.assertIn('oauth_token', request_token, response.content)
        self.assertIn('oauth_token_secret', request_token, response.content)

        # Stand in for the user approving the token in the browser.
        Token.objects.filter(
            key=request_token['oauth_token'][0], token_type=Token.REQUEST
        ).update(user=self.user, is_approved=True)

        access_url = '/api/oauth/access_token/'
        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                      "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                      "oauth_token={}&oauth_version=1.0".format(
                          int(time()), request_token['oauth_token'][0]))
        query = self._signed_query(
            access_url, base_query,
            token_secret=request_token['oauth_token_secret'][0])
        response = self.client.get(access_url + '?' + query)
        access_token = parse_qs(response.content)
        self.assertIn('oauth_token', access_token, response.content)

        self.assertTrue(
            Token.objects.filter(
                key=access_token['oauth_token'][0],
                token_type=Token.ACCESS,
                user=self.user
            ).exists())
304
305
class AuthorizedTests(ApiTest):
    """Endpoints exercised with OAuth 1.0 signed requests.

    A user, a consumer and an access token are created once for the class;
    ``signed`` / ``signed_json`` send requests signed with them.
    """

    fixtures = ['test-books.yaml']

    @classmethod
    def setUpClass(cls):
        """Create the user, consumer and access token used for signing."""
        super(AuthorizedTests, cls).setUpClass()
        cls.user = User.objects.create(username='test')
        cls.consumer = Consumer.objects.create(
            key='client', secret='12345678901234567890123456789012')
        cls.token = Token.objects.create(
            key='123456789012345678',
            secret='12345678901234567890123456789012',
            user=cls.user,
            consumer=cls.consumer,
            token_type=Token.ACCESS,
            timestamp=time())
        # HMAC key: consumer secret and token secret joined with '&'.
        cls.key = cls.consumer.secret + '&' + cls.token.secret

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level user and consumer."""
        cls.user.delete()
        cls.consumer.delete()
        super(AuthorizedTests, cls).tearDownClass()

    def signed(self, url, method='GET', params=None, data=None):
        """Send a request signed with HMAC-SHA1 and return the response.

        ``url`` is a path on ``http://testserver``; ``method`` is the HTTP
        method name (any case); ``params`` are appended to the URL as a
        query string; ``data`` is sent as a form-encoded body. Both
        ``params`` and ``data`` take part in the signature. The OAuth
        parameters travel in the ``Authorization`` header.
        """
        auth_params = {
            "oauth_consumer_key": self.consumer.key,
            "oauth_nonce": ("%f" % time()).replace('.', ''),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": int(time()),
            "oauth_token": self.token.key,
            "oauth_version": "1.0",
        }

        sign_params = {}
        if params:
            sign_params.update(params)
        if data:
            sign_params.update(data)
        sign_params.update(auth_params)
        # Signature base string: METHOD & encoded URL & encoded sorted params.
        raw = "&".join([
            method.upper(),
            quote('http://testserver' + url, safe=''),
            quote("&".join(
                quote(str(k), safe='') + "=" + quote(str(v), safe='')
                for (k, v) in sorted(sign_params.items())))
        ])
        auth_params["oauth_signature"] = quote(b64encode(hmac.new(
            self.key, raw, hashlib.sha1).digest()).rstrip('\n'))
        auth = 'OAuth realm="API", ' + ', '.join(
            '{}="{}"'.format(k, v) for (k, v) in auth_params.items())

        if params:
            url = url + '?' + urlencode(params)
        # Dispatch to self.client.get / self.client.post / ...
        return getattr(self.client, method.lower())(
            url,
            data=urlencode(data) if data else None,
            content_type='application/x-www-form-urlencoded',
            HTTP_AUTHORIZATION=auth,
        )

    def signed_json(self, url, method='GET', params=None, data=None):
        """Like ``signed``, but return the response body decoded from JSON."""
        return json.loads(self.signed(url, method, params, data).content)

    def test_books(self):
        """Liking and unliking a book is reflected by every 'liked' field."""
        self.assertEqual(
            [b['liked'] for b in self.signed_json('/api/books/')],
            [False, False, False]
        )
        data = self.signed_json('/api/books/child/')
        self.assertFalse(data['parent']['liked'])
        self.assertFalse(data['children'][0]['liked'])

        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.signed('/api/like/parent/', 'POST')
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": True}
        )
        # There are several endpoints where 'liked' appears.
        self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
        self.assertTrue(self.signed_json(
            '/api/filter-books/', params={"search": "parent"})[0]['liked'])

        self.assertTrue(self.signed_json(
            '/api/books/child/')['parent']['liked'])
        # Liked books go on shelf.
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
            ['parent'])

        # The third positional argument is ``params``, so the action is
        # sent in the query string of the POST.
        self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
        self.assertEqual(
            self.signed_json('/api/like/parent/'),
            {"likes": False}
        )
        self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])

    def test_reading(self):
        """Marking a book as being read changes its state and the shelf."""
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "not_started"}
        )
        self.signed('/api/reading/parent/reading/', 'post')
        self.assertEqual(
            self.signed_json('/api/reading/parent/'),
            {"state": "reading"}
        )
        self.assertEqual(
            [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
            ['parent'])

    def test_subscription(self):
        """Preview EPUB is refused without a subscription and served with one."""
        self.assert_slugs('/api/preview/', ['grandchild'])
        self.assertEqual(
            self.signed_json('/api/username/'),
            {"username": "test", "premium": False})
        self.assertEqual(
            self.signed('/api/epub/grandchild/').status_code,
            403)

        # The subscription check is patched at the two places named below.
        with patch('api.fields.user_is_subscribed', return_value=True):
            self.assertEqual(
                self.signed_json('/api/username/'),
                {"username": "test", "premium": True})
        with patch('paypal.permissions.user_is_subscribed', return_value=True):
            # Storage.open is replaced so that no real file is needed.
            with patch('django.core.files.storage.Storage.open',
                       return_value=StringIO("<epub>")):
                self.assertEqual(
                    self.signed('/api/epub/grandchild/').content,
                    "<epub>")

    def test_publish(self):
        """Publishing is refused for a regular user and allowed for a superuser."""
        response = self.signed('/api/books/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        response = self.signed('/api/pictures/',
                               method='POST',
                               data={"data": json.dumps({})})
        self.assertEqual(response.status_code, 403)

        self.user.is_superuser = True
        self.user.save()
        # ``self.user`` is shared by the whole class: restore the flag even
        # when an assertion below fails, so other tests are not affected.
        try:
            with patch('catalogue.models.Book.from_xml_file') as mock:
                response = self.signed('/api/books/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "book_xml": "<utwor/>"
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)

            with patch('picture.models.Picture.from_xml_file') as mock:
                response = self.signed('/api/pictures/',
                                       method='POST',
                                       data={"data": json.dumps({
                                           "picture_xml": "<utwor/>",
                                           "picture_image_data": "Kg==",
                                       })})
                self.assertTrue(mock.called)
            self.assertEqual(response.status_code, 201)
        finally:
            self.user.is_superuser = False
            self.user.save()