| author | Ján Jančár | 2023-03-15 15:33:53 +0100 |
|---|---|---|
| committer | GitHub | 2023-03-15 15:33:53 +0100 |
| commit | 7f31b31ea6c43466c857a6219ab0541bba94296e (patch) | |
| tree | 9922894f5d12afd835a89996a2a84318640a08dd | |
| parent | 445eaa41f22ed82502ca813e98a92c2b078c9a79 (diff) | |
| parent | 7d967fec195f01e73960c74bc6843cd3123d67cb (diff) | |
| download | pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.tar.gz pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.tar.zst pyecsca-7f31b31ea6c43466c857a6219ab0541bba94296e.zip | |
Merge pull request #27 from Tomko10/feat/stacked-perf-test
feat: Added CPU implementations of combine on stacked traces
| -rw-r--r-- | pyecsca/sca/stacked_traces/__init__.py | 1 |
| -rw-r--r-- | pyecsca/sca/stacked_traces/combine.py | 397 |
| -rw-r--r-- | pyecsca/sca/stacked_traces/stacked_traces.py | 276 |
| -rw-r--r-- | test/sca/perf_stacked_combine.py | 496 |
4 files changed, 904 insertions, 266 deletions
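The patch moves the trace managers into the new `combine.py` and adds a CPU counterpart to the existing GPU manager. A minimal usage sketch of the CPU path, based on the code added below (the import from `pyecsca.sca` follows the added perf script and is an assumption; the random input traces are made up for illustration):

```python
import numpy as np

# Assuming the new classes are re-exported from pyecsca.sca, as the added
# perf script does; otherwise import from pyecsca.sca.stacked_traces.
from pyecsca.sca import StackedTraces, CPUTraceManager

# Three made-up traces; fromarray truncates every trace to the shortest
# length before stacking them into a single 2D (traces x samples) array.
rng = np.random.default_rng(1234)
traces = [rng.random(1000, dtype=np.float32) for _ in range(3)]
stacked = StackedTraces.fromarray(traces)

mgr = CPUTraceManager(stacked)
avg = mgr.average()                    # sample-wise mean as a CombinedTrace
std = mgr.standard_deviation()         # sample-wise standard deviation
avg2, var = mgr.average_and_variance() # mean and variance in one call
total = mgr.add()                      # sample-wise sum across traces
```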
diff --git a/pyecsca/sca/stacked_traces/__init__.py b/pyecsca/sca/stacked_traces/__init__.py index 024bd19..a38075a 100644 --- a/pyecsca/sca/stacked_traces/__init__.py +++ b/pyecsca/sca/stacked_traces/__init__.py @@ -1 +1,2 @@ from .stacked_traces import * +from .combine import * diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py new file mode 100644 index 0000000..f6bd738 --- /dev/null +++ b/pyecsca/sca/stacked_traces/combine.py @@ -0,0 +1,397 @@ +from __future__ import annotations + +from numba import cuda +from numba.cuda import devicearray +import numpy as np +from math import sqrt + +from public import public +from typing import Callable, Union, Tuple, cast + +from pyecsca.sca.trace.trace import CombinedTrace +from pyecsca.sca.stacked_traces import StackedTraces + +TPB = Union[int, Tuple[int, ...]] +CudaCTX = Tuple[ + Tuple[devicearray.DeviceNDArray, ...], + Union[int, Tuple[int, ...]] +] + + +@public +class BaseTraceManager: + """Base class for trace managers""" + + traces: StackedTraces + + def __init__(self, traces: StackedTraces) -> None: + self.traces = traces + + def average(self) -> CombinedTrace: + """ + Average :paramref:`~.average.traces`, sample-wise. + + :param traces: + :return: + """ + raise NotImplementedError + + def conditional_average(self, cond: Callable[[np.ndarray], bool]) \ + -> CombinedTrace: + """ + Average :paramref:`~.conditional_average.traces` for which the + :paramref:`~.conditional_average.condition` is ``True``, sample-wise. + + :param traces: + :param condition: + :return: + """ + raise NotImplementedError + + def standard_deviation(self) -> CombinedTrace: + """ + Compute the sample standard-deviation of the + :paramref:`~.standard_deviation.traces`, sample-wise. + + :param traces: + :return: + """ + raise NotImplementedError + + def variance(self) -> CombinedTrace: + """ + Compute the sample variance of the + :paramref:`~.variance.traces`, sample-wise. + + :param traces: + :return: + """ + raise NotImplementedError + + def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]: + """ + Compute the sample average and variance of the + :paramref:`~.average_and_variance.traces`, sample-wise. + + :param traces: + :return: + """ + raise NotImplementedError + + def add(self) -> CombinedTrace: + """ + Add :paramref:`~.add.traces`, sample-wise. + + :param traces: + :return: + """ + raise NotImplementedError + + +@public +class GPUTraceManager(BaseTraceManager): + """Manager for operations with stacked traces on GPU""" + + _tpb: TPB + _samples_global: devicearray.DeviceNDArray + + def __init__(self, traces: StackedTraces, tpb: TPB = 128) -> None: + if not cuda.is_available(): + raise RuntimeError("CUDA is not available, " + "use CPUTraceManager instead") + if isinstance(tpb, int) and tpb % 32 != 0: + raise ValueError('TPB should be a multiple of 32') + if isinstance(tpb, tuple) and any(t % 32 != 0 for t in tpb): + raise ValueError( + 'TPB should be a multiple of 32 in each dimension' + ) + + super().__init__(traces) + self.tpb = tpb + self._samples_global = cuda.to_device(self.traces.samples) + + def _setup1D(self, output_count: int) -> CudaCTX: + """ + Creates context for 1D GPU CUDA functions + + :param traces: The input stacked traces. + :param tpb: Threads per block to invoke the kernel with. + :param output_count: Number of outputs expected from the GPU function. + :return: Created context of input and output arrays and calculated + blocks per grid dimensions. 
+ """ + if not isinstance(self.tpb, int): + raise TypeError("tpb is not an int for a 1D kernel") + + device_output = tuple(( + cuda.device_array(self.traces.samples.shape[1]) + for _ in range(output_count) + )) + bpg = (self.traces.samples.size + (self.tpb - 1)) // self.tpb + + return device_output, bpg + + def _gpu_combine1D(self, func, output_count: int = 1) \ + -> Union[CombinedTrace, Tuple[CombinedTrace, ...]]: + """ + Runs GPU Cuda StackedTrace 1D combine function + + :param func: Function to run. + :param traces: Stacked traces to provide as input to the function. + :param tpb: Threads per block to invoke the kernel with + :param output_count: Number of outputs expected from the GPU function. + :return: Combined trace output from the GPU function + """ + device_outputs, bpg = self._setup1D(output_count) + + func[bpg, self.tpb](self._samples_global, *device_outputs) + + if len(device_outputs) == 1: + return CombinedTrace( + device_outputs[0].copy_to_host(), + self.traces.meta + ) + return tuple( + CombinedTrace(device_output.copy_to_host(), self.traces.meta) + for device_output + in device_outputs + ) + + def average(self) -> CombinedTrace: + return cast(CombinedTrace, self._gpu_combine1D(gpu_average, 1)) + + def conditional_average(self, cond: Callable[[np.ndarray], bool]) \ + -> CombinedTrace: + raise NotImplementedError() + + def standard_deviation(self) -> CombinedTrace: + return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev, 1)) + + def variance(self) -> CombinedTrace: + return cast(CombinedTrace, self._gpu_combine1D(gpu_variance, 1)) + + def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]: + averages, variances = self._gpu_combine1D(gpu_avg_var, 2) + return averages, variances + + def add(self) -> CombinedTrace: + return cast(CombinedTrace, self._gpu_combine1D(gpu_add, 1)) + + +@cuda.jit(device=True) +def _gpu_average(col: int, samples: np.ndarray, result: np.ndarray): + """ + Cuda device thread function computing the average of a sample of stacked traces. + + :param col: Index of the sample. + :param samples: Shared array of the samples of stacked traces. + :param result: Result output array. + """ + acc = 0. + for row in range(samples.shape[0]): + acc += samples[row, col] + result[col] = acc / samples.shape[0] + + +@cuda.jit +def gpu_average(samples: np.ndarray, result: np.ndarray): + """ + Sample average of stacked traces, sample-wise. + + :param samples: Stacked traces' samples. + :param result: Result output array. + """ + col = cuda.grid(1) + + if col >= samples.shape[1]: + return + + _gpu_average(col, samples, result) + + +@cuda.jit(device=True) +def _gpu_var_from_avg(col: int, samples: np.ndarray, + averages: np.ndarray, result: np.ndarray): + """ + Cuda device thread function computing the variance from the average of a sample of stacked traces. + + :param col: Index of the sample. + :param samples: Shared array of the samples of stacked traces. + :param averages: Array of averages of samples. + :param result: Result output array. + """ + var = 0. + for row in range(samples.shape[0]): + current = samples[row, col] - averages[col] + var += current * current + result[col] = var / samples.shape[0] + + +@cuda.jit(device=True) +def _gpu_variance(col: int, samples: np.ndarray, result: np.ndarray): + """ + Cuda device thread function computing the variance of a sample of stacked traces. + + :param col: Index of the sample. + :param samples: Shared array of the samples of stacked traces. + :param result: Result output array. 
+ """ + _gpu_average(col, samples, result) + _gpu_var_from_avg(col, samples, result, result) + + +@cuda.jit +def gpu_std_dev(samples: np.ndarray, result: np.ndarray): + """ + Sample standard deviation of stacked traces, sample-wise. + + :param samples: Stacked traces' samples. + :param result: Result output array. + """ + col = cuda.grid(1) + + if col >= samples.shape[1]: + return + + _gpu_variance(col, samples, result) + + result[col] = sqrt(result[col]) + + +@cuda.jit +def gpu_variance(samples: np.ndarray, result: np.ndarray): + """ + Sample variance of stacked traces, sample-wise. + + :param samples: Stacked traces' samples. + :param result: Result output array. + """ + col = cuda.grid(1) + + if col >= samples.shape[1]: + return + + _gpu_variance(col, samples, result) + + +@cuda.jit +def gpu_avg_var(samples: np.ndarray, result_avg: np.ndarray, + result_var: np.ndarray): + """ + Sample average and variance of stacked traces, sample-wise. + + :param samples: Stacked traces' samples. + :param result_avg: Result average output array. + :param result_var: Result variance output array. + """ + col = cuda.grid(1) + + if col >= samples.shape[1]: + return + + _gpu_average(col, samples, result_avg) + _gpu_var_from_avg(col, samples, result_avg, result_var) + + +@cuda.jit +def gpu_add(samples: np.ndarray, result: np.ndarray): + """ + Add samples of stacked traces, sample-wise. + + :param samples: Stacked traces' samples. + :param result: Result output array. + """ + col = cuda.grid(1) + + if col >= samples.shape[1]: + return + + res = 0. + for row in range(samples.shape[0]): + res += samples[row, col] + result[col] = res + + +@public +class CPUTraceManager: + """Manager for operations on stacked traces on CPU.""" + + traces: StackedTraces + + def __init__(self, traces: StackedTraces) -> None: + self.traces = traces + + def average(self) -> CombinedTrace: + """ + Compute the average of the :paramref:`~.average.traces`, sample-wise. + + :param traces: + :return: + """ + return CombinedTrace( + np.average(self.traces.samples, 0), + self.traces.meta + ) + + def conditional_average(self, condition: Callable[[np.ndarray], bool]) -> CombinedTrace: + """ + Compute the conditional average of the :paramref:`~.conditional_average.traces`, sample-wise. + + :param traces: + :return: + """ + # TODO: Consider other ways to implement this + samples = self.traces.samples + mask = samples[np.apply_along_axis(condition, 1, samples)] + return CombinedTrace( + np.average(samples[mask], 1), + self.traces.meta + ) + + def standard_deviation(self) -> CombinedTrace: + """ + Compute the sample standard-deviation of the :paramref:`~.standard_deviation.traces`, sample-wise. + + :param traces: + :return: + """ + return CombinedTrace( + np.std(self.traces.samples, 0), + self.traces.meta + ) + + def variance(self) -> CombinedTrace: + """ + Compute the sample variance of the :paramref:`~.variance.traces`, sample-wise. + + :param traces: + :return: + """ + return CombinedTrace( + np.var(self.traces.samples, 0), + self.traces.meta + ) + + def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]: + """ + Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise. + + :param traces: + :return: + """ + return ( + self.average(), + self.variance() + ) + + def add(self) -> CombinedTrace: + """ + Add :paramref:`~.add.traces`, sample-wise. 
+ + :param traces: + :return: + """ + return CombinedTrace( + np.sum(self.traces.samples, 0), + self.traces.meta + ) diff --git a/pyecsca/sca/stacked_traces/stacked_traces.py b/pyecsca/sca/stacked_traces/stacked_traces.py index c54abc2..09169bd 100644 --- a/pyecsca/sca/stacked_traces/stacked_traces.py +++ b/pyecsca/sca/stacked_traces/stacked_traces.py @@ -1,11 +1,10 @@ -from numba import cuda -from numba.cuda import devicearray +from __future__ import annotations + import numpy as np from public import public -from typing import Any, Mapping, Sequence, Tuple, Union, Optional -from math import sqrt +from typing import Any, Mapping, Sequence -from pyecsca.sca.trace.trace import CombinedTrace +from pyecsca.sca.trace_set.base import TraceSet @public @@ -15,9 +14,10 @@ class StackedTraces: meta: Mapping[str, Any] samples: np.ndarray + # TODO: Split metadata into common and per-trace def __init__( self, samples: np.ndarray, - meta: Optional[Mapping[str, Any]] = None) -> None: + meta: Mapping[str, Any] | None = None) -> None: if meta is None: meta = {} self.meta = meta @@ -25,7 +25,9 @@ class StackedTraces: @classmethod def fromarray(cls, traces: Sequence[np.ndarray], - meta: Optional[Mapping[str, Any]] = None) -> 'StackedTraces': + meta: Mapping[str, Any] | None = None) -> 'StackedTraces': + if meta is None: + meta = {} ts = list(traces) min_samples = min(map(len, ts)) for i, t in enumerate(ts): @@ -34,7 +36,7 @@ class StackedTraces: return cls(stacked, meta) @classmethod - def fromtraceset(cls, traceset) -> 'StackedTraces': + def fromtraceset(cls, traceset: TraceSet) -> 'StackedTraces': traces = [t.samples for t in traceset] return cls.fromarray(traces) @@ -46,261 +48,3 @@ class StackedTraces: def __iter__(self): yield from self.samples - - -TPB = Union[int, Tuple[int, ...]] -CudaCTX = Tuple[ - Tuple[devicearray.DeviceNDArray, ...], - Union[int, Tuple[int, ...]] -] - - -@public -class GPUTraceManager: - """Manager for operations with stacked traces on GPU""" - - traces: StackedTraces - _tpb: TPB - _samples_global: devicearray.DeviceNDArray - - def __init__(self, traces: StackedTraces, tpb: TPB = 128) -> None: - if isinstance(tpb, int) and tpb % 32 != 0: - raise ValueError('TPB should be a multiple of 32') - if isinstance(tpb, tuple) and any(t % 32 != 0 for t in tpb): - raise ValueError( - 'TPB should be a multiple of 32 in each dimension' - ) - - self.traces = traces - self.tpb = tpb - self._samples_global = cuda.to_device(self.traces.samples) - - def _setup1D(self, output_count: int) -> CudaCTX: - """ - Creates context for 1D GPU CUDA functions - - :param traces: The input stacked traces. - :param tpb: Threads per block to invoke the kernel with. - :param output_count: Number of outputs expected from the GPU function. - :return: Created context of input and output arrays and calculated - blocks per grid dimensions. - """ - if not isinstance(self.tpb, int): - raise TypeError("tpb is not an int for a 1D kernel") - - device_output = tuple(( - cuda.device_array(self.traces.samples.shape[1]) - for _ in range(output_count) - )) - bpg = (self.traces.samples.size + (self.tpb - 1)) // self.tpb - - return device_output, bpg - - def _gpu_combine1D(self, func, output_count: int = 1) \ - -> Tuple[CombinedTrace, ...]: - """ - Runs GPU Cuda StackedTrace 1D combine function - - :param func: Function to run. - :param traces: Stacked traces to provide as input to the function. - :param tpb: Threads per block to invoke the kernel with - :param output_count: Number of outputs expected from the GPU function. 
- :return: Combined trace output from the GPU function - """ - device_outputs, bpg = self._setup1D(output_count) - - func[bpg, self.tpb](self._samples_global, *device_outputs) - - return tuple( - CombinedTrace(device_output.copy_to_host(), self.traces.meta) - for device_output - in device_outputs - ) - - def average(self) -> CombinedTrace: - """ - Average :paramref:`~.average.traces`, sample-wise. - - :param traces: - :return: - """ - return self._gpu_combine1D(gpu_average, 1)[0] - - def conditional_average(self) -> CombinedTrace: - """ - Not implemented due to the nature of GPU functions. - - Use sca.trace.combine.conditional_average instead. - """ - raise NotImplementedError - - def standard_deviation(self) -> CombinedTrace: - """ - Compute the sample standard-deviation of the :paramref:`~.standard_deviation.traces`, sample-wise. - - :param traces: - :return: - """ - return self._gpu_combine1D(gpu_std_dev, 1)[0] - - def variance(self) -> CombinedTrace: - """ - Compute the sample variance of the :paramref:`~.variance.traces`, sample-wise. - - :param traces: - :return: - """ - return self._gpu_combine1D(gpu_variance, 1)[0] - - def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]: - """ - Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise. - - :param traces: - :return: - """ - averages, variances = self._gpu_combine1D(gpu_avg_var, 2) - return averages, variances - - def add(self) -> CombinedTrace: - """ - Add :paramref:`~.add.traces`, sample-wise. - - :param traces: - :return: - """ - return self._gpu_combine1D(gpu_add, 1)[0] - - -@cuda.jit(device=True) -def _gpu_average(col: int, samples: np.ndarray, result: np.ndarray): - """ - Cuda device thread function computing the average of a sample of stacked traces. - - :param col: Index of the sample. - :param samples: Shared array of the samples of stacked traces. - :param result: Result output array. - """ - acc = 0. - for row in range(samples.shape[0]): - acc += samples[row, col] - result[col] = acc / samples.shape[0] - - -@cuda.jit -def gpu_average(samples: np.ndarray, result: np.ndarray): - """ - Sample average of stacked traces, sample-wise. - - :param samples: Stacked traces' samples. - :param result: Result output array. - """ - col = cuda.grid(1) - - if col >= samples.shape[1]: - return - - _gpu_average(col, samples, result) - - -@cuda.jit(device=True) -def _gpu_var_from_avg(col: int, samples: np.ndarray, - averages: np.ndarray, result: np.ndarray): - """ - Cuda device thread function computing the variance from the average of a sample of stacked traces. - - :param col: Index of the sample. - :param samples: Shared array of the samples of stacked traces. - :param averages: Array of averages of samples. - :param result: Result output array. - """ - var = 0. - for row in range(samples.shape[0]): - current = samples[row, col] - averages[col] - var += current * current - result[col] = var / samples.shape[0] - - -@cuda.jit(device=True) -def _gpu_variance(col: int, samples: np.ndarray, result: np.ndarray): - """ - Cuda device thread function computing the variance of a sample of stacked traces. - - :param col: Index of the sample. - :param samples: Shared array of the samples of stacked traces. - :param result: Result output array. - """ - _gpu_average(col, samples, result) - _gpu_var_from_avg(col, samples, result, result) - - -@cuda.jit -def gpu_std_dev(samples: np.ndarray, result: np.ndarray): - """ - Sample standard deviation of stacked traces, sample-wise. 
- - :param samples: Stacked traces' samples. - :param result: Result output array. - """ - col = cuda.grid(1) - - if col >= samples.shape[1]: - return - - _gpu_variance(col, samples, result) - - result[col] = sqrt(result[col]) - - -@cuda.jit -def gpu_variance(samples: np.ndarray, result: np.ndarray): - """ - Sample variance of stacked traces, sample-wise. - - :param samples: Stacked traces' samples. - :param result: Result output array. - """ - col = cuda.grid(1) - - if col >= samples.shape[1]: - return - - _gpu_variance(col, samples, result) - - -@cuda.jit -def gpu_avg_var(samples: np.ndarray, result_avg: np.ndarray, - result_var: np.ndarray): - """ - Sample average and variance of stacked traces, sample-wise. - - :param samples: Stacked traces' samples. - :param result_avg: Result average output array. - :param result_var: Result variance output array. - """ - col = cuda.grid(1) - - if col >= samples.shape[1]: - return - - _gpu_average(col, samples, result_avg) - _gpu_var_from_avg(col, samples, result_avg, result_var) - - -@cuda.jit -def gpu_add(samples: np.ndarray, result: np.ndarray): - """ - Add samples of stacked traces, sample-wise. - - :param samples: Stacked traces' samples. - :param result: Result output array. - """ - col = cuda.grid(1) - - if col >= samples.shape[1]: - return - - res = 0. - for row in range(samples.shape[0]): - res += samples[row, col] - result[col] = res diff --git a/test/sca/perf_stacked_combine.py b/test/sca/perf_stacked_combine.py new file mode 100644 index 0000000..ab4c5cd --- /dev/null +++ b/test/sca/perf_stacked_combine.py @@ -0,0 +1,496 @@ +from __future__ import annotations + +import argparse +import json +import sys +from typing import Any, Callable, Dict, List, TextIO, Tuple + +import numpy as np +import numpy.random as npr +import numpy.typing as npt + +from pyecsca.sca import (CPUTraceManager, GPUTraceManager, StackedTraces, + Trace, TraceSet, add, average, average_and_variance, + conditional_average, standard_deviation, variance) + +Operation = str +Duration = int +TimeRecord = Tuple[Operation, Duration] + +traceset_ops = { + "average": average, + "conditional_average": conditional_average, + "standard_deviation": standard_deviation, + "variance": variance, + "average_and_variance": average_and_variance, + "add": add, +} + + +def _generate_floating(rng: npr.Generator, + trace_count: int, + trace_length: int, + dtype: npt.DTypeLike = np.float32, + distribution: str = "uniform", + low: float = 0.0, + high: float = 1.0, + mean: float = 0.0, + std: float = 0.0) -> np.ndarray: + if not np.issubdtype(dtype, np.floating): + raise ValueError("dtype must be a floating point type") + + dtype_ = (dtype if (np.issubdtype(dtype, np.float32) + or np.issubdtype(dtype, np.float64)) + else np.float32) + if distribution == "uniform": + samples = rng.random((trace_count, trace_length), + dtype=dtype_) # type: ignore + + if (not np.issubdtype(dtype, np.float32) + and not np.issubdtype(dtype, np.float64)): + samples = samples.astype(dtype) + return (samples * (high - low) + low) + elif distribution == "normal": + return (rng + .normal(mean, std, (trace_count, trace_length)) + .clip(low, high) + .astype(dtype)) + + raise ValueError("Unknown distribution") + + +def _generate_integers(rng: npr.Generator, + trace_count: int, + trace_length: int, + dtype: npt.DTypeLike = np.int32, + distribution: str = "uniform", + low: int = 0, + high: int = 1, + mean: float = 0.0, + std: float = 0.0) -> np.ndarray: + if not np.issubdtype(dtype, np.integer): + raise ValueError("dtype must 
be an integer type") + + if distribution == "uniform": + return rng.integers(low, + high, + size=(trace_count, trace_length), + dtype=dtype) # type: ignore + elif distribution == "normal": + return (rng + .normal(mean, std, (trace_count, trace_length)) + .astype(dtype) + .clip(low, high - 1)) + + raise ValueError("Unknown distribution") + + +def generate_dataset(rng: npr.Generator, + trace_count: int, + trace_length: int, + dtype: npt.DTypeLike = np.float32, + distribution: str = "uniform", + low: float | int = 0, + high: float | int = 1, + mean: float | int = 0, + std: float | int = 1, + seed: int | None = None) -> np.ndarray: + """Generate a TraceSet with random samples + + For float dtype only float32 and float64 are supported natively, + other floats are converted after generation. + For int dtype, all numpy int types are supported. + :param trace_count: Number of traces + :param trace_length: Number of samples per trace + :param dtype: Data type of the samples + :param low: Lower bound of the samples + :param high: Upper bound of the samples + :param seed: Seed for the random number generator + :return: TraceSet + """ + if (not np.issubdtype(dtype, np.integer) + and not np.issubdtype(dtype, np.floating)): + raise ValueError("dtype must be an integer or floating point type") + + gen_fun, cast_fun = ((_generate_integers, int) + if np.issubdtype(dtype, np.integer) else + (_generate_floating, float)) + samples = gen_fun(rng, + trace_count, + trace_length, + dtype, + distribution, + cast_fun(low), # type: ignore + cast_fun(high), # type: ignore + mean, + std) + + return samples + + +def timed(time_storage: List[TimeRecord] | None = None, + log: bool = True) \ + -> Callable[[Callable[..., Any]], Callable[..., Any]]: + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + import time + + def timed_func(*args, **kwargs) -> Callable[..., Callable]: + start = time.perf_counter_ns() + result = func(*args, **kwargs) + duration = time.perf_counter_ns() - start + if log: + print(f"{func.__name__} took {duration} ns") + if time_storage is not None: + time_storage.append((func.__name__, duration)) + return result + return timed_func + return decorator + + +def stack_traceset(traceset: TraceSet) -> StackedTraces: + return StackedTraces.fromtraceset(traceset) + + +def stack_array(dataset: np.ndarray) -> StackedTraces: + return StackedTraces.fromarray(dataset) # type: ignore + + +def to_traceset(dataset: np.ndarray) -> TraceSet: + return TraceSet(*(Trace(samples) for samples in dataset)) + + +def stack(dataset: np.ndarray, + from_array: bool, + time: bool, + time_storage: List[TimeRecord] | None = None, + log: bool = True) -> StackedTraces: + time_fun = timed(time_storage, log) if time else lambda x: x + data = (dataset + if from_array + else to_traceset(dataset)) + stack_fun = stack_array if from_array else stack_traceset + return time_fun(stack_fun)(data) + + +def _get_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + parser.add_argument("-r", "--repetitions", type=int, + default=1, help="Number of repetitions") + parser.add_argument( + "-o", "--output", + type=argparse.FileType("w"), + default=sys.stdout, + help="Output file" + ) + combine = parser.add_argument_group( + "operations", + "Operations to perform on the traces" + ) + combine.add_argument( + "-d", "--device", + choices=["cpu", "gpu"], + help="Device to use for the computation" + ) + stacking = combine.add_mutually_exclusive_group() + stacking.add_argument( + "-s", "--stack", + action="store_true", + 
default=False, + help="Use stacked traces" + ) + stacking.add_argument( + "--stack-traceset", + action="store_true", + default=False, + help="Perform stacking from a TraceSet" + ) + combine.add_argument( + "--time-stack", + action="store_true", + default=False, + help="Time the stacking operation" + ) + + combine.add_argument( + "--operations", + nargs="*", + choices=traceset_ops.keys(), + help="Operations to perform on the traces" + ) + + dataset = parser.add_argument_group( + "data generation", + "Options for data generation" + ) + dataset.add_argument("--trace-count", type=int, + default=1024, help="Number of traces") + dataset.add_argument("--trace-length", type=int, + default=1024, help="Number of samples per trace") + dataset.add_argument("--seed", type=int, default=None, + help="Seed for the random number generator") + dataset.add_argument( + "--dtype", + type=str, + default="float32", + choices=["float16", "float32", "float64", "int8", + "int16", "int32", "int64"], + help="Data type of the samples" + ) + dataset.add_argument( + "--distribution", + type=str, + default="uniform", + choices=["uniform", "normal"], + help="Distribution of the samples") + dataset.add_argument("--low", type=float, default=0.0, + help="Inclusive lower bound for generated samples") + dataset.add_argument("--high", type=float, default=1.0, + help="Exclusive upper bound for generated samples") + dataset.add_argument("--mean", type=float, default=0.0, + help="Mean of the normal distribution") + dataset.add_argument("--std", type=float, default=1.0, + help="Standard deviation of the normal distribution") + + verbosity = parser.add_mutually_exclusive_group() + verbosity.add_argument("-v", "--verbose", action="store_true") + verbosity.add_argument("-q", "--quiet", action="store_true") + return parser + + +def _get_args(parser: argparse.ArgumentParser) -> argparse.Namespace: + args = parser.parse_args() + + if args.time_stack and not args.stack and not args.stack_traceset: + parser.error("Cannot time stack without stacking") + + if not args.operations and not args.stack: + parser.error("No operation specified") + + if args.low >= args.high: + parser.error("Lower bound must be smaller than upper bound") + + if args.operations and not args.device: + parser.error("Device must be specified when performing operations") + + if (args.operations + and args.device == "gpu" + and not args.stack + and not args.stack_traceset): + args.stack = True + args.stack_traceset = False + + return args + + +def report(time_storage: List[TimeRecord], + total_only: bool = False) -> None: + if total_only: + print(f"Total: {sum(duration for _, duration in time_storage):,} ns") + return + + print("Timings:") + for name, duration in time_storage: + print(f"{name : <20} | {duration : 15,} ns") + print("-" * 41) + print(f"{'Total' : <20} | " + f"{sum(duration for _, duration in time_storage) : 15,} ns") + + +def group_times_by_operation(time_storage: List[List[TimeRecord]]) \ + -> Dict[Operation, List[Duration]]: + result: Dict[Operation, List[Duration]] = {} + for times in time_storage: + for operation, duration in times: + if operation.startswith("stack"): + operation = "stack" + result.setdefault(operation, []).append(duration) + + return result + + +class NumpyEncoder(json.JSONEncoder): + def default(self, obj: Any) -> Any: + if isinstance(obj, np.integer): + return int(obj) + if isinstance(obj, np.floating): + return float(obj) + if isinstance(obj, np.ndarray): + return obj.tolist() + return super().default(obj) + + +def 
export_report(time_storage: List[List[TimeRecord]], + args: argparse.Namespace, + output: TextIO) -> None: + by_operation = group_times_by_operation(time_storage) + data: Dict[str, Any] = {} + data["config"] = { + "repetitions": args.repetitions, + "operations": { + "device": args.device, + "operations": args.operations, + "stack": args.stack, + "stack_traceset": args.stack_traceset, + }, + "dataset": { + "seed": args.seed, + "trace_count": args.trace_count, + "trace_length": args.trace_length, + "data_type": args.dtype, + "distribution": args.distribution, + "low": args.low, + "high": args.high, + "mean": args.mean, + "std_dev": args.std, + } + } + data["timing"] = [ + { + "repetition": rep_num + 1, + "timings": { + ("stack" + if name.startswith("stack") + else name): duration + for name, duration + in rep + } + } + for rep_num, rep + in enumerate(time_storage) + ] + data["timing"].append({ + "repetition": "total", + "timings": { + name: sum(durations) + for name, durations + in by_operation.items() + }, + }) + data["timing"][-1]["total"] = sum( + duration + for duration + in data["timing"][-1]["timings"].values() + ) + + operations = [] + if args.time_stack: + operations.append("stack") + operations.extend(args.operations) + + data["summary"] = { + op: { + "sum": np.sum(by_operation[op]), + "average": np.mean(by_operation[op]), + "min": np.min(by_operation[op]), + "max": np.max(by_operation[op]), + "std_dev": np.std(by_operation[op]), + "variance": np.var(by_operation[op]), + "median": np.median(by_operation[op]), + "q25": np.quantile(by_operation[op], 0.25), + "q75": np.quantile(by_operation[op], 0.75), + } + for op in operations + } + + json.dump(data, + output, + cls=NumpyEncoder, + indent=4) + + +def repetition(args: argparse.Namespace, + rng: npr.Generator) -> List[TimeRecord]: + # Prepare time storage + time_storage: List[TimeRecord] | None = [] + + # Generate data + if args.verbose: + print("Generating data...") + dataset = generate_dataset(rng, + args.trace_count, + args.trace_length, + args.dtype, + args.distribution, + args.low, + args.high, + args.mean, + args.std, + args.seed) + + # Transform data for operations input + if args.stack: + if args.verbose: + print("Stacking data...") + data = stack(dataset, + not args.stack_traceset, + args.time_stack, + time_storage, + args.verbose) + else: + data = to_traceset(dataset) + + if not args.operations: + report(time_storage) + return time_storage + + if args.verbose: + print("Performing operations...") + + # Operations on stacked traces + if args.stack: + # Initialize trace manager + assert isinstance(data, StackedTraces) + tm_class = (CPUTraceManager + if args.device == "cpu" + else GPUTraceManager) + + trace_manager = tm_class(data) + + # Perform operations + for op in args.operations: + if args.verbose: + print(f"Performing {op}...") + op_func = getattr(trace_manager, op) + timed(time_storage, args.verbose)(op_func)() + else: + assert isinstance(data, TraceSet) + + # Perform operations + for op in args.operations: + if args.verbose: + print(f"Performing {op}...") + op_func = traceset_ops[op] + timed(time_storage, args.verbose)(op_func)(*data) + + if args.verbose: + print("------------------------") + report(time_storage) + print("-" * 41 + "\n") + return time_storage + + +def main(args: argparse.Namespace) -> None: + if args.verbose: + print(f"Repetitions: {args.repetitions}") + print(f"Dataset: {args.trace_count} x {args.trace_length} " + "(count x length)") + print(f"Device: {args.device},", + "stacked" if args.stack else "not 
stacked") + print(f"Operations: {', '.join(args.operations)}") + + time_storage: List[List[TimeRecord]] = [] + rng = np.random.default_rng(args.seed) + for i in range(args.repetitions): + print(f"Repetition {i + 1} of {args.repetitions}") + time_storage.append(repetition(args, rng)) + + total_time = sum(sum(dur for _, dur in rep) + for rep in time_storage) + print("\nSummary") + print(f"Total: {total_time:,} ns") + export_report(time_storage, args, args.output) + + +if __name__ == "__main__": + args = _get_args(_get_parser()) + main(args) |
