diff files.py @ 50:4ea7133ac25c

Converted 2.00 into the default branch
author Oleg Oshmyan <chortos@inbox.lv>
date Sun, 19 Dec 2010 23:25:13 +0200
parents 2.00/files.py@8fec38b0dd6e
children 1914ae9cfdce
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/files.py	Sun Dec 19 23:25:13 2010 +0200
@@ -0,0 +1,250 @@
+#! /usr/bin/env python
+# Copyright (c) 2010 Chortos-2 <chortos@inbox.lv>
+
+"""File access routines and classes with support for archives."""
+
+from __future__ import division, with_statement
+
+try:
+	from compat import *
+except ImportError:
+	import __main__
+	__main__.import_error(sys.exc_info()[1])
+
+import contextlib, os, shutil, sys
+
+# You don't need to know about anything else.
+__all__ = 'File',
+
+# In these two variables, use full stops no matter what os.extsep is;
+# all full stops will be converted to os.extsep on the fly
+archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2'
+formats = {}
+
+class Archive(object):
+	__slots__ = 'file'
+	
+	if ABCMeta:
+		__metaclass__ = ABCMeta
+	
+	def __new__(cls, path):
+		"""
+		Create a new instance of the archive class corresponding
+		to the file name in the given path.
+		"""
+		if cls is not Archive:
+			return object.__new__(cls)
+		else:
+			# Do this by hand rather than through os.path.splitext
+			# because we support multi-dotted file name extensions
+			ext = path.partition(os.path.extsep)[2]
+			while ext:
+				if ext in formats:
+					return formats[ext](path)
+				ext = ext.partition(os.path.extsep)[2]
+			raise LookupError("unsupported archive file name extension in file name '%s'" % filename)
+	
+	@abstractmethod
+	def __init__(self, path): raise NotImplementedError
+	
+	@abstractmethod
+	def extract(self, name, target): raise NotImplementedError
+	
+	def __del__(self):
+		del self.file
+
+try:
+	import tarfile
+except ImportError:
+	TarArchive = None
+else:
+	class TarArchive(Archive):
+		__slots__ = '__namelist'
+		
+		def __init__(self, path):
+			self.file = tarfile.open(path)
+		
+		def extract(self, name, target):
+			member = self.file.getmember(name)
+			member.name = target
+			self.file.extract(member)
+		
+		# TODO: somehow automagically emulate universal line break support
+		def open(self, name):
+			return self.file.extractfile(name)
+		
+		def exists(self, queried_name):
+			if not hasattr(self, '__namelist'):
+				names = set()
+				for name in self.file.getnames():
+					cutname = name
+					while cutname:
+						names.add(cutname)
+						cutname = cutname.rpartition('/')[0]
+				self.__namelist = frozenset(names)
+			return queried_name in self.__namelist
+		
+		def __enter__(self):
+			if hasattr(self.file, '__enter__'):
+				self.file.__enter__()
+			return self
+		
+		def __exit__(self, exc_type, exc_value, traceback):
+			if hasattr(self.file, '__exit__'):
+				return self.file.__exit__(exc_type, exc_value, traceback)
+			elif exc_type is None:
+				self.file.close()
+			else:
+				# This code was shamelessly copied from tarfile.py of Python 2.7
+				if not self.file._extfileobj:
+					self.file.fileobj.close()
+				self.file.closed = True
+	
+	formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive
+
+try:
+	import zipfile
+except ImportError:
+	ZipArchive = None
+else:
+	class ZipArchive(Archive):
+		__slots__ = '__namelist'
+		
+		def __init__(self, path):
+			self.file = zipfile.ZipFile(path)
+		
+		def extract(self, name, target):
+			if os.path.isabs(target):
+				# To my knowledge, this is as portable as it gets
+				path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
+			else:
+				path = None
+			
+			member = self.file.getinfo(name)
+			member.filename = os.path.relpath(target, path)
+			# FIXME: 2.5 lacks ZipFile.extract
+			self.file.extract(member, path)
+		
+		def open(self, name):
+			return self.file.open(name, 'rU')
+		
+		def exists(self, queried_name):
+			if not hasattr(self, '__namelist'):
+				names = set()
+				for name in self.file.namelist():
+					cutname = name
+					while cutname:
+						names.add(cutname)
+						cutname = cutname.rpartition('/')[0]
+				self.__namelist = frozenset(names)
+			return queried_name in self.__namelist
+		
+		def __enter__(self):
+			if hasattr(self.file, '__enter__'):
+				self.file.__enter__()
+			return self
+		
+		def __exit__(self, exc_type, exc_value, traceback):
+			if hasattr(self.file, '__exit__'):
+				return self.file.__exit__(exc_type, exc_value, traceback)
+			else:
+				return self.file.close()
+	
+	formats['zip'] = ZipArchive
+
+# Remove unsupported archive formats and replace full stops
+# with the platform-dependent file name extension separator
+def issupported(filename, formats=formats):
+	ext = filename.partition('.')[2]
+	while ext:
+		if ext in formats: return True
+		ext = ext.partition('.')[2]
+	return False
+archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)]
+formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats))
+
+open_archives = {}
+
+def open_archive(path):
+	if path in open_archives:
+		return open_archives[path]
+	else:
+		open_archives[path] = archive = Archive(path)
+		return archive
+
+class File(object):
+	__slots__ = 'virtual_path', 'real_path', 'full_real_path', 'archive'
+	
+	def __init__(self, virtpath, allow_root=False, msg='test data'):
+		self.virtual_path = virtpath
+		self.archive = None
+		if not self.realize_path('', tuple(comp.replace('.', os.path.extsep) for comp in virtpath.split('/')), allow_root):
+			raise IOError("%s file '%s' could not be found" % (msg, virtpath))
+	
+	def realize_path(self, root, virtpath, allow_root=False, hastests=False):
+		if root and not os.path.exists(root):
+			return False
+		if len(virtpath) > 1:
+			if self.realize_path(os.path.join(root, virtpath[0]), virtpath[1:], allow_root, hastests):
+				return True
+			elif not hastests:
+				if self.realize_path(os.path.join(root, 'tests'), virtpath, allow_root, True):
+					return True
+				for archive in archives:
+					path = os.path.join(root, archive)
+					if os.path.exists(path):
+						if self.realize_path_archive(open_archive(path), '', virtpath, path):
+							return True
+			if self.realize_path(root, virtpath[1:], allow_root, hastests):
+				return True
+		else:
+			if not hastests:
+				path = os.path.join(root, 'tests', virtpath[0])
+				if os.path.exists(path):
+					self.full_real_path = self.real_path = path
+					return True
+				for archive in archives:
+					path = os.path.join(root, archive)
+					if os.path.exists(path):
+						if self.realize_path_archive(open_archive(path), '', virtpath, path):
+							return True
+			if hastests or allow_root:
+				path = os.path.join(root, virtpath[0])
+				if os.path.exists(path):
+					self.full_real_path = self.real_path = path
+					return True
+		return False
+	
+	def realize_path_archive(self, archive, root, virtpath, archpath):
+		if root and not archive.exists(root):
+			return False
+		if root: path = ''.join((root, '/', virtpath[0]))
+		else: path = virtpath[0]
+		if len(virtpath) > 1:
+			if self.realize_path_archive(archive, path, virtpath[1:], archpath):
+				return True
+			elif self.realize_path_archive(archive, root, virtpath[1:], archpath):
+				return True
+		else:
+			if archive.exists(path):
+				self.archive = archive
+				self.real_path = path
+				self.full_real_path = os.path.join(archpath, *path.split('/'))
+				return True
+		return False
+	
+	def open(self):
+		if self.archive:
+			file = self.archive.open(self.real_path)
+			if hasattr(file, '__exit__'):
+				return file
+			else:
+				return contextlib.closing(file)
+		else:
+			return open(self.real_path)
+	
+	def copy(self, target):
+		if self.archive:
+			self.archive.extract(self.real_path, target)
+		else:
+			shutil.copy(self.real_path, target)
\ No newline at end of file