Source code for muteria.drivers.testgeneration.custom_dev_testcase.system_wrappers.base_wrapper_setup
from __future__ import print_function
import os
import sys
import shutil
import logging
import abc
import muteria.common.mix as common_mix
import muteria.drivers.testgeneration.custom_dev_testcase.system_wrappers as \
system_wrappers
# Module-level shorthand for the project's error handler class; the code
# below calls its ``assert_true`` and ``error_exit`` entry points.
ERROR_HANDLER = common_mix.ErrorHandler
class BaseSystemTestSplittingWrapper(abc.ABC):
    """Abstract interface of the test-splitting part of a system wrapper.

    Subclasses must provide ``set_wrapper``, ``switch_to_new_test``,
    ``collect_data`` and ``cleanup``. The default bodies only print a
    reminder and return ``None``.
    """

    def get_sub_test_id_env_vars(self, subtest_id):
        """Build the environment variables identifying a sub-test.

        :param subtest_id: sub-test identifier; it is converted with ``str``.
        :return: a one-entry dict mapping
                 ``system_wrappers.TEST_COUNT_ID_ENV_VAR`` to the identifier.
        """
        env_vars = {}
        env_vars[system_wrappers.TEST_COUNT_ID_ENV_VAR] = str(subtest_id)
        return env_vars
    #~ def get_sub_test_id_env_vars()

    @abc.abstractmethod
    def set_wrapper(self, workdir, exe_path_map):
        """Set up the wrapper; implementations return the new exe path map."""
        print("Implement!!!")
    #~ def set_wrapper()

    @abc.abstractmethod
    def switch_to_new_test(self):
        """Reset the counters before moving on to another test."""
        print("Implement!!!")
    #~ def switch_to_new_test()

    @abc.abstractmethod
    def collect_data(self):
        """Gather the number of sub tests and their args."""
        print("Implement!!!")
    #~ def collect_data()

    @abc.abstractmethod
    def cleanup(self):
        """Clean up after the wrapper (details are left to subclasses)."""
        print("Implement!!!")
    #~ def cleanup()
