| author | Ján Jančár | 2023-09-30 16:47:24 +0200 |
|---|---|---|
| committer | GitHub | 2023-09-30 16:47:24 +0200 |
| commit | 80c72fab754ace04ea653f979e374aa718bda467 | |
| tree | 18d635514a872885e499cf7e9472b9c0d10d945d | |
| parent | 525de7929dfb91625139db99e05090328df4895f | |
| parent | d66c3dc971846c490a9f846e12be299a27856e69 | |
Merge pull request #42 from Tomko10/feat/pearson-corr-coef
Pearson correlation coefficient
| -rw-r--r-- | pyecsca/sca/stacked_traces/combine.py | 90 |
| -rw-r--r-- | pyecsca/sca/stacked_traces/correlate.py | 80 |
| -rw-r--r-- | test/sca/test_stacked_correlate.py | 64 |
3 files changed, 208 insertions, 26 deletions
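
The gist of the change, before the full diff below: `GPUTraceManager`'s 1D-combine pipeline learns to forward extra per-trace input arrays to the CUDA kernels and exposes a public `run()` entry point, and the new `correlate.py` builds a Pearson-correlation kernel on top of that. A minimal usage sketch assembled from the new API and the tests in this PR (the shapes and random data are illustrative, not part of the change):

```python
import numpy as np
from pyecsca.sca import StackedTraces, GPUTraceManager
from pyecsca.sca.stacked_traces.correlate import gpu_pearson_corr

# 1024 traces of 32768 samples each; Fortran order keeps each sample column
# contiguous, which suits the one-thread-per-column kernels.
samples = np.random.rand(1024, 32768).astype(np.float32, order="F")
intermediate = np.random.rand(1024)  # one intermediate value per trace

manager = GPUTraceManager(StackedTraces(samples), 128)  # 128 threads per block
corr = gpu_pearson_corr(intermediate, trace_manager=manager)
print(corr.samples.shape)  # (32768,): one coefficient per sample index
```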
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
index 1e7f7fb..15d5235 100644
--- a/pyecsca/sca/stacked_traces/combine.py
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -67,7 +67,7 @@ class BaseTraceManager:
         """
         raise NotImplementedError
 
-    def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+    def average_and_variance(self) -> List[CombinedTrace]:
         """
         Compute the sample average and variance of the
         :paramref:`~.average_and_variance.traces`, sample-wise.
@@ -87,6 +87,9 @@ class BaseTraceManager:
         raise NotImplementedError
 
 
+InputType = Union[npt.NDArray[np.number], npt.ArrayLike]
+
+
 CHUNK_MEMORY_RATIO = 0.4
 STREAM_COUNT = 4
 
@@ -221,9 +224,17 @@ class GPUTraceManager(BaseTraceManager):
         return int(
             chunk_memory_ratio * mem_size / element_size)
 
-    def _gpu_combine1D(self, func, output_count: int = 1) \
+    @property
+    def traces_shape(self) -> Tuple[int, ...]:
+        return self._traces.samples.shape
+
+    def _gpu_combine1D(self,
+                       func,
+                       inputs: Optional[List[InputType]] = None,
+                       output_count: int = 1) \
             -> Union[CombinedTrace, List[CombinedTrace]]:
-        results = self._combine_func(func, output_count)
+        inputs = [] if inputs is None else inputs
+        results = self._combine_func(func, inputs, output_count)
 
         if output_count == 1:
             return CombinedTrace(
@@ -237,7 +248,10 @@ class GPUTraceManager(BaseTraceManager):
             in results
         ]
 
-    def _gpu_combine1D_all(self, func, output_count: int = 1) \
+    def _gpu_combine1D_all(self,
+                           func,
+                           inputs: List[InputType],
+                           output_count: int = 1) \
             -> List[npt.NDArray[np.number]]:
         """
         Runs a combination function on the samples column-wise.
@@ -251,18 +265,27 @@
             raise ValueError("Something went wrong. "
                              "TPB should be an int")
 
-        device_input = cuda.to_device(self._traces.samples)
+        samples_input = cuda.to_device(self._traces.samples)
+        device_inputs = [
+            cuda.to_device(inp)  # type: ignore
+            for inp in inputs
+        ]
         device_outputs = [
             cuda.device_array(self._traces.samples.shape[1])
             for _ in range(output_count)
         ]
         bpg = (self._traces.samples.shape[1] + self._tpb - 1) // self._tpb
-        func[bpg, self._tpb](device_input, *device_outputs)
+        func[bpg, self._tpb](samples_input,
+                             *device_inputs,
+                             *device_outputs)
 
         return [device_output.copy_to_host()
                 for device_output in device_outputs]
 
-    def _gpu_combine1D_chunked(self, func, output_count: int = 1) \
+    def _gpu_combine1D_chunked(self,
+                               func,
+                               inputs: List[InputType],
+                               output_count: int = 1) \
             -> List[npt.NDArray[np.number]]:
         if self._chunk_size is None:
             raise ValueError("Something went wrong. "
@@ -290,6 +313,11 @@ class GPUTraceManager(BaseTraceManager):
             for _ in range(self._stream_count)
         ]
 
+        device_inputs = [
+            cuda.const.array_like(inp)  # type: ignore
+            for inp in inputs
+        ]
+
         chunk_results: List[List[npt.NDArray[np.number]]] = [
             [] for _ in range(output_count)]
 
@@ -302,7 +330,6 @@ class GPUTraceManager(BaseTraceManager):
             event = events[chunk % self._stream_count]
             if event is not None:
                 event.wait(stream=stream)
-                # stream.synchronize()
             pinned_input = pinned_input_buffers[chunk % self._stream_count]
             np.copyto(pinned_input, self._traces.samples[:, start:end])
@@ -318,7 +345,9 @@ class GPUTraceManager(BaseTraceManager):
             ]
             bpg = (end - start + self._tpb - 1) // self._tpb
-            func[bpg, self._tpb, stream](device_input, *device_outputs)
+            func[bpg, self._tpb, stream](device_input,
+                                         *device_inputs,
+                                         *device_outputs)
             event = cuda.event()
             event.record(stream=stream)
             events[chunk % self._stream_count] = event
@@ -335,7 +364,7 @@ class GPUTraceManager(BaseTraceManager):
         return [np.concatenate(chunk_result) for chunk_result in chunk_results]
 
     def average(self) -> CombinedTrace:
-        return cast(CombinedTrace, self._gpu_combine1D(gpu_average, 1))
+        return cast(CombinedTrace, self._gpu_combine1D(gpu_average))
 
     def conditional_average(self,
                             cond: Callable[[npt.NDArray[np.number]], bool]) \
             -> CombinedTrace:
         raise NotImplementedError()
 
     def standard_deviation(self) -> CombinedTrace:
-        return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev, 1))
+        return cast(CombinedTrace, self._gpu_combine1D(gpu_std_dev))
 
     def variance(self) -> CombinedTrace:
-        return cast(CombinedTrace, self._gpu_combine1D(gpu_variance, 1))
+        return cast(CombinedTrace, self._gpu_combine1D(gpu_variance))
 
-    def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
-        averages, variances = self._gpu_combine1D(gpu_avg_var, 2)
-        return averages, variances
+    def average_and_variance(self) -> List[CombinedTrace]:
+        averages, variances = self._gpu_combine1D(gpu_avg_var, output_count=2)
+        return [averages, variances]
 
     def add(self) -> CombinedTrace:
-        return cast(CombinedTrace, self._gpu_combine1D(gpu_add, 1))
+        return cast(CombinedTrace, self._gpu_combine1D(gpu_add))
+
+    def run(self,
+            func: Callable,
+            inputs: Optional[List[InputType]] = None,
+            output_count: int = 1) \
+            -> Union[CombinedTrace, List[CombinedTrace]]:
+        return self._gpu_combine1D(func, inputs, output_count)
 
 
 @cuda.jit(device=True, cache=True)
@@ -381,7 +417,7 @@ def gpu_average(samples: npt.NDArray[np.number],
     :param samples: Stacked traces' samples.
     :param result: Result output array.
     """
-    col = cuda.grid(1)
+    col = cuda.grid(1)  # type: ignore
     if col >= samples.shape[1]:
         return
 
@@ -390,7 +426,8 @@
 
 @cuda.jit(device=True, cache=True)
-def _gpu_var_from_avg(col: int, samples: npt.NDArray[np.number],
+def _gpu_var_from_avg(col: int,
+                      samples: npt.NDArray[np.number],
                       averages: npt.NDArray[np.number],
                       result: npt.NDArray[np.number]):
     """
@@ -431,7 +468,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number],
     :param samples: Stacked traces' samples.
     :param result: Result output array.
     """
-    col = cuda.grid(1)
+    col = cuda.grid(1)  # type: ignore
     if col >= samples.shape[1]:
         return
 
@@ -450,7 +487,7 @@ def gpu_variance(samples: npt.NDArray[np.number],
     :param samples: Stacked traces' samples.
     :param result: Result output array.
     """
-    col = cuda.grid(1)
+    col = cuda.grid(1)  # type: ignore
     if col >= samples.shape[1]:
         return
 
@@ -469,7 +506,7 @@ def gpu_avg_var(samples: npt.NDArray[np.number],
     :param result_avg: Result average output array.
     :param result_var: Result variance output array.
     """
-    col = cuda.grid(1)
+    col = cuda.grid(1)  # type: ignore
     if col >= samples.shape[1]:
         return
 
@@ -487,7 +524,7 @@ def gpu_add(samples: npt.NDArray[np.number],
     :param samples: Stacked traces' samples.
     :param result: Result output array.
     """
-    col = cuda.grid(1)
+    col = cuda.grid(1)  # type: ignore
     if col >= samples.shape[1]:
         return
 
@@ -531,7 +568,8 @@ class CPUTraceManager:
         """
         # TODO: Consider other ways to implement this
        return CombinedTrace(
-            np.average(self.traces.samples[np.apply_along_axis(condition, 1, self.traces.samples)], 1),
+            np.average(self.traces.samples[np.apply_along_axis(
+                condition, 1, self.traces.samples)], 1),
             self.traces.meta
         )
 
@@ -559,17 +597,17 @@ class CPUTraceManager:
             self.traces.meta
         )
 
-    def average_and_variance(self) -> Tuple[CombinedTrace, CombinedTrace]:
+    def average_and_variance(self) -> List[CombinedTrace]:
         """
         Compute the average and sample variance of the :paramref:`~.average_and_variance.traces`, sample-wise.
 
         :param traces:
         :return:
         """
-        return (
+        return [
             self.average(),
             self.variance()
-        )
+        ]
 
     def add(self) -> CombinedTrace:
         """
diff --git a/pyecsca/sca/stacked_traces/correlate.py b/pyecsca/sca/stacked_traces/correlate.py
new file mode 100644
index 0000000..c5277d9
--- /dev/null
+++ b/pyecsca/sca/stacked_traces/correlate.py
@@ -0,0 +1,80 @@
+import numpy as np
+import numpy.typing as npt
+from numba import cuda
+from numba.cuda.cudadrv.devicearray import DeviceNDArray
+from math import sqrt
+from typing import List, Optional, Union
+from .combine import InputType, GPUTraceManager
+from .stacked_traces import StackedTraces
+from ..trace.trace import CombinedTrace
+
+
+def gpu_pearson_corr(intermediate_values: npt.NDArray[np.number],
+                     stacked_traces: Optional[StackedTraces] = None,
+                     trace_manager: Optional[GPUTraceManager] = None,
+                     **tm_kwargs) -> Union[CombinedTrace, List[CombinedTrace]]:
+    if (stacked_traces is None) == (trace_manager is None):
+        raise ValueError("Either samples or trace manager must be given.")
+
+    if trace_manager is None:
+        assert stacked_traces is not None
+        trace_manager = GPUTraceManager(stacked_traces, **tm_kwargs)
+
+    if (len(intermediate_values.shape) != 1
+            or (intermediate_values.shape[0]
+                != trace_manager.traces_shape[0])):
+        raise ValueError("Intermediate values have to be a vector "
+                         "as long as trace_count")
+
+    intermed_sum: np.number = np.sum(intermediate_values)
+    intermed_sq_sum: np.number = np.sum(np.square(intermediate_values))
+    inputs: List[InputType] = [intermediate_values,
+                               np.array([intermed_sum]),
+                               np.array([intermed_sq_sum])]
+
+    return trace_manager.run(
+        _gpu_pearson_corr,
+        inputs
+    )
+
+
+@cuda.jit(cache=True)
+def _gpu_pearson_corr(samples: DeviceNDArray,
+                      intermediate_values: DeviceNDArray,
+                      intermed_sum: DeviceNDArray,
+                      intermed_sq_sum: DeviceNDArray,
+                      result: DeviceNDArray):
+    """
+    Calculates the Pearson correlation coefficient between the given samples
+    and intermediate values using GPU acceleration.
+
+    :param samples: A 2D array of shape (n, m) containing the samples.
+    :type samples: npt.NDArray[np.number]
+    :param intermediate_values: A 1D array of shape (n,) containing the intermediate values.
+    :type intermediate_values: npt.NDArray[np.number]
+    :param intermed_sum: A 1D array of shape (1,) containing the precomputed sum of the intermediate values.
+    :type intermed_sum: npt.NDArray[np.number]
+    :param intermed_sq_sum: A 1D array of shape (1,) containing the precomputed sum of the squares of the intermediate values.
+    :param result: A 1D array of shape (m,) to store the resulting correlation coefficients.
+    :type result: cuda.devicearray.DeviceNDArray
+    """
+    col: int = cuda.grid(1)  # type: ignore
+
+    if col >= samples.shape[1]:  # type: ignore
+        return
+
+    n = samples.shape[0]
+    samples_sum = 0.
+    samples_sq_sum = 0.
+    product_sum = 0.
+
+    for row in range(n):
+        samples_sum += samples[row, col]
+        samples_sq_sum += samples[row, col] ** 2
+        product_sum += samples[row, col] * intermediate_values[row]
+
+    numerator = float(n) * product_sum - samples_sum * intermed_sum[0]
+    denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2)
+    denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2)
+    denominator = denom_samp * denom_int
+
+    result[col] = numerator / denominator
diff --git a/test/sca/test_stacked_correlate.py b/test/sca/test_stacked_correlate.py
new file mode 100644
index 0000000..954494f
--- /dev/null
+++ b/test/sca/test_stacked_correlate.py
@@ -0,0 +1,64 @@
+import pytest
+from numba import cuda
+import numpy as np
+from pyecsca.sca import (
+    StackedTraces,
+    GPUTraceManager,
+    CombinedTrace
+)
+from pyecsca.sca.stacked_traces.correlate import gpu_pearson_corr
+
+TPB = 128
+TRACE_COUNT = 2 ** 10
+TRACE_LEN = 2 ** 15
+RTOL = 1e-5
+ATOL = 1e-5
+
+
+@pytest.fixture()
+def samples():
+    np.random.seed(0x1234)
+    return np.random.rand(TRACE_COUNT, TRACE_LEN).astype(np.float32, order="F")
+
+
+@pytest.fixture()
+def gpu_manager(samples):
+    if not cuda.is_available():
+        pytest.skip("CUDA not available")
+    return GPUTraceManager(StackedTraces(samples), TPB)
+
+
+@pytest.fixture()
+def intermediate_values():
+    np.random.seed(0x1234)
+    return np.random.rand(TRACE_COUNT)
+
+
+def pearson_corr(samples, intermediate_values):
+    return np.corrcoef(samples, intermediate_values, rowvar=False)[-1, :-1]
+
+
+def test_pearson_coef_no_chunking(samples, gpu_manager, intermediate_values):
+    corr_gpu = gpu_pearson_corr(intermediate_values,
+                                trace_manager=gpu_manager)
+    corr_cmp = pearson_corr(samples, intermediate_values)
+
+    assert isinstance(corr_gpu, CombinedTrace)
+    assert corr_gpu.samples.shape == \
+        corr_cmp.shape
+
+    assert all(np.isclose(corr_gpu.samples, corr_cmp, rtol=RTOL, atol=ATOL))
+
+
+def test_pearson_coef_chunking(samples, gpu_manager, intermediate_values):
+    corr_gpu = gpu_pearson_corr(intermediate_values,
+                                trace_manager=gpu_manager,
+                                chunk_size=2 ** 5,
+                                stream_count=4)
+    corr_cmp = pearson_corr(samples, intermediate_values)
+
+    assert isinstance(corr_gpu, CombinedTrace)
+    assert corr_gpu.samples.shape == \
+        corr_cmp.shape
+
+    assert all(np.isclose(corr_gpu.samples, corr_cmp, rtol=RTOL, atol=ATOL))
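
For reference, the kernel evaluates the textbook single-pass form of the Pearson coefficient, r = (n·Σxy − Σx·Σy) / (√(n·Σx² − (Σx)²) · √(n·Σy² − (Σy)²)), independently for every sample column, with Σy and Σy² precomputed on the host. A NumPy sketch of the same sums (the function name is mine, not from the PR) that can serve as a second CPU cross-check next to the `np.corrcoef`-based helper in the tests:

```python
import numpy as np

def pearson_single_pass(samples: np.ndarray, iv: np.ndarray) -> np.ndarray:
    """Pearson r of every sample column of `samples` against the vector `iv`,
    accumulated with the same sums as the CUDA kernel."""
    n = samples.shape[0]
    s_sum = samples.sum(axis=0)             # per-column sum of x
    s_sq_sum = (samples ** 2).sum(axis=0)   # per-column sum of x**2
    prod_sum = samples.T @ iv               # per-column sum of x*y
    iv_sum = iv.sum()                       # precomputed host-side in the PR
    iv_sq_sum = (iv ** 2).sum()
    num = n * prod_sum - s_sum * iv_sum
    den = (np.sqrt(n * s_sq_sum - s_sum ** 2)
           * np.sqrt(n * iv_sq_sum - iv_sum ** 2))
    return num / den
```

Worth noting: this single-pass form can lose precision on float32 data with a large mean (the two products in the numerator nearly cancel), which is presumably why the tests compare against `np.corrcoef` with rtol/atol of 1e-5 rather than exact equality.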
