Allow uploading audiobooks via API.
[wolnelektury.git] / src / api / tests / tests.py
index 91e5bbf..8dd29a5 100644 (file)
@@ -17,7 +17,7 @@ from django.core.files.uploadedfile import SimpleUploadedFile
 from django.test import TestCase
 from django.test.utils import override_settings
 from mock import patch
-from piston.models import Consumer, Token
+from api.piston.models import Consumer, Token
 
 from catalogue.models import Book, Tag
 from picture.forms import PictureImportForm
@@ -45,7 +45,7 @@ class ApiTest(TestCase):
         with open(filename) as f:
             good_content = f.read().rstrip()
         self.assertEqual(content, good_content, content)
-    
+
     def assert_json_response(self, url, name):
         data = self.load_json(url)
         filename = path.join(path.dirname(__file__), 'res', 'responses', name)
@@ -230,7 +230,7 @@ class BooksTests(ApiTest):
 
 class BlogTests(ApiTest):
     def test_get(self):
-        self.assertEqual(self.load_json('/api/blog/'), [])
+        self.assertEqual(self.load_json('/api/blog'), [])
 
 
 class PreviewTests(ApiTest):
@@ -242,6 +242,8 @@ class OAuth1Tests(ApiTest):
     @classmethod
     def setUpClass(cls):
         cls.user = User.objects.create(username='test')
