Mercurial > ~astiob > upreckon > hgweb
view upreckon/files.py @ 246:1bc89faac941 2.04
Fixed: match='re' could produce duplicate test identifiers
files.Files.regexp(pattern) now makes sure to return only one
metafile for each matching virtual path, namely, the one that would
be returned for that virtual path by files.Files.from_virtual_path.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Thu, 03 Oct 2013 01:19:09 +0300 |
parents | 7827e63cd148 |
children | f5847d29e838 |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

"""File access routines and classes with support for archives."""

from __future__ import division, with_statement
from .compat import *
import contextlib, os, posixpath, re, shutil

# You don't need to know about anything else.
# NOTE(review): 'regexp' is not defined at module level in the visible code
# (it is the classmethod File.regexp) -- confirm whether a star import of
# this module is expected to work.
__all__ = 'File', 'regexp'

# In these two variables, use full stops no matter what os.extsep is;
# all full stops will be converted to os.extsep on the fly
archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2'
formats = {}


def _normalize_member_name(name):
    """
    Return name as a normalized absolute POSIX path ('/a/b') for use as
    a key into an archive's member tables.
    """
    # Strip leading slashes before prepending one: posixpath.normpath
    # preserves a leading '//', and posixpath.dirname('//') is '//', so
    # an absolute member name would otherwise never reach '/' in the
    # parent-directory loops of the archive constructors.
    return posixpath.normpath('/' + name.lstrip('/'))


def _list_archive_directory(names, dirs, name):
    """
    Return the names of the immediate children of directory name.

    names is the collection of all normalized member paths (files and
    directories) and dirs is the collection of normalized directory
    paths. Raises KeyError if name is not a known directory.
    """
    normname = _normalize_member_name(name)
    if normname not in dirs:
        raise KeyError('No such directory: %r' % name)
    if normname != '/':
        normname += '/'
    nnlen = len(normname)
    # The length check excludes the directory itself: without it, listing
    # the root would report '/' as a child with an empty name.
    return [fname[nnlen:] for fname in names
            if len(fname) > nnlen
            and fname.startswith(normname)
            and fname.find('/', nnlen) == -1]


class Archive(object):
    """
    Abstract base class of archive readers.

    Calling Archive(path) returns an instance of the concrete class
    registered in formats for the extension of path.
    """

    __slots__ = ()

    if ABCMeta:
        __metaclass__ = ABCMeta

    def __new__(cls, path):
        """
        Create a new instance of the archive class corresponding
        to the file name in the given path.

        Raises LookupError if no registered format matches the file
        name extension.
        """
        if cls is not Archive:
            return object.__new__(cls)
        # Do this by hand rather than through os.path.splitext
        # because we support multi-dotted file name extensions
        ext = path.partition(os.path.extsep)[2]
        while ext:
            if ext in formats:
                archive_class = formats[ext]
                if (isinstance(archive_class, type)
                        and issubclass(archive_class, Archive)):
                    # Only allocate here. The returned object is an
                    # instance of Archive, so the interpreter calls its
                    # __init__(path) right after __new__ returns;
                    # constructing it fully here would initialize it
                    # twice and open the underlying file twice.
                    return archive_class.__new__(archive_class, path)
                return archive_class(path)
            ext = ext.partition(os.path.extsep)[2]
        raise LookupError("unsupported archive file name extension in file name '%s'" % path)

    @abstractmethod
    def __init__(self, path):
        raise NotImplementedError

    @abstractmethod
    def extract(self, name, target):
        raise NotImplementedError

    @abstractmethod
    def open(self, name):
        raise NotImplementedError

    @abstractmethod
    def exists(self, name):
        raise NotImplementedError

    @abstractmethod
    def listdir(self, name):
        raise NotImplementedError


try:
    import tarfile
except ImportError:
    TarArchive = None
