view upreckon/files.py @ 246:1bc89faac941 2.04

Fixed: match='re' could produce duplicate test identifiers. files.Files.regexp(pattern) now makes sure to return only one metafile for each matching virtual path, namely, the one that would be returned for that virtual path by files.Files.from_virtual_path.
author Oleg Oshmyan <chortos@inbox.lv>
date Thu, 03 Oct 2013 01:19:09 +0300
parents 7827e63cd148
children f5847d29e838
line wrap: on
line source

# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

"""File access routines and classes with support for archives."""

from __future__ import division, with_statement

from .compat import *
import contextlib, os, posixpath, re, shutil

# You don't need to know about anything else.
__all__ = 'File', 'regexp'

# Both of the following are written with full stops regardless of
# os.extsep; the full stops are swapped for os.extsep further below.
# archives: archive file names that are looked for next to the tests.
# formats: maps a file name extension to the class that handles it.
archives = (
	'tests.tar',
	'tests.zip',
	'tests.tgz',
	'tests.tar.gz',
	'tests.tbz2',
	'tests.tar.bz2',
)
formats = dict()

class Archive(object):
	"""
	Abstract interface to an archive file.
	
	Calling Archive(path) directly picks the concrete class registered
	in the module-level formats mapping for the extension of path and
	returns an instance of that class.  Subclasses implement the
	methods declared below.
	"""
	__slots__ = ()
	
	if ABCMeta:
		__metaclass__ = ABCMeta
	
	def __new__(cls, path):
		"""
		Create a new instance of the archive class corresponding
		to the file name in the given path.
		
		Raises LookupError if no suffix of the file name extension
		is a key of formats.
		"""
		if cls is not Archive:
			return object.__new__(cls)
		# Do this by hand rather than through os.path.splitext
		# because we support multi-dotted file name extensions
		ext = path.partition(os.path.extsep)[2]
		while ext:
			if ext in formats:
				handler = formats[ext]
				if isinstance(handler, type) and issubclass(handler, Archive):
					# Only allocate here: the object returned from __new__
					# is an Archive instance, so Python itself calls its
					# __init__(path) afterwards.  Calling handler(path)
					# would run __init__ twice and open the archive twice.
					return handler.__new__(handler, path)
				return handler(path)
			ext = ext.partition(os.path.extsep)[2]
		raise LookupError("unsupported archive file name extension in file name '%s'" % path)
	
	@abstractmethod
	def __init__(self, path):
		"""Open the archive stored at path."""
		raise NotImplementedError
	
	@abstractmethod
	def extract(self, name, target):
		"""Extract the archive member called name to the path target."""
		raise NotImplementedError
	
	@abstractmethod
	def open(self, name):
		"""Return a file-like object for reading the member called name."""
		raise NotImplementedError
	
	@abstractmethod
	def exists(self, name):
		"""Tell whether the archive has a file or directory called name."""
		raise NotImplementedError
	
	@abstractmethod
	def listdir(self, name):
		"""Return the names of the entries of the directory called name."""
		raise NotImplementedError

try:
	import tarfile
except ImportError:
	TarArchive = None
