view problem.py @ 123:90c002c960cb

Fixed CPU time display on UNIX Previously, the total CPU time spent by the testee on all test cases up to and including the current one was displayed.
author Oleg Oshmyan <chortos@inbox.lv>
date Sun, 24 Apr 2011 19:28:40 +0100
parents b7fb64ce03d9
children 523ba6907f3a
line wrap: on
line source

# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

from __future__ import division, with_statement

from compat import *
import config, testcases
from __main__ import options

import os, re, sys

try:
	from collections import deque
except ImportError:
	deque = list

try:
	import signal
except ImportError:
	signalnames = ()
else:
	# Construct a cache of all signal names available on the current
	# platform. Prefer names from the UNIX standards over other versions.
	unixnames = frozenset(('HUP', 'INT', 'QUIT', 'ILL', 'ABRT', 'FPE', 'KILL', 'SEGV', 'PIPE', 'ALRM', 'TERM', 'USR1', 'USR2', 'CHLD', 'CONT', 'STOP', 'TSTP', 'TTIN', 'TTOU', 'BUS', 'POLL', 'PROF', 'SYS', 'TRAP', 'URG', 'VTALRM', 'XCPU', 'XFSZ'))
	signalnames = {}
	for name in dir(signal):
		if re.match('SIG[A-Z]+$', name):
			value = signal.__dict__[name]
			if isinstance(value, int) and (value not in signalnames or name[3:] in unixnames):
				signalnames[value] = name
	del unixnames

__all__ = 'Problem', 'TestContext', 'test_context_end', 'TestGroup'

def strerror(e):
	s = getattr(e, 'strerror', e)
	if not s: s = str(e)
	return ' (%s%s)' % (s[0].lower(), s[1:]) if s else ''

class Cache(object):
	def __init__(self, mydict):
		self.__dict__ = mydict

class TestContext(object):
	__slots__ = ()

test_context_end = object()

class TestGroup(TestContext):
	__slots__ = 'points', 'case', 'log', 'correct', 'allcorrect', 'real', 'max', 'ntotal', 'nvalued', 'ncorrect', 'ncorrectvalued'
	
	def __init__(self, points=None):
		self.points = points
		self.real = self.max = self.ntotal = self.nvalued = self.ncorrect = self.ncorrectvalued = 0
		self.allcorrect = True
		self.log = []
	
	def case_start(self, case):
		self.case = case
		self.correct = False
		self.ntotal += 1
		if case.points:
			self.nvalued += 1
	
	def case_correct(self):
		self.correct = True
		self.ncorrect += 1
		if self.case.points:
			self.ncorrectvalued += 1
	
	def case_end(self):
		self.log.append((self.case, self.correct))
		del self.case
		if not self.correct:
			self.allcorrect = False
	
	def score(self, real, max):
		self.real += real
		self.max += max
	
	def end(self):
		if not self.allcorrect:
			self.real = 0
		if self.points is not None and self.points != self.max:
			max, weighted = self.points, self.real * self.points / self.max if self.max else 0
			before_weighting = ' (%g/%g before weighting)' % (self.real, self.max)
		else:
			max, weighted = self.max, self.real
			before_weighting = ''
		say('Group total: %d/%d tests, %g/%g points%s' % (self.ncorrect, self.ntotal, weighted, max, before_weighting))
		# No real need to flush stdout, as it will anyway be flushed in a moment,
		# when either the problem total or the next test case's ID is printed
		return weighted, max, self.log

