Mercurial > ~astiob > upreckon > hgweb
view problem.py @ 105:9f922b11c98a
Absent output files no longer crash Upreckon
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Fri, 08 Apr 2011 17:45:53 +0300 |
parents | b7fb64ce03d9 |
children | 523ba6907f3a |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv> from __future__ import division, with_statement from compat import * import config, testcases from __main__ import options import os, re, sys try: from collections import deque except ImportError: deque = list try: import signal except ImportError: signalnames = () else: # Construct a cache of all signal names available on the current # platform. Prefer names from the UNIX standards over other versions. unixnames = frozenset(('HUP', 'INT', 'QUIT', 'ILL', 'ABRT', 'FPE', 'KILL', 'SEGV', 'PIPE', 'ALRM', 'TERM', 'USR1', 'USR2', 'CHLD', 'CONT', 'STOP', 'TSTP', 'TTIN', 'TTOU', 'BUS', 'POLL', 'PROF', 'SYS', 'TRAP', 'URG', 'VTALRM', 'XCPU', 'XFSZ')) signalnames = {} for name in dir(signal): if re.match('SIG[A-Z]+$', name): value = signal.__dict__[name] if isinstance(value, int) and (value not in signalnames or name[3:] in unixnames): signalnames[value] = name del unixnames __all__ = 'Problem', 'TestContext', 'test_context_end', 'TestGroup' def strerror(e): s = getattr(e, 'strerror', e) if not s: s = str(e) return ' (%s%s)' % (s[0].lower(), s[1:]) if s else '' class Cache(object): def __init__(self, mydict): self.__dict__ = mydict class TestContext(object): __slots__ = () test_context_end = object() class TestGroup(TestContext): __slots__ = 'points', 'case', 'log', 'correct', 'allcorrect', 'real', 'max', 'ntotal', 'nvalued', 'ncorrect', 'ncorrectvalued' def __init__(self, points=None): self.points = points self.real = self.max = self.ntotal = self.nvalued = self.ncorrect = self.ncorrectvalued = 0 self.allcorrect = True self.log = [] def case_start(self, case): self.case = case self.correct = False self.ntotal += 1 if case.points: self.nvalued += 1 def case_correct(self): self.correct = True self.ncorrect += 1 if self.case.points: self.ncorrectvalued += 1 def case_end(self): self.log.append((self.case, self.correct)) del self.case if not self.correct: self.allcorrect = False def score(self, real, max): self.real += real self.max += max def end(self): if not self.allcorrect: self.real = 0 if self.points is not None and self.points != self.max: max, weighted = self.points, self.real * self.points / self.max if self.max else 0 before_weighting = ' (%g/%g before weighting)' % (self.real, self.max) else: max, weighted = self.max, self.real before_weighting = '' say('Group total: %d/%d tests, %g/%g points%s' % (self.ncorrect, self.ntotal, weighted, max, before_weighting)) # No real need to flush stdout, as it will anyway be flushed in a moment, # when either the problem total or the next test case's ID is printed return weighted, max, self.log class Problem(object): __slots__ = 'name', 'config', 'cache', 'testcases' def __init__(prob, name): if not isinstance(name, basestring): # This shouldn't happen, of course raise TypeError('Problem() argument 1 must be string, not ' + type(name).__name__) prob.name = name prob.config = config.load_problem(name) prob.cache = Cache({'padoutput': 0}) prob.testcases = testcases.load_problem(prob) # TODO def build(prob): raise NotImplementedError def test(prob): case = None try: contexts = deque((TestGroup(),)) for case in prob.testcases: if case is test_context_end: real, max, log = contexts.pop().end() for case, correct in log: contexts[-1].case_start(case) if correct: contexts[-1].case_correct() contexts[-1].case_end() contexts[-1].score(real, max) continue elif isinstance(case, TestContext): contexts.append(case) continue contexts[-1].case_start(case) granted = 0 id = str(case.id) if case.isdummy: id = 'sample ' + id say('%*s: ' % (prob.cache.padoutput, id), end='') sys.stdout.flush() try: granted = case(lambda: (say('%7.3f%s s, ' % (case.time_stopped - case.time_started, case.time_limit_string), end=''), sys.stdout.flush())) except testcases.TestCaseSkipped: verdict = 'skipped due to skimming mode' except testcases.CanceledByUser: verdict = 'canceled by the user' except testcases.WallTimeLimitExceeded: verdict = 'wall-clock time limit exceeded' except testcases.CPUTimeLimitExceeded: verdict = 'CPU time limit exceeded' except testcases.MemoryLimitExceeded: verdict = 'memory limit exceeded' except testcases.WrongAnswer: e = sys.exc_info()[1] if e.comment: verdict = 'wrong answer (%s)' % e.comment else: verdict = 'wrong answer' except testcases.NonZeroExitCode: e = sys.exc_info()[1] if e.exitcode < 0: if sys.platform == 'win32': verdict = 'terminated with error 0x%X' % (e.exitcode + 0x100000000) elif -e.exitcode in signalnames: verdict = 'terminated by signal %d (%s)' % (-e.exitcode, signalnames[-e.exitcode]) else: verdict = 'terminated by signal %d' % -e.exitcode else: verdict = 'non-zero return code %d' % e.exitcode except testcases.CannotStartTestee: verdict = 'cannot launch the program to test%s' % strerror(sys.exc_info()[1].upstream) except testcases.CannotStartValidator: verdict = 'cannot launch the validator%s' % strerror(sys.exc_info()[1].upstream) except testcases.CannotReadOutputFile: verdict = 'cannot read the output file%s' % strerror(sys.exc_info()[1].upstream) except testcases.CannotReadInputFile: verdict = 'cannot read the input file%s' % strerror(sys.exc_info()[1].upstream) except testcases.CannotReadAnswerFile: verdict = 'cannot read the reference output file%s' % strerror(sys.exc_info()[1].upstream) except testcases.ExceptionWrapper: verdict = 'unspecified reason [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1].upstream) except testcases.TestCaseNotPassed: verdict = 'unspecified reason [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1]) #except Exception: # verdict = 'unknown error [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1]) else: try: granted, comment = granted except TypeError: comment = '' else: if comment: comment = ' (%s)' % comment if granted >= 1: contexts[-1].case_correct() prob.testcases.send(True) verdict = 'OK' + comment elif not granted: verdict = 'wrong answer' + comment else: verdict = 'partly correct' + comment granted *= case.points say('%g/%g, %s' % (granted, case.points, verdict)) contexts[-1].case_end() contexts[-1].score(granted, case.points) weighted = contexts[0].real * prob.config.taskweight / contexts[0].max if contexts[0].max else 0 before_weighting = valued = '' if prob.config.taskweight != contexts[0].max: before_weighting = ' (%g/%g before weighting)' % (contexts[0].real, contexts[0].max) if contexts[0].nvalued != contexts[0].ntotal: valued = ' (%d/%d valued)' % (contexts[0].ncorrectvalued, contexts[0].nvalued) say('Problem total: %d/%d tests%s, %g/%g points%s' % (contexts[0].ncorrect, contexts[0].ntotal, valued, weighted, prob.config.taskweight, before_weighting)) sys.stdout.flush() return weighted, prob.config.taskweight finally: if options.erase and case and case.has_iofiles: for var in 'in', 'out': name = getattr(prob.config, var + 'name') if name: try: os.remove(name) except Exception: pass if case.has_ansfile: if prob.config.ansname: try: os.remove(prob.config.ansname) except Exception: pass