From: zuber Date: Fri, 27 Nov 2009 13:05:17 +0000 (+0100) Subject: Uproszczenie struktury pakietu vstorage (teraz jest zwykłym modułem) i jego testów... X-Git-Url: https://git.mdrn.pl/redakcja.git/commitdiff_plain/3eec01e4c2a842c7383857e49d68664f04759c1f Uproszczenie struktury pakietu vstorage (teraz jest zwykłym modułem) i jego testów. Poprawienie skryptu run_tests.sh, żeby uruchamiał testy dla vstorage. --- diff --git a/lib/run_tests.sh b/lib/run_tests.sh index 0b086ef2..03da16f6 100755 --- a/lib/run_tests.sh +++ b/lib/run_tests.sh @@ -1 +1,2 @@ -PYTYONPATH=./lib:$PYTHONPATH nosetests --detailed-errors --with-doctest --with-coverage --cover-package=wlrepo +#!/bin/sh +nosetests -v --with-doctest --with-coverage --cover-package=vstorage --detailed-errors diff --git a/lib/test_vstorage.py b/lib/test_vstorage.py new file mode 100644 index 00000000..0b416bbb --- /dev/null +++ b/lib/test_vstorage.py @@ -0,0 +1,99 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import os +import tempfile +from nose.tools import * + +import vstorage + + +def clear_directory(top): + for root, dirs, files in os.walk(top, topdown=False): + for name in files: + os.remove(os.path.join(root, name)) + for name in dirs: + os.rmdir(os.path.join(root, name)) + try: + os.removedirs(top) + except OSError: + pass + + +class TestVersionedStorage(object): + def setUp(self): + self.repo_path = tempfile.mkdtemp() + self.repo = vstorage.VersionedStorage(self.repo_path) + + def tearDown(self): + clear_directory(self.repo_path) + + def test_save_text(self): + text = u"test text" + title = u"test title" + author = u"test author" + comment = u"test comment" + self.repo.save_text(title, text, author, comment, parent=-1) + saved = self.repo.open_page(title).read() + assert saved == text + + def test_save_text_noparent(self): + text = u"test text" + title = u"test title" + author = u"test author" + comment = u"test comment" + self.repo.save_text(title, text, author, comment, parent=None) + saved = self.repo.open_page(title).read() + assert saved == text + + def test_save_merge_no_conflict(self): + text = u"test\ntext" + title = u"test title" + author = u"test author" + comment = u"test comment" + self.repo.save_text(title, text, author, comment, parent=-1) + self.repo.save_text(title, text, author, comment, parent=-1) + saved = self.repo.open_page(title).read() + assert saved == text + + def test_save_merge_line_conflict(self): + text = u"test\ntest\n" + text1 = u"test\ntext\n" + text2 = u"text\ntest\n" + title = u"test title" + author = u"test author" + comment = u"test comment" + self.repo.save_text(title, text, author, comment, parent=-1) + self.repo.save_text(title, text1, author, comment, parent=0) + self.repo.save_text(title, text2, author, comment, parent=0) + saved = self.repo.open_page(title).read() + # Other conflict markers placement can also be correct + assert_equal(saved, u'''\ +text +test +<<<<<<< local +======= +text +>>>>>>> other +''') + + def test_delete(self): + text = u"text test" + title = u"test title" + author = u"test author" + comment = u"test comment" + self.repo.save_text(title, text, author, comment, parent=-1) + assert title in self.repo + self.repo.delete_page(title, author, comment) + assert title not in self.repo + + @raises(vstorage.DocumentNotFound) + def test_document_not_found(self): + self.repo.open_page(u'unknown entity') + + def test_open_existing_repository(self): + self.repo.save_text(u'Python!', u'ham and spam') + current_repo_revision = self.repo.repo_revision() + same_repo = vstorage.VersionedStorage(self.repo_path) + assert same_repo.repo_revision() == current_repo_revision + diff --git a/lib/tests/__init__.py b/lib/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/tests/test_vstorage.py b/lib/tests/test_vstorage.py deleted file mode 100644 index 0b416bbb..00000000 --- a/lib/tests/test_vstorage.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -import os -import tempfile -from nose.tools import * - -import vstorage - - -def clear_directory(top): - for root, dirs, files in os.walk(top, topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - for name in dirs: - os.rmdir(os.path.join(root, name)) - try: - os.removedirs(top) - except OSError: - pass - - -class TestVersionedStorage(object): - def setUp(self): - self.repo_path = tempfile.mkdtemp() - self.repo = vstorage.VersionedStorage(self.repo_path) - - def tearDown(self): - clear_directory(self.repo_path) - - def test_save_text(self): - text = u"test text" - title = u"test title" - author = u"test author" - comment = u"test comment" - self.repo.save_text(title, text, author, comment, parent=-1) - saved = self.repo.open_page(title).read() - assert saved == text - - def test_save_text_noparent(self): - text = u"test text" - title = u"test title" - author = u"test author" - comment = u"test comment" - self.repo.save_text(title, text, author, comment, parent=None) - saved = self.repo.open_page(title).read() - assert saved == text - - def test_save_merge_no_conflict(self): - text = u"test\ntext" - title = u"test title" - author = u"test author" - comment = u"test comment" - self.repo.save_text(title, text, author, comment, parent=-1) - self.repo.save_text(title, text, author, comment, parent=-1) - saved = self.repo.open_page(title).read() - assert saved == text - - def test_save_merge_line_conflict(self): - text = u"test\ntest\n" - text1 = u"test\ntext\n" - text2 = u"text\ntest\n" - title = u"test title" - author = u"test author" - comment = u"test comment" - self.repo.save_text(title, text, author, comment, parent=-1) - self.repo.save_text(title, text1, author, comment, parent=0) - self.repo.save_text(title, text2, author, comment, parent=0) - saved = self.repo.open_page(title).read() - # Other conflict markers placement can also be correct - assert_equal(saved, u'''\ -text -test -<<<<<<< local -======= -text ->>>>>>> other -''') - - def test_delete(self): - text = u"text test" - title = u"test title" - author = u"test author" - comment = u"test comment" - self.repo.save_text(title, text, author, comment, parent=-1) - assert title in self.repo - self.repo.delete_page(title, author, comment) - assert title not in self.repo - - @raises(vstorage.DocumentNotFound) - def test_document_not_found(self): - self.repo.open_page(u'unknown entity') - - def test_open_existing_repository(self): - self.repo.save_text(u'Python!', u'ham and spam') - current_repo_revision = self.repo.repo_revision() - same_repo = vstorage.VersionedStorage(self.repo_path) - assert same_repo.repo_revision() == current_repo_revision - diff --git a/lib/vstorage.py b/lib/vstorage.py new file mode 100644 index 00000000..1060bc25 --- /dev/null +++ b/lib/vstorage.py @@ -0,0 +1,404 @@ +# -*- coding: utf-8 -*- +import os +import tempfile +import datetime +import mimetypes +import urllib + +# Note: we have to set these before importing Mercurial +os.environ['HGENCODING'] = 'utf-8' +os.environ['HGMERGE'] = "internal:merge" + +import mercurial.hg +import mercurial.ui +import mercurial.revlog +import mercurial.util + + +def urlquote(url, safe='/'): + """Quotes URL + + >>> urlquote(u'Za\u017c\xf3\u0142\u0107 g\u0119\u015bl\u0105 ja\u017a\u0144') + 'Za%C5%BC%C3%B3%C5%82%C4%87_g%C4%99%C5%9Bl%C4%85_ja%C5%BA%C5%84' + """ + return urllib.quote(url.replace(' ', '_').encode('utf-8', 'ignore'), safe) + + +def urlunquote(url): + """Unqotes URL + + # >>> urlunquote('Za%C5%BC%C3%B3%C5%82%C4%87_g%C4%99%C5%9Bl%C4%85_ja%C5%BA%C5%84') + # u'Za\u017c\xf3\u0142\u0107 g\u0119\u015bl\u0105 ja\u017a\u0144' + """ + return unicode(urllib.unquote(url), 'utf-8', 'ignore').replace('_', ' ') + + +def find_repo_path(path): + """Go up the directory tree looking for a Mercurial repository (a directory containing a .hg subdirectory).""" + while not os.path.isdir(os.path.join(path, ".hg")): + old_path, path = path, os.path.dirname(path) + if path == old_path: + return None + return path + + +def locked_repo(func): + """A decorator for locking the repository when calling a method.""" + + def new_func(self, *args, **kwargs): + """Wrap the original function in locks.""" + + wlock = self.repo.wlock() + lock = self.repo.lock() + try: + func(self, *args, **kwargs) + finally: + lock.release() + wlock.release() + + return new_func + + +def guess_mime(file_name): + """ + Guess file's mime type based on extension. + Default ot text/x-wiki for files without an extension. + + >>> guess_mime('something.txt') + 'text/plain' + >>> guess_mime('SomePage') + 'text/x-wiki' + >>> guess_mime(u'ąęśUnicodePage') + 'text/x-wiki' + >>> guess_mime('image.png') + 'image/png' + >>> guess_mime('style.css') + 'text/css' + >>> guess_mime('archive.tar.gz') + 'archive/gzip' + """ + + mime, encoding = mimetypes.guess_type(file_name, strict=False) + if encoding: + mime = 'archive/%s' % encoding + if mime is None: + mime = 'text/x-wiki' + return mime + + +class DocumentNotFound(Exception): + pass + + +class VersionedStorage(object): + """ + Provides means of storing text pages and keeping track of their + change history, using Mercurial repository as the storage method. + """ + + def __init__(self, path, charset=None): + """ + Takes the path to the directory where the pages are to be kept. + If the directory doen't exist, it will be created. If it's inside + a Mercurial repository, that repository will be used, otherwise + a new repository will be created in it. + """ + + self.charset = charset or 'utf-8' + self.path = path + if not os.path.exists(self.path): + os.makedirs(self.path) + self.repo_path = find_repo_path(self.path) + try: + self.ui = mercurial.ui.ui(report_untrusted=False, + interactive=False, quiet=True) + except TypeError: + # Mercurial 1.3 changed the way we setup the ui object. + self.ui = mercurial.ui.ui() + self.ui.quiet = True + self.ui._report_untrusted = False + self.ui.setconfig('ui', 'interactive', False) + if self.repo_path is None: + self.repo_path = self.path + create = True + else: + create = False + self.repo_prefix = self.path[len(self.repo_path):].strip('/') + self.repo = mercurial.hg.repository(self.ui, self.repo_path, + create=create) + + def reopen(self): + """Close and reopen the repo, to make sure we are up to date.""" + + self.repo = mercurial.hg.repository(self.ui, self.repo_path) + + def _file_path(self, title): + return os.path.join(self.path, urlquote(title, safe='')) + + def _title_to_file(self, title): + return os.path.join(self.repo_prefix, urlquote(title, safe='')) + + def _file_to_title(self, filename): + assert filename.startswith(self.repo_prefix) + name = filename[len(self.repo_prefix):].strip('/') + return urlunquote(name) + + def __contains__(self, title): + return os.path.exists(self._file_path(title)) + + def __iter__(self): + return self.all_pages() + + def merge_changes(self, changectx, repo_file, text, user, parent): + """Commits and merges conflicting changes in the repository.""" + tip_node = changectx.node() + filectx = changectx[repo_file].filectx(parent) + parent_node = filectx.changectx().node() + + self.repo.dirstate.setparents(parent_node) + node = self._commit([repo_file], text, user) + + partial = lambda filename: repo_file == filename + + # If p1 is equal to p2, there is no work to do. Even the dirstate is correct. + p1, p2 = self.repo[None].parents()[0], self.repo[tip_node] + if p1 == p2: + return text + + # TODO: Check if merge was successful + mercurial.merge.update(self.repo, tip_node, True, False, partial) + + self.repo.dirstate.setparents(tip_node, node) + # Mercurial 1.1 and later need updating the merge state + try: + mercurial.merge.mergestate(self.repo).mark(repo_file, "r") + except (AttributeError, KeyError): + pass + return u'merge of edit conflict' + + @locked_repo + def save_file(self, title, file_name, author=u'', comment=u'', parent=None): + """Save an existing file as specified page.""" + + user = author.encode('utf-8') or u'anon'.encode('utf-8') + text = comment.encode('utf-8') or u'comment'.encode('utf-8') + repo_file = self._title_to_file(title) + file_path = self._file_path(title) + mercurial.util.rename(file_name, file_path) + changectx = self._changectx() + try: + filectx_tip = changectx[repo_file] + current_page_rev = filectx_tip.filerev() + except mercurial.revlog.LookupError: + self.repo.add([repo_file]) + current_page_rev = -1 + if parent is not None and current_page_rev != parent: + msg = self.merge_changes(changectx, repo_file, text, user, parent) + user = '' + text = msg.encode('utf-8') + self._commit([repo_file], text, user) + + + def _commit(self, files, text, user): + try: + return self.repo.commit(files=files, text=text, user=user, + force=True, empty_ok=True) + except TypeError: + # Mercurial 1.3 doesn't accept empty_ok or files parameter + match = mercurial.match.exact(self.repo_path, '', list(files)) + return self.repo.commit(match=match, text=text, user=user, + force=True) + + + def save_data(self, title, data, author=u'', comment=u'', parent=None): + """Save data as specified page.""" + + try: + temp_path = tempfile.mkdtemp(dir=self.path) + file_path = os.path.join(temp_path, 'saved') + f = open(file_path, "wb") + f.write(data) + f.close() + self.save_file(title, file_path, author, comment, parent) + finally: + try: + os.unlink(file_path) + except OSError: + pass + try: + os.rmdir(temp_path) + except OSError: + pass + + def save_text(self, title, text, author=u'', comment=u'', parent=None): + """Save text as specified page, encoded to charset.""" + + data = text.encode(self.charset) + self.save_data(title, data, author, comment, parent) + + def page_text(self, title): + """Read unicode text of a page.""" + + data = self.open_page(title).read() + text = unicode(data, self.charset, 'replace') + return text + + def page_lines(self, page): + for data in page: + yield unicode(data, self.charset, 'replace') + + @locked_repo + def delete_page(self, title, author=u'', comment=u''): + user = author.encode('utf-8') or 'anon' + text = comment.encode('utf-8') or 'deleted' + repo_file = self._title_to_file(title) + file_path = self._file_path(title) + try: + os.unlink(file_path) + except OSError: + pass + self.repo.remove([repo_file]) + self._commit([repo_file], text, user) + + def open_page(self, title): + try: + return open(self._file_path(title), "rb") + except IOError: + raise DocumentNotFound() + + def page_file_meta(self, title): + """Get page's inode number, size and last modification time.""" + + try: + (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, + st_atime, st_mtime, st_ctime) = os.stat(self._file_path(title)) + except OSError: + return 0, 0, 0 + return st_ino, st_size, st_mtime + + def page_meta(self, title): + """Get page's revision, date, last editor and his edit comment.""" + + filectx_tip = self._find_filectx(title) + if filectx_tip is None: + raise DocumentNotFound() + #return -1, None, u'', u'' + rev = filectx_tip.filerev() + filectx = filectx_tip.filectx(rev) + date = datetime.datetime.fromtimestamp(filectx.date()[0]) + author = unicode(filectx.user(), "utf-8", + 'replace').split('<')[0].strip() + comment = unicode(filectx.description(), "utf-8", 'replace') + return rev, date, author, comment + + def repo_revision(self): + return self._changectx().rev() + + def page_mime(self, title): + """ + Guess page's mime type based on corresponding file name. + Default ot text/x-wiki for files without an extension. + """ + return guess_type(self._file_path(title)) + + def _changectx(self): + """Get the changectx of the tip.""" + try: + # This is for Mercurial 1.0 + return self.repo.changectx() + except TypeError: + # Mercurial 1.3 (and possibly earlier) needs an argument + return self.repo.changectx('tip') + + def _find_filectx(self, title): + """Find the last revision in which the file existed.""" + + repo_file = self._title_to_file(title) + changectx = self._changectx() + stack = [changectx] + while repo_file not in changectx: + if not stack: + return None + changectx = stack.pop() + for parent in changectx.parents(): + if parent != changectx: + stack.append(parent) + return changectx[repo_file] + + def page_history(self, title): + """Iterate over the page's history.""" + + filectx_tip = self._find_filectx(title) + if filectx_tip is None: + return + maxrev = filectx_tip.filerev() + minrev = 0 + for rev in range(maxrev, minrev-1, -1): + filectx = filectx_tip.filectx(rev) + date = datetime.datetime.fromtimestamp(filectx.date()[0]) + author = unicode(filectx.user(), "utf-8", + 'replace').split('<')[0].strip() + comment = unicode(filectx.description(), "utf-8", 'replace') + yield rev, date, author, comment + + def page_revision(self, title, rev): + """Get unicode contents of specified revision of the page.""" + + filectx_tip = self._find_filectx(title) + if filectx_tip is None: + raise DocumentNotFound() + try: + data = filectx_tip.filectx(rev).data() + except IndexError: + raise DocumentNotFound() + return data + + def revision_text(self, title, rev): + data = self.page_revision(title, rev) + text = unicode(data, self.charset, 'replace') + return text + + def history(self): + """Iterate over the history of entire wiki.""" + + changectx = self._changectx() + maxrev = changectx.rev() + minrev = 0 + for wiki_rev in range(maxrev, minrev-1, -1): + change = self.repo.changectx(wiki_rev) + date = datetime.datetime.fromtimestamp(change.date()[0]) + author = unicode(change.user(), "utf-8", + 'replace').split('<')[0].strip() + comment = unicode(change.description(), "utf-8", 'replace') + for repo_file in change.files(): + if repo_file.startswith(self.repo_prefix): + title = self._file_to_title(repo_file) + try: + rev = change[repo_file].filerev() + except mercurial.revlog.LookupError: + rev = -1 + yield title, rev, date, author, comment + + def all_pages(self): + """Iterate over the titles of all pages in the wiki.""" + + for filename in os.listdir(self.path): + if (os.path.isfile(os.path.join(self.path, filename)) + and not filename.startswith('.')): + yield urlunquote(filename) + + def changed_since(self, rev): + """Return all pages that changed since specified repository revision.""" + + try: + last = self.repo.lookup(int(rev)) + except IndexError: + for page in self.all_pages(): + yield page + return + current = self.repo.lookup('tip') + status = self.repo.status(current, last) + modified, added, removed, deleted, unknown, ignored, clean = status + for filename in modified+added+removed+deleted: + if filename.startswith(self.repo_prefix): + yield self._file_to_title(filename) diff --git a/lib/vstorage/__init__.py b/lib/vstorage/__init__.py deleted file mode 100644 index 1060bc25..00000000 --- a/lib/vstorage/__init__.py +++ /dev/null @@ -1,404 +0,0 @@ -# -*- coding: utf-8 -*- -import os -import tempfile -import datetime -import mimetypes -import urllib - -# Note: we have to set these before importing Mercurial -os.environ['HGENCODING'] = 'utf-8' -os.environ['HGMERGE'] = "internal:merge" - -import mercurial.hg -import mercurial.ui -import mercurial.revlog -import mercurial.util - - -def urlquote(url, safe='/'): - """Quotes URL - - >>> urlquote(u'Za\u017c\xf3\u0142\u0107 g\u0119\u015bl\u0105 ja\u017a\u0144') - 'Za%C5%BC%C3%B3%C5%82%C4%87_g%C4%99%C5%9Bl%C4%85_ja%C5%BA%C5%84' - """ - return urllib.quote(url.replace(' ', '_').encode('utf-8', 'ignore'), safe) - - -def urlunquote(url): - """Unqotes URL - - # >>> urlunquote('Za%C5%BC%C3%B3%C5%82%C4%87_g%C4%99%C5%9Bl%C4%85_ja%C5%BA%C5%84') - # u'Za\u017c\xf3\u0142\u0107 g\u0119\u015bl\u0105 ja\u017a\u0144' - """ - return unicode(urllib.unquote(url), 'utf-8', 'ignore').replace('_', ' ') - - -def find_repo_path(path): - """Go up the directory tree looking for a Mercurial repository (a directory containing a .hg subdirectory).""" - while not os.path.isdir(os.path.join(path, ".hg")): - old_path, path = path, os.path.dirname(path) - if path == old_path: - return None - return path - - -def locked_repo(func): - """A decorator for locking the repository when calling a method.""" - - def new_func(self, *args, **kwargs): - """Wrap the original function in locks.""" - - wlock = self.repo.wlock() - lock = self.repo.lock() - try: - func(self, *args, **kwargs) - finally: - lock.release() - wlock.release() - - return new_func - - -def guess_mime(file_name): - """ - Guess file's mime type based on extension. - Default ot text/x-wiki for files without an extension. - - >>> guess_mime('something.txt') - 'text/plain' - >>> guess_mime('SomePage') - 'text/x-wiki' - >>> guess_mime(u'ąęśUnicodePage') - 'text/x-wiki' - >>> guess_mime('image.png') - 'image/png' - >>> guess_mime('style.css') - 'text/css' - >>> guess_mime('archive.tar.gz') - 'archive/gzip' - """ - - mime, encoding = mimetypes.guess_type(file_name, strict=False) - if encoding: - mime = 'archive/%s' % encoding - if mime is None: - mime = 'text/x-wiki' - return mime - - -class DocumentNotFound(Exception): - pass - - -class VersionedStorage(object): - """ - Provides means of storing text pages and keeping track of their - change history, using Mercurial repository as the storage method. - """ - - def __init__(self, path, charset=None): - """ - Takes the path to the directory where the pages are to be kept. - If the directory doen't exist, it will be created. If it's inside - a Mercurial repository, that repository will be used, otherwise - a new repository will be created in it. - """ - - self.charset = charset or 'utf-8' - self.path = path - if not os.path.exists(self.path): - os.makedirs(self.path) - self.repo_path = find_repo_path(self.path) - try: - self.ui = mercurial.ui.ui(report_untrusted=False, - interactive=False, quiet=True) - except TypeError: - # Mercurial 1.3 changed the way we setup the ui object. - self.ui = mercurial.ui.ui() - self.ui.quiet = True - self.ui._report_untrusted = False - self.ui.setconfig('ui', 'interactive', False) - if self.repo_path is None: - self.repo_path = self.path - create = True - else: - create = False - self.repo_prefix = self.path[len(self.repo_path):].strip('/') - self.repo = mercurial.hg.repository(self.ui, self.repo_path, - create=create) - - def reopen(self): - """Close and reopen the repo, to make sure we are up to date.""" - - self.repo = mercurial.hg.repository(self.ui, self.repo_path) - - def _file_path(self, title): - return os.path.join(self.path, urlquote(title, safe='')) - - def _title_to_file(self, title): - return os.path.join(self.repo_prefix, urlquote(title, safe='')) - - def _file_to_title(self, filename): - assert filename.startswith(self.repo_prefix) - name = filename[len(self.repo_prefix):].strip('/') - return urlunquote(name) - - def __contains__(self, title): - return os.path.exists(self._file_path(title)) - - def __iter__(self): - return self.all_pages() - - def merge_changes(self, changectx, repo_file, text, user, parent): - """Commits and merges conflicting changes in the repository.""" - tip_node = changectx.node() - filectx = changectx[repo_file].filectx(parent) - parent_node = filectx.changectx().node() - - self.repo.dirstate.setparents(parent_node) - node = self._commit([repo_file], text, user) - - partial = lambda filename: repo_file == filename - - # If p1 is equal to p2, there is no work to do. Even the dirstate is correct. - p1, p2 = self.repo[None].parents()[0], self.repo[tip_node] - if p1 == p2: - return text - - # TODO: Check if merge was successful - mercurial.merge.update(self.repo, tip_node, True, False, partial) - - self.repo.dirstate.setparents(tip_node, node) - # Mercurial 1.1 and later need updating the merge state - try: - mercurial.merge.mergestate(self.repo).mark(repo_file, "r") - except (AttributeError, KeyError): - pass - return u'merge of edit conflict' - - @locked_repo - def save_file(self, title, file_name, author=u'', comment=u'', parent=None): - """Save an existing file as specified page.""" - - user = author.encode('utf-8') or u'anon'.encode('utf-8') - text = comment.encode('utf-8') or u'comment'.encode('utf-8') - repo_file = self._title_to_file(title) - file_path = self._file_path(title) - mercurial.util.rename(file_name, file_path) - changectx = self._changectx() - try: - filectx_tip = changectx[repo_file] - current_page_rev = filectx_tip.filerev() - except mercurial.revlog.LookupError: - self.repo.add([repo_file]) - current_page_rev = -1 - if parent is not None and current_page_rev != parent: - msg = self.merge_changes(changectx, repo_file, text, user, parent) - user = '' - text = msg.encode('utf-8') - self._commit([repo_file], text, user) - - - def _commit(self, files, text, user): - try: - return self.repo.commit(files=files, text=text, user=user, - force=True, empty_ok=True) - except TypeError: - # Mercurial 1.3 doesn't accept empty_ok or files parameter - match = mercurial.match.exact(self.repo_path, '', list(files)) - return self.repo.commit(match=match, text=text, user=user, - force=True) - - - def save_data(self, title, data, author=u'', comment=u'', parent=None): - """Save data as specified page.""" - - try: - temp_path = tempfile.mkdtemp(dir=self.path) - file_path = os.path.join(temp_path, 'saved') - f = open(file_path, "wb") - f.write(data) - f.close() - self.save_file(title, file_path, author, comment, parent) - finally: - try: - os.unlink(file_path) - except OSError: - pass - try: - os.rmdir(temp_path) - except OSError: - pass - - def save_text(self, title, text, author=u'', comment=u'', parent=None): - """Save text as specified page, encoded to charset.""" - - data = text.encode(self.charset) - self.save_data(title, data, author, comment, parent) - - def page_text(self, title): - """Read unicode text of a page.""" - - data = self.open_page(title).read() - text = unicode(data, self.charset, 'replace') - return text - - def page_lines(self, page): - for data in page: - yield unicode(data, self.charset, 'replace') - - @locked_repo - def delete_page(self, title, author=u'', comment=u''): - user = author.encode('utf-8') or 'anon' - text = comment.encode('utf-8') or 'deleted' - repo_file = self._title_to_file(title) - file_path = self._file_path(title) - try: - os.unlink(file_path) - except OSError: - pass - self.repo.remove([repo_file]) - self._commit([repo_file], text, user) - - def open_page(self, title): - try: - return open(self._file_path(title), "rb") - except IOError: - raise DocumentNotFound() - - def page_file_meta(self, title): - """Get page's inode number, size and last modification time.""" - - try: - (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, - st_atime, st_mtime, st_ctime) = os.stat(self._file_path(title)) - except OSError: - return 0, 0, 0 - return st_ino, st_size, st_mtime - - def page_meta(self, title): - """Get page's revision, date, last editor and his edit comment.""" - - filectx_tip = self._find_filectx(title) - if filectx_tip is None: - raise DocumentNotFound() - #return -1, None, u'', u'' - rev = filectx_tip.filerev() - filectx = filectx_tip.filectx(rev) - date = datetime.datetime.fromtimestamp(filectx.date()[0]) - author = unicode(filectx.user(), "utf-8", - 'replace').split('<')[0].strip() - comment = unicode(filectx.description(), "utf-8", 'replace') - return rev, date, author, comment - - def repo_revision(self): - return self._changectx().rev() - - def page_mime(self, title): - """ - Guess page's mime type based on corresponding file name. - Default ot text/x-wiki for files without an extension. - """ - return guess_type(self._file_path(title)) - - def _changectx(self): - """Get the changectx of the tip.""" - try: - # This is for Mercurial 1.0 - return self.repo.changectx() - except TypeError: - # Mercurial 1.3 (and possibly earlier) needs an argument - return self.repo.changectx('tip') - - def _find_filectx(self, title): - """Find the last revision in which the file existed.""" - - repo_file = self._title_to_file(title) - changectx = self._changectx() - stack = [changectx] - while repo_file not in changectx: - if not stack: - return None - changectx = stack.pop() - for parent in changectx.parents(): - if parent != changectx: - stack.append(parent) - return changectx[repo_file] - - def page_history(self, title): - """Iterate over the page's history.""" - - filectx_tip = self._find_filectx(title) - if filectx_tip is None: - return - maxrev = filectx_tip.filerev() - minrev = 0 - for rev in range(maxrev, minrev-1, -1): - filectx = filectx_tip.filectx(rev) - date = datetime.datetime.fromtimestamp(filectx.date()[0]) - author = unicode(filectx.user(), "utf-8", - 'replace').split('<')[0].strip() - comment = unicode(filectx.description(), "utf-8", 'replace') - yield rev, date, author, comment - - def page_revision(self, title, rev): - """Get unicode contents of specified revision of the page.""" - - filectx_tip = self._find_filectx(title) - if filectx_tip is None: - raise DocumentNotFound() - try: - data = filectx_tip.filectx(rev).data() - except IndexError: - raise DocumentNotFound() - return data - - def revision_text(self, title, rev): - data = self.page_revision(title, rev) - text = unicode(data, self.charset, 'replace') - return text - - def history(self): - """Iterate over the history of entire wiki.""" - - changectx = self._changectx() - maxrev = changectx.rev() - minrev = 0 - for wiki_rev in range(maxrev, minrev-1, -1): - change = self.repo.changectx(wiki_rev) - date = datetime.datetime.fromtimestamp(change.date()[0]) - author = unicode(change.user(), "utf-8", - 'replace').split('<')[0].strip() - comment = unicode(change.description(), "utf-8", 'replace') - for repo_file in change.files(): - if repo_file.startswith(self.repo_prefix): - title = self._file_to_title(repo_file) - try: - rev = change[repo_file].filerev() - except mercurial.revlog.LookupError: - rev = -1 - yield title, rev, date, author, comment - - def all_pages(self): - """Iterate over the titles of all pages in the wiki.""" - - for filename in os.listdir(self.path): - if (os.path.isfile(os.path.join(self.path, filename)) - and not filename.startswith('.')): - yield urlunquote(filename) - - def changed_since(self, rev): - """Return all pages that changed since specified repository revision.""" - - try: - last = self.repo.lookup(int(rev)) - except IndexError: - for page in self.all_pages(): - yield page - return - current = self.repo.lookup('tip') - status = self.repo.status(current, last) - modified, added, removed, deleted, unknown, ignored, clean = status - for filename in modified+added+removed+deleted: - if filename.startswith(self.repo_prefix): - yield self._file_to_title(filename) diff --git a/platforma/settings.py b/platforma/settings.py index cbfd70fa..58861bf4 100755 --- a/platforma/settings.py +++ b/platforma/settings.py @@ -117,6 +117,7 @@ INSTALLED_APPS = ( 'django.contrib.admin', 'django.contrib.admindocs', + 'wiki', 'piston', 'sorl.thumbnail', 'filebrowser',