author    Tomáš Jusko  2023-12-09 14:41:32 +0100
committer Tomáš Jusko  2023-12-09 14:41:32 +0100
commit    0677d92c31d9071a9e7e3633d14e9c09ba76f4f1 (patch)
tree      8ba47ebae685057f3f42dca7d8a4b984969233c8 /pyecsca
parent    bd7f23b3d762ef48b60c871f7d3ffe5fe2da8c00 (diff)
download  pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.tar.gz
          pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.tar.zst
          pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.zip
feat: Added Pearson correlation coefficient to stacked combine
Diffstat (limited to 'pyecsca')
-rw-r--r--  pyecsca/sca/stacked_traces/combine.py | 99
1 file changed, 99 insertions, 0 deletions
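
Both the GPU kernel and the CPU method added below compute the coefficient column-wise (one value per sample point) from running sums in a single pass, rather than via the full covariance matrix. As a restatement of the arithmetic in the diff, with x_ij the j-th sample of trace i and y_i the intermediate value for trace i:

    r_j = \frac{n \sum_i x_{ij} y_i - \sum_i x_{ij} \sum_i y_i}
               {\sqrt{n \sum_i x_{ij}^2 - \left(\sum_i x_{ij}\right)^2}\,
                \sqrt{n \sum_i y_i^2 - \left(\sum_i y_i\right)^2}}
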
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
index 15d5235..5c9b8f7 100644
--- a/pyecsca/sca/stacked_traces/combine.py
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -2,6 +2,7 @@ from __future__ import annotations
from numba import cuda
from numba.cuda import devicearray
+from numba.cuda.cudadrv.devicearray import DeviceNDArray
import numpy as np
import numpy.typing as npt
from math import sqrt
@@ -384,6 +385,23 @@ class GPUTraceManager(BaseTraceManager):
    def add(self) -> CombinedTrace:
        return cast(CombinedTrace, self._gpu_combine1D(gpu_add))
+    def pearson_corr(self,
+                     intermediate_values: npt.NDArray[np.number]) \
+            -> CombinedTrace:
+        if (len(intermediate_values.shape) != 1
+                or (intermediate_values.shape[0] != self.traces_shape[0])):
+            raise ValueError("Intermediate values must be a vector "
+                             "of length trace_count")
+
+        intermed_sum: np.number = np.sum(intermediate_values)
+        intermed_sq_sum: np.number = np.sum(np.square(intermediate_values))
+        inputs: List[InputType] = [intermediate_values,
+                                   np.array([intermed_sum]),
+                                   np.array([intermed_sq_sum])]
+
+        return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr,
+                                                       inputs))
+
    def run(self,
            func: Callable,
            inputs: Optional[List[InputType]] = None,
@@ -535,6 +553,48 @@ def gpu_add(samples: npt.NDArray[np.number],
    result[col] = res
+@cuda.jit(cache=True)
+def gpu_pearson_corr(samples: DeviceNDArray,
+                     intermediate_values: DeviceNDArray,
+                     intermed_sum: DeviceNDArray,
+                     intermed_sq_sum: DeviceNDArray,
+                     result: DeviceNDArray):
+ """
+ Calculates the Pearson correlation coefficient between the given samples and intermediate values using GPU acceleration.
+
+ :param samples: A 2D array of shape (n, m) containing the samples.
+ :type samples: npt.NDArray[np.number]
+ :param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
+ :type intermediate_values: npt.NDArray[np.number]
+ :param intermed_sum: A 1D array of shape (1,) containing the precomputed sum of the intermediate values.
+ :type intermed_sum: npt.NDArray[np.number]
+ :param intermed_sq_sum: A 1D array of shape (1,) containing the precomputed sum of the squares of the intermediate values.
+ :param result: A 1D array of shape (m,) to store the resulting correlation coefficients.
+ :type result: cuda.devicearray.DeviceNDArray
+ """
+    col: int = cuda.grid(1)  # type: ignore
+
+    if col >= samples.shape[1]:  # type: ignore
+        return
+
+    n = samples.shape[0]
+    samples_sum = 0.
+    samples_sq_sum = 0.
+    product_sum = 0.
+
+    for row in range(n):
+        samples_sum += samples[row, col]
+        samples_sq_sum += samples[row, col] ** 2
+        product_sum += samples[row, col] * intermediate_values[row]
+
+    numerator = float(n) * product_sum - samples_sum * intermed_sum[0]
+    denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2)
+    denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2)
+    denominator = denom_samp * denom_int
+
+    result[col] = numerator / denominator
+
+
@public
class CPUTraceManager:
"""Manager for operations on stacked traces on CPU."""
@@ -620,3 +680,42 @@ class CPUTraceManager:
            np.sum(self.traces.samples, 0),
            self.traces.meta
        )
+
+    def pearson_corr(self,
+                     intermediate_values: npt.NDArray[np.number]) \
+            -> CombinedTrace:
+        """
+        Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise.
+
+        The result is equivalent to:
+
+        >>> np.corrcoef(self.traces.samples,
+        ...             intermediate_values,
+        ...             rowvar=False)[-1, :-1]
+
+        but a different implementation is used for better time efficiency,
+        as it does not compute the whole correlation matrix.
+
+        :param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
+        :type intermediate_values: npt.NDArray[np.number]
+        """
+        samples = self.traces.samples
+        n = samples.shape[0]
+        if intermediate_values.shape != (n,):
+            raise ValueError("Invalid shape of intermediate_values, "
+                             f"expected ({n},), "
+                             f"got {intermediate_values.shape}")
+
+        sam_sum = np.sum(samples, axis=0)
+        sam_sq_sum = np.sum(np.square(samples), axis=0)
+
+        iv_sum = np.sum(intermediate_values)
+        iv_sq_sum = np.sum(np.square(intermediate_values))
+
+        prod_sum = intermediate_values @ samples
+
+        numerator = n * prod_sum - sam_sum * iv_sum
+        denom_samp = np.sqrt(n * sam_sq_sum - sam_sum ** 2)
+        denom_int = np.sqrt(n * iv_sq_sum - iv_sum ** 2)
+        denominator = denom_samp * denom_int
+        return CombinedTrace(numerator / denominator, self.traces.meta)
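
As a rough usage sketch (not part of the commit; the StackedTraces import path and both constructor calls below are assumptions for illustration), the new sample-wise correlation could be used along these lines:

    import numpy as np

    from pyecsca.sca.stacked_traces import StackedTraces  # assumed location of StackedTraces
    from pyecsca.sca.stacked_traces.combine import CPUTraceManager

    rng = np.random.default_rng(0)
    # Hypothetical data: 1000 traces of 5000 samples each, one intermediate value per trace.
    samples = rng.normal(size=(1000, 5000)).astype(np.float32)
    intermediate_values = rng.integers(0, 256, size=1000).astype(np.float32)

    traces = StackedTraces(samples)    # assumed constructor signature
    manager = CPUTraceManager(traces)  # assumed constructor signature
    corr = manager.pearson_corr(intermediate_values)  # one coefficient per sample point

The GPU path is analogous with GPUTraceManager, which precomputes the sums of the intermediate values on the host and hands them to the gpu_pearson_corr kernel through _gpu_combine1D.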