#~ class BaseSystemTestSplittingWrapper
class BaseSystemWrapper(abc.ABC):
    """Base class that swaps a repository executable for a wrapper script.

    ``install_wrapper`` moves the repository executable aside (``backup_ext``),
    copies the executable that must actually run next to it (``used_ext``)
    and writes the wrapper text returned by
    ``_get_wrapper_template_string`` in its place. The wrapper's counter,
    output log and return codes live in sibling files named with
    ``counter_ext``, ``outlog_ext`` and ``outretcode_ext``.
    ``uninstall_wrapper`` restores the original executable.

    Only a single executable per ``exe_path_map`` is supported.
    """

    # do not change
    backup_ext = '.muteria_bak'
    used_ext = '.muteria_used'
    counter_ext = '.muteria_counter'
    outlog_ext = '.muteria_outlog'
    outretcode_ext = '.muteria_outretcode'

    # Must Override
    @abc.abstractmethod
    def get_dev_null(self):
        """Return the path used as output log / return code destination
        when ``install_wrapper`` is called with output collection disabled.
        """
        print("Implement!!!")
    #~ def get_dev_null()

    @abc.abstractmethod
    def _get_wrapper_template_string(self):
        """Return the wrapper text (a ``str``) in which ``install_wrapper``
        substitutes the ``WRAPPER_TEMPLATE_*`` placeholders.
        """
        print("Implement!!!")
    #~ def _get_wrapper_template_string()

    @abc.abstractmethod
    def _get_timedout_codes(self):
        """Return a container of the return codes that ``collect_output``
        must flag as timeouts (it is queried with ``in``).
        """
        print("Implement!!!")
    #~ def _get_timedout_codes()

    ## Wrapper test splitting TODO TODO
    @abc.abstractmethod
    def get_test_splitting_wrapper_class(self):
        """Return the test splitting wrapper class, or ``None``.

        ``__init__`` instantiates the returned class without arguments.
        """
        print("Implement!!!")
    #~ def get_test_splitting_wrapper_class()

    # Can override
    def __init__(self, repo_mgr):
        """Store the repository manager and create the splitting wrapper.

        :param repo_mgr: object whose ``repo_abs_path`` method resolves the
                         executable paths of ``exe_path_map``.
        """
        self.repo_mgr = repo_mgr
        self.test_splitting_wrapper = self.get_test_splitting_wrapper_class()
        if self.test_splitting_wrapper is not None:
            self.test_splitting_wrapper = self.test_splitting_wrapper()
    #~ def __init__()

    def get_test_splitting_wrapper(self):
        """Return the splitting wrapper instance built by ``__init__``
        (``None`` when no splitting wrapper class was provided).
        """
        return self.test_splitting_wrapper
    #~ def get_test_splitting_wrapper()

    @staticmethod
    def _remove_if_file(path):
        """Delete ``path`` when it is an existing regular file."""
        if os.path.isfile(path):
            os.remove(path)
    #~ def _remove_if_file()

    @staticmethod
    def _read_outlog(logfile):
        """Return the text of ``logfile``.

        The file is first decoded with the default encoding of ``open``;
        on ``UnicodeDecodeError`` it is read again as ISO-8859-1, which
        accepts any byte sequence.
        """
        try:
            with open(logfile) as f:
                return f.read()
        except UnicodeDecodeError:
            with open(logfile, encoding='ISO-8859-1') as f:
                return f.read()
    #~ def _read_outlog()

    def _get_repo_run_path_pairs(self, exe_path_map):
        """Return ``[(repo_exe_abs_path, run_exe_path)]`` for the map.

        :param exe_path_map: dict with exactly one entry, mapping the
                    repository executable to the executable to run
                    (``None`` meaning the repository executable itself).
        :return: a one-element list holding the absolute repository path
                    (through ``repo_mgr.repo_abs_path``) and the run path.
        """
        ERROR_HANDLER.assert_true(len(exe_path_map) == 1,
                    "support a single exe for now. got: "+str(exe_path_map),
                    __file__)
        repo_exe = list(exe_path_map.keys())[0]
        run_exe = exe_path_map[repo_exe]
        repo_exe = self.repo_mgr.repo_abs_path(repo_exe)
        if run_exe is None:
            run_exe = repo_exe
        return [(repo_exe, run_exe)]
    #~ def _get_repo_run_path_pairs()

    def cleanup_logs(self, exe_path_map):
        """Remove the counter, return code and output log files, if any.

        :param exe_path_map: see ``_get_repo_run_path_pairs``.
        """
        repo_exe_abs_path, _ = self._get_repo_run_path_pairs(exe_path_map)[0]
        for ext in (self.counter_ext, self.outretcode_ext, self.outlog_ext):
            self._remove_if_file(repo_exe_abs_path + ext)
    #~ def cleanup_logs()

    def collect_output(self, exe_path_map, collected_output, testcase,
                                                    max_allowed_outlog_bytes):
        """Append the data logged by the wrapper to ``collected_output``.

        Three items are appended, in this order:

        1. the return code(s) read from the return code file, one ``int``
           per line;
        2. the output log text, or ``(log_size_in_bytes, None)`` when the
           log is larger than ``max_allowed_outlog_bytes``;
        3. the timeout flag(s): whether each return code is among
           ``_get_timedout_codes()``.

        When the return code file has exactly one line, items 1 and 3 are
        scalars; otherwise they are lists.

        :param exe_path_map: see ``_get_repo_run_path_pairs``.
        :param collected_output: list extended in place.
        :param testcase: test case name, only used in the error message.
        :param max_allowed_outlog_bytes: size above which the log content
                    is not loaded.
        """
        repo_exe_abs_path, _ = self._get_repo_run_path_pairs(exe_path_map)[0]
        o_logfile = repo_exe_abs_path + self.outlog_ext
        o_retcodefile = repo_exe_abs_path + self.outretcode_ext

        if not os.path.isfile(o_retcodefile):
            ERROR_HANDLER.error_exit("testcase has no log: '" +testcase+ "'."
                            " repo_exe_abs_path is " + repo_exe_abs_path,
                            __file__)

        with open(o_retcodefile) as f:
            retcodes = [int(line.strip()) for line in f]
        # Only query the timeout codes when there is something to classify
        timedout_codes = self._get_timedout_codes() if retcodes else ()
        timedout = [code in timedout_codes for code in retcodes]
        if len(retcodes) == 1:
            retcodes = retcodes[0]
            timedout = timedout[0]

        # return code
        collected_output.append(retcodes)

        # Outlog
        log_size = os.path.getsize(o_logfile)
        if log_size > max_allowed_outlog_bytes:
            collected_output.append((log_size, None))
        else:
            try:
                collected_output.append(self._read_outlog(o_logfile))
            except MemoryError:
                ERROR_HANDLER.error_exit("The available system memory is "
                            "lower than the maximum allowed out log file "
                            "size. Try increasing the system memory",
                            __file__)

        # Timeout flag
        collected_output.append(timedout)
    #~ def collect_output()

    def install_wrapper(self, exe_path_map, collect_output):
        """Replace the repository executable with the wrapper.

        :param exe_path_map: see ``_get_repo_run_path_pairs``.
        :param collect_output: when true the wrapper is given the output
                    log and return code file paths; otherwise both
                    placeholders are replaced with ``get_dev_null()``.
        """
        repo_exe_abs_path, run_exe_abs_path = \
                            self._get_repo_run_path_pairs(exe_path_map)[0]
        backup_path = repo_exe_abs_path + self.backup_ext
        used_path = repo_exe_abs_path + self.used_ext

        # set run exe
        self._remove_if_file(used_path)
        try:
            shutil.copy2(run_exe_abs_path, used_path)
        except PermissionError:
            # Retry once from a clean state. The target may or may not have
            # been created by the failed attempt, hence the guarded removal.
            self._remove_if_file(used_path)
            shutil.copy2(run_exe_abs_path, used_path)

        # backup
        shutil.move(repo_exe_abs_path, backup_path)

        # place the wrapper
        if collect_output:
            retcode_dest = repo_exe_abs_path + self.outretcode_ext
            outlog_dest = repo_exe_abs_path + self.outlog_ext
        else:
            retcode_dest = outlog_dest = self.get_dev_null()
        # NOTE: the keys must match the placeholders of the template
        # verbatim (including their spelling).
        match_replacing = {
            'WRAPPER_TEMPLATE_DEFAULT_EXE_ASBSOLUTE_PATH': backup_path,
            'WRAPPER_TEMPLATE_RUN_EXE_ASBSOLUTE_PATH': used_path,
            'WRAPPER_TEMPLATE_COUNTER_FILE':
                                    repo_exe_abs_path + self.counter_ext,
            'WRAPPER_TEMPLATE_OUTPUT_RETCODE': retcode_dest,
            'WRAPPER_TEMPLATE_OUTPUT_LOG': outlog_dest,
        }
        wrapper_obj = self._get_wrapper_template_string()
        for match, replace in match_replacing.items():
            wrapper_obj = wrapper_obj.replace(match, replace)
        with open(repo_exe_abs_path, 'w') as dest:
            dest.write(wrapper_obj+'\n')

        # make executable: give the wrapper the mode of the original exe
        shutil.copymode(backup_path, repo_exe_abs_path)

        # cleanup data
        self.cleanup_logs(exe_path_map)
    #~ def install_wrapper()

    def uninstall_wrapper(self, exe_path_map):
        """Restore the backed up executable and delete the wrapper's files.

        :param exe_path_map: see ``_get_repo_run_path_pairs``.
        """
        repo_exe_abs_path, _ = self._get_repo_run_path_pairs(exe_path_map)[0]

        # unset run exe
        shutil.move(repo_exe_abs_path + self.backup_ext, repo_exe_abs_path)

        # small cleanup
        self._remove_if_file(repo_exe_abs_path + self.used_ext)

        # cleanup data
        self.cleanup_logs(exe_path_map)
    #~ def uninstall_wrapper()
#~ class BaseSystemWrapper