import scipy.sparse as sp
import copy
import torch
import warnings
from typing import Dict, Iterable, List, Optional, Tuple, Union
import numpy as np
# Per-layer profile data stored as scipy sparse matrices, keyed by layer index.
SpMatData = Dict[int, sp.spmatrix]
# Per-layer profile data stored as plain dictionaries, keyed by layer index
# (the format produced by ``Profile.dict_view``).
DictData = Dict[int, Dict[Tuple, float]]


class Profile:
    """
    Container for the per-layer neuron and synapse counts and weights that
    result from a profiling process.

    Profiles can be compared with ``==``, iterated over (yielding the layer keys
    of the neuron counts), aggregated with ``+`` / ``+=`` and converted to a
    dictionary-formatted copy with :meth:`dict_view`.

    Note
    ----
    Because ``__eq__`` is defined without ``__hash__``, instances are not hashable.
    """

    def __init__(
        self,
        neuron_counts: Optional[Union[SpMatData, DictData]] = None,
        neuron_weights: Optional[Union[SpMatData, DictData]] = None,
        synapse_counts: Optional[Union[SpMatData, DictData]] = None,
        synapse_weights: Optional[Union[SpMatData, DictData]] = None,
        activation_shapes: Optional[Dict[int, torch.Size]] = None,
        pred_dict: Optional[Dict[int, List[int]]] = None,
        num_inputs: Optional[int] = 0,
        neuron_type: Optional[str] = None,
    ) -> None:
        """
        Parameters
        ----------
        neuron_counts : dict, optional
            Dictionary representing profile neurons and their counts, i.e. how many
            synapses they were influential or contributing to
        neuron_weights : dict, optional
            Dictionary representing influential profile neurons and their weights
        synapse_counts : dict, optional
            Dictionary representing profile synapses and their counts
        synapse_weights : dict, optional
            Dictionary representing profile synapses and their weights
        activation_shapes : dict, optional
            Dictionary of the activation tensors shapes, keyed by layer
        pred_dict : dict, optional
            Dictionary giving the layer predecessor hierarchy by layer index
        num_inputs : int, optional
            Number of inputs represented by the profile
        neuron_type : str, optional
            The type of neurons used in the profile, i.e. 'element' or 'channel'

        Note
        ----
        The format for the inputs is very strict so that it can be used to store
        the results of a profiling process, but there is no type checking. If the
        input is not in the correct format, the metrics could fail or return
        inaccurate values.
        """
        # Missing (or empty) count/weight arguments are replaced by fresh dicts so
        # that instances never share a mutable default.
        self._neuron_counts = neuron_counts or dict()
        self._neuron_weights = neuron_weights or dict()
        self._synapse_counts = synapse_counts or dict()
        self._synapse_weights = synapse_weights or dict()
        self._activation_shapes = activation_shapes
        self._pred_dict = pred_dict
        self._num_inputs = num_inputs
        self._neuron_type = neuron_type

    @property
    def neuron_counts(self) -> Union[SpMatData, DictData]:
        """
        Returns
        -------
        neuron_counts : Dict of scipy.sparse matrices or Dict of dicts
            Dictionary representing profile neurons and their counts, i.e. how many
            synapses they were influential or contributing to
        """
        return self._neuron_counts

    @property
    def neuron_weights(self) -> Union[SpMatData, DictData]:
        """
        Returns
        -------
        neuron_weights : Dict of scipy.sparse matrices or Dict of dicts
            Dictionary representing influential profile neurons and their weights
        """
        return self._neuron_weights

    @property
    def synapse_counts(self) -> Union[SpMatData, DictData]:
        """
        Returns
        -------
        synapse_counts : Dict of scipy.sparse matrices or Dict of dicts
            Dictionary representing profile synapses and their counts

        Note
        ----
        For a single image profile (num_inputs=1) all synapses should have a count of 1
        """
        return self._synapse_counts

    @property
    def synapse_weights(self) -> Union[SpMatData, DictData]:
        """
        Returns
        -------
        synapse_weights : Dict of scipy.sparse matrices or Dict of dicts
            Dictionary representing profile synapses and their weights
        """
        return self._synapse_weights

    @property
    def activation_shapes(self) -> Dict[int, torch.Size]:
        """
        Returns
        -------
        activation_shapes : Dict of torch.Sizes
            Dictionary of the activation tensors shapes, keyed by layer
            (None if it was not supplied at construction)
        """
        return self._activation_shapes

    @property
    def pred_dict(self) -> Dict[int, List[int]]:
        """
        Returns
        -------
        pred_dict : Dict of list of ints
            Dictionary giving the layer predecessor hierarchy by layer index
            (None if it was not supplied at construction)
        """
        return self._pred_dict

    @property
    def num_inputs(self) -> int:
        """
        Returns
        -------
        num_inputs : int
            The number of input images represented by the profile

        Note
        ----
        Class profiles and other aggregate profiles will have num_inputs > 1
        """
        return self._num_inputs

    @property
    def neuron_type(self) -> str:
        """
        Returns
        -------
        neuron_type : str
            The type of neurons used in the profile, i.e. 'element', 'channel', or
            'mixed' (aggregate of profiles with mismatched types)
        """
        return self._neuron_type

    @property
    def total(self) -> int:
        """
        Returns
        -------
        int
            Total sum of neuron counts across all layers
        """
        return sum(counts.sum() for counts in self._neuron_counts.values())

    @property
    def size(self) -> int:
        """
        Returns
        -------
        int
            Total number of neurons identified as influential or contributing
            (neurons with nonzero neuron counts)
        """
        return sum(counts.count_nonzero() for counts in self._neuron_counts.values())

    @property
    def num_synapses(self) -> int:
        """
        Returns
        -------
        int
            Total number of synapses (nonzero synapse counts) across all layers
        """
        return sum(counts.count_nonzero() for counts in self._synapse_counts.values())

    @staticmethod
    def _layers_equal(mine: dict, theirs: dict) -> bool:
        """Return True if two per-layer dictionaries hold the same keys and equal values."""
        if mine.keys() != theirs.keys():
            return False
        for layer, left in mine.items():
            right = theirs[layer]
            if left is right:
                continue
            if sp.issparse(left) or sp.issparse(right):
                # ``spmatrix == spmatrix`` is itself a sparse matrix whose truth value
                # is ambiguous, so sparse layers are compared explicitly instead of
                # through plain dict equality.
                if not (sp.issparse(left) and sp.issparse(right)):
                    return False
                if left.shape != right.shape:
                    return False
                if (left.tocsr() != right.tocsr()).count_nonzero():
                    return False
            elif left != right:
                return False
        return True

    def __eq__(self, other: "Profile") -> bool:
        """
        Parameters
        ----------
        other : Profile

        Returns
        -------
        bool
            True if the profile data held by self is equal to the profile data held
            by other, otherwise False. Comparison with an object that is not a
            Profile yields NotImplemented.

        Note
        ----
        If neuron type is specified by one profile but not the other, the two can
        still be equal if all other data is equal. Activation shapes and the
        predecessor dictionary do not take part in the comparison.
        """
        if not isinstance(other, Profile):
            return NotImplemented
        if self._num_inputs != other.num_inputs:
            return False
        same_type = self._neuron_type == other.neuron_type
        # exactly one of the two neuron types is unspecified
        one_unspecified = (self._neuron_type is None) ^ (other.neuron_type is None)
        if not (same_type or one_unspecified):
            return False
        return (
            self._layers_equal(self._neuron_counts, other.neuron_counts)
            and self._layers_equal(self._neuron_weights, other.neuron_weights)
            and self._layers_equal(self._synapse_counts, other.synapse_counts)
            and self._layers_equal(self._synapse_weights, other.synapse_weights)
        )

    def __iter__(self) -> Iterable:
        """
        Returns
        -------
        Iterable
            An iterator over the layer keys of the neuron counts
        """
        # __iter__ must hand back an iterator; a dict_keys view is only an iterable
        return iter(self._neuron_counts)

    def __add__(self, other: "Profile") -> "Profile":
        """
        Adds the neuron and synapse counts and weights of other and self.

        Parameters
        ----------
        other : Profile

        Returns
        -------
        new_profile : Profile
            The aggregate profile of self and other; neither operand is modified

        Note
        ----
        Not supported for dictionary-formatted profiles
        """
        # no_grad is kept from the original implementation; the arithmetic in this
        # class operates on scipy sparse matrices.
        with torch.no_grad():
            if self.num_inputs == 0:
                # self represents no inputs: the aggregate is just a copy of other
                new_profile = copy.deepcopy(other)
            else:
                new_profile = copy.deepcopy(self)
                if other.num_inputs > 0:
                    new_profile += other
        return new_profile

    @staticmethod
    def _weighted_mean(
        weights_a: sp.spmatrix,
        counts_a: sp.spmatrix,
        weights_b: sp.spmatrix,
        counts_b: sp.spmatrix,
        total_counts: sp.spmatrix,
    ) -> sp.coo_matrix:
        """Return the count-weighted mean of two weight matrices as a COO matrix."""
        # sum of weights (weighted by counts); csr supports indexing
        weighted_sum = (
            weights_a.multiply(counts_a) + weights_b.multiply(counts_b)
        ).tocsr()
        rows, cols = weighted_sum.nonzero()
        if len(rows) == 0:
            return sp.coo_matrix(weighted_sum.shape, dtype=float)
        # normalize only at the nonzero positions of the weighted sum (avoids 0/0)
        numerators = np.asarray(weighted_sum[rows, cols], dtype=float).ravel()
        denominators = np.asarray(
            total_counts.tocsr()[rows, cols], dtype=float
        ).ravel()
        return sp.coo_matrix(
            (numerators / denominators, (rows, cols)), shape=weighted_sum.shape
        )

    @classmethod
    def _merge_layer_stats(
        cls,
        self_counts: dict,
        self_weights: dict,
        other_counts: dict,
        other_weights: dict,
    ) -> None:
        """Merge other's per-layer counts and weights into self's dictionaries in place."""
        for layer, incoming_counts in other_counts.items():
            if layer not in self_counts:
                # mismatched keysets, copy counts and weights if necessary
                if layer in other_weights:
                    self_weights[layer] = copy.deepcopy(other_weights[layer])
                self_counts[layer] = copy.deepcopy(incoming_counts)
                continue
            # aggregate total counts
            total_counts = self_counts[layer] + incoming_counts
            if layer in self_weights and layer in other_weights:
                # the mean is weighted by the counts held before aggregation
                self_weights[layer] = cls._weighted_mean(
                    self_weights[layer],
                    self_counts[layer],
                    other_weights[layer],
                    incoming_counts,
                    total_counts,
                )
            elif layer in other_weights:
                # copy weights if keyset is mismatched
                self_weights[layer] = copy.deepcopy(other_weights[layer])
            self_counts[layer] = total_counts

    def __iadd__(self, other: "Profile") -> "Profile":
        """
        Adds in place the neuron and synapse counts and weights of other to self.

        Counts are summed per layer; weights are combined as the count-weighted
        mean of the two profiles. If self represents no inputs, the data of other
        is copied into self instead.

        Parameters
        ----------
        other : Profile

        Returns
        -------
        self : Profile

        Warns
        -----
        UserWarning
            If the neuron types of the two profiles differ (the result is marked
            'mixed'), or if their layer key sets do not match.

        Note
        ----
        Not supported for dictionary-formatted profiles
        """
        # no_grad is kept from the original implementation; the arithmetic below
        # operates on scipy sparse matrices.
        with torch.no_grad():
            if self._num_inputs == 0:
                self._neuron_counts = copy.deepcopy(other.neuron_counts)
                self._neuron_weights = copy.deepcopy(other.neuron_weights)
                self._synapse_counts = copy.deepcopy(other.synapse_counts)
                self._synapse_weights = copy.deepcopy(other.synapse_weights)
                self._num_inputs = other.num_inputs
                self._neuron_type = other.neuron_type
            elif other.num_inputs > 0:
                self._num_inputs += other.num_inputs
                if (
                    self.neuron_type is not None
                    and other.neuron_type is not None
                    and self.neuron_type != other.neuron_type
                ):
                    self._neuron_type = "mixed"
                    warnings.warn(
                        "Incompatible profiles: mismatched neuron types - output neuron type will be mixed"
                    )
                if (
                    self.neuron_counts.keys() != other.neuron_counts.keys()
                    or self.neuron_weights.keys() != other.neuron_weights.keys()
                    or self.synapse_counts.keys() != other.synapse_counts.keys()
                    or self.synapse_weights.keys() != other.synapse_weights.keys()
                ):
                    warnings.warn(
                        "Warning: These profiles have mismatched layer key sets, aggregation may give strange results"
                    )
                # Combine neuron counts and weights
                self._merge_layer_stats(
                    self._neuron_counts,
                    self._neuron_weights,
                    other.neuron_counts,
                    other.neuron_weights,
                )
                # Combine synapse counts and weights
                self._merge_layer_stats(
                    self._synapse_counts,
                    self._synapse_weights,
                    other.synapse_counts,
                    other.synapse_weights,
                )
        return self

    def _uses_full_index(self, dims: Tuple[int, ...]) -> bool:
        """Return True if flat indices for a layer of shape dims are expanded in dict_view."""
        # Only 'element' and 'spatial' neurons of 4-dimensional activations are
        # expanded; every other neuron type keeps the flat index.
        return len(dims) == 4 and self._neuron_type in ("element", "spatial")

    def dict_view(self) -> Optional["Profile"]:
        """
        Returns
        -------
        Profile or None
            A copy of the profile with neuron and synapse counts and weights
            reformatted as dicts, or None (after a warning) if activation_shapes
            or pred_dict is unknown

        Note
        ----
        Neurons are keyed as ``(layer index, (neuron index,))`` and synapses as
        ``((input layer index, (input neuron index,)),
        (output layer index, (output neuron index,)))``. Layers without nonzero
        entries map to an empty dict.
        """
        if self._activation_shapes is None or self._pred_dict is None:
            warnings.warn(
                "activation_shapes and pred_dict must be known to generate dictionary view"
            )
            return None

        # construct dicts for neuron counts and weights
        neuron_stats = {"counts": self._neuron_counts, "weights": self._neuron_weights}
        neuron_views = {stat: dict() for stat in neuron_stats}
        for stat, neuron_spmat in neuron_stats.items():
            neuron_dict = neuron_views[stat]
            for layer, spmat in neuron_spmat.items():
                dims = self._activation_shapes[layer]
                # list neurons and values from sparse matrix
                neurons = list(spmat.todok().items())
                if not neurons:
                    # nothing stored for this layer; zip(*[]) could not be unpacked
                    neuron_dict[layer] = dict()
                    continue
                flat_idx, values = zip(*neurons)
                # the column of each stored entry is the flat neuron index
                neuron_idx = tuple(idx[1] for idx in flat_idx)
                # convert flat indices to full spatial or element indices if necessary
                if self._uses_full_index(dims):
                    spatial_idx = np.unravel_index(neuron_idx, dims[2:])
                    if self._neuron_type == "element":
                        # the row of each stored entry is the channel index
                        channel_idx = np.array([int(idx[0]) for idx in flat_idx])
                        neuron_idx = (channel_idx,) + spatial_idx
                    else:
                        neuron_idx = spatial_idx
                    neuron_idx = tuple(zip(*neuron_idx))
                else:
                    neuron_idx = tuple(zip(neuron_idx))
                # add each neuron to the dict with its value
                # neuron format: (layer index, (full neuron index,))
                neuron_dict[layer] = {
                    (layer, idx): val for idx, val in zip(neuron_idx, values)
                }

        # construct dicts for synapse counts and weights
        synapse_stats = {
            "counts": self._synapse_counts,
            "weights": self._synapse_weights,
        }
        synapse_views = {stat: dict() for stat in synapse_stats}
        for stat, synapse_spmat in synapse_stats.items():
            synapse_dict = synapse_views[stat]
            for layer in synapse_spmat:
                dims = self._activation_shapes[layer]
                # list predecessors/input layers
                pred_list = self._pred_dict[layer]
                # split sparse matrix if there are two input layers (i.e. resnetadd)
                if len(pred_list) == 2:
                    layer_csr = synapse_spmat[layer].tocsr()
                    pred_spmats = (
                        layer_csr[: dims[1], : dims[1]],
                        layer_csr[dims[1] :, dims[1] :],
                    )
                    pred_nonzero = tuple(p.nnz > 0 for p in pred_spmats)
                    if not all(pred_nonzero):
                        # drop input layers that contribute no synapses
                        pred_spmats = tuple(
                            p for p, nz in zip(pred_spmats, pred_nonzero) if nz
                        )
                        pred_list = tuple(
                            pdx for pdx, nz in zip(pred_list, pred_nonzero) if nz
                        )
                else:
                    pred_spmats = (synapse_spmat[layer],)
                synapse_dict[layer] = dict()
                # add synapses from each input layer to the dict
                for pred, spmat in zip(pred_list, pred_spmats):
                    pdims = self._activation_shapes[pred]
                    # list output neurons (rows), input neurons (columns), and
                    # values from sparse matrix
                    synapses = [(*syn, val) for syn, val in spmat.todok().items()]
                    if not synapses:
                        # nothing stored for this input layer
                        continue
                    out_idx, in_idx, values = zip(*synapses)
                    full_idxs = []
                    # convert flat indices of input and output neurons to full spatial
                    # or element indices if necessary
                    for ldx, ldims, neuron_idx in (
                        (pred, pdims, in_idx),
                        (layer, dims, out_idx),
                    ):
                        if self._uses_full_index(ldims):
                            if self._neuron_type == "element":
                                neuron_idx = np.unravel_index(neuron_idx, ldims[1:])
                            else:
                                neuron_idx = np.unravel_index(neuron_idx, ldims[2:])
                            neuron_idx = tuple(zip(*neuron_idx))
                        else:
                            neuron_idx = tuple(zip(neuron_idx))
                        full_idxs.append(tuple((ldx, idx) for idx in neuron_idx))
                    in_full, out_full = full_idxs
                    # add each synapse to the dict with its value
                    # synapse format: ((input layer index, (input neuron index,)),
                    #                  (output layer index, (output neuron index,)))
                    synapse_dict[layer].update(
                        {(i, o): v for i, o, v in zip(in_full, out_full, values)}
                    )

        return Profile(
            neuron_counts=neuron_views["counts"],
            neuron_weights=neuron_views["weights"],
            synapse_counts=synapse_views["counts"],
            synapse_weights=synapse_views["weights"],
            activation_shapes=self._activation_shapes,
            pred_dict=self._pred_dict,
            num_inputs=self._num_inputs,
            neuron_type=f"{self._neuron_type} (dict)",
        )