"""Orchestrate the execution of all experiments.
The orchestrator is responsible for scheduling experiments specified in the
user-provided settings.
"""
from __future__ import division
import time
import collections
import multiprocessing as mp
import logging
import copy
import sys
import signal
import traceback
from icarus.execution import exec_experiment
from icarus.registry import TOPOLOGY_FACTORY, CACHE_PLACEMENT, CONTENT_PLACEMENT, \
CACHE_POLICY, WORKLOAD, DATA_COLLECTOR, STRATEGY
from icarus.results import ResultSet
from icarus.util import SequenceNumber, timestr
__all__ = ['Orchestrator', 'run_scenario']
logger = logging.getLogger('orchestration')
class Orchestrator(object):
"""Orchestrator.
It is responsible for orchestrating the execution of all experiments and
aggregate results.
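
    Examples
    --------
    A minimal usage sketch, assuming ``settings`` is a ``Settings`` instance
    that already defines the keys read by this class (``EXPERIMENT_QUEUE``,
    ``N_REPLICATIONS``, ``DATA_COLLECTORS``, ``PARALLEL_EXECUTION`` and, if
    parallel execution is enabled, ``N_PROCESSES``)::

        orch = Orchestrator(settings, summary_freq=10)
        orch.run()                 # blocks until all experiments complete
        result_set = orch.results  # aggregated ResultSet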
"""
def __init__(self, settings, summary_freq=4):
"""Constructor
Parameters
----------
settings : Settings
The settings of the simulator
summary_freq : int
            Frequency (in number of experiments) at which summary messages
are displayed
"""
self.settings = settings
self.results = ResultSet()
self.seq = SequenceNumber()
self.exp_durations = collections.deque(maxlen=30)
self.n_success = 0
self.n_fail = 0
self.summary_freq = summary_freq
self._stop = False
if self.settings.PARALLEL_EXECUTION:
self.pool = mp.Pool(settings.N_PROCESSES)
    def stop(self):
"""Stop the execution of the orchestrator
"""
logger.info('Orchestrator is stopping')
self._stop = True
if self.settings.PARALLEL_EXECUTION:
self.pool.terminate()
self.pool.join()
    def run(self):
"""Run the orchestrator.
        This call is blocking, whether multiple processes are used or not. This
        method returns only after all experiments have been executed.
"""
# Create queue of experiment configurations
queue = collections.deque(self.settings.EXPERIMENT_QUEUE)
# Calculate number of experiments and number of processes
self.n_exp = len(queue) * self.settings.N_REPLICATIONS
self.n_proc = self.settings.N_PROCESSES \
if self.settings.PARALLEL_EXECUTION \
else 1
        logger.info('Starting simulations: %d experiments, %d process(es)',
                    self.n_exp, self.n_proc)
if self.settings.PARALLEL_EXECUTION:
            # Starting from Python 3.2, multiprocessing.Pool.apply_async
            # accepts an error_callback argument: a callable invoked with the
            # error when the submitted function raises an uncaught exception.
            # The following lines ensure compatibility with Python < 3.2
callbacks = {"callback": self.experiment_callback}
            if sys.version_info >= (3, 2):
callbacks["error_callback"] = self.error_callback
# This job queue is used only to keep track of which jobs have
# finished and which are still running. Currently this information
# is used only to handle keyboard interrupts correctly
job_queue = collections.deque()
# Schedule experiments from the queue
while queue:
experiment = queue.popleft()
for _ in range(self.settings.N_REPLICATIONS):
job_queue.append(self.pool.apply_async(run_scenario,
args=(self.settings, experiment,
self.seq.assign(), self.n_exp),
**callbacks))
self.pool.close()
            # This solution is probably not optimal, but at least it makes
            # KeyboardInterrupt work fine, which is crucial when launching the
            # simulation remotely via screen.
            # What happens here is that we keep waiting for possible
            # KeyboardInterrupts until the last process terminates
            # successfully. We may have to wait up to 5 seconds after the last
            # process terminates before exiting, which is negligible.
try:
while job_queue:
job = job_queue.popleft()
while not job.ready():
time.sleep(5)
except KeyboardInterrupt:
self.pool.terminate()
self.pool.join()
else: # Single-process execution
while queue:
experiment = queue.popleft()
for _ in range(self.settings.N_REPLICATIONS):
self.experiment_callback(run_scenario(self.settings,
experiment, self.seq.assign(),
self.n_exp))
if self._stop:
self.stop()
logger.info('END | Planned: %d, Completed: %d, Succeeded: %d, Failed: %d',
self.n_exp, self.n_fail + self.n_success, self.n_success, self.n_fail)
    def error_callback(self, msg):
"""Callback method called in case of error in Python > 3.2
Parameters
----------
msg : string
Error message
"""
logger.error("FAILURE | Experiment failed: {}".format(msg))
self.n_fail += 1
    def experiment_callback(self, args):
"""Callback method called by run_scenario
Parameters
----------
args : tuple
Tuple of arguments
"""
        # If args is None, an exception was raised during the execution of
        # the experiment. In that case, record the failure and return
if not args:
self.n_fail += 1
return
# Extract parameters
params, results, duration = args
self.n_success += 1
# Store results
self.results.add(params, results)
self.exp_durations.append(duration)
if self.n_success % self.summary_freq == 0:
            # Number of experiments still waiting to be executed
n_scheduled = self.n_exp - (self.n_fail + self.n_success)
# Compute ETA
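            # (e.g. 120 remaining experiments averaging 30 s each, spread
            # over 8 cores, give an ETA of roughly 450 s)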
n_cores = min(mp.cpu_count(), self.n_proc)
mean_duration = sum(self.exp_durations) / len(self.exp_durations)
eta = timestr(n_scheduled * mean_duration / n_cores, False)
# Print summary
logger.info('SUMMARY | Completed: %d, Failed: %d, Scheduled: %d, ETA: %s',
self.n_success, self.n_fail, n_scheduled, eta)
def run_scenario(settings, params, curr_exp, n_exp):
"""Run a single scenario experiment
Parameters
----------
settings : Settings
The simulator settings
    params : Tree
        Experiment parameters tree
    curr_exp : int
        Sequence number of the experiment
n_exp : int
Number of scheduled experiments
Returns
-------
results : 3-tuple
A (params, results, duration) 3-tuple. The first element is a dictionary
which stores all the attributes of the experiment. The second element
        is a dictionary which stores the results. The third element is a float
        expressing the wall-clock duration of the experiment (in seconds).
        None is returned if the experiment failed.
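
    Notes
    -----
    A sketch of standalone use, assuming ``settings`` and a single experiment
    ``params`` tree have been loaded elsewhere::

        out = run_scenario(settings, params, 1, 1)
        if out is not None:
            params, results, duration = out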
"""
try:
start_time = time.time()
proc_name = mp.current_process().name
logger = logging.getLogger('runner-%s' % proc_name)
# Get list of metrics required
metrics = settings.DATA_COLLECTORS
# Copy parameters so that they can be manipulated
tree = copy.deepcopy(params)
# Set topology
topology_spec = tree['topology']
topology_name = topology_spec.pop('name')
if topology_name not in TOPOLOGY_FACTORY:
logger.error('No topology factory implementation for %s was found.'
% topology_name)
return None
topology = TOPOLOGY_FACTORY[topology_name](**topology_spec)
workload_spec = tree['workload']
workload_name = workload_spec.pop('name')
if workload_name not in WORKLOAD:
logger.error('No workload implementation named %s was found.'
% workload_name)
return None
workload = WORKLOAD[workload_name](topology, **workload_spec)
# Assign caches to nodes
if 'cache_placement' in tree:
cachepl_spec = tree['cache_placement']
cachepl_name = cachepl_spec.pop('name')
if cachepl_name not in CACHE_PLACEMENT:
logger.error('No cache placement named %s was found.'
% cachepl_name)
return None
network_cache = cachepl_spec.pop('network_cache')
# Cache budget is the cumulative number of cache entries across
# the whole network
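            # (e.g. 100000 contents with network_cache = 0.1 yield a budget
            # of 10000 cache entries for the placement algorithm to split
            # among caching nodes)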
cachepl_spec['cache_budget'] = workload.n_contents * network_cache
CACHE_PLACEMENT[cachepl_name](topology, **cachepl_spec)
# Assign contents to sources
        # If there are many contents, operations that require a deep copy of
        # the topology (e.g. to_directed/to_undirected) will take a long time
        # after this step
contpl_spec = tree['content_placement']
contpl_name = contpl_spec.pop('name')
if contpl_name not in CONTENT_PLACEMENT:
logger.error('No content placement implementation named %s was found.'
% contpl_name)
return None
CONTENT_PLACEMENT[contpl_name](topology, workload.contents, **contpl_spec)
        # Caching and routing strategy definition
strategy = tree['strategy']
if strategy['name'] not in STRATEGY:
logger.error('No implementation of strategy %s was found.' % strategy['name'])
return None
        # Cache eviction policy definition
cache_policy = tree['cache_policy']
if cache_policy['name'] not in CACHE_POLICY:
logger.error('No implementation of cache policy %s was found.' % cache_policy['name'])
return None
# Configuration parameters of network model
netconf = tree['netconf']
# Text description of the scenario run to print on screen
scenario = tree['desc'] if 'desc' in tree else "Description N/A"
logger.info('Experiment %d/%d | Preparing scenario: %s', curr_exp, n_exp, scenario)
if any(m not in DATA_COLLECTOR for m in metrics):
            logger.error('At least one of the specified data collectors has '
                         'no implementation')
return None
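        # Map each collector name to its keyword-argument dict (empty here,
        # as no per-collector parameters are passed)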
collectors = {m: {} for m in metrics}
logger.info('Experiment %d/%d | Start simulation', curr_exp, n_exp)
results = exec_experiment(topology, workload, netconf, strategy, cache_policy, collectors)
duration = time.time() - start_time
logger.info('Experiment %d/%d | End simulation | Duration %s.',
curr_exp, n_exp, timestr(duration, True))
return (params, results, duration)
except KeyboardInterrupt:
logger.error('Received keyboard interrupt. Terminating')
sys.exit(-signal.SIGINT)
    except Exception as e:
        err_type = type(e).__name__
        err_message = str(e)
        logger.error('Experiment %d/%d | Failed | %s: %s\n%s',
                     curr_exp, n_exp, err_type, err_message,
                     traceback.format_exc())
        return None