diff options
| -rw-r--r-- | pyecsca/sca/__init__.py | 4 | ||||
| -rw-r--r-- | pyecsca/sca/stacked_traces/__init__.py | 2 | ||||
| -rw-r--r-- | pyecsca/sca/stacked_traces/combine.py | 159 | ||||
| -rw-r--r-- | pyecsca/sca/trace/trace.py | 13 |
4 files changed, 80 insertions, 98 deletions
diff --git a/pyecsca/sca/__init__.py b/pyecsca/sca/__init__.py index 89e8527..3394de0 100644 --- a/pyecsca/sca/__init__.py +++ b/pyecsca/sca/__init__.py @@ -1,9 +1,9 @@ """Package for Side-Channel Analysis.""" +from .attack import * from .re import * from .scope import * +from .stacked_traces import * from .target import * from .trace import * from .trace_set import * -from .stacked_traces import * -from .attack import * diff --git a/pyecsca/sca/stacked_traces/__init__.py b/pyecsca/sca/stacked_traces/__init__.py index a38075a..cc015e7 100644 --- a/pyecsca/sca/stacked_traces/__init__.py +++ b/pyecsca/sca/stacked_traces/__init__.py @@ -1,2 +1,4 @@ +"""Package for efficient handling of stacked traces and GPU acceleration.""" + from .stacked_traces import * from .combine import * diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py index d43cd10..bf1e857 100644 --- a/pyecsca/sca/stacked_traces/combine.py +++ b/pyecsca/sca/stacked_traces/combine.py @@ -1,5 +1,7 @@ from __future__ import annotations +from abc import ABC, abstractmethod + from numba import cuda from numba.cuda import devicearray from numba.cuda.cudadrv.devicearray import DeviceNDArray @@ -7,6 +9,7 @@ import numpy as np import numpy.typing as npt from math import sqrt +from numba.cuda.types import CUDADispatcher from public import public from typing import Callable, Union, Tuple, Optional, cast, List @@ -18,7 +21,7 @@ CudaCTX = Tuple[devicearray.DeviceNDArray, ...] @public -class BaseTraceManager: +class BaseTraceManager(ABC): """Base class for trace managers""" _traces: StackedTraces @@ -26,67 +29,76 @@ class BaseTraceManager: def __init__(self, traces: StackedTraces) -> None: self._traces = traces + @abstractmethod def average(self) -> CombinedTrace: """ - Average :paramref:`~.average.traces`, sample-wise. + Average traces, sample-wise. - :param traces: - :return: + :return: The average of the traces. """ raise NotImplementedError + @abstractmethod def conditional_average( self, cond: Callable[[npt.NDArray[np.number]], bool] ) -> CombinedTrace: """ - Average :paramref:`~.conditional_average.traces` for which the - :paramref:`~.conditional_average.condition` is ``True``, sample-wise. + Average traces for which the + :paramref:`~.conditional_average.cond` is ``True``, sample-wise. - :param traces: - :param condition: - :return: + :param cond: The condition for selecting the traces. + :return: The average of (some of) the traces. """ raise NotImplementedError + @abstractmethod def standard_deviation(self) -> CombinedTrace: """ - Compute the sample standard-deviation of the - :paramref:`~.standard_deviation.traces`, sample-wise. + Compute the sample standard-deviation of the traces, sample-wise. - :param traces: - :return: + :return: The standard deviation of the traces. """ raise NotImplementedError + @abstractmethod def variance(self) -> CombinedTrace: """ - Compute the sample variance of the - :paramref:`~.variance.traces`, sample-wise. + Compute the sample variance of the traces, sample-wise. - :param traces: - :return: + :return: The variance of the traces. """ raise NotImplementedError + @abstractmethod def average_and_variance(self) -> List[CombinedTrace]: """ - Compute the sample average and variance of the - :paramref:`~.average_and_variance.traces`, sample-wise. + Compute the sample average and variance of the traces, sample-wise. - :param traces: - :return: + :return: The average and variance of the traces. """ raise NotImplementedError + @abstractmethod def add(self) -> CombinedTrace: """ - Add :paramref:`~.add.traces`, sample-wise. + Add traces, sample-wise. - :param traces: - :return: + :return: The sum of the traces. """ raise NotImplementedError + @abstractmethod + def pearson_corr( + self, intermediate_values: npt.NDArray[np.number] + ) -> CombinedTrace: + """ + Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise. + + :param intermediate_values: A 1D array of shape (n,) containing the intermediate values. + :type intermediate_values: npt.NDArray[np.number] + :return: The Pearson correlation coefficient between the samples and intermediate values. + """ + raise NotImplementedError InputType = Union[npt.NDArray[np.number], npt.ArrayLike] @@ -96,7 +108,7 @@ STREAM_COUNT = 4 @public -class GPUTraceManager(BaseTraceManager): +class GPUTraceManager(BaseTraceManager): # pragma: no cover """Manager for operations with stacked traces on GPU""" _tpb: TPB @@ -234,7 +246,7 @@ class GPUTraceManager(BaseTraceManager): return self._traces.samples.shape def _gpu_combine1D( - self, func, inputs: Optional[List[InputType]] = None, output_count: int = 1 + self, func: CUDADispatcher, inputs: Optional[List[InputType]] = None, output_count: int = 1 ) -> Union[CombinedTrace, List[CombinedTrace]]: inputs = [] if inputs is None else inputs results = self._combine_func(func, inputs, output_count) @@ -245,7 +257,7 @@ class GPUTraceManager(BaseTraceManager): return [CombinedTrace(result, self._traces.meta) for result in results] def _gpu_combine1D_all( - self, func, inputs: List[InputType], output_count: int = 1 + self, func: CUDADispatcher, inputs: List[InputType], output_count: int = 1 ) -> List[npt.NDArray[np.number]]: """ Runs a combination function on the samples column-wise. @@ -270,7 +282,7 @@ class GPUTraceManager(BaseTraceManager): return [device_output.copy_to_host() for device_output in device_outputs] def _gpu_combine1D_chunked( - self, func, inputs: List[InputType], output_count: int = 1 + self, func: CUDADispatcher, inputs: List[InputType], output_count: int = 1 ) -> List[npt.NDArray[np.number]]: if self._chunk_size is None: raise ValueError("Something went wrong. " "Chunk size should be specified") @@ -348,7 +360,7 @@ class GPUTraceManager(BaseTraceManager): return [np.concatenate(chunk_result) for chunk_result in chunk_results] def average(self) -> CombinedTrace: - return cast(CombinedTrace, self._gpu_combine1D(gpu_average)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_average)) # type: ignore def conditional_average( self, cond: Callable[[npt.NDArray[np.number]], bool] @@ -356,17 +368,17 @@ class GPUTraceManager(BaseTraceManager): raise NotImplementedError() def standard_deviation(self) -> CombinedTrace: - return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev)) # type: ignore def variance(self) -> CombinedTrace: - return cast(CombinedTrace, self._gpu_combine1D(gpu_variance)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_variance)) # type: ignore def average_and_variance(self) -> List[CombinedTrace]: - averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2) + averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2) # type: ignore return [averages, variances] def add(self) -> CombinedTrace: - return cast(CombinedTrace, self._gpu_combine1D(gpu_add)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_add)) # type: ignore def pearson_corr( self, intermediate_values: npt.NDArray[np.number] @@ -389,7 +401,7 @@ class GPUTraceManager(BaseTraceManager): np.array([intermed_sq_sum]), ] - return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs)) # type: ignore def run( self, @@ -397,11 +409,11 @@ class GPUTraceManager(BaseTraceManager): inputs: Optional[List[InputType]] = None, output_count: int = 1, ) -> Union[CombinedTrace, List[CombinedTrace]]: - return self._gpu_combine1D(func, inputs, output_count) + return self._gpu_combine1D(func, inputs, output_count) # type: ignore @cuda.jit(device=True, cache=True) -def _gpu_average( +def _gpu_average( # pragma: no cover col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number] ): """ @@ -418,7 +430,7 @@ def _gpu_average( @cuda.jit(cache=True) -def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): +def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover """ Sample average of stacked traces, sample-wise. @@ -434,7 +446,7 @@ def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]) @cuda.jit(device=True, cache=True) -def _gpu_var_from_avg( +def _gpu_var_from_avg( # pragma: no cover col: int, samples: npt.NDArray[np.number], averages: npt.NDArray[np.number], @@ -456,7 +468,7 @@ def _gpu_var_from_avg( @cuda.jit(device=True, cache=True) -def _gpu_variance( +def _gpu_variance( # pragma: no cover col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number] ): """ @@ -471,7 +483,7 @@ def _gpu_variance( @cuda.jit(cache=True) -def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): +def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover """ Sample standard deviation of stacked traces, sample-wise. @@ -489,7 +501,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]) @cuda.jit(cache=True) -def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): +def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover """ Sample variance of stacked traces, sample-wise. @@ -505,7 +517,7 @@ def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number] @cuda.jit(cache=True) -def gpu_avg_var( +def gpu_avg_var( # pragma: no cover samples: npt.NDArray[np.number], result_avg: npt.NDArray[np.number], result_var: npt.NDArray[np.number], @@ -527,7 +539,7 @@ def gpu_avg_var( @cuda.jit(cache=True) -def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): +def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover """ Add samples of stacked traces, sample-wise. @@ -546,7 +558,7 @@ def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): @cuda.jit(cache=True) -def gpu_pearson_corr( +def gpu_pearson_corr( # pragma: no cover samples: DeviceNDArray, intermediate_values: DeviceNDArray, intermed_sum: DeviceNDArray, @@ -590,78 +602,37 @@ def gpu_pearson_corr( @public -class CPUTraceManager: +class CPUTraceManager(BaseTraceManager): """Manager for operations on stacked traces on CPU.""" - traces: StackedTraces - - def __init__(self, traces: StackedTraces) -> None: - self.traces = traces - def average(self) -> CombinedTrace: - """ - Compute the average of the :paramref:`~.average.traces`, sample-wise. - - :param traces: - :return: - """ - return CombinedTrace(np.average(self.traces.samples, 0), self.traces.meta) + return CombinedTrace(np.average(self._traces.samples, 0), self._traces.meta) def conditional_average( self, condition: Callable[[npt.NDArray[np.number]], bool] ) -> CombinedTrace: - """ - Compute the conditional average of the :paramref:`~.conditional_average.traces`, sample-wise. - - :param traces: - :return: - """ # TODO: Consider other ways to implement this return CombinedTrace( np.average( - self.traces.samples[ - np.apply_along_axis(condition, 1, self.traces.samples) + self._traces.samples[ + np.apply_along_axis(condition, 1, self._traces.samples) ], 1, ), - self.traces.meta, + self._traces.meta, ) def standard_deviation(self) -> CombinedTrace: - """ - Compute the sample standard-deviation of the :paramref:`~.standard_deviation.traces`, sample-wise. - - :param traces: - :return: - """ - return CombinedTrace(np.std(self.traces.samples, 0), self.traces.meta) + return CombinedTrace(np.std(self._traces.samples, 0), self._traces.meta) def variance(self) -> CombinedTrace: - """ - Compute the sample variance of the :paramref:`~.variance.traces`, sample-wise. - - :param traces: - :return: - """ - return CombinedTrace(np.var(self.traces.samples, 0), self.traces.meta) + return CombinedTrace(np.var(self._traces.samples, 0), self._traces.meta) def average_and_variance(self) -> List[CombinedTrace]: - """ - Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise. - - :param traces: - :return: - """ return [self.average(), self.variance()] def add(self) -> CombinedTrace: - """ - Add :paramref:`~.add.traces`, sample-wise. - - :param traces: - :return: - """ - return CombinedTrace(np.sum(self.traces.samples, 0), self.traces.meta) + return CombinedTrace(np.sum(self._traces.samples, 0), self._traces.meta) def pearson_corr( self, intermediate_values: npt.NDArray[np.number] @@ -681,7 +652,7 @@ class CPUTraceManager: :param intermediate_values: A 1D array of shape (n,) containing the intermediate values. :type intermediate_values: npt.NDArray[np.number] """ - samples = self.traces.samples + samples = self._traces.samples n = samples.shape[0] if intermediate_values.shape != (n,): raise ValueError( diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py index a609ecb..c824967 100644 --- a/pyecsca/sca/trace/trace.py +++ b/pyecsca/sca/trace/trace.py @@ -86,7 +86,8 @@ class Trace: def with_samples(self, samples: ndarray) -> "Trace": """ - Construct a copy of this trace, with the same metadata, but samples replaced by `samples`. + Construct a copy of this trace, with the same metadata, but samples replaced by + :paramref:`samples`. :param samples: The samples of the new trace. :return: The new trace. @@ -98,7 +99,7 @@ class Trace: Construct a copy of this trace, with the same samples retyped using `dtype`. :param dtype: The numpy dtype. - :return: The new trace + :return: The new trace. """ return self.with_samples(np.array(self.samples.astype(dtype))) @@ -128,6 +129,14 @@ class CombinedTrace(Trace): trace_set: Any = None, parents: Optional[Sequence[Trace]] = None, ): + """ + Construct a new combined trace. + + :param samples: The sample array of the trace. + :param meta: Metadata associated with the trace. + :param trace_set: A trace set the trace is contained in. + :param parents: The traces that were combined to create this trace. + """ super().__init__(samples, meta, trace_set=trace_set) self.parents = None if parents is not None: |
