aboutsummaryrefslogtreecommitdiff
path: root/pyecsca
diff options
context:
space:
mode:
authorJ08nY2025-03-13 13:11:18 +0100
committerJ08nY2025-03-13 13:11:18 +0100
commit3a082ee638008f7ff698b957b5be2d330cc4f4e0 (patch)
tree94e74336c1e390da2836de06920d1695abd44aee /pyecsca
parentaad8cf84a0170f3ddf41d841c0551ca8872b62dc (diff)
downloadpyecsca-3a082ee638008f7ff698b957b5be2d330cc4f4e0.tar.gz
pyecsca-3a082ee638008f7ff698b957b5be2d330cc4f4e0.tar.zst
pyecsca-3a082ee638008f7ff698b957b5be2d330cc4f4e0.zip
Diffstat (limited to 'pyecsca')
-rw-r--r--pyecsca/sca/__init__.py4
-rw-r--r--pyecsca/sca/stacked_traces/__init__.py2
-rw-r--r--pyecsca/sca/stacked_traces/combine.py159
-rw-r--r--pyecsca/sca/trace/trace.py13
4 files changed, 80 insertions, 98 deletions
diff --git a/pyecsca/sca/__init__.py b/pyecsca/sca/__init__.py
index 89e8527..3394de0 100644
--- a/pyecsca/sca/__init__.py
+++ b/pyecsca/sca/__init__.py
@@ -1,9 +1,9 @@
"""Package for Side-Channel Analysis."""
+from .attack import *
from .re import *
from .scope import *
+from .stacked_traces import *
from .target import *
from .trace import *
from .trace_set import *
-from .stacked_traces import *
-from .attack import *
diff --git a/pyecsca/sca/stacked_traces/__init__.py b/pyecsca/sca/stacked_traces/__init__.py
index a38075a..cc015e7 100644
--- a/pyecsca/sca/stacked_traces/__init__.py
+++ b/pyecsca/sca/stacked_traces/__init__.py
@@ -1,2 +1,4 @@
+"""Package for efficient handling of stacked traces and GPU acceleration."""
+
from .stacked_traces import *
from .combine import *
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
index d43cd10..bf1e857 100644
--- a/pyecsca/sca/stacked_traces/combine.py
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -1,5 +1,7 @@
from __future__ import annotations
+from abc import ABC, abstractmethod
+
from numba import cuda
from numba.cuda import devicearray
from numba.cuda.cudadrv.devicearray import DeviceNDArray
@@ -7,6 +9,7 @@ import numpy as np
import numpy.typing as npt
from math import sqrt
+from numba.cuda.types import CUDADispatcher
from public import public
from typing import Callable, Union, Tuple, Optional, cast, List
@@ -18,7 +21,7 @@ CudaCTX = Tuple[devicearray.DeviceNDArray, ...]
@public
-class BaseTraceManager:
+class BaseTraceManager(ABC):
"""Base class for trace managers"""
_traces: StackedTraces
@@ -26,67 +29,76 @@ class BaseTraceManager:
def __init__(self, traces: StackedTraces) -> None:
self._traces = traces
+ @abstractmethod
def average(self) -> CombinedTrace:
"""
- Average :paramref:`~.average.traces`, sample-wise.
+ Average traces, sample-wise.
- :param traces:
- :return:
+ :return: The average of the traces.
"""
raise NotImplementedError
+ @abstractmethod
def conditional_average(
self, cond: Callable[[npt.NDArray[np.number]], bool]
) -> CombinedTrace:
"""
- Average :paramref:`~.conditional_average.traces` for which the
- :paramref:`~.conditional_average.condition` is ``True``, sample-wise.
+ Average traces for which the
+ :paramref:`~.conditional_average.cond` is ``True``, sample-wise.
- :param traces:
- :param condition:
- :return:
+ :param cond: The condition for selecting the traces.
+ :return: The average of (some of) the traces.
"""
raise NotImplementedError
+ @abstractmethod
def standard_deviation(self) -> CombinedTrace:
"""
- Compute the sample standard-deviation of the
- :paramref:`~.standard_deviation.traces`, sample-wise.
+ Compute the sample standard-deviation of the traces, sample-wise.
- :param traces:
- :return:
+ :return: The standard deviation of the traces.
"""
raise NotImplementedError
+ @abstractmethod
def variance(self) -> CombinedTrace:
"""
- Compute the sample variance of the
- :paramref:`~.variance.traces`, sample-wise.
+ Compute the sample variance of the traces, sample-wise.
- :param traces:
- :return:
+ :return: The variance of the traces.
"""
raise NotImplementedError
+ @abstractmethod
def average_and_variance(self) -> List[CombinedTrace]:
"""
- Compute the sample average and variance of the
- :paramref:`~.average_and_variance.traces`, sample-wise.
+ Compute the sample average and variance of the traces, sample-wise.
- :param traces:
- :return:
+ :return: The average and variance of the traces.
"""
raise NotImplementedError
+ @abstractmethod
def add(self) -> CombinedTrace:
"""
- Add :paramref:`~.add.traces`, sample-wise.
+ Add traces, sample-wise.
- :param traces:
- :return:
+ :return: The sum of the traces.
"""
raise NotImplementedError
+ @abstractmethod
+ def pearson_corr(
+ self, intermediate_values: npt.NDArray[np.number]
+ ) -> CombinedTrace:
+ """
+ Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise.
+
+ :param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
+ :type intermediate_values: npt.NDArray[np.number]
+ :return: The Pearson correlation coefficient between the samples and intermediate values.
+ """
+ raise NotImplementedError
InputType = Union[npt.NDArray[np.number], npt.ArrayLike]
@@ -96,7 +108,7 @@ STREAM_COUNT = 4
@public
-class GPUTraceManager(BaseTraceManager):
+class GPUTraceManager(BaseTraceManager): # pragma: no cover
"""Manager for operations with stacked traces on GPU"""
_tpb: TPB
@@ -234,7 +246,7 @@ class GPUTraceManager(BaseTraceManager):
return self._traces.samples.shape
def _gpu_combine1D(
- self, func, inputs: Optional[List[InputType]] = None, output_count: int = 1
+ self, func: CUDADispatcher, inputs: Optional[List[InputType]] = None, output_count: int = 1
) -> Union[CombinedTrace, List[CombinedTrace]]:
inputs = [] if inputs is None else inputs
results = self._combine_func(func, inputs, output_count)
@@ -245,7 +257,7 @@ class GPUTraceManager(BaseTraceManager):
return [CombinedTrace(result, self._traces.meta) for result in results]
def _gpu_combine1D_all(
- self, func, inputs: List[InputType], output_count: int = 1
+ self, func: CUDADispatcher, inputs: List[InputType], output_count: int = 1
) -> List[npt.NDArray[np.number]]:
"""
Runs a combination function on the samples column-wise.
@@ -270,7 +282,7 @@ class GPUTraceManager(BaseTraceManager):
return [device_output.copy_to_host() for device_output in device_outputs]
def _gpu_combine1D_chunked(
- self, func, inputs: List[InputType], output_count: int = 1
+ self, func: CUDADispatcher, inputs: List[InputType], output_count: int = 1
) -> List[npt.NDArray[np.number]]:
if self._chunk_size is None:
raise ValueError("Something went wrong. " "Chunk size should be specified")
@@ -348,7 +360,7 @@ class GPUTraceManager(BaseTraceManager):
return [np.concatenate(chunk_result) for chunk_result in chunk_results]
def average(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_average))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_average)) # type: ignore
def conditional_average(
self, cond: Callable[[npt.NDArray[np.number]], bool]
@@ -356,17 +368,17 @@ class GPUTraceManager(BaseTraceManager):
raise NotImplementedError()
def standard_deviation(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev)) # type: ignore
def variance(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_variance))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_variance)) # type: ignore
def average_and_variance(self) -> List[CombinedTrace]:
- averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2)
+ averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2) # type: ignore
return [averages, variances]
def add(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_add))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_add)) # type: ignore
def pearson_corr(
self, intermediate_values: npt.NDArray[np.number]
@@ -389,7 +401,7 @@ class GPUTraceManager(BaseTraceManager):
np.array([intermed_sq_sum]),
]
- return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs)) # type: ignore
def run(
self,
@@ -397,11 +409,11 @@ class GPUTraceManager(BaseTraceManager):
inputs: Optional[List[InputType]] = None,
output_count: int = 1,
) -> Union[CombinedTrace, List[CombinedTrace]]:
- return self._gpu_combine1D(func, inputs, output_count)
+ return self._gpu_combine1D(func, inputs, output_count) # type: ignore
@cuda.jit(device=True, cache=True)
-def _gpu_average(
+def _gpu_average( # pragma: no cover
col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number]
):
"""
@@ -418,7 +430,7 @@ def _gpu_average(
@cuda.jit(cache=True)
-def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
+def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover
"""
Sample average of stacked traces, sample-wise.
@@ -434,7 +446,7 @@ def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number])
@cuda.jit(device=True, cache=True)
-def _gpu_var_from_avg(
+def _gpu_var_from_avg( # pragma: no cover
col: int,
samples: npt.NDArray[np.number],
averages: npt.NDArray[np.number],
@@ -456,7 +468,7 @@ def _gpu_var_from_avg(
@cuda.jit(device=True, cache=True)
-def _gpu_variance(
+def _gpu_variance( # pragma: no cover
col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number]
):
"""
@@ -471,7 +483,7 @@ def _gpu_variance(
@cuda.jit(cache=True)
-def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
+def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover
"""
Sample standard deviation of stacked traces, sample-wise.
@@ -489,7 +501,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number])
@cuda.jit(cache=True)
-def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
+def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover
"""
Sample variance of stacked traces, sample-wise.
@@ -505,7 +517,7 @@ def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]
@cuda.jit(cache=True)
-def gpu_avg_var(
+def gpu_avg_var( # pragma: no cover
samples: npt.NDArray[np.number],
result_avg: npt.NDArray[np.number],
result_var: npt.NDArray[np.number],
@@ -527,7 +539,7 @@ def gpu_avg_var(
@cuda.jit(cache=True)
-def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
+def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover
"""
Add samples of stacked traces, sample-wise.
@@ -546,7 +558,7 @@ def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
@cuda.jit(cache=True)
-def gpu_pearson_corr(
+def gpu_pearson_corr( # pragma: no cover
samples: DeviceNDArray,
intermediate_values: DeviceNDArray,
intermed_sum: DeviceNDArray,
@@ -590,78 +602,37 @@ def gpu_pearson_corr(
@public
-class CPUTraceManager:
+class CPUTraceManager(BaseTraceManager):
"""Manager for operations on stacked traces on CPU."""
- traces: StackedTraces
-
- def __init__(self, traces: StackedTraces) -> None:
- self.traces = traces
-
def average(self) -> CombinedTrace:
- """
- Compute the average of the :paramref:`~.average.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return CombinedTrace(np.average(self.traces.samples, 0), self.traces.meta)
+ return CombinedTrace(np.average(self._traces.samples, 0), self._traces.meta)
def conditional_average(
self, condition: Callable[[npt.NDArray[np.number]], bool]
) -> CombinedTrace:
- """
- Compute the conditional average of the :paramref:`~.conditional_average.traces`, sample-wise.
-
- :param traces:
- :return:
- """
# TODO: Consider other ways to implement this
return CombinedTrace(
np.average(
- self.traces.samples[
- np.apply_along_axis(condition, 1, self.traces.samples)
+ self._traces.samples[
+ np.apply_along_axis(condition, 1, self._traces.samples)
],
1,
),
- self.traces.meta,
+ self._traces.meta,
)
def standard_deviation(self) -> CombinedTrace:
- """
- Compute the sample standard-deviation of the :paramref:`~.standard_deviation.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return CombinedTrace(np.std(self.traces.samples, 0), self.traces.meta)
+ return CombinedTrace(np.std(self._traces.samples, 0), self._traces.meta)
def variance(self) -> CombinedTrace:
- """
- Compute the sample variance of the :paramref:`~.variance.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return CombinedTrace(np.var(self.traces.samples, 0), self.traces.meta)
+ return CombinedTrace(np.var(self._traces.samples, 0), self._traces.meta)
def average_and_variance(self) -> List[CombinedTrace]:
- """
- Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise.
-
- :param traces:
- :return:
- """
return [self.average(), self.variance()]
def add(self) -> CombinedTrace:
- """
- Add :paramref:`~.add.traces`, sample-wise.
-
- :param traces:
- :return:
- """
- return CombinedTrace(np.sum(self.traces.samples, 0), self.traces.meta)
+ return CombinedTrace(np.sum(self._traces.samples, 0), self._traces.meta)
def pearson_corr(
self, intermediate_values: npt.NDArray[np.number]
@@ -681,7 +652,7 @@ class CPUTraceManager:
:param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
:type intermediate_values: npt.NDArray[np.number]
"""
- samples = self.traces.samples
+ samples = self._traces.samples
n = samples.shape[0]
if intermediate_values.shape != (n,):
raise ValueError(
diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py
index a609ecb..c824967 100644
--- a/pyecsca/sca/trace/trace.py
+++ b/pyecsca/sca/trace/trace.py
@@ -86,7 +86,8 @@ class Trace:
def with_samples(self, samples: ndarray) -> "Trace":
"""
- Construct a copy of this trace, with the same metadata, but samples replaced by `samples`.
+ Construct a copy of this trace, with the same metadata, but samples replaced by
+ :paramref:`samples`.
:param samples: The samples of the new trace.
:return: The new trace.
@@ -98,7 +99,7 @@ class Trace:
Construct a copy of this trace, with the same samples retyped using `dtype`.
:param dtype: The numpy dtype.
- :return: The new trace
+ :return: The new trace.
"""
return self.with_samples(np.array(self.samples.astype(dtype)))
@@ -128,6 +129,14 @@ class CombinedTrace(Trace):
trace_set: Any = None,
parents: Optional[Sequence[Trace]] = None,
):
+ """
+ Construct a new combined trace.
+
+ :param samples: The sample array of the trace.
+ :param meta: Metadata associated with the trace.
+ :param trace_set: A trace set the trace is contained in.
+ :param parents: The traces that were combined to create this trace.
+ """
super().__init__(samples, meta, trace_set=trace_set)
self.parents = None
if parents is not None: