X-Git-Url: https://git.mdrn.pl/wolnelektury.git/blobdiff_plain/d0ab1d8908cadac9f51a17e2fe9e1193f14e28cc..0e87ae0739ed3e72301b7b718098f97a7f06a5d8:/apps/piston/models.py diff --git a/apps/piston/models.py b/apps/piston/models.py new file mode 100644 index 000000000..9c93af85d --- /dev/null +++ b/apps/piston/models.py @@ -0,0 +1,150 @@ +import urllib, time, urlparse + +# Django imports +from django.db.models.signals import post_save, post_delete +from django.db import models +from django.contrib.auth.models import User +from django.contrib import admin +from django.core.mail import send_mail, mail_admins + +# Piston imports +from managers import TokenManager, ConsumerManager, ResourceManager +from signals import consumer_post_save, consumer_post_delete + +KEY_SIZE = 18 +SECRET_SIZE = 32 +VERIFIER_SIZE = 10 + +CONSUMER_STATES = ( + ('pending', 'Pending'), + ('accepted', 'Accepted'), + ('canceled', 'Canceled'), + ('rejected', 'Rejected') +) + +def generate_random(length=SECRET_SIZE): + return User.objects.make_random_password(length=length) + +class Nonce(models.Model): + token_key = models.CharField(max_length=KEY_SIZE) + consumer_key = models.CharField(max_length=KEY_SIZE) + key = models.CharField(max_length=255) + + def __unicode__(self): + return u"Nonce %s for %s" % (self.key, self.consumer_key) + +admin.site.register(Nonce) + +class Consumer(models.Model): + name = models.CharField(max_length=255) + description = models.TextField() + + key = models.CharField(max_length=KEY_SIZE) + secret = models.CharField(max_length=SECRET_SIZE) + + status = models.CharField(max_length=16, choices=CONSUMER_STATES, default='pending') + user = models.ForeignKey(User, null=True, blank=True, related_name='consumers') + + objects = ConsumerManager() + + def __unicode__(self): + return u"Consumer %s with key %s" % (self.name, self.key) + + def generate_random_codes(self): + """ + Used to generate random key/secret pairings. Use this after you've + added the other data in place of save(). + + c = Consumer() + c.name = "My consumer" + c.description = "An app that makes ponies from the API." + c.user = some_user_object + c.generate_random_codes() + """ + key = User.objects.make_random_password(length=KEY_SIZE) + secret = generate_random(SECRET_SIZE) + + while Consumer.objects.filter(key__exact=key, secret__exact=secret).count(): + secret = generate_random(SECRET_SIZE) + + self.key = key + self.secret = secret + self.save() + +admin.site.register(Consumer) + +class Token(models.Model): + REQUEST = 1 + ACCESS = 2 + TOKEN_TYPES = ((REQUEST, u'Request'), (ACCESS, u'Access')) + + key = models.CharField(max_length=KEY_SIZE) + secret = models.CharField(max_length=SECRET_SIZE) + verifier = models.CharField(max_length=VERIFIER_SIZE) + token_type = models.IntegerField(choices=TOKEN_TYPES) + timestamp = models.IntegerField(default=long(time.time())) + is_approved = models.BooleanField(default=False) + + user = models.ForeignKey(User, null=True, blank=True, related_name='tokens') + consumer = models.ForeignKey(Consumer) + + callback = models.CharField(max_length=255, null=True, blank=True) + callback_confirmed = models.BooleanField(default=False) + + objects = TokenManager() + + def __unicode__(self): + return u"%s Token %s for %s" % (self.get_token_type_display(), self.key, self.consumer) + + def to_string(self, only_key=False): + token_dict = { + 'oauth_token': self.key, + 'oauth_token_secret': self.secret, + 'oauth_callback_confirmed': 'true', + } + + if self.verifier: + token_dict.update({ 'oauth_verifier': self.verifier }) + + if only_key: + del token_dict['oauth_token_secret'] + + return urllib.urlencode(token_dict) + + def generate_random_codes(self): + key = User.objects.make_random_password(length=KEY_SIZE) + secret = generate_random(SECRET_SIZE) + + while Token.objects.filter(key__exact=key, secret__exact=secret).count(): + secret = generate_random(SECRET_SIZE) + + self.key = key + self.secret = secret + self.save() + + # -- OAuth 1.0a stuff + + def get_callback_url(self): + if self.callback and self.verifier: + # Append the oauth_verifier. + parts = urlparse.urlparse(self.callback) + scheme, netloc, path, params, query, fragment = parts[:6] + if query: + query = '%s&oauth_verifier=%s' % (query, self.verifier) + else: + query = 'oauth_verifier=%s' % self.verifier + return urlparse.urlunparse((scheme, netloc, path, params, + query, fragment)) + return self.callback + + def set_callback(self, callback): + if callback != "oob": # out of band, says "we can't do this!" + self.callback = callback + self.callback_confirmed = True + self.save() + +admin.site.register(Token) + +# Attach our signals +post_save.connect(consumer_post_save, sender=Consumer) +post_delete.connect(consumer_post_delete, sender=Consumer)