1 # This file is part of Wolne Lektury, licensed under GNU Affero GPLv3 or later.
 
   2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
 
   4 from base64 import b64encode
 
  11 from unittest.mock import patch
 
  12 from urllib.parse import quote, urlencode, parse_qs
 
  14 from django.contrib.auth.models import User
 
  15 from django.core.files.uploadedfile import SimpleUploadedFile
 
  16 from django.test import TestCase
 
  17 from django.test.utils import override_settings
 
  19 from catalogue.models import Book, Tag
 
  20 from api.models import Consumer, Token
 
  26         'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}},
 
  28 class ApiTest(TestCase):
 
  31     def load_json(self, url):
 
  32         content = self.client.get(url).content
 
  34             data = json.loads(content)
 
  36             self.fail('No JSON could be decoded: %s' % content)
 
  39     def assert_response(self, url, name):
 
  40         content = self.client.get(url).content.decode('utf-8').rstrip()
 
  41         filename = path.join(path.dirname(__file__), 'res', 'responses', name)
 
  42         with open(filename) as f:
 
  43             good_content = f.read().rstrip()
 
  44         self.assertEqual(content, good_content, content)
 
  46     def assert_json_response(self, url, name):
 
  47         data = self.load_json(url)
 
  48         filename = path.join(path.dirname(__file__), 'res', 'responses', name)
 
  49         with open(filename) as f:
 
  50             good_data = json.load(f)
 
  51         self.assertEqual(data, good_data, json.dumps(data, indent=4))
 
  53     def assert_slugs(self, url, slugs):
 
  54         have_slugs = [x['slug'] for x in self.load_json(url)]
 
  55         self.assertEqual(have_slugs, slugs, have_slugs)
 
  58 class BookTests(ApiTest):
 
  61         self.tag = Tag.objects.create(category='author', slug='joe')
 
  62         self.book = Book.objects.create(title='A Book', slug='a-book')
 
  63         self.book_tagged = Book.objects.create(
 
  64             title='Tagged Book', slug='tagged-book')
 
  65         self.book_tagged.tags = [self.tag]
 
  66         self.book_tagged.save()
 
  68     def test_book_list(self):
 
  69         books = self.load_json('/api/books/')
 
  70         self.assertEqual(len(books), 2,
 
  73     def test_tagged_books(self):
 
  74         books = self.load_json('/api/authors/joe/books/')
 
  76         self.assertEqual([b['title'] for b in books], [self.book_tagged.title],
 
  77                          'Wrong tagged book list.')
 
  79     def test_detail(self):
 
  80         book = self.load_json('/api/books/a-book/')
 
  81         self.assertEqual(book['title'], self.book.title,
 
  82                          'Wrong book details.')
 
  85 class TagTests(ApiTest):
 
  88         self.tag = Tag.objects.create(
 
  89             category='author', slug='joe', name='Joe')
 
  90         self.book = Book.objects.create(title='A Book', slug='a-book')
 
  91         self.book.tags = [self.tag]
 
  94     def test_tag_list(self):
 
  95         tags = self.load_json('/api/authors/')
 
  96         self.assertEqual(len(tags), 1,
 
  99     def test_tag_detail(self):
 
 100         tag = self.load_json('/api/authors/joe/')
 
 101         self.assertEqual(tag['name'], self.tag.name,
 
 102                          'Wrong tag details.')
 
 105 class BooksTests(ApiTest):
 
 106     fixtures = ['test-books.yaml']
 
 108     def test_books(self):
 
 109         self.assert_json_response('/api/books/', 'books.json')
 
 110         self.assert_json_response('/api/books/?new_api=true', 'books.json')
 
 111         self.assert_response('/api/books/?format=xml', 'books.xml')
 
 113         self.assert_slugs('/api/audiobooks/', ['parent'])
 
 114         self.assert_slugs('/api/daisy/', ['parent'])
 
 115         self.assert_slugs('/api/newest/', ['parent'])
 
 116         self.assert_slugs('/api/parent_books/', ['parent'])
 
 117         self.assert_slugs('/api/recommended/', ['parent'])
 
 120         self.assert_slugs('/api/books/after/grandchild/count/1/', ['parent'])
 
 122             '/api/books/?new_api=true&after=$grandchild$3&count=1', ['parent'])
 
 125         self.assert_slugs('/api/authors/john-doe/books/', ['parent'])
 
 127             '/api/genres/sonet/books/?authors=john-doe',
 
 129         # It is probably a mistake that this doesn't filter:
 
 131             '/api/books/?authors=john-doe',
 
 132             ['child', 'grandchild', 'parent'])
 
 134         # Parent books by tag.
 
 135         # Notice this contains a grandchild, if a child doesn't have the tag.
 
 136         # This probably isn't really intended behavior and should be redefined.
 
 138             '/api/genres/sonet/parent_books/',
 
 139             ['grandchild', 'parent'])
 
 141     def test_ebooks(self):
 
 142         self.assert_json_response('/api/ebooks/', 'ebooks.json')
 
 144     def test_filter_books(self):
 
 145         self.assert_json_response('/api/filter-books/', 'filter-books.json')
 
 147             '/api/filter-books/?lektura=false',
 
 148             ['child', 'grandchild', 'parent'])
 
 150             '/api/filter-books/?lektura=true',
 
 153         Book.objects.filter(slug='grandchild').update(preview=True)
 
 154         # Skipping: we don't allow previewed books in filtered list.
 
 156         #    '/api/filter-books/?preview=true',
 
 159             '/api/filter-books/?preview=false',
 
 161         Book.objects.filter(slug='grandchild').update(preview=False)
 
 164             '/api/filter-books/?audiobook=true',
 
 167             '/api/filter-books/?audiobook=false',
 
 168             ['child', 'grandchild'])
 
 170         self.assert_slugs('/api/filter-books/?genres=wiersz', ['child'])
 
 172         self.assert_slugs('/api/filter-books/?search=parent', ['parent'])
 
 174     def test_collections(self):
 
 175         self.assert_json_response('/api/collections/', 'collections.json')
 
 176         self.assert_json_response(
 
 177             '/api/collections/a-collection/',
 
 181         self.assert_json_response('/api/books/parent/', 'books-parent.json')
 
 182         self.assert_json_response('/api/books/child/', 'books-child.json')
 
 183         self.assert_json_response(
 
 184             '/api/books/grandchild/',
 
 185             'books-grandchild.json')
 
 188         # List of tags by category.
 
 189         self.assert_json_response('/api/genres/', 'tags.json')
 
 191     def test_fragments(self):
 
 192         # This is not supported, though it probably should be.
 
 193         # self.assert_json_response(
 
 194         #     '/api/books/child/fragments/',
 
 197         self.assert_json_response(
 
 198             '/api/genres/wiersz/fragments/',
 
 200         self.assert_json_response(
 
 201             '/api/books/child/fragments/an-anchor/',
 
 205 class BlogTests(ApiTest):
 
 207         self.assertEqual(self.load_json('/api/blog'), [])
 
 210 class OAuth1Tests(ApiTest):
 
 213         cls.user = User.objects.create(username='test')
 
 214         cls.user.set_password('test')
 
 216         cls.consumer_secret = 'len(quote(consumer secret))>=32'
 
 217         Consumer.objects.create(
 
 219             secret=cls.consumer_secret
 
 223     def tearDownClass(cls):
 
 224         User.objects.all().delete()
 
 226     def test_create_token(self):
 
 227         # Fetch request token.
 
 228         base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
 
 229                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
 
 230                       "oauth_version=1.0".format(int(time())))
 
 233             quote('http://testserver/api/oauth/request_token/', safe=''),
 
 234             quote(base_query, safe='')
 
 237             (quote(self.consumer_secret) + '&').encode('latin1'),
 
 238             raw.encode('latin1'),
 
 241         h = b64encode(h).rstrip(b'\n')
 
 243         query = "{}&oauth_signature={}".format(base_query, sign)
 
 244         response = self.client.get('/api/oauth/request_token/?' + query)
 
 245         request_token_data = parse_qs(response.content.decode('latin1'))
 
 246         request_token = request_token_data['oauth_token'][0]
 
 247         request_token_secret = request_token_data['oauth_token_secret'][0]
 
 249         # Request token authorization.
 
 250         self.client.login(username='test', password='test')
 
 251         response = self.client.get(
 
 252             '/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % (
 
 256         post_data = response.context['form'].initial
 
 258         response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
 
 260             response['Location'],
 
 261             'test://oauth.callback/?oauth_token=' + request_token
 
 264         # Fetch access token.
 
 265         base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
 
 266                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
 
 267                       "oauth_token={}&oauth_version=1.0".format(
 
 268                           int(time()), request_token))
 
 271             quote('http://testserver/api/oauth/access_token/', safe=''),
 
 272             quote(base_query, safe='')
 
 275             (quote(self.consumer_secret) + '&' +
 
 276              quote(request_token_secret, safe='')).encode('latin1'),
 
 277             raw.encode('latin1'),
 
 280         h = b64encode(h).rstrip(b'\n')
 
 282         query = "{}&oauth_signature={}".format(base_query, sign)
 
 283         response = self.client.get('/api/oauth/access_token/?' + query)
 
 284         access_token_data = parse_qs(response.content.decode('latin1'))
 
 285         access_token = access_token_data['oauth_token'][0]
 
 288             Token.objects.filter(
 
 290                 token_type=Token.ACCESS,
 
 295 class AuthorizedTests(ApiTest):
 
 296     fixtures = ['test-books.yaml']
 
 300         super(AuthorizedTests, cls).setUpClass()
 
 301         cls.user = User.objects.create(username='test')
 
 302         cls.consumer = Consumer.objects.create(
 
 303             key='client', secret='12345678901234567890123456789012')
 
 304         cls.token = Token.objects.create(
 
 305             key='123456789012345678',
 
 306             secret='12345678901234567890123456789012',
 
 308             consumer=cls.consumer,
 
 309             token_type=Token.ACCESS,
 
 311         cls.key = (cls.consumer.secret + '&' + cls.token.secret).encode('latin1')
 
 314     def tearDownClass(cls):
 
 316         cls.consumer.delete()
 
 317         super(AuthorizedTests, cls).tearDownClass()
 
 319     def signed(self, url, method='GET', params=None, data=None):
 
 321             "oauth_consumer_key": self.consumer.key,
 
 322             "oauth_nonce": ("%f" % time()).replace('.', ''),
 
 323             "oauth_signature_method": "HMAC-SHA1",
 
 324             "oauth_timestamp": int(time()),
 
 325             "oauth_token": self.token.key,
 
 326             "oauth_version": "1.0",
 
 331             sign_params.update(params)
 
 333             sign_params.update(data)
 
 334         sign_params.update(auth_params)
 
 337             quote('http://testserver' + url, safe=''),
 
 339                 quote(str(k), safe='') + "=" + quote(str(v), safe='')
 
 340                 for (k, v) in sorted(sign_params.items())))
 
 342         auth_params["oauth_signature"] = quote(b64encode(hmac.new(
 
 344             raw.encode('latin1'),
 
 346         ).digest()).rstrip(b'\n'))
 
 347         auth = 'OAuth realm="API", ' + ', '.join(
 
 348             '{}="{}"'.format(k, v) for (k, v) in auth_params.items())
 
 351             url = url + '?' + urlencode(params)
 
 352         return getattr(self.client, method.lower())(
 
 354             data=urlencode(data) if data else None,
 
 355             content_type='application/x-www-form-urlencoded',
 
 356             HTTP_AUTHORIZATION=auth,
 
 359     def signed_json(self, url, method='GET', params=None, data=None):
 
 360         return json.loads(self.signed(url, method, params, data).content)
 
 362     def test_books(self):
 
 364             [b['liked'] for b in self.signed_json('/api/books/')],
 
 365             [False, False, False]
 
 367         data = self.signed_json('/api/books/child/')
 
 368         self.assertFalse(data['parent']['liked'])
 
 369         self.assertFalse(data['children'][0]['liked'])
 
 372             self.signed_json('/api/like/parent/'),
 
 375         self.signed('/api/like/parent/', 'POST')
 
 377             self.signed_json('/api/like/parent/'),
 
 380         # There are several endpoints where 'liked' appears.
 
 381         self.assertTrue(self.signed_json('/api/parent_books/')[0]['liked'])
 
 382         self.assertTrue(self.signed_json(
 
 383             '/api/filter-books/', params={"search": "parent"})[0]['liked'])
 
 385         self.assertTrue(self.signed_json(
 
 386             '/api/books/child/')['parent']['liked'])
 
 387         # Liked books go on shelf.
 
 389             [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
 
 392         self.signed('/api/like/parent/', 'POST', {"action": "unlike"})
 
 394             self.signed_json('/api/like/parent/'),
 
 397         self.assertFalse(self.signed_json('/api/parent_books/')[0]['liked'])
 
 399     def test_reading(self):
 
 401             self.signed_json('/api/reading/parent/'),
 
 402             {"state": "not_started"}
 
 404         self.signed('/api/reading/parent/reading/', 'post')
 
 406             self.signed_json('/api/reading/parent/'),
 
 410             [x['slug'] for x in self.signed_json('/api/shelf/reading/')],
 
 413     def test_subscription(self):
 
 414         Book.objects.filter(slug='grandchild').update(preview=True)
 
 416         self.assert_slugs('/api/preview/', ['grandchild'])
 
 418             self.signed_json('/api/username/'),
 
 419             {"username": "test", "premium": False})
 
 421             self.signed('/api/epub/grandchild/').status_code,
 
 424         with patch('club.models.Membership.is_active_for', return_value=True):
 
 426                 self.signed_json('/api/username/'),
 
 427                 {"username": "test", "premium": True})
 
 428             with patch('django.core.files.storage.Storage.open',
 
 429                        return_value=BytesIO(b"<epub>")):
 
 431                     self.signed('/api/epub/grandchild/').content,
 
 434         Book.objects.filter(slug='grandchild').update(preview=False)
 
 436     def test_publish(self):
 
 437         response = self.signed('/api/books/',
 
 439                                data={"data": json.dumps({})})
 
 440         self.assertEqual(response.status_code, 403)
 
 442         self.user.is_superuser = True
 
 445         with patch('catalogue.models.Book.from_xml_file') as mock:
 
 446             response = self.signed('/api/books/',
 
 448                                    data={"data": json.dumps({
 
 449                                        "book_xml": "<utwor/>"
 
 451             self.assertTrue(mock.called)
 
 452         self.assertEqual(response.status_code, 201)
 
 454         self.user.is_superuser = False