view 2.00/testcases.py @ 40:af9c45708987

Cemented a decision previously being unsure about The mere presense of the tasknames configuration variable now always makes problem names to be printed. This is not new, but the old behaviour (only printing names if we test more than one problem), previously commented out, has now been removed altogether.
author Oleg Oshmyan <chortos@inbox.lv>
date Sun, 05 Dec 2010 14:34:24 +0100
parents 2b459f9743b4
children 164395af969d
line wrap: on
line source

#! /usr/bin/env python
# Copyright (c) 2010 Chortos-2 <chortos@inbox.lv>

from __future__ import division, with_statement

try:
	from compat import *
	import files, problem, config
except ImportError:
	import __main__
	__main__.import_error(sys.exc_info()[1])
else:
	from __main__ import clock, options

import glob, re, sys, tempfile, time
from subprocess import Popen, PIPE, STDOUT

import os
devnull = open(os.path.devnull, 'w+')

try:
	from signal import SIGTERM, SIGKILL
except ImportError:
	SIGTERM = 15
	SIGKILL = 9

try:
	from _subprocess import TerminateProcess
except ImportError:
	# CPython 2.5 does define _subprocess.TerminateProcess even though it is
	# not used in the subprocess module, but maybe something else does not
	try:
		import ctypes
		TerminateProcess = ctypes.windll.kernel32.TerminateProcess
	except (ImportError, AttributeError):
		TerminateProcess = None


# Do the hacky-wacky dark magic needed to catch presses of the Escape button.
# If only Python supported forcible termination of threads...
if not sys.stdin.isatty():
	canceled = init_canceled = lambda: False
	pause = None
else:
	try:
		# Windows has select() too, but it is not the select() we want
		import msvcrt
	except ImportError:
		try:
			import select, termios, tty, atexit
		except ImportError:
			# It cannot be helped!
			# Silently disable support for killing the program being tested
			canceled = init_canceled = lambda: False
			pause = None
		else:
			def cleanup(old=termios.tcgetattr(sys.stdin.fileno())):
				termios.tcsetattr(sys.stdin.fileno(), termios.TCSAFLUSH, old)
			atexit.register(cleanup)
			del cleanup
			tty.setcbreak(sys.stdin.fileno())
			def canceled(select=select.select, stdin=sys.stdin, read=sys.stdin.read):
				while select((stdin,), (), (), 0)[0]:
					if read(1) == '\33':
						return True
				return False
			def init_canceled():
				while select.select((sys.stdin,), (), (), 0)[0]:
					sys.stdin.read(1)
			def pause():
				sys.stdin.read(1)
	else:
		def canceled(kbhit=msvcrt.kbhit, getch=msvcrt.getch):
			while kbhit():
				c = getch()
				if c == '\33':
					return True
				elif c == '\0':
					# Let's hope no-one is fiddling with this
					getch()
			return False
		def init_canceled():
			while msvcrt.kbhit():
				msvcrt.getch()
		def pause():
			msvcrt.getch()


__all__ = ('TestCase', 'load_problem', 'TestCaseNotPassed',
           'TimeLimitExceeded', 'CanceledByUser', 'WrongAnswer',
           'NonZeroExitCode', 'CannotStartTestee',
           'CannotStartValidator', 'CannotReadOutputFile',
           'CannotReadInputFile', 'CannotReadAnswerFile')



# Exceptions

class TestCaseNotPassed(Exception): __slots__ = ()
class TimeLimitExceeded(TestCaseNotPassed): __slots__ = ()
class CanceledByUser(TestCaseNotPassed): __slots__ = ()

class WrongAnswer(TestCaseNotPassed):
	__slots__ = 'comment'
	def __init__(self, comment=''):
		self.comment = comment

class NonZeroExitCode(TestCaseNotPassed):
	__slots__ = 'exitcode'
	def __init__(self, exitcode):
		self.exitcode = exitcode

class ExceptionWrapper(TestCaseNotPassed):
	__slots__ = 'upstream'
	def __init__(self, upstream):
		self.upstream = upstream

class CannotStartTestee(ExceptionWrapper): __slots__ = ()
class CannotStartValidator(ExceptionWrapper): __slots__ = ()
class CannotReadOutputFile(ExceptionWrapper): __slots__ = ()
class CannotReadInputFile(ExceptionWrapper): __slots__ = ()
class CannotReadAnswerFile(ExceptionWrapper): __slots__ = ()



