view upreckon/files.py @ 205:166a23999bf7

Added confvar okexitcodemask; changed the validator protocol Callable validators now return three-tuples (number granted, bool correct, str comment) instead of two-tuples (number granted, str comment). They are still allowed to return single numbers. Callable validators must now explicitly raise upreckon.exceptions.WrongAnswer if they want the verdict to be Wrong Answer rather than Partly Correct. okexitcodemask specifies a bitmask ANDed with the exit code of the external validator to get a boolean flag showing whether the answer is to be marked as 'OK' rather than 'partly correct'. The bits covered by the bitmask are reset to zeroes before devising the number of points granted from the resulting number.
author Oleg Oshmyan <chortos@inbox.lv>
date Wed, 17 Aug 2011 20:44:54 +0300
parents 79f4f2fdeead
children ede78fbd509a
line wrap: on
line source

# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

"""File access routines and classes with support for archives."""

from __future__ import division, with_statement

from .compat import *
import contextlib, itertools, os, posixpath, re, shutil, sys

# You don't need to know about anything else.
__all__ = 'File', 'regexp'

# In these two variables, use full stops no matter what os.extsep is;
# all full stops will be converted to os.extsep on the fly
archives = 'tests.tar', 'tests.zip', 'tests.tgz', 'tests.tar.gz', 'tests.tbz2', 'tests.tar.bz2'
formats = {}

class Archive(object):
	__slots__ = ()
	
	if ABCMeta:
		__metaclass__ = ABCMeta
	
	def __new__(cls, path):
		"""
		Create a new instance of the archive class corresponding
		to the file name in the given path.
		"""
		if cls is not Archive:
			return object.__new__(cls)
		else:
			# Do this by hand rather than through os.path.splitext
			# because we support multi-dotted file name extensions
			ext = path.partition(os.path.extsep)[2]
			while ext:
				if ext in formats:
					return formats[ext](path)
				ext = ext.partition(os.path.extsep)[2]
			raise LookupError("unsupported archive file name extension in file name '%s'" % filename)
	
	@abstractmethod
	def __init__(self, path): raise NotImplementedError
	
	@abstractmethod
	def extract(self, name, target): raise NotImplementedError
	
	@abstractmethod
	def open(self, name): raise NotImplementedError
	
	@abstractmethod
	def exists(self, name): raise NotImplementedError
	
	@abstractmethod
	def listdir(self, name): raise NotImplementedError

try:
	import tarfile
except ImportError:
	TarArchive = None
else:
	class TarArchive(Archive):
		__slots__ = '_tarfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			self._tarfile = tarfile.open(path)
			files, dirs = {}, set(('/',))
			for member in self._tarfile.getmembers():
				cutname = posixpath.normpath('/' + member.name)
				if cutname == '/':
					continue
				if member.isfile():
					files[cutname] = member
					cutname = posixpath.dirname(cutname)
				elif not member.isdir():
					continue
				while cutname != '/':
					dirs.add(cutname)
					cutname = posixpath.dirname(cutname)
			self._files = files
			self._dirs = frozenset(dirs)
			self._names = self._dirs | frozenset(files)
		
		def extract(self, name, target):
			member = self._files[posixpath.normpath('/' + name)]
			member.name = target
			self._tarfile.extract(member)
		
		def open(self, name):
			name = posixpath.normpath('/' + name)
			return self._tarfile.extractfile(self._files[name])
		
		def exists(self, name):
			return posixpath.normpath('/' + name) in self._names
		
		def listdir(self, name):
			normname = posixpath.normpath('/' + name)
			if normname not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if normname != '/':
				normname += '/'
			nnlen = len(normname)
			return [fname[nnlen:] for fname in self._names
			                      if fname.startswith(normname) and
			                         fname.find('/', nnlen) == -1]
		
		def __enter__(self):
			if hasattr(self._tarfile, '__enter__'):
				self._tarfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			if hasattr(self._tarfile, '__exit__'):
				return self._tarfile.__exit__(exc_type, exc_value, traceback)
			elif exc_type is None:
				self._tarfile.close()
			else:
				# This code was shamelessly copied from tarfile.py of Python 2.7
				if not self._tarfile._extfileobj:
					self._tarfile.fileobj.close()
				self._tarfile.closed = True
	
	formats['tar'] = formats['tgz'] = formats['tar.gz'] = formats['tbz2'] = formats['tar.bz2'] = TarArchive

try:
	import zipfile
except ImportError:
	ZipArchive = None
else:
	class ZipArchive(Archive):
		__slots__ = '_zipfile', '_files', '_dirs', '_names'
		
		def __init__(self, path):
			self._zipfile = zipfile.ZipFile(path)
			files, dirs = {}, set(('/',))
			for member in self._zipfile.infolist():
				cutname = posixpath.normpath('/' + member.filename)
				if not member.filename.endswith('/'):
					files[cutname] = member
					cutname = posixpath.dirname(cutname)
				while cutname != '/':
					dirs.add(cutname)
					cutname = posixpath.dirname(cutname)
			self._files = files
			self._dirs = frozenset(dirs)
			self._names = self._dirs | frozenset(files)
		
		def extract(self, name, target):
			member = self._files[posixpath.normpath('/' + name)]
			# FIXME: 2.5 lacks ZipFile.extract
			if os.path.isabs(target):
				# To my knowledge, this is as portable as it gets
				path = os.path.join(os.path.splitdrive(target)[0], os.path.sep)
				member.filename = os.path.relpath(target, path)
				self._zipfile.extract(member, path)
			else:
				member.filename = os.path.relpath(target)
				self._zipfile.extract(member)
		
		def open(self, name):
			name = posixpath.normpath('/' + name)
			# FIXME: 2.5 lacks ZipFile.open
			return self._zipfile.open(self._files[name])
		
		def exists(self, name):
			return posixpath.normpath('/' + name) in self._names
		
		def listdir(self, name):
			normname = posixpath.normpath('/' + name)
			if normname not in self._dirs:
				raise KeyError('No such directory: %r' % name)
			if normname != '/':
				normname += '/'
			nnlen = len(normname)
			return [fname[nnlen:] for fname in self._names
			                      if fname.startswith(normname) and
			                         fname.find('/', nnlen) == -1]
		
		def __enter__(self):
			if hasattr(self._zipfile, '__enter__'):
				self._zipfile.__enter__()
			return self
		
		def __exit__(self, exc_type, exc_value, traceback):
			if hasattr(self._zipfile, '__exit__'):
				return self._zipfile.__exit__(exc_type, exc_value, traceback)
			else:
				return self._zipfile.close()
	
	formats['zip'] = ZipArchive

