*.egg-info
# Mac OS X garbage
-.DS_Store
\ No newline at end of file
+.DS_Store
+
+# PyDev garbage
+.tmp*
Chris Williams <chris@nitron.org>
+Alex Kamedov <alex@kamedov.ru>
+Łukasz Rekucki <lrekucki@gmail.com>
Marek Stepniowski <marek@stepniowski.com>
+Fred Wenzel <fwenzel@mozilla.com>
recursive-include cas_provider/templates *.html
+include AUTHORS.txt
include README.rst
include LICENSE
\ No newline at end of file
To install, run the following command from this directory::
- python setup.py install
+ python setup.py install
Or, put `cas_provider` somewhere on your Python path.
+
+If you want to use CAS v.2 protocol or above, you must install the `lxml` package for it to work correctly.
+
USAGE
======
#. In *settings.py*, set ``LOGIN_URL`` to ``'/cas/login/'`` and ``LOGOUT_URL`` to ``'/cas/logout/'``
#. In *urls.py*, put the following line: ``(r'^cas/', include('cas_provider.urls')),``
#. Create login/logout templates (or modify the samples)
+#. Use 'cleanuptickets' management command to clean up expired tickets
+
+SETTINGS
+=========
+
+CAS_TICKET_EXPIRATION - number of minutes until a ticket expires. Default is 5 minutes.
+
+CAS_CUSTOM_ATTRIBUTES_CALLBACK - name of callback to provide dictionary with
+extended user attributes (may be used in CAS v.2 or above). Default is None.
+
+CAS_CUSTOM_ATTRIBUTES_FORMATER - name of the custom attribute formatter callback that will be
+used to format custom user attributes. This package provides the module `attribute_formatters`
+with formatters for commonly used formats. Available format styles are `RubyCAS`, `Jasig`
+and `Name-Value`. Default is Jasig style. See module source code for more details.
+
+
+PROTOCOL DOCUMENTATION
+======================
+
+* `CAS Protocol <http://www.jasig.org/cas/protocol>`_
+* `CAS 1 Architecture <http://www.jasig.org/cas/cas1-architecture>`_
+* `CAS 2 Architecture <http://www.jasig.org/cas/cas2-architecture>`_
+* `Proxy Authentication <http://www.jasig.org/cas/proxy-authentication>`_
+* `CAS – Central Authentication Service <http://www.jusfortechies.com/cas/overview.html>`_
+* `Proxy CAS Walkthrough <https://wiki.jasig.org/display/CAS/Proxy+CAS+Walkthrough>`_
+
+PROVIDED VIEWS
+==============
+
+login
+---------
+
+It has no required arguments.
+
+Optional arguments:
+
+* template_name - login form template name (default is 'cas/login.html')
+* success_redirect - redirect after successful login if service GET argument is not provided
+ (default is settings.LOGIN_REDIRECT_URL)
+* warn_template_name - template name of the warning page that asks a user who is already
+ authenticated in SSO to confirm the login to the service (default is 'cas/warn.html')
+
+If request.GET contains the 'warn' argument and the user is already authenticated in SSO, a warning
+page is shown instead of generating a Service Ticket and redirecting.
+
+logout
+-----------
+
+This destroys a client's single sign-on CAS session. The ticket-granting cookie is destroyed,
+and subsequent requests to login view will not obtain service tickets until the user again
+presents primary credentials (and thereby establishes a new single sign-on session).
+
+It has no required arguments.
+
+Optional arguments:
+
+* template_name - template name for page with successful logout message (default is 'cas/logout.html')
+
+validate
+-------------
+
+It checks the validity of a service ticket. It is part of the CAS 1.0 protocol and thus does
+not handle proxy authentication.
+
+It takes no arguments.
+
+service_validate
+-------------------------
+
+It checks the validity of a service ticket and returns an XML-fragment response via CAS 2.0 protocol.
+Proxy authentication is not supported yet.
+
+It takes no arguments.
+
+
+CUSTOM USER ATTRIBUTES FORMAT
+=============================
+
+The custom attribute format style may be changed in the project settings with the
+CAS_CUSTOM_ATTRIBUTES_FORMATER setting. You can provide your own formatter callback
+or specify one of the existing formatters from the `attribute_formatters` module of this package.
+
+Attribute formatter callback takes two arguments:
+
+* `auth_success` - `cas:authenticationSuccess` node. It is an `lxml.etree.SubElement` instance;
+* `attrs` - dictionary with user attributes received from callback specified in
+ CAS_CUSTOM_ATTRIBUTES_CALLBACK in project settings.
+
+Example of generated XML below::
+
+ <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
+ <cas:authenticationSuccess>
+ <cas:user>jsmith</cas:user>
+
+ <!-- extended user attributes will be here -->
+
+ <cas:proxyGrantingTicket>PGTIOU-84678-8a9d2sfa23casd</cas:proxyGrantingTicket>
+ </cas:authenticationSuccess>
+ </cas:serviceResponse>
+
+
+* Name-Value style (provided in `cas_provider.attribute_formatters.name_value`)::
+
+ <cas:attribute name='attraStyle' value='Name-Value' />
+ <cas:attribute name='surname' value='Smith' />
+ <cas:attribute name='givenName' value='John' />
+ <cas:attribute name='memberOf' value='CN=Staff,OU=Groups,DC=example,DC=edu' />
+ <cas:attribute name='memberOf' value='CN=Spanish Department,OU=Departments,OU=Groups,DC=example,DC=edu' />
+
+
+* Jasig Style attributes (provided in `cas_provider.attribute_formatters.jasig`)::
+
+ <cas:attributes>
+ <cas:attraStyle>Jasig</cas:attraStyle>
+ <cas:surname>Smith</cas:surname>
+ <cas:givenName>John</cas:givenName>
+ <cas:memberOf>CN=Staff,OU=Groups,DC=example,DC=edu</cas:memberOf>
+ <cas:memberOf>CN=Spanish Department,OU=Departments,OU=Groups,DC=example,DC=edu</cas:memberOf>
+ </cas:attributes>
+
+
+* RubyCAS style (provided in `cas_provider.attribute_formatters.ruby_cas`)::
+
+ <cas:attraStyle>RubyCAS</cas:attraStyle>
+ <cas:surname>Smith</cas:surname>
+ <cas:givenName>John</cas:givenName>
+ <cas:memberOf>CN=Staff,OU=Groups,DC=example,DC=edu</cas:memberOf>
+ <cas:memberOf>CN=Spanish Department,OU=Departments,OU=Groups,DC=example,DC=edu</cas:memberOf>
+
_DEFAULTS = {
'CAS_TICKET_EXPIRATION': 5, # In minutes
'CAS_CUSTOM_ATTRIBUTES_CALLBACK': None,
+ 'CAS_CUSTOM_ATTRIBUTES_FORMATER': 'cas_provider.attribute_formatters.jasig',
}
for key, value in _DEFAULTS.iteritems():
except AttributeError:
setattr(settings, key, value)
except ImportError:
- pass
\ No newline at end of file
+ pass
from django.contrib import admin
+from models import *
-from cas_provider.models import ServiceTicket, LoginTicket
class ServiceTicketAdmin(admin.ModelAdmin):
- pass
-admin.site.register(ServiceTicket, ServiceTicketAdmin)
+ list_display = ('user', 'service', 'created')
+ list_filter = ('created',)
class LoginTicketAdmin(admin.ModelAdmin):
- pass
-admin.site.register(LoginTicket, LoginTicketAdmin)
\ No newline at end of file
+ list_display = ('ticket', 'created')
+ list_filter = ('created',)
+
+admin.site.register(ServiceTicket, ServiceTicketAdmin)
+admin.site.register(LoginTicket, LoginTicketAdmin)
--- /dev/null
+from lxml import etree
+
+CAS_URI = 'http://www.yale.edu/tp/cas'
+NSMAP = {'cas': CAS_URI}
+CAS = '{%s}' % CAS_URI
+
+
+def jasig(auth_success, attrs):
+ attributes = etree.SubElement(auth_success, CAS + 'attributes')
+ style = etree.SubElement(attributes, CAS + 'attraStyle')
+ style.text = u'Jasig'
+ for name, value in attrs.items():
+ if isinstance(value, list):
+ for e in value:
+ element = etree.SubElement(attributes, CAS + name)
+ element.text = e
+ else:
+ element = etree.SubElement(attributes, CAS + name)
+ element.text = value
+
+
+def ruby_cas(auth_success, attrs):
+ style = etree.SubElement(auth_success, CAS + 'attraStyle')
+ style.text = u'RubyCAS'
+ for name, value in attrs.items():
+ if isinstance(value, list):
+ for e in value:
+ element = etree.SubElement(auth_success, CAS + name)
+ element.text = e
+ else:
+ element = etree.SubElement(auth_success, CAS + name)
+ element.text = value
+
+def name_value(auth_success, attrs):
+ etree.SubElement(auth_success, CAS + 'attribute', name=u'attraStyle', value=u'Name-Value')
+ for name, value in attrs.items():
+ if isinstance(value, list):
+ for e in value:
+ etree.SubElement(auth_success, CAS + 'attribute', name=name, value=e)
+ else:
+ etree.SubElement(auth_success, CAS + 'attribute', name=name, value=value)
+++ /dev/null
-# Import etree from anywhere
-try:
- # lxml http://codespeak.net/lxml/
- from lxml import etree
-
- # Define register_namespace function and ElementRoot for proper serialization
- NSMAP = {}
- def register_namespace(prefix, uri):
- NSMAP[prefix] = uri
-
- def ElementRoot(*args, **kwargs):
- kwargs['nsmap'] = NSMAP
- return etree.Element(*args, **kwargs)
-
-except ImportError:
- try:
- # normal cElementTree install
- import cElementTree as etree
- except ImportError:
- # normal ElementTree install
- import elementtree.ElementTree as etree
-
- try:
- register_namespace = etree.register_namespace
- except AttributeError:
- def register_namespace(prefix, uri):
- etree._namespace_map[uri] = prefix
-
- def ElementRoot(*args, **kwargs):
- return etree.Element(*args, **kwargs)
-
-__all__ = ('etree', 'register_namespace', 'ElementRoot')
--- /dev/null
+[
+ {
+ "pk": 1,
+ "model": "auth.group",
+ "fields": {
+ "name": "editor",
+ "permissions": []
+ }
+ },
+ {
+ "pk": 2,
+ "model": "auth.group",
+ "fields": {
+ "name": "author",
+ "permissions": []
+ }
+ },
+ {
+ "pk": 1,
+ "model": "auth.user",
+ "fields": {
+ "username": "root",
+ "first_name": "",
+ "last_name": "",
+ "is_active": true,
+ "is_superuser": true,
+ "is_staff": true,
+ "last_login": "2011-04-24 11:29:11",
+ "groups": [],
+ "user_permissions": [],
+ "password": "sha1$602c5$ba8608296f6bfcb352e978084b337a90d586ecc3",
+ "email": "root@example.com",
+ "date_joined": "2010-07-04 13:33:14"
+ }
+ },
+ {
+ "pk": 26,
+ "model": "auth.user",
+ "fields": {
+ "username": "active",
+ "first_name": "",
+ "last_name": "",
+ "is_active": true,
+ "is_superuser": false,
+ "is_staff": false,
+ "last_login": "2011-04-01 12:42:53",
+ "groups": [],
+ "user_permissions": [],
+ "password": "sha1$7dfb4$d19f8340a01b597089dfde6dc17bc5288c1f863e",
+ "email": "active@example.com",
+ "date_joined": "2011-04-01 11:12:45"
+ }
+ },
+ {
+ "pk": 30,
+ "model": "auth.user",
+ "fields": {
+ "username": "author",
+ "first_name": "",
+ "last_name": "",
+ "is_active": true,
+ "is_superuser": false,
+ "is_staff": true,
+ "last_login": "2011-04-24 11:32:16",
+ "groups": [
+ 2
+ ],
+ "user_permissions": [],
+ "password": "sha1$6c580$01509bea19e3ade9f1bcf303205a7cb10ce6762d",
+ "email": "",
+ "date_joined": "2011-04-24 11:32:16"
+ }
+ },
+ {
+ "pk": 29,
+ "model": "auth.user",
+ "fields": {
+ "username": "editor",
+ "first_name": "",
+ "last_name": "",
+ "is_active": true,
+ "is_superuser": false,
+ "is_staff": true,
+ "last_login": "2011-04-24 11:31:50",
+ "groups": [
+ 1
+ ],
+ "user_permissions": [],
+ "password": "sha1$3be01$b6aa05c61fc52edae3055c55e160d4cfd4756d91",
+ "email": "editor@exapmle.com",
+ "date_joined": "2011-04-24 11:31:50"
+ }
+ },
+ {
+ "pk": 27,
+ "model": "auth.user",
+ "fields": {
+ "username": "nonactive",
+ "first_name": "",
+ "last_name": "",
+ "is_active": false,
+ "is_superuser": false,
+ "is_staff": false,
+ "last_login": "2011-04-24 11:31:00",
+ "groups": [],
+ "user_permissions": [],
+ "password": "sha1$0bf10$d60f146d15e4fe3cb0de5a607a17902d0f63a95c",
+ "email": "",
+ "date_joined": "2011-04-24 11:31:00"
+ }
+ },
+ {
+ "pk": 28,
+ "model": "auth.user",
+ "fields": {
+ "username": "staff",
+ "first_name": "",
+ "last_name": "",
+ "is_active": true,
+ "is_superuser": false,
+ "is_staff": true,
+ "last_login": "2011-04-24 11:31:26",
+ "groups": [],
+ "user_permissions": [],
+ "password": "sha1$5df85$bb4c1894a866fb86465d28831000af20316233d5",
+ "email": "staff@example.com",
+ "date_joined": "2011-04-24 11:31:26"
+ }
+ }
+]
\ No newline at end of file
from django import forms
-from django.contrib.auth.forms import AuthenticationForm
+from django.conf import settings
from django.contrib.auth import authenticate
+from django.core.exceptions import ValidationError
+from django.utils.translation import ugettext_lazy as _
+from models import LoginTicket
+import datetime
+
+
+__all__ = ['LoginForm', ]
-from cas_provider.utils import create_login_ticket
class LoginForm(forms.Form):
- username = forms.CharField(max_length=30)
- password = forms.CharField(widget=forms.PasswordInput)
- #warn = forms.BooleanField(required=False) # TODO: Implement
- lt = forms.CharField(widget=forms.HiddenInput, initial=create_login_ticket)
- def __init__(self, service=None, renew=None, gateway=None, request=None, *args, **kwargs):
+ username = forms.CharField(max_length=30, label=_('username'))
+ password = forms.CharField(widget=forms.PasswordInput, label=_('password'))
+ lt = forms.CharField(widget=forms.HiddenInput)
+ service = forms.CharField(widget=forms.HiddenInput, required=False)
+
+ def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
- self.request = request
- if service is not None:
- self.fields['service'] = forms.CharField(widget=forms.HiddenInput, initial=service)
\ No newline at end of file
+ self._user = None
+
+ def clean_lt(self):
+ ticket = self.cleaned_data['lt']
+ timeframe = datetime.datetime.now() - \
+ datetime.timedelta(minutes=settings.CAS_TICKET_EXPIRATION)
+ try:
+ return LoginTicket.objects.get(ticket=ticket, created__gte=timeframe)
+ except LoginTicket.DoesNotExist:
+ raise ValidationError(_('Login ticket expired. Please try again.'))
+ return ticket
+
+ def clean(self):
+ username = self.cleaned_data.get('username')
+ password = self.cleaned_data.get('password')
+ user = authenticate(username=username, password=password)
+ if user is None:
+ raise ValidationError(_('Incorrect username and/or password.'))
+ if not user.is_active:
+ raise ValidationError(_('This account is disabled.'))
+ self._user = user
+ self.cleaned_data.get('lt').delete()
+ return self.cleaned_data
+
+    def get_user(self):
+        """Return the user stored by ``clean()``, or ``None`` if none was stored."""
+        return self._user
+
+ def get_errors(self):
+ errors = []
+ for k, error in self.errors.items():
+ errors += [e for e in error]
+ return errors
--- /dev/null
+# SOME DESCRIPTIVE TITLE.
+# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
+# This file is distributed under the same license as the PACKAGE package.
+# Alex Kamedov <alex@kamedov.ru>, 2011.
+msgid ""
+msgstr ""
+"Project-Id-Version: PACKAGE VERSION\n"
+"Report-Msgid-Bugs-To: \n"
+"POT-Creation-Date: 2011-04-24 18:51+0600\n"
+"PO-Revision-Date: 2011-04-07 12:01+0600\n"
+"Last-Translator: Volf <alex@kamedov.ru>\n"
+"Language-Team: delux\n"
+"Language: ru\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"Plural-Forms: nplurals=3; plural=(n%10==1 && n%100!=11 ? 0 : n%10>=2 && n"
+"%10<=4 && (n%100<10 || n%100>=20) ? 1 : 2);\n"
+"X-Generator: Virtaal 0.6.1\n"
+
+#: forms.py:14
+msgid "username"
+msgstr "имя пользователя"
+
+#: forms.py:15
+msgid "password"
+msgstr "пароль"
+
+#: forms.py:30
+msgid "Login ticket expired. Please try again."
+msgstr "Истек срок действия билета входа. Пожалуйста, попробуйте еще раз."
+
+#: forms.py:38
+msgid "Incorrect username and/or password."
+msgstr "Неверное имя пользователя и/или пароль."
+
+#: forms.py:40
+msgid "This account is disabled."
+msgstr "Эта учетная запись отключена."
+
+#: models.py:14
+msgid "ticket"
+msgstr "билет"
+
+#: models.py:15
+msgid "created"
+msgstr "создан"
+
+#: models.py:34
+msgid "user"
+msgstr "пользователь"
+
+#: models.py:35
+msgid "service"
+msgstr "сервис"
+
+#: models.py:40
+msgid "Service Ticket"
+msgstr "Билет для сервиса"
+
+#: models.py:41
+msgid "Service Tickets"
+msgstr "Билеты для сервисов"
+
+#: models.py:59
+msgid "Login Ticket"
+msgstr "Билет для входа"
+
+#: models.py:60
+msgid "Login Tickets"
+msgstr "Билеты для входа"
"""
from django.core.management.base import NoArgsCommand
-from django.core.management.base import CommandError
from django.conf import settings
import datetime
-from django.db import models
from django.contrib.auth.models import User
-from django.conf import settings
-from django.core.urlresolvers import get_callable
+from django.db import models
+from django.utils.translation import ugettext_lazy as _
+from random import Random
+import string
+import urllib
+import urlparse
-from cas_provider.etree import etree, register_namespace, ElementRoot
-class ServiceTicket(models.Model):
- user = models.ForeignKey(User)
- service = models.URLField(verify_exists=False)
- ticket = models.CharField(max_length=256)
- created = models.DateTimeField(auto_now=True)
-
- def __unicode__(self):
- return "%s (%s) - %s" % (self.user.username, self.service, self.created)
-
-class LoginTicket(models.Model):
- ticket = models.CharField(max_length=32)
- created = models.DateTimeField(auto_now=True)
-
+__all__ = ['ServiceTicket', 'LoginTicket']
+
+
+class BaseTicket(models.Model):
+ ticket = models.CharField(_('ticket'), max_length=32)
+ created = models.DateTimeField(_('created'), auto_now=True)
+
+ class Meta:
+ abstract = True
+
+ def __init__(self, *args, **kwargs):
+ if 'ticket' not in kwargs:
+ kwargs['ticket'] = self._generate_ticket()
+ super(BaseTicket, self).__init__(*args, **kwargs)
+
def __unicode__(self):
- return "%s - %s" % (self.ticket, self.created)
-
-CAS_URI = 'http://www.yale.edu/tp/cas'
-register_namespace('cas', CAS_URI)
-CAS = '{%s}' % CAS_URI
-
-def auth_success_response(user):
- attrs = {}
- if settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK:
- callback = get_callable(settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK)
- attrs = callback(user)
-
- response = ElementRoot(CAS + 'serviceResponse')
- auth_success = etree.SubElement(response, CAS + 'authenticationSuccess')
- username = etree.SubElement(auth_success, CAS + 'user')
- username.text = user.username
- for name, value in attrs.items():
- element = etree.SubElement(auth_success, name)
- element.text = value
- return unicode(etree.tostring(response, encoding='utf-8'), 'utf-8')
+ return self.ticket
+
+ def _generate_ticket(self, length=29, chars=string.ascii_letters + string.digits):
+ """ Generates a random string of the requested length. Used for creation of tickets. """
+ return u"%s-%s" % (self.prefix, ''.join(Random().sample(chars, length)))
+
+
+class ServiceTicket(BaseTicket):
+ user = models.ForeignKey(User, verbose_name=_('user'))
+ service = models.URLField(_('service'), verify_exists=False)
+
+ prefix = 'ST'
+
+ class Meta:
+ verbose_name = _('Service Ticket')
+ verbose_name_plural = _('Service Tickets')
+
+ def get_redirect_url(self):
+ parsed = urlparse.urlparse(self.service)
+ query = urlparse.parse_qs(parsed.query)
+ query['ticket'] = [self.ticket]
+ query = [ ((k, v) if len(v) > 1 else (k, v[0])) for k, v in query.iteritems()]
+ parsed = urlparse.ParseResult(parsed.scheme, parsed.netloc,
+ parsed.path, parsed.params,
+ urllib.urlencode(query), parsed.fragment)
+ return parsed.geturl()
+
+
+class LoginTicket(BaseTicket):
+
+ prefix = 'LT'
+
+ class Meta:
+ verbose_name = _('Login Ticket')
+ verbose_name_plural = _('Login Tickets')
{% endblock %}
{% block content %}
- <form action='.' method='post'>
+ <form action='{% url cas_login %}' method='post'>
<fieldset>
<legend>Log in to your account</legend>
+ {% csrf_token %}
{% if errors %}
<ul>
{% for error in errors %}
--- /dev/null
+{% extends "base.html" %}
+
+{% block title %}
+Warning
+{% endblock %}
+
+{% block content %}
+ <form action='{% url cas_login %}' method='get'>
+ <fieldset>
+ <legend>Confirm to log in to {{ service }}</legend>
+ <input type="hidden" name="service" value="{{ service }}">
+ <p><input type="submit" value="Login"/></p>
+ </fieldset>
+ </form>
+{% endblock %}
--- /dev/null
+from cas_provider.models import ServiceTicket
+from cas_provider.views import _cas2_sucess_response, _cas2_error_response, \
+ INVALID_TICKET
+from django.contrib.auth.models import User
+from django.core.urlresolvers import reverse
+from django.test import TestCase
+from urlparse import urlparse
+from django.conf import settings
+
+
+class ViewsTest(TestCase):
+
+ fixtures = ['cas_users.json', ]
+
+ def setUp(self):
+ self.service = 'http://example.com/'
+
+
+ def test_succeessful_login(self):
+ response = self._login_user('root', '123')
+ self._validate_cas1(response, True)
+
+ response = self.client.get(reverse('cas_login'), {'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 302)
+ self.assertTrue(response['location'].startswith('%s?ticket=' % self.service))
+
+ response = self.client.get(reverse('cas_login'), follow=False)
+ self.assertEqual(response.status_code, 302)
+ self.assertTrue(response['location'].startswith('http://testserver/'))
+
+ response = self.client.get(response['location'], follow=False)
+ self.assertIn(response.status_code, [302, 200])
+
+ response = self.client.get(reverse('cas_login'), {'service': self.service, 'warn': True}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertTemplateUsed(response, 'cas/warn.html')
+
+
+ def _cas_logout(self):
+ response = self.client.get(reverse('cas_logout'), follow=False)
+ self.assertEqual(response.status_code, 200)
+
+
+ def test_logout(self):
+ response = self._login_user('root', '123')
+ self._validate_cas1(response, True)
+
+ self._cas_logout()
+
+ response = self.client.get(reverse('cas_login'), follow=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.context['user'].is_anonymous(), True)
+
+
+ def test_broken_pwd(self):
+ self._fail_login('root', '321')
+
+ def test_broken_username(self):
+ self._fail_login('notroot', '123')
+
+ def test_nonactive_user_login(self):
+ self._fail_login('nonactive', '123')
+
+ def test_cas2_success_validate(self):
+ response = self._login_user('root', '123')
+ response = self._validate_cas2(response, True)
+ user = User.objects.get(username=self.username)
+ self.assertEqual(response.content, _cas2_sucess_response(user).content)
+
+ def test_cas2_custom_attrs(self):
+ settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK = cas_mapping
+ response = self._login_user('editor', '123')
+
+ response = self._validate_cas2(response, True)
+ self.assertEqual(response.content, '''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">'''
+ '''<cas:authenticationSuccess>'''
+ '''<cas:user>editor</cas:user>'''
+ '''<cas:attributes>'''
+ '''<cas:attraStyle>Jasig</cas:attraStyle>'''
+ '''<cas:group>editor</cas:group>'''
+ '''<cas:is_staff>True</cas:is_staff>'''
+ '''<cas:is_active>True</cas:is_active>'''
+ '''<cas:email>editor@exapmle.com</cas:email>'''
+ '''</cas:attributes>'''
+ '''</cas:authenticationSuccess>'''
+ '''</cas:serviceResponse>''')
+
+ self._cas_logout()
+ response = self._login_user('editor', '123')
+ settings.CAS_CUSTOM_ATTRIBUTES_FORMATER = 'cas_provider.attribute_formatters.ruby_cas'
+ response = self._validate_cas2(response, True)
+ self.assertEqual(response.content, '''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">'''
+ '''<cas:authenticationSuccess>'''
+ '''<cas:user>editor</cas:user>'''
+ '''<cas:attraStyle>RubyCAS</cas:attraStyle>'''
+ '''<cas:group>editor</cas:group>'''
+ '''<cas:is_staff>True</cas:is_staff>'''
+ '''<cas:is_active>True</cas:is_active>'''
+ '''<cas:email>editor@exapmle.com</cas:email>'''
+ '''</cas:authenticationSuccess>'''
+ '''</cas:serviceResponse>''')
+
+ self._cas_logout()
+ response = self._login_user('editor', '123')
+ settings.CAS_CUSTOM_ATTRIBUTES_FORMATER = 'cas_provider.attribute_formatters.name_value'
+ response = self._validate_cas2(response, True)
+ self.assertEqual(response.content, '''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">'''
+ '''<cas:authenticationSuccess>'''
+ '''<cas:user>editor</cas:user>'''
+ '''<cas:attribute name="attraStyle" value="Name-Value"/>'''
+ '''<cas:attribute name="group" value="editor"/>'''
+ '''<cas:attribute name="is_staff" value="True"/>'''
+ '''<cas:attribute name="is_active" value="True"/>'''
+ '''<cas:attribute name="email" value="editor@exapmle.com"/>'''
+ '''</cas:authenticationSuccess>'''
+ '''</cas:serviceResponse>''')
+
+
+ def test_cas2_fail_validate(self):
+ for user, pwd in (('root', '321'), ('notroot', '123'), ('nonactive', '123')):
+ response = self._login_user(user, pwd)
+ self._validate_cas2(response, False)
+
+
+ def _fail_login(self, username, password):
+ response = self._login_user(username, password)
+ self._validate_cas1(response, False)
+
+ response = self.client.get(reverse('cas_login'), {'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ response = self.client.get(reverse('cas_login'), follow=False)
+ self.assertEqual(response.status_code, 200)
+
+
+
+ def _login_user(self, username, password):
+ self.username = username
+ response = self.client.get(reverse('cas_login'), {'service': self.service})
+ self.assertEqual(response.status_code, 200)
+ self.assertTemplateUsed(response, 'cas/login.html')
+ form = response.context['form']
+ service = form['service'].value()
+ return self.client.post(reverse('cas_login'), {
+ 'username': username,
+ 'password': password,
+ 'lt': form['lt'].value(),
+ 'service': service
+ }, follow=False)
+
+
+ def _validate_cas1(self, response, is_correct=True):
+ if is_correct:
+ self.assertEqual(response.status_code, 302)
+ self.assertTrue(response.has_header('location'))
+ location = urlparse(response['location'])
+ ticket = location.query.split('=')[1]
+
+ response = self.client.get(reverse('cas_validate'), {'ticket': ticket, 'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(unicode(response.content), u'yes\n%s\n' % self.username)
+ else:
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(len(response.context['form'].errors), 1)
+
+ response = self.client.get(reverse('cas_validate'), {'ticket': 'ST-12312312312312312312312', 'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.content, u'no\n\n')
+
+
+ def _validate_cas2(self, response, is_correct=True):
+ if is_correct:
+ self.assertEqual(response.status_code, 302)
+ self.assertTrue(response.has_header('location'))
+ location = urlparse(response['location'])
+ ticket = location.query.split('=')[1]
+
+ response = self.client.get(reverse('cas_service_validate'), {'ticket': ticket, 'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ else:
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(len(response.context['form'].errors), 1)
+
+ response = self.client.get(reverse('cas_service_validate'), {'ticket': 'ST-12312312312312312312312', 'service': self.service}, follow=False)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.content, _cas2_error_response(INVALID_TICKET).content)
+ return response
+
+
+class ModelsTestCase(TestCase):
+
+ fixtures = ['cas_users.json', ]
+
+ def setUp(self):
+ self.user = User.objects.get(username='root')
+
+ def test_redirects(self):
+ ticket = ServiceTicket.objects.create(service='http://example.com', user=self.user)
+ self.assertEqual(ticket.get_redirect_url(), '%(service)s?ticket=%(ticket)s' % ticket.__dict__)
+
+
+def cas_mapping(user):
+ return {
+ 'is_staff': unicode(user.is_staff),
+ 'is_active': unicode(user.is_active),
+ 'email': user.email,
+ 'group': [g.name for g in user.groups.all()]
+ }
-from django.conf.urls.defaults import *
+from django.conf.urls.defaults import patterns, url
-from cas_provider.views import *
-urlpatterns = patterns('',
- url(r'^login/$', login),
- url(r'^validate/$', validate),
- url(r'^serviceValidate/$', service_validate),
- url(r'^logout/$', logout),
-)
\ No newline at end of file
+urlpatterns = patterns('cas_provider.views',
+ url(r'^login/?$', 'login', name='cas_login'),
+ url(r'^validate/?$', 'validate', name='cas_validate'),
+ url(r'^serviceValidate/?$', 'service_validate', name='cas_service_validate'),
+ url(r'^logout/?$', 'logout', name='cas_logout'),
+)
+++ /dev/null
-from random import Random
-import string
-
-from cas_provider.models import ServiceTicket, LoginTicket
-
-def _generate_string(length=8, chars=string.ascii_letters + string.digits):
- """ Generates a random string of the requested length. Used for creation of tickets. """
- return ''.join(Random().sample(chars, length))
-
-def create_service_ticket(user, service):
- """ Creates a new service ticket for the specified user and service.
- Uses _generate_string.
- """
- ticket_string = 'ST-' + _generate_string(29) # Total ticket length = 29 + 3 = 32
- ticket = ServiceTicket(service=service, user=user, ticket=ticket_string)
- ticket.save()
- return ticket
-
-def create_login_ticket():
- """ Creates a new login ticket for the login form. Uses _generate_string. """
- ticket_string = 'LT-' + _generate_string(29)
- ticket = LoginTicket(ticket=ticket_string)
- ticket.save()
- return ticket_string
\ No newline at end of file
-from django.http import HttpResponse, HttpResponseForbidden, HttpResponseRedirect
+from django.conf import settings
+from django.contrib.auth import login as auth_login, logout as auth_logout
+from django.core.urlresolvers import get_callable
+from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
-from django.contrib.auth.models import User
-from django.contrib.auth import authenticate
-from django.contrib.auth import login as auth_login, logout as auth_logout
+from forms import LoginForm
+from models import ServiceTicket, LoginTicket
+
+
+__all__ = ['login', 'validate', 'logout', 'service_validate']
+
-from cas_provider.forms import LoginForm
-from cas_provider.models import ServiceTicket, LoginTicket, auth_success_response
-from cas_provider.utils import create_service_ticket
+INVALID_TICKET = 'INVALID_TICKET'
+INVALID_SERVICE = 'INVALID_SERVICE'
+INVALID_REQUEST = 'INVALID_REQUEST'
+INTERNAL_ERROR = 'INTERNAL_ERROR'
-__all__ = ['login', 'validate', 'service_validate', 'logout']
+ERROR_MESSAGES = (
+ (INVALID_TICKET, u'The provided ticket is invalid.'),
+ (INVALID_SERVICE, u'Service is invalid'),
+ (INVALID_REQUEST, u'Not all required parameters were sent.'),
+ (INTERNAL_ERROR, u'An internal error occurred during ticket validation'),
+)
-def login(request, template_name='cas/login.html', success_redirect='/accounts/'):
+
+def login(request, template_name='cas/login.html', \
+ success_redirect=settings.LOGIN_REDIRECT_URL,
+ warn_template_name='cas/warn.html'):
service = request.GET.get('service', None)
if request.user.is_authenticated():
if service is not None:
- ticket = create_service_ticket(request.user, service)
- if service.find('?') == -1:
- return HttpResponseRedirect(service + '?ticket=' + ticket.ticket)
- else:
- return HttpResponseRedirect(service + '&ticket=' + ticket.ticket)
+ if request.GET.get('warn', False):
+ return render_to_response(warn_template_name, {
+ 'service': service,
+ 'warn': False
+ }, context_instance=RequestContext(request))
+ ticket = ServiceTicket.objects.create(service=service, user=request.user)
+ return HttpResponseRedirect(ticket.get_redirect_url())
else:
return HttpResponseRedirect(success_redirect)
- errors = []
if request.method == 'POST':
- username = request.POST.get('username', None)
- password = request.POST.get('password', None)
- service = request.POST.get('service', None)
- lt = request.POST.get('lt', None)
-
- try:
- login_ticket = LoginTicket.objects.get(ticket=lt)
- except:
- errors.append('Login ticket expired. Please try again.')
- else:
- login_ticket.delete()
- user = authenticate(username=username, password=password)
- if user is not None:
- if user.is_active:
- auth_login(request, user)
- if service is not None:
- ticket = create_service_ticket(user, service)
- return HttpResponseRedirect(service + '?ticket=' + ticket.ticket)
- else:
- return HttpResponseRedirect(success_redirect)
- else:
- errors.append('This account is disabled.')
- else:
- errors.append('Incorrect username and/or password.')
- form = LoginForm(service)
- return render_to_response(template_name, {'form': form, 'errors': errors}, context_instance=RequestContext(request))
-
+ form = LoginForm(request.POST)
+ if form.is_valid():
+ user = form.get_user()
+ auth_login(request, user)
+ service = form.cleaned_data.get('service')
+ if service is not None:
+ ticket = ServiceTicket.objects.create(service=service, user=user)
+ success_redirect = ticket.get_redirect_url()
+ return HttpResponseRedirect(success_redirect)
+ else:
+ form = LoginForm(initial={
+ 'service': service,
+ 'lt': LoginTicket.objects.create()
+ })
+ return render_to_response(template_name, {
+ 'form': form,
+ 'errors': form.get_errors()
+ }, context_instance=RequestContext(request))
+
+
def validate(request):
+ """Validate ticket via CAS v.1 protocol"""
# Reads the 'service' and 'ticket' query parameters and answers with a
# plain-text body: "yes\n<username>\n" for a matching ticket, "no\n\n" otherwise.
service = request.GET.get('service', None)
ticket_string = request.GET.get('ticket', None)
if service is not None and ticket_string is not None:
+ #renew = request.GET.get('renew', True)
+ #if not renew:
+ # TODO: check user SSO session
try:
ticket = ServiceTicket.objects.get(ticket=ticket_string)
# NOTE(review): assert statements are stripped when Python runs with -O,
# in which case this service check would not run at all; an explicit
# comparison would be safer. Confirm before relying on it.
+ assert ticket.service == service
username = ticket.user.username
# The ticket row is deleted before the "yes" reply is returned.
ticket.delete()
return HttpResponse("yes\n%s\n" % username)
# NOTE(review): no `except` line is visible between the return above and
# this `pass`; presumably it was lost from this listing. Confirm against
# the real file which exceptions are swallowed here.
pass
return HttpResponse("no\n\n")
+
+def logout(request, template_name='cas/logout.html', auto_redirect=False):
# Logs the current user out via auth_logout() when is_authenticated() is true,
# then either redirects to the 'url' query parameter (only when it is non-empty
# and auto_redirect is true) or renders template_name with {'url': url}.
# NOTE(review): indentation is not visible in this listing, so whether the
# redirect check is nested under the is_authenticated() branch cannot be
# confirmed from here.
+ url = request.GET.get('url', None)
+ if request.user.is_authenticated():
+ auth_logout(request)
# NOTE(review): 'url' is taken straight from the query string, so with
# auto_redirect=True the redirect target is caller-supplied; confirm that
# this is intended.
+ if url and auto_redirect:
+ return HttpResponseRedirect(url)
+ return render_to_response(template_name, {'url': url}, \
+ context_instance=RequestContext(request))
+
+
def service_validate(request):
+ """Validate ticket via CAS v.2 protocol"""
# Reads the 'service' and 'ticket' query parameters; every outcome is returned
# through _cas2_error_response() or _cas2_sucess_response().
service = request.GET.get('service', None)
ticket_string = request.GET.get('ticket', None)
# Both query parameters are required; a missing one yields INVALID_REQUEST.
if service is None or ticket_string is None:
- return HttpResponse('''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
- <cas:authenticationFailure code="INVALID_REQUEST">
- Not all required parameters were sent.
- </cas:authenticationFailure>
- </cas:serviceResponse>''', mimetype='text/xml')
-
+ return _cas2_error_response(INVALID_REQUEST)
+
try:
ticket = ServiceTicket.objects.get(ticket=ticket_string)
- ticket.delete()
- return HttpResponse(auth_success_response(ticket.user), mimetype='text/xml')
except ServiceTicket.DoesNotExist:
- return HttpResponse('''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
- <cas:authenticationFailure code="INVALID_TICKET">
- The provided ticket is invalid.
+ return _cas2_error_response(INVALID_TICKET)
+
# A ticket presented for a different service is deleted and rejected with
# INVALID_SERVICE.
+ if ticket.service != service:
+ ticket.delete()
+ return _cas2_error_response(INVALID_SERVICE)
+
# The user is read before the ticket is deleted; the ticket is removed on the
# success path as well.
+ user = ticket.user
+ ticket.delete()
+ return _cas2_sucess_response(user)
+
+
+def _cas2_error_response(code):
+    """Return a ``text/xml`` CAS v.2 ``authenticationFailure`` response.
+
+    ``code`` is written into the ``code`` attribute of the failure element and
+    is also used to look up the message text in ``ERROR_MESSAGES`` (converted
+    with ``dict()``, so presumably a sequence of ``(code, message)`` pairs --
+    confirm against its definition).
+    """
+    # The literal must open with exactly three quotes: a fourth quote becomes
+    # part of the body and puts a stray "'" in front of the XML root element.
+    return HttpResponse(u'''<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
+ <cas:authenticationFailure code="%(code)s">
+ %(message)s
</cas:authenticationFailure>
- </cas:serviceResponse>''', mimetype='text/xml')
+ </cas:serviceResponse>''' % {
+        'code': code,
+        'message': dict(ERROR_MESSAGES).get(code)
+    }, mimetype='text/xml')
-def logout(request, template_name='cas/logout.html', auto_redirect=False):
- url = request.GET.get('url', None)
- if request.user.is_authenticated():
- auth_logout(request)
- if url and auto_redirect:
- return HttpResponseRedirect(url)
- return render_to_response(template_name, {'url': url}, context_instance=RequestContext(request))
+
+def _cas2_sucess_response(user):
+    """Wrap the CAS v.2 success document for ``user`` in a ``text/xml`` response."""
+    xml_body = auth_success_response(user)
+    return HttpResponse(xml_body, mimetype='text/xml')
+
+
+def auth_success_response(user):
+    """Build the CAS v.2 ``authenticationSuccess`` document for ``user``.
+
+    The document is a ``serviceResponse`` element containing an
+    ``authenticationSuccess`` element with a ``user`` child holding
+    ``user.username``. When ``settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK`` is set,
+    the callback is called with ``user`` and any non-empty result is handed to
+    the formatter named by ``settings.CAS_CUSTOM_ATTRIBUTES_FORMATER`` together
+    with the ``authenticationSuccess`` element.
+
+    Returns the serialized XML as a ``unicode`` string.
+    """
+    from attribute_formatters import CAS, NSMAP, etree
+
+    response = etree.Element(CAS + 'serviceResponse', nsmap=NSMAP)
+    auth_success = etree.SubElement(response, CAS + 'authenticationSuccess')
+    username = etree.SubElement(auth_success, CAS + 'user')
+    username.text = user.username
+
+    if settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK:
+        callback = get_callable(settings.CAS_CUSTOM_ATTRIBUTES_CALLBACK)
+        attrs = callback(user)
+        # Truthiness instead of len(): a callback that returns None (no extra
+        # attributes) is skipped rather than raising TypeError.
+        if attrs:
+            # NOTE(review): the README names this setting
+            # CAS_CUSTOM_ATTRIBUTES_FORMAT; confirm which spelling is
+            # actually defined before changing either side.
+            formater = get_callable(settings.CAS_CUSTOM_ATTRIBUTES_FORMATER)
+            formater(auth_success, attrs)
+    return unicode(etree.tostring(response, encoding='utf-8'), 'utf-8')
+++ /dev/null
-[egg_info]
-tag_build = .dev
-tag_date = 1
\ No newline at end of file
from setuptools import setup, find_packages
-
+
+
setup(
name='django-cas-provider',
- version='0.2.1',
+ version='0.2.2',
description='A "provider" for the Central Authentication Service (http://jasig.org/cas)',
author='Chris Williams',
author_email='chris@nitron.org',