# Helper context managers

class CopyDeleting(object):
	__slots__ = 'case', 'file', 'name'
	
	def __init__(self, case, file, name):
		self.case = case
		self.file = file
		self.name = name
	
	def __enter__(self):
		if self.name:
			try:
				self.file.copy(self.name)
			except:
				try:
					self.__exit__(None, None, None)
				except:
					pass
				raise
	
	def __exit__(self, exc_type, exc_val, exc_tb):
		if self.name:
			self.case.files_to_delete.append(self.name)


class Copying(object):
	__slots__ = 'file', 'name'
	
	def __init__(self, file, name):
		self.file = file
		self.name = name
	
	def __enter__(self):
		if self.name:
			self.file.copy(self.name)
	
	def __exit__(self, exc_type, exc_val, exc_tb):
		pass



# Test case types

class TestCase(object):
	__slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
	             'process', 'time_started', 'time_stopped', 'time_limit_string',
	             'realinname', 'realoutname', 'maxtime', 'maxmemory',
	             'has_called_back', 'files_to_delete')
	
	if ABCMeta:
		__metaclass__ = ABCMeta
	
	def __init__(case, prob, id, isdummy, points):
		case.problem = prob
		case.id = id
		case.isdummy = isdummy
		case.points = points
		case.maxtime = case.problem.config.maxtime
		case.maxmemory = case.problem.config.maxmemory
		if case.maxtime:
			case.time_limit_string = '/%.3f' % case.maxtime
		else:
			case.time_limit_string = ''
		if not isdummy:
			case.realinname = case.problem.config.testcaseinname
			case.realoutname = case.problem.config.testcaseoutname
		else:
			case.realinname = case.problem.config.dummyinname
			case.realoutname = case.problem.config.dummyoutname
	
	@abstractmethod
	def test(case): raise NotImplementedError
	
	def __call__(case, callback):
		case.has_called_back = False
		case.files_to_delete = []
		try:
			return case.test(callback)
		finally:
			now = clock()
			if not getattr(case, 'time_started', None):
				case.time_started = case.time_stopped = now
			elif not getattr(case, 'time_stopped', None):
				case.time_stopped = now
			if not case.has_called_back:
				callback()
			case.cleanup()
	
	def cleanup(case):
		#if getattr(case, 'infile', None):
		#	case.infile.close()
		#if getattr(case, 'outfile', None):
		#	case.outfile.close()
		if getattr(case, 'process', None):
			# Try killing after three unsuccessful TERM attempts in a row
			# (except on Windows, where TERMing is killing)
			for i in range(3):
				try:
					try:
						case.process.terminate()
					except AttributeError:
						# Python 2.5
						if TerminateProcess and hasattr(proc, '_handle'):
							# Windows API
							TerminateProcess(proc._handle, 1)
						else:
							# POSIX
							os.kill(proc.pid, SIGTERM)
				except Exception:
					time.sleep(0)
					case.process.poll()
				else:
					case.process.wait()
					break
			else:
				# If killing the process is unsuccessful three times in a row,
				# just silently stop trying
				for i in range(3):
					try:
						try:
							case.process.kill()
						except AttributeError:
							# Python 2.5
							if TerminateProcess and hasattr(proc, '_handle'):
								# Windows API
								TerminateProcess(proc._handle, 1)
							else:
								# POSIX
								os.kill(proc.pid, SIGKILL)
					except Exception:
						time.sleep(0)
						case.process.poll()
					else:
						case.process.wait()
						break
		if case.files_to_delete:
			for name in case.files_to_delete:
				try:
					os.remove(name)
				except Exception:
					# It can't be helped
					pass
	
	def open_infile(case):
		try:
			case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id))))
		except IOError:
			e = sys.exc_info()[1]
			raise CannotReadInputFile(e)
	
	def open_outfile(case):
		try:
			case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id))))
		except IOError:
			e = sys.exc_info()[1]
			raise CannotReadAnswerFile(e)


