author     Ján Jančár   2023-03-15 15:33:53 +0100
committer  GitHub       2023-03-15 15:33:53 +0100
commit     7f31b31ea6c43466c857a6219ab0541bba94296e (patch)
tree       9922894f5d12afd835a89996a2a84318640a08dd
parent     445eaa41f22ed82502ca813e98a92c2b078c9a79 (diff)
parent     7d967fec195f01e73960c74bc6843cd3123d67cb (diff)
download   pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.tar.gz
           pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.tar.zst
           pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.zip
Merge pull request #27 from Tomko10/feat/stacked-perf-test
feat: Added CPU implementations of combine on stacked traces
-rw-r--r--  pyecsca/sca/stacked_traces/__init__.py       |   1
-rw-r--r--  pyecsca/sca/stacked_traces/combine.py        | 397
-rw-r--r--  pyecsca/sca/stacked_traces/stacked_traces.py | 276
-rw-r--r--  test/sca/perf_stacked_combine.py             | 496
4 files changed, 904 insertions, 266 deletions
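
The merge adds sample-wise combine operations over stacked traces, with both CPU and GPU backends. As a rough usage sketch of the new API (the shapes, random data, and variable names below are illustrative, not taken from the PR):

    import numpy as np
    from pyecsca.sca.stacked_traces import StackedTraces, CPUTraceManager

    # Stack 100 synthetic traces of 1000 samples each into one 2D array.
    samples = np.random.default_rng(0).random((100, 1000), dtype=np.float32)
    traces = StackedTraces(samples)

    # Combine them sample-wise on the CPU.
    manager = CPUTraceManager(traces)
    avg = manager.average()                        # CombinedTrace of per-sample means
    avg_t, var_t = manager.average_and_variance()  # average and variance in one call
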
diff --git a/pyecsca/sca/stacked_traces/__init__.py b/pyecsca/sca/stacked_traces/__init__.py
index 024bd19..a38075a 100644
--- a/pyecsca/sca/stacked_traces/__init__.py
+++ b/pyecsca/sca/stacked_traces/__init__.py
@@ -1 +1,2 @@
from .stacked_traces import *
+from .combine import *
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
new file mode 100644
index 0000000..f6bd738
--- /dev/null
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -0,0 +1,397 @@
+from __future__ import annotations
+
+from numba import cuda
+from numba.cuda import devicearray
+import numpy as np
+from math import sqrt
+
+from public import public
+from typing import Callable, Union, Tuple, cast
+
+from pyecsca.sca.trace.trace import CombinedTrace
+from pyecsca.sca.stacked_traces import StackedTraces
+
+TPB = Union[int, Tuple[int, ...]]
+CudaCTX = Tuple[
+ Tuple[devicearray.DeviceNDArray, ...],
+ Union[int, Tuple[int, ...]]
+]
+
+
+@public
+class BaseTraceManager:
+ """Base class for trace managers"""
+
+ traces: StackedTraces
+
+ def __init__(self, traces: StackedTraces) -> None:
+ self.traces = traces
+
+ def average(self) -> CombinedTrace:
+        """
+        Average the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise averages.
+        """
+ raise NotImplementedError
+
+    def conditional_average(self, condition: Callable[[np.ndarray], bool]) \
+            -> CombinedTrace:
+        """
+        Average the stacked traces for which
+        :paramref:`~.conditional_average.condition` is ``True``, sample-wise.
+
+        :param condition: Predicate applied to each trace's samples.
+        :return: A single trace of sample-wise averages of the matching traces.
+        """
+ raise NotImplementedError
+
+ def standard_deviation(self) -> CombinedTrace:
+        """
+        Compute the sample standard deviation of the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise standard deviations.
+        """
+ raise NotImplementedError
+
+ def variance(self) -> CombinedTrace:
+        """
+        Compute the sample variance of the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise variances.
+        """
+ raise NotImplementedError
+
+ def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+        """
+        Compute the sample average and variance of the stacked traces, sample-wise.
+
+        :return: A tuple of (averages, variances) traces.
+        """
+ raise NotImplementedError
+
+ def add(self) -> CombinedTrace:
+        """
+        Add the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise sums.
+        """
+ raise NotImplementedError
+
+
+@public
+class GPUTraceManager(BaseTraceManager):
+ """Manager for operations with stacked traces on GPU"""
+
+    tpb: TPB
+ _samples_global: devicearray.DeviceNDArray
+
+ def __init__(self, traces: StackedTraces, tpb: TPB = 128) -> None:
+ if not cuda.is_available():
+ raise RuntimeError("CUDA is not available, "
+ "use CPUTraceManager instead")
+ if isinstance(tpb, int) and tpb % 32 != 0:
+ raise ValueError('TPB should be a multiple of 32')
+ if isinstance(tpb, tuple) and any(t % 32 != 0 for t in tpb):
+ raise ValueError(
+ 'TPB should be a multiple of 32 in each dimension'
+ )
+
+ super().__init__(traces)
+ self.tpb = tpb
+ self._samples_global = cuda.to_device(self.traces.samples)
+
+ def _setup1D(self, output_count: int) -> CudaCTX:
+        """
+        Create the context for a 1D GPU CUDA kernel launch.
+
+        :param output_count: Number of output arrays expected from the kernel.
+        :return: A tuple of allocated device output arrays and the computed
+                 blocks-per-grid dimension.
+        """
+ if not isinstance(self.tpb, int):
+ raise TypeError("tpb is not an int for a 1D kernel")
+
+ device_output = tuple((
+ cuda.device_array(self.traces.samples.shape[1])
+ for _ in range(output_count)
+ ))
+        bpg = (self.traces.samples.shape[1] + (self.tpb - 1)) // self.tpb
+
+ return device_output, bpg
+
+ def _gpu_combine1D(self, func, output_count: int = 1) \
+ -> Union[CombinedTrace, Tuple[CombinedTrace, ...]]:
+        """
+        Run a 1D CUDA combine kernel on the stacked traces.
+
+        :param func: CUDA kernel to run.
+        :param output_count: Number of output arrays expected from the kernel.
+        :return: The combined trace, or a tuple of combined traces if the
+                 kernel produces more than one output.
+        """
+ device_outputs, bpg = self._setup1D(output_count)
+
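+        # Numba CUDA kernel launch syntax: func[blocks_per_grid, threads_per_block](*args)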
+ func[bpg, self.tpb](self._samples_global, *device_outputs)
+
+ if len(device_outputs) == 1:
+ return CombinedTrace(
+ device_outputs[0].copy_to_host(),
+ self.traces.meta
+ )
+ return tuple(
+ CombinedTrace(device_output.copy_to_host(), self.traces.meta)
+ for device_output
+ in device_outputs
+ )
+
+ def average(self) -> CombinedTrace:
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_average, 1))
+
+    def conditional_average(self, condition: Callable[[np.ndarray], bool]) \
+            -> CombinedTrace:
+        """Not implemented on the GPU; use :py:meth:`CPUTraceManager.conditional_average` instead."""
+        raise NotImplementedError()
+
+ def standard_deviation(self) -> CombinedTrace:
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev, 1))
+
+ def variance(self) -> CombinedTrace:
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_variance, 1))
+
+ def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+ averages, variances = self._gpu_combine1D(gpu_avg_var, 2)
+ return averages, variances
+
+ def add(self) -> CombinedTrace:
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_add, 1))
+
+
+@cuda.jit(device=True)
+def _gpu_average(col: int, samples: np.ndarray, result: np.ndarray):
+ """
+    CUDA device function computing the average of a single sample across the stacked traces.
+
+ :param col: Index of the sample.
+ :param samples: Shared array of the samples of stacked traces.
+ :param result: Result output array.
+ """
+ acc = 0.
+ for row in range(samples.shape[0]):
+ acc += samples[row, col]
+ result[col] = acc / samples.shape[0]
+
+
+@cuda.jit
+def gpu_average(samples: np.ndarray, result: np.ndarray):
+ """
+ Sample average of stacked traces, sample-wise.
+
+ :param samples: Stacked traces' samples.
+ :param result: Result output array.
+ """
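+    # cuda.grid(1) is this thread's global index; each thread handles one sample column.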
+ col = cuda.grid(1)
+
+ if col >= samples.shape[1]:
+ return
+
+ _gpu_average(col, samples, result)
+
+
+@cuda.jit(device=True)
+def _gpu_var_from_avg(col: int, samples: np.ndarray,
+ averages: np.ndarray, result: np.ndarray):
+ """
+    CUDA device function computing the variance of a single sample across the stacked traces, given precomputed averages.
+
+ :param col: Index of the sample.
+ :param samples: Shared array of the samples of stacked traces.
+ :param averages: Array of averages of samples.
+ :param result: Result output array.
+ """
+ var = 0.
+ for row in range(samples.shape[0]):
+ current = samples[row, col] - averages[col]
+ var += current * current
+    result[col] = var / samples.shape[0]
+
+
+@cuda.jit(device=True)
+def _gpu_variance(col: int, samples: np.ndarray, result: np.ndarray):
+ """
+    CUDA device function computing the variance of a single sample across the stacked traces.
+
+ :param col: Index of the sample.
+ :param samples: Shared array of the samples of stacked traces.
+ :param result: Result output array.
+ """
+ _gpu_average(col, samples, result)
+ _gpu_var_from_avg(col, samples, result, result)
+
+
+@cuda.jit
+def gpu_std_dev(samples: np.ndarray, result: np.ndarray):
+ """
+ Sample standard deviation of stacked traces, sample-wise.
+
+ :param samples: Stacked traces' samples.
+ :param result: Result output array.
+ """
+ col = cuda.grid(1)
+
+ if col >= samples.shape[1]:
+ return
+
+ _gpu_variance(col, samples, result)
+
+ result[col] = sqrt(result[col])
+
+
+@cuda.jit
+def gpu_variance(samples: np.ndarray, result: np.ndarray):
+ """
+ Sample variance of stacked traces, sample-wise.
+
+ :param samples: Stacked traces' samples.
+ :param result: Result output array.
+ """
+ col = cuda.grid(1)
+
+ if col >= samples.shape[1]:
+ return
+
+ _gpu_variance(col, samples, result)
+
+
+@cuda.jit
+def gpu_avg_var(samples: np.ndarray, result_avg: np.ndarray,
+ result_var: np.ndarray):
+ """
+ Sample average and variance of stacked traces, sample-wise.
+
+ :param samples: Stacked traces' samples.
+ :param result_avg: Result average output array.
+ :param result_var: Result variance output array.
+ """
+ col = cuda.grid(1)
+
+ if col >= samples.shape[1]:
+ return
+
+ _gpu_average(col, samples, result_avg)
+ _gpu_var_from_avg(col, samples, result_avg, result_var)
+
+
+@cuda.jit
+def gpu_add(samples: np.ndarray, result: np.ndarray):
+ """
+ Add samples of stacked traces, sample-wise.
+
+ :param samples: Stacked traces' samples.
+ :param result: Result output array.
+ """
+ col = cuda.grid(1)
+
+ if col >= samples.shape[1]:
+ return
+
+ res = 0.
+ for row in range(samples.shape[0]):
+ res += samples[row, col]
+ result[col] = res
+
+
+@public
+class CPUTraceManager(BaseTraceManager):
+    """Manager for operations on stacked traces on CPU."""
+
+ def average(self) -> CombinedTrace:
+        """
+        Compute the average of the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise averages.
+        """
+ return CombinedTrace(
+ np.average(self.traces.samples, 0),
+ self.traces.meta
+ )
+
+ def conditional_average(self, condition: Callable[[np.ndarray], bool]) -> CombinedTrace:
+        """
+        Average the stacked traces for which :paramref:`~.conditional_average.condition` is ``True``, sample-wise.
+
+        :param condition: Predicate applied to each trace's samples.
+        :return: A single trace of sample-wise averages of the matching traces.
+        """
+ # TODO: Consider other ways to implement this
+ samples = self.traces.samples
+        mask = np.apply_along_axis(condition, 1, samples)
+        return CombinedTrace(
+            np.average(samples[mask], 0),
+ self.traces.meta
+ )
+
+ def standard_deviation(self) -> CombinedTrace:
+        """
+        Compute the sample standard deviation of the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise standard deviations.
+        """
+ return CombinedTrace(
+ np.std(self.traces.samples, 0),
+ self.traces.meta
+ )
+
+ def variance(self) -> CombinedTrace:
+        """
+        Compute the sample variance of the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise variances.
+        """
+ return CombinedTrace(
+ np.var(self.traces.samples, 0),
+ self.traces.meta
+ )
+
+ def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+        """
+        Compute the sample average and variance of the stacked traces, sample-wise.
+
+        :return: A tuple of (averages, variances) traces.
+        """
+ return (
+ self.average(),
+ self.variance()
+ )
+
+ def add(self) -> CombinedTrace:
+        """
+        Add the stacked traces, sample-wise.
+
+        :return: A single trace of sample-wise sums.
+        """
+ return CombinedTrace(
+ np.sum(self.traces.samples, 0),
+ self.traces.meta
+ )
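
When CUDA is available, the GPU manager mirrors the CPU API. A minimal sketch, assuming a CUDA-capable device; the data, shapes, and tpb value below are illustrative:

    import numpy as np
    from numba import cuda
    from pyecsca.sca.stacked_traces import StackedTraces, GPUTraceManager

    samples = np.random.default_rng(0).random((100, 1000), dtype=np.float32)
    traces = StackedTraces(samples)

    if cuda.is_available():  # the constructor raises RuntimeError otherwise
        gpu = GPUTraceManager(traces, tpb=128)  # tpb must be a multiple of 32
        std = gpu.standard_deviation()          # launches the gpu_std_dev kernel
        avg, var = gpu.average_and_variance()   # two outputs from one kernel launch
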
diff --git a/pyecsca/sca/stacked_traces/stacked_traces.py b/pyecsca/sca/stacked_traces/stacked_traces.py
index c54abc2..09169bd 100644
--- a/pyecsca/sca/stacked_traces/stacked_traces.py
+++ b/pyecsca/sca/stacked_traces/stacked_traces.py
@@ -1,11 +1,10 @@
-from numba import cuda
-from numba.cuda import devicearray
+from __future__ import annotations
+
import numpy as np
from public import public
-from typing import Any, Mapping, Sequence, Tuple, Union, Optional
-from math import sqrt
+from typing import Any, Mapping, Sequence
-from pyecsca.sca.trace.trace import CombinedTrace
+from pyecsca.sca.trace_set.base import TraceSet
@public
@@ -15,9 +14,10 @@ class StackedTraces:
meta: Mapping[str, Any]
samples: np.ndarray
+ # TODO: Split metadata into common and per-trace
def __init__(
self, samples: np.ndarray,
- meta: Optional[Mapping[str, Any]] = None) -> None:
+ meta: Mapping[str, Any] | None = None) -> None:
if meta is None:
meta = {}
self.meta = meta
@@ -25,7 +25,9 @@ class StackedTraces:
@classmethod
def fromarray(cls, traces: Sequence[np.ndarray],
- meta: Optional[Mapping[str, Any]] = None) -> 'StackedTraces':
+ meta: Mapping[str, Any] | None = None) -> 'StackedTraces':
+ if meta is None:
+ meta = {}
ts = list(traces)
min_samples = min(map(len, ts))
for i, t in enumerate(ts):
@@ -34,7 +36,7 @@ class StackedTraces:
return cls(stacked, meta)
@classmethod
- def fromtraceset(cls, traceset) -> 'StackedTraces':
+ def fromtraceset(cls, traceset: TraceSet) -> 'StackedTraces':
traces = [t.samples for t in traceset]
return cls.fromarray(traces)
@@ -46,261 +48,3 @@ class StackedTraces:
def __iter__(self):
yield from self.samples
-
-
-TPB = Union[int, Tuple[int, ...]]
-CudaCTX = Tuple[
- Tuple[devicearray.DeviceNDArray, ...],
- Union[int, Tuple[int, ...]]
-]
-
-
-@public
-class GPUTraceManager:
- """Manager for operations with stacked traces on GPU"""
-
- traces: StackedTraces
- _tpb: TPB
- _samples_global: devicearray.DeviceNDArray
-
- def __init__(self, traces: StackedTraces, tpb: TPB = 128) -> None:
- if isinstance(tpb, int) and tpb % 32 != 0:
- raise ValueError('TPB should be a multiple of 32')
- if isinstance(tpb, tuple) and any(t % 32 != 0 for t in tpb):
- raise ValueError(
- 'TPB should be a multiple of 32 in each dimension'
- )
-
- self.traces = traces
- self.tpb = tpb
- self._samples_global = cuda.to_device(self.traces.samples)
-
- def _setup1D(self, output_count: int) -> CudaCTX:
- """
- Creates context for 1D GPU CUDA functions
-
- :param traces: The input stacked traces.
- :param tpb: Threads per block to invoke the kernel with.
- :param output_count: Number of outputs expected from the GPU function.
- :return: Created context of input and output arrays and calculated
- blocks per grid dimensions.
- """
- if not isinstance(self.tpb, int):
- raise TypeError("tpb is not an int for a 1D kernel")
-
- device_output = tuple((
- cuda.device_array(self.traces.samples.shape[1])
- for _ in range(output_count)
- ))
- bpg = (self.traces.samples.size + (self.tpb - 1)) // self.tpb
-
- return device_output, bpg
-
- def _gpu_combine1D(self, func, output_count: int = 1) \
- -> Tuple[CombinedTrace, ...]:
- """
- Runs GPU Cuda StackedTrace 1D combine function
-
- :param func: Function to run.
- :param traces: Stacked traces to provide as input to the function.
- :param tpb: Threads per block to invoke the kernel with
- :param output_count: Number of outputs expected from the GPU function.
- :return: Combined trace output from the GPU function
- """
- device_outputs, bpg = self._setup1D(output_count)
-
- func[bpg, self.tpb](self._samples_global, *device_outputs)
-
- return tuple(
- CombinedTrace(device_output.copy_to_host(), self.traces.meta)
- for device_output
- in device_outputs
- )
-
- def average(self) -> CombinedTrace:
- """
- Average :paramref:`~.average.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return self._gpu_combine1D(gpu_average, 1)[0]
-
- def conditional_average(self) -> CombinedTrace:
- """
- Not implemented due to the nature of GPU functions.
-
- Use sca.trace.combine.conditional_average instead.
- """
- raise NotImplementedError
-
- def standard_deviation(self) -> CombinedTrace:
- """
- Compute the sample standard-deviation of the :paramref:`~.standard_deviation.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return self._gpu_combine1D(gpu_std_dev, 1)[0]
-
- def variance(self) -> CombinedTrace:
- """
- Compute the sample variance of the :paramref:`~.variance.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return self._gpu_combine1D(gpu_variance, 1)[0]
-
- def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
- """
- Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- averages, variances = self._gpu_combine1D(gpu_avg_var, 2)
- return averages, variances
-
- def add(self) -> CombinedTrace:
- """
- Add :paramref:`~.add.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return self._gpu_combine1D(gpu_add, 1)[0]
-
-
-@cuda.jit(device=True)
-def _gpu_average(col: int, samples: np.ndarray, result: np.ndarray):
- """
- Cuda device thread function computing the average of a sample of stacked traces.
-
- :param col: Index of the sample.
- :param samples: Shared array of the samples of stacked traces.
- :param result: Result output array.
- """
- acc = 0.
- for row in range(samples.shape[0]):
- acc += samples[row, col]
- result[col] = acc / samples.shape[0]
-
-
-@cuda.jit
-def gpu_average(samples: np.ndarray, result: np.ndarray):
- """
- Sample average of stacked traces, sample-wise.
-
- :param samples: Stacked traces' samples.
- :param result: Result output array.
- """
- col = cuda.grid(1)
-
- if col >= samples.shape[1]:
- return
-
- _gpu_average(col, samples, result)
-
-
-@cuda.jit(device=True)
-def _gpu_var_from_avg(col: int, samples: np.ndarray,
- averages: np.ndarray, result: np.ndarray):
- """
- Cuda device thread function computing the variance from the average of a sample of stacked traces.
-
- :param col: Index of the sample.
- :param samples: Shared array of the samples of stacked traces.
- :param averages: Array of averages of samples.
- :param result: Result output array.
- """
- var = 0.
- for row in range(samples.shape[0]):
- current = samples[row, col] - averages[col]
- var += current * current
- result[col] = var / samples.shape[0]
-
-
-@cuda.jit(device=True)
-def _gpu_variance(col: int, samples: np.ndarray, result: np.ndarray):
- """
- Cuda device thread function computing the variance of a sample of stacked traces.
-
- :param col: Index of the sample.
- :param samples: Shared array of the samples of stacked traces.
- :param result: Result output array.
- """
- _gpu_average(col, samples, result)
- _gpu_var_from_avg(col, samples, result, result)
-
-
-@cuda.jit
-def gpu_std_dev(samples: np.ndarray, result: np.ndarray):
- """
- Sample standard deviation of stacked traces, sample-wise.
-
- :param samples: Stacked traces' samples.
- :param result: Result output array.
- """
- col = cuda.grid(1)
-
- if col >= samples.shape[1]:
- return
-
- _gpu_variance(col, samples, result)
-
- result[col] = sqrt(result[col])
-
-
-@cuda.jit
-def gpu_variance(samples: np.ndarray, result: np.ndarray):
- """
- Sample variance of stacked traces, sample-wise.
-
- :param samples: Stacked traces' samples.
- :param result: Result output array.
- """
- col = cuda.grid(1)
-
- if col >= samples.shape[1]:
- return
-
- _gpu_variance(col, samples, result)
-
-
-@cuda.jit
-def gpu_avg_var(samples: np.ndarray, result_avg: np.ndarray,
- result_var: np.ndarray):
- """
- Sample average and variance of stacked traces, sample-wise.
-
- :param samples: Stacked traces' samples.
- :param result_avg: Result average output array.
- :param result_var: Result variance output array.
- """
- col = cuda.grid(1)
-
- if col >= samples.shape[1]:
- return
-
- _gpu_average(col, samples, result_avg)
- _gpu_var_from_avg(col, samples, result_avg, result_var)
-
-
-@cuda.jit
-def gpu_add(samples: np.ndarray, result: np.ndarray):
- """
- Add samples of stacked traces, sample-wise.
-
- :param samples: Stacked traces' samples.
- :param result: Result output array.
- """
- col = cuda.grid(1)
-
- if col >= samples.shape[1]:
- return
-
- res = 0.
- for row in range(samples.shape[0]):
- res += samples[row, col]
- result[col] = res
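
The StackedTraces.fromarray hunk above computes min_samples over the inputs, which suggests ragged traces are truncated to the shortest one before stacking. A small illustrative sketch under that reading (the expected shape is an assumption, not taken from the diff):

    import numpy as np
    from pyecsca.sca.stacked_traces import StackedTraces

    ragged = [np.arange(5, dtype=np.float32),
              np.arange(3, dtype=np.float32),
              np.arange(4, dtype=np.float32)]
    stacked = StackedTraces.fromarray(ragged, meta={"source": "example"})
    # Expected: stacked.samples.shape == (3, 3), every trace cut to 3 samples.
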
diff --git a/test/sca/perf_stacked_combine.py b/test/sca/perf_stacked_combine.py
new file mode 100644
index 0000000..ab4c5cd
--- /dev/null
+++ b/test/sca/perf_stacked_combine.py
@@ -0,0 +1,496 @@
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from typing import Any, Callable, Dict, List, TextIO, Tuple
+
+import numpy as np
+import numpy.random as npr
+import numpy.typing as npt
+
+from pyecsca.sca import (CPUTraceManager, GPUTraceManager, StackedTraces,
+ Trace, TraceSet, add, average, average_and_variance,
+ conditional_average, standard_deviation, variance)
+
+Operation = str
+Duration = int
+TimeRecord = Tuple[Operation, Duration]
+
+traceset_ops = {
+ "average": average,
+ "conditional_average": conditional_average,
+ "standard_deviation": standard_deviation,
+ "variance": variance,
+ "average_and_variance": average_and_variance,
+ "add": add,
+}
+
+
+def _generate_floating(rng: npr.Generator,
+ trace_count: int,
+ trace_length: int,
+ dtype: npt.DTypeLike = np.float32,
+ distribution: str = "uniform",
+ low: float = 0.0,
+ high: float = 1.0,
+ mean: float = 0.0,
+ std: float = 0.0) -> np.ndarray:
+ if not np.issubdtype(dtype, np.floating):
+ raise ValueError("dtype must be a floating point type")
+
+ dtype_ = (dtype if (np.issubdtype(dtype, np.float32)
+ or np.issubdtype(dtype, np.float64))
+ else np.float32)
+ if distribution == "uniform":
+ samples = rng.random((trace_count, trace_length),
+ dtype=dtype_) # type: ignore
+
+ if (not np.issubdtype(dtype, np.float32)
+ and not np.issubdtype(dtype, np.float64)):
+ samples = samples.astype(dtype)
+ return (samples * (high - low) + low)
+ elif distribution == "normal":
+ return (rng
+ .normal(mean, std, (trace_count, trace_length))
+ .clip(low, high)
+ .astype(dtype))
+
+ raise ValueError("Unknown distribution")
+
+
+def _generate_integers(rng: npr.Generator,
+ trace_count: int,
+ trace_length: int,
+ dtype: npt.DTypeLike = np.int32,
+ distribution: str = "uniform",
+ low: int = 0,
+ high: int = 1,
+ mean: float = 0.0,
+ std: float = 0.0) -> np.ndarray:
+ if not np.issubdtype(dtype, np.integer):
+ raise ValueError("dtype must be an integer type")
+
+ if distribution == "uniform":
+ return rng.integers(low,
+ high,
+ size=(trace_count, trace_length),
+ dtype=dtype) # type: ignore
+ elif distribution == "normal":
+ return (rng
+ .normal(mean, std, (trace_count, trace_length))
+ .astype(dtype)
+ .clip(low, high - 1))
+
+ raise ValueError("Unknown distribution")
+
+
+def generate_dataset(rng: npr.Generator,
+ trace_count: int,
+ trace_length: int,
+ dtype: npt.DTypeLike = np.float32,
+ distribution: str = "uniform",
+ low: float | int = 0,
+ high: float | int = 1,
+ mean: float | int = 0,
+ std: float | int = 1,
+ seed: int | None = None) -> np.ndarray:
+ """Generate a TraceSet with random samples
+
+ For float dtype only float32 and float64 are supported natively,
+ other floats are converted after generation.
+ For int dtype, all numpy int types are supported.
+ :param trace_count: Number of traces
+ :param trace_length: Number of samples per trace
+ :param dtype: Data type of the samples
+ :param low: Lower bound of the samples
+ :param high: Upper bound of the samples
+ :param seed: Seed for the random number generator
+ :return: TraceSet
+ """
+ if (not np.issubdtype(dtype, np.integer)
+ and not np.issubdtype(dtype, np.floating)):
+ raise ValueError("dtype must be an integer or floating point type")
+
+ gen_fun, cast_fun = ((_generate_integers, int)
+ if np.issubdtype(dtype, np.integer) else
+ (_generate_floating, float))
+ samples = gen_fun(rng,
+ trace_count,
+ trace_length,
+ dtype,
+ distribution,
+ cast_fun(low), # type: ignore
+ cast_fun(high), # type: ignore
+ mean,
+ std)
+
+ return samples
+
+
+def timed(time_storage: List[TimeRecord] | None = None,
+ log: bool = True) \
+ -> Callable[[Callable[..., Any]], Callable[..., Any]]:
+ def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
+ import time
+
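+        # Wrap func, measuring wall-clock duration with time.perf_counter_ns().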
+        def timed_func(*args, **kwargs) -> Any:
+ start = time.perf_counter_ns()
+ result = func(*args, **kwargs)
+ duration = time.perf_counter_ns() - start
+ if log:
+ print(f"{func.__name__} took {duration} ns")
+ if time_storage is not None:
+ time_storage.append((func.__name__, duration))
+ return result
+ return timed_func
+ return decorator
+
+
+def stack_traceset(traceset: TraceSet) -> StackedTraces:
+ return StackedTraces.fromtraceset(traceset)
+
+
+def stack_array(dataset: np.ndarray) -> StackedTraces:
+ return StackedTraces.fromarray(dataset) # type: ignore
+
+
+def to_traceset(dataset: np.ndarray) -> TraceSet:
+ return TraceSet(*(Trace(samples) for samples in dataset))
+
+
+def stack(dataset: np.ndarray,
+ from_array: bool,
+ time: bool,
+ time_storage: List[TimeRecord] | None = None,
+ log: bool = True) -> StackedTraces:
+ time_fun = timed(time_storage, log) if time else lambda x: x
+ data = (dataset
+ if from_array
+ else to_traceset(dataset))
+ stack_fun = stack_array if from_array else stack_traceset
+ return time_fun(stack_fun)(data)
+
+
+def _get_parser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-r", "--repetitions", type=int,
+ default=1, help="Number of repetitions")
+ parser.add_argument(
+ "-o", "--output",
+ type=argparse.FileType("w"),
+ default=sys.stdout,
+ help="Output file"
+ )
+ combine = parser.add_argument_group(
+ "operations",
+ "Operations to perform on the traces"
+ )
+ combine.add_argument(
+ "-d", "--device",
+ choices=["cpu", "gpu"],
+ help="Device to use for the computation"
+ )
+ stacking = combine.add_mutually_exclusive_group()
+ stacking.add_argument(
+ "-s", "--stack",
+ action="store_true",
+ default=False,
+ help="Use stacked traces"
+ )
+ stacking.add_argument(
+ "--stack-traceset",
+ action="store_true",
+ default=False,
+ help="Perform stacking from a TraceSet"
+ )
+ combine.add_argument(
+ "--time-stack",
+ action="store_true",
+ default=False,
+ help="Time the stacking operation"
+ )
+
+ combine.add_argument(
+ "--operations",
+ nargs="*",
+ choices=traceset_ops.keys(),
+ help="Operations to perform on the traces"
+ )
+
+ dataset = parser.add_argument_group(
+ "data generation",
+ "Options for data generation"
+ )
+ dataset.add_argument("--trace-count", type=int,
+ default=1024, help="Number of traces")
+ dataset.add_argument("--trace-length", type=int,
+ default=1024, help="Number of samples per trace")
+ dataset.add_argument("--seed", type=int, default=None,
+ help="Seed for the random number generator")
+ dataset.add_argument(
+ "--dtype",
+ type=str,
+ default="float32",
+ choices=["float16", "float32", "float64", "int8",
+ "int16", "int32", "int64"],
+ help="Data type of the samples"
+ )
+ dataset.add_argument(
+ "--distribution",
+ type=str,
+ default="uniform",
+ choices=["uniform", "normal"],
+ help="Distribution of the samples")
+ dataset.add_argument("--low", type=float, default=0.0,
+ help="Inclusive lower bound for generated samples")
+ dataset.add_argument("--high", type=float, default=1.0,
+ help="Exclusive upper bound for generated samples")
+ dataset.add_argument("--mean", type=float, default=0.0,
+ help="Mean of the normal distribution")
+ dataset.add_argument("--std", type=float, default=1.0,
+ help="Standard deviation of the normal distribution")
+
+ verbosity = parser.add_mutually_exclusive_group()
+ verbosity.add_argument("-v", "--verbose", action="store_true")
+ verbosity.add_argument("-q", "--quiet", action="store_true")
+ return parser
+
+
+def _get_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
+ args = parser.parse_args()
+
+ if args.time_stack and not args.stack and not args.stack_traceset:
+ parser.error("Cannot time stack without stacking")
+
+ if not args.operations and not args.stack:
+ parser.error("No operation specified")
+
+ if args.low >= args.high:
+ parser.error("Lower bound must be smaller than upper bound")
+
+ if args.operations and not args.device:
+ parser.error("Device must be specified when performing operations")
+
+ if (args.operations
+ and args.device == "gpu"
+ and not args.stack
+ and not args.stack_traceset):
+ args.stack = True
+ args.stack_traceset = False
+
+ return args
+
+
+def report(time_storage: List[TimeRecord],
+ total_only: bool = False) -> None:
+ if total_only:
+ print(f"Total: {sum(duration for _, duration in time_storage):,} ns")
+ return
+
+ print("Timings:")
+ for name, duration in time_storage:
+ print(f"{name : <20} | {duration : 15,} ns")
+ print("-" * 41)
+ print(f"{'Total' : <20} | "
+ f"{sum(duration for _, duration in time_storage) : 15,} ns")
+
+
+def group_times_by_operation(time_storage: List[List[TimeRecord]]) \
+ -> Dict[Operation, List[Duration]]:
+ result: Dict[Operation, List[Duration]] = {}
+ for times in time_storage:
+ for operation, duration in times:
+ if operation.startswith("stack"):
+ operation = "stack"
+ result.setdefault(operation, []).append(duration)
+
+ return result
+
+
+class NumpyEncoder(json.JSONEncoder):
+ def default(self, obj: Any) -> Any:
+ if isinstance(obj, np.integer):
+ return int(obj)
+ if isinstance(obj, np.floating):
+ return float(obj)
+ if isinstance(obj, np.ndarray):
+ return obj.tolist()
+ return super().default(obj)
+
+
+def export_report(time_storage: List[List[TimeRecord]],
+ args: argparse.Namespace,
+ output: TextIO) -> None:
+ by_operation = group_times_by_operation(time_storage)
+ data: Dict[str, Any] = {}
+ data["config"] = {
+ "repetitions": args.repetitions,
+ "operations": {
+ "device": args.device,
+ "operations": args.operations,
+ "stack": args.stack,
+ "stack_traceset": args.stack_traceset,
+ },
+ "dataset": {
+ "seed": args.seed,
+ "trace_count": args.trace_count,
+ "trace_length": args.trace_length,
+ "data_type": args.dtype,
+ "distribution": args.distribution,
+ "low": args.low,
+ "high": args.high,
+ "mean": args.mean,
+ "std_dev": args.std,
+ }
+ }
+ data["timing"] = [
+ {
+ "repetition": rep_num + 1,
+ "timings": {
+ ("stack"
+ if name.startswith("stack")
+ else name): duration
+ for name, duration
+ in rep
+ }
+ }
+ for rep_num, rep
+ in enumerate(time_storage)
+ ]
+ data["timing"].append({
+ "repetition": "total",
+ "timings": {
+ name: sum(durations)
+ for name, durations
+ in by_operation.items()
+ },
+ })
+ data["timing"][-1]["total"] = sum(
+ duration
+ for duration
+ in data["timing"][-1]["timings"].values()
+ )
+
+ operations = []
+ if args.time_stack:
+ operations.append("stack")
+    operations.extend(args.operations or [])
+
+ data["summary"] = {
+ op: {
+ "sum": np.sum(by_operation[op]),
+ "average": np.mean(by_operation[op]),
+ "min": np.min(by_operation[op]),
+ "max": np.max(by_operation[op]),
+ "std_dev": np.std(by_operation[op]),
+ "variance": np.var(by_operation[op]),
+ "median": np.median(by_operation[op]),
+ "q25": np.quantile(by_operation[op], 0.25),
+ "q75": np.quantile(by_operation[op], 0.75),
+ }
+ for op in operations
+ }
+
+ json.dump(data,
+ output,
+ cls=NumpyEncoder,
+ indent=4)
+
+
+def repetition(args: argparse.Namespace,
+ rng: npr.Generator) -> List[TimeRecord]:
+ # Prepare time storage
+    time_storage: List[TimeRecord] = []
+
+ # Generate data
+ if args.verbose:
+ print("Generating data...")
+ dataset = generate_dataset(rng,
+ args.trace_count,
+ args.trace_length,
+ args.dtype,
+ args.distribution,
+ args.low,
+ args.high,
+ args.mean,
+ args.std,
+ args.seed)
+
+ # Transform data for operations input
+ if args.stack:
+ if args.verbose:
+ print("Stacking data...")
+ data = stack(dataset,
+ not args.stack_traceset,
+ args.time_stack,
+ time_storage,
+ args.verbose)
+ else:
+ data = to_traceset(dataset)
+
+ if not args.operations:
+ report(time_storage)
+ return time_storage
+
+ if args.verbose:
+ print("Performing operations...")
+
+ # Operations on stacked traces
+ if args.stack:
+ # Initialize trace manager
+ assert isinstance(data, StackedTraces)
+ tm_class = (CPUTraceManager
+ if args.device == "cpu"
+ else GPUTraceManager)
+
+ trace_manager = tm_class(data)
+
+ # Perform operations
+ for op in args.operations:
+ if args.verbose:
+ print(f"Performing {op}...")
+ op_func = getattr(trace_manager, op)
+ timed(time_storage, args.verbose)(op_func)()
+ else:
+ assert isinstance(data, TraceSet)
+
+ # Perform operations
+ for op in args.operations:
+ if args.verbose:
+ print(f"Performing {op}...")
+ op_func = traceset_ops[op]
+ timed(time_storage, args.verbose)(op_func)(*data)
+
+ if args.verbose:
+ print("------------------------")
+ report(time_storage)
+ print("-" * 41 + "\n")
+ return time_storage
+
+
+def main(args: argparse.Namespace) -> None:
+ if args.verbose:
+ print(f"Repetitions: {args.repetitions}")
+ print(f"Dataset: {args.trace_count} x {args.trace_length} "
+ "(count x length)")
+ print(f"Device: {args.device},",
+ "stacked" if args.stack else "not stacked")
+ print(f"Operations: {', '.join(args.operations)}")
+
+ time_storage: List[List[TimeRecord]] = []
+ rng = np.random.default_rng(args.seed)
+ for i in range(args.repetitions):
+ print(f"Repetition {i + 1} of {args.repetitions}")
+ time_storage.append(repetition(args, rng))
+
+ total_time = sum(sum(dur for _, dur in rep)
+ for rep in time_storage)
+ print("\nSummary")
+ print(f"Total: {total_time:,} ns")
+ export_report(time_storage, args, args.output)
+
+
+if __name__ == "__main__":
+ args = _get_args(_get_parser())
+ main(args)
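
The benchmark is driven entirely by its argparse options. An illustrative invocation (the sizes, repetition count, and output file name are arbitrary):

    python test/sca/perf_stacked_combine.py -d cpu -s --time-stack \
        --operations average variance add \
        -r 5 --trace-count 1000 --trace-length 10000 -o results.json

This stacks the generated traces, times the stacking step, runs the selected combine operations on the CPU trace manager five times, and writes a JSON timing report to results.json.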