Add messaging.
[wolnelektury.git] / src / api / request_validator.py
1 # This file is part of Wolnelektury, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 import time
5 from oauthlib.oauth1 import RequestValidator
6 from api.models import Consumer, Nonce, Token
7
8
9 class PistonRequestValidator(RequestValidator):
10     timestamp_threshold = 300
11
12     dummy_access_token = '!'
13     realms = ['API']
14
15     # Just for the tests.
16     # It'd be a little more kosher to use test client with secure=True.
17     enforce_ssl = False
18
19     # iOS app generates 8-char nonces.
20     nonce_length = 8, 250
21
22     # Because Token.key is char(18).
23     request_token_length = 18, 32
24     access_token_length = 18, 32
25     # TODO: oauthlib request-access switch.
26
27     def check_client_key(self, client_key):
28         """We control the keys anyway."""
29         return True
30
31     def get_request_token_secret(self, client_key, token, request):
32         return request.token.secret
33
34     def get_access_token_secret(self, client_key, token, request):
35         return request.token.secret
36
37     def get_default_realms(self, client_key, request):
38         return ['API']
39
40     def validate_request_token(self, client_key, token, request):
41         try:
42             token = Token.objects.get(
43                 token_type=Token.REQUEST,
44                 consumer__key=client_key,
45                 key=token,
46                 is_approved=True,
47             )
48         except Token.DoesNotExist:
49             return False
50         else:
51             request.token = token
52             return True
53
54     def validate_access_token(self, client_key, token, request):
55         try:
56             token = Token.objects.get(
57                 token_type=Token.ACCESS,
58                 consumer__key=client_key,
59                 key=token
60             )
61         except Token.DoesNotExist:
62             return False
63         else:
64             request.token = token
65             return True
66
67     def validate_timestamp_and_nonce(self, client_key, timestamp, nonce,
68                                      request, request_token=None, access_token=None):
69         if abs(time.time() - int(timestamp)) > self.timestamp_threshold:
70             return False
71         token = request_token or access_token
72         # Yes, this is what Piston did.
73         if token is None:
74             return True
75
76         nonce, created = Nonce.objects.get_or_create(consumer_key=client_key,
77                                                      token_key=token,
78                                                      key=nonce)
79         return created
80
81     def validate_client_key(self, client_key, request):
82         try:
83             request.oauth_consumer = Consumer.objects.get(key=client_key)
84         except Consumer.DoesNotExist:
85             return False
86         return True
87
88     def validate_realms(self, client_key, token, request, uri=None, realms=None):
89         return True
90
91     def validate_requested_realms(self, *args, **kwargs):
92         return True
93
94     def validate_redirect_uri(self, *args, **kwargs):
95         return True
96
97     def validate_verifier(self, client_key, token, verifier, request):
98         return True
99
100     def get_client_secret(self, client_key, request):
101         return request.oauth_consumer.secret
102
103     def save_request_token(self, token, request):
104         Token.objects.create(
105             token_type=Token.REQUEST,
106             timestamp=request.timestamp,
107             key=token['oauth_token'],
108             secret=token['oauth_token_secret'],
109             consumer=request.oauth_consumer,
110         )
111
112     def save_access_token(self, token, request):
113         Token.objects.create(
114             token_type=Token.ACCESS,
115             timestamp=request.timestamp,
116             key=token['oauth_token'],
117             secret=token['oauth_token_secret'],
118             consumer=request.oauth_consumer,
119             user=request.token.user,
120         )
121
122     def verify_request_token(self, token, request):
123         return Token.objects.filter(
124             token_type=Token.REQUEST, key=token, is_approved=False
125         ).exists()
126
127     def get_realms(self, *args, **kwargs):
128         return []
129
130     def save_verifier(self, token, verifier, request):
131         Token.objects.filter(
132             token_type=Token.REQUEST,
133             key=token,
134             is_approved=False
135         ).update(
136             is_approved=True,
137             user=verifier['user']
138         )
139
140     def get_redirect_uri(self, token, request):
141         return request.redirect_uri
142
143     def invalidate_request_token(self, client_key, request_token, request):
144         Token.objects.filter(
145             token_type=Token.REQUEST,
146             key=request_token,
147             consumer__key=client_key,
148         )