Mercurial > ~astiob > upreckon > hgweb
view upreckon/testcases.py @ 205:166a23999bf7
Added confvar okexitcodemask; changed the validator protocol
Callable validators now return three-tuples (number granted, bool correct,
str comment) instead of two-tuples (number granted, str comment). They are
still allowed to return single numbers.
Callable validators must now explicitly raise
upreckon.exceptions.WrongAnswer if they want the verdict to be Wrong
Answer rather than Partly Correct.
okexitcodemask specifies a bitmask ANDed with the exit code of the
external validator to get a boolean flag showing whether the answer is to
be marked as 'OK' rather than 'partly correct'. The bits covered by the
bitmask are reset to zeroes before deriving the number of points granted
from the resulting number.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Wed, 17 Aug 2011 20:44:54 +0300 |
parents | 00c80bba7f13 |
children | 4edb6ef5a676 |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

"""Test case types and the helpers they use to run and validate solutions.

Each test case object is callable; calling it runs the case through its
``test`` method and always performs clean-up afterwards.  Output is checked
either by a built-in comparison against the reference output, by a callable
validator, or by an external validator program.
"""

# TODO: copy the ansfile if not options.erase even if no validator is used

from __future__ import division, with_statement

from .compat import *
from .exceptions import *
from . import files, config
from __main__ import options

import re, sys, tempfile
from subprocess import Popen, PIPE, STDOUT

import os
# Shared handle on the null device, used wherever a child's stream is unused.
devnull = open(os.path.devnull, 'w+b')


class DummySignalIgnorer(object):
    """No-op context manager used as the default ``signal_ignorer``."""
    def __enter__(self): pass
    def __exit__(self, exc_type, exc_value, traceback): pass
signal_ignorer = DummySignalIgnorer()

# Platform-specific helpers; presumably these supply call/kill/clock and may
# replace signal_ignorer -- verify against the win32/unix modules.
try:
    from .win32 import *
except Exception:
    from .unix import *

__all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase',
           'OutputOnlyTestCase')


# Helper context managers

class CopyDeleting(object):
    """Copy ``file`` to ``name`` on entry and schedule ``name`` for deletion.

    The name is appended to ``case.files_to_delete`` on exit (and also when
    the copy itself fails), so that the case's clean-up removes it later.
    Nothing happens at all if ``name`` is empty.
    """
    __slots__ = 'case', 'file', 'name'

    def __init__(self, case, file, name):
        self.case = case
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            try:
                self.file.copy(self.name)
            except:
                # The with statement does not call __exit__ when __enter__
                # raises, so register the file for deletion by hand and
                # then let the original error propagate.
                try:
                    self.__exit__(None, None, None)
                except:
                    pass
                raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.name:
            self.case.files_to_delete.append(self.name)


class Copying(object):
    """Copy ``file`` to ``name`` on entry; leave the copy in place on exit.

    Nothing happens if ``name`` is empty.
    """
    __slots__ = 'file', 'name'

    def __init__(self, file, name):
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            self.file.copy(self.name)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


# Test case types

class TestCase(object):
    """Base class of all test case types.

    Subclasses implement ``test(callback)``; users invoke the case object
    itself (see ``__call__``), which wraps ``test`` with timing bookkeeping,
    the callback guarantee and clean-up.
    """
    __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
                 'process', 'time_started', 'time_stopped',
                 'realinname', 'realoutname', 'maxcputime', 'maxwalltime',
                 'maxmemory', 'has_called_back', 'files_to_delete',
                 'cpu_time_limit_string', 'wall_time_limit_string',
                 'time_limit_string')
    has_ansfile = has_iofiles = False
    # Subclasses that never read an input file set this to False.
    needs_realinname = True

    if ABCMeta:
        __metaclass__ = ABCMeta

    def __init__(case, prob, id, isdummy, points):
        """Store the case parameters and cache limits from the problem config.

        prob    -- the problem object; its ``config`` and ``name`` are used
        id      -- test case identifier, substituted for '$' in file names
        isdummy -- true to use the dummy (sample) file name patterns
        points  -- stored as ``case.points``
        """
        case.problem = prob
        case.id = id
        case.isdummy = isdummy
        case.points = points
        case.maxcputime = case.problem.config.maxcputime
        case.maxwalltime = case.problem.config.maxwalltime
        case.maxmemory = case.problem.config.maxmemory
        # Pre-format the '/limit' suffixes; empty when there is no limit.
        if case.maxcputime:
            case.cpu_time_limit_string = '/%.3f' % case.maxcputime
        else:
            case.cpu_time_limit_string = ''
        if case.maxwalltime:
            case.wall_time_limit_string = '/%.3f' % case.maxwalltime
        else:
            case.wall_time_limit_string = ''
        # The output name pattern is always needed (open_outfile uses it);
        # the input name pattern only when the case type reads input.
        if not isdummy:
            if case.needs_realinname:
                case.realinname = case.problem.config.testcaseinname
            case.realoutname = case.problem.config.testcaseoutname
        else:
            if case.needs_realinname:
                case.realinname = case.problem.config.dummyinname
            case.realoutname = case.problem.config.dummyoutname

    @abstractmethod
    def test(case, callback):
        """Run the test case and return the validation result.

        ``callback`` is a zero-argument callable.  Implementations that call
        it themselves set ``case.has_called_back`` to true first; otherwise
        ``__call__`` calls it once ``test`` has finished.
        """
        raise NotImplementedError

    def __call__(case, callback):
        """Run ``test(callback)`` and return its result.

        Whether ``test`` returns or raises, this fills in any missing start
        and stop times, makes sure ``callback`` has been called, and runs
        ``cleanup``.
        """
        case.has_called_back = False
        case.files_to_delete = []
        case.time_limit_string = case.wall_time_limit_string
        try:
            return case.test(callback)
        finally:
            now = clock()
            # Slots that were never assigned are reported as None here.
            if getattr(case, 'time_started', None) is None:
                case.time_started = case.time_stopped = now
            elif getattr(case, 'time_stopped', None) is None:
                case.time_stopped = now
            if not case.has_called_back:
                callback()
            case.cleanup()

    def cleanup(case):
        """Kill a still-running process and remove the scheduled files."""
        # Note that native extensions clean up on their own
        # and never let this condition be satisfied
        if getattr(case, 'process', None) and case.process.returncode is None:
            kill(case.process)
        for name in case.files_to_delete:
            try:
                os.remove(name)
            except OSError:
                # It can't be helped
                pass

    def open_infile(case):
        """Set ``case.infile`` to the input file of this test case.

        Raises CannotReadInputFile if opening it fails with IOError.
        """
        try:
            case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadInputFile(e)

    def open_outfile(case):
        """Set ``case.outfile`` to the reference output of this test case.

        Raises CannotReadAnswerFile if opening it fails with IOError.
        """
        try:
            case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadAnswerFile(e)


