view upreckon/testcases.py @ 205:166a23999bf7

Added confvar okexitcodemask; changed the validator protocol Callable validators now return three-tuples (number granted, bool correct, str comment) instead of two-tuples (number granted, str comment). They are still allowed to return single numbers. Callable validators must now explicitly raise upreckon.exceptions.WrongAnswer if they want the verdict to be Wrong Answer rather than Partly Correct. okexitcodemask specifies a bitmask ANDed with the exit code of the external validator to get a boolean flag showing whether the answer is to be marked as 'OK' rather than 'partly correct'. The bits covered by the bitmask are reset to zeroes before devising the number of points granted from the resulting number.
author Oleg Oshmyan <chortos@inbox.lv>
date Wed, 17 Aug 2011 20:44:54 +0300
parents 00c80bba7f13
children 4edb6ef5a676
line wrap: on
line source

# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

# TODO: copy the ansfile if not options.erase even if no validator is used

from __future__ import division, with_statement

from .compat import *
from .exceptions import *
from . import files, config
from __main__ import options

import re, sys, tempfile
from subprocess import Popen, PIPE, STDOUT

import os
devnull = open(os.path.devnull, 'w+b')

class DummySignalIgnorer(object):
	def __enter__(self): pass
	def __exit__(self, exc_type, exc_value, traceback): pass
signal_ignorer = DummySignalIgnorer()

try:
	from .win32 import *
except Exception:
	from .unix import *

__all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase',
           'OutputOnlyTestCase')



# Helper context managers

class CopyDeleting(object):
	__slots__ = 'case', 'file', 'name'
	
	def __init__(self, case, file, name):
		self.case = case
		self.file = file
		self.name = name
	
	def __enter__(self):
		if self.name:
			try:
				self.file.copy(self.name)
			except:
				try:
					self.__exit__(None, None, None)
				except:
					pass
				raise
	
	def __exit__(self, exc_type, exc_val, exc_tb):
		if self.name:
			self.case.files_to_delete.append(self.name)


class Copying(object):
	__slots__ = 'file', 'name'
	
	def __init__(self, file, name):
		self.file = file
		self.name = name
	
	def __enter__(self):
		if self.name:
			self.file.copy(self.name)
	
	def __exit__(self, exc_type, exc_val, exc_tb):
		pass



# Test case types

class TestCase(object):
	__slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
	             'process', 'time_started', 'time_stopped',
	             'realinname', 'realoutname', 'maxcputime', 'maxwalltime',
	             'maxmemory', 'has_called_back', 'files_to_delete',
	             'cpu_time_limit_string', 'wall_time_limit_string',
	             'time_limit_string')
	has_ansfile = has_iofiles = False
	needs_realinname = True
	
	if ABCMeta:
		__metaclass__ = ABCMeta
	
	def __init__(case, prob, id, isdummy, points):
		case.problem = prob
		case.id = id
		case.isdummy = isdummy
		case.points = points
		case.maxcputime = case.problem.config.maxcputime
		case.maxwalltime = case.problem.config.maxwalltime
		case.maxmemory = case.problem.config.maxmemory
		if case.maxcputime:
			case.cpu_time_limit_string = '/%.3f' % case.maxcputime
		else:
			case.cpu_time_limit_string = ''
		if case.maxwalltime:
			case.wall_time_limit_string = '/%.3f' % case.maxwalltime
		else:
			case.wall_time_limit_string = ''
		if not isdummy:
			if case.needs_realinname:
				case.realinname = case.problem.config.testcaseinname
			case.realoutname = case.problem.config.testcaseoutname
		else:
			if case.needs_realinname:
				case.realinname = case.problem.config.dummyinname
			case.realoutname = case.problem.config.dummyoutname
	
	@abstractmethod
	def test(case):
		raise NotImplementedError
	
	def __call__(case, callback):
		case.has_called_back = False
		case.files_to_delete = []
		case.time_limit_string = case.wall_time_limit_string
		try:
			return case.test(callback)
		finally:
			now = clock()
			if getattr(case, 'time_started', None) is None:
				case.time_started = case.time_stopped = now
			elif getattr(case, 'time_stopped', None) is None:
				case.time_stopped = now
			if not case.has_called_back:
				callback()
			case.cleanup()
	
	def cleanup(case):
		# Note that native extensions clean up on their own
		# and never let this condition be satisfied
		if getattr(case, 'process', None) and case.process.returncode is None:
			kill(case.process)
		for name in case.files_to_delete:
			try:
				os.remove(name)
			except OSError:
				# It can't be helped
				pass
	
	def open_infile(case):
		try:
			case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id))))
		except IOError:
			e = sys.exc_info()[1]
			raise CannotReadInputFile(e)
	
	def open_outfile(case):
		try:
			case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id))))
		except IOError:
			e = sys.exc_info()[1]
			raise CannotReadAnswerFile(e)


class SkippedTestCase(TestCase):
	__slots__ = ()
	
	def test(case, callback):
		raise TestCaseSkipped


