Source code for icarus.execution.collectors

"""Performance metrics loggers

This module contains all data collectors that record events while simulations
are being executed and compute performance metrics.

Currently implemented data collectors allow users to measure cache hit ratio,
latency, path stretch and link load.

To create a new data collector, it is sufficient to create a new class
inheriting from the `DataCollector` class and override all required methods.
"""
from __future__ import division
import collections

from icarus.registry import register_data_collector
from icarus.tools import cdf
from icarus.util import Tree, inheritdoc


# Public names of this module: the collector base class, the proxy that
# dispatches events, and the concrete collectors.
__all__ = [
    'DataCollector',
    'CollectorProxy',
    'CacheHitRatioCollector',
    'LinkLoadCollector',
    'LatencyCollector',
    'PathStretchCollector',
    'DummyCollector'
           ]


class DataCollector(object):
    """Base class of all data collectors.

    A collector is notified of the events occurring during a simulation and
    uses them to measure metrics. Every handler defined here does nothing and
    returns *None*; concrete collectors override only the handlers they are
    interested in.
    """

    def __init__(self, view, **params):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            An instance of the network view
        params : keyworded parameters
            Collector parameters (ignored by this base class)
        """
        self.view = view

    def start_session(self, timestamp, receiver, content):
        """Notifies the collector that a new network session started.

        A session is the retrieval of a content by a receiver, spanning from
        the issuing of the content request to the delivery of the content.

        Parameters
        ----------
        timestamp : int
            The timestamp of the event
        receiver : any hashable type
            The receiver node requesting a content
        content : any hashable type
            The content identifier requested by the receiver
        """

    def cache_hit(self, node):
        """Reports that the requested content has been served by the cache at
        node *node*.

        Parameters
        ----------
        node : any hashable type
            The node whose cache served the content
        """

    def cache_miss(self, node):
        """Reports that the cache at node *node* has been looked up for the
        requested content but there was a cache miss.

        Parameters
        ----------
        node : any hashable type
            The node whose cache was looked up
        """

    def server_hit(self, node):
        """Reports that the requested content has been served by the server at
        node *node*.

        Parameters
        ----------
        node : any hashable type
            The server node which served the content
        """

    def request_hop(self, u, v, main_path=True):
        """Reports that a request has traversed the link *(u, v)*

        Parameters
        ----------
        u : any hashable type
            Origin node
        v : any hashable type
            Destination node
        main_path : bool, optional
            If *True*, indicates that this link is on the main path that will
            lead to hit a content. It is normally used to calculate latency
            correctly in multicast cases. Default value is *True*
        """

    def content_hop(self, u, v, main_path=True):
        """Reports that a content has traversed the link *(u, v)*

        Parameters
        ----------
        u : any hashable type
            Origin node
        v : any hashable type
            Destination node
        main_path : bool, optional
            If *True*, indicates that this link is being traversed by content
            that will be delivered to the receiver. This is needed to
            calculate latency correctly in multicast cases. Default value is
            *True*
        """

    def end_session(self, success=True):
        """Reports that the session is closed, i.e. the content has been
        successfully delivered to the receiver or a failure blocked the
        execution of the request

        Parameters
        ----------
        success : bool, optional
            *True* if the session was completed successfully, *False*
            otherwise
        """

    def results(self):
        """Returns the aggregated results measured by the collector.

        Returns
        -------
        results : dict
            Dictionary mapping metric with results. This base implementation
            returns *None*.
        """
# Note: The implementation of CollectorProxy could be improved to avoid having
# to rewrite almost identical methods, for example by playing with the
# __dict__ attribute. However, it was implemented this way to make it more
# readable and easier to understand.
class CollectorProxy(DataCollector):
    """This class acts as a proxy for all concrete collectors towards the
    network controller.

    An instance of this class registers itself with the network controller and
    it receives notifications for all events. This class is responsible for
    dispatching events of interests to concrete collectors.
    """

    # Names of the DataCollector methods that can be dispatched.
    EVENTS = ('start_session', 'end_session', 'cache_hit', 'cache_miss',
              'server_hit', 'request_hop', 'content_hop', 'results')

    def __init__(self, view, collectors):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            An instance of the network view
        collectors : list of DataCollector
            List of instances of DataCollector that will be notified of events
        """
        self.view = view
        # For each event, keep only the collectors that actually implement a
        # handler for it, so that no-op base-class handlers are never called.
        self.collectors = {e: [c for c in collectors if self._handles(c, e)]
                           for e in self.EVENTS}

    @staticmethod
    def _handles(collector, event):
        """Tell whether a collector provides its own handler for an event.

        The whole method resolution order of the collector class is searched,
        skipping `DataCollector` itself, so that a handler inherited from an
        intermediate base class (e.g. by a subclass of a concrete collector)
        is detected as well as one defined directly on the class.

        Parameters
        ----------
        collector : DataCollector
            The collector instance to inspect
        event : str
            The name of the event handler to look for

        Returns
        -------
        handles : bool
            *True* if a class other than `DataCollector` in the collector's
            class hierarchy defines *event*, *False* otherwise
        """
        return any(event in klass.__dict__
                   for klass in type(collector).__mro__
                   if klass is not DataCollector)

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        for c in self.collectors['start_session']:
            c.start_session(timestamp, receiver, content)

    @inheritdoc(DataCollector)
    def cache_hit(self, node):
        for c in self.collectors['cache_hit']:
            c.cache_hit(node)

    @inheritdoc(DataCollector)
    def cache_miss(self, node):
        for c in self.collectors['cache_miss']:
            c.cache_miss(node)

    @inheritdoc(DataCollector)
    def server_hit(self, node):
        for c in self.collectors['server_hit']:
            c.server_hit(node)

    @inheritdoc(DataCollector)
    def request_hop(self, u, v, main_path=True):
        for c in self.collectors['request_hop']:
            c.request_hop(u, v, main_path)

    @inheritdoc(DataCollector)
    def content_hop(self, u, v, main_path=True):
        for c in self.collectors['content_hop']:
            c.content_hop(u, v, main_path)

    @inheritdoc(DataCollector)
    def end_session(self, success=True):
        for c in self.collectors['end_session']:
            c.end_session(success)

    @inheritdoc(DataCollector)
    def results(self):
        # One sub-tree per collector, keyed by the collector's `name`
        # attribute.
        return Tree(**{c.name: c.results() for c in self.collectors['results']})