class SkippedTestCase(TestCase):
    """Test case that is never run: ``test`` just raises TestCaseSkipped."""
    __slots__ = ()

    def test(case, callback):
        raise TestCaseSkipped


class ValidatedTestCase(TestCase):
    """Test case whose output is checked by ``validate``.

    ``case.validator`` is None (built-in comparison with the reference
    output), a callable, or the command of an external validator program.
    """
    __slots__ = 'validator'

    def __init__(case, *args):
        TestCase.__init__(case, *args)
        # Normalize any false ``tester`` setting to None.
        if not case.problem.config.tester:
            case.validator = None
        else:
            case.validator = case.problem.config.tester

    def validate(case, output):
        """Check ``output`` (an open file object) and return the result.

        Without a validator, returns 1 if the output matches the reference
        output and raises WrongAnswer otherwise.  With a callable validator,
        returns whatever ``validator(output)`` returns.  With an external
        validator, closes ``output``, runs the program and returns a tuple
        ``(fraction granted, correct, comment)`` or raises WrongAnswer;
        raises CannotStartValidator if the program cannot be started.
        """
        if not case.validator:
            # Compare the output with the reference output
            # (''.encode() is an empty byte string on Python 2 and 3 alike.)
            buffer = refbuffer = crlfhalf = refcrlfhalf = ''.encode()
            crlf = '\r\n'.encode('ascii')
            case.open_outfile()
            with case.outfile.open() as refoutput:
                while True:
                    data = output.read(4096 - len(buffer))
                    refdata = refoutput.read(4096 - len(refbuffer))
                    if not case.problem.config.binary:
                        # Text mode: normalize CRLF and lone CR to LF.  A CR
                        # ending a chunk is held back in (ref)crlfhalf until
                        # the next read so that a CRLF pair split across two
                        # chunks still collapses into a single LF.
                        data, refdata = crlfhalf + data, refcrlfhalf + refdata
                        size, refsize = len(data), len(refdata)
                        if data and data != crlfhalf and data[-1] == crlf[0]:
                            size -= 1
                            crlfhalf = data[-1:]
                        else:
                            crlfhalf = ''.encode()
                        if refdata and refdata != refcrlfhalf and refdata[-1] == crlf[0]:
                            refsize -= 1
                            refcrlfhalf = refdata[-1:]
                        else:
                            refcrlfhalf = ''.encode()
                        data = data[:size].replace(crlf, crlf[1:])
                        data = data.replace(crlf[:1], crlf[1:])
                        refdata = refdata[:refsize].replace(crlf, crlf[1:])
                        refdata = refdata.replace(crlf[:1], crlf[1:])
                    buffer += data
                    refbuffer += refdata
                    if not (buffer or refbuffer or crlfhalf or refcrlfhalf):
                        # Both streams are exhausted at the same point.
                        break
                    elif not buffer and not crlfhalf or not refbuffer and not refcrlfhalf:
                        # One stream ended while the other still has data.
                        raise WrongAnswer
                    # Compare the common prefix and carry the rest over.
                    size = min(len(buffer), len(refbuffer))
                    if buffer[:size] != refbuffer[:size]:
                        raise WrongAnswer
                    buffer, refbuffer = buffer[size:], refbuffer[size:]
            return 1
        elif callable(case.validator):
            return case.validator(output)
        else:
            # Call the validator program
            output.close()
            if case.problem.config.ansname:
                case.open_outfile()
                case.outfile.copy(case.problem.config.ansname)
            try:
                case.process = Popen(case.validator, stdin=devnull,
                                     stdout=PIPE, stderr=STDOUT,
                                     universal_newlines=True, bufsize=-1)
            except OSError:
                raise CannotStartValidator(sys.exc_info()[1])
            with signal_ignorer:
                comment = case.process.communicate()[0].strip()
            # Strip a leading verdict word (and trailing punctuation) from
            # the validator's output; the verdict comes from the exit code.
            match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
            if match:
                comment = comment[match.end():]
            if not case.problem.config.maxexitcode:
                # Plain pass/fail protocol: zero exit code means correct.
                if case.process.returncode:
                    raise WrongAnswer(comment)
                else:
                    return 1, True, comment
            else:
                if case.problem.config.okexitcodeflag:
                    # The flag bits tell whether the answer is fully correct;
                    # they are cleared before the score is computed.
                    correct = bool(case.process.returncode & case.problem.config.okexitcodeflag)
                    case.process.returncode &= ~case.problem.config.okexitcodeflag
                else:
                    correct = case.process.returncode >= case.problem.config.maxexitcode
                if not correct and not case.process.returncode:
                    raise WrongAnswer(comment)
                # True division (from __future__): a fraction of the maximum.
                return case.process.returncode / case.problem.config.maxexitcode, correct, comment


