diff options
| author | Tomáš Jusko | 2023-12-09 14:41:32 +0100 |
|---|---|---|
| committer | Tomáš Jusko | 2023-12-09 14:41:32 +0100 |
| commit | 0677d92c31d9071a9e7e3633d14e9c09ba76f4f1 (patch) | |
| tree | 8ba47ebae685057f3f42dca7d8a4b984969233c8 /pyecsca | |
| parent | bd7f23b3d762ef48b60c871f7d3ffe5fe2da8c00 (diff) | |
| download | pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.tar.gz pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.tar.zst pyecsca-0677d92c31d9071a9e7e3633d14e9c09ba76f4f1.zip | |
feat: Added pearson correlation coefficient to stacked combine
Diffstat (limited to 'pyecsca')
| -rw-r--r-- | pyecsca/sca/stacked_traces/combine.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py index 15d5235..5c9b8f7 100644 --- a/pyecsca/sca/stacked_traces/combine.py +++ b/pyecsca/sca/stacked_traces/combine.py @@ -2,6 +2,7 @@ from __future__ import annotations from numba import cuda from numba.cuda import devicearray +from numba.cuda.cudadrv.devicearray import DeviceNDArray import numpy as np import numpy.typing as npt from math import sqrt @@ -384,6 +385,23 @@ class GPUTraceManager(BaseTraceManager): def add(self) -> CombinedTrace: return cast(CombinedTrace, self._gpu_combine1D(gpu_add)) + def pearson_corr(self, + intermediate_values: npt.NDArray[np.number]) \ + -> CombinedTrace: + if (len(intermediate_values.shape) != 1 + or (intermediate_values.shape[0] != self.traces_shape[0])): + raise ValueError("Intermediate values have to be a vector " + "as long as trace_count") + + intermed_sum: np.number = np.sum(intermediate_values) + intermed_sq_sum: np.number = np.sum(np.square(intermediate_values)) + inputs: List[InputType] = [intermediate_values, + np.array([intermed_sum]), + np.array([intermed_sq_sum])] + + return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, + inputs)) + def run(self, func: Callable, inputs: Optional[List[InputType]] = None, @@ -535,6 +553,48 @@ def gpu_add(samples: npt.NDArray[np.number], result[col] = res +@cuda.jit(cache=True) +def gpu_pearson_corr(samples: DeviceNDArray, + intermediate_values: DeviceNDArray, + intermed_sum: DeviceNDArray, + intermed_sq_sum: DeviceNDArray, + result: DeviceNDArray): + """ + Calculates the Pearson correlation coefficient between the given samples and intermediate values using GPU acceleration. + + :param samples: A 2D array of shape (n, m) containing the samples. + :type samples: npt.NDArray[np.number] + :param intermediate_values: A 1D array of shape (n,) containing the intermediate values. + :type intermediate_values: npt.NDArray[np.number] + :param intermed_sum: A 1D array of shape (1,) containing the precomputed sum of the intermediate values. + :type intermed_sum: npt.NDArray[np.number] + :param intermed_sq_sum: A 1D array of shape (1,) containing the precomputed sum of the squares of the intermediate values. + :param result: A 1D array of shape (m,) to store the resulting correlation coefficients. + :type result: cuda.devicearray.DeviceNDArray + """ + col: int = cuda.grid(1) # type: ignore + + if col >= samples.shape[1]: # type: ignore + return + + n = samples.shape[0] + samples_sum = 0. + samples_sq_sum = 0. + product_sum = 0. + + for row in range(n): + samples_sum += samples[row, col] + samples_sq_sum += samples[row, col] ** 2 + product_sum += samples[row, col] * intermediate_values[row] + + numerator = float(n) * product_sum - samples_sum * intermed_sum[0] + denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2) + denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2) + denominator = denom_samp * denom_int + + result[col] = numerator / denominator + + @public class CPUTraceManager: """Manager for operations on stacked traces on CPU.""" @@ -620,3 +680,42 @@ class CPUTraceManager: np.sum(self.traces.samples, 0), self.traces.meta ) + + def pearson_corr(self, + intermediate_values: npt.NDArray[np.number]) \ + -> CombinedTrace: + """ + Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise. + + The result is equivalent to: + + >>> np.corrcoef(self.traces.samples, + intermediate_values, + rowvar=False)[-1, :-1] + + but a different implementation is used for better time-efficiency, + which doesn't compute the whole correlation matrix. + + :param intermediate_values: A 1D array of shape (n,) containing the intermediate values. + :type intermediate_values: npt.NDArray[np.number] + """ + samples = self.traces.samples + n = samples.shape[0] + if intermediate_values.shape != (n,): + raise ValueError("Invalid shape of intermediate_values, " + f"expected ({n},), " + f"got {intermediate_values.shape}") + + sam_sum = np.sum(samples, axis=0) + sam_sq_sum = np.sum(np.square(samples), axis=0) + + iv_sum = np.sum(intermediate_values) + iv_sq_sum = np.sum(np.square(intermediate_values)) + + prod_sum = intermediate_values @ samples + + numerator = n * prod_sum - sam_sum * iv_sum + denom_samp = np.sqrt(n * sam_sq_sum - sam_sum ** 2) + denom_int = np.sqrt(n * iv_sq_sum - iv_sum ** 2) + denominator = denom_samp * denom_int + return numerator / denominator |
