author    Ján Jančár    2023-09-30 16:47:24 +0200
committer GitHub        2023-09-30 16:47:24 +0200
commit    80c72fab754ace04ea653f979e374aa718bda467 (patch)
tree      18d635514a872885e499cf7e9472b9c0d10d945d
parent    525de7929dfb91625139db99e05090328df4895f (diff)
parent    d66c3dc971846c490a9f846e12be299a27856e69 (diff)
Merge pull request #42 from Tomko10/feat/pearson-corr-coef
Pearson correlation coefficient
-rw-r--r--  pyecsca/sca/stacked_traces/combine.py    | 90
-rw-r--r--  pyecsca/sca/stacked_traces/correlate.py  | 80
-rw-r--r--  test/sca/test_stacked_correlate.py       | 64
3 files changed, 208 insertions(+), 26 deletions(-)
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
index 1e7f7fb..15d5235 100644
--- a/pyecsca/sca/stacked_traces/combine.py
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -67,7 +67,7 @@ class BaseTraceManager:
"""
raise NotImplementedError
- def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+ def average_and_variance(self) -> List[CombinedTrace]:
"""
Compute the sample average and variance of the
:paramref:`~.average_and_variance.traces`, sample-wise.
@@ -87,6 +87,9 @@ class BaseTraceManager:
raise NotImplementedError
+InputType = Union[npt.NDArray[np.number], npt.ArrayLike]
+
+
CHUNK_MEMORY_RATIO = 0.4
STREAM_COUNT = 4
@@ -221,9 +224,17 @@ class GPUTraceManager(BaseTraceManager):
return int(
chunk_memory_ratio * mem_size / element_size)
- def _gpu_combine1D(self, func, output_count: int = 1) \
+ @property
+ def traces_shape(self) -> Tuple[int, ...]:
+ return self._traces.samples.shape
+
+ def _gpu_combine1D(self,
+ func,
+ inputs: Optional[List[InputType]] = None,
+ output_count: int = 1) \
-> Union[CombinedTrace, List[CombinedTrace]]:
- results = self._combine_func(func, output_count)
+ inputs = [] if inputs is None else inputs
+ results = self._combine_func(func, inputs, output_count)
if output_count == 1:
return CombinedTrace(
@@ -237,7 +248,10 @@ class GPUTraceManager(BaseTraceManager):
in results
]
- def _gpu_combine1D_all(self, func, output_count: int = 1) \
+ def _gpu_combine1D_all(self,
+ func,
+ inputs: List[InputType],
+ output_count: int = 1) \
-> List[npt.NDArray[np.number]]:
"""
Runs a combination function on the samples column-wise.
@@ -251,18 +265,27 @@ class GPUTraceManager(BaseTraceManager):
raise ValueError("Something went wrong. "
"TPB should be an int")
- device_input = cuda.to_device(self._traces.samples)
+ samples_input = cuda.to_device(self._traces.samples)
+ device_inputs = [
+ cuda.to_device(inp) # type: ignore
+ for inp in inputs
+ ]
device_outputs = [
cuda.device_array(self._traces.samples.shape[1])
for _ in range(output_count)
]
bpg = (self._traces.samples.shape[1] + self._tpb - 1) // self._tpb
- func[bpg, self._tpb](device_input, *device_outputs)
+ func[bpg, self._tpb](samples_input,
+ *device_inputs,
+ *device_outputs)
return [device_output.copy_to_host()
for device_output in device_outputs]
- def _gpu_combine1D_chunked(self, func, output_count: int = 1) \
+ def _gpu_combine1D_chunked(self,
+ func,
+ inputs: List[InputType],
+ output_count: int = 1) \
-> List[npt.NDArray[np.number]]:
if self._chunk_size is None:
raise ValueError("Something went wrong. "
@@ -290,6 +313,11 @@ class GPUTraceManager(BaseTraceManager):
for _ in range(self._stream_count)
]
+ device_inputs = [
+        cuda.to_device(inp)  # type: ignore
+ for inp in inputs
+ ]
+
chunk_results: List[List[npt.NDArray[np.number]]] = [
[] for _ in range(output_count)]
@@ -302,7 +330,6 @@ class GPUTraceManager(BaseTraceManager):
event = events[chunk % self._stream_count]
if event is not None:
event.wait(stream=stream)
- # stream.synchronize()
pinned_input = pinned_input_buffers[chunk % self._stream_count]
np.copyto(pinned_input, self._traces.samples[:, start:end])
@@ -318,7 +345,9 @@ class GPUTraceManager(BaseTraceManager):
]
bpg = (end - start + self._tpb - 1) // self._tpb
- func[bpg, self._tpb, stream](device_input, *device_outputs)
+ func[bpg, self._tpb, stream](device_input,
+ *device_inputs,
+ *device_outputs)
event = cuda.event()
event.record(stream=stream)
events[chunk % self._stream_count] = event
@@ -335,7 +364,7 @@ class GPUTraceManager(BaseTraceManager):
return [np.concatenate(chunk_result) for chunk_result in chunk_results]
def average(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_average, 1))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_average))
def conditional_average(self,
cond: Callable[[npt.NDArray[np.number]], bool]) \
@@ -343,17 +372,24 @@ class GPUTraceManager(BaseTraceManager):
raise NotImplementedError()
def standard_deviation(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev, 1))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev))
def variance(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_variance, 1))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_variance))
- def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
- averages, variances = self._gpu_combine1D(gpu_avg_var, 2)
- return averages, variances
+ def average_and_variance(self) -> List[CombinedTrace]:
+ averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2)
+ return [averages, variances]
def add(self) -> CombinedTrace:
- return cast(CombinedTrace, self._gpu_combine1D(gpu_add, 1))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_add))
+
+    def run(self,
+            func: Callable,
+            inputs: Optional[List[InputType]] = None,
+            output_count: int = 1) \
+        -> Union[CombinedTrace, List[CombinedTrace]]:
+        """Run a custom kernel, invoked as func(samples, *inputs, *outputs)."""
+        return self._gpu_combine1D(func, inputs, output_count)
@cuda.jit(device=True, cache=True)
@@ -381,7 +417,7 @@ def gpu_average(samples: npt.NDArray[np.number],
:param samples: Stacked traces' samples.
:param result: Result output array.
"""
- col = cuda.grid(1)
+ col = cuda.grid(1) # type: ignore
if col >= samples.shape[1]:
return
@@ -390,7 +426,8 @@ def gpu_average(samples: npt.NDArray[np.number],
@cuda.jit(device=True, cache=True)
-def _gpu_var_from_avg(col: int, samples: npt.NDArray[np.number],
+def _gpu_var_from_avg(col: int,
+ samples: npt.NDArray[np.number],
averages: npt.NDArray[np.number],
result: npt.NDArray[np.number]):
"""
@@ -431,7 +468,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number],
:param samples: Stacked traces' samples.
:param result: Result output array.
"""
- col = cuda.grid(1)
+ col = cuda.grid(1) # type: ignore
if col >= samples.shape[1]:
return
@@ -450,7 +487,7 @@ def gpu_variance(samples: npt.NDArray[np.number],
:param samples: Stacked traces' samples.
:param result: Result output array.
"""
- col = cuda.grid(1)
+ col = cuda.grid(1) # type: ignore
if col >= samples.shape[1]:
return
@@ -469,7 +506,7 @@ def gpu_avg_var(samples: npt.NDArray[np.number],
:param result_avg: Result average output array.
:param result_var: Result variance output array.
"""
- col = cuda.grid(1)
+ col = cuda.grid(1) # type: ignore
if col >= samples.shape[1]:
return
@@ -487,7 +524,7 @@ def gpu_add(samples: npt.NDArray[np.number],
:param samples: Stacked traces' samples.
:param result: Result output array.
"""
- col = cuda.grid(1)
+ col = cuda.grid(1) # type: ignore
if col >= samples.shape[1]:
return
@@ -531,7 +568,8 @@ class CPUTraceManager:
"""
# TODO: Consider other ways to implement this
return CombinedTrace(
- np.average(self.traces.samples[np.apply_along_axis(condition, 1, self.traces.samples)], 1),
+ np.average(self.traces.samples[np.apply_along_axis(
+ condition, 1, self.traces.samples)], 1),
self.traces.meta
)
@@ -559,17 +597,17 @@ class CPUTraceManager:
self.traces.meta
)
- def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+ def average_and_variance(self) -> List[CombinedTrace]:
"""
Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise.
:param traces:
:return:
"""
- return (
+ return [
self.average(),
self.variance()
- )
+ ]
def add(self) -> CombinedTrace:
"""
diff --git a/pyecsca/sca/stacked_traces/correlate.py b/pyecsca/sca/stacked_traces/correlate.py
new file mode 100644
index 0000000..c5277d9
--- /dev/null
+++ b/pyecsca/sca/stacked_traces/correlate.py
@@ -0,0 +1,80 @@
+import numpy as np
+import numpy.typing as npt
+from numba import cuda
+from numba.cuda.cudadrv.devicearray import DeviceNDArray
+from math import sqrt
+from typing import List, Optional, Union
+from .combine import InputType, GPUTraceManager
+from .stacked_traces import StackedTraces
+from ..trace.trace import CombinedTrace
+
+
+def gpu_pearson_corr(intermediate_values: npt.NDArray[np.number],
+ stacked_traces: Optional[StackedTraces] = None,
+ trace_manager: Optional[GPUTraceManager] = None,
+ **tm_kwargs) -> Union[CombinedTrace, List[CombinedTrace]]:
+ if (stacked_traces is None) == (trace_manager is None):
+ raise ValueError("Either samples or trace manager must be given.")
+
+ if trace_manager is None:
+ assert stacked_traces is not None
+ trace_manager = GPUTraceManager(stacked_traces, **tm_kwargs)
+
+ if (len(intermediate_values.shape) != 1
+ or (intermediate_values.shape[0]
+ != trace_manager.traces_shape[0])):
+ raise ValueError("Intermediate values have to be a vector "
+ "as long as trace_count")
+
+ intermed_sum: np.number = np.sum(intermediate_values)
+ intermed_sq_sum: np.number = np.sum(np.square(intermediate_values))
+ inputs: List[InputType] = [intermediate_values,
+ np.array([intermed_sum]),
+ np.array([intermed_sq_sum])]
+
+ return trace_manager.run(
+ _gpu_pearson_corr,
+ inputs
+ )
+
+
+@cuda.jit(cache=True)
+def _gpu_pearson_corr(samples: DeviceNDArray,
+ intermediate_values: DeviceNDArray,
+ intermed_sum: DeviceNDArray,
+ intermed_sq_sum: DeviceNDArray,
+ result: DeviceNDArray):
+ """
+ Calculates the Pearson correlation coefficient between the given samples and intermediate values using GPU acceleration.
+
+ :param samples: A 2D array of shape (n, m) containing the samples.
+ :type samples: npt.NDArray[np.number]
+ :param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
+ :type intermediate_values: npt.NDArray[np.number]
+ :param intermed_sum: A 1D array of shape (1,) containing the precomputed sum of the intermediate values.
+ :type intermed_sum: npt.NDArray[np.number]
+    :param intermed_sq_sum: A 1D array of shape (1,) containing the precomputed sum of the squares of the intermediate values.
+    :type intermed_sq_sum: npt.NDArray[np.number]
+ :param result: A 1D array of shape (m,) to store the resulting correlation coefficients.
+ :type result: cuda.devicearray.DeviceNDArray
+ """
+ col: int = cuda.grid(1) # type: ignore
+
+ if col >= samples.shape[1]: # type: ignore
+ return
+
+ n = samples.shape[0]
+ samples_sum = 0.
+ samples_sq_sum = 0.
+ product_sum = 0.
+
+ for row in range(n):
+ samples_sum += samples[row, col]
+ samples_sq_sum += samples[row, col] ** 2
+ product_sum += samples[row, col] * intermediate_values[row]
+
+ numerator = float(n) * product_sum - samples_sum * intermed_sum[0]
+ denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2)
+ denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2)
+ denominator = denom_samp * denom_int
+
+ result[col] = numerator / denominator
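
For reference, the kernel evaluates the single-pass form of the Pearson coefficient for each sample column x against the intermediate values y: r = (n*sum(xy) - sum(x)*sum(y)) / (sqrt(n*sum(x^2) - sum(x)^2) * sqrt(n*sum(y^2) - sum(y)^2)), with sum(y) and sum(y^2) precomputed on the host and shipped in as one-element arrays. A NumPy sketch of the same computation, handy for checking the kernel on the CPU (an illustration, not part of the diff):

import numpy as np

def pearson_corr_reference(samples: np.ndarray,
                           intermediate_values: np.ndarray) -> np.ndarray:
    """CPU reference for _gpu_pearson_corr: one coefficient per sample column."""
    n = samples.shape[0]
    x_sum = samples.sum(axis=0)             # per-column sum of samples
    x_sq_sum = (samples ** 2).sum(axis=0)   # per-column sum of squares
    y_sum = intermediate_values.sum()       # precomputed on host in the diff
    y_sq_sum = (intermediate_values ** 2).sum()
    xy_sum = samples.T @ intermediate_values
    numerator = n * xy_sum - x_sum * y_sum
    denominator = (np.sqrt(n * x_sq_sum - x_sum ** 2)
                   * np.sqrt(n * y_sq_sum - y_sum ** 2))
    return numerator / denominator
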
diff --git a/test/sca/test_stacked_correlate.py b/test/sca/test_stacked_correlate.py
new file mode 100644
index 0000000..954494f
--- /dev/null
+++ b/test/sca/test_stacked_correlate.py
@@ -0,0 +1,64 @@
+import pytest
+from numba import cuda
+import numpy as np
+from pyecsca.sca import (
+ StackedTraces,
+ GPUTraceManager,
+ CombinedTrace
+)
+from pyecsca.sca.stacked_traces.correlate import gpu_pearson_corr
+
+TPB = 128
+TRACE_COUNT = 2 ** 10
+TRACE_LEN = 2 ** 15
+RTOL = 1e-5
+ATOL = 1e-5
+
+
+@pytest.fixture()
+def samples():
+ np.random.seed(0x1234)
+ return np.random.rand(TRACE_COUNT, TRACE_LEN).astype(np.float32, order="F")
+
+
+@pytest.fixture()
+def gpu_manager(samples):
+ if not cuda.is_available():
+ pytest.skip("CUDA not available")
+ return GPUTraceManager(StackedTraces(samples), TPB)
+
+
+@pytest.fixture()
+def intermediate_values():
+ np.random.seed(0x1234)
+ return np.random.rand(TRACE_COUNT)
+
+
+def pearson_corr(samples, intermediate_values):
+ return np.corrcoef(samples, intermediate_values, rowvar=False)[-1, :-1]
+
+
+def test_pearson_coef_no_chunking(samples, gpu_manager, intermediate_values):
+ corr_gpu = gpu_pearson_corr(intermediate_values,
+ trace_manager=gpu_manager)
+ corr_cmp = pearson_corr(samples, intermediate_values)
+
+ assert isinstance(corr_gpu, CombinedTrace)
+    assert corr_gpu.samples.shape == corr_cmp.shape
+
+    assert np.allclose(corr_gpu.samples, corr_cmp, rtol=RTOL, atol=ATOL)
+
+
+def test_pearson_coef_chunking(samples, intermediate_values):
+    if not cuda.is_available():
+        pytest.skip("CUDA not available")
+    # Build the manager from the stacked traces here so that chunk_size
+    # and stream_count actually reach the GPUTraceManager constructor
+    # (they are ignored when an existing trace_manager is passed).
+    corr_gpu = gpu_pearson_corr(intermediate_values,
+                                stacked_traces=StackedTraces(samples),
+                                chunk_size=2 ** 5,
+                                stream_count=4)
+ corr_cmp = pearson_corr(samples, intermediate_values)
+
+ assert isinstance(corr_gpu, CombinedTrace)
+    assert corr_gpu.samples.shape == corr_cmp.shape
+
+    assert np.allclose(corr_gpu.samples, corr_cmp, rtol=RTOL, atol=ATOL)
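
Because `gpu_pearson_corr` accepts an existing `trace_manager`, one manager can be reused across many calls, e.g. one per key guess in a CPA-style attack. A hypothetical sketch of that pattern, assuming a CUDA-capable device (`hypothetical_leakage_model` is a placeholder, not part of pyecsca):

import numpy as np
from pyecsca.sca import StackedTraces, GPUTraceManager
from pyecsca.sca.stacked_traces.correlate import gpu_pearson_corr

def hypothetical_leakage_model(inputs: np.ndarray, guess: int) -> np.ndarray:
    # Placeholder leakage model: one predicted intermediate per trace.
    return np.bitwise_xor(inputs, guess).astype(np.float64)

samples = np.random.rand(1024, 4096).astype(np.float32, order="F")
trace_inputs = np.random.randint(0, 256, size=1024)
manager = GPUTraceManager(StackedTraces(samples))

# The samples stay with one GPUTraceManager across all key guesses;
# only the predicted intermediate values change per call.
correlations = [
    gpu_pearson_corr(hypothetical_leakage_model(trace_inputs, guess),
                     trace_manager=manager)
    for guess in range(256)
]
best_guess = int(np.argmax([np.max(np.abs(c.samples)) for c in correlations]))
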