else:
	class TarArchive(Archive):
		"""Archive implementation on top of the tarfile module."""
		__slots__ = '_tarfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			"""
			Open the tar file at path and index its members.
			
			Regular files are stored by normalized absolute name in
			_files; _dirs holds every directory member and every
			ancestor of an indexed member; _names is their union.
			Members that are neither files nor directories are ignored.
			"""
			self._tarfile = tarfile.open(path)
			members_by_path = {}
			directories = set(('/',))
			for info in self._tarfile.getmembers():
				current = posixpath.normpath('/' + info.name)
				if current == '/':
					continue
				is_file = info.isfile()
				if not is_file and not info.isdir():
					continue
				if is_file:
					members_by_path[current] = info
					# A file only contributes its ancestors as directories
					current = posixpath.dirname(current)
				while current != '/':
					directories.add(current)
					current = posixpath.dirname(current)
			self._files = members_by_path
			self._dirs = frozenset(directories)
			self._names = self._dirs | frozenset(members_by_path)
		
		def extract(self, name, target):
			"""
			Extract the file member called name to target.
			
			The member is renamed to target before extraction.
			Raises KeyError if name is not an indexed file.
			"""
			info = self._files[posixpath.normpath('/' + name)]
			info.name = target
			self._tarfile.extract(info)
		
		def open(self, name):
			"""
			Return what tarfile's extractfile gives for the file
			member called name; raises KeyError if it is not indexed.
			"""
			info = self._files[posixpath.normpath('/' + name)]
			return self._tarfile.extractfile(info)
		
		def exists(self, name):
			"""Tell whether name is an indexed file or directory."""
			normalized = posixpath.normpath('/' + name)
			return normalized in self._names
		
		def listdir(self, name):
			"""
			Return the names directly inside the directory called name.
			
			Raises KeyError if name is not an indexed directory.
			"""
			prefix = posixpath.normpath('/' + name)
			if prefix not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if prefix != '/':
				prefix += '/'
			skip = len(prefix)
			entries = []
			for candidate in self._names:
				# Keep names under the prefix with no further slash,
				# that is, direct children only
				if candidate.startswith(prefix) and '/' not in candidate[skip:]:
					entries.append(candidate[skip:])
			return entries
		
		def __enter__(self):
			"""Enter the underlying tar file's context if it has one."""
			if hasattr(self._tarfile, '__enter__'):
				self._tarfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			"""
			Leave the underlying tar file's context, or close the tar
			file by hand if it is not a context manager.
			"""
			if hasattr(self._tarfile, '__exit__'):
				return self._tarfile.__exit__(exc_type, exc_value, traceback)
			if exc_type is None:
				self._tarfile.close()
				return
			# This code was shamelessly copied from tarfile.py of Python 2.7
			if not self._tarfile._extfileobj:
				self._tarfile.fileobj.close()
			self._tarfile.closed = True
	
	# Register the class for every tar-based file name extension
	formats.update((ext, TarArchive) for ext in
	               ('tar', 'tgz', 'tar.gz', 'tbz2', 'tar.bz2'))

try:
	import zipfile
except ImportError:
	ZipArchive = None
else:
	class ZipArchive(Archive):
		"""Archive implementation on top of the zipfile module."""
		__slots__ = '_zipfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			"""
			Open the zip file at path and index its members.
			
			Members whose name does not end in a slash are stored by
			normalized absolute name in _files; _dirs holds the other
			members and every ancestor of an indexed member; _names
			is their union.
			"""
			self._zipfile = zipfile.ZipFile(path)
			members_by_path = {}
			directories = set(('/',))
			for info in self._zipfile.infolist():
				current = posixpath.normpath('/' + info.filename)
				if not info.filename.endswith('/'):
					members_by_path[current] = info
					# A file only contributes its ancestors as directories
					current = posixpath.dirname(current)
				while current != '/':
					directories.add(current)
					current = posixpath.dirname(current)
			self._files = members_by_path
			self._dirs = frozenset(directories)
			self._names = self._dirs | frozenset(members_by_path)
		
		def extract(self, name, target):
			"""
			Extract the file member called name to target.
			
			The member is renamed to target, expressed relative to
			the directory the extraction is rooted at.
			Raises KeyError if name is not an indexed file.
			"""
			info = self._files[posixpath.normpath('/' + name)]
			# FIXME: 2.5 lacks ZipFile.extract
			if not os.path.isabs(target):
				info.filename = os.path.relpath(target)
				self._zipfile.extract(info)
				return
			# To my knowledge, this is as portable as it gets
			root = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
			info.filename = os.path.relpath(target, root)
			self._zipfile.extract(info, root)
		
		def open(self, name):
			"""
			Return a file-like object for the file member called name;
			raises KeyError if it is not indexed.
			"""
			info = self._files[posixpath.normpath('/' + name)]
			# FIXME: 2.5 lacks ZipFile.open
			return self._zipfile.open(info)
		
		def exists(self, name):
			"""Tell whether name is an indexed file or directory."""
			normalized = posixpath.normpath('/' + name)
			return normalized in self._names
		
		def listdir(self, name):
			"""
			Return the names directly inside the directory called name.
			
			Raises KeyError if name is not an indexed directory.
			"""
			prefix = posixpath.normpath('/' + name)
			if prefix not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if prefix != '/':
				prefix += '/'
			skip = len(prefix)
			entries = []
			for candidate in self._names:
				# Keep names under the prefix with no further slash,
				# that is, direct children only
				if candidate.startswith(prefix) and '/' not in candidate[skip:]:
					entries.append(candidate[skip:])
			return entries
		
		def __enter__(self):
			"""Enter the underlying zip file's context if it has one."""
			if hasattr(self._zipfile, '__enter__'):
				self._zipfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			"""
			Leave the underlying zip file's context, or simply close
			the zip file if it is not a context manager.
			"""
			if not hasattr(self._zipfile, '__exit__'):
				return self._zipfile.close()
			return self._zipfile.__exit__(exc_type, exc_value, traceback)
	
	# Register the class for the zip file name extension
	formats.update({'zip': ZipArchive})

