# Source code for icarus.models.cache.systems

"""Simple networks of caches modeled as single caches."""
import random
import numpy as np

from icarus.util import inheritdoc
from icarus.tools import DiscreteDist
from icarus.registry import register_cache_policy, CACHE_POLICY

from .policies import Cache


__all__ = [
    'PathCache',
    'TreeCache',
    'ArrayCache',
    'ShardedCache',
           ]


@register_cache_policy('PATH')
class PathCache(object):
    """Path of caches

    This is not a single-node cache implementation but rather it implements
    a path of caching nodes in which requests are fed to the first node of
    the path and, in case of a miss, are propagated down to the remaining
    nodes of the path. A miss occurs if none of the nodes on the path has
    the requested content.
    """

    def __init__(self, caches, **kwargs):
        """Constructor

        Parameters
        ----------
        caches : array-like
            An array of caching nodes instances on the path
        """
        self._caches = caches
        self._len = len(caches)

    def __len__(self):
        # Cumulative number of caching nodes on the path
        return self._len

    @property
    def maxlen(self):
        return self._len

    def has(self, k):
        """Return True if any node on the path holds item *k*."""
        return any(node.has(k) for node in self._caches)

    def get(self, k):
        """Retrieve item *k*, replicating it upstream on a hit.

        Returns True if some node on the path holds the item, False
        otherwise. On a hit, the item is inserted into every node
        traversed before the one where it was found.
        """
        hit_index = None
        for idx, node in enumerate(self._caches):
            if node.get(k):
                hit_index = idx
                break
        if hit_index is None:
            return False
        # Put contents on all caches traversed by the retrieved content
        for node in self._caches[:hit_index]:
            node.put(k)
        return True

    def put(self, k):
        """Insert an item in the cache if not already inserted.

        If the element is already present in the cache, it will pushed to the
        top of the cache.

        Parameters
        ----------
        k : any hashable type
            The item to be inserted

        Returns
        -------
        evicted : any hashable type
            The evicted object or *None* if no contents were evicted.
        """
        # Every node on the path receives a copy of the item
        for node in self._caches:
            node.put(k)

    def remove(self, k):
        raise NotImplementedError('This method is not implemented')

    def position(self, k):
        raise NotImplementedError('This method is not implemented')

    def dump(self, serialized=True):
        """Return the cached items of every node on the path.

        If *serialized*, the per-node dumps are flattened into one list.
        """
        per_node = [node.dump() for node in self._caches]
        return sum(per_node, []) if serialized else per_node

    def clear(self):
        """Empty every cache on the path."""
        for node in self._caches:
            node.clear()
@register_cache_policy('TREE')
class TreeCache(object):
    """Tree of caches

    This is not a single-node cache implementation but rather it implements
    a tree of caching nodes in which requests are fed to a random leaf node
    and, in case of a miss, are propagated down to the remaining nodes of the
    path. A miss occurs if none of the nodes on the path has the requested
    content.

    Notes
    -----
    This cache can only be operated in a read-through manner and not in
    write through or read/write aside. In other words, before issuing a put,
    you must issue a get for the same item. The reason for this limitation
    is to ensure that matching get/put requests go through the same randomly
    selected node.
    """

    def __init__(self, leaf_caches, root_cache, **kwargs):
        """Constructor

        Parameters
        ----------
        leaf_caches : array-like
            An array of caching node instances used as leaves of the tree
        root_cache : Cache
            The caching node instance used as root of the tree
        """
        self._leaf_caches = leaf_caches
        self._root_cache = root_cache
        # Cumulative capacity of all leaves plus the root
        self._len = sum(len(c) for c in leaf_caches) + len(root_cache)
        self._n_leaves = len(leaf_caches)
        # Leaf selected by the last get; put() must follow the same leaf
        self._leaf = None

    def __len__(self):
        return self._len

    @property
    def maxlen(self):
        return self._len

    def has(self, k):
        raise NotImplementedError('This method is not implemented')

    def get(self, k):
        """Retrieve item *k* through a randomly selected leaf.

        On a leaf miss, the root is queried; a root hit replicates the item
        into the selected leaf. Returns True on a hit anywhere, else False.
        """
        self._leaf = random.choice(self._leaf_caches)
        if self._leaf.get(k):
            return True
        else:
            if self._root_cache.get(k):
                self._leaf.put(k)
                return True
            else:
                return False

    def put(self, k):
        """Insert an item in the cache if not already inserted.

        If the element is already present in the cache, it will pushed to the
        top of the cache.

        Parameters
        ----------
        k : any hashable type
            The item to be inserted

        Returns
        -------
        evicted : any hashable type
            The evicted object or *None* if no contents were evicted.
        """
        if self._leaf is None:
            raise ValueError("You are trying to insert an item not requested before. "
                             "Tree cache can be used in read-through mode only")
        self._leaf.put(k)
        self._root_cache.put(k)

    def remove(self, k):
        raise NotImplementedError('This method is not implemented')

    def position(self, k):
        raise NotImplementedError('This method is not implemented')

    def dump(self, serialized=True):
        """Return the cached items of every leaf followed by the root."""
        dump = [c.dump() for c in self._leaf_caches]
        dump.append(self._root_cache.dump())
        return sum(dump, []) if serialized else dump

    def clear(self):
        """Empty all leaf caches and the root cache.

        Fix: the original iterated ``self._caches``, an attribute that does
        not exist on TreeCache (it has ``_leaf_caches`` and ``_root_cache``),
        so clear() always raised AttributeError.
        """
        for c in self._leaf_caches:
            c.clear()
        self._root_cache.clear()
