diff upreckon/testcases.py @ 146:d5b6708c1955

Distutils support, reorganization and cleaning up * Removed command-line options -t and -u. * Reorganized code: o all modules are now in package upreckon; o TestCaseNotPassed and its descendants now live in a separate module exceptions; o load_problem now lives in module problem. * Commented out mentions of command-line option -c in --help. * Added a distutils-based setup.py.
author Oleg Oshmyan <chortos@inbox.lv>
date Sat, 28 May 2011 14:24:25 +0100
parents testcases.py@ed4035661b85
children 93bf6b333c99
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/upreckon/testcases.py	Sat May 28 14:24:25 2011 +0100
@@ -0,0 +1,295 @@
+# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>
+
+# TODO: copy the ansfile if not options.erase even if no validator is used
+
+from __future__ import division, with_statement
+
+from .compat import *
+from .exceptions import *
+from . import files, config
+from __main__ import options
+
+import re, sys, tempfile
+from subprocess import Popen, PIPE, STDOUT
+
+import os
+devnull = open(os.path.devnull, 'w+')
+
+class DummySignalIgnorer(object):
+	def __enter__(self): pass
+	def __exit__(self, exc_type, exc_value, traceback): pass
+signal_ignorer = DummySignalIgnorer()
+
+try:
+	from .win32 import *
+except Exception:
+	from .unix import *
+
+__all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase',
+           'OutputOnlyTestCase')
+
+
+
+# Helper context managers
+
+class CopyDeleting(object):
+	__slots__ = 'case', 'file', 'name'
+	
+	def __init__(self, case, file, name):
+		self.case = case
+		self.file = file
+		self.name = name
+	
+	def __enter__(self):
+		if self.name:
+			try:
+				self.file.copy(self.name)
+			except:
+				try:
+					self.__exit__(None, None, None)
+				except:
+					pass
+				raise
+	
+	def __exit__(self, exc_type, exc_val, exc_tb):
+		if self.name:
+			self.case.files_to_delete.append(self.name)
+
+
+class Copying(object):
+	__slots__ = 'file', 'name'
+	
+	def __init__(self, file, name):
+		self.file = file
+		self.name = name
+	
+	def __enter__(self):
+		if self.name:
+			self.file.copy(self.name)
+	
+	def __exit__(self, exc_type, exc_val, exc_tb):
+		pass
+
+
+
+# Test case types
+
+class TestCase(object):
+	__slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
+	             'process', 'time_started', 'time_stopped',
+	             'realinname', 'realoutname', 'maxcputime', 'maxwalltime',
+	             'maxmemory', 'has_called_back', 'files_to_delete',
+	             'cpu_time_limit_string', 'wall_time_limit_string',
+	             'time_limit_string')
+	has_ansfile = has_iofiles = False
+	needs_realinname = True
+	
+	if ABCMeta:
+		__metaclass__ = ABCMeta
+	
+	def __init__(case, prob, id, isdummy, points):
+		case.problem = prob
+		case.id = id
+		case.isdummy = isdummy
+		case.points = points
+		case.maxcputime = case.problem.config.maxcputime
+		case.maxwalltime = case.problem.config.maxwalltime
+		case.maxmemory = case.problem.config.maxmemory
+		if case.maxcputime:
+			case.cpu_time_limit_string = '/%.3f' % case.maxcputime
+		else:
+			case.cpu_time_limit_string = ''
+		if case.maxwalltime:
+			case.wall_time_limit_string = '/%.3f' % case.maxwalltime
+		else:
+			case.wall_time_limit_string = ''
+		if not isdummy:
+			if case.needs_realinname:
+				case.realinname = case.problem.config.testcaseinname
+			case.realoutname = case.problem.config.testcaseoutname
+		else:
+			if case.needs_realinname:
+				case.realinname = case.problem.config.dummyinname
+			case.realoutname = case.problem.config.dummyoutname
+	
+	@abstractmethod
+	def test(case):
+		raise NotImplementedError
+	
+	def __call__(case, callback):
+		case.has_called_back = False
+		case.files_to_delete = []
+		case.time_limit_string = case.wall_time_limit_string
+		try:
+			return case.test(callback)
+		finally:
+			now = clock()
+			if getattr(case, 'time_started', None) is None:
+				case.time_started = case.time_stopped = now
+			elif getattr(case, 'time_stopped', None) is None:
+				case.time_stopped = now
+			if not case.has_called_back:
+				callback()
+			case.cleanup()
+	
+	def cleanup(case):
+		# Note that native extensions clean up on their own
+		# and never let this condition be satisfied
+		if getattr(case, 'process', None) and case.process.returncode is None:
+			kill(case.process)
+		for name in case.files_to_delete:
+			try:
+				os.remove(name)
+			except OSError:
+				# It can't be helped
+				pass
+	
+	def open_infile(case):
+		try:
+			case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id))))
+		except IOError:
+			e = sys.exc_info()[1]
+			raise CannotReadInputFile(e)
+	
+	def open_outfile(case):
+		try:
+			case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id))))
+		except IOError:
+			e = sys.exc_info()[1]
+			raise CannotReadAnswerFile(e)
+
+
+class SkippedTestCase(TestCase):
+	__slots__ = ()
+	
+	def test(case, callback):
+		raise TestCaseSkipped
+
+
+class ValidatedTestCase(TestCase):
+	__slots__ = 'validator'
+	
+	def __init__(case, *args):
+		TestCase.__init__(case, *args)
+		if not case.problem.config.tester:
+			case.validator = None
+		else:
+			case.validator = case.problem.config.tester
+	
+	def validate(case, output):
+		if not case.validator:
+			# Compare the output with the reference output
+			case.open_outfile()
+			with case.outfile.open() as refoutput:
+				for line, refline in zip_longest(output, refoutput):
+					if refline is not None and not isinstance(refline, basestring):
+						line = bytes(line, sys.getdefaultencoding())
+					if line != refline:
+						raise WrongAnswer
+			return 1
+		elif callable(case.validator):
+			return case.validator(output)
+		else:                 
+			# Call the validator program
+			output.close()
+			if case.problem.config.ansname:
+				case.open_outfile()
+				case.outfile.copy(case.problem.config.ansname)
+			try:
+				case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1)
+			except OSError:
+				raise CannotStartValidator(sys.exc_info()[1])
+			with signal_ignorer:
+				comment = case.process.communicate()[0].strip()
+			match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
+			if match:
+				comment = comment[match.end():]
+			if not case.problem.config.maxexitcode:
+				if case.process.returncode:
+					raise WrongAnswer(comment)
+				else:
+					return 1, comment
+			else:
+				return case.process.returncode / case.problem.config.maxexitcode, comment
+
+
+class BatchTestCase(ValidatedTestCase):
+	__slots__ = ()
+	
+	@property
+	def has_iofiles(case):
+		return (not case.problem.config.stdio or
+		        case.validator and not callable(case.validator))
+	
+	@property
+	def has_ansfile(case):
+		return case.validator and not callable(case.validator)
+	
+	def test(case, callback):
+		case.open_infile()
+		if case.problem.config.stdio:
+			if options.erase and not case.validator or not case.problem.config.inname:
+				# TODO: re-use the same file name if possible
+				# FIXME: 2.5 lacks the delete parameter
+				with tempfile.NamedTemporaryFile(delete=False) as f:
+					inputdatafname = f.name
+				contextmgr = CopyDeleting(case, case.infile, inputdatafname)
+			else:
+				inputdatafname = case.problem.config.inname
+				contextmgr = Copying(case.infile, inputdatafname)
+			with contextmgr:
+				with open(inputdatafname) as infile:
+					with tempfile.TemporaryFile('w+') if options.erase and (not case.validator or callable(case.validator)) else open(case.problem.config.outname, 'w+') as outfile:
+						call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull)
+						if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
+							raise NonZeroExitCode(case.process.returncode)
+						case.has_called_back = True
+						callback()
+						outfile.seek(0)
+						return case.validate(outfile)
+		else:
+			case.infile.copy(case.problem.config.inname)
+			call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=STDOUT)
+			if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
+				raise NonZeroExitCode(case.process.returncode)
+			case.has_called_back = True
+			callback()
+			try:
+				output = open(case.problem.config.outname, 'rU')
+			except IOError:
+				raise CannotReadOutputFile(sys.exc_info()[1])
+			with output as output:
+				return case.validate(output)
+
+
+# This is the only test case type not executing any programs to be tested
+class OutputOnlyTestCase(ValidatedTestCase):
+	__slots__ = ()
+	needs_realinname = False
+	
+	def cleanup(case):
+		pass
+	
+	def test(case, callback):
+		case.time_stopped = case.time_started = 0
+		case.has_called_back = True
+		callback()
+		try:
+			output = open(case.problem.config.outname.replace('$', case.id), 'rU')
+		except IOError:
+			raise CannotReadOutputFile(sys.exc_info()[1])
+		with output as output:
+			return case.validate(output)
+
+
+class BestOutputTestCase(ValidatedTestCase):
+	__slots__ = ()
+
+
+# This is the only test case type executing two programs simultaneously
+class ReactiveTestCase(TestCase):
+	__slots__ = ()
+	# The basic idea is to launch the program to be tested and the grader
+	# and to pipe their standard I/O from and to each other,
+	# and then to capture the grader's exit code and use it
+	# like the exit code of an output validator is used.