Mercurial > ~astiob > upreckon > hgweb
view testcases.py @ 142:98bccf81db4d
win32.call() now trusts its own case.time_started
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Wed, 25 May 2011 00:27:12 +0100 |
parents | ed4035661b85 |
children |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> # TODO: copy the ansfile if not options.erase even if no validator is used from __future__ import division, with_statement from compat import * import files, problem, config from __main__ import options import glob, re, sys, tempfile, time from subprocess import Popen, PIPE, STDOUT import os devnull = open(os.path.devnull, 'w+') if options.autotime: # This is really a dirty hack that assumes that sleep() does not spend # the CPU time of the current process and that if clock() measures # wall-clock time, then it is more precise than time() is. Both these # assumptions are true on all platforms I have tested this on so far, # but I am not aware of any guarantee that they will both be true # on every other platform. c = time.clock() time.sleep(1) c = time.clock() - c if int(c + .5) == 1: clock = time.clock else: clock = time.time class DummySignalIgnorer(object): def __enter__(self): pass def __exit__(self, exc_type, exc_value, traceback): pass signal_ignorer = DummySignalIgnorer() # win32 and unix are imported a bit later __all__ = ('TestCase', 'load_problem', 'TestCaseNotPassed', 'TimeLimitExceeded', 'CanceledByUser', 'WrongAnswer', 'NonZeroExitCode', 'CannotStartTestee', 'CannotStartValidator', 'CannotReadOutputFile', 'CannotReadInputFile', 'CannotReadAnswerFile', 'MemoryLimitExceeded', 'CPUTimeLimitExceeded', 'WallTimeLimitExceeded') # Exceptions class TestCaseNotPassed(Exception): __slots__ = () class TestCaseSkipped(TestCaseNotPassed): __slots__ = () class TimeLimitExceeded(TestCaseNotPassed): __slots__ = () class CPUTimeLimitExceeded(TimeLimitExceeded): __slots__ = () class WallTimeLimitExceeded(TimeLimitExceeded): __slots__ = () class MemoryLimitExceeded(TestCaseNotPassed): __slots__ = () class CanceledByUser(TestCaseNotPassed): __slots__ = () class WrongAnswer(TestCaseNotPassed): __slots__ = 'comment' def __init__(self, comment=''): self.comment = comment class NonZeroExitCode(TestCaseNotPassed): __slots__ = 'exitcode' def __init__(self, exitcode): self.exitcode = exitcode class ExceptionWrapper(TestCaseNotPassed): __slots__ = 'upstream' def __init__(self, upstream): self.upstream = upstream class CannotStartTestee(ExceptionWrapper): __slots__ = () class CannotStartValidator(ExceptionWrapper): __slots__ = () class CannotReadOutputFile(ExceptionWrapper): __slots__ = () class CannotReadInputFile(ExceptionWrapper): __slots__ = () class CannotReadAnswerFile(ExceptionWrapper): __slots__ = () # Import platform-specific code now that exception classes are defined try: from win32 import * except Exception: from unix import * # Helper context managers class CopyDeleting(object): __slots__ = 'case', 'file', 'name' def __init__(self, case, file, name): self.case = case self.file = file self.name = name def __enter__(self): if self.name: try: self.file.copy(self.name) except: try: self.__exit__(None, None, None) except: pass raise def __exit__(self, exc_type, exc_val, exc_tb): if self.name: self.case.files_to_delete.append(self.name) class Copying(object): __slots__ = 'file', 'name' def __init__(self, file, name): self.file = file self.name = name def __enter__(self): if self.name: self.file.copy(self.name) def __exit__(self, exc_type, exc_val, exc_tb): pass # Test case types class TestCase(object): __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points', 'process', 'time_started', 'time_stopped', 'realinname', 'realoutname', 'maxcputime', 'maxwalltime', 'maxmemory', 'has_called_back', 'files_to_delete', 'cpu_time_limit_string', 'wall_time_limit_string', 'time_limit_string') has_ansfile = has_iofiles = False needs_realinname = True if ABCMeta: __metaclass__ = ABCMeta def __init__(case, prob, id, isdummy, points): case.problem = prob case.id = id case.isdummy = isdummy case.points = points case.maxcputime = case.problem.config.maxcputime case.maxwalltime = case.problem.config.maxwalltime case.maxmemory = case.problem.config.maxmemory if case.maxcputime: case.cpu_time_limit_string = '/%.3f' % case.maxcputime else: case.cpu_time_limit_string = '' if case.maxwalltime: case.wall_time_limit_string = '/%.3f' % case.maxwalltime else: case.wall_time_limit_string = '' if not isdummy: if case.needs_realinname: case.realinname = case.problem.config.testcaseinname case.realoutname = case.problem.config.testcaseoutname else: if case.needs_realinname: case.realinname = case.problem.config.dummyinname case.realoutname = case.problem.config.dummyoutname @abstractmethod def test(case): raise NotImplementedError def __call__(case, callback): case.has_called_back = False case.files_to_delete = [] case.time_limit_string = case.wall_time_limit_string try: return case.test(callback) finally: now = clock() if getattr(case, 'time_started', None) is None: case.time_started = case.time_stopped = now elif getattr(case, 'time_stopped', None) is None: case.time_stopped = now if not case.has_called_back: callback() case.cleanup() def cleanup(case): # Note that native extensions clean up on their own # and never let this condition be satisfied if getattr(case, 'process', None) and case.process.returncode is None: kill(case.process) for name in case.files_to_delete: try: os.remove(name) except OSError: # It can't be helped pass def open_infile(case): try: case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadInputFile(e) def open_outfile(case): try: case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadAnswerFile(e) class SkippedTestCase(TestCase): __slots__ = () def test(case, callback): raise TestCaseSkipped class ValidatedTestCase(TestCase): __slots__ = 'validator' def __init__(case, *args): TestCase.__init__(case, *args) if not case.problem.config.tester: case.validator = None else: case.validator = case.problem.config.tester def validate(case, output): if not case.validator: # Compare the output with the reference output case.open_outfile() with case.outfile.open() as refoutput: for line, refline in zip_longest(output, refoutput): if refline is not None and not isinstance(refline, basestring): line = bytes(line, sys.getdefaultencoding()) if line != refline: raise WrongAnswer return 1 elif callable(case.validator): return case.validator(output) else: # Call the validator program output.close() if case.problem.config.ansname: case.open_outfile() case.outfile.copy(case.problem.config.ansname) try: case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1) except OSError: raise CannotStartValidator(sys.exc_info()[1]) with signal_ignorer: comment = case.process.communicate()[0].strip() match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment) if match: comment = comment[match.end():] if not case.problem.config.maxexitcode: if case.process.returncode: raise WrongAnswer(comment) else: return 1, comment else: return case.process.returncode / case.problem.config.maxexitcode, comment class BatchTestCase(ValidatedTestCase): __slots__ = () @property def has_iofiles(case): return (not case.problem.config.stdio or case.validator and not callable(case.validator)) @property def has_ansfile(case): return case.validator and not callable(case.validator) def test(case, callback): case.open_infile() if case.problem.config.stdio: if options.erase and not case.validator or not case.problem.config.inname: # TODO: re-use the same file name if possible # FIXME: 2.5 lacks the delete parameter with tempfile.NamedTemporaryFile(delete=False) as f: inputdatafname = f.name contextmgr = CopyDeleting(case, case.infile, inputdatafname) else: inputdatafname = case.problem.config.inname contextmgr = Copying(case.infile, inputdatafname) with contextmgr: with open(inputdatafname) as infile: with tempfile.TemporaryFile('w+') if options.erase and (not case.validator or callable(case.validator)) else open(case.problem.config.outname, 'w+') as outfile: call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) case.has_called_back = True callback() outfile.seek(0) return case.validate(outfile) else: case.infile.copy(case.problem.config.inname) call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=STDOUT) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) case.has_called_back = True callback() try: output = open(case.problem.config.outname, 'rU') except IOError: raise CannotReadOutputFile(sys.exc_info()[1]) with output as output: return case.validate(output) # This is the only test case type not executing any programs to be tested class OutputOnlyTestCase(ValidatedTestCase): __slots__ = () needs_realinname = False def cleanup(case): pass def test(case, callback): case.time_stopped = case.time_started = 0 case.has_called_back = True callback() try: output = open(case.problem.config.outname.replace('$', case.id), 'rU') except IOError: raise CannotReadOutputFile(sys.exc_info()[1]) with output as output: return case.validate(output) class BestOutputTestCase(ValidatedTestCase): __slots__ = () # This is the only test case type executing two programs simultaneously class ReactiveTestCase(TestCase): __slots__ = () # The basic idea is to launch the program to be tested and the grader # and to pipe their standard I/O from and to each other, # and then to capture the grader's exit code and use it # like the exit code of an output validator is used. class DummyTestContext(problem.TestGroup): __slots__ = () def end(self): say('Sample total: %d/%d tests' % (self.ncorrect, self.ntotal)) return 0, 0, self.log def load_problem(prob, _types={'batch' : BatchTestCase, 'outonly' : OutputOnlyTestCase, 'bestout' : BestOutputTestCase, 'reactive': ReactiveTestCase}): # We will need to iterate over these configuration variables twice try: len(prob.config.dummies) except Exception: prob.config.dummies = tuple(prob.config.dummies) try: len(prob.config.tests) except Exception: prob.config.tests = tuple(prob.config.tests) if options.legacy: prob.config.usegroups = False newtests = [] for i, name in enumerate(prob.config.tests): # Same here; we'll need to iterate over them twice try: l = len(name) except Exception: try: name = tuple(name) except TypeError: name = (name,) l = len(name) if l > 1: prob.config.usegroups = True newtests.append(name) if prob.config.usegroups: prob.config.tests = newtests del newtests # Even if they have duplicate test identifiers, we must honour sequence pointmaps if isinstance(prob.config.pointmap, dict): def getpoints(i, j, k=None): try: return prob.config.pointmap[i] except KeyError: try: return prob.config.pointmap[None] except KeyError: return prob.config.maxexitcode or 1 elif prob.config.usegroups: def getpoints(i, j, k): try: return prob.config.pointmap[k][j] except LookupError: return prob.config.maxexitcode or 1 else: def getpoints(i, j): try: return prob.config.pointmap[j] except LookupError: return prob.config.maxexitcode or 1 # First get prob.cache.padoutput right, # then yield the actual test cases for i in prob.config.dummies: s = 'sample ' + str(i).zfill(prob.config.paddummies) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) if prob.config.usegroups: if not isinstance(prob.config.groupweight, dict): prob.config.groupweight = dict(enumerate(prob.config.groupweight)) for group in prob.config.tests: for i in group: s = str(i).zfill(prob.config.padtests) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) if prob.config.dummies: yield DummyTestContext() for i in prob.config.dummies: s = str(i).zfill(prob.config.paddummies) if (yield _types[prob.config.kind](prob, s, True, 0)): yield yield problem.test_context_end for k, group in enumerate(prob.config.tests): if not group: continue yield problem.TestGroup(prob.config.groupweight.get(k, prob.config.groupweight.get(None))) case_type = _types[prob.config.kind] for j, i in enumerate(group): s = str(i).zfill(prob.config.padtests) if not (yield case_type(prob, s, False, getpoints(i, j, k))): if options.skim: case_type = SkippedTestCase else: yield yield problem.test_context_end else: for i in prob.config.tests: s = str(i).zfill(prob.config.padtests) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) for i in prob.config.dummies: s = str(i).zfill(prob.config.paddummies) if (yield _types[prob.config.kind](prob, s, True, 0)): yield for j, i in enumerate(prob.config.tests): s = str(i).zfill(prob.config.padtests) if (yield _types[prob.config.kind](prob, s, False, getpoints(i, j))): yield