Merge branch 'tc-testing-next'

Lucas Bates says:

====================
tc-testing: implement command timeouts and better results tracking

Patch 1 adds a timeout feature for any command tdc launches in a subshell.
This prevents tdc from hanging indefinitely.

Patches 2-4 introduce a new method for tracking and generating test case
results, and implements it across the core script and all applicable
plugins.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2018-12-07 16:39:03 -08:00
commit 83af01ba1c
6 changed files with 239 additions and 56 deletions

View file

@ -1,2 +1,5 @@
__pycache__/
*.pyc
plugins/
*.xml
*.tap

View file

@ -18,11 +18,12 @@ class TdcPlugin:
if self.args.verbose > 1:
print(' -- {}.post_suite'.format(self.sub_class))
def pre_case(self, test_ordinal, testid):
def pre_case(self, test_ordinal, testid, test_name):
'''run commands before test_runner does one test'''
if self.args.verbose > 1:
print(' -- {}.pre_case'.format(self.sub_class))
self.args.testid = testid
self.args.test_name = test_name
self.args.test_ordinal = test_ordinal
def post_case(self):

View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
from enum import Enum
class ResultState(Enum):
noresult = -1
skip = 0
success = 1
fail = 2
class TestResult:
def __init__(self, test_id="", test_name=""):
self.test_id = test_id
self.test_name = test_name
self.result = ResultState.noresult
self.failmsg = ""
self.errormsg = ""
self.steps = []
def set_result(self, result):
if (isinstance(result, ResultState)):
self.result = result
return True
else:
raise TypeError('Unknown result type, must be type ResultState')
def get_result(self):
return self.result
def set_errormsg(self, errormsg):
self.errormsg = errormsg
return True
def append_errormsg(self, errormsg):
self.errormsg = '{}\n{}'.format(self.errormsg, errormsg)
def get_errormsg(self):
return self.errormsg
def set_failmsg(self, failmsg):
self.failmsg = failmsg
return True
def append_failmsg(self, failmsg):
self.failmsg = '{}\n{}'.format(self.failmsg, failmsg)
def get_failmsg(self):
return self.failmsg
def add_steps(self, newstep):
if type(newstep) == list:
self.steps.extend(newstep)
elif type(newstep) == str:
self.steps.append(step)
else:
raise TypeError('TdcResults.add_steps() requires a list or str')
def get_executed_steps(self):
return self.steps
class TestSuiteReport():
_testsuite = []
def add_resultdata(self, result_data):
if isinstance(result_data, TestResult):
self._testsuite.append(result_data)
return True
def count_tests(self):
return len(self._testsuite)
def count_failures(self):
return sum(1 for t in self._testsuite if t.result == ResultState.fail)
def count_skips(self):
return sum(1 for t in self._testsuite if t.result == ResultState.skip)
def find_result(self, test_id):
return next((tr for tr in self._testsuite if tr.test_id == test_id), None)
def update_result(self, result_data):
orig = self.find_result(result_data.test_id)
if orig != None:
idx = self._testsuite.index(orig)
self._testsuite[idx] = result_data
else:
self.add_resultdata(result_data)
def format_tap(self):
ftap = ""
ftap += '1..{}\n'.format(self.count_tests())
index = 1
for t in self._testsuite:
if t.result == ResultState.fail:
ftap += 'not '
ftap += 'ok {} {} - {}'.format(str(index), t.test_id, t.test_name)
if t.result == ResultState.skip or t.result == ResultState.noresult:
ftap += ' # skipped - {}\n'.format(t.errormsg)
elif t.result == ResultState.fail:
if len(t.steps) > 0:
ftap += '\tCommands executed in this test case:'
for step in t.steps:
ftap += '\n\t\t{}'.format(step)
ftap += '\n\t{}'.format(t.failmsg)
ftap += '\n'
index += 1
return ftap
def format_xunit(self):
from xml.sax.saxutils import escape
xunit = "<testsuites>\n"
xunit += '\t<testsuite tests=\"{}\" skips=\"{}\">\n'.format(self.count_tests(), self.count_skips())
for t in self._testsuite:
xunit += '\t\t<testcase classname=\"{}\" '.format(escape(t.test_id))
xunit += 'name=\"{}\">\n'.format(escape(t.test_name))
if t.failmsg:
xunit += '\t\t\t<failure>\n'
if len(t.steps) > 0:
xunit += 'Commands executed in this test case:\n'
for step in t.steps:
xunit += '\t{}\n'.format(escape(step))
xunit += 'FAILURE: {}\n'.format(escape(t.failmsg))
xunit += '\t\t\t</failure>\n'
if t.errormsg:
xunit += '\t\t\t<error>\n{}\n'.format(escape(t.errormsg))
xunit += '\t\t\t</error>\n'
if t.result == ResultState.skip:
xunit += '\t\t\t<skipped/>\n'
xunit += '\t\t</testcase>\n'
xunit += '\t</testsuite>\n'
xunit += '</testsuites>\n'
return xunit

