from django.core.urlresolvers import reverse
from wlrepo import MercurialLibrary, CabinetNotFound
+from librarian import dcparser
+
#
# Document List Handlers
#
result['parts'] = document.parts()
return result
-
+
+#
+# Document Meta Data
+#
class DocumentHandler(BaseHandler):
allowed_methods = ('GET', 'PUT')
anonymous = BasicDocumentHandler
document = lib.cabinet(docid, request.user.username, \
create=opts.cleaned_data['autocabinet'] ).retrieve()
+
+ if not document:
+ return rc.NOT_HERE
shared = lib.main_cabinet.retrieve(docid)
lib.document(docid, request.user.username).write(data)
return rc.ALL_OK
except (CabinetNotFound, KeyError):
- return rc.NOT_HERE
\ No newline at end of file
+ return rc.NOT_HERE
+
+
+#
+# Dublin Core handlers
+#
+# @requires librarian
+#
+class DocumentDublinCoreHandler(BaseHandler):
+ allowed_methods = ('GET', 'PUT')
+
+ def read(self, request, docid):
+ """Read document as raw text"""
+ lib = MercurialLibrary(path=settings.REPOSITORY_PATH)
+ try:
+ doc = lib.document(docid, request.user.username)
+
+ # TODO: RAL:document should support file-like ops
+ bookinfo = dcparser.BookInfo.from_string(doc.read())
+ return bookinfo.serialize()
+ except CabinetNotFound:
+ return rc.NOT_HERE
+
+ def update(self, request, docid):
+ lib = MercurialLibrary(path=settings.REPOSITORY_PATH)
+ try:
+ data = request.PUT['contents']
+ lib.document(docid, request.user.username).write(data)
+ return rc.ALL_OK
+ except (CabinetNotFound, KeyError):
+ return rc.NOT_HERE
+
REPO_TEMPLATES = join(dirname(__file__), 'data')
+
+
def temprepo(name):
- def decorator(func):
+ from functools import wraps
+
+ def decorator(func):
+
+ @wraps(func)
def decorated(self, *args, **kwargs):
clean = False
try:
shutil.rmtree(temp, True)
settings.REPOSITORY_PATH = ''
-
+
return decorated
return decorator
authdata = {'authentication': DjangoAuth()}
-FORMAT_EXT = r"\.(?P<emitter_format>xml|json|yaml|django)$"
+FORMAT_EXT = r"\.(?P<emitter_format>xml|json|yaml)$"
library_resource = Resource(LibraryHandler, **authdata)
document_resource = Resource(DocumentHandler, **authdata)
document_text_resource = Resource(DocumentTextHandler, **authdata)
+document_dc_resource = Resource(DocumentDublinCoreHandler, **authdata)
urlpatterns = patterns('',
# url(r'^hello$', hello_resource, {'emitter_format': 'json'}),
# url(r'^hello\.(?P<emitter_format>.+)$', hello_resource),
# Documents
- url(r'^documents$', library_resource, {'emitter_format': 'json'},
- name="document_list_view"),
+ url(r'^documents$', library_resource,
+ {'emitter_format': 'json'}, name="document_list_view"),
+ url(r'^documents'+FORMAT_EXT, library_resource,
+ name="document_list_view_withformat"),
+
url(r'^documents/(?P<docid>[^/]+)'+FORMAT_EXT,
document_resource, name="document_view_withformat"),
document_text_resource, {'emitter_format': 'rawxml'},
name="doctext_view"),
+ url(r'^documents/(?P<docid>[^/]+)/dc' + FORMAT_EXT,
+ document_dc_resource,
+ name="docdc_view_withformat"),
+
url(r'^documents/(?P<docid>[^/]+)/dc$',
- document_resource, {'emitter_format': 'json'},
+ document_dc_resource, {'emitter_format': 'json'},
name="docdc_view"),
url(r'^documents/(?P<docid>[^/]+)/parts$',
return unicode(self.construct())
Emitter.register('text', TextEmitter, 'text/plain; charset=utf-8')
-Emitter.register('rawxml', TextEmitter, 'application/xml; charset=utf-8')
-
+Emitter.register('rawxml', TextEmitter, 'application/xml; charset=UTF-8')
class DjangoAuth(object):
finally:
lock.release()
+ @property
+ def ospath(self):
+ return self._ospath
+
@property
def main_cabinet(self):
return self._maincab
return self.cabinet(docid, user, create=False).retrieve()
def cabinet(self, docid, user, create=False):
+ docid = self._sanitize_string(docid)
+ user = self._sanitize_string(user)
+
bname = self._bname(user, docid)
lock = self._lock(True)
garbage = [fid for (fid, did) in l._filelist() if not did.startswith(docid)]
l._filesrm(garbage)
+ print "removed: ", garbage
# create the branch
self._create_branch(bname, before_commit=cleanup_action)
self._checkout(self._branch_tip(branchname))
return branchname
- def shelf(self, nodeid):
+ def shelf(self, nodeid=None):
+ if nodeid is None:
+ nodeid = self._maincab._name
return MercurialShelf(self, self._changectx(nodeid))
def retrieve_action(l,c):
if l._fileexists(fileid):
return MercurialDocument(c, name=name, fileid=fileid)
+ print "File %s not found " % fileid
return None
return self._execute_in_branch(retrieve_action)
user = self._cabinet.username
- main = self.shared().shelf()
+ main = self.library.shelf()
local = self.shelf()
no_changes = True
# no commit's since last update
if main.ancestorof(local):
+ print "case 1"
main.merge_with(local, user=user, message=message)
no_changes = False
-
# Case 2:
#
# main * * local
# Default has no changes, to update from this branch
# since the last merge of local to default.
elif local.has_common_ancestor(main):
+ print "case 2"
if not local.parentof(main):
main.merge_with(local, user=user, message=message)
no_changes = False
# Use the fact, that user is prepared to see changes, to
# update his branch if there are any
elif local.ancestorof(main):
+ print "case 3"
if not local.parentof(main):
local.merge_with(main, user=user, message='Local branch update.')
no_changes = False
else:
+ print "case 4"
local.merge_with(main, user=user, message='Local branch update.')
-
- self._refresh()
local = self.shelf()
-
main.merge_with(local, user=user, message=message)
+
+ print "no_changes: ", no_changes
+ return no_changes
finally:
lock.release()
def has_common_ancestor(self, other):
a = self._changectx.ancestor(other._changectx)
- print a, self._changectx.branch(), a.branch()
+ # print a, self._changectx.branch(), a.branch()
return (a.branch() == self._changectx.branch())
try:
self._library._checkout(self._changectx.node())
self._library._merge(other._changectx.node())
+ self._library._commit(user=user, message=message)
finally:
lock.release()
import shutil
-REPO_TEMPLATES = os.path.join( os.path.dirname(__file__), 'data/repos')
+REPO_TEMPLATES = os.path.join( os.path.dirname(__file__), 'data')
def temprepo(name):
- def decorator(func):
- def decorated(self, *args, **kwargs):
+
+ from functools import wraps
+
+ def decorator(func):
+ def decorated(*args, **kwargs):
clean = False
try:
temp = tempfile.mkdtemp("", "testdir_" )
- path = join(temp, 'repo')
- shutil.copytree(join(REPO_TEMPLATES, name), path, False)
- repo = MercurialLibrary(path)
- func(self, *args, library=repo, **kwargs)
+ path = os.path.join(temp, 'repo')
+ shutil.copytree(os.path.join(REPO_TEMPLATES, name), path, False)
+ kwargs['library'] = MercurialLibrary(path)
+ func(*args, **kwargs)
clean = True
finally:
#if not clean and self.response:
# print "<<<"
shutil.rmtree(temp, True)
- return decorated
+ decorated = make_decorator(func)(decorated)
+ return decorated
+
return decorator
-
@temprepo('clean')
def test_opening(library):
pass
def test_create_document(library):
doc = library.main_cabinet.create("another_file", "Some text")
assert_equal( doc.read(), "Some text")
- assert_true( os.path.isfile( os.path.join(repopath, "pub_another_file.xml")) )
+ assert_true( os.path.isfile( os.path.join(library.ospath, "pub_another_file.xml")) )
@temprepo('branched')
def test_switch_branch(library):
assert_true( n3.has_common_ancestor(n3) )
-@temprepo('branched_two')
+@temprepo('branched2')
def test_once_branched(library):
n7 = library.shelf(7)
n6 = library.shelf(6)
@temprepo('branched2')
def test_merge_personal_to_default(library):
main = library.shelf(2)
+ print main
+
local = library.shelf(7)
+ print local
document = library.document("ala", "admin")
shared = document.shared()
- print document, shared
-
+ assert_true( shared is None )
document.share("Here is my copy!")
assert_equal( document.shelf(), local) # local didn't change
+ shared = document.shared()
+ assert_true( shared is not None )
+
+ print library.shelf()
new_main = shared.shelf()
assert_not_equal( new_main, main) # main has new revision
assert_true( local.parentof(new_main) )
@temprepo('clean')
-def testCreateBranch(library):
- library = MercurialLibrary(repopath)
-
+def test_create_branch(library):
tester_cab = library.cabinet("anotherone", "tester", create=True)
- assert_equal( list(tester_cab.documents()), ['anotherone'])
-
-
-
+ assert_equal( list(tester_cab.documents()), ['anotherone'])