"""
Run
---
The run functions are used to run mutation trials from the CLI. These can be used directly
for other customized running requirements. The ``Config`` data-class defines the running
parameters for the full trial suite. Sampling functions are defined here as well.
"""
import importlib
import itertools
import logging
import multiprocessing
import os
import random
import subprocess
import shutil
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from operator import attrgetter
from pathlib import Path
from typing import Any, Callable, List, NamedTuple, Optional
from mutatest import cache
from mutatest.api import Genome, GenomeGroup, GenomeGroupTarget
from mutatest.filters import CategoryCodeFilter
from mutatest.transformers import CATEGORIES, LocIndex
# Module-level logger; callers configure handlers/levels via the CLI.
LOGGER = logging.getLogger(__name__)

# Additional seconds added to max_runtime for the multi-processing subprocess timeout
MULTI_PROC_TIMEOUT_BUFFER = 10  # seconds

# Location to hold parallel pycache runs (one uuid-named sub-directory per trial)
PARALLEL_PYCACHE_DIR = Path(".mutatest_cache")
@dataclass
class Config:
    """Run configuration used for mutation trials."""

    # number of mutation locations to sample from the total sample space
    n_locations: int = 0
    # files to skip when building the GenomeGroup from a folder
    exclude_files: List[Path] = field(default_factory=list)
    # mutation category codes used to restrict the GenomeGroup filter
    filter_codes: List[str] = field(default_factory=list)
    # seed passed to random.seed() for reproducible sampling across runs
    random_seed: Optional[int] = None
    # break-on flags: stop further mutations at a location when the trial
    # ends with the corresponding status (see trial_output_check_break)
    break_on_survival: bool = False
    break_on_detected: bool = False
    break_on_error: bool = False
    break_on_unknown: bool = False
    break_on_timeout: bool = False
    # skip the .coverage file when building the sample space
    ignore_coverage: bool = False
    # subprocess timeout in seconds for a single mutation trial
    max_runtime: float = 10
    # use the parallel (multiprocessing) dispatch mode; requires Python 3.8+
    multi_processing: bool = False
class MutantReport(NamedTuple):
    """Pickleable reporting mutant object for multiprocessing collection."""

    # source file that was mutated
    src_file: Path
    # location index of the mutation within the source AST
    src_idx: LocIndex
    # the mutation operation that was applied at the location
    mutation: Any
class MutantTrialResult(NamedTuple):
    """Mutant trial result to encode return_code status with mutation information."""

    mutant: MutantReport
    return_code: int

    @property
    def status(self) -> str:
        """Decode the trial subprocess return code into a status string.

        Based on pytest return codes: 0 -> SURVIVED, 1 -> DETECTED,
        2 -> ERROR, 3 -> TIMEOUT; any other code is UNKNOWN.
        """
        known_statuses = ("SURVIVED", "DETECTED", "ERROR", "TIMEOUT")
        if 0 <= self.return_code < len(known_statuses):
            return known_statuses[self.return_code]
        return "UNKNOWN"
class ResultsSummary(NamedTuple):
    """Results summary container."""

    # all individual mutation trial results
    results: List[MutantTrialResult]
    # number of locations actually mutated (size of the drawn sample)
    n_locs_mutated: int
    # number of locations identified in the full sample space
    n_locs_identified: int
    # wall-clock duration of the full trial run
    total_runtime: timedelta
class BaselineTestException(Exception):
    """Used as an exception for the clean trial runs.

    Raised when the unmutated test suite does not pass, since mutation
    results against a failing baseline would be meaningless.
    """

    pass
# Used to define signature of trial runners for dispatcher:
# (genome, target index, mutation op, test commands, max runtime) -> MutantTrialResult
TRIAL_RUNNER_TYPE = Callable[[Genome, LocIndex, Any, List[str], float], MutantTrialResult]
####################################################################################################
# UTILITY FUNCTIONS
####################################################################################################
def colorize_output(output: str, color: str) -> str:
    """Color output for the terminal display as either red or green.

    Args:
        output: string to colorize
        color: choice of terminal color, "red" vs. "green"

    Returns:
        colorized string, or original string for bad color choice.
    """
    # ANSI SGR color codes supported for terminal display
    ansi_codes = {"red": "91", "green": "92", "yellow": "93", "blue": "94"}

    code = ansi_codes.get(color)
    if code is None:
        # unknown color choice: return the text unchanged
        return output
    return f"\033[{code}m{output}\033[0m"