@register_data_collector('LINK_LOAD')
class LinkLoadCollector(DataCollector):
    """Data collector measuring the link load, i.e. the traffic carried by
    each link divided by the time between the first and the last session.
    """

    def __init__(self, view, req_size=150, content_size=1500):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            The network view instance
        req_size : int
            Average size (in bytes) of a request
        content_size : int
            Average size (in bytes) of a content

        Raises
        ------
        ValueError
            If *req_size* or *content_size* is not positive
        """
        if req_size <= 0 or content_size <= 0:
            raise ValueError('req_size and content_size must be positive')
        self.view = view
        self.req_size = req_size
        self.content_size = content_size
        # Per-link counters of traversing requests and contents, keyed by
        # the (u, v) link tuple.
        self.req_count = collections.defaultdict(int)
        self.cont_count = collections.defaultdict(int)
        # A negative t_start marks that no session has been seen yet.
        self.t_start = -1
        self.t_end = 1

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        # The first session fixes the start of the measurement window; every
        # session moves its end forward.
        if self.t_start < 0:
            self.t_start = timestamp
        self.t_end = timestamp

    @inheritdoc(DataCollector)
    def request_hop(self, u, v, main_path=True):
        link = (u, v)
        self.req_count[link] += 1

    @inheritdoc(DataCollector)
    def content_hop(self, u, v, main_path=True):
        link = (u, v)
        self.cont_count[link] += 1

    def results(self):
        """Returns the link loads measured by the collector.

        Returns
        -------
        results : Tree
            Tree with the mean load of internal and external links
            (``MEAN_INTERNAL``, ``MEAN_EXTERNAL``) and the load of each used
            link (``PER_LINK_INTERNAL``, ``PER_LINK_EXTERNAL``).

        Raises
        ------
        ZeroDivisionError
            If at least one link was used and the first and last recorded
            session timestamps coincide
        """
        duration = self.t_end - self.t_start
        used_links = set(self.req_count) | set(self.cont_count)

        link_loads = {}
        for link in used_links:
            traffic = (self.req_size * self.req_count[link] +
                       self.content_size * self.cont_count[link])
            link_loads[link] = traffic / duration

        def loads_of_type(link_type):
            # Subset of the per-link loads whose link has the given type.
            return {link: load for link, load in link_loads.items()
                    if self.view.link_type(*link) == link_type}

        def mean(loads):
            # Average load, or 0 if there is no link of this type.
            return sum(loads.values()) / len(loads) if loads else 0

        link_loads_int = loads_of_type('internal')
        link_loads_ext = loads_of_type('external')
        return Tree({'MEAN_INTERNAL': mean(link_loads_int),
                     'MEAN_EXTERNAL': mean(link_loads_ext),
                     'PER_LINK_INTERNAL': link_loads_int,
                     'PER_LINK_EXTERNAL': link_loads_ext})
@register_data_collector('LATENCY')
class LatencyCollector(DataCollector):
    """Data collector measuring latency, i.e. the delay taken to deliver a
    content.
    """

    def __init__(self, view, cdf=False):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            The network view instance
        cdf : bool, optional
            If *True*, also collects a cdf of the latency
        """
        self.view = view
        self.cdf = cdf
        self.sess_count = 0
        # Sum of the latencies of all successfully completed sessions.
        self.latency = 0.0
        self.req_latency = 0.0
        if cdf:
            # Individual session latencies, only kept when a CDF is needed.
            self.latency_data = collections.deque()

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        self.sess_latency = 0.0
        self.sess_count += 1

    @inheritdoc(DataCollector)
    def request_hop(self, u, v, main_path=True):
        # Only hops on the main path contribute to the session latency.
        if not main_path:
            return
        self.sess_latency += self.view.link_delay(u, v)

    @inheritdoc(DataCollector)
    def content_hop(self, u, v, main_path=True):
        # Only hops on the main path contribute to the session latency.
        if not main_path:
            return
        self.sess_latency += self.view.link_delay(u, v)

    @inheritdoc(DataCollector)
    def end_session(self, success=True):
        # Failed sessions do not contribute any latency.
        if success:
            if self.cdf:
                self.latency_data.append(self.sess_latency)
            self.latency += self.sess_latency

    def results(self):
        """Returns the latency measured by the collector.

        Returns
        -------
        results : Tree
            Tree with the accumulated latency divided by the number of
            started sessions (``MEAN``) and, if enabled, the latency CDF
            (``CDF``).

        Raises
        ------
        ZeroDivisionError
            If no session was started
        """
        summary = Tree({'MEAN': self.latency / self.sess_count})
        if self.cdf:
            summary['CDF'] = cdf(self.latency_data)
        return summary
