+# -*- coding: utf-8 -*-
from datetime import datetime
import os.path
from django.contrib.auth.models import User
from django.core.files.base import ContentFile
-from django.core.files.storage import FileSystemStorage
from django.db import models, transaction
from django.db.models.base import ModelBase
from django.utils.translation import ugettext_lazy as _
-from mercurial import mdiff, simplemerge
+from mercurial import simplemerge
from django.conf import settings
from dvcs.signals import post_commit, post_publishable
class Tag(models.Model):
"""A tag (e.g. document stage) which can be applied to a Change."""
name = models.CharField(_('name'), max_length=64)
- slug = models.SlugField(_('slug'), unique=True, max_length=64,
- null=True, blank=True)
+ slug = models.SlugField(_('slug'), unique=True, max_length=64, null=True, blank=True)
ordering = models.IntegerField(_('ordering'))
_object_cache = {}
# Resets the sender's `_object_cache` to an empty dict. `instance` and any
# extra keyword arguments are accepted but not used by the visible body.
# NOTE(review): the signature looks like a Django signal receiver; where it
# is connected is not shown in this excerpt, so confirm before relying on it.
def listener_changed(sender, instance, **kwargs):
sender._object_cache = {}
- def next(self):
+ def get_next(self):
"""
Returns the next tag - stage to work on.
Returns None for the last stage.
# Builds the relative path "<instance.tree.pk>/<instance.pk>" for `instance`.
# `filename` is accepted but ignored. Both pks are formatted with `%d`, so
# they must already be numbers (a pk of None would raise TypeError).
# NOTE(review): presumably used as a FileField `upload_to` callable; confirm
# against the field definition, which is not visible here.
def data_upload_to(instance, filename):
return "%d/%d" % (instance.tree.pk, instance.pk)
+
class Change(models.Model):
"""
Single document change related to previous change. The "parent"
- argument points to the version against which this change has been
+ argument points to the version against which this change has been
recorded. Initial text will have a null parent.
-
+
Data file contains a gzipped text of the document.
"""
author = models.ForeignKey(User, null=True, blank=True, verbose_name=_('author'))
- author_name = models.CharField(_('author name'), max_length=128,
- null=True, blank=True,
- help_text=_("Used if author is not set.")
- )
- author_email = models.CharField(_('author email'), max_length=128,
- null=True, blank=True,
- help_text=_("Used if author is not set.")
- )
+ author_name = models.CharField(
+ _('author name'), max_length=128,
+ null=True, blank=True,
+ help_text=_("Used if author is not set."))
+ author_email = models.CharField(
+ _('author email'), max_length=128,
+ null=True, blank=True,
+ help_text=_("Used if author is not set."))
revision = models.IntegerField(_('revision'), db_index=True)
- parent = models.ForeignKey('self',
- null=True, blank=True, default=None,
- verbose_name=_('parent'),
- related_name="children")
+ parent = models.ForeignKey(
+ 'self',
+ null=True, blank=True, default=None,
+ verbose_name=_('parent'), related_name="children")
- merge_parent = models.ForeignKey('self',
- null=True, blank=True, default=None,
- verbose_name=_('merge parent'),
- related_name="merge_children")
+ merge_parent = models.ForeignKey(
+ 'self',
+ null=True, blank=True, default=None,
+ verbose_name=_('merge parent'),
+ related_name="merge_children")
description = models.TextField(_('description'), blank=True, default='')
- created_at = models.DateTimeField(editable=False, db_index=True,
- default=datetime.now)
+ created_at = models.DateTimeField(editable=False, db_index=True, default=datetime.now)
publishable = models.BooleanField(_('publishable'), default=False)
class Meta:
if self.author:
return "%s %s <%s>" % (
self.author.first_name,
- self.author.last_name,
+ self.author.last_name,
self.author.email)
else:
return "%s <%s>" % (
self.author_email
)
-
def save(self, *args, **kwargs):
"""
take the next available revision number if none yet
f.close()
return unicode(text, 'utf-8')
- def merge_with(self, other, author=None,
- author_name=None, author_email=None,
- description=u"Automatic merge."):
+ def merge_with(self, other, author=None, author_name=None, author_email=None, description=u"Automatic merge."):
"""Performs an automatic merge after straying commits."""
assert self.tree_id == other.tree_id # same tree
if other.parent_id == self.pk:
class Meta(Tag.Meta):
app_label = model._meta.app_label
+ if hasattr(model, 'TagMeta'):
+ for attr, value in model.TagMeta.__dict__.items():
+ setattr(Meta, attr, value)
+
attrs = {
'__module__': model.__module__,
'Meta': Meta,
class DocumentMeta(ModelBase):
- "Metaclass for Document models."
- def __new__(cls, name, bases, attrs):
+ """Metaclass for Document models."""
+ def __new__(mcs, name, bases, attrs):
- model = super(DocumentMeta, cls).__new__(cls, name, bases, attrs)
+ model = super(DocumentMeta, mcs).__new__(mcs, name, bases, attrs)
if not model._meta.abstract:
# create a real Tag object and `stage' fk
model.tag_model = create_tag_model(model)
- models.ForeignKey(model.tag_model, verbose_name=_('stage'),
+ models.ForeignKey(
+ model.tag_model, verbose_name=_('stage'),
null=True, blank=True).contribute_to_class(model, 'stage')
# create real Change model and `head' fk
model.change_model = create_change_model(model)
- models.ForeignKey(model.change_model,
- null=True, blank=True, default=None,
- verbose_name=_('head'),
- help_text=_("This document's current head."),
- editable=False).contribute_to_class(model, 'head')
+ models.ForeignKey(
+ model.change_model,
+ null=True, blank=True, default=None,
+ verbose_name=_('head'),
+ help_text=_("This document's current head."),
+ editable=False).contribute_to_class(model, 'head')
- models.ForeignKey(User, null=True, blank=True, editable=False,
+ models.ForeignKey(
+ User, null=True, blank=True, editable=False,
verbose_name=_('creator'), related_name="created_%s" % name.lower()
).contribute_to_class(model, 'creator')
# default repository path
REPO_PATH = os.path.join(settings.MEDIA_ROOT, 'dvcs')
- user = models.ForeignKey(User, null=True, blank=True,
+ user = models.ForeignKey(
+ User, null=True, blank=True,
verbose_name=_('user'), help_text=_('Work assignment.'))
class Meta:
change = self.change_set.get(pk=change)
return change.materialize()
- def commit(self, text, author=None, author_name=None, author_email=None,
- publishable=False, **kwargs):
+ def commit(self, text, author=None, author_name=None, author_email=None, publishable=False, **kwargs):
"""Commits a new revision.
This will automatically merge the commit into the main branch,
tags = kwargs.get('tags', [])
if tags:
# set stage to next tag after the committed one
- self.stage = max(tags, key=lambda t: t.ordering).next()
+ self.stage = max(tags, key=lambda t: t.ordering).get_next()
- change = self.change_set.create(author=author,
- author_name=author_name,
- author_email=author_email,
- description=kwargs.get('description', ''),
- publishable=publishable,
- parent=parent)
+ change = self.change_set.create(
+ author=author,
+ author_name=author_name,
+ author_email=author_email,
+ description=kwargs.get('description', ''),
+ publishable=publishable,
+ parent=parent)
change.tags = tags
change.data.save('', ContentFile(text.encode('utf-8')))
if self.head:
# merge new change as new head
- self.head = self.head.merge_with(change, author=author,
- author_name=author_name,
- author_email=author_email)
+ self.head = self.head.merge_with(
+ change, author=author,
+ author_name=author_name,
+ author_email=author_email)
else:
self.head = change
self.save()
return self.head
# Returns a queryset over this object's `change_set`.
# Removed line: filtered on revision > -1 and applied no explicit ordering.
# Added line: returns every change, ordered by ascending `revision`.
def history(self):
- return self.change_set.filter(revision__gt=-1)
+ return self.change_set.all().order_by('revision')
def revision(self):
rev = self.change_set.aggregate(
return self.change_set.get(revision=rev)
# Returns the most recent change flagged `publishable=True`, or None when
# no such change exists.
# Removed lines: queried `change_set` directly and picked the newest change
# by `created_at`.
# Added lines: start from `history()` and pick the change with the highest
# `revision` instead.
def publishable(self):
- changes = self.change_set.filter(publishable=True)
+ changes = self.history().filter(publishable=True)
if changes.exists():
- return changes.order_by('-created_at')[0]
+ return changes.order_by('-revision')[0]
else:
return None
assert self != other
other_revs = other.change_set.all().count()
- self.change_set.all().update(revision=models.F('revision') + other_revs)
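+ # workaround for a non-atomic UPDATE in SQLite: negate every revision
+ # first, then subtract it from other_revs (net effect: revision + other_revs)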
+ self.change_set.all().update(revision=0-models.F('revision'))
+ self.change_set.all().update(revision=other_revs - models.F('revision'))
other.change_set.all().update(tree=self)
assert not other.change_set.exists()
other.delete()