else:
    class TarArchive(Archive):
        """Archive implementation backed by the tarfile module."""

        __slots__ = '_tarfile', '_files', '_dirs', '_names'

        def __init__(self, path):
            """Open the tar file at path and index its files and directories."""
            self._tarfile = tarfile.open(path)
            files, dirs = {}, set(('/',))
            for member in self._tarfile.getmembers():
                cutname = _normalize_member_name(member.name)
                if cutname == '/':
                    continue
                if member.isfile():
                    files[cutname] = member
                    cutname = posixpath.dirname(cutname)
                elif not member.isdir():
                    # Neither a regular file nor a directory: ignore
                    continue
                # Register the directory and all of its ancestors
                while cutname != '/':
                    dirs.add(cutname)
                    cutname = posixpath.dirname(cutname)
            self._files = files
            self._dirs = frozenset(dirs)
            self._names = self._dirs | frozenset(files)

        def extract(self, name, target):
            """
            Extract the file member called name to the path target.

            Raises KeyError if there is no such file member.
            """
            member = self._files[_normalize_member_name(name)]
            # Rename the indexed member so that tarfile writes it to target
            member.name = target
            self._tarfile.extract(member)

        def open(self, name):
            """
            Return a file-like object for reading the file member name.

            Raises KeyError if there is no such file member.
            """
            name = _normalize_member_name(name)
            return self._tarfile.extractfile(self._files[name])

        def exists(self, name):
            """Return whether name is a file or directory in the archive."""
            return _normalize_member_name(name) in self._names

        def listdir(self, name):
            """
            Return the names of the entries directly inside directory name.

            Raises KeyError if name is not a directory in the archive.
            """
            return _list_archive_directory(self._names, self._dirs, name)

        def __enter__(self):
            if hasattr(self._tarfile, '__enter__'):
                self._tarfile.__enter__()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if hasattr(self._tarfile, '__exit__'):
                return self._tarfile.__exit__(exc_type, exc_value, traceback)
            elif exc_type is None:
                self._tarfile.close()
            else:
                # This code was shamelessly copied from tarfile.py of Python 2.7
                if not self._tarfile._extfileobj:
                    self._tarfile.fileobj.close()
                self._tarfile.closed = True

    formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive

try:
    import zipfile
except ImportError:
    ZipArchive = None
else:
    class ZipArchive(Archive):
        """Archive implementation backed by the zipfile module."""

        __slots__ = '_zipfile', '_files', '_dirs', '_names'

        def __init__(self, path):
            """Open the zip file at path and index its files and directories."""
            self._zipfile = zipfile.ZipFile(path)
            files, dirs = {}, set(('/',))
            for member in self._zipfile.infolist():
                cutname = _normalize_member_name(member.filename)
                # Entries whose names end with a slash are treated as
                # directories; everything else is a file
                if not member.filename.endswith('/'):
                    files[cutname] = member
                    cutname = posixpath.dirname(cutname)
                # Register the directory and all of its ancestors
                while cutname != '/':
                    dirs.add(cutname)
                    cutname = posixpath.dirname(cutname)
            self._files = files
            self._dirs = frozenset(dirs)
            self._names = self._dirs | frozenset(files)

        def extract(self, name, target):
            """
            Extract the file member called name to the path target.

            Raises KeyError if there is no such file member.
            """
            member = self._files[_normalize_member_name(name)]
            # FIXME: 2.5 lacks ZipFile.extract
            if os.path.isabs(target):
                # To my knowledge, this is as portable as it gets
                path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
                member.filename = os.path.relpath(target, path)
                self._zipfile.extract(member, path)
            else:
                member.filename = os.path.relpath(target)
                self._zipfile.extract(member)

        def open(self, name):
            """
            Return a file-like object for reading the file member name.

            Raises KeyError if there is no such file member.
            """
            name = _normalize_member_name(name)
            # FIXME: 2.5 lacks ZipFile.open
            return self._zipfile.open(self._files[name])

        def exists(self, name):
            """Return whether name is a file or directory in the archive."""
            return _normalize_member_name(name) in self._names

        def listdir(self, name):
            """
            Return the names of the entries directly inside directory name.

            Raises KeyError if name is not a directory in the archive.
            """
            return _list_archive_directory(self._names, self._dirs, name)

        def __enter__(self):
            if hasattr(self._zipfile, '__enter__'):
                self._zipfile.__enter__()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if hasattr(self._zipfile, '__exit__'):
                return self._zipfile.__exit__(exc_type, exc_value, traceback)
            else:
                return self._zipfile.close()

    formats['zip'] = ZipArchive


