Mercurial > ~astiob > upreckon > hgweb
diff upreckon/testcases.py @ 146:d5b6708c1955
Distutils support, reorganization and cleaning up
* Removed command-line options -t and -u.
* Reorganized code:
o all modules are now in package upreckon;
o TestCaseNotPassed and its descendants now live in a separate
module exceptions;
o load_problem now lives in module problem.
* Commented out mentions of command-line option -c in --help.
* Added a distutils-based setup.py.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Sat, 28 May 2011 14:24:25 +0100 |
parents | testcases.py@ed4035661b85 |
children | 93bf6b333c99 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/upreckon/testcases.py Sat May 28 14:24:25 2011 +0100 @@ -0,0 +1,295 @@ +# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> + +# TODO: copy the ansfile if not options.erase even if no validator is used + +from __future__ import division, with_statement + +from .compat import * +from .exceptions import * +from . import files, config +from __main__ import options + +import re, sys, tempfile +from subprocess import Popen, PIPE, STDOUT + +import os +devnull = open(os.path.devnull, 'w+') + +class DummySignalIgnorer(object): + def __enter__(self): pass + def __exit__(self, exc_type, exc_value, traceback): pass +signal_ignorer = DummySignalIgnorer() + +try: + from .win32 import * +except Exception: + from .unix import * + +__all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase', + 'OutputOnlyTestCase') + + + +# Helper context managers + +class CopyDeleting(object): + __slots__ = 'case', 'file', 'name' + + def __init__(self, case, file, name): + self.case = case + self.file = file + self.name = name + + def __enter__(self): + if self.name: + try: + self.file.copy(self.name) + except: + try: + self.__exit__(None, None, None) + except: + pass + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.name: + self.case.files_to_delete.append(self.name) + + +class Copying(object): + __slots__ = 'file', 'name' + + def __init__(self, file, name): + self.file = file + self.name = name + + def __enter__(self): + if self.name: + self.file.copy(self.name) + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + + +# Test case types + +class TestCase(object): + __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points', + 'process', 'time_started', 'time_stopped', + 'realinname', 'realoutname', 'maxcputime', 'maxwalltime', + 'maxmemory', 'has_called_back', 'files_to_delete', + 'cpu_time_limit_string', 'wall_time_limit_string', + 'time_limit_string') + has_ansfile = has_iofiles = False + needs_realinname = True + + if ABCMeta: + __metaclass__ = ABCMeta + + def __init__(case, prob, id, isdummy, points): + case.problem = prob + case.id = id + case.isdummy = isdummy + case.points = points + case.maxcputime = case.problem.config.maxcputime + case.maxwalltime = case.problem.config.maxwalltime + case.maxmemory = case.problem.config.maxmemory + if case.maxcputime: + case.cpu_time_limit_string = '/%.3f' % case.maxcputime + else: + case.cpu_time_limit_string = '' + if case.maxwalltime: + case.wall_time_limit_string = '/%.3f' % case.maxwalltime + else: + case.wall_time_limit_string = '' + if not isdummy: + if case.needs_realinname: + case.realinname = case.problem.config.testcaseinname + case.realoutname = case.problem.config.testcaseoutname + else: + if case.needs_realinname: + case.realinname = case.problem.config.dummyinname + case.realoutname = case.problem.config.dummyoutname + + @abstractmethod + def test(case): + raise NotImplementedError + + def __call__(case, callback): + case.has_called_back = False + case.files_to_delete = [] + case.time_limit_string = case.wall_time_limit_string + try: + return case.test(callback) + finally: + now = clock() + if getattr(case, 'time_started', None) is None: + case.time_started = case.time_stopped = now + elif getattr(case, 'time_stopped', None) is None: + case.time_stopped = now + if not case.has_called_back: + callback() + case.cleanup() + + def cleanup(case): + # Note that native extensions clean up on their own + # and never let this condition be satisfied + if getattr(case, 'process', None) and case.process.returncode is None: + kill(case.process) + for name in case.files_to_delete: + try: + os.remove(name) + except OSError: + # It can't be helped + pass + + def open_infile(case): + try: + case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id)))) + except IOError: + e = sys.exc_info()[1] + raise CannotReadInputFile(e) + + def open_outfile(case): + try: + case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id)))) + except IOError: + e = sys.exc_info()[1] + raise CannotReadAnswerFile(e) + + +class SkippedTestCase(TestCase): + __slots__ = () + + def test(case, callback): + raise TestCaseSkipped + + +class ValidatedTestCase(TestCase): + __slots__ = 'validator' + + def __init__(case, *args): + TestCase.__init__(case, *args) + if not case.problem.config.tester: + case.validator = None + else: + case.validator = case.problem.config.tester + + def validate(case, output): + if not case.validator: + # Compare the output with the reference output + case.open_outfile() + with case.outfile.open() as refoutput: + for line, refline in zip_longest(output, refoutput): + if refline is not None and not isinstance(refline, basestring): + line = bytes(line, sys.getdefaultencoding()) + if line != refline: + raise WrongAnswer + return 1 + elif callable(case.validator): + return case.validator(output) + else: + # Call the validator program + output.close() + if case.problem.config.ansname: + case.open_outfile() + case.outfile.copy(case.problem.config.ansname) + try: + case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1) + except OSError: + raise CannotStartValidator(sys.exc_info()[1]) + with signal_ignorer: + comment = case.process.communicate()[0].strip() + match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment) + if match: + comment = comment[match.end():] + if not case.problem.config.maxexitcode: + if case.process.returncode: + raise WrongAnswer(comment) + else: + return 1, comment + else: + return case.process.returncode / case.problem.config.maxexitcode, comment + + +class BatchTestCase(ValidatedTestCase): + __slots__ = () + + @property + def has_iofiles(case): + return (not case.problem.config.stdio or + case.validator and not callable(case.validator)) + + @property + def has_ansfile(case): + return case.validator and not callable(case.validator) + + def test(case, callback): + case.open_infile() + if case.problem.config.stdio: + if options.erase and not case.validator or not case.problem.config.inname: + # TODO: re-use the same file name if possible + # FIXME: 2.5 lacks the delete parameter + with tempfile.NamedTemporaryFile(delete=False) as f: + inputdatafname = f.name + contextmgr = CopyDeleting(case, case.infile, inputdatafname) + else: + inputdatafname = case.problem.config.inname + contextmgr = Copying(case.infile, inputdatafname) + with contextmgr: + with open(inputdatafname) as infile: + with tempfile.TemporaryFile('w+') if options.erase and (not case.validator or callable(case.validator)) else open(case.problem.config.outname, 'w+') as outfile: + call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull) + if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: + raise NonZeroExitCode(case.process.returncode) + case.has_called_back = True + callback() + outfile.seek(0) + return case.validate(outfile) + else: + case.infile.copy(case.problem.config.inname) + call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=STDOUT) + if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0: + raise NonZeroExitCode(case.process.returncode) + case.has_called_back = True + callback() + try: + output = open(case.problem.config.outname, 'rU') + except IOError: + raise CannotReadOutputFile(sys.exc_info()[1]) + with output as output: + return case.validate(output) + + +# This is the only test case type not executing any programs to be tested +class OutputOnlyTestCase(ValidatedTestCase): + __slots__ = () + needs_realinname = False + + def cleanup(case): + pass + + def test(case, callback): + case.time_stopped = case.time_started = 0 + case.has_called_back = True + callback() + try: + output = open(case.problem.config.outname.replace('$', case.id), 'rU') + except IOError: + raise CannotReadOutputFile(sys.exc_info()[1]) + with output as output: + return case.validate(output) + + +class BestOutputTestCase(ValidatedTestCase): + __slots__ = () + + +# This is the only test case type executing two programs simultaneously +class ReactiveTestCase(TestCase): + __slots__ = () + # The basic idea is to launch the program to be tested and the grader + # and to pipe their standard I/O from and to each other, + # and then to capture the grader's exit code and use it + # like the exit code of an output validator is used.