add a filter
[wolnelektury.git] / src / api / request_validator.py
1 # This file is part of Wolne Lektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
3 #
4 import time
5 from oauthlib.oauth1 import RequestValidator
6 from api.models import Consumer, Nonce, Token
7
8
class PistonRequestValidator(RequestValidator):
    """OAuth1 request validator backed by the Consumer, Nonce and Token models.

    Implements the hooks of oauthlib's ``RequestValidator`` by looking up
    and storing consumers, tokens and nonces through the Django ORM.
    Several validators cache the object they found on the ``request``
    (``request.token``, ``request.oauth_consumer``) so that later hooks can
    reuse it without another query.
    """

    # Maximum allowed difference, in seconds, between the request timestamp
    # and the server clock (see validate_timestamp_and_nonce).
    timestamp_threshold = 300

    dummy_access_token = '!'
    realms = ['API']

    # Just for the tests.
    # It'd be a little more kosher to use test client with secure=True.
    enforce_ssl = False

    # iOS app generates 8-char nonces.
    # NOTE(review): presumably (min, max) length bounds, as for the other
    # *_length attributes below -- confirm against oauthlib.
    nonce_length = 8, 250

    # Because Token.key is char(18).
    request_token_length = 18, 32
    access_token_length = 18, 32
    # TODO: oauthlib request-access switch.

    def check_client_key(self, client_key):
        """We control the keys anyway."""
        return True

    def get_request_token_secret(self, client_key, token, request):
        """Return the secret of the request token cached on ``request``.

        NOTE(review): unlike get_access_token_secret, there is no fallback
        when ``request.token`` is unset -- this relies on
        validate_request_token having succeeded first; confirm with callers.
        """
        return request.token.secret

    def get_access_token_secret(self, client_key, token, request):
        """Return the secret of the access token, or None if it is unknown.

        Uses the token cached on ``request`` when available; otherwise looks
        the access token up by consumer key and token key.
        """
        if request.token:
            return request.token.secret
        try:
            access_token = Token.objects.get(
                token_type=Token.ACCESS,
                consumer__key=client_key,
                key=token
            )
        except Token.DoesNotExist:
            # Only a missing token means "no secret"; any other error
            # (database failure, interrupt) must not be silently swallowed.
            return None
        return access_token.secret

    def get_default_realms(self, client_key, request):
        """Return the single realm used by this API."""
        return ['API']

    def validate_request_token(self, client_key, token, request):
        """Check that an approved request token exists for this consumer.

        On success the Token object is cached as ``request.token``.
        """
        try:
            token = Token.objects.get(
                token_type=Token.REQUEST,
                consumer__key=client_key,
                key=token,
                is_approved=True,
            )
        except Token.DoesNotExist:
            return False
        else:
            request.token = token
            return True

    def validate_access_token(self, client_key, token, request):
        """Check that an access token exists for this consumer.

        On success the Token object is cached as ``request.token``.
        """
        try:
            token = Token.objects.get(
                token_type=Token.ACCESS,
                consumer__key=client_key,
                key=token
            )
        except Token.DoesNotExist:
            return False
        else:
            request.token = token
            return True

    def validate_timestamp_and_nonce(self, client_key, timestamp, nonce,
                                     request, request_token=None, access_token=None):
        """Reject stale timestamps and replayed nonces.

        Returns False when the timestamp is malformed or differs from the
        current time by more than ``timestamp_threshold`` seconds, or when
        the (consumer, token, nonce) combination has been seen before.
        Requests carrying no token are accepted without a nonce check.
        """
        try:
            request_time = int(timestamp)
        except (TypeError, ValueError):
            # A malformed timestamp is an invalid request, not a server error.
            return False
        if abs(time.time() - request_time) > self.timestamp_threshold:
            return False
        token = request_token or access_token
        # Yes, this is what Piston did.
        if token is None:
            return True

        # The nonce is fresh only if this call had to create its record.
        _, created = Nonce.objects.get_or_create(consumer_key=client_key,
                                                 token_key=token,
                                                 key=nonce)
        return created

    def validate_client_key(self, client_key, request):
        """Check that the consumer exists; cache it as ``request.oauth_consumer``."""
        try:
            request.oauth_consumer = Consumer.objects.get(key=client_key)
        except Consumer.DoesNotExist:
            return False
        return True

    def validate_realms(self, client_key, token, request, uri=None, realms=None):
        """Accept any realms; no per-realm restrictions are applied here."""
        return True

    def validate_requested_realms(self, *args, **kwargs):
        """Accept any requested realms."""
        return True

    def validate_redirect_uri(self, *args, **kwargs):
        """Accept any redirect URI."""
        return True

    def validate_verifier(self, client_key, token, verifier, request):
        """Accept any verifier; approval is checked in validate_request_token."""
        return True

    def get_client_secret(self, client_key, request):
        """Return the secret of the consumer cached by validate_client_key."""
        return request.oauth_consumer.secret

    def save_request_token(self, token, request):
        """Store a newly issued request token for the current consumer."""
        Token.objects.create(
            token_type=Token.REQUEST,
            timestamp=request.timestamp,
            key=token['oauth_token'],
            secret=token['oauth_token_secret'],
            consumer=request.oauth_consumer,
        )

    def save_access_token(self, token, request):
        """Store a newly issued access token.

        The token is bound to the current consumer and to the user of the
        token cached on ``request`` (``request.token.user``).
        """
        Token.objects.create(
            token_type=Token.ACCESS,
            timestamp=request.timestamp,
            key=token['oauth_token'],
            secret=token['oauth_token_secret'],
            consumer=request.oauth_consumer,
            user=request.token.user,
        )

    def verify_request_token(self, token, request):
        """Return True if a not-yet-approved request token with this key exists."""
        return Token.objects.filter(
            token_type=Token.REQUEST, key=token, is_approved=False
        ).exists()

    def get_realms(self, *args, **kwargs):
        """Return an empty realm list; realms are not stored per token."""
        return []

    def save_verifier(self, token, verifier, request):
        """Mark a pending request token as approved by ``verifier['user']``."""
        Token.objects.filter(
            token_type=Token.REQUEST,
            key=token,
            is_approved=False
        ).update(
            is_approved=True,
            user=verifier['user']
        )

    def get_redirect_uri(self, token, request):
        """Return the redirect URI carried by the request itself."""
        return request.redirect_uri

    def invalidate_request_token(self, client_key, request_token, request):
        """Delete a used request token so it cannot be exchanged again."""
        # Building the queryset alone does nothing; it has to be deleted
        # for the token to actually be invalidated.
        Token.objects.filter(
            token_type=Token.REQUEST,
            key=request_token,
            consumer__key=client_key,
        ).delete()