# Remove unsupported archive formats and replace full stops
# with the platform-dependent file name extension separator
def issupported(filename, formats=formats):
    """
    Return whether any (possibly multi-dotted) extension of filename,
    written with full stops, is a key of formats.
    """
    ext = filename.partition('.')[2]
    while ext:
        if ext in formats:
            return True
        ext = ext.partition('.')[2]
    return False

archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)]
formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats))

# Cache of already opened archives, keyed by the path they were opened with
open_archives = {}


def open_archive(path):
    """
    Return the Archive for path, opening it on first use and reusing
    the cached object afterwards.
    """
    if path in open_archives:
        return open_archives[path]
    else:
        open_archives[path] = archive = Archive(path)
        return archive


class File(object):
    """
    A metafile: a virtual path paired with the real location it maps to.

    The real location is an external path in the file system and, when
    the external path is an archive, an internal path inside it.
    """

    __slots__ = ('virtual_path', 'archive',
                 '_external_path', '_internal_path', '_has_tests')

    def __init__(self, _virtual_path='', _external_path='', _internal_path='', _archive=None, _has_tests=False):
        self.virtual_path = _virtual_path
        self._external_path = _external_path
        self._internal_path = _internal_path
        self.archive = _archive
        self._has_tests = _has_tests
        if not _archive:
            # Best effort: if the external path cannot be opened as an
            # archive for any reason, treat it as a plain file system path
            try:
                self.archive = open_archive(_external_path)
            except Exception:
                pass

    @property
    def full_real_path(self):
        """
        The external path joined with the components of the internal
        path, with full stops in the latter replaced by os.extsep.
        """
        intpath = self._internal_path.split('/') if self._internal_path else ()
        return os.path.join(self._external_path,
                            *(filename.replace('.', os.path.extsep)
                              for filename in intpath))

    def exists(self):
        """
        Return whether the real location exists. An empty external path
        outside of an archive counts as existing.
        """
        if self.archive:
            return self.archive.exists(self._internal_path)
        else:
            return (not self._external_path or
                    os.path.exists(self._external_path))

    def open(self):
        """
        Return a context manager yielding a file object opened for
        reading; files outside archives are opened in binary mode.
        """
        if self.archive:
            file = self.archive.open(self._internal_path)
            if hasattr(file, '__exit__'):
                return file
            else:
                # Make the object usable in a with statement regardless
                return contextlib.closing(file)
        else:
            return open(self._external_path, 'rb')

    def copy(self, target):
        """Copy (or extract from the archive) this file to the path target."""
        if self.archive:
            self.archive.extract(self._internal_path, target)
        else:
            shutil.copy(self._external_path, target)

    def listdir(self):
        """Return the names of the entries of this directory."""
        if self.archive:
            return self.archive.listdir(self._internal_path)
        else:
            return os.listdir(self._external_path)

    @classmethod
    def from_virtual_path(cls, virtual_path, allow_root, msg):
        """
        Return the metafile for the slash-separated virtual_path.

        allow_root is forwarded to the path search (see _realize_path);
        msg prefixes the error message. Raises IOError if no real file
        corresponds to the virtual path.
        """
        metafile = cls()._realize_path(virtual_path.split('/'), allow_root)
        if not metafile:
            raise IOError("%s file with virtual path %r could not be found" % (msg, virtual_path))
        assert metafile.virtual_path == virtual_path
        return metafile

    def _realize_path(self, virtpath, allow_root):
        """
        Resolve the list of remaining virtual path components virtpath
        relative to this metafile and return the resulting metafile,
        or None if nothing suitable exists.

        A fully consumed path is accepted only if allow_root is true or
        this metafile is a tests location or lies inside an archive.
        """
        if not self.exists():
            return None
        elif not virtpath:
            if allow_root or self._has_tests or self.archive:
                return self
            return None
        # First try the next component as a real child of this location
        cand = (self + virtpath[0])._realize_path(virtpath[1:], allow_root)
        if cand:
            return cand
        # Then look for the whole remaining path in the tests locations
        if not self._has_tests:
            for metafile in self._add_tests():
                cand = metafile._realize_path(virtpath, allow_root)
                if cand:
                    return cand
        # Finally treat the next component as purely virtual: it becomes
        # part of the virtual path but not of the real one
        if len(virtpath) > 1:
            metafile = self._add_virtual(virtpath[0])
            cand = metafile._realize_path(virtpath[1:], allow_root)
            if cand:
                return cand
        return None

    def _add_tests(self):
        """
        Yield the tests locations of this directory, all with the same
        virtual path as this metafile: the 'tests' subdirectory and,
        outside of archives, each supported tests archive.
        """
        assert not self._has_tests
        metafile = self.__add__('tests', False)
        metafile._has_tests = True
        yield metafile
        if not self.archive:
            for filename in archives:
                yield self.__add__(filename, False)

    def _also_add_tests(self):
        """Yield this metafile followed by its tests locations, if any."""
        yield self
        if not self._has_tests:
            for metafile in self._add_tests():
                yield metafile

    def _add_virtual(self, filename):
        """
        Return a metafile with filename appended to the virtual path
        only, pointing to the same real location as this one.
        """
        return File(posixpath.join(self.virtual_path, filename),
                    self._external_path,
                    self._internal_path,
                    self.archive,
                    self._has_tests)

    def __add__(self, filename, add_virtual=True):
        """
        Return the metafile of the entry filename of this directory.

        If add_virtual is false, the virtual path is left unchanged and
        only the real location is extended.
        """
        if not isinstance(filename, basestring):
            return NotImplemented
        if add_virtual:
            virtual_path = posixpath.join(self.virtual_path, filename)
        else:
            virtual_path = self.virtual_path
        if self.archive:
            return File(virtual_path,
                        self._external_path,
                        posixpath.join(self._internal_path, filename),
                        self.archive,
                        self._has_tests)
        else:
            filename = filename.replace('.', os.path.extsep)
            return File(virtual_path,
                        os.path.join(self._external_path, filename),
                        _has_tests=self._has_tests)

    @classmethod
    def regexp(cls, pattern, _unique=True):
        """
        Yield the metafiles whose virtual paths match the regular
        expression pattern in its entirety.

        The directory part of pattern is itself resolved as a regular
        expression, recursively. If _unique is true, at most one
        metafile, the first one found, is yielded per virtual path.
        """
        if not pattern:
            yield cls('', os.curdir)
            return
        dirname = posixpath.dirname(pattern)
        reobj = re.compile(pattern + '$', re.UNICODE)
        yielded = set()
        for testless_directory in cls.regexp(dirname, False):
            for directory in testless_directory._also_add_tests():
                try:
                    names = directory.listdir()
                except Exception:
                    # Not a directory or not listable: nothing to match
                    continue
                for name in names:
                    dir_entry = directory + name
                    if not reobj.match(dir_entry.virtual_path):
                        continue
                    if not _unique:
                        yield dir_entry
                    elif dir_entry.virtual_path not in yielded:
                        yield dir_entry
                        yielded.add(dir_entry.virtual_path)