def capture_output(log_level: int) -> bool:
    """Utility function used in subprocess for captured output.

    Available log levels are: https://docs.python.org/3/library/logging.html#levels
    If the level is anything other than DEBUG, return True and capture output.

    Args:
        log_level: the logging level

    Returns:
        Bool indicator on capturing output
    """
    # use the named logging constant instead of the magic number 10
    return log_level != logging.DEBUG
####################################################################################################
# CLEAN TRIAL RUNNING FUNCTIONS
####################################################################################################
def clean_trial(src_loc: Path, test_cmds: List[str]) -> timedelta:
    """Remove all existing cache files and run the test suite.

    Args:
        src_loc: the directory of the package for cache removal, may be a file
        test_cmds: test running commands for subprocess.run()

    Returns:
        timedelta of the clean trial test suite run duration

    Raises:
        BaselineTestException: if the clean trial does not pass from the test run.
    """
    cache.remove_existing_cache_files(src_loc)

    LOGGER.info("Running clean trial")

    # clean trial will show output all the time for diagnostic purposes
    start = datetime.now()
    clean_run = subprocess.run(test_cmds, capture_output=False)
    end = datetime.now()

    if clean_run.returncode != 0:
        # NOTE: stdout is None here because output is not captured; the
        # failing test output is already visible on the console.
        raise BaselineTestException(
            f"Clean trial does not pass, mutant tests will be meaningless.\n"
            f"Output: {str(clean_run.stdout)}"
        )

    return end - start
####################################################################################################
# MUTATION SAMPLE GENERATION
####################################################################################################
def get_sample(ggrp: GenomeGroup, ignore_coverage: bool) -> List[GenomeGroupTarget]:
    """Get the sample space for the mutation trials.

    This will attempt to use covered-targets as the default unless ``ignore_coverage`` is set
    to True. If the set .coverage file is not found then the total targets are returned instead.

    Args:
        ggrp: the Genome Group to generate the sample space of targets
        ignore_coverage: flag to ignore coverage if present

    Returns:
        Sorted list of Path-LocIndex pairs as complete sample space from the ``GenomeGroup``.
    """
    if ignore_coverage:
        LOGGER.info("Ignoring coverage file for sample space creation.")

    try:
        sample = ggrp.targets if ignore_coverage else ggrp.covered_targets
    except FileNotFoundError:
        LOGGER.info("Coverage file does not exist, proceeding to sample from all targets.")
        sample = ggrp.targets

    # Deterministic ordering (path, then position) so that repeat trials with
    # the same random seed draw the same locations; a set would not do.
    sort_key = attrgetter(
        "source_path",
        "loc_idx.lineno",
        "loc_idx.col_offset",
        "loc_idx.end_lineno",
        "loc_idx.end_col_offset",
    )
    return sorted(sample, key=sort_key)
def get_mutation_sample_locations(
    sample_space: List[GenomeGroupTarget], n_locations: int
) -> List[GenomeGroupTarget]:
    """Create the mutation sample space and set n_locations to a correct value for reporting.

    ``n_locations`` will change if it is larger than the total sample_space.

    Args:
        sample_space: sample space to draw random locations from
        n_locations: number of locations to draw

    Returns:
        mutation sample

    Raises:
        ValueError: if ``n_locations`` is zero or negative.
    """
    # set the mutation sample to the full sample space
    # then if n_locations is set and less than the size of the sample space
    # take a random sample without replacement
    mutation_sample = sample_space

    if n_locations <= 0:
        # message matches the guard above: zero or negative draws are rejected
        raise ValueError("n_locations must be greater than zero.")

    if n_locations <= len(sample_space):
        LOGGER.info(
            "%s",
            colorize_output(
                f"Selecting {n_locations} locations from {len(sample_space)} potentials.", "green"
            ),
        )
        mutation_sample = random.sample(sample_space, k=n_locations)

    else:
        # set here for final reporting, though not used in rest of trial controls
        LOGGER.info(
            "%s",
            colorize_output(
                f"{n_locations} exceeds sample space, using full sample: {len(sample_space)}.",
                "yellow",
            ),
        )

    return mutation_sample
