1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
5 from oauthlib.oauth1 import RequestValidator
6 from api.models import Consumer, Nonce, Token
9 class PistonRequestValidator(RequestValidator):
10 timestamp_threshold = 300
12 dummy_access_token = '!'
16 # It'd be a little more kosher to use test client with secure=True.
19 # iOS app generates 8-char nonces.
22 # Because Token.key is char(18).
23 request_token_length = 18, 32
24 access_token_length = 18, 32
25 # TODO: oauthlib request-access switch.
27 def check_client_key(self, client_key):
28 """We control the keys anyway."""
31 def get_request_token_secret(self, client_key, token, request):
32 return request.token.secret
34 def get_access_token_secret(self, client_key, token, request):
35 return request.token.secret
37 def get_default_realms(self, client_key, request):
40 def validate_request_token(self, client_key, token, request):
42 token = Token.objects.get(
43 token_type=Token.REQUEST,
44 consumer__key=client_key,
48 except Token.DoesNotExist:
54 def validate_access_token(self, client_key, token, request):
56 token = Token.objects.get(
57 token_type=Token.ACCESS,
58 consumer__key=client_key,
61 except Token.DoesNotExist:
67 def validate_timestamp_and_nonce(self, client_key, timestamp, nonce,
68 request, request_token=None, access_token=None):
69 if abs(time.time() - int(timestamp)) > self.timestamp_threshold:
71 token = request_token or access_token
72 # Yes, this is what Piston did.
76 nonce, created = Nonce.objects.get_or_create(consumer_key=client_key,
81 def validate_client_key(self, client_key, request):
83 request.oauth_consumer = Consumer.objects.get(key=client_key)
84 except Consumer.DoesNotExist:
88 def validate_realms(self, client_key, token, request, uri=None, realms=None):
91 def validate_requested_realms(self, *args, **kwargs):
94 def validate_redirect_uri(self, *args, **kwargs):
97 def validate_verifier(self, client_key, token, verifier, request):
100 def get_client_secret(self, client_key, request):
101 return request.oauth_consumer.secret
103 def save_request_token(self, token, request):
104 Token.objects.create(
105 token_type=Token.REQUEST,
106 timestamp=request.timestamp,
107 key=token['oauth_token'],
108 secret=token['oauth_token_secret'],
109 consumer=request.oauth_consumer,
112 def save_access_token(self, token, request):
113 Token.objects.create(
114 token_type=Token.ACCESS,
115 timestamp=request.timestamp,
116 key=token['oauth_token'],
117 secret=token['oauth_token_secret'],
118 consumer=request.oauth_consumer,
119 user=request.token.user,
122 def verify_request_token(self, token, request):
123 return Token.objects.filter(
124 token_type=Token.REQUEST, key=token, is_approved=False
127 def get_realms(self, *args, **kwargs):
130 def save_verifier(self, token, verifier, request):
131 Token.objects.filter(
132 token_type=Token.REQUEST,
137 user=verifier['user']
140 def get_redirect_uri(self, token, request):
141 return request.redirect_uri
143 def invalidate_request_token(self, client_key, request_token, request):
144 Token.objects.filter(
145 token_type=Token.REQUEST,
147 consumer__key=client_key,