class ValidatedTestCase(TestCase):
	__slots__ = 'validator'
	
	def __init__(case, *args):
		TestCase.__init__(case, *args)
		if not case.problem.config.tester:
			case.validator = None
		else:
			case.validator = case.problem.config.tester
	
	def validate(case, output):
		if not case.validator:
			# Compare the output with the reference output
			buffer = refbuffer = crlfhalf = refcrlfhalf = ''.encode()
			crlf = '\r\n'.encode('ascii')
			case.open_outfile()
			with case.outfile.open() as refoutput:
				while True:
					data = output.read(4096 - len(buffer))
					refdata = refoutput.read(4096 - len(refbuffer))
					if not case.problem.config.binary:
						data, refdata = crlfhalf + data, refcrlfhalf + refdata
						size, refsize = len(data), len(refdata)
						if data and data != crlfhalf and data[-1] == crlf[0]:
							size -= 1
							crlfhalf = data[-1:]
						else:
							crlfhalf = ''.encode()
						if refdata and refdata != refcrlfhalf and refdata[-1] == crlf[0]:
							refsize -= 1
							refcrlfhalf = refdata[-1:]
						else:
							refcrlfhalf = ''.encode()
						data = data[:size].replace(crlf, crlf[1:])
						data = data.replace(crlf[:1], crlf[1:])
						refdata = refdata[:refsize].replace(crlf, crlf[1:])
						refdata = refdata.replace(crlf[:1], crlf[1:])
					buffer += data
					refbuffer += refdata
					if not (buffer or refbuffer or crlfhalf or refcrlfhalf):
						break
					elif not buffer and not crlfhalf or not refbuffer and not refcrlfhalf:
						raise WrongAnswer
					size = min(len(buffer), len(refbuffer))
					if buffer[:size] != refbuffer[:size]:
						raise WrongAnswer
					buffer, refbuffer = buffer[size:], refbuffer[size:]
			return 1
		elif callable(case.validator):
			return case.validator(output)
		else:
			# Call the validator program
			output.close()
			if case.problem.config.ansname:
				case.open_outfile()
				case.outfile.copy(case.problem.config.ansname)
			try:
				case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1)
			except OSError:
				raise CannotStartValidator(sys.exc_info()[1])
			with signal_ignorer:
				comment = case.process.communicate()[0].strip()
			match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
			if match:
				comment = comment[match.end():]
			if not case.problem.config.maxexitcode:
				if case.process.returncode:
					raise WrongAnswer(comment)
				else:
					return 1, True, comment
			else:
				if case.problem.config.okexitcodeflag:
					correct = bool(case.process.returncode & case.problem.config.okexitcodeflag)
					case.process.returncode &= ~case.problem.config.okexitcodeflag
				else:
					correct = case.process.returncode >= case.problem.config.maxexitcode
				if not correct and not case.process.returncode:
					raise WrongAnswer(comment)
				return case.process.returncode / case.problem.config.maxexitcode, correct, comment


class BatchTestCase(ValidatedTestCase):
	__slots__ = ()
	
	@property
	def has_iofiles(case):
		return (not case.problem.config.stdio or
		        case.validator and not callable(case.validator))
	
	@property
	def has_ansfile(case):
		return case.validator and not callable(case.validator)
	
	def test(case, callback):
		case.open_infile()
		if case.problem.config.stdio:
			if options.erase and not case.validator or not case.problem.config.inname:
				# TODO: re-use the same file name if possible
				# FIXME: 2.5 lacks the delete parameter
				with tempfile.NamedTemporaryFile(delete=False) as f:
					inputdatafname = f.name
				contextmgr = CopyDeleting(case, case.infile, inputdatafname)
			else:
				inputdatafname = case.problem.config.inname
				contextmgr = Copying(case.infile, inputdatafname)
			with contextmgr:
				with tempfile.TemporaryFile('w+b') if options.erase and (not case.validator or callable(case.validator)) else open(case.problem.config.outname, 'w+b') as outfile:
					with open(inputdatafname) as infile:
						call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull)
					if case.problem.config.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
						raise NonZeroExitCode(case.process.returncode)
					case.has_called_back = True
					callback()
					outfile.seek(0)
					return case.validate(outfile)
		else:
			case.infile.copy(case.problem.config.inname)
			call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=devnull)
			if case.problem.config.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
				raise NonZeroExitCode(case.process.returncode)
			case.has_called_back = True
			callback()
			try:
				output = open(case.problem.config.outname, 'rb')
			except IOError:
				raise CannotReadOutputFile(sys.exc_info()[1])
			with output as output:
				return case.validate(output)


# This is the only test case type not executing any programs to be tested
class OutputOnlyTestCase(ValidatedTestCase):
	__slots__ = ()
	needs_realinname = False
	
	def cleanup(case):
		pass
	
	def test(case, callback):
		case.time_stopped = case.time_started = 0
		case.has_called_back = True
		callback()
		try:
			output = open(case.problem.config.outname.replace('$', case.id), 'rb')
		except IOError:
			raise CannotReadOutputFile(sys.exc_info()[1])
		with output as output:
			return case.validate(output)


class BestOutputTestCase(ValidatedTestCase):
	__slots__ = ()


# This is the only test case type executing two programs simultaneously
class ReactiveTestCase(TestCase):
	__slots__ = ()
	# The basic idea is to launch the program to be tested and the grader
	# and to pipe their standard I/O from and to each other,
	# and then to capture the grader's exit code and use it
	# like the exit code of an output validator is used.