# -*- encoding: utf-8 -*-
+import logging
+log = logging.getLogger('ral.mercurial')
+
__author__= "Łukasz Rekucki"
__date__ = "$2009-09-25 09:33:02$"
__doc__ = "Module documentation."
from mercurial import error
import wlrepo
+from wlrepo.mercurial_backend.revision import MercurialRevision
from wlrepo.mercurial_backend.document import MercurialDocument
-from wlrepo.mercurial_backend import MercurialRevision
class MergeStatus(object):
def __init__(self, mstatus):
def documents(self):
- return [ key[5:] for key in \
+ return [ key[5:].decode('utf-8') for key in \
self._hgrepo.branchmap() if key.startswith("$doc:") ]
@property
def ospath(self):
- return self._ospath
+ return self._ospath.decode('utf-8')
- def document_for_rev(self, revision):
+ def document_for_revision(self, revision):
if revision is None:
raise ValueError("Revision can't be None.")
- if not isinstance(revision, MercurialRevision):
+ if not isinstance(revision, MercurialRevision):
rev = self.get_revision(revision)
else:
rev = revision
# every revision is a document
return self._doccache[str(rev)]
- def document(self, docid, user=None):
- return self.document_for_rev(self.fulldocid(docid, user))
+ def document(self, docid, user=None, rev=u'latest'):
+ rev = self._sanitize_string(rev)
+
+ if rev != u'latest':
+ doc = self.document_for_revision(rev)
+
+ if doc.id != docid or (doc.owner != user):
+ raise wlrepo.RevisionMismatch(self.fulldocid(docid, user)+u'@'+unicode(rev))
+
+ return doc
+ else:
+ return self.document_for_revision(self.fulldocid(docid, user))
def get_revision(self, revid):
- ctx = self._changectx(revid)
+ revid = self._sanitize_string(revid)
+
+ log.info("Looking up rev %r (%s)" %(revid, type(revid)) )
+
+ try:
+ ctx = self._changectx( revid )
+ except mercurial.error.RepoError, e:
+ raise wlrepo.RevisionNotFound(revid)
if ctx is None:
- raise RevisionNotFound(revid)
+ raise wlrepo.RevisionNotFound(revid)
+
+ return self._revision(ctx)
+ def _revision(self, ctx):
if self._revcache.has_key(ctx):
return self._revcache[ctx]
-
+
return MercurialRevision(self, ctx)
- def fulldocid(self, docid, user=None):
- fulldocid = ''
+ def fulldocid(self, docid, user=None):
+ fulldocid = u''
if user is not None:
- fulldocid += '$user:' + user
- fulldocid += '$doc:' + docid
+ fulldocid += u'$user:' + user
+ fulldocid += u'$doc:' + docid
return fulldocid
-
def has_revision(self, revid):
+ revid = self._sanitize_string(revid)
+
try:
self._hgrepo[revid]
return True
- except error.RepoError:
+ except mercurial.error.RepoError:
return False
- def document_create(self, docid):
+ def document_create(self, docid):
+
# check if it already exists
fullid = self.fulldocid(docid)
if self.has_revision(fullid):
- raise LibraryException("Document already exists!");
+ raise wlrepo.DocumentAlreadyExists(u"Document %s already exists!" % docid);
# doesn't exist
- self._create_branch(fullid)
- return self.document_for_rev(fullid)
+ self._create_branch(self._sanitize_string(fullid))
+ return self.document_for_revision(fullid)
#
# Private methods
# Locking
#
- def _lock(self, write_mode=False):
+ def lock(self, write_mode=False):
return self._hgrepo.wlock() # no support for read/write mode yet
def _transaction(self, write_mode, action):
- lock = self._lock(write_mode)
+ lock = self.lock(write_mode)
try:
return action(self)
finally:
def _changectx(self, nodeid):
return self._hgrepo.changectx(nodeid)
+ def _rollback(self):
+ return self._hgrepo.rollback()
+
#
# BASIC BRANCH routines
#
name = self._sanitize_string(name)
return self._hgrepo.branchtags()[name]
- def _create_branch(self, name, parent=None, before_commit=None):
+ def _create_branch(self, name, parent=None, before_commit=None, message=None):
name = self._sanitize_string(name)
if self._has_branch(name): return # just exit
parentrev = parent.hgrev()
self._checkout(parentrev)
- self._hgrepo.dirstate.setbranch(name)
+ self._set_branchname(name)
if before_commit: before_commit(self)
- self._commit("$ADMN$ Initial commit for branch '%s'." % name, user='$library')
+ message = message or "$ADMN$ Initial commit for branch '%s'." % name
+ self._commit(message, user='$library')
+
+ def _set_branchname(self, name):
+ name = self._sanitize_string(name)
+ self._hgrepo.dirstate.setbranch(name)
def _switch_to_branch(self, branchname):
current = self._hgrepo[None].branch()
@staticmethod
def _sanitize_string(s):
- if isinstance(s, unicode):
- s = s.encode('utf-8')
+ if s is None:
+ return None
- if ' ' in s:
- raise ValueError('Whitespace is forbidden!')
+ if isinstance(s, unicode):
+ s = s.encode('utf-8')
return s
\ No newline at end of file