def get_genome_group(src_loc: Path, config: Config) -> GenomeGroup:
    """Get the ``GenomeGroup`` based on ``src_loc`` and ``config``.

    ``Config`` is used to set global filter codes and exclude files on group creation.

    Args:
        src_loc: Path, can be directory or file
        config: the running config object

    Returns:
        ``GenomeGroup`` based on ``src_loc`` and config.
    """
    ggrp = GenomeGroup()

    # a single file is added directly; anything else is treated as a folder
    if src_loc.is_file():
        ggrp.add_file(source_file=src_loc)
    else:
        ggrp.add_folder(
            source_folder=src_loc, exclude_files=config.exclude_files, ignore_test_files=True
        )

    if config.filter_codes:
        LOGGER.info("Category restriction, chosen categories: %s", sorted(config.filter_codes))
        ggrp.set_filter(filter_codes=config.filter_codes)

    # report target counts per genome; zero targets is highlighted in yellow
    for _, genome in ggrp.items():
        n_targets = len(genome.targets)
        LOGGER.info(
            "%s",
            colorize_output(
                f"{n_targets} mutation targets found in {genome.source_file} AST.",
                "green" if n_targets > 0 else "yellow",
            ),
        )

    for excluded in config.exclude_files:
        LOGGER.info("%s", colorize_output(f"{excluded.resolve()} excluded.", "yellow"))

    return ggrp
####################################################################################################
# TRIAL RUNNERS
####################################################################################################
def trial_output_check_break(
    trial_results: MutantTrialResult, config: Config, sample_src: Path, sample_idx: LocIndex
) -> bool:
    """Flagging function to break the mutation operations loop and output logging.

    This is called within the ``run_mutation_trials`` as a utility function to determine the
    break-on behavior for progression e.g., break-on-survival.

    Args:
        trial_results: mutation trial results
        config: running configuration object
        sample_src: the sample source location
        sample_idx: the sample index where the mutation occurred

    Returns:
        Bool flag for whether or not to break the outer operations loop.
    """
    # (trial status, Config attribute controlling the break, display color)
    switch_table = [
        ("SURVIVED", "break_on_survival", "red"),
        ("DETECTED", "break_on_detected", "green"),
        ("ERROR", "break_on_error", "yellow"),
        ("TIMEOUT", "break_on_timeout", "yellow"),
        ("UNKNOWN", "break_on_unknown", "yellow"),
    ]

    for status, break_attr, color in switch_table:
        if trial_results.status != status:
            continue

        # log the result at the mutated location in the status color
        LOGGER.info(
            "%s",
            colorize_output(
                (
                    f"Result: {status.capitalize()} at "
                    f"{sample_src}: ({sample_idx.lineno}, {sample_idx.col_offset})"
                ),
                color,
            ),
        )

        if getattr(config, break_attr, False):
            # e.g. "break_on_survival" -> "Break on survival"
            break_desc = break_attr.replace("_", " ").capitalize()
            LOGGER.info(
                "%s",
                colorize_output(
                    f"{break_desc}: stopping further mutations at location.",
                    color,
                ),
            )
            return True

    return False
def create_mutation_run_trial(
    genome: Genome, target_idx: LocIndex, mutation_op: Any, test_cmds: List[str], max_runtime: float
) -> MutantTrialResult:
    """Run a single mutation trial by creating a new mutated cache file, running the
    test commands, and then removing the mutated cache file.

    Args:
        genome: the genome to mutate
        target_idx: the mutation location
        mutation_op: the mutation operation
        test_cmds: the test commands to execute with the mutated code
        max_runtime: timeout for the trial

    Returns:
        The mutation trial result
    """
    LOGGER.debug("Running trial for %s", mutation_op)

    # write the mutated __pycache__ file so the test subprocess imports the mutant
    mutant = genome.mutate(target_idx, mutation_op, write_cache=True)

    try:
        completed_trial = subprocess.run(
            test_cmds,
            capture_output=capture_output(LOGGER.getEffectiveLevel()),
            timeout=max_runtime,
        )
    except subprocess.TimeoutExpired:
        # return code 3 maps to a TIMEOUT status in MutantTrialResult
        return_code = 3
    else:
        return_code = completed_trial.returncode

    # clear the mutated cache file before reporting the result
    cache.remove_existing_cache_files(mutant.src_file)

    mutant_report = MutantReport(
        src_file=mutant.src_file, src_idx=mutant.src_idx, mutation=mutant.mutation
    )
    return MutantTrialResult(mutant=mutant_report, return_code=return_code)
