Source code for icarus.scenarios.cacheplacement
"""Cache placement strategies
This module provides algorithms for performing cache placement, i.e., given
a cumulative cache size and a topology where each possible node candidate is
labelled, these functions deploy caching space to the nodes of the topology.
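
Strategies are registered by name through the ``register_cache_placement``
decorator, so experiment code typically resolves them by name rather than
importing the functions directly. A minimal sketch (assuming the registry
module exposes a ``CACHE_PLACEMENT`` dict, as the decorator name suggests):

>>> from icarus.registry import CACHE_PLACEMENT
>>> placement = CACHE_PLACEMENT['UNIFORM']
>>> placement(topology, cache_budget=100)   # doctest: +SKIP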
"""
from __future__ import division

import random

import networkx as nx

from icarus.util import iround
from icarus.registry import register_cache_placement
from icarus.scenarios.algorithms import (compute_clusters, compute_p_median,
                                         deploy_clusters)

__all__ = [
    'uniform_cache_placement',
    'degree_centrality_cache_placement',
    'betweenness_centrality_cache_placement',
    'uniform_consolidated_cache_placement',
    'random_cache_placement',
    'optimal_median_cache_placement',
    'optimal_hashrouting_cache_placement',
    'clustered_hashrouting_cache_placement',
    ]


@register_cache_placement('UNIFORM')
def uniform_cache_placement(topology, cache_budget, **kwargs):
    """Places cache budget uniformly across cache nodes.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
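
    Examples
    --------
    An illustrative sketch (hand-built graph, not from the Icarus test
    suite): a plain networkx graph stands in for an fnss ``Topology``,
    which works here because this function only needs per-node stacks and
    the ``icr_candidates`` graph attribute.

    >>> import networkx as nx
    >>> topo = nx.path_graph(4)
    >>> for v in topo.nodes():
    ...     topo.node[v]['stack'] = ('router', {})
    >>> topo.graph['icr_candidates'] = set([1, 2])
    >>> uniform_cache_placement(topo, cache_budget=40)
    >>> topo.node[1]['stack'][1]['cache_size']
    20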
"""
icr_candidates = topology.graph['icr_candidates']
cache_size = iround(cache_budget / len(icr_candidates))
for v in icr_candidates:
topology.node[v]['stack'][1]['cache_size'] = cache_size


@register_cache_placement('DEGREE')
def degree_centrality_cache_placement(topology, cache_budget, **kwargs):
    """Places cache budget proportionally to the degree of the node.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
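
    Examples
    --------
    An illustrative sketch (hand-built graph, not from the Icarus test
    suite): on a star topology the hub has degree 4 and each leaf degree 1,
    so the hub receives half of the budget.

    >>> import networkx as nx
    >>> topo = nx.star_graph(4)          # node 0 is the hub
    >>> for v in topo.nodes():
    ...     topo.node[v]['stack'] = ('router', {})
    >>> topo.graph['icr_candidates'] = set(topo.nodes())
    >>> degree_centrality_cache_placement(topo, cache_budget=80)
    >>> topo.node[0]['stack'][1]['cache_size']
    40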
"""
deg = nx.degree(topology)
total_deg = sum(deg.values())
icr_candidates = topology.graph['icr_candidates']
for v in icr_candidates:
topology.node[v]['stack'][1]['cache_size'] = iround(cache_budget * deg[v] / total_deg)


@register_cache_placement('BETWEENNESS_CENTRALITY')
def betweenness_centrality_cache_placement(topology, cache_budget, **kwargs):
    """Places cache budget proportionally to the betweenness centrality of
    the node.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    """
    betw = nx.betweenness_centrality(topology)
    total_betw = sum(betw.values())
    icr_candidates = topology.graph['icr_candidates']
    for v in icr_candidates:
        topology.node[v]['stack'][1]['cache_size'] = iround(cache_budget * betw[v] / total_betw)


@register_cache_placement('CONSOLIDATED')
def uniform_consolidated_cache_placement(topology, cache_budget, spread=0.5,
                                         metric_dict=None, target='top',
                                         **kwargs):
    """Consolidate caches in nodes with top centrality.

    Unlike other cache placement strategies, which assign cache space to
    all nodes proportionally to their centrality, this strategy places
    caches of equal size on a selected subset of nodes.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    spread : float [0, 1], optional
        The spread factor. The greater it is, the more widely the cache
        budget is spread among nodes: if it is 1, all candidate nodes are
        assigned a cache; if it is 0, only the node with the highest/lowest
        centrality is assigned a cache
    metric_dict : dict, optional
        The centrality metric according to which nodes are selected. If not
        specified, betweenness centrality is used.
    target : ("top" | "bottom"), optional
        The subsection of the ranked nodes on which to deploy caches.
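
    Examples
    --------
    An illustrative sketch (hand-built graph, not from the Icarus test
    suite): on a path graph the two central nodes have the highest
    betweenness, so with ``spread=0.5`` the budget is consolidated on them.

    >>> import networkx as nx
    >>> topo = nx.path_graph(4)
    >>> for v in topo.nodes():
    ...     topo.node[v]['stack'] = ('router', {})
    >>> topo.graph['icr_candidates'] = set(topo.nodes())
    >>> uniform_consolidated_cache_placement(topo, cache_budget=40, spread=0.5)
    >>> topo.node[1]['stack'][1]['cache_size']
    20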
"""
if spread < 0 or spread > 1:
raise ValueError('spread factor must be between 0 and 1')
if target not in ('top', 'bottom'):
raise ValueError('target argument must be either "top" or "bottom"')
if metric_dict is None and spread < 1:
metric_dict = nx.betweenness_centrality(topology)
icr_candidates = topology.graph['icr_candidates']
if spread == 1:
target_nodes = icr_candidates
else:
nodes = sorted(icr_candidates, key=lambda k: metric_dict[k])
if target == 'top':
nodes = list(reversed(nodes))
# cutoff node must be at least one otherwise, if spread is too low, no
# nodes would be selected
cutoff = max(1, iround(spread * len(nodes)))
target_nodes = nodes[:cutoff]
cache_size = iround(cache_budget / len(target_nodes))
if cache_size == 0:
return
for v in target_nodes:
topology.node[v]['stack'][1]['cache_size'] = cache_size


@register_cache_placement('RANDOM')
def random_cache_placement(topology, cache_budget, n_cache_nodes,
                           seed=None, **kwargs):
    """Deploy caching nodes randomly.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    n_cache_nodes : int
        The number of caching nodes to deploy
    seed : hashable, optional
        The seed used to initialize the random number generator
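
    Examples
    --------
    An illustrative sketch (hand-built graph, not from the Icarus test
    suite): pick 2 of 4 candidates at random, seeded for reproducibility,
    and split the budget equally between them.

    >>> import networkx as nx
    >>> topo = nx.path_graph(4)
    >>> for v in topo.nodes():
    ...     topo.node[v]['stack'] = ('router', {})
    >>> topo.graph['icr_candidates'] = set(topo.nodes())
    >>> random_cache_placement(topo, cache_budget=50, n_cache_nodes=2, seed=1)
    >>> sum(topo.node[v]['stack'][1].get('cache_size', 0)
    ...     for v in topo.nodes())
    50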
"""
n_cache_nodes = int(n_cache_nodes)
icr_candidates = topology.graph['icr_candidates']
if len(icr_candidates) < n_cache_nodes:
raise ValueError("The number of ICR candidates is lower than the target number of caches")
elif len(icr_candidates) == n_cache_nodes:
caches = icr_candidates
else:
random.seed(seed)
caches = random.sample(icr_candidates, n_cache_nodes)
cache_size = iround(cache_budget / n_cache_nodes)
if cache_size == 0:
return
for v in caches:
topology.node[v]['stack'][1]['cache_size'] = cache_size


@register_cache_placement('OPTIMAL_MEDIAN')
def optimal_median_cache_placement(topology, cache_budget, n_cache_nodes,
                                   hit_ratio, weight='delay', **kwargs):
    """Deploy caching nodes in locations that minimize overall latency,
    assuming a partitioned strategy (a la Google Global Cache). Under this
    scheme, a set of caching nodes is deployed in the network and each
    receiver is mapped to one and only one caching node. Requests from a
    receiver are always sent to its designated caching node; in case of a
    cache miss, requests are forwarded to the original source.

    This placement problem can be mapped to the p-median location-allocation
    problem. This function solves it using the vertex substitution
    heuristic, which works in practice like the PAM k-medoids algorithm and
    is similar in spirit to k-means clustering. The result is guaranteed to
    be only locally, not globally, optimal.

    Notes
    -----
    This placement assumes that all receivers have degree = 1 and are
    connected to an ICR candidate node. It also assumes that contents are
    uniformly assigned to sources.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    n_cache_nodes : int
        The number of caching nodes to deploy
    hit_ratio : float
        The expected cache hit ratio of a single cache
    weight : str, optional
        The weight attribute. Default is 'delay'
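
    Examples
    --------
    A sketch of the expected setup (assuming the ``IcnTopology`` class from
    ``icarus.scenarios.topology`` and stock fnss helpers; node names are
    illustrative): one source, one receiver, two router candidates.

    >>> import fnss
    >>> from icarus.scenarios.topology import IcnTopology
    >>> topo = IcnTopology()
    >>> topo.add_path(['src', 'a', 'b', 'rcv'])
    >>> fnss.add_stack(topo, 'src', 'source')
    >>> fnss.add_stack(topo, 'a', 'router')
    >>> fnss.add_stack(topo, 'b', 'router')
    >>> fnss.add_stack(topo, 'rcv', 'receiver')
    >>> fnss.set_delays_constant(topo, 1, 'ms')
    >>> topo.graph['icr_candidates'] = set(['a', 'b'])
    >>> optimal_median_cache_placement(topo, cache_budget=10,
    ...                                n_cache_nodes=1, hit_ratio=0.5)
    >>> topo.graph['cache_assignment']['rcv'] in ('a', 'b')
    True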
"""
n_cache_nodes = int(n_cache_nodes)
icr_candidates = topology.graph['icr_candidates']
if len(icr_candidates) < n_cache_nodes:
raise ValueError("The number of ICR candidates (%d) is lower than "
"the target number of caches (%d)"
% (len(icr_candidates), n_cache_nodes))
elif len(icr_candidates) == n_cache_nodes:
caches = list(icr_candidates)
cache_assignment = {v: list(topology.edge[v].keys())[0]
for v in topology.receivers()}
else:
# Need to optimally allocate caching nodes
distances = nx.all_pairs_dijkstra_path_length(topology, weight=weight)
sources = topology.sources()
d = {u: {} for u in icr_candidates}
for u in icr_candidates:
source_dist = sum(distances[u][source] for source in sources) / len(sources)
for v in icr_candidates:
if v in d[u]:
d[v][u] = d[u][v]
else:
d[v][u] = distances[v][u] + (hit_ratio * source_dist)
allocation, caches, _ = compute_p_median(distances, n_cache_nodes)
cache_assignment = {v: allocation[list(topology.edge[v].keys())[0]]
for v in topology.receivers()}
cache_size = iround(cache_budget / n_cache_nodes)
if cache_size == 0:
raise ValueError("Cache budget is %d but it's too small to deploy it on %d nodes. "
"Each node will have a zero-sized cache. "
"Set a larger cache budget and try again"
% (cache_budget, n_cache_nodes))
for v in caches:
topology.node[v]['stack'][1]['cache_size'] = cache_size
topology.graph['cache_assignment'] = cache_assignment


@register_cache_placement('OPTIMAL_HASHROUTING')
def optimal_hashrouting_cache_placement(topology, cache_budget, n_cache_nodes,
                                        hit_ratio, weight='delay', **kwargs):
    """Deploy caching nodes for hashrouting in optimized locations.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    n_cache_nodes : int
        The number of caching nodes to deploy
    hit_ratio : float
        The expected global cache hit ratio
    weight : str, optional
        The weight attribute. Default is 'delay'
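
    Examples
    --------
    A sketch under the same assumptions as the example of
    :func:`optimal_median_cache_placement` (the ``IcnTopology`` helper and
    illustrative node names): node 'b' is closer to the receiver, so it is
    selected as the single cache.

    >>> import fnss
    >>> from icarus.scenarios.topology import IcnTopology
    >>> topo = IcnTopology()
    >>> topo.add_path(['src', 'a', 'b', 'rcv'])
    >>> fnss.add_stack(topo, 'src', 'source')
    >>> fnss.add_stack(topo, 'a', 'router')
    >>> fnss.add_stack(topo, 'b', 'router')
    >>> fnss.add_stack(topo, 'rcv', 'receiver')
    >>> fnss.set_delays_constant(topo, 1, 'ms')
    >>> topo.graph['icr_candidates'] = set(['a', 'b'])
    >>> optimal_hashrouting_cache_placement(topo, cache_budget=10,
    ...                                     n_cache_nodes=1, hit_ratio=0.5)
    >>> sorted(v for v in topo.graph['icr_candidates']
    ...        if 'cache_size' in topo.node[v]['stack'][1])
    ['b']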
"""
n_cache_nodes = int(n_cache_nodes)
icr_candidates = topology.graph['icr_candidates']
if len(icr_candidates) < n_cache_nodes:
raise ValueError("The number of ICR candidates (%d) is lower than "
"the target number of caches (%d)"
% (len(icr_candidates), n_cache_nodes))
elif len(icr_candidates) == n_cache_nodes:
caches = list(icr_candidates)
else:
# Need to optimally allocate caching nodes
distances = nx.all_pairs_dijkstra_path_length(topology, weight=weight)
d = {}
for v in icr_candidates:
d[v] = 0
for r in topology.receivers():
d[v] += distances[r][v]
for s in topology.sources():
d[v] += distances[v][s] * hit_ratio
# Sort caches in increasing order of distances and assign cache sizes
caches = sorted(icr_candidates, key=lambda k: d[k])
cache_size = iround(cache_budget / n_cache_nodes)
if cache_size == 0:
raise ValueError("Cache budget is %d but it's too small to deploy it on %d nodes. "
"Each node will have a zero-sized cache. "
"Set a larger cache budget and try again"
% (cache_budget, n_cache_nodes))
for v in caches[:n_cache_nodes]:
topology.node[v]['stack'][1]['cache_size'] = cache_size


@register_cache_placement('CLUSTERED_HASHROUTING')
def clustered_hashrouting_cache_placement(topology, cache_budget, n_clusters,
                                          policy, distance='delay', **kwargs):
    """Deploy caching nodes for hashrouting with clusters.

    Parameters
    ----------
    topology : Topology
        The topology object
    cache_budget : int
        The cumulative cache budget
    n_clusters : int
        The number of clusters
    policy : str ('node_const' | 'cluster_const')
        The budget allocation policy: 'node_const' assigns the same cache
        size to every ICR candidate, while 'cluster_const' splits the
        budget equally among clusters and then equally among the nodes of
        each cluster
    distance : str, optional
        The attribute used to quantify distance between pairs of nodes.
        Default is 'delay'
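
    Examples
    --------
    A sketch under the same assumptions as the examples above
    (``IcnTopology`` helper, illustrative node names): one cluster per
    candidate, with the 'node_const' policy splitting the budget equally
    across all candidates.

    >>> import fnss
    >>> from icarus.scenarios.topology import IcnTopology
    >>> topo = IcnTopology()
    >>> topo.add_path(['src', 'a', 'b', 'rcv'])
    >>> fnss.add_stack(topo, 'src', 'source')
    >>> fnss.add_stack(topo, 'a', 'router')
    >>> fnss.add_stack(topo, 'b', 'router')
    >>> fnss.add_stack(topo, 'rcv', 'receiver')
    >>> fnss.set_delays_constant(topo, 1, 'ms')
    >>> topo.graph['icr_candidates'] = set(['a', 'b'])
    >>> clustered_hashrouting_cache_placement(topo, cache_budget=20,
    ...                                       n_clusters=2, policy='node_const')
    >>> topo.node['a']['stack'][1]['cache_size']
    10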
"""
icr_candidates = topology.graph['icr_candidates']
if n_clusters <= 0 or n_clusters > len(icr_candidates):
raise ValueError("The number of cluster must be positive and <= the "
"number of ICR candidate nodes")
elif n_clusters == 1:
clusters = [set(icr_candidates)]
elif n_clusters == len(icr_candidates):
clusters = [set([v]) for v in icr_candidates]
else:
clusters = compute_clusters(topology, n_clusters, distance=distance,
nbunch=icr_candidates, n_iter=100)
deploy_clusters(topology, clusters, assign_src_rcv=True)
if policy == 'node_const':
# Each node is assigned the same amount of caching space
cache_size = iround(cache_budget / len(icr_candidates))
if cache_size == 0:
return
for v in icr_candidates:
topology.node[v]['stack'][1]['cache_size'] = cache_size
elif policy == 'cluster_const':
cluster_cache_size = iround(cache_budget / n_clusters)
for cluster in topology.graph['clusters']:
cache_size = iround(cluster_cache_size / len(cluster))
for v in cluster:
if v not in icr_candidates:
continue
topology.node[v]['stack'][1]['cache_size'] = cache_size
else:
raise ValueError('clustering policy %s not supported' % policy)