class ValidatedTestCase(TestCase):
	__slots__ = 'validator'
	
	def __init__(case, *args):
		TestCase.__init__(case, *args)
		if not case.problem.config.tester:
			case.validator = None
		else:
			case.validator = case.problem.config.tester
	
	def validate(case, output):
		if not case.validator:
			# Compare the output with the reference output
			case.open_outfile()
			with case.outfile.open() as refoutput:
				for line, refline in zip_longest(output, refoutput):
					if refline is not None and not isinstance(refline, basestring):
						line = bytes(line, sys.getdefaultencoding())
					if line != refline:
						raise WrongAnswer
			return 1
		elif callable(case.validator):
			return case.validator(output)
		else:                 
			# Call the validator program
			output.close()
			if case.problem.config.ansname:
				case.open_outfile()
				case.outfile.copy(case.problem.config.ansname)
			try:
				case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1)
			except OSError:
				raise CannotStartValidator(sys.exc_info()[1])
			comment = case.process.communicate()[0].strip()
			match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
			if match:
				comment = comment[match.end():]
			if not case.problem.config.maxexitcode:
				if case.process.returncode:
					raise WrongAnswer(comment)
				else:
					return 1, comment
			else:
				return case.process.returncode / case.problem.config.maxexitcode, comment


class BatchTestCase(ValidatedTestCase):
	__slots__ = ()
	
	def test(case, callback):
		init_canceled()
		if sys.platform == 'win32' or not case.maxmemory:
			preexec_fn = None
		else:
			def preexec_fn():
				try:
					import resource
					maxmemory = int(case.maxmemory * 1048576)
					resource.setrlimit(resource.RLIMIT_AS, (maxmemory, maxmemory))
					# I would also set a CPU time limit but I do not want the time
					# that passes between the calls to fork and exec to be counted in
				except MemoryError:
					# We do not have enough memory for ourselves;
					# let the parent know about this
					raise
				except Exception:
					# Well, at least we tried
					pass
		case.open_infile()
		case.time_started = None
		if case.problem.config.stdio:
			if options.erase and not case.validator:
				# TODO: re-use the same file name if possible
				# FIXME: 2.5 lacks the delete parameter
				with tempfile.NamedTemporaryFile(delete=False) as f:
					inputdatafname = f.name
				contextmgr = CopyDeleting(case, case.infile, inputdatafname)
			else:
				inputdatafname = case.problem.config.inname
				contextmgr = Copying(case.infile, inputdatafname)
			with contextmgr:
				# FIXME: this U doesn't do anything good for the child process, does it?
				with open(inputdatafname, 'rU') as infile:
					with tempfile.TemporaryFile('w+') if options.erase and not case.validator else open(case.problem.config.outname, 'w+') as outfile:
						# TODO: make sure outfile.file is passed to Popen if needed
						try:
							try:
								case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1, preexec_fn=preexec_fn)
							except MemoryError:
								# If there is not enough memory for the forked test.py,
								# opt for silent dropping of the limit
								# TODO: show a warning somewhere
								case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1)
						except OSError:
							raise CannotStartTestee(sys.exc_info()[1])
						case.time_started = clock()
						time_next_check = case.time_started + .15
						if not case.maxtime:
							while True:
								exitcode, now = case.process.poll(), clock()
								if exitcode is not None:
									case.time_stopped = now
									break
								# For some reason (probably Microsoft's fault),
								# msvcrt.kbhit() is slow as hell
								else:
									if now >= time_next_check:
										if canceled():
											raise CanceledByUser
										else:
											time_next_check = now + .15
									time.sleep(0)
						else:
							time_end = case.time_started + case.maxtime
							while True:
								exitcode, now = case.process.poll(), clock()
								if exitcode is not None:
									case.time_stopped = now
									break
								elif now >= time_end:
									raise TimeLimitExceeded
								else:
									if now >= time_next_check:
										if canceled():
											raise CanceledByUser
										else:
											time_next_check = now + .15
									time.sleep(0)
						if config.globalconf.force_zero_exitcode and case.process.returncode:
							raise NonZeroExitCode(case.process.returncode)
						callback()
						case.has_called_back = True
						outfile.seek(0)
						return case.validate(outfile)
		else:
			case.infile.copy(case.problem.config.inname)
			try:
				try:
					case.process = Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT, preexec_fn=preexec_fn)
				except MemoryError:
					# If there is not enough memory for the forked test.py,
					# opt for silent dropping of the limit
					# TODO: show a warning somewhere
					case.process = Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT)
			except OSError:
				raise CannotStartTestee(sys.exc_info()[1])
			case.time_started = clock()
			time_next_check = case.time_started + .15
			if not case.maxtime:
				while True:
					exitcode, now = case.process.poll(), clock()
					if exitcode is not None:
						case.time_stopped = now
						break
					else:
						if now >= time_next_check:
							if canceled():
								raise CanceledByUser
							else:
								time_next_check = now + .15
						time.sleep(0)
			else:
				time_end = case.time_started + case.maxtime
				while True:
					exitcode, now = case.process.poll(), clock()
					if exitcode is not None:
						case.time_stopped = now
						break
					elif now >= time_end:
						raise TimeLimitExceeded
					else:
						if now >= time_next_check:
							if canceled():
								raise CanceledByUser
							else:
								time_next_check = now + .15
						time.sleep(0)
			if config.globalconf.force_zero_exitcode and case.process.returncode:
				raise NonZeroExitCode(case.process.returncode)
			callback()
			case.has_called_back = True
			with open(case.problem.config.outname, 'rU') as output:
				return case.validate(output)