class Problem(object):
	__slots__ = 'name', 'config', 'cache', 'testcases'
	
	def __init__(prob, name):
		if not isinstance(name, basestring):
			# This shouldn't happen, of course
			raise TypeError('Problem() argument 1 must be string, not ' + type(name).__name__)
		prob.name = name
		prob.config = config.load_problem(name)
		prob.cache = Cache({'padoutput': 0})
		prob.testcases = testcases.load_problem(prob)
	
	# TODO
	def build(prob):
		raise NotImplementedError
	
	def test(prob):
		case = None
		try:
			contexts = deque((TestGroup(),))
			for case in prob.testcases:
				if case is test_context_end:
					real, max, log = contexts.pop().end()
					for case, correct in log:
						contexts[-1].case_start(case)
						if correct:
							contexts[-1].case_correct()
						contexts[-1].case_end()
					contexts[-1].score(real, max)
					continue
				elif isinstance(case, TestContext):
					contexts.append(case)
					continue
				contexts[-1].case_start(case)
				granted = 0
				id = str(case.id)
				if case.isdummy:
					id = 'sample ' + id
				say('%*s: ' % (prob.cache.padoutput, id), end='')
				sys.stdout.flush()
				try:
					granted = case(lambda: (say('%7.3f%s s, ' % (case.time_stopped - case.time_started, case.time_limit_string), end=''), sys.stdout.flush()))
				except testcases.TestCaseSkipped:
					verdict = 'skipped due to skimming mode'
				except testcases.CanceledByUser:
					verdict = 'canceled by the user'
				except testcases.WallTimeLimitExceeded:
					verdict = 'wall-clock time limit exceeded'
				except testcases.CPUTimeLimitExceeded:
					verdict = 'CPU time limit exceeded'
				except testcases.MemoryLimitExceeded:
					verdict = 'memory limit exceeded'
				except testcases.WrongAnswer:
					e = sys.exc_info()[1]
					if e.comment:
						verdict = 'wrong answer (%s)' % e.comment
					else:
						verdict = 'wrong answer'
				except testcases.NonZeroExitCode:
					e = sys.exc_info()[1]
					if e.exitcode < 0:
						if sys.platform == 'win32':
							verdict = 'terminated with error 0x%X' % (e.exitcode + 0x100000000)
						elif -e.exitcode in signalnames:
							verdict = 'terminated by signal %d (%s)' % (-e.exitcode, signalnames[-e.exitcode])
						else:
							verdict = 'terminated by signal %d' % -e.exitcode
					else:
						verdict = 'non-zero return code %d' % e.exitcode
				except testcases.CannotStartTestee:
					verdict = 'cannot launch the program to test%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.CannotStartValidator:
					verdict = 'cannot launch the validator%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.CannotReadOutputFile:
					verdict = 'cannot read the output file%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.CannotReadInputFile:
					verdict = 'cannot read the input file%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.CannotReadAnswerFile:
					verdict = 'cannot read the reference output file%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.ExceptionWrapper:
					verdict = 'unspecified reason [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1].upstream)
				except testcases.TestCaseNotPassed:
					verdict = 'unspecified reason [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1])
				#except Exception:
				#	verdict = 'unknown error [this may be a bug in test.py]%s' % strerror(sys.exc_info()[1])
				else:
					try:
						granted, comment = granted
					except TypeError:
						comment = ''
					else:
						if comment:
							comment = ' (%s)' % comment
					if granted >= 1:
						contexts[-1].case_correct()
						prob.testcases.send(True)
						verdict = 'OK' + comment
					elif not granted:
						verdict = 'wrong answer' + comment
					else:
						verdict = 'partly correct' + comment
					granted *= case.points
				say('%g/%g, %s' % (granted, case.points, verdict))
				contexts[-1].case_end()
				contexts[-1].score(granted, case.points)
			weighted = contexts[0].real * prob.config.taskweight / contexts[0].max if contexts[0].max else 0
			before_weighting = valued = ''
			if prob.config.taskweight != contexts[0].max:
				before_weighting = ' (%g/%g before weighting)' % (contexts[0].real, contexts[0].max)
			if contexts[0].nvalued != contexts[0].ntotal:
				valued = ' (%d/%d valued)' % (contexts[0].ncorrectvalued, contexts[0].nvalued)
			say('Problem total: %d/%d tests%s, %g/%g points%s' % (contexts[0].ncorrect, contexts[0].ntotal, valued, weighted, prob.config.taskweight, before_weighting))
			sys.stdout.flush()
			return weighted, prob.config.taskweight
		finally:
			if options.erase and case and case.has_iofiles:
				for var in 'in', 'out':
					name = getattr(prob.config, var + 'name')
					if name:
						try:
							os.remove(name)
						except Exception:
							pass
				if case.has_ansfile:
					if prob.config.ansname:
						try:
							os.remove(prob.config.ansname)
						except Exception:
							pass