# Remove unsupported archive formats and replace full stops
# with the platform-dependent file name extension separator
def issupported(filename, formats=formats):
	"""
	Tell whether some multi-dotted extension of filename is a key of
	formats.  Every suffix that follows a full stop is tried, longest
	first; the search stops at an empty suffix.
	"""
	dot = filename.find('.')
	while dot != -1:
		suffix = filename[dot + 1:]
		if not suffix:
			break
		if suffix in formats:
			return True
		dot = filename.find('.', dot + 1)
	return False
# Keep only the archive names that issupported() accepts and spell them
# with the platform's extension separator instead of full stops.
archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)]
# Rebuild the extension table with os.extsep in its keys.  issupported()
# captured the previous dict as a default argument when it was defined,
# so it keeps looking extensions up by their full-stop spelling.
formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats))

# Archive objects that have already been opened, keyed by their path
open_archives = {}

def open_archive(path):
	"""
	Return the archive object for path, creating it with Archive(path)
	on first use and handing out the remembered object afterwards.
	"""
	archive = open_archives.get(path)
	if archive is None:
		archive = open_archives[path] = Archive(path)
	return archive

class File(object):
	"""
	A file or directory known by a virtual path.
	
	virtual_path is the slash-separated name the object is known by.
	_external_path is its path in the real file system; when archive
	is set, _internal_path is the slash-separated path inside that
	archive.  _has_tests is set on the 'tests' entry produced by
	_add_tests and is handed down to everything derived from it.
	"""
	__slots__ = ('virtual_path', 'archive',
		'_external_path', '_internal_path', '_has_tests')
	
	def __init__(self, _virtual_path='', _external_path='', _internal_path='',
			_archive=None, _has_tests=False):
		"""
		Store the given fields.  When no archive is passed in, try to
		open _external_path as one; any failure simply leaves archive
		unset.
		"""
		self.virtual_path = _virtual_path
		self._external_path = _external_path
		self._internal_path = _internal_path
		self._has_tests = _has_tests
		self.archive = _archive
		if _archive:
			return
		try:
			self.archive = open_archive(_external_path)
		except Exception:
			# Best effort: the path is just not a usable archive
			pass
	
	@property
	def full_real_path(self):
		"""
		The external path joined with the components of the internal
		path, with full stops replaced by os.extsep in the latter.
		"""
		if not self._internal_path:
			return os.path.join(self._external_path)
		components = [part.replace('.', os.path.extsep)
			for part in self._internal_path.split('/')]
		return os.path.join(self._external_path, *components)
	
	def exists(self):
		"""
		Tell whether the object exists, in its archive if it has one
		and in the file system otherwise.  An empty external path
		counts as existing.
		"""
		if self.archive:
			return self.archive.exists(self._internal_path)
		if not self._external_path:
			return True
		return os.path.exists(self._external_path)
	
	def open(self):
		"""
		Return a context manager giving access to the contents: the
		archive member (wrapped in contextlib.closing if needed) or
		the external file opened in binary mode.
		"""
		if not self.archive:
			return open(self._external_path, 'rb')
		stream = self.archive.open(self._internal_path)
		if hasattr(stream, '__exit__'):
			return stream
		return contextlib.closing(stream)
	
	def copy(self, target):
		"""Extract or copy the contents to the path target."""
		if not self.archive:
			shutil.copy(self._external_path, target)
		else:
			self.archive.extract(self._internal_path, target)
	
	def listdir(self):
		"""Return the names of the entries of this directory."""
		if not self.archive:
			return os.listdir(self._external_path)
		return self.archive.listdir(self._internal_path)
	
	@classmethod
	def from_virtual_path(cls, virtual_path, allow_root, msg):
		"""
		Return the metafile found for virtual_path.
		
		Raises IOError, with msg as the leading word of the message,
		if nothing can be found.
		"""
		metafile = cls()._realize_path(virtual_path.split('/'), allow_root)
		if metafile:
			assert metafile.virtual_path == virtual_path
			return metafile
		raise IOError("%s file with virtual path %r could not be found" %
			(msg, virtual_path))
	
	def _realize_path(self, virtpath, allow_root):
		"""
		Resolve the list of remaining virtual path components virtpath
		starting from this object; return a File or None.
		
		Tried in order: the first component as a real entry; the
		same components inside each candidate from _add_tests; and,
		if more than one component is left, the first component as
		a purely virtual one.
		"""
		if not self.exists():
			return None
		if not virtpath:
			if allow_root or self._has_tests or self.archive:
				return self
			return None
		head, tail = virtpath[0], virtpath[1:]
		found = (self + head)._realize_path(tail, allow_root)
		if found:
			return found
		if not self._has_tests:
			for tests_candidate in self._add_tests():
				found = tests_candidate._realize_path(virtpath, allow_root)
				if found:
					return found
		if len(virtpath) > 1:
			skipped = self._add_virtual(head)
			return skipped._realize_path(tail, allow_root) or None
		return None
	
	def _add_tests(self):
		"""
		Yield the places where tests may be stored under this object
		without a change of virtual path: the 'tests' entry and, when
		not inside an archive already, each name in archives.
		"""
		assert not self._has_tests
		tests_dir = self.__add__('tests', False)
		tests_dir._has_tests = True
		yield tests_dir
		if self.archive:
			return
		for archive_name in archives:
			yield self.__add__(archive_name, False)
	
	def _also_add_tests(self):
		"""Yield this object followed by what _add_tests yields, if allowed."""
		yield self
		if self._has_tests:
			return
		for metafile in self._add_tests():
			yield metafile
	
	def _add_virtual(self, filename):
		"""
		Return a copy of this object whose virtual path has filename
		appended while the real location stays the same.
		"""
		extended = posixpath.join(self.virtual_path, filename)
		return File(extended, self._external_path, self._internal_path,
			self.archive, self._has_tests)
	
	def __add__(self, filename, add_virtual=True):
		"""
		Return the File for the entry filename of this directory.
		
		The virtual path is extended by filename only if add_virtual
		is true.  Returns NotImplemented for non-string operands.
		"""
		if not isinstance(filename, basestring):
			return NotImplemented
		virtual_path = self.virtual_path
		if add_virtual:
			virtual_path = posixpath.join(virtual_path, filename)
		if not self.archive:
			real_name = filename.replace('.', os.path.extsep)
			return File(virtual_path,
				os.path.join(self._external_path, real_name),
				_has_tests=self._has_tests)
		return File(virtual_path,
			self._external_path,
			posixpath.join(self._internal_path, filename),
			self.archive,
			self._has_tests)
	
	@classmethod
	def regexp(cls, pattern, _unique=True):
		"""
		Yield the metafiles whose virtual path matches the regular
		expression pattern up to its end.
		
		The directory part of pattern is resolved recursively first.
		With _unique true, only the first metafile found for each
		virtual path is yielded.  An empty pattern yields the current
		directory with an empty virtual path.
		"""
		if not pattern:
			yield cls('', os.curdir)
			return
		parent_pattern = posixpath.split(pattern)[0]
		parents = cls.regexp(parent_pattern, False)
		matcher = re.compile(pattern + '$', re.UNICODE)
		seen = set() if _unique else None
		for bare_parent in parents:
			for parent in bare_parent._also_add_tests():
				try:
					names = parent.listdir()
				except Exception:
					# Not a listable directory; nothing to match here
					continue
				for name in names:
					entry = parent + name
					if not matcher.match(entry.virtual_path):
						continue
					if seen is None:
						yield entry
					elif entry.virtual_path not in seen:
						yield entry
						seen.add(entry.virtual_path)