# Remove unsupported archive formats and replace full stops
# with the platform-dependent file name extension separator
def issupported(filename, formats=formats):
	ext = filename.partition('.')[2]
	while ext:
		if ext in formats: return True
		ext = ext.partition('.')[2]
	return False
archives = [filename.replace('.', os.path.extsep) for filename in filter(issupported, archives)]
formats = dict((item[0].replace('.', os.path.extsep), item[1]) for item in items(formats))

open_archives = {}

def open_archive(path):
	if path in open_archives:
		return open_archives[path]
	else:
		open_archives[path] = archive = Archive(path)
		return archive

class File(object):
	__slots__ = 'virtual_path', 'real_path', 'full_real_path', 'archive'
	
	def __init__(self, virtpath, allow_root=False, msg='test data'):
		self.virtual_path = virtpath
		self.archive = None
		if not self.realize_path('', tuple(comp.replace('.', os.path.extsep) for comp in virtpath.split('/')), allow_root):
			raise IOError("%s file '%s' could not be found" % (msg, virtpath))
	
	def realize_path(self, root, virtpath, allow_root=False, hastests=False):
		if root and not os.path.exists(root):
			return False
		if len(virtpath) > 1:
			if self.realize_path(os.path.join(root, virtpath[0]), virtpath[1:], allow_root, hastests):
				return True
			elif not hastests:
				if self.realize_path(os.path.join(root, 'tests'), virtpath, allow_root, True):
					return True
				for archive in archives:
					path = os.path.join(root, archive)
					if os.path.exists(path):
						if self.realize_path_archive(open_archive(path), '', virtpath, path):
							return True
			if self.realize_path(root, virtpath[1:], allow_root, hastests):
				return True
		else:
			if not hastests:
				path = os.path.join(root, 'tests', virtpath[0])
				if os.path.exists(path):
					self.full_real_path = self.real_path = path
					return True
				for archive in archives:
					path = os.path.join(root, archive)
					if os.path.exists(path):
						if self.realize_path_archive(open_archive(path), '', virtpath, path):
							return True
			if hastests or allow_root:
				path = os.path.join(root, virtpath[0])
				if os.path.exists(path):
					self.full_real_path = self.real_path = path
					return True
		return False
	
	def realize_path_archive(self, archive, root, virtpath, archpath, hastests=False):
		if root and not archive.exists(root):
			return False
		path = posixpath.join(root, virtpath[0])
		if len(virtpath) > 1:
			if self.realize_path_archive(archive, path, virtpath[1:], archpath):
				return True
			elif self.realize_path_archive(archive, root, virtpath[1:], archpath):
				return True
		else:
			if archive.exists(path):
				self.archive = archive
				self.real_path = path
				self.full_real_path = os.path.join(archpath, *path.split('/'))
				return True
		if not hastests:
			if self.realize_path_archive(archive, posixpath.join(root, 'tests'), virtpath, archpath, True):
				return True
		return False
	
	def open(self):
		if self.archive:
			file = self.archive.open(self.real_path)
			if hasattr(file, '__exit__'):
				return file
			else:
				return contextlib.closing(file)
		else:
			return open(self.real_path, 'rb')
	
	def copy(self, target):
		if self.archive:
			self.archive.extract(self.real_path, target)
		else:
			shutil.copy(self.real_path, target)

class RegexpMatchFile(object):
	__slots__ = 'virtual_path', 'real_path', 'hastests', 'archive'
	
	def __init__(self, virtual_path, real_path, hastests=False, archive=None):
		self.virtual_path = virtual_path
		self.real_path = real_path
		self.hastests = hastests
		self.archive = archive

def regexp(pattern):
	if not pattern:
		yield RegexpMatchFile('', os.curdir)
		return
	dirname, basename = posixpath.split(pattern)
	dirs = regexp(dirname)
	reobj = re.compile(pattern + '$', re.UNICODE)
	while dirs:
		newdirs = []
		for directory in dirs:
			if directory.archive:
				try:
					names = directory.archive.listdir(directory.real_path)
				except KeyError:
					continue
				join = posixpath.join
			else:
				try:
					names = os.listdir(directory.real_path)
				except OSError:
					continue
				join = posixpath.join
			for name in names:
				path = join(directory.real_path, name)
				vpath = posixpath.join(directory.virtual_path, name)
				if re.match(reobj, vpath):
					yield RegexpMatchFile(vpath, path, directory.hastests, directory.archive)
				if not directory.hastests:
					if name == 'tests':
						newdirs.append(RegexpMatchFile(directory.virtual_path, path, True, directory.archive))
					if not directory.archive and name in archives:
						newdirs.append(RegexpMatchFile(directory.virtual_path, '', False, open_archive(path)))
		dirs = newdirs