Mercurial > ~astiob > upreckon > hgweb
view upreckon/testcases.py @ 193:a76cdc26ba9d
Added conf. var. match and match='regexp' for non-archives
Specify match='regexp', and your tests and dummies will be treated
as regular expressions describing test case identifiers. Every file that
is in a suitable location and whose name matches {testcase,dummy}inname
and the given regexp will be treated as a file with test case input data.
You are free to use backreferences in the regexps, but group numbering
starts at two rather than one.
If you want test groups, you can get them magically created for you
by putting a part of the test ID in a group in the regexp sense
and specifying the tests variable as a pair consisting of the regexp
itself and the number of this regexp group (remember group numbers start
at two).
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Thu, 11 Aug 2011 23:20:52 +0300 |
parents | 760d38ee86d6 |
children | fa81289ee407 |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> # TODO: copy the ansfile if not options.erase even if no validator is used from __future__ import division, with_statement from .compat import * from .exceptions import * from . import files, config from __main__ import options import re, sys, tempfile from subprocess import Popen, PIPE, STDOUT import os devnull = open(os.path.devnull, 'w+b') class DummySignalIgnorer(object): def __enter__(self): pass def __exit__(self, exc_type, exc_value, traceback): pass signal_ignorer = DummySignalIgnorer() try: from .win32 import * except Exception: from .unix import * __all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase', 'OutputOnlyTestCase') # Helper context managers class CopyDeleting(object): __slots__ = 'case', 'file', 'name' def __init__(self, case, file, name): self.case = case self.file = file self.name = name def __enter__(self): if self.name: try: self.file.copy(self.name) except: try: self.__exit__(None, None, None) except: pass raise def __exit__(self, exc_type, exc_val, exc_tb): if self.name: self.case.files_to_delete.append(self.name) class Copying(object): __slots__ = 'file', 'name' def __init__(self, file, name): self.file = file self.name = name def __enter__(self): if self.name: self.file.copy(self.name) def __exit__(self, exc_type, exc_val, exc_tb): pass # Test case types class TestCase(object): __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points', 'process', 'time_started', 'time_stopped', 'realinname', 'realoutname', 'maxcputime', 'maxwalltime', 'maxmemory', 'has_called_back', 'files_to_delete', 'cpu_time_limit_string', 'wall_time_limit_string', 'time_limit_string') has_ansfile = has_iofiles = False needs_realinname = True if ABCMeta: __metaclass__ = ABCMeta def __init__(case, prob, id, isdummy, points): case.problem = prob case.id = id case.isdummy = isdummy case.points = points case.maxcputime = case.problem.config.maxcputime case.maxwalltime = case.problem.config.maxwalltime case.maxmemory = case.problem.config.maxmemory if case.maxcputime: case.cpu_time_limit_string = '/%.3f' % case.maxcputime else: case.cpu_time_limit_string = '' if case.maxwalltime: case.wall_time_limit_string = '/%.3f' % case.maxwalltime else: case.wall_time_limit_string = '' if not isdummy: if case.needs_realinname: case.realinname = case.problem.config.testcaseinname case.realoutname = case.problem.config.testcaseoutname else: if case.needs_realinname: case.realinname = case.problem.config.dummyinname case.realoutname = case.problem.config.dummyoutname @abstractmethod def test(case): raise NotImplementedError def __call__(case, callback): case.has_called_back = False case.files_to_delete = [] case.time_limit_string = case.wall_time_limit_string try: return case.test(callback) finally: now = clock() if getattr(case, 'time_started', None) is None: case.time_started = case.time_stopped = now elif getattr(case, 'time_stopped', None) is None: case.time_stopped = now if not case.has_called_back: callback() case.cleanup() def cleanup(case): # Note that native extensions clean up on their own # and never let this condition be satisfied if getattr(case, 'process', None) and case.process.returncode is None: kill(case.process) for name in case.files_to_delete: try: os.remove(name) except OSError: # It can't be helped pass def open_infile(case): try: case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadInputFile(e) def open_outfile(case): try: case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadAnswerFile(e) class SkippedTestCase(TestCase): __slots__ = () def test(case, callback): raise TestCaseSkipped class ValidatedTestCase(TestCase): __slots__ = 'validator' def __init__(case, *args): TestCase.__init__(case, *args) if not case.problem.config.tester: case.validator = None else: case.validator = case.problem.config.tester def validate(case, output): if not case.validator: # Compare the output with the reference output buffer = refbuffer = crlfhalf = refcrlfhalf = ''.encode() crlf = '\r\n'.encode('ascii') case.open_outfile() with case.outfile.open() as refoutput: while True: data = output.read(4096 - len(buffer)) refdata = refoutput.read(4096 - len(refbuffer)) if not case.problem.config.binary: data, refdata = crlfhalf + data, refcrlfhalf + refdata size, refsize = len(data), len(refdata) if data and data != crlfhalf and data[-1] == crlf[0]: size -= 1 crlfhalf = data[-1:] else: crlfhalf = ''.encode() if refdata and refdata != refcrlfhalf and refdata[-1] == crlf[0]: refsize -= 1 refcrlfhalf = refdata[-1:] else: refcrlfhalf = ''.encode() data = data[:size].replace(crlf, crlf[1:]) data = data.replace(crlf[:1], crlf[1:]) refdata = refdata[:refsize].replace(crlf, crlf[1:]) refdata = refdata.replace(crlf[:1], crlf[1:]) buffer += data refbuffer += refdata if not (buffer or refbuffer or crlfhalf or refcrlfhalf): break elif not buffer and not crlfhalf or not refbuffer and not refcrlfhalf: raise WrongAnswer size = min(len(buffer), len(refbuffer)) if buffer[:size] != refbuffer[:size]: raise WrongAnswer buffer, refbuffer = buffer[size:], refbuffer[size:] return 1 elif callable(case.validator): return case.validator(output) else: # Call the validator program output.close() if case.problem.config.ansname: case.open_outfile() case.outfile.copy(case.problem.config.ansname) try: case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1) except OSError: raise CannotStartValidator(sys.exc_info()[1]) with signal_ignorer: comment = case.process.communicate()[0].strip() match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment) if match: comment = comment[match.end():] if not case.problem.config.maxexitcode: if case.process.returncode: raise WrongAnswer(comment) else: return 1, comment else: return case.process.returncode / case.problem.config.maxexitcode, comment class BatchTestCase(ValidatedTestCase): __slots__ = () @property def has_iofiles(case): return (not case.problem.config.stdio or case.validator and not callable(case.validator)) @property def has_ansfile(case): return case.validator and not callable(case.validator) def test(case, callback): case.open_infile() if case.problem.config.stdio: if options.erase and not case.validator or not case.problem.config.inname: # TODO: re-use the same file name if possible # FIXME: 2.5 lacks the delete parameter with tempfile.NamedTemporaryFile(delete=False) as f: inputdatafname = f.name contextmgr = CopyDeleting(case, case.infile, inputdatafname) else: inputdatafname = case.problem.config.inname contextmgr = Copying(case.infile, inputdatafname) with contextmgr: with tempfile.TemporaryFile('w+b') if options.erase and (not case.validator or callable(case.validator)) else open(case.problem.config.outname, 'w+b') as outfile: with open(inputdatafname) as infile: call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) case.has_called_back = True callback() outfile.seek(0) return case.validate(outfile) else: case.infile.copy(case.problem.config.inname) call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=devnull) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) case.has_called_back = True callback() try: output = open(case.problem.config.outname, 'rb') except IOError: raise CannotReadOutputFile(sys.exc_info()[1]) with output as output: return case.validate(output) # This is the only test case type not executing any programs to be tested class OutputOnlyTestCase(ValidatedTestCase): __slots__ = () needs_realinname = False def cleanup(case): pass def test(case, callback): case.time_stopped = case.time_started = 0 case.has_called_back = True callback() try: output = open(case.problem.config.outname.replace('$', case.id), 'rb') except IOError: raise CannotReadOutputFile(sys.exc_info()[1]) with output as output: return case.validate(output) class BestOutputTestCase(ValidatedTestCase): __slots__ = () # This is the only test case type executing two programs simultaneously class ReactiveTestCase(TestCase): __slots__ = () # The basic idea is to launch the program to be tested and the grader # and to pipe their standard I/O from and to each other, # and then to capture the grader's exit code and use it # like the exit code of an output validator is used.