class BatchTestCase(ValidatedTestCase):
    """Test case that runs the tested program once on one input file."""
    __slots__ = ()

    @property
    def has_iofiles(case):
        return (not case.problem.config.stdio or
                case.validator and not callable(case.validator))

    @property
    def has_ansfile(case):
        return case.validator and not callable(case.validator)

    def test(case, callback):
        """Run the tested program on the input and validate its output.

        Raises NonZeroExitCode if the return code is negative, or non-zero
        while ``force_zero_exitcode`` is set, and CannotReadOutputFile if
        the program's output file cannot be opened (file I/O mode only).
        """
        case.open_infile()
        if case.problem.config.stdio:
            # The program communicates through standard input and output.
            if options.erase and not case.validator or not case.problem.config.inname:
                # TODO: re-use the same file name if possible
                # FIXME: 2.5 lacks the delete parameter
                with tempfile.NamedTemporaryFile(delete=False) as f:
                    inputdatafname = f.name
                contextmgr = CopyDeleting(case, case.infile, inputdatafname)
            else:
                inputdatafname = case.problem.config.inname
                contextmgr = Copying(case.infile, inputdatafname)
            with contextmgr:
                # An external validator needs the output under its real
                # name; otherwise an anonymous temporary file will do when
                # files are to be erased anyway.
                with (tempfile.TemporaryFile('w+b')
                      if options.erase and (not case.validator or callable(case.validator))
                      else open(case.problem.config.outname, 'w+b')) as outfile:
                    with open(inputdatafname) as infile:
                        # presumably call() stores the process object in
                        # case.process -- verify against win32/unix
                        call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull)
                    if case.problem.config.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
                        raise NonZeroExitCode(case.process.returncode)
                    case.has_called_back = True
                    callback()
                    outfile.seek(0)
                    return case.validate(outfile)
        else:
            # The program reads and writes named files on its own.
            case.infile.copy(case.problem.config.inname)
            call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=devnull)
            if case.problem.config.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
                raise NonZeroExitCode(case.process.returncode)
            case.has_called_back = True
            callback()
            try:
                output = open(case.problem.config.outname, 'rb')
            except IOError:
                raise CannotReadOutputFile(sys.exc_info()[1])
            with output as output:
                return case.validate(output)


# This is the only test case type not executing any programs to be tested
class OutputOnlyTestCase(ValidatedTestCase):
    """Test case that validates an already existing output file."""
    __slots__ = ()
    needs_realinname = False

    def cleanup(case):
        pass

    def test(case, callback):
        """Validate the output file named by ``outname`` ('$' -> case id).

        Raises CannotReadOutputFile if that file cannot be opened.
        """
        # No program is run, so the case takes no time.
        case.time_stopped = case.time_started = 0
        case.has_called_back = True
        callback()
        try:
            output = open(case.problem.config.outname.replace('$', case.id), 'rb')
        except IOError:
            raise CannotReadOutputFile(sys.exc_info()[1])
        with output as output:
            return case.validate(output)


class BestOutputTestCase(ValidatedTestCase):
    __slots__ = ()


# This is the only test case type executing two programs simultaneously
class ReactiveTestCase(TestCase):
    __slots__ = ()
    # The basic idea is to launch the program to be tested and the grader
    # and to pipe their standard I/O from and to each other,
    # and then to capture the grader's exit code and use it
    # like the exit code of an output validator is used.