Mercurial > ~astiob > upreckon > hgweb
view upreckon/files.py @ 193:a76cdc26ba9d
Added conf. var. match and match='regexp' for non-archives
Specify match='regexp', and your tests and dummies will be treated
as regular expressions describing test case identifiers. Every file that
is in a suitable location and whose name matches {testcase,dummy}inname
and the given regexp will be treated as a file with test case input data.
You are free to use backreferences in the regexps, but group numbering
starts at two rather than one.
If you want test groups, you can get them magically created for you
by putting a part of the test ID in a group in the regexp sense
and specifying the tests variable as a pair consisting of the regexp
itself and the number of this regexp group (remember group numbers start
at two).
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Thu, 11 Aug 2011 23:20:52 +0300 |
parents | 0d657576b1ac |
children | 8c30a2c8a09e |
line wrap: on
line source
# Copyright (c) 2010 Chortos-2 <chortos@inbox.lv> """File access routines and classes with support for archives.""" from __future__ import division, with_statement from .compat import * import contextlib, itertools, os, posixpath, re, shutil, sys # You don't need to know about anything else. __all__ = 'File', 'regexp' # In these two variables, use full stops no matter what os.extsep is; # all full stops will be converted to os.extsep on the fly archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2' formats = {} class Archive(object): __slots__ = 'file' if ABCMeta: __metaclass__ = ABCMeta def __new__(cls, path): """ Create a new instance of the archive class corresponding to the file name in the given path. """ if cls is not Archive: return object.__new__(cls) else: # Do this by hand rather than through os.path.splitext # because we support multi-dotted file name extensions ext = path.partition(os.path.extsep)[2] while ext: if ext in formats: return formats[ext](path) ext = ext.partition(os.path.extsep)[2] raise LookupError("unsupported archive file name extension in file name '%s'" % filename) @abstractmethod def __init__(self, path): raise NotImplementedError @abstractmethod def extract(self, name, target): raise NotImplementedError try: import tarfile except ImportError: TarArchive = None else: class TarArchive(Archive): __slots__ = '_namelist' def __init__(self, path): self.file = tarfile.open(path) def extract(self, name, target): member = self.file.getmember(name) member.name = target self.file.extract(member) def open(self, name): return self.file.extractfile(name) def exists(self, queried_name): if not hasattr(self, '_namelist'): names = set() for name in self.file.getnames(): cutname = name while cutname: names.add(cutname) cutname = cutname.rpartition('/')[0] self._namelist = frozenset(names) return queried_name in self._namelist def __enter__(self): if hasattr(self.file, '__enter__'): self.file.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self.file, '__exit__'): return self.file.__exit__(exc_type, exc_value, traceback) elif exc_type is None: self.file.close() else: # This code was shamelessly copied from tarfile.py of Python 2.7 if not self.file._extfileobj: self.file.fileobj.close() self.file.closed = True formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive try: import zipfile except ImportError: ZipArchive = None else: class ZipArchive(Archive): __slots__ = '_namelist' def __init__(self, path): self.file = zipfile.ZipFile(path) def extract(self, name, target): member = self.file.getinfo(name) # FIXME: 2.5 lacks ZipFile.extract if os.path.isabs(target): # To my knowledge, this is as portable as it gets path = os.path.join(os.path.splitdrive(target)[0], os.path.sep) member.filename = os.path.relpath(target, path) self.file.extract(member, path) else: member.filename = os.path.relpath(target) self.file.extract(member) def open(self, name): return self.file.open(name, 'r') def exists(self, queried_name): if not hasattr(self, '_namelist'): names = set() for name in self.file.namelist(): cutname = name while cutname: names.add(cutname) cutname = cutname.rpartition('/')[0] self._namelist = frozenset(names) return queried_name in self._namelist def __enter__(self): if hasattr(self.file, '__enter__'): self.file.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self.file, '__exit__'): return self.file.__exit__(exc_type, exc_value, traceback) else: return self.file.close() formats['zip'] = ZipArchive # Remove unsupported archive formats and replace full stops # with the platform-dependent file name extension separator def issupported(filename, formats=formats): ext = filename.partition('.')[2] while ext: if ext in formats: return True ext = ext.partition('.')[2] return False archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)] formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats)) open_archives = {} def open_archive(path): if path in open_archives: return open_archives[path] else: open_archives[path] = archive = Archive(path) return archive class File(object): __slots__ = 'virtual_path', 'real_path', 'full_real_path', 'archive' def __init__(self, virtpath, allow_root=False, msg='test data'): self.virtual_path = virtpath self.archive = None if not self.realize_path('', tuple(comp.replace('.', os.path.extsep) for comp in virtpath.split('/')), allow_root): raise IOError("%s file '%s' could not be found" % (msg, virtpath)) def realize_path(self, root, virtpath, allow_root=False, hastests=False): if root and not os.path.exists(root): return False if len(virtpath) > 1: if self.realize_path(os.path.join(root, virtpath[0]), virtpath[1:], allow_root, hastests): return True elif not hastests: if self.realize_path(os.path.join(root, 'tests'), virtpath, allow_root, True): return True for archive in archives: path = os.path.join(root, archive) if os.path.exists(path): if self.realize_path_archive(open_archive(path), '', virtpath, path): return True if self.realize_path(root, virtpath[1:], allow_root, hastests): return True else: if not hastests: path = os.path.join(root, 'tests', virtpath[0]) if os.path.exists(path): self.full_real_path = self.real_path = path return True for archive in archives: path = os.path.join(root, archive) if os.path.exists(path): if self.realize_path_archive(open_archive(path), '', virtpath, path): return True if hastests or allow_root: path = os.path.join(root, virtpath[0]) if os.path.exists(path): self.full_real_path = self.real_path = path return True return False def realize_path_archive(self, archive, root, virtpath, archpath, hastests=False): if root and not archive.exists(root): return False path = posixpath.join(root, virtpath[0]) if len(virtpath) > 1: if self.realize_path_archive(archive, path, virtpath[1:], archpath): return True elif self.realize_path_archive(archive, root, virtpath[1:], archpath): return True else: if archive.exists(path): self.archive = archive self.real_path = path self.full_real_path = os.path.join(archpath, *path.split('/')) return True if not hastests: if self.realize_path_archive(archive, posixpath.join(root, 'tests'), virtpath, archpath, True): return True return False def open(self): if self.archive: file = self.archive.open(self.real_path) if hasattr(file, '__exit__'): return file else: return contextlib.closing(file) else: return open(self.real_path, 'rb') def copy(self, target): if self.archive: self.archive.extract(self.real_path, target) else: shutil.copy(self.real_path, target) def regexp(pattern, hastests=False, droplast=False): # FIXME: add test archives if not pattern: yield os.curdir, '' return dirname, basename = posixpath.split(pattern) if hastests: dirnames = regexp(dirname, True) else: dirnames = itertools.chain(regexp(dirname), regexp(posixpath.join(dirname, 'tests'), True, True)) reobj = re.compile(pattern + '$', re.UNICODE) for dirname, vdirname in dirnames: try: names = os.listdir(dirname) except OSError: continue for name in names: path = os.path.join(dirname, name) vpath = posixpath.join(vdirname, name) if re.match(reobj, vpath): yield path, vpath if not droplast else vdirname