@register_data_collector('CACHE_HIT_RATIO')
class CacheHitRatioCollector(DataCollector):
    """Collector measuring the cache hit ratio, i.e. the portion of content
    requests served by a cache.
    """

    def __init__(self, view, off_path_hits=False, per_node=True, content_hits=False):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            The NetworkView instance
        off_path_hits : bool, optional
            If *True* also records cache hits from caches not located on the
            shortest path. This metric may be relevant only for some
            strategies
        per_node : bool, optional
            If *True* also records cache hits and server hits per node
        content_hits : bool, optional
            If *True* also records cache hits per content instead of just
            globally
        """
        self.view = view
        self.off_path_hits = off_path_hits
        self.per_node = per_node
        self.cont_hits = content_hits
        self.sess_count = 0
        self.cache_hits = 0
        self.serv_hits = 0
        if off_path_hits:
            self.off_path_hit_count = 0
        if per_node:
            self.per_node_cache_hits = collections.defaultdict(int)
            self.per_node_server_hits = collections.defaultdict(int)
        if content_hits:
            self.curr_cont = None
            self.cont_cache_hits = collections.defaultdict(int)
            self.cont_serv_hits = collections.defaultdict(int)

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        self.sess_count += 1
        if self.off_path_hits:
            # Remember the shortest path of this session so that cache_hit
            # can tell whether the serving cache lies on it.
            source = self.view.content_source(content)
            self.curr_path = self.view.shortest_path(receiver, source)
        if self.cont_hits:
            self.curr_cont = content

    @inheritdoc(DataCollector)
    def cache_hit(self, node):
        self.cache_hits += 1
        if self.off_path_hits and node not in self.curr_path:
            self.off_path_hit_count += 1
        if self.cont_hits:
            self.cont_cache_hits[self.curr_cont] += 1
        if self.per_node:
            self.per_node_cache_hits[node] += 1

    @inheritdoc(DataCollector)
    def server_hit(self, node):
        self.serv_hits += 1
        if self.cont_hits:
            self.cont_serv_hits[self.curr_cont] += 1
        if self.per_node:
            self.per_node_server_hits[node] += 1

    def results(self):
        """Returns the cache hit ratios measured by the collector.

        The internal hit counters are left untouched, so this method can be
        called repeatedly, and events can keep being recorded after a call,
        without corrupting the reported values.

        Returns
        -------
        results : Tree
            Tree with the overall cache hit ratio (``MEAN``) and, depending
            on the options selected at construction, the off-path and on-path
            ratios (``MEAN_OFF_PATH``, ``MEAN_ON_PATH``), the per-content
            ratios (``PER_CONTENT``) and the per-node ratios
            (``PER_NODE_CACHE_HIT_RATIO``, ``PER_NODE_SERVER_HIT_RATIO``).

        Raises
        ------
        ZeroDivisionError
            If neither a cache hit nor a server hit was recorded
        """
        # Ratios are computed over the requests actually served (by a cache
        # or by a server).
        n_sess = self.cache_hits + self.serv_hits
        hit_ratio = self.cache_hits / n_sess
        results = Tree(**{'MEAN': hit_ratio})
        if self.off_path_hits:
            results['MEAN_OFF_PATH'] = self.off_path_hit_count / n_sess
            results['MEAN_ON_PATH'] = results['MEAN'] - results['MEAN_OFF_PATH']
        if self.cont_hits:
            cont_set = set(self.cont_cache_hits) | set(self.cont_serv_hits)
            results['PER_CONTENT'] = {
                i: self.cont_cache_hits[i] /
                   (self.cont_cache_hits[i] + self.cont_serv_hits[i])
                for i in cont_set}
        if self.per_node:
            # Build new mappings instead of dividing the counters in place:
            # the counters must stay raw hit counts. defaultdict(int) is kept
            # as the mapping type so lookups of unseen nodes still yield 0.
            results['PER_NODE_CACHE_HIT_RATIO'] = collections.defaultdict(
                int, ((v, hits / n_sess)
                      for v, hits in self.per_node_cache_hits.items()))
            results['PER_NODE_SERVER_HIT_RATIO'] = collections.defaultdict(
                int, ((v, hits / n_sess)
                      for v, hits in self.per_node_server_hits.items()))
        return results
@register_data_collector('PATH_STRETCH')
class PathStretchCollector(DataCollector):
    """Collector measuring the path stretch, i.e. the ratio between the actual
    path length and the shortest path length.
    """

    def __init__(self, view, cdf=False):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            The network view instance
        cdf : bool, optional
            If *True*, also collects a cdf of the path stretch
        """
        self.view = view
        self.cdf = cdf
        # Hop counters of the current session. They are plain integers
        # everywhere else in the class, so they start as integers too.
        self.req_path_len = 0
        self.cont_path_len = 0
        self.sess_count = 0
        # Despite their names, these accumulate the sum of the per-session
        # stretches; the division by the session count happens in results().
        self.mean_req_stretch = 0.0
        self.mean_cont_stretch = 0.0
        self.mean_stretch = 0.0
        if self.cdf:
            self.req_stretch_data = collections.deque()
            self.cont_stretch_data = collections.deque()
            self.stretch_data = collections.deque()

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        self.receiver = receiver
        self.source = self.view.content_source(content)
        self.req_path_len = 0
        self.cont_path_len = 0
        self.sess_count += 1

    @inheritdoc(DataCollector)
    def request_hop(self, u, v, main_path=True):
        self.req_path_len += 1

    @inheritdoc(DataCollector)
    def content_hop(self, u, v, main_path=True):
        self.cont_path_len += 1

    @inheritdoc(DataCollector)
    def end_session(self, success=True):
        # Failed sessions do not contribute any stretch.
        if not success:
            return
        # NOTE(review): the denominators are len() of whatever
        # view.shortest_path returns. If that is a list of nodes, they count
        # nodes while the numerators count hops - confirm whether this
        # off-by-one is intended.
        req_sp_len = len(self.view.shortest_path(self.receiver, self.source))
        cont_sp_len = len(self.view.shortest_path(self.source, self.receiver))
        req_stretch = self.req_path_len / req_sp_len
        cont_stretch = self.cont_path_len / cont_sp_len
        stretch = (self.req_path_len + self.cont_path_len) / (req_sp_len + cont_sp_len)
        self.mean_req_stretch += req_stretch
        self.mean_cont_stretch += cont_stretch
        self.mean_stretch += stretch
        if self.cdf:
            self.req_stretch_data.append(req_stretch)
            self.cont_stretch_data.append(cont_stretch)
            self.stretch_data.append(stretch)

    def results(self):
        """Returns the path stretch measured by the collector.

        Returns
        -------
        results : Tree
            Tree with the accumulated overall, request and content stretch
            divided by the number of started sessions (``MEAN``,
            ``MEAN_REQUEST``, ``MEAN_CONTENT``) and, if enabled, the
            corresponding CDFs (``CDF``, ``CDF_REQUEST``, ``CDF_CONTENT``).

        Raises
        ------
        ZeroDivisionError
            If no session was started
        """
        results = Tree({'MEAN': self.mean_stretch / self.sess_count,
                        'MEAN_REQUEST': self.mean_req_stretch / self.sess_count,
                        'MEAN_CONTENT': self.mean_cont_stretch / self.sess_count})
        if self.cdf:
            results['CDF'] = cdf(self.stretch_data)
            results['CDF_REQUEST'] = cdf(self.req_stretch_data)
            results['CDF_CONTENT'] = cdf(self.cont_stretch_data)
        return results