# This is the only test case type not executing any programs to be tested
class OutputOnlyTestCase(ValidatedTestCase):
	__slots__ = ()
	def cleanup(case): pass

class BestOutputTestCase(ValidatedTestCase):
	__slots__ = ()

# This is the only test case type executing two programs simultaneously
class ReactiveTestCase(TestCase):
	__slots__ = ()
	# The basic idea is to launch the program to be tested and the grader
	# and to pipe their standard I/O from and to each other,
	# and then to capture the grader's exit code and use it
	# like the exit code of an output validator is used.


def load_problem(prob, _types={'batch'   : BatchTestCase,
                               'outonly' : OutputOnlyTestCase,
                               'bestout' : BestOutputTestCase,
                               'reactive': ReactiveTestCase}):
	# We will need to iterate over these configuration variables twice
	try:
		len(prob.config.dummies)
	except Exception:
		prob.config.dummies = tuple(prob.config.dummies)
	try:
		len(prob.config.tests)
	except Exception:
		prob.config.tests = tuple(prob.config.tests)
	
	if options.legacy:
		prob.config.usegroups = False
		prob.config.tests = list(prob.config.tests)
		for i, name in enumerate(prob.config.tests):
			# Same here; we'll need to iterate over them twice
			try:
				l = len(name)
			except Exception:
				try:
					name = tuple(name)
				except TypeError:
					name = (name,)
				l = len(name)
			if len(name) > 1:
				prob.config.usegroups = True
				break
			elif not len(name):
				prob.config.tests[i] = (name,)
	
	# First get prob.cache.padoutput right,
	# then yield the actual test cases
	for i in prob.config.dummies:
		s = 'sample ' + str(i).zfill(prob.config.paddummies)
		prob.cache.padoutput = max(prob.cache.padoutput, len(s))
	if prob.config.usegroups:
		for group in prob.config.tests:
			for i in group:
				s = str(i).zfill(prob.config.padtests)
				prob.cache.padoutput = max(prob.cache.padoutput, len(s))
		for i in prob.config.dummies:
			s = str(i).zfill(prob.config.paddummies)
			yield _types[prob.config.kind](prob, s, True, 0)
		for group in prob.config.tests:
			yield problem.TestGroup()
			for i in group:
				s = str(i).zfill(prob.config.padtests)
				yield _types[prob.config.kind](prob, s, False, prob.config.pointmap.get(i, prob.config.pointmap.get(None, prob.config.maxexitcode if prob.config.maxexitcode else 1)))
			yield problem.test_context_end
	else:
		for i in prob.config.tests:
			s = str(i).zfill(prob.config.padtests)
			prob.cache.padoutput = max(prob.cache.padoutput, len(s))
		for i in prob.config.dummies:
			s = str(i).zfill(prob.config.paddummies)
			yield _types[prob.config.kind](prob, s, True, 0)
		for i in prob.config.tests:
			s = str(i).zfill(prob.config.padtests)
			yield _types[prob.config.kind](prob, s, False, prob.config.pointmap.get(i, prob.config.pointmap.get(None, prob.config.maxexitcode if prob.config.maxexitcode else 1)))