@register_cache_policy('ARRAY')
class ArrayCache(object):
    """Array of caches

    This is not a single-node cache implementation but rather it implements
    an array of caching nodes in which requests are fed to a random node of
    a set.

    Notes
    -----
    This cache can only be operated in a read-through manner and not in
    write through or read/write aside. In other words, before issuing a put,
    you must issue a get for the same item. The reason for this limitation
    is to ensure that matching get/put requests go through the same randomly
    selected node.
    """

    def __init__(self, caches, weights=None, **kwargs):
        """Constructor

        Parameters
        ----------
        caches : array-like
            An array of caching nodes instances on the array
        weights : array-like
            Random weights according to which a cache of the array should be
            selected to process a given request
        """
        self._caches = caches
        self._len = sum(len(c) for c in caches)
        self._n_caches = len(caches)
        # Cache picked by the last get; matching put() is routed to it
        self._selected_cache = None
        if weights is not None:
            # Validate that weights form a proper probability distribution
            if np.abs(np.sum(weights) - 1) > 0.0001:
                raise ValueError("weights must sum up to 1")
            if len(weights) != self._n_caches:
                raise ValueError("weights must have as many elements as nr of caches")
            sampler = DiscreteDist(weights)
            # DiscreteDist.rv() is 1-based, hence the -1 offset
            self.select_cache = lambda: self._caches[sampler.rv() - 1]
        else:
            # Uniform selection when no weights are given
            self.select_cache = lambda: random.choice(self._caches)

    def __len__(self):
        return self._len

    @property
    def maxlen(self):
        return self._len

    def has(self, k):
        raise NotImplementedError('This method is not implemented')

    def get(self, k):
        """Retrieve item *k* from a randomly selected cache of the array."""
        self._selected_cache = self.select_cache()
        return self._selected_cache.get(k)

    def put(self, k):
        """Insert an item in the cache if not already inserted.

        If the element is already present in the cache, it will pushed to the
        top of the cache.

        Parameters
        ----------
        k : any hashable type
            The item to be inserted

        Returns
        -------
        evicted : any hashable type
            The evicted object or *None* if no contents were evicted.
        """
        if self._selected_cache is None:
            raise ValueError("You are trying to insert an item not requested before. "
                             "Array cache can be used in read-through mode only")
        self._selected_cache.put(k)

    def remove(self, k):
        raise NotImplementedError('This method is not implemented')

    def position(self, k):
        raise NotImplementedError('This method is not implemented')

    def dump(self, serialized=True):
        """Return the cached items of every node of the array."""
        per_node = [c.dump() for c in self._caches]
        return sum(per_node, []) if serialized else per_node

    def clear(self):
        """Empty every cache of the array."""
        for c in self._caches:
            c.clear()
@register_cache_policy('SHARD')
class ShardedCache(Cache):
    """Set of sharded caches.

    Set of caches coordinately storing items. When a request reaches the
    caches, the request is forwarded to the specific cache (shard) based on
    the outcome of a hash function. So, an item can be stored only by a
    single node of the system.
    """

    def __init__(self, maxlen, policy='LRU', nodes=4, f_map=None,
                 policy_attr=None, **kwargs):
        """Constructor

        Parameters
        ----------
        maxlen : int
            The maximum number of items the cache can store.
        policy : str, optional
            The eviction policy of each node (e.g., LRU, LFU, FIFO...).
            Default is LRU.
        nodes : int, optional
            The number of nodes, default is 4.
        f_map : callable, optional
            A callable governing the mapping between items and caching nodes.
            It receives as argument a value of an item :math:`k` and returns
            an integer between :math:`0` and :math:`nodes - 1` identifying
            the target node. If not specified, the mapping is done by
            computing the hash of the given item modulo the number of nodes.
        policy_attr : dict, optional
            A set of parameters for initializing the underlying caching
            policy.

        Notes
        -----
        The maxlen parameter refers to the cumulative size of the caches in
        the set. The size of each shard is derived dividing maxlen by the
        number of nodes.
        """
        # Fix: policy_attr previously defaulted to a mutable dict ({}),
        # which is shared across all instances; use a None sentinel instead.
        if policy_attr is None:
            policy_attr = {}
        maxlen = int(maxlen)
        if maxlen <= 0:
            raise ValueError('maxlen must be positive')
        if not isinstance(nodes, int) or nodes <= 0 or nodes > maxlen:
            raise ValueError('nodes must be an integer and 0 < nodes <= maxlen')
        # If maxlen is not a multiple of nodes, then some nodes have one slot
        # more than others
        self._node_maxlen = [maxlen // nodes for _ in range(nodes)]
        for i in range(maxlen % nodes):
            self._node_maxlen[i] += 1
        self._maxlen = maxlen
        self._node = [CACHE_POLICY[policy](self._node_maxlen[i], **policy_attr)
                      for i in range(nodes)]
        # Default mapping: hash of the item modulo the number of shards
        self.f_map = f_map if f_map is not None else lambda k: hash(k) % nodes

    @inheritdoc(Cache)
    def __len__(self):
        return sum(len(s) for s in self._node)

    @property
    def maxlen(self):
        return self._maxlen

    @inheritdoc(Cache)
    def has(self, k):
        return self._node[self.f_map(k)].has(k)

    @inheritdoc(Cache)
    def get(self, k):
        return self._node[self.f_map(k)].get(k)

    @inheritdoc(Cache)
    def put(self, k):
        return self._node[self.f_map(k)].put(k)

    @inheritdoc(Cache)
    def dump(self, serialized=True):
        dump = list(s.dump() for s in self._node)
        return sum(dump, []) if serialized else dump

    @inheritdoc(Cache)
    def remove(self, k):
        return self._node[self.f_map(k)].remove(k)

    @inheritdoc(Cache)
    def clear(self):
        for s in self._node:
            s.clear()