View file

@ -11,6 +11,7 @@ from string import Template
import subprocess
import time
from TdcPlugin import TdcPlugin
from TdcResults import *
from tdc_config import *
@ -21,6 +22,7 @@ class SubPlugin(TdcPlugin):
def __init__(self):
self.sub_class = 'valgrind/SubPlugin'
self.tap = ''
self._tsr = TestSuiteReport()
super().__init__()
def pre_suite(self, testcount, testidlist):
@ -34,10 +36,14 @@ class SubPlugin(TdcPlugin):
def post_suite(self, index):
'''run commands after test_runner goes into a test loop'''
super().post_suite(index)
self._add_to_tap('\n|---\n')
if self.args.verbose > 1:
print('{}.post_suite'.format(self.sub_class))
print('{}'.format(self.tap))
#print('{}'.format(self.tap))
for xx in range(index - 1, self.testcount):
res = TestResult('{}-mem'.format(self.testidlist[xx]), 'Test skipped')
res.set_result(ResultState.skip)
res.set_errormsg('Skipped because of prior setup/teardown failure')
self._add_results(res)
if self.args.verbose < 4:
subprocess.check_output('rm -f vgnd-*.log', shell=True)
@ -128,8 +134,17 @@ class SubPlugin(TdcPlugin):
nle_num = int(nle_mo.group(1))
mem_results = ''
res = TestResult('{}-mem'.format(self.args.testid),
'{} memory leak check'.format(self.args.test_name))
if (def_num > 0) or (ind_num > 0) or (pos_num > 0) or (nle_num > 0):
mem_results += 'not '
res.set_result(ResultState.fail)
res.set_failmsg('Memory leak detected')
res.append_failmsg(content)
else:
res.set_result(ResultState.success)
self._add_results(res)
mem_results += 'ok {} - {}-mem # {}\n'.format(
self.args.test_ordinal, self.args.testid, 'memory leak check')
@ -138,5 +153,8 @@ class SubPlugin(TdcPlugin):
print('{}'.format(content))
self._add_to_tap(content)
def _add_results(self, res):
self._tsr.add_resultdata(res)
def _add_to_tap(self, more_tap_output):
self.tap += more_tap_output

View file

