Source code for muteria.drivers.criteria.tools_by_languages.c.gcov.gcov
""" TODO: call coverage
"""
from __future__ import print_function
import os
import sys
import re
import shutil
import shlex
import logging
import subprocess
import muteria.common.mix as common_mix
import muteria.common.fs as common_fs
from muteria.repositoryandcode.codes_convert_support import CodeFormats
from muteria.repositoryandcode.callback_object import DefaultCallbackObject
from muteria.drivers.criteria.base_testcriteriatool import BaseCriteriaTool
from muteria.drivers.criteria import TestCriteria
from muteria.drivers import DriversUtils
from muteria.drivers.criteria.tools_by_languages.c.gcov.driver_config \
import DriverConfigGCov
ERROR_HANDLER = common_mix.ErrorHandler
class CriteriaToolGCov(BaseCriteriaTool):
def __init__(self, *args, **kwargs):
BaseCriteriaTool.__init__(self, *args, **kwargs)
self.driver_config = None
if self.config.get_tool_user_custom() is not None:
self.driver_config = \
self.config.get_tool_user_custom().DRIVER_CONFIG
if self.driver_config is None:
self.driver_config = DriverConfigGCov()
else:
ERROR_HANDLER.assert_true(isinstance(self.driver_config, \
DriverConfigGCov),\
"invalid driver config", __file__)
self.instrumentation_details = os.path.join(\
self.instrumented_code_storage_dir, '.instru.meta.json')
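# Illustrative note (an assumption drawn from how _do_instrument_code()
# writes rel_rel_path_map below, not part of the original source): this
# JSON file is expected to map each relative exe path in the repository
# to the name of its instrumented copy, e.g.:
#   {"src/prog": "prog"}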
self.gcov_files_list_filename = "gcov_files.json"
self.gc_files_dir = os.path.join(\
self.criteria_working_dir, "gcno_gcda")
self.gcov_gdb_wrapper_template = os.path.join(\
os.path.dirname(__file__), \
'gcov-gdb_call_wrapper.sh.in')
self.gcov_gdb_wrapper_sh = os.path.join(\
self.instrumented_code_storage_dir, \
"tmp_gcov_gdb_wrapper.sh")
# clean any possible gcda file
for file_ in self._get_gcda_list():
os.remove(file_)
for file_ in self._get_gcov_list():
os.remove(file_)
#~ def __init__()
def _get_gcov_list(self):
gcov_files = self._recursive_list_files(self.gc_files_dir, '.gcov')
return gcov_files
#~ def _get_gcov_list()
def _get_gcda_list(self):
gcda_files = self._recursive_list_files(self.gc_files_dir, '.gcda')
return gcda_files
#~ def _get_gcda_list()
@staticmethod
def _recursive_list_files(topdir, file_suffix):
filtered_files = []
for root, dirs, files in os.walk(topdir):
for file_ in files:
if file_.endswith(file_suffix):
filtered_files.append(os.path.join(root, file_))
return filtered_files
#~ def _recursive_list_files()
class InstrumentCallbackObject(DefaultCallbackObject):
def after_command(self):
if self.op_retval != common_mix.GlobalConstants.COMMAND_SUCCESS:
ERROR_HANDLER.error_exit("Build failed", __file__)
gc_files_dir, rel_path_map = self.post_callback_args
for _, obj in list(self.source_files_to_objects.items()):
rel_raw_filename, _ = os.path.splitext(obj)
gcno_file = rel_raw_filename + '.gcno'
relloc = os.path.join(gc_files_dir, os.path.dirname(obj))
if not os.path.isdir(relloc):
os.makedirs(relloc)
abs_gcno = os.path.join(self.repository_rootdir, gcno_file)
ERROR_HANDLER.assert_true(os.path.isfile(abs_gcno), \
"gcno file missing after build", __file__)
shutil.copy2(abs_gcno, os.path.join(gc_files_dir, gcno_file))
# FIXME: For now, we store all gcda in the same directory,
# FIXME: to ensure that coverage is generated with gcov when
# FIXME: the source is not in rootdir (example of coreutils...)
gcno_dir, gcno_base = os.path.split(\
os.path.normpath(gcno_file))
gcno_in_top = os.path.join(gc_files_dir, gcno_base)
#logging.debug("gcno_instr: '{}' '{}' '{}'".format(gcno_file, \
#gcno_dir, gcno_in_top))
if gcno_dir != '':
# The gcno file must not be in the top gcno_gcda folder
ERROR_HANDLER.assert_true(not os.path.isfile(gcno_in_top),\
"Overriding of gcno ({}). Source name clash?"\
.format(gcno_file), __file__)
shutil.copy2(abs_gcno, gcno_in_top)
#~ FIXME end
self._copy_from_repo(rel_path_map)
return DefaultCallbackObject.after_command(self)
#~ def after_command()
#~ class InstrumentCallbackObject
@classmethod
def installed(cls, custom_binary_dir=None):
""" Check that the tool is installed
:return: bool representing whether the tool is installed or not
(executable accessible on the path)
- True: the tool is installed and works
- False: the tool is not installed or does not work
"""
for prog in ('gcc', 'gcov'):
if custom_binary_dir is not None:
prog = os.path.join(custom_binary_dir, prog)
if not DriversUtils.check_tool(prog=prog, args_list=['--version'],\
expected_exit_codes=[0]):
return False
return True
#~ def installed()
@classmethod
def _get_meta_instrumentation_criteria(cls):
""" Criteria where all elements are instrumented in same file
:return: list of criteria
"""
return [
TestCriteria.STATEMENT_COVERAGE,
TestCriteria.BRANCH_COVERAGE,
TestCriteria.FUNCTION_COVERAGE,
]
#~ def _get_meta_instrumentation_criteria()
@classmethod
def _get_separated_instrumentation_criteria(cls):
""" Criteria where all elements are instrumented in different files
:return: list of criteria
"""
return []
#~ def _get_separated_instrumentation_criteria()
def get_instrumented_executable_paths_map(self, enabled_criteria):
using_gdb_wrapper = self.driver_config.get_use_gdb_wrapper()
if using_gdb_wrapper:
# use wrapper if gdb is installed
using_gdb_wrapper = DriversUtils.check_tool(prog='gdb', \
args_list=['--version'], \
expected_exit_codes=[0])
if using_gdb_wrapper:
# XXX: gdb has issues tracing programs inside Docker,
# unless the container is run with the following arguments:
#
# docker run --cap-add=SYS_PTRACE \
# --security-opt seccomp=unconfined ...
#
# We check that gdb works in this environment by running it on echo
ret, o_e, _ = DriversUtils.execute_and_get_retcode_out_err(
prog='gdb', \
args_list=['--batch-silent', \
'--quiet',
'--return-child-result',
'-ex', 'run',
'--args', 'echo'], \
)#out_on=False, err_on=False)
using_gdb_wrapper = (ret == 0)
if not using_gdb_wrapper:
logging.warning("use gdb is enabled but call to gdb fails"
" (retcode {}) with msg: {}".format(ret, o_e))
else:
logging.warning("use gdb is enabled but gdb is not installed")
crit_to_exes_map = {}
obj = common_fs.loadJSON(self.instrumentation_details)
#exes = [p for _, p in list(obj.items())]
for name in obj:
obj[name] = os.path.join(self.instrumented_code_storage_dir, \
obj[name])
if using_gdb_wrapper:
# Using GDB WRAPPER
with open(self.gcov_gdb_wrapper_template) as f:
template_str = f.read()
single_exe = obj[list(obj)[0]]
template_str = template_str.replace(\
'MUTERIA_GCOV_PROGRAMEXE_PATHNAME', single_exe)
with open(self.gcov_gdb_wrapper_sh, 'w') as f:
f.write(template_str)
shutil.copymode(single_exe, self.gcov_gdb_wrapper_sh)
obj[list(obj)[0]] = self.gcov_gdb_wrapper_sh
exes_map = obj
for criterion in enabled_criteria:
crit_to_exes_map[criterion] = exes_map
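# Illustrative sketch of the returned structure (an assumption drawn
# from the code above, not part of the original source): every enabled
# criterion maps to the same exe map, e.g.:
#   {TestCriteria.STATEMENT_COVERAGE: {"prog": "<storage_dir>/prog"},
#    TestCriteria.BRANCH_COVERAGE:    {"prog": "<storage_dir>/prog"}}
# When the gdb wrapper is enabled, the single exe path is replaced by
# the path of the generated tmp_gcov_gdb_wrapper.sh script.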
#logging.debug("DBG: {} {} {}".format(\
# "Using gdb wrapper is {}.".format(using_gdb_wrapper), \
# "crit_to_exe_map is {}.".format(crit_to_exes_map), \
# "wraper path is {}!".format(self.gcov_gdb_wrapper_sh)))
return crit_to_exes_map
#~ def get_instrumented_executable_paths_map()
#~ def get_criterion_info_object(self, criterion)
def _get_criterion_element_executable_path(self, criterion, element_id):
ERROR_HANDLER.error_exit("not applicable for gcov", __file__)
#~ def _get_criterion_element_executable_path
def _get_criterion_element_environment_vars(self, criterion, element_id):
'''
return: python dictionary with environment variable as key
and their values as value (all strings)
'''
ERROR_HANDLER.error_exit("not applicable for gcov", __file__)
#~ def _get_criterion_element_environment_vars()
def _get_criteria_environment_vars(self, result_dir_tmp, enabled_criteria):
'''
return: python dictionary with environment variable as key
and their values as value (all strings)
'''
return {e:None for e in enabled_criteria}
#~ def _get_criteria_environment_vars()
def _collect_temporary_coverage_data(self, criteria_name_list, \
test_execution_verdict, \
used_environment_vars, \
result_dir_tmp, \
testcase):
''' get gcov files from gcda files into result_dir_tmp
'''
prog = 'gcov'
if self.custom_binary_dir is not None:
prog = os.path.join(self.custom_binary_dir, prog)
ERROR_HANDLER.assert_true(os.path.isfile(prog), \
"The tool {} is missing from the specified dir {}"\
.format(os.path.basename(prog), \
self.custom_binary_dir), __file__)
cov2flags = {
TestCriteria.STATEMENT_COVERAGE: [],
TestCriteria.BRANCH_COVERAGE: ['-b', '-c'],
TestCriteria.FUNCTION_COVERAGE: ['-f'],
}
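# Illustrative example (an assumption, not from the original source):
# for STATEMENT + BRANCH coverage, the command assembled below would
# look roughly like
#   gcov -b -c path/to/foo path/to/bar
# where each path is a .gcda file with its extension stripped.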
args_list = []
for criterion in criteria_name_list:
args_list += cov2flags[criterion]
gcda_files = self._get_gcda_list()
raw_filename_list = [os.path.splitext(f)[0] for f in gcda_files]
args_list += raw_filename_list
if len(gcda_files) > 0:
# TODO: When gcov generates coverage for different files with the
# same filename but located in different dirs, avoid overwriting.
# Go to the directory where the .gcov files will be generated
cwd = os.getcwd()
os.chdir(self.gc_files_dir)
# run gcov on the gcda files (using the matching gcno data)
r, _, err_str = DriversUtils.execute_and_get_retcode_out_err(\
prog=prog, \
args_list=args_list, out_on=False, \
err_on=True, merge_err_to_out=False)
os.chdir(cwd)
if r != 0: # or err_str:
ERROR_HANDLER.error_exit("Program {} {}.".format(prog,\
'collecting coverage is problematic. ')+
"The error msg is {}. \nThe command:\n{}".format(\
err_str, " ".join([prog]+args_list)), __file__)
dot_gcov_file_list = self._get_gcov_list()
# Sources of interest
_, src_map = self.code_builds_factory.repository_manager.\
get_relative_exe_path_map()
base_dot_gcov = [os.path.basename(f) for f in dot_gcov_file_list]
base_raw = [os.path.basename(f) for f in raw_filename_list]
interest = [os.path.basename(s) for s,o in src_map.items()]
interest = [s+'.gcov' for s in interest if os.path.splitext(s)[0] \
in base_raw]
missed = set(interest) - set(base_dot_gcov)
# FIXME: Check for partial failure (compare number of gcno and gcov)
ERROR_HANDLER.assert_true(len(missed) == 0,
"{} did not generate the '.gcov' files {}.".format(\
prog, missed)+ "\nIts stderr is {}.\n CMD: {}".format(\
err_str, " ".join([prog]+args_list)), __file__)
# delete gcda
for gcda_f in gcda_files:
os.remove(gcda_f)
else:
if not self.driver_config.get_allow_missing_coverage():
ERROR_HANDLER.error_exit(\
"Testcase '{}' did not generate gcda {}".format(\
testcase, "while allow missing coverage is disabled"), \
__file__)
dot_gcov_file_list = []
common_fs.dumpJSON(dot_gcov_file_list, \
os.path.join(result_dir_tmp,\
self.gcov_files_list_filename))
#~ def _collect_temporary_coverage_data()
def _extract_coverage_data_of_a_test(self, enabled_criteria, \
test_execution_verdict, result_dir_tmp):
''' read json files and extract data
return: the dict of criteria with covering count
# TODO: Restrict to returning coverage of specified header files
'''
gcov_list = common_fs.loadJSON(os.path.join(result_dir_tmp,\
self.gcov_files_list_filename))
res = {c: {} for c in enabled_criteria}
func_cov = None
branch_cov = None
statement_cov = {}
if TestCriteria.FUNCTION_COVERAGE in enabled_criteria:
func_cov = res[TestCriteria.FUNCTION_COVERAGE]
if TestCriteria.BRANCH_COVERAGE in enabled_criteria:
branch_cov = res[TestCriteria.BRANCH_COVERAGE]
if TestCriteria.STATEMENT_COVERAGE in enabled_criteria:
statement_cov = res[TestCriteria.STATEMENT_COVERAGE]
# Sources of interest
_, src_map = self.code_builds_factory.repository_manager.\
get_relative_exe_path_map()
#logging.debug("gcov_list: {}".format(gcov_list))
for gcov_file in gcov_list:
with open(gcov_file) as fp:
last_line = None
src_file = None
for raw_line in fp:
line = raw_line.strip()
col_split = [v.strip() for v in line.split(':')]
if len(col_split) > 2 and col_split[1] == '0':
# preamble
if col_split[2] == "Source":
src_file = os.path.normpath(col_split[3])
if not src_file in src_map:
s_cand = None
for s in src_map.keys():
if s.endswith(os.sep+src_file):
ERROR_HANDLER.assert_true(s_cand is None,\
"multiple candidate, maybe same "+\
"source name in different dirs "+\
"for source {}".format(src_file), \
__file__)
s_cand = s
if s_cand is None:
# The src file matches no known source (neither
# exactly nor as a relative suffix, e.g. when the
# build does not happen in rootdir), so this
# source is not considered
break
else:
# ensure compatibility in matrices
src_file = s_cand
elif line.startswith("function "):
# match function
parts = line.split()
ident = DriversUtils.make_meta_element(parts[1], \
src_file)
func_cov[ident] = int(parts[3])
elif line.startswith("branch "):
# match branch
parts = line.split()
ident = DriversUtils.make_meta_element(parts[1], \
last_line)
if parts[2:4] == ['never', 'executed']:
branch_cov[ident] = 0
else:
branch_cov[ident] = int(parts[3])
elif len(col_split) > 2 and \
re.match(r"^\d+$", col_split[1]):
# match line
if col_split[0] == '-':
continue
last_line = DriversUtils.make_meta_element(\
col_split[1], src_file)
if col_split[0] in ('#####', '====='):
exec_count = 0
else:
exec_count = \
int(re.findall(r'^\d+', col_split[0])[0])
statement_cov[last_line] = exec_count
# delete gcov files
for gcov_f in self._get_gcov_list():
os.remove(gcov_f)
#logging.debug("gcov res is: {}".format(res))
return res
#~ def _extract_coverage_data_of_a_test()
def _do_instrument_code (self, exe_path_map, \
code_builds_factory, \
enabled_criteria, parallel_count=1):
# Setup
if os.path.isdir(self.instrumented_code_storage_dir):
shutil.rmtree(self.instrumented_code_storage_dir)
os.mkdir(self.instrumented_code_storage_dir)
if os.path.isdir(self.gc_files_dir):
try:
shutil.rmtree(self.gc_files_dir)
except PermissionError:
self._dir_chmod777(self.gc_files_dir)
shutil.rmtree(self.gc_files_dir)
os.mkdir(self.gc_files_dir, mode=0o777)
prog = 'gcc'
if self.custom_binary_dir is not None:
prog = os.path.join(self.custom_binary_dir, prog)
ERROR_HANDLER.assert_true(os.path.isfile(prog), \
"The tool {} is missing from the specified dir {}"\
.format(os.path.basename(prog), \
self.custom_binary_dir), __file__)
flags = ['-g', '--coverage', '-fprofile-dir='+self.gc_files_dir, '-O0']
additionals = ["-fkeep-inline-functions"]
#additionals.append('-fprofile-abs-path')
#additionals.append('-fprofile-prefix-path='+code_builds_factory\
# .repository_manager.get_repository_dir_path())
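# Illustrative example (an assumption, not from the original source):
# the final compile flags typically expand to something like
#   gcc -g --coverage -fprofile-dir=<gc_files_dir> -O0 \
#       -fkeep-inline-functions [-fkeep-static-functions] ...
# so that .gcno files are emitted at build time and .gcda files are
# written under gc_files_dir at run time.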
# get gcc version
ret, out, err = DriversUtils.execute_and_get_retcode_out_err(\
prog, args_list=['-dumpversion'])
ERROR_HANDLER.assert_true(ret == 0, "'gcc -dumpversion' failed", __file__)
# if version > 6.5
if int(out.split('.')[0]) >= 6:
if int(out.split('.')[0]) > 6 or int(out.split('.')[1]) > 5:
additionals += ["-fkeep-static-functions"]
flags += additionals
rel_rel_path_map = {}
rel_abs_path_map = {}
exes, _ = code_builds_factory.repository_manager.\
get_relative_exe_path_map()
ERROR_HANDLER.assert_true(len(exes) == 1, \
"Only single exe supported in GCOV", __file__)
for exe in exes:
filename = os.path.basename(exe)
rel_rel_path_map[exe] = filename
rel_abs_path_map[exe] = os.path.join(\
self.instrumented_code_storage_dir, filename)
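# Illustrative example of the two maps (an assumption based on the
# loop above, not part of the original source): for exe "src/prog",
#   rel_rel_path_map == {"src/prog": "prog"}
#   rel_abs_path_map == {"src/prog": "<instrumented_code_storage_dir>/prog"}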
self.instrument_callback_obj = self.InstrumentCallbackObject()
self.instrument_callback_obj.set_post_callback_args(\
(self.gc_files_dir, rel_abs_path_map))
pre_ret, ret, post_ret = code_builds_factory.transform_src_into_dest(\
src_fmt=CodeFormats.C_SOURCE,\
dest_fmt=CodeFormats.NATIVE_CODE,\
src_dest_files_paths_map=None,\
compiler=prog, flags_list=flags, clean_tmp=True, \
reconfigure=True, \
callback_object=self.instrument_callback_obj)
# Check
if ret == common_mix.GlobalConstants.COMMAND_FAILURE:
ERROR_HANDLER.error_exit("Program {} {}.".format(prog,\
'built problematic'), __file__)
# write down the rel_path_map
ERROR_HANDLER.assert_true(not os.path.isfile(\
self.instrumentation_details), "must not exist here", __file__)
common_fs.dumpJSON(rel_rel_path_map, self.instrumentation_details)
#~ def _do_instrument_code()
#~ class CriteriaToolGCov