Mercurial > ~astiob > upreckon > hgweb
view upreckon/files.py @ 209:c03a8113685d
Rewrote files.regexp as files.File.regexp (a class method)
I still dislike the iterative code though.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Thu, 18 Aug 2011 02:41:46 +0300 |
parents | ede78fbd509a |
children | 1cbe2c428942 |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> """File access routines and classes with support for archives.""" from __future__ import division, with_statement from .compat import * import contextlib, itertools, os, posixpath, re, shutil, sys # You don't need to know about anything else. __all__ = 'File', 'regexp' # In these two variables, use full stops no matter what os.extsep is; # all full stops will be converted to os.extsep on the fly archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2' formats = {} class Archive(object): __slots__ = () if ABCMeta: __metaclass__ = ABCMeta def __new__(cls, path): """ Create a new instance of the archive class corresponding to the file name in the given path. """ if cls is not Archive: return object.__new__(cls) else: # Do this by hand rather than through os.path.splitext # because we support multi-dotted file name extensions ext = path.partition(os.path.extsep)[2] while ext: if ext in formats: return formats[ext](path) ext = ext.partition(os.path.extsep)[2] raise LookupError("unsupported archive file name extension in file name '%s'" % filename) @abstractmethod def __init__(self, path): raise NotImplementedError @abstractmethod def extract(self, name, target): raise NotImplementedError @abstractmethod def open(self, name): raise NotImplementedError @abstractmethod def exists(self, name): raise NotImplementedError @abstractmethod def listdir(self, name): raise NotImplementedError try: import tarfile except ImportError: TarArchive = None else: class TarArchive(Archive): __slots__ = '_tarfile', '_files', '_dirs', '_names' def __init__(self, path): self._tarfile = tarfile.open(path) files, dirs = {}, set(('/',)) for member in self._tarfile.getmembers(): cutname = posixpath.normpath('/' + member.name) if cutname == '/': continue if member.isfile(): files[cutname] = member cutname = posixpath.dirname(cutname) elif not member.isdir(): continue while cutname != '/': dirs.add(cutname) cutname = posixpath.dirname(cutname) self._files = files self._dirs = frozenset(dirs) self._names = self._dirs | frozenset(files) def extract(self, name, target): member = self._files[posixpath.normpath('/' + name)] member.name = target self._tarfile.extract(member) def open(self, name): name = posixpath.normpath('/' + name) return self._tarfile.extractfile(self._files[name]) def exists(self, name): return posixpath.normpath('/' + name) in self._names def listdir(self, name): normname = posixpath.normpath('/' + name) if normname not in self._dirs: raise KeyError('No such directory: %r' % name) if normname != '/': normname += '/' nnlen = len(normname) return [fname[nnlen:] for fname in self._names if fname.startswith(normname) and fname.find('/', nnlen) == -1] def __enter__(self): if hasattr(self._tarfile, '__enter__'): self._tarfile.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self._tarfile, '__exit__'): return self._tarfile.__exit__(exc_type, exc_value, traceback) elif exc_type is None: self._tarfile.close() else: # This code was shamelessly copied from tarfile.py of Python 2.7 if not self._tarfile._extfileobj: self._tarfile.fileobj.close() self._tarfile.closed = True formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive try: import zipfile except ImportError: ZipArchive = None else: class ZipArchive(Archive): __slots__ = '_zipfile', '_files', '_dirs', '_names' def __init__(self, path): self._zipfile = zipfile.ZipFile(path) files, dirs = {}, set(('/',)) for member in self._zipfile.infolist(): cutname = posixpath.normpath('/' + member.filename) if not member.filename.endswith('/'): files[cutname] = member cutname = posixpath.dirname(cutname) while cutname != '/': dirs.add(cutname) cutname = posixpath.dirname(cutname) self._files = files self._dirs = frozenset(dirs) self._names = self._dirs | frozenset(files) def extract(self, name, target): member = self._files[posixpath.normpath('/' + name)] # FIXME: 2.5 lacks ZipFile.extract if os.path.isabs(target): # To my knowledge, this is as portable as it gets path = os.path.join(os.path.splitdrive(target)[0], os.path.sep) member.filename = os.path.relpath(target, path) self._zipfile.extract(member, path) else: member.filename = os.path.relpath(target) self._zipfile.extract(member) def open(self, name): name = posixpath.normpath('/' + name) # FIXME: 2.5 lacks ZipFile.open return self._zipfile.open(self._files[name]) def exists(self, name): return posixpath.normpath('/' + name) in self._names def listdir(self, name): normname = posixpath.normpath('/' + name) if normname not in self._dirs: raise KeyError('No such directory: %r' % name) if normname != '/': normname += '/' nnlen = len(normname) return [fname[nnlen:] for fname in self._names if fname.startswith(normname) and fname.find('/', nnlen) == -1] def __enter__(self): if hasattr(self._zipfile, '__enter__'): self._zipfile.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self._zipfile, '__exit__'): return self._zipfile.__exit__(exc_type, exc_value, traceback) else: return self._zipfile.close() formats['zip'] = ZipArchive # Remove unsupported archive formats and replace full stops # with the platform-dependent file name extension separator def issupported(filename, formats=formats): ext = filename.partition('.')[2] while ext: if ext in formats: return True ext = ext.partition('.')[2] return False archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)] formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats)) open_archives = {} def open_archive(path): if path in open_archives: return open_archives[path] else: open_archives[path] = archive = Archive(path) return archive class File(object): __slots__ = ('virtual_path', 'archive', '_external_path', '_internal_path', '_has_tests') def __init__(self, _virtual_path='', _external_path='', _internal_path='', _archive=None, _has_tests=False): self.virtual_path = _virtual_path self._external_path = _external_path self._internal_path = _internal_path self.archive = _archive self._has_tests = _has_tests if not _archive: try: self.archive = open_archive(_external_path) except Exception: pass @property def full_real_path(self): intpath = self._internal_path.split('/') if self._internal_path else () return os.path.join(self._external_path, *(filename.replace('.', os.path.extsep) for filename in intpath)) def exists(self): if self.archive: return self.archive.exists(self._internal_path) else: return (not self._external_path or os.path.exists(self._external_path)) def open(self): if self.archive: file = self.archive.open(self._internal_path) if hasattr(file, '__exit__'): return file else: return contextlib.closing(file) else: return open(self._external_path, 'rb') def copy(self, target): if self.archive: self.archive.extract(self._internal_path, target) else: shutil.copy(self._external_path, target) def listdir(self): if self.archive: return self.archive.listdir(self._internal_path) else: return os.listdir(self._external_path) @classmethod def from_virtual_path(cls, virtual_path, allow_root, msg): metafile = cls()._realize_path(virtual_path.split('/'), allow_root) if not metafile: raise IOError("%s file with virtual path %r could not be found" % (msg, virtual_path)) assert metafile.virtual_path == virtual_path return metafile def _realize_path(self, virtpath, allow_root): if not self.exists(): return None elif not virtpath: if allow_root or self._has_tests or self.archive: return self return None cand = (self + virtpath[0])._realize_path(virtpath[1:], allow_root) if cand: return cand if not cand and not self._has_tests: for metafile in self._add_tests(): cand = metafile._realize_path(virtpath, allow_root) if cand: return cand if not cand and len(virtpath) > 1: metafile = self._add_virtual(virtpath[0]) cand = metafile._realize_path(virtpath[1:], allow_root) if cand: return cand def _add_tests(self): assert not self._has_tests metafile = self.__add__('tests', False) metafile._has_tests = True yield metafile if not self.archive: for filename in archives: yield self.__add__(filename, False) def _add_virtual(self, filename): return File(posixpath.join(self.virtual_path, filename), self._external_path, self._internal_path, self.archive, self._has_tests) def __add__(self, filename, add_virtual=True): if not isinstance(filename, basestring): return NotImplemented if add_virtual: virtual_path = posixpath.join(self.virtual_path, filename) else: virtual_path = self.virtual_path if self.archive: return File(virtual_path, self._external_path, posixpath.join(self._internal_path, filename), self.archive, self._has_tests) else: filename = filename.replace('.', os.path.extsep) return File(virtual_path, os.path.join(self._external_path, filename), _has_tests=self._has_tests) @classmethod def regexp(cls, pattern): if not pattern: yield cls('', os.curdir) return dirname, basename = posixpath.split(pattern) dirs = cls.regexp(dirname) reobj = re.compile(pattern + '$', re.UNICODE) while dirs: newdirs = [] for directory in dirs: try: names = directory.listdir() except Exception: continue for name in names: dir_entry = directory + name if re.match(reobj, dir_entry.virtual_path): yield dir_entry if not directory._has_tests: if name == 'tests': dir_entry = directory.__add__(name, False) dir_entry._has_tests = True newdirs.append(dir_entry) elif not directory.archive and name in archives: newdirs.append(directory.__add__(name, False)) dirs = newdirs