+        cls.user.set_password('test')
+        cls.user.save()
         cls.consumer_secret = 'len(quote(consumer secret))>=32'
         Consumer.objects.create(
             key='client',
@@ -253,7 +255,8 @@ class OAuth1Tests(ApiTest):
         User.objects.all().delete()
 
     def test_create_token(self):
-        base_query = ("oauth_consumer_key=client&oauth_nonce=123&"
+        # Fetch request token.
+        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                       "oauth_version=1.0".format(int(time())))
         raw = '&'.join([
@@ -268,16 +271,26 @@ class OAuth1Tests(ApiTest):
         sign = quote(h)
         query = "{}&oauth_signature={}".format(base_query, sign)
         response = self.client.get('/api/oauth/request_token/?' + query)
-        request_token = parse_qs(response.content)
+        request_token_data = parse_qs(response.content)
+        request_token = request_token_data['oauth_token'][0]
+        request_token_secret = request_token_data['oauth_token_secret'][0]
+
+        # Request token authorization.
+        self.client.login(username='test', password='test')
+        response = self.client.get('/api/oauth/authorize/?oauth_token=%s&oauth_callback=test://oauth.callback/' % request_token)
+        post_data = response.context['form'].initial
 
-        Token.objects.filter(
-            key=request_token['oauth_token'][0], token_type=Token.REQUEST
-        ).update(user=self.user, is_approved=True)
+        response = self.client.post('/api/oauth/authorize/?' + urlencode(post_data))
+        self.assertEqual(
+            response['Location'],
+            'test://oauth.callback/?oauth_token=' + request_token
+        )
 
-        base_query = ("oauth_consumer_key=client&oauth_nonce=123&"
+        # Fetch access token.
+        base_query = ("oauth_consumer_key=client&oauth_nonce=12345678&"
                       "oauth_signature_method=HMAC-SHA1&oauth_timestamp={}&"
                       "oauth_token={}&oauth_version=1.0".format(
-                          int(time()), request_token['oauth_token'][0]))
+                          int(time()), request_token))
         raw = '&'.join([
             'GET',
             quote('http://testserver/api/oauth/access_token/', safe=''),
@@ -285,7 +298,7 @@ class OAuth1Tests(ApiTest):
         ])
         h = hmac.new(
             quote(self.consumer_secret) + '&' +
-            quote(request_token['oauth_token_secret'][0], safe=''),
+            quote(request_token_secret, safe=''),
             raw,
             hashlib.sha1
         ).digest()
@@ -293,11 +306,12 @@ class OAuth1Tests(ApiTest):
         sign = quote(h)
         query = u"{}&oauth_signature={}".format(base_query, sign)
         response = self.client.get(u'/api/oauth/access_token/?' + query)
-        access_token = parse_qs(response.content)
+        access_token_data = parse_qs(response.content)
+        access_token = access_token_data['oauth_token'][0]
 
         self.assertTrue(
             Token.objects.filter(
-                key=access_token['oauth_token'][0],
+                key=access_token,
                 token_type=Token.ACCESS,
                 user=self.user
             ).exists())
@@ -327,10 +341,10 @@ class AuthorizedTests(ApiTest):
         cls.consumer.delete()
         super(AuthorizedTests, cls).tearDownClass()
 
-    def signed(self, url, method='GET', params=None):
+    def signed(self, url, method='GET', params=None, data=None):
         auth_params = {
             "oauth_consumer_key": self.consumer.key,
-            "oauth_nonce": "%f" % time(),
+            "oauth_nonce": ("%f" % time()).replace('.', ''),
             "oauth_signature_method": "HMAC-SHA1",
             "oauth_timestamp": int(time()),
             "oauth_token": self.token.key,
@@ -340,12 +354,14 @@ class AuthorizedTests(ApiTest):
         sign_params = {}
         if params:
             sign_params.update(params)
+        if data:
+            sign_params.update(data)
         sign_params.update(auth_params)
         raw = "&".join([
             method.upper(),
             quote('http://testserver' + url, safe=''),
             quote("&".join(
-                quote(str(k)) + "=" + quote(str(v))
+                quote(str(k), safe='') + "=" + quote(str(v), safe='')
                 for (k, v) in sorted(sign_params.items())))
         ])
         auth_params["oauth_signature"] = quote(b64encode(hmac.new(
@@ -356,22 +372,23 @@ class AuthorizedTests(ApiTest):
         if params:
             url = url + '?' + urlencode(params)
         return getattr(self.client, method.lower())(
-                url,
-                HTTP_AUTHORIZATION=auth
-            )
+            url,
+            data=urlencode(data) if data else None,
+            content_type='application/x-www-form-urlencoded',
+            HTTP_AUTHORIZATION=auth,
+        )
 
-    def signed_json(self, url, method='GET', params=None):
-        return json.loads(self.signed(url, method, params).content)
+    def signed_json(self, url, method='GET', params=None, data=None):
+        return json.loads(self.signed(url, method, params, data).content)
 
     def test_books(self):
         self.assertEqual(
             [b['liked'] for b in self.signed_json('/api/books/')],
             [False, False, False]
         )
-        # This one fails in the legacy implementation
-        # data = self.signed_json('/api/books/child/')
-        # self.assertFalse(data['parent']['liked'])
-        # self.assertFalse(data['children'][0]['liked'])
+        data = self.signed_json('/api/books/child/')
+        self.assertFalse(data['parent']['liked'])
+        self.assertFalse(data['children'][0]['liked'])
 
         self.assertEqual(
             self.signed_json('/api/like/parent/'),
@@ -387,9 +404,8 @@ class AuthorizedTests(ApiTest):
         self.assertTrue(self.signed_json(
             '/api/filter-books/', params={"search": "parent"})[0]['liked'])
 
-        # This one fails in the legacy implementation.
-        #self.assertTrue(self.signed_json(
-        #    '/api/books/child/')['parent']['liked'])
+        self.assertTrue(self.signed_json(
+            '/api/books/child/')['parent']['liked'])
         # Liked books go on shelf.
         self.assertEqual(
             [x['slug'] for x in self.signed_json('/api/shelf/likes/')],
@@ -423,14 +439,51 @@ class AuthorizedTests(ApiTest):
             {"username": "test", "premium": False})
         self.assertEqual(
             self.signed('/api/epub/grandchild/').status_code,
-            401)  # Not 403 because Piston.
+            403)
 
-        with patch('api.handlers.user_is_subscribed', return_value=True):
+        with patch('api.fields.user_is_subscribed', return_value=True):
             self.assertEqual(
                 self.signed_json('/api/username/'),
                 {"username": "test", "premium": True})
+        with patch('paypal.permissions.user_is_subscribed', return_value=True):
             with patch('django.core.files.storage.Storage.open',
                        return_value=StringIO("<epub>")):
                 self.assertEqual(
                     self.signed('/api/epub/grandchild/').content,
                     "<epub>")
+
+    def test_publish(self):
+        response = self.signed('/api/books/',
+                               method='POST',
+                               data={"data": json.dumps({})})
+        self.assertEqual(response.status_code, 403)
+
+        response = self.signed('/api/pictures/',
+                               method='POST',
+                               data={"data": json.dumps({})})
+        self.assertEqual(response.status_code, 403)
+
+        self.user.is_superuser = True
+        self.user.save()
+
+        with patch('catalogue.models.Book.from_xml_file') as mock:
+            response = self.signed('/api/books/',
+                                   method='POST',
+                                   data={"data": json.dumps({
+                                       "book_xml": "<utwor/>"
+                                   })})
+            self.assertTrue(mock.called)
+        self.assertEqual(response.status_code, 201)
+
+        with patch('picture.models.Picture.from_xml_file') as mock:
+            response = self.signed('/api/pictures/',
+                                   method='POST',
+                                   data={"data": json.dumps({
+                                       "picture_xml": "<utwor/>",
+                                       "picture_image_data": "Kg==",
+                                   })})
+            self.assertTrue(mock.called)
+        self.assertEqual(response.status_code, 201)
+
+        self.user.is_superuser = False
+        self.user.save()