def create_mutation_run_parallelcache_trial(
    genome: Genome, target_idx: LocIndex, mutation_op: Any, test_cmds: List[str], max_runtime: float
) -> MutantTrialResult:
    """Similar to run.create_mutation_run_trial() but using the parallel cache directory settings.

    This function requires Python 3.8 and does not run with Python 3.7. Importantly, it has the
    identical signature to run.create_mutation_run_trial() and is substituted in the
    run.mutation_sample_dispatch().

    Args:
        genome: the genome to mutate
        target_idx: the mutation location
        mutation_op: the mutation operation
        test_cmds: the test commands to execute with the mutated code
        max_runtime: timeout for the subprocess trial

    Returns:
        MutantTrialResult

    Raises:
        EnvironmentError: if Python version is less than 3.8
    """
    if sys.version_info < (3, 8):
        raise EnvironmentError("Python 3.8 is required to use PYTHONPYCACHEPREFIX.")

    # Note in coverage reports this shows as untested code due to the subprocess dispatching
    # the 'slow' tests in `test_run.py` cover this.
    cache.check_cache_invalidation_mode()

    # create the mutant without writing the cache
    mutant = genome.mutate(target_idx, mutation_op, write_cache=False)

    # set up parallel cache structure: a uuid-named directory per trial under
    # the parallel cache root, mirroring the resolved source path (anchor/root
    # part dropped) so PYTHONPYCACHEPREFIX resolution finds the cache file
    parallel_cache = Path.cwd() / PARALLEL_PYCACHE_DIR / uuid.uuid4().hex
    resolved_source_parts = genome.source_file.resolve().parent.parts[1:]  # type: ignore
    parallel_cfile = parallel_cache.joinpath(*resolved_source_parts) / mutant.cfile.name

    # build the timestamp-based .pyc from the mutated code object, using the
    # original source mtime/size stats — presumably so the import machinery
    # treats the mutated cache as up to date (private importlib API)
    bytecode = importlib._bootstrap_external._code_to_timestamp_pyc(  # type: ignore
        mutant.mutant_code, mutant.source_stats["mtime"], mutant.source_stats["size"]
    )

    LOGGER.debug("Writing parallel mutant cache file: %s", parallel_cfile)
    cache.create_cache_dirs(parallel_cfile)
    importlib._bootstrap_external._write_atomic(  # type: ignore
        parallel_cfile, bytecode, mutant.mode,
    )

    # the test subprocess inherits a copied environment with PYTHONPYCACHEPREFIX
    # pointed at this trial's private cache directory
    copy_env = os.environ.copy()
    copy_env["PYTHONPYCACHEPREFIX"] = str(parallel_cache)

    try:
        mutant_trial = subprocess.run(
            test_cmds,
            env=copy_env,
            capture_output=capture_output(LOGGER.getEffectiveLevel()),
            # buffer added on top of max_runtime to account for multi-processing overhead
            timeout=max_runtime + MULTI_PROC_TIMEOUT_BUFFER,
        )
        return_code = mutant_trial.returncode

    except subprocess.TimeoutExpired:
        # return code 3 maps to a TIMEOUT status in MutantTrialResult
        return_code = 3

    # remove this trial's private cache directory; the shared parent dir is
    # cleaned up at the end of run_mutation_trials
    LOGGER.debug("Removing parallel cache file: %s", parallel_cache.parts[-1])
    shutil.rmtree(parallel_cache)

    return MutantTrialResult(
        mutant=MutantReport(
            src_file=mutant.src_file, src_idx=mutant.src_idx, mutation=mutant.mutation
        ),
        return_code=return_code,
    )
