view upreckon/files.py @ 209:c03a8113685d

Rewrote files.regexp as files.File.regexp (a class method). I still dislike the iterative code though.
author Oleg Oshmyan <chortos@inbox.lv>
date Thu, 18 Aug 2011 02:41:46 +0300
parents ede78fbd509a
children 1cbe2c428942
line wrap: on
line source

# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

"""File access routines and classes with support for archives."""

from __future__ import division, with_statement

from .compat import *
import contextlib, itertools, os, posixpath, re, shutil, sys

# You don't need to know about anything else.
# NOTE(review): 'regexp' is exported here, but the only regexp visible in
# this file is the File.regexp class method -- confirm that a module-level
# name of that spelling still exists, because a star-import of this module
# raises AttributeError for any name in __all__ that is not defined.
__all__ = 'File', 'regexp'

# In these two variables, use full stops no matter what os.extsep is;
# all full stops will be converted to os.extsep on the fly
# archives: the archive file names that are tried next to a directory
# (see File._add_tests and File.regexp)
archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2'
# formats: maps a file name extension (without the leading separator)
# to the Archive subclass that handles it; filled in further down
formats = {}

class Archive(object):
	"""
	Abstract base class of the archive readers.
	
	Calling Archive(path) itself selects the class registered in the
	module-level formats mapping for the file name extension of path
	and returns an instance of that class. Subclasses implement
	extract, open, exists and listdir for names inside the archive.
	"""
	__slots__ = ()
	
	if ABCMeta:
		__metaclass__ = ABCMeta
	
	def __new__(cls, path):
		"""
		Create a new instance of the archive class corresponding
		to the file name in the given path.
		
		Raise LookupError if no suffix of the file name extension
		is registered in formats.
		"""
		if cls is not Archive:
			return object.__new__(cls)
		# Do this by hand rather than through os.path.splitext
		# because we support multi-dotted file name extensions
		ext = path.partition(os.path.extsep)[2]
		while ext:
			if ext in formats:
				impl = formats[ext]
				if isinstance(impl, type) and issubclass(impl, Archive):
					# Only allocate the object here. Whatever instance
					# of Archive this method returns gets its __init__
					# called by the interpreter afterwards, so building
					# it with impl(path) would initialize it twice and
					# leak the file opened by the first initialization.
					return impl.__new__(impl, path)
				# Anything else registered in formats is simply called
				return impl(path)
			ext = ext.partition(os.path.extsep)[2]
		raise LookupError("unsupported archive file name extension in file name '%s'" % path)
	
	@abstractmethod
	def __init__(self, path):
		"""Open the archive stored in the file at the given path."""
		raise NotImplementedError
	
	@abstractmethod
	def extract(self, name, target):
		"""Extract the archive member called name to the path target."""
		raise NotImplementedError
	
	@abstractmethod
	def open(self, name):
		"""Return a file-like object reading the archive member called name."""
		raise NotImplementedError
	
	@abstractmethod
	def exists(self, name):
		"""Tell whether the archive has a file or directory called name."""
		raise NotImplementedError
	
	@abstractmethod
	def listdir(self, name):
		"""Return the names of the entries of the archive directory called name."""
		raise NotImplementedError

try:
	import tarfile
except ImportError:
	TarArchive = None