@register_data_collector('DUMMY')
class DummyCollector(DataCollector):
    """Dummy collector to be used for test cases only.

    It records the events of the latest session in a dictionary that can be
    retrieved with `session_summary`.
    """

    def __init__(self, view):
        """Constructor

        Parameters
        ----------
        view : NetworkView
            The network view instance
        """
        self.view = view

    @inheritdoc(DataCollector)
    def start_session(self, timestamp, receiver, content):
        # Every new session replaces the record of the previous one.
        self.session = {
            'timestamp': timestamp,
            'receiver': receiver,
            'content': content,
            'cache_misses': [],
            'request_hops': [],
            'content_hops': [],
        }

    @inheritdoc(DataCollector)
    def cache_hit(self, node):
        self.session['serving_node'] = node

    @inheritdoc(DataCollector)
    def cache_miss(self, node):
        misses = self.session['cache_misses']
        misses.append(node)

    @inheritdoc(DataCollector)
    def server_hit(self, node):
        self.session['serving_node'] = node

    @inheritdoc(DataCollector)
    def request_hop(self, u, v, main_path=True):
        hops = self.session['request_hops']
        hops.append((u, v))

    @inheritdoc(DataCollector)
    def content_hop(self, u, v, main_path=True):
        hops = self.session['content_hops']
        hops.append((u, v))

    @inheritdoc(DataCollector)
    def end_session(self, success=True):
        self.session['success'] = success

    def session_summary(self):
        """Return a summary of latest session

        Returns
        -------
        session : dict
            Summary of session
        """
        return self.session