####################################################################################################
# DISPATCH AND TRIAL CONTROLS
####################################################################################################
def mutation_sample_dispatch(
    ggrp_target: GenomeGroupTarget,
    ggrp: GenomeGroup,
    test_cmds: List[str],
    config: Config,
    trial_runner: TRIAL_RUNNER_TYPE,
) -> List[MutantTrialResult]:
    """Dispatch for the mutant trial.

    This is fed either from a loop across GenomeGroupTargets, or through a multi-processing pool
    using the starmap_async.

    Args:
        ggrp_target: The target index and source object
        ggrp: the GenomeGroup
        test_cmds: test commands to execute
        config: running config object
        trial_runner: function callable either for single or multi-processing execution

    Returns:
        List of MutantTrialResults for all mutations executed at this target location.
    """
    # Select the valid mutations for the ggrp_target.loc_idx (sample)
    # Then apply the selected mutations in a random order running the test commands
    # until all mutations are tested or the appropriate break-on action occurs
    results: List[MutantTrialResult] = []

    LOGGER.info(
        "Current target location: %s, %s", ggrp_target.source_path.name, ggrp_target.loc_idx
    )

    op_code = CATEGORIES[ggrp_target.loc_idx.ast_class]
    mutant_operations = CategoryCodeFilter(codes=(op_code,)).valid_mutations

    LOGGER.debug("MUTATION OPS: %s", mutant_operations)
    LOGGER.debug("MUTATION: %s", ggrp_target.loc_idx)
    # the existing operation at the location is not a mutation of itself
    mutant_operations.remove(ggrp_target.loc_idx.op_type)

    while mutant_operations:
        # random.sample requires a sequence on Python 3.11+ (sampling from a
        # set was deprecated in 3.9 and removed in 3.11), so materialize the
        # set as a tuple first; a sample of k=1 produces a one-element list.
        current_mutation = random.sample(tuple(mutant_operations), k=1)[0]
        mutant_operations.remove(current_mutation)

        trial_results = trial_runner(
            ggrp[ggrp_target.source_path],
            ggrp_target.loc_idx,
            current_mutation,
            test_cmds,
            config.max_runtime,
        )
        results.append(trial_results)

        # will log output results to console, and flag to break while loop of operations
        if trial_output_check_break(
            trial_results, config, ggrp_target.source_path, ggrp_target.loc_idx
        ):
            break

    return results
def run_mutation_trials(src_loc: Path, test_cmds: List[str], config: Config) -> ResultsSummary:
    """This is the main function for running the mutation trials.

    It will cycle through creation of the GenomeGroups from the source location, selecting the
    mutation sample based on the config settings, and executing the mutation trials using the
    test commands. This function does not include a clean-trial, it only runs the
    mutation trials.

    Args:
        src_loc: the source location path for mutation
        test_cmds: the test commands to execute
        config: the running config object

    Returns:
        ``ResultsSummary`` object of the mutation trials.
    """
    start = datetime.now()

    # Create a GenomeGroup from the source-location with config flags
    ggrp = get_genome_group(src_loc, config)

    # Sample setup: seed the RNG so location and mutation selection is reproducible
    LOGGER.info("Setting random.seed to: %s", config.random_seed)
    random.seed(a=config.random_seed)
    sample_space = get_sample(ggrp, config.ignore_coverage)
    LOGGER.info("Total sample space size: %s", len(sample_space))
    mutation_sample = get_mutation_sample_locations(sample_space, config.n_locations)

    # Run trials through mutations
    LOGGER.info("Starting individual mutation trials!")
    results: List[MutantTrialResult] = []

    # parallel dispatch requires Python 3.8 for the PYTHONPYCACHEPREFIX trial runner
    if sys.version_info >= (3, 8) and config.multi_processing:
        LOGGER.info("Running parallel (multi-processor) dispatch mode. CPUs: %s", os.cpu_count())

        with multiprocessing.Pool() as pool:
            mp_results = pool.starmap_async(
                mutation_sample_dispatch,
                # product of the sample with single-element lists pairs every
                # target with the same shared arguments
                itertools.product(
                    mutation_sample,  # map each mutation_sample item as a tuple with other args
                    [ggrp],
                    [test_cmds],
                    [config],
                    [create_mutation_run_parallelcache_trial],
                ),
            )
            # mp_results.get() will be list of single item lists e.g., [[1], [2], [3]]
            # this unpacks to be a flat list [1, 2, 3]
            results = [i for j in mp_results.get() for i in j]

    else:
        LOGGER.info("Running serial (single processor) dispatch mode.")
        for ggrp_target in mutation_sample:
            results.extend(
                mutation_sample_dispatch(
                    ggrp_target=ggrp_target,
                    ggrp=ggrp,
                    test_cmds=test_cmds,
                    config=config,
                    trial_runner=create_mutation_run_trial,
                )
            )

    end = datetime.now()

    if PARALLEL_PYCACHE_DIR.exists():
        # The subfolders should be deleted as trials proceed making this directory empty;
        # rmdir only removes an empty directory, so leftovers are reported instead
        LOGGER.info("Cleaning up parallel cache dir %s.", str(PARALLEL_PYCACHE_DIR))
        try:
            PARALLEL_PYCACHE_DIR.rmdir()
        except OSError:
            LOGGER.info("%s is not empty and cannot be removed.", str(PARALLEL_PYCACHE_DIR))

    return ResultsSummary(
        results=results,
        n_locs_mutated=len(mutation_sample),
        n_locs_identified=len(sample_space),
        total_runtime=end - start,
    )