else:
	class TarArchive(Archive):
		"""
		Archive implementation for tar files, backed by the tarfile module.
		
		All member names are handled as normalized absolute POSIX paths
		('/' being the root of the archive).
		"""
		__slots__ = '_tarfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			"""
			Open the tar file at path and index its members.
			
			_files maps the normalized name of every regular file to
			its member object, _dirs holds the normalized names of all
			directories (listed explicitly or implied by a member
			below them) and _names is the union of both sets of names.
			"""
			self._tarfile = tarfile.open(path)
			files, dirs = {}, set(('/',))
			for member in self._tarfile.getmembers():
				cutname = posixpath.normpath('/' + member.name)
				if cutname == '/':
					# The root is always there; nothing to record
					continue
				if member.isfile():
					files[cutname] = member
					cutname = posixpath.dirname(cutname)
				elif not member.isdir():
					# Neither a regular file nor a directory: ignored
					continue
				# Record the directory and all of its ancestors
				while cutname != '/':
					dirs.add(cutname)
					cutname = posixpath.dirname(cutname)
			self._files = files
			self._dirs = frozenset(dirs)
			self._names = self._dirs | frozenset(files)
		
		def extract(self, name, target):
			"""
			Extract the regular file called name to the path target.
			
			Raise KeyError if the archive has no such regular file.
			"""
			member = self._files[posixpath.normpath('/' + name)]
			# The member is extracted under whatever name it carries,
			# so the cached member object is renamed to the target
			member.name = target
			self._tarfile.extract(member)
		
		def open(self, name):
			"""
			Return a file-like object reading the regular file called name.
			
			Raise KeyError if the archive has no such regular file.
			"""
			name = posixpath.normpath('/' + name)
			return self._tarfile.extractfile(self._files[name])
		
		def exists(self, name):
			"""Tell whether the archive has a file or directory called name."""
			return posixpath.normpath('/' + name) in self._names
		
		def listdir(self, name):
			"""
			Return the names of the immediate children of directory name.
			
			Raise KeyError if the archive has no such directory.
			"""
			normname = posixpath.normpath('/' + name)
			if normname not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if normname != '/':
				normname += '/'
			nnlen = len(normname)
			# The length check keeps the root itself ('/', which is in
			# _names and starts with the '/' prefix) from being listed
			# as an entry with an empty name when the root is listed
			return [fname[nnlen:] for fname in self._names
			                      if len(fname) > nnlen and
			                         fname.startswith(normname) and
			                         fname.find('/', nnlen) == -1]
		
		def __enter__(self):
			"""Enter the context of the tar file if it has one; return self."""
			if hasattr(self._tarfile, '__enter__'):
				self._tarfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			"""
			Close the tar file, delegating to its own __exit__ if it has one.
			"""
			if hasattr(self._tarfile, '__exit__'):
				return self._tarfile.__exit__(exc_type, exc_value, traceback)
			elif exc_type is None:
				self._tarfile.close()
			else:
				# This code was shamelessly copied from tarfile.py of Python 2.7
				if not self._tarfile._extfileobj:
					self._tarfile.fileobj.close()
				self._tarfile.closed = True
	
	formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive

try:
	import zipfile
except ImportError:
	ZipArchive = None
else:
	class ZipArchive(Archive):
		"""
		Archive implementation for zip files, backed by the zipfile module.
		
		All member names are handled as normalized absolute POSIX paths
		('/' being the root of the archive).
		"""
		__slots__ = '_zipfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			"""
			Open the zip file at path and index its members.
			
			_files maps the normalized name of every file to its member
			object, _dirs holds the normalized names of all directories
			(listed explicitly or implied by a member below them) and
			_names is the union of both sets of names.
			"""
			self._zipfile = zipfile.ZipFile(path)
			files, dirs = {}, set(('/',))
			for member in self._zipfile.infolist():
				cutname = posixpath.normpath('/' + member.filename)
				# Members whose names end in a slash are directories
				if not member.filename.endswith('/'):
					files[cutname] = member
					cutname = posixpath.dirname(cutname)
				# Record the directory and all of its ancestors
				while cutname != '/':
					dirs.add(cutname)
					cutname = posixpath.dirname(cutname)
			self._files = files
			self._dirs = frozenset(dirs)
			self._names = self._dirs | frozenset(files)
		
		def extract(self, name, target):
			"""
			Extract the file called name to the path target.
			
			Raise KeyError if the archive has no such file.
			"""
			member = self._files[posixpath.normpath('/' + name)]
			# The member is extracted under whatever name it carries
			# relative to the extraction directory, so the cached member
			# object is renamed to the target relative to that directory
			# FIXME: 2.5 lacks ZipFile.extract
			if os.path.isabs(target):
				# To my knowledge, this is as portable as it gets
				path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
				member.filename = os.path.relpath(target, path)
				self._zipfile.extract(member, path)
			else:
				member.filename = os.path.relpath(target)
				self._zipfile.extract(member)
		
		def open(self, name):
			"""
			Return a file-like object reading the file called name.
			
			Raise KeyError if the archive has no such file.
			"""
			name = posixpath.normpath('/' + name)
			# FIXME: 2.5 lacks ZipFile.open
			return self._zipfile.open(self._files[name])
		
		def exists(self, name):
			"""Tell whether the archive has a file or directory called name."""
			return posixpath.normpath('/' + name) in self._names
		
		def listdir(self, name):
			"""
			Return the names of the immediate children of directory name.
			
			Raise KeyError if the archive has no such directory.
			"""
			normname = posixpath.normpath('/' + name)
			if normname not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if normname != '/':
				normname += '/'
			nnlen = len(normname)
			# The length check keeps the root itself ('/', which is in
			# _names and starts with the '/' prefix) from being listed
			# as an entry with an empty name when the root is listed
			return [fname[nnlen:] for fname in self._names
			                      if len(fname) > nnlen and
			                         fname.startswith(normname) and
			                         fname.find('/', nnlen) == -1]
		
		def __enter__(self):
			"""Enter the context of the zip file if it has one; return self."""
			if hasattr(self._zipfile, '__enter__'):
				self._zipfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			"""
			Close the zip file, delegating to its own __exit__ if it has one.
			"""
			if hasattr(self._zipfile, '__exit__'):
				return self._zipfile.__exit__(exc_type, exc_value, traceback)
			else:
				return self._zipfile.close()
	
	formats['zip'] = ZipArchive

# Remove unsupported archive formats and replace full stops
# with the platform-dependent file name extension separator
def issupported(filename, formats=formats):
	"""
	Tell whether some suffix of the extension of filename is in formats.
	
	The extension is everything after the first full stop; it is tried
	whole first and then with one more leading component cut off each
	time, so that multi-dotted extensions are recognized too.
	"""
	pieces = filename.split('.')
	for start in range(1, len(pieces)):
		suffix = '.'.join(pieces[start:])
		# An empty suffix (file name ending in a full stop) never counts
		if suffix and suffix in formats:
			return True
	return False
# issupported matches on full stops and keeps the formats dictionary it
# was defined with as its default argument, so archives is filtered
# against the keys as registered above even though formats is rebound
# to a re-keyed copy on the next line.
archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)]
# Same handlers, keys spelled with the platform's extension separator
# (this is the spelling Archive.__new__ looks extensions up by).
formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats))

# Archives opened so far, keyed by the path passed to open_archive
open_archives = {}

def open_archive(path):
	"""
	Return the archive object for path, creating it on first request.
	
	Every archive is opened once only; later calls with the same path
	return the object remembered in open_archives. Whatever Archive
	raises for the path propagates, and nothing is remembered then.
	"""
	try:
		return open_archives[path]
	except KeyError:
		pass
	archive = Archive(path)
	open_archives[path] = archive
	return archive

class File(object):
	"""
	A file or directory addressed by a '/'-separated virtual path.
	
	The virtual path is mapped onto a real location made of a path in
	the file system (_external_path) and, if an archive was opened for
	that path, a path inside the archive (_internal_path). _has_tests
	is set on objects reached through an inserted 'tests' component
	and is inherited by everything derived from them, so that no
	second such component is inserted below.
	"""
	__slots__ = ('virtual_path', 'archive',
	             '_external_path', '_internal_path', '_has_tests')
	
	def __init__(self, _virtual_path='', _external_path='', _internal_path='',
	             _archive=None, _has_tests=False):
		"""
		Store the given paths; if no archive is given, try to open
		_external_path as one (a failure to do so is not an error).
		"""
		self.virtual_path = _virtual_path
		self._external_path = _external_path
		self._internal_path = _internal_path
		self._has_tests = _has_tests
		self.archive = _archive
		if _archive:
			return
		try:
			self.archive = open_archive(_external_path)
		except Exception:
			# Not an archive (or not openable as one): plain file system
			pass
	
	@property
	def full_real_path(self):
		"""
		The external path joined with the components of the internal
		path, full stops in the latter replaced with os.path.extsep.
		"""
		components = []
		if self._internal_path:
			for part in self._internal_path.split('/'):
				components.append(part.replace('.', os.path.extsep))
		return os.path.join(self._external_path, *components)
	
	def exists(self):
		"""
		Tell whether the file exists, asking the archive if there is one.
		An empty external path outside an archive counts as existing.
		"""
		if self.archive:
			return self.archive.exists(self._internal_path)
		return not self._external_path or os.path.exists(self._external_path)
	
	def open(self):
		"""
		Return a context manager giving a binary file object for reading.
		"""
		if not self.archive:
			return open(self._external_path, 'rb')
		stream = self.archive.open(self._internal_path)
		if hasattr(stream, '__exit__'):
			return stream
		# Make objects that are not context managers usable in with
		return contextlib.closing(stream)
	
	def copy(self, target):
		"""Copy the file, or extract it from its archive, to target."""
		if not self.archive:
			shutil.copy(self._external_path, target)
			return
		self.archive.extract(self._internal_path, target)
	
	def listdir(self):
		"""Return the names of the entries of this directory."""
		if not self.archive:
			return os.listdir(self._external_path)
		return self.archive.listdir(self._internal_path)
	
	@classmethod
	def from_virtual_path(cls, virtual_path, allow_root, msg):
		"""
		Return the existing file that virtual_path resolves to.
		
		Raise IOError, its message starting with msg, if there is none.
		"""
		found = cls()._realize_path(virtual_path.split('/'), allow_root)
		if not found:
			raise IOError("%s file with virtual path %r could not be found" %
			              (msg, virtual_path))
		assert found.virtual_path == virtual_path
		return found
	
	def _realize_path(self, virtpath, allow_root):
		"""
		Resolve the list of virtual path components virtpath starting
		from this file; return the File found or None.
		
		With no components left, this file itself is the answer, but
		only if allow_root is true or the file was reached through
		a 'tests' directory or lies inside an archive.
		"""
		if not self.exists():
			return None
		if not virtpath:
			if allow_root or self._has_tests or self.archive:
				return self
			return None
		first = virtpath[0]
		# First choice: the component names a real entry of this directory
		found = (self + first)._realize_path(virtpath[1:], allow_root)
		if found:
			return found
		# Second choice: look for the same components inside a 'tests'
		# directory or a test archive located here
		if not self._has_tests:
			for container in self._add_tests():
				found = container._realize_path(virtpath, allow_root)
				if found:
					return found
		# Last choice: treat the component as purely virtual
		if len(virtpath) > 1:
			found = self._add_virtual(first)._realize_path(virtpath[1:], allow_root)
			if found:
				return found
		return None
	
	def _add_tests(self):
		"""
		Generate the places where tests may be kept under this directory
		without showing in the virtual path: the 'tests' directory and,
		outside archives, each of the supported test archives.
		"""
		assert not self._has_tests
		tests_dir = self.__add__('tests', False)
		tests_dir._has_tests = True
		yield tests_dir
		if self.archive:
			return
		for archive_name in archives:
			yield self.__add__(archive_name, False)
	
	def _add_virtual(self, filename):
		"""
		Return a File for the same real location as this one but with
		filename appended to the virtual path.
		"""
		return File(_virtual_path=posixpath.join(self.virtual_path, filename),
		            _external_path=self._external_path,
		            _internal_path=self._internal_path,
		            _archive=self.archive,
		            _has_tests=self._has_tests)
	
	def __add__(self, filename, add_virtual=True):
		"""
		Return the File for the entry called filename of this directory.
		
		The name is appended to the internal path inside an archive and
		to the external path (full stops replaced with os.path.extsep)
		otherwise; it is appended to the virtual path too unless
		add_virtual is false.
		"""
		if not isinstance(filename, basestring):
			return NotImplemented
		virtual_path = self.virtual_path
		if add_virtual:
			virtual_path = posixpath.join(virtual_path, filename)
		if not self.archive:
			real_name = filename.replace('.', os.path.extsep)
			return File(virtual_path,
			            os.path.join(self._external_path, real_name),
			            _has_tests=self._has_tests)
		return File(virtual_path,
		            self._external_path,
		            posixpath.join(self._internal_path, filename),
		            self.archive,
		            self._has_tests)
	
	@classmethod
	def regexp(cls, pattern):
		"""
		Generate the files whose virtual paths match the regular
		expression pattern.
		
		The directories to search are found by applying this method to
		the directory part of the pattern; 'tests' directories and
		supported test archives found in them are searched as well
		without showing in the virtual paths. An empty pattern gives
		the current directory with an empty virtual path.
		"""
		if not pattern:
			yield cls('', os.curdir)
			return
		parent_pattern = posixpath.split(pattern)[0]
		pending = cls.regexp(parent_pattern)
		matcher = re.compile(pattern + '$', re.UNICODE)
		while pending:
			queued = []
			for directory in pending:
				try:
					names = directory.listdir()
				except Exception:
					# Not a directory that can be listed: nothing in it
					continue
				for name in names:
					entry = directory + name
					if matcher.match(entry.virtual_path):
						yield entry
					if directory._has_tests:
						continue
					# Queue containers of tests for the next round
					if name == 'tests':
						tests_dir = directory.__add__(name, False)
						tests_dir._has_tests = True
						queued.append(tests_dir)
					elif not directory.archive and name in archives:
						queued.append(directory.__add__(name, False))
			pending = queued