Mercurial > ~astiob > upreckon > hgweb
view testcases.py @ 82:06356af50bf9
Finished testcases reorganization and CPU time limit implementation
We now have:
* Win32-specific code in the win32 module (including bug fixes),
* UNIX-specific and generic code in the unix module,
* a much cleaner testcases module,
* wait4-based resource limits working on Python 3 (this is a bug fix),
* no warning/error reported on non-Win32 when -x is not passed
but standard input does not come from a terminal,
* the maxtime configuration variable replaced with two new variables
named maxcputime and maxwalltime,
* CPU time reported if it can be determined unless an error occurs sooner
than it is determined (e. g. if the wall-clock time limit is exceeded),
* memory limits enforced even if Upreckon's forking already breaks them,
* CPU time limits and private virtual memory limits honoured on Win32,
* CPU time limits honoured on UNIX(-like) platforms supporting wait4
or getrusage,
* address space limits honoured on UNIX(-like) platforms supporting
setrlimit with RLIMIT_AS/RLIMIT_VMEM,
* resident set size limits honoured on UNIX(-like) platforms supporting
wait4.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Wed, 23 Feb 2011 23:35:27 +0000 |
parents | 24752db487c5 |
children | 37c4ad87583c |
line wrap: on
line source
#! /usr/bin/env python # Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> # TODO: copy the ansfile if not options.erase even if no validator is used from __future__ import division, with_statement try: from compat import * import files, problem, config except ImportError: import __main__ __main__.import_error(sys.exc_info()[1]) else: from __main__ import clock, options import glob, re, sys, tempfile, time from subprocess import Popen, PIPE, STDOUT import os devnull = open(os.path.devnull, 'w+') try: from win32 import * except Exception: from unix import * __all__ = ('TestCase', 'load_problem', 'TestCaseNotPassed', 'TimeLimitExceeded', 'CanceledByUser', 'WrongAnswer', 'NonZeroExitCode', 'CannotStartTestee', 'CannotStartValidator', 'CannotReadOutputFile', 'CannotReadInputFile', 'CannotReadAnswerFile', 'MemoryLimitExceeded', 'CPUTimeLimitExceeded', 'WallTimeLimitExceeded') # Exceptions class TestCaseNotPassed(Exception): __slots__ = () class TimeLimitExceeded(TestCaseNotPassed): __slots__ = () class CPUTimeLimitExceeded(TimeLimitExceeded): __slots__ = () class WallTimeLimitExceeded(TimeLimitExceeded): __slots__ = () class MemoryLimitExceeded(TestCaseNotPassed): __slots__ = () class CanceledByUser(TestCaseNotPassed): __slots__ = () class WrongAnswer(TestCaseNotPassed): __slots__ = 'comment' def __init__(self, comment=''): self.comment = comment class NonZeroExitCode(TestCaseNotPassed): __slots__ = 'exitcode' def __init__(self, exitcode): self.exitcode = exitcode class ExceptionWrapper(TestCaseNotPassed): __slots__ = 'upstream' def __init__(self, upstream): self.upstream = upstream class CannotStartTestee(ExceptionWrapper): __slots__ = () class CannotStartValidator(ExceptionWrapper): __slots__ = () class CannotReadOutputFile(ExceptionWrapper): __slots__ = () class CannotReadInputFile(ExceptionWrapper): __slots__ = () class CannotReadAnswerFile(ExceptionWrapper): __slots__ = () # Helper context managers class CopyDeleting(object): __slots__ = 'case', 'file', 'name' def __init__(self, case, file, name): self.case = case self.file = file self.name = name def __enter__(self): if self.name: try: self.file.copy(self.name) except: try: self.__exit__(None, None, None) except: pass raise def __exit__(self, exc_type, exc_val, exc_tb): if self.name: self.case.files_to_delete.append(self.name) class Copying(object): __slots__ = 'file', 'name' def __init__(self, file, name): self.file = file self.name = name def __enter__(self): if self.name: self.file.copy(self.name) def __exit__(self, exc_type, exc_val, exc_tb): pass # Test case types class TestCase(object): __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points', 'process', 'time_started', 'time_stopped', 'realinname', 'realoutname', 'maxcputime', 'maxwalltime', 'maxmemory', 'has_called_back', 'files_to_delete', 'cpu_time_limit_string', 'wall_time_limit_string', 'time_limit_string') if ABCMeta: __metaclass__ = ABCMeta def __init__(case, prob, id, isdummy, points): case.problem = prob case.id = id case.isdummy = isdummy case.points = points case.maxcputime = case.problem.config.maxcputime case.maxwalltime = case.problem.config.maxwalltime case.maxmemory = case.problem.config.maxmemory if case.maxcputime: case.cpu_time_limit_string = '/%.3f' % case.maxcputime else: case.cpu_time_limit_string = '' if case.maxwalltime: case.wall_time_limit_string = '/%.3f' % case.maxwalltime else: case.wall_time_limit_string = '' if not isdummy: case.realinname = case.problem.config.testcaseinname case.realoutname = case.problem.config.testcaseoutname else: case.realinname = case.problem.config.dummyinname case.realoutname = case.problem.config.dummyoutname @abstractmethod def test(case): raise NotImplementedError def __call__(case, callback): case.has_called_back = False case.files_to_delete = [] case.time_limit_string = case.wall_time_limit_string try: return case.test(callback) finally: now = clock() if getattr(case, 'time_started', None) is None: case.time_started = case.time_stopped = now elif getattr(case, 'time_stopped', None) is None: case.time_stopped = now if not case.has_called_back: callback() case.cleanup() def cleanup(case): #if getattr(case, 'infile', None): # case.infile.close() #if getattr(case, 'outfile', None): # case.outfile.close() if getattr(case, 'process', None) and case.process.returncode is None: # Try KILLing after three unsuccessful TERM attempts in a row for i in range(3): try: terminate(case.process) except Exception: time.sleep(0) case.process.poll() else: case.process.wait() break else: # If killing the process is unsuccessful three times in a row, # just silently stop trying for i in range(3): try: kill(case.process) except Exception: time.sleep(0) case.process.poll() else: case.process.wait() break if case.files_to_delete: for name in case.files_to_delete: try: os.remove(name) except Exception: # It can't be helped pass def open_infile(case): try: case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadInputFile(e) def open_outfile(case): try: case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadAnswerFile(e) class ValidatedTestCase(TestCase): __slots__ = 'validator' def __init__(case, *args): TestCase.__init__(case, *args) if not case.problem.config.tester: case.validator = None else: case.validator = case.problem.config.tester def validate(case, output): if not case.validator: # Compare the output with the reference output case.open_outfile() with case.outfile.open() as refoutput: for line, refline in zip_longest(output, refoutput): if refline is not None and not isinstance(refline, basestring): line = bytes(line, sys.getdefaultencoding()) if line != refline: raise WrongAnswer return 1 elif callable(case.validator): return case.validator(output) else: # Call the validator program output.close() if case.problem.config.ansname: case.open_outfile() case.outfile.copy(case.problem.config.ansname) try: case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1) except OSError: raise CannotStartValidator(sys.exc_info()[1]) comment = case.process.communicate()[0].strip() match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment) if match: comment = comment[match.end():] if not case.problem.config.maxexitcode: if case.process.returncode: raise WrongAnswer(comment) else: return 1, comment else: return case.process.returncode / case.problem.config.maxexitcode, comment class BatchTestCase(ValidatedTestCase): __slots__ = () def test(case, callback): case.open_infile() case.time_started = None if case.problem.config.stdio: if options.erase and not case.validator or not case.problem.config.inname: # TODO: re-use the same file name if possible # FIXME: 2.5 lacks the delete parameter with tempfile.NamedTemporaryFile(delete=False) as f: inputdatafname = f.name contextmgr = CopyDeleting(case, case.infile, inputdatafname) else: inputdatafname = case.problem.config.inname contextmgr = Copying(case.infile, inputdatafname) with contextmgr: with open(inputdatafname) as infile: with tempfile.TemporaryFile('w+') if options.erase and not case.validator else open(case.problem.config.outname, 'w+') as outfile: call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) callback() case.has_called_back = True outfile.seek(0) return case.validate(outfile) else: case.infile.copy(case.problem.config.inname) call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=STDOUT) if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: raise NonZeroExitCode(case.process.returncode) callback() case.has_called_back = True with open(case.problem.config.outname, 'rU') as output: return case.validate(output) # This is the only test case type not executing any programs to be tested class OutputOnlyTestCase(ValidatedTestCase): __slots__ = () def cleanup(case): pass class BestOutputTestCase(ValidatedTestCase): __slots__ = () # This is the only test case type executing two programs simultaneously class ReactiveTestCase(TestCase): __slots__ = () # The basic idea is to launch the program to be tested and the grader # and to pipe their standard I/O from and to each other, # and then to capture the grader's exit code and use it # like the exit code of an output validator is used. class DummyTestContext(problem.TestGroup): __slots__ = () def end(self): say('Sample total: %d/%d tests' % (self.ncorrect, self.ntotal)) return 0, 0, self.log def load_problem(prob, _types={'batch' : BatchTestCase, 'outonly' : OutputOnlyTestCase, 'bestout' : BestOutputTestCase, 'reactive': ReactiveTestCase}): # We will need to iterate over these configuration variables twice try: len(prob.config.dummies) except Exception: prob.config.dummies = tuple(prob.config.dummies) try: len(prob.config.tests) except Exception: prob.config.tests = tuple(prob.config.tests) if options.legacy: prob.config.usegroups = False newtests = [] for i, name in enumerate(prob.config.tests): # Same here; we'll need to iterate over them twice try: l = len(name) except Exception: try: name = tuple(name) except TypeError: name = (name,) l = len(name) if l > 1: prob.config.usegroups = True newtests.append(name) if prob.config.usegroups: prob.config.tests = newtests del newtests # Even if they have duplicate test identifiers, we must honour sequence pointmaps if isinstance(prob.config.pointmap, dict): def getpoints(i, j, k=None): try: return prob.config.pointmap[i] except KeyError: try: return prob.config.pointmap[None] except KeyError: return prob.config.maxexitcode or 1 elif prob.config.usegroups: def getpoints(i, j, k): try: return prob.config.pointmap[k][j] except LookupError: return prob.config.maxexitcode or 1 else: def getpoints(i, j): try: return prob.config.pointmap[j] except LookupError: return prob.config.maxexitcode or 1 # First get prob.cache.padoutput right, # then yield the actual test cases for i in prob.config.dummies: s = 'sample ' + str(i).zfill(prob.config.paddummies) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) if prob.config.usegroups: if not isinstance(prob.config.groupweight, dict): prob.config.groupweight = dict(enumerate(prob.config.groupweight)) for group in prob.config.tests: for i in group: s = str(i).zfill(prob.config.padtests) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) yield DummyTestContext() for i in prob.config.dummies: s = str(i).zfill(prob.config.paddummies) yield _types[prob.config.kind](prob, s, True, 0) yield problem.test_context_end for k, group in enumerate(prob.config.tests): if not group: continue yield problem.TestGroup(prob.config.groupweight.get(k, prob.config.groupweight.get(None))) for j, i in enumerate(group): s = str(i).zfill(prob.config.padtests) yield _types[prob.config.kind](prob, s, False, getpoints(i, j, k)) yield problem.test_context_end else: for i in prob.config.tests: s = str(i).zfill(prob.config.padtests) prob.cache.padoutput = max(prob.cache.padoutput, len(s)) for i in prob.config.dummies: s = str(i).zfill(prob.config.paddummies) yield _types[prob.config.kind](prob, s, True, 0) for j, i in enumerate(prob.config.tests): s = str(i).zfill(prob.config.padtests) yield _types[prob.config.kind](prob, s, False, getpoints(i, j))