@ -23,6 +23,7 @@ from tdc_config import *
from tdc_helper import *
import TdcPlugin
from TdcResults import *
class PluginMgrTestFail(Exception):
@ -60,10 +61,10 @@ class PluginMgr:
for pgn_inst in reversed(self.plugin_instances):
pgn_inst.post_suite(index)
def call_pre_case(self, test_ordinal, testid):
def call_pre_case(self, test_ordinal, testid, test_name):
for pgn_inst in self.plugin_instances:
try:
pgn_inst.pre_case(test_ordinal, testid)
pgn_inst.pre_case(test_ordinal, testid, test_name)
except Exception as ee:
print('exception {} in call to pre_case for {} plugin'.
format(ee, pgn_inst.__class__))
@ -102,7 +103,6 @@ class PluginMgr:
self.argparser = argparse.ArgumentParser(
description='Linux TC unit tests')
def replace_keywords(cmd):
"""
For a given executable command, substitute any known
@ -131,12 +131,16 @@ def exec_cmd(args, pm, stage, command):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=ENVIR)
(rawout, serr) = proc.communicate()
if proc.returncode != 0 and len(serr) > 0:
foutput = serr.decode("utf-8", errors="ignore")
else:
foutput = rawout.decode("utf-8", errors="ignore")
try:
(rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
if proc.returncode != 0 and len(serr) > 0:
foutput = serr.decode("utf-8", errors="ignore")
else:
foutput = rawout.decode("utf-8", errors="ignore")
except subprocess.TimeoutExpired:
foutput = "Command \"{}\" timed out\n".format(command)
proc.returncode = 255
proc.stdout.close()
proc.stderr.close()
@ -183,6 +187,7 @@ def run_one_test(pm, args, index, tidx):
result = True
tresult = ""
tap = ""
res = TestResult(tidx['id'], tidx['name'])
if args.verbose > 0:
print("\t====================\n=====> ", end="")
print("Test " + tidx["id"] + ": " + tidx["name"])
@ -190,7 +195,7 @@ def run_one_test(pm, args, index, tidx):
# populate NAMES with TESTID for this test
NAMES['TESTID'] = tidx['id']
pm.call_pre_case(index, tidx['id'])
pm.call_pre_case(index, tidx['id'], tidx['name'])
prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
if (args.verbose > 0):
@ -205,10 +210,11 @@ def run_one_test(pm, args, index, tidx):
pm.call_post_execute()
if (exit_code is None or exit_code != int(tidx["expExitCode"])):
result = False
print("exit: {!r}".format(exit_code))
print("exit: {}".format(int(tidx["expExitCode"])))
#print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
res.set_result(ResultState.fail)
res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
print(procout)
else:
if args.verbose > 0:
@ -219,20 +225,15 @@ def run_one_test(pm, args, index, tidx):
if procout:
match_index = re.findall(match_pattern, procout)
if len(match_index) != int(tidx["matchCount"]):
result = False
res.set_result(ResultState.fail)
res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
else:
res.set_result(ResultState.success)
elif int(tidx["matchCount"]) != 0:
result = False
if not result:
tresult += 'not '
tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
tap += tresult
if result == False:
if procout:
tap += procout
res.set_result(ResultState.fail)
res.set_failmsg('No output generated by verify command.')
else:
tap += 'No output!\n'
res.set_result(ResultState.success)
prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
pm.call_post_case()
@ -241,7 +242,7 @@ def run_one_test(pm, args, index, tidx):
# remove TESTID from NAMES
del(NAMES['TESTID'])
return tap
return res
def test_runner(pm, args, filtered_tests):
"""
@ -261,25 +262,15 @@ def test_runner(pm, args, filtered_tests):
emergency_exit = False
emergency_exit_message = ''
if args.notap:
if args.verbose:
tap = 'notap requested: omitting test plan\n'
else:
tap = str(index) + ".." + str(tcount) + "\n"
tsr = TestSuiteReport()
try:
pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
except Exception as ee:
ex_type, ex, ex_tb = sys.exc_info()
print('Exception {} {} (caught in pre_suite).'.
format(ex_type, ex))
# when the extra print statements are uncommented,
# the traceback does not appear between them
# (it appears way earlier in the tdc.py output)
# so don't bother ...
# print('--------------------(')
# print('traceback')
traceback.print_tb(ex_tb)
# print('--------------------)')
emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
emergency_exit = True
stage = 'pre-SUITE'
@ -295,15 +286,26 @@ def test_runner(pm, args, filtered_tests):
if args.verbose > 1:
print('Not executing test {} {} because DEV2 not defined'.
format(tidx['id'], tidx['name']))
res = TestResult(tidx['id'], tidx['name'])
res.set_result(ResultState.skip)
res.set_errormsg('Not executed because DEV2 is not defined')
tsr.add_resultdata(res)
continue
try:
badtest = tidx # in case it goes bad
tap += run_one_test(pm, args, index, tidx)
res = run_one_test(pm, args, index, tidx)
tsr.add_resultdata(res)
except PluginMgrTestFail as pmtf:
ex_type, ex, ex_tb = sys.exc_info()
stage = pmtf.stage
message = pmtf.message
output = pmtf.output
res = TestResult(tidx['id'], tidx['name'])
res.set_result(ResultState.skip)
res.set_errormsg(pmtf.message)
res.set_failmsg(pmtf.output)
tsr.add_resultdata(res)
index += 1
print(message)
print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
@ -322,16 +324,16 @@ def test_runner(pm, args, filtered_tests):
# if we failed in setup or teardown,
# fill in the remaining tests with ok-skipped
count = index
if not args.notap:
tap += 'about to flush the tap output if tests need to be skipped\n'
if tcount + 1 != index:
for tidx in testlist[index - 1:]:
msg = 'skipped - previous {} failed'.format(stage)
tap += 'ok {} - {} # {} {} {}\n'.format(
count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
count += 1
tap += 'done flushing skipped test tap output\n'
if tcount + 1 != count:
for tidx in testlist[count - 1:]:
res = TestResult(tidx['id'], tidx['name'])
res.set_result(ResultState.skip)
msg = 'skipped - previous {} failed {} {}'.format(stage,
index, badtest.get('id', '--Unknown--'))
res.set_errormsg(msg)
tsr.add_resultdata(res)
count += 1
if args.pause:
print('Want to pause\nPress enter to continue ...')
@ -340,7 +342,7 @@ def test_runner(pm, args, filtered_tests):
pm.call_post_suite(index)
return tap
return tsr
def has_blank_ids(idlist):
"""
@ -380,6 +382,10 @@ def set_args(parser):
"""
Set the command line arguments for tdc.
"""
parser.add_argument(
'--outfile', type=str,
help='Path to the file in which results should be saved. ' +
'Default target is the current directory.')
parser.add_argument(
'-p', '--path', type=str,
help='The full path to the tc executable to use')
@ -416,8 +422,9 @@ def set_args(parser):
'-v', '--verbose', action='count', default=0,
help='Show the commands that are being run')
parser.add_argument(
'-N', '--notap', action='store_true',
help='Suppress tap results for command under test')
'--format', default='tap', const='tap', nargs='?',
choices=['none', 'xunit', 'tap'],
help='Specify the format for test results. (Default: TAP)')
parser.add_argument('-d', '--device',
help='Execute the test case in flower category')
parser.add_argument(
@ -438,6 +445,8 @@ def check_default_settings(args, remaining, pm):
NAMES['TC'] = args.path
if args.device != None:
NAMES['DEV2'] = args.device
if 'TIMEOUT' not in NAMES:
NAMES['TIMEOUT'] = None
if not os.path.isfile(NAMES['TC']):
print("The specified tc path " + NAMES['TC'] + " does not exist.")
exit(1)
@ -632,12 +641,30 @@ def set_operation_mode(pm, args):
if len(alltests):
catresults = test_runner(pm, args, alltests)
if args.format == 'none':
print('Test results output suppression requested\n')
else:
print('\nAll test results: \n')
if args.format == 'xunit':
suffix = 'xml'
res = catresults.format_xunit()
elif args.format == 'tap':
suffix = 'tap'
res = catresults.format_tap()
print(res)
print('\n\n')
if not args.outfile:
fname = 'test-results.{}'.format(suffix)
else:
fname = args.outfile
with open(fname, 'w') as fh:
fh.write(res)
fh.close()
if os.getenv('SUDO_UID') is not None:
os.chown(fname, uid=int(os.getenv('SUDO_UID')),
gid=int(os.getenv('SUDO_GID')))
else:
catresults = 'No tests found\n'
if args.notap:
print('Tap output suppression requested\n')
else:
print('All test results: \n\n{}'.format(catresults))
print('No tests found\n')
def main():
"""

View file

@ -15,6 +15,8 @@ NAMES = {
'DEV1': 'v0p1',
'DEV2': '',
'BATCH_FILE': './batch.txt',
# Length of time in seconds to wait before terminating a command
'TIMEOUT': 12,
# Name of the namespace to use
'NS': 'tcut',
# Directory containing eBPF test programs