diff options
| author | J08nY | 2024-06-02 09:54:10 +0200 |
|---|---|---|
| committer | J08nY | 2024-06-02 09:54:10 +0200 |
| commit | 6302e569e92baabc9ca7b18f152c8ce6331a1724 (patch) | |
| tree | dedb06a81e495e1e75ba3acacf3097c7c804c289 /pyecsca/sca | |
| parent | 64ed0caf1272eaa73433a74ac2f12cfe2aa26271 (diff) | |
| download | pyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.tar.gz pyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.tar.zst pyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.zip | |
Tests for CPUTraceManager and tests for constant input.
Diffstat (limited to 'pyecsca/sca')
| -rw-r--r-- | pyecsca/sca/stacked_traces/combine.py | 392 |
1 files changed, 191 insertions, 201 deletions
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py index 8016862..d43cd10 100644 --- a/pyecsca/sca/stacked_traces/combine.py +++ b/pyecsca/sca/stacked_traces/combine.py @@ -35,9 +35,9 @@ class BaseTraceManager: """ raise NotImplementedError - def conditional_average(self, - cond: Callable[[npt.NDArray[np.number]], bool]) \ - -> CombinedTrace: + def conditional_average( + self, cond: Callable[[npt.NDArray[np.number]], bool] + ) -> CombinedTrace: """ Average :paramref:`~.conditional_average.traces` for which the :paramref:`~.conditional_average.condition` is ``True``, sample-wise. @@ -103,13 +103,15 @@ class GPUTraceManager(BaseTraceManager): _chunk_size: Optional[int] _stream_count: Optional[int] - def __init__(self, - traces: StackedTraces, - tpb: TPB = 128, - chunk: bool = False, - chunk_size: Optional[int] = None, - chunk_memory_ratio: Optional[float] = None, - stream_count: Optional[int] = None) -> None: + def __init__( + self, + traces: StackedTraces, + tpb: TPB = 128, + chunk: bool = False, + chunk_size: Optional[int] = None, + chunk_memory_ratio: Optional[float] = None, + stream_count: Optional[int] = None, + ) -> None: """ :param traces: Stacked traces on which to operate. :param tpb: Threads per block to use for GPU operations. @@ -119,14 +121,14 @@ class GPUTraceManager(BaseTraceManager): :param chunk_memory_ratio: Part of available memory to use for chunking. :param stream_count: Number of streams to use for chunking. """ - self._check_init_args(chunk_size, - chunk_memory_ratio, - tpb) + self._check_init_args(chunk_size, chunk_memory_ratio, tpb) - chunk = (chunk - or stream_count is not None - or chunk_size is not None - or chunk_memory_ratio is not None) + chunk = ( + chunk + or stream_count is not None + or chunk_size is not None + or chunk_memory_ratio is not None + ) super().__init__(traces) # If chunking is used, the samples are stored in Fortran order @@ -141,9 +143,9 @@ class GPUTraceManager(BaseTraceManager): self._stream_count = None else: self._combine_func = self._gpu_combine1D_chunked - self._stream_count = (stream_count - if stream_count is not None - else STREAM_COUNT) + self._stream_count = ( + stream_count if stream_count is not None else STREAM_COUNT + ) if chunk_size is not None: self._chunk_size = chunk_size else: @@ -152,7 +154,8 @@ class GPUTraceManager(BaseTraceManager): if chunk_memory_ratio is not None else CHUNK_MEMORY_RATIO, item_size=self._traces.samples.itemsize, - chunk_item_count=self._traces.samples.shape[0]) + chunk_item_count=self._traces.samples.shape[0], + ) @staticmethod def _check_tpb(tpb: TPB) -> None: @@ -161,57 +164,59 @@ class GPUTraceManager(BaseTraceManager): max_tpb = dev.MAX_THREADS_PER_BLOCK if isinstance(tpb, int): if tpb % warp_size != 0: - raise ValueError( - f'TPB should be a multiple of WARP_SIZE ({warp_size})' - ) + raise ValueError(f"TPB should be a multiple of WARP_SIZE ({warp_size})") if tpb > max_tpb: raise ValueError( - 'TPB should be smaller than ' - 'MAX_THREADS_PER_BLOCK ({max_tpb})' + "TPB should be smaller than " "MAX_THREADS_PER_BLOCK ({max_tpb})" ) if isinstance(tpb, tuple) and any( t > max_tpb or t % warp_size != 0 for t in tpb ): raise ValueError( - f'TPB should be a multiple of WARP_SIZE ({warp_size}) ' - f'and smaller than MAX_THREADS_PER_BLOCK ({max_tpb})' - 'in each dimension' + f"TPB should be a multiple of WARP_SIZE ({warp_size}) " + f"and smaller than MAX_THREADS_PER_BLOCK ({max_tpb})" + "in each dimension" ) @staticmethod - def _check_chunk_sizing(chunk_size: Optional[int], - chunk_memory_ratio: Optional[float]) -> None: + def _check_chunk_sizing( + chunk_size: Optional[int], chunk_memory_ratio: Optional[float] + ) -> None: if chunk_size and chunk_memory_ratio: - raise ValueError("Only one of chunk_size and chunk_memory_ratio " - "can be specified") + raise ValueError( + "Only one of chunk_size and chunk_memory_ratio " "can be specified" + ) - if chunk_memory_ratio is not None \ - and (chunk_memory_ratio <= 0 or chunk_memory_ratio > 0.5): - raise ValueError("Chunk memory ratio should be in (0, 0.5], " - "because two chunks are stored in memory " - "at once") + if chunk_memory_ratio is not None and ( + chunk_memory_ratio <= 0 or chunk_memory_ratio > 0.5 + ): + raise ValueError( + "Chunk memory ratio should be in (0, 0.5], " + "because two chunks are stored in memory " + "at once" + ) if chunk_size is not None and chunk_size <= 0: raise ValueError("Chunk size should be positive") @staticmethod - def _check_init_args(chunk_size: Optional[int], - chunk_memory_ratio: Optional[float], - tpb: TPB) -> None: + def _check_init_args( + chunk_size: Optional[int], chunk_memory_ratio: Optional[float], tpb: TPB + ) -> None: if not cuda.is_available(): - raise RuntimeError("CUDA is not available, " - "use CPUTraceManager instead") + raise RuntimeError("CUDA is not available, " "use CPUTraceManager instead") GPUTraceManager._check_chunk_sizing(chunk_size, chunk_memory_ratio) GPUTraceManager._check_tpb(tpb) @staticmethod - def chunk_size_from_ratio(chunk_memory_ratio: float, - element_size: int | None = None, - item_size: int | None = None, - chunk_item_count: int | None = None) -> int: - if ((element_size is None) - == (item_size is None and chunk_item_count is None)): + def chunk_size_from_ratio( + chunk_memory_ratio: float, + element_size: int | None = None, + item_size: int | None = None, + chunk_item_count: int | None = None, + ) -> int: + if (element_size is None) == (item_size is None and chunk_item_count is None): raise ValueError( "Either element_size or item_size and chunk_item_count " "should be specified" @@ -222,38 +227,26 @@ class GPUTraceManager(BaseTraceManager): element_size = item_size * chunk_item_count mem_size = cuda.current_context().get_memory_info().free - return int( - chunk_memory_ratio * mem_size / element_size) + return int(chunk_memory_ratio * mem_size / element_size) @property def traces_shape(self) -> Tuple[int, ...]: return self._traces.samples.shape - def _gpu_combine1D(self, - func, - inputs: Optional[List[InputType]] = None, - output_count: int = 1) \ - -> Union[CombinedTrace, List[CombinedTrace]]: + def _gpu_combine1D( + self, func, inputs: Optional[List[InputType]] = None, output_count: int = 1 + ) -> Union[CombinedTrace, List[CombinedTrace]]: inputs = [] if inputs is None else inputs results = self._combine_func(func, inputs, output_count) if output_count == 1: - return CombinedTrace( - results[0], - self._traces.meta - ) + return CombinedTrace(results[0], self._traces.meta) - return [ - CombinedTrace(result, self._traces.meta) - for result - in results - ] + return [CombinedTrace(result, self._traces.meta) for result in results] - def _gpu_combine1D_all(self, - func, - inputs: List[InputType], - output_count: int = 1) \ - -> List[npt.NDArray[np.number]]: + def _gpu_combine1D_all( + self, func, inputs: List[InputType], output_count: int = 1 + ) -> List[npt.NDArray[np.number]]: """ Runs a combination function on the samples column-wise. @@ -263,70 +256,59 @@ class GPUTraceManager(BaseTraceManager): :return: Combined trace output from the GPU function """ if not isinstance(self._tpb, int): - raise ValueError("Something went wrong. " - "TPB should be an int") + raise ValueError("Something went wrong. " "TPB should be an int") samples_input = cuda.to_device(self._traces.samples) - device_inputs = [ - cuda.to_device(inp) # type: ignore - for inp in inputs - ] + device_inputs = [cuda.to_device(inp) for inp in inputs] # type: ignore device_outputs = [ cuda.device_array(self._traces.samples.shape[1]) for _ in range(output_count) ] bpg = (self._traces.samples.shape[1] + self._tpb - 1) // self._tpb - func[bpg, self._tpb](samples_input, - *device_inputs, - *device_outputs) - return [device_output.copy_to_host() - for device_output in device_outputs] + func[bpg, self._tpb](samples_input, *device_inputs, *device_outputs) + return [device_output.copy_to_host() for device_output in device_outputs] - def _gpu_combine1D_chunked(self, - func, - inputs: List[InputType], - output_count: int = 1) \ - -> List[npt.NDArray[np.number]]: + def _gpu_combine1D_chunked( + self, func, inputs: List[InputType], output_count: int = 1 + ) -> List[npt.NDArray[np.number]]: if self._chunk_size is None: - raise ValueError("Something went wrong. " - "Chunk size should be specified") + raise ValueError("Something went wrong. " "Chunk size should be specified") if self._stream_count is None: - raise ValueError("Something went wrong. " - "Stream count should be specified") + raise ValueError( + "Something went wrong. " "Stream count should be specified" + ) if not isinstance(self._tpb, int): - raise ValueError("Something went wrong. " - "TPB should be an int") + raise ValueError("Something went wrong. " "TPB should be an int") chunk_count = ( self._traces.samples.shape[1] + self._chunk_size - 1 ) // self._chunk_size streams = [cuda.stream() for _ in range(self._stream_count)] events: List[Union[None, cuda.Event]] = [ - None for _ in range(self._stream_count)] + None for _ in range(self._stream_count) + ] # Pre-allocate pinned memory for each stream pinned_input_buffers = [ - cuda.pinned_array((self._traces.samples.shape[0], - self._chunk_size), - dtype=self._traces.samples.dtype, - order="F") + cuda.pinned_array( + (self._traces.samples.shape[0], self._chunk_size), + dtype=self._traces.samples.dtype, + order="F", + ) for _ in range(self._stream_count) ] - device_inputs = [ - cuda.to_device(inp) # type: ignore - for inp in inputs - ] + device_inputs = [cuda.to_device(inp) for inp in inputs] # type: ignore chunk_results: List[List[npt.NDArray[np.number]]] = [ - [] for _ in range(output_count)] + [] for _ in range(output_count) + ] with cuda.defer_cleanup(): for chunk in range(chunk_count): start = chunk * self._chunk_size - end = min((chunk + 1) * self._chunk_size, - self._traces.samples.shape[1]) + end = min((chunk + 1) * self._chunk_size, self._traces.samples.shape[1]) stream = streams[chunk % self._stream_count] event = events[chunk % self._stream_count] if event is not None: @@ -336,19 +318,19 @@ class GPUTraceManager(BaseTraceManager): np.copyto(pinned_input, self._traces.samples[:, start:end]) device_input = cuda.to_device( - pinned_input[:, :end-start], stream=stream) + pinned_input[:, : end - start], stream=stream + ) device_outputs = [ cuda.device_array( - (end - start,), - dtype=pinned_input.dtype, - stream=stream) + (end - start,), dtype=pinned_input.dtype, stream=stream + ) for _ in range(output_count) ] bpg = (end - start + self._tpb - 1) // self._tpb - func[bpg, self._tpb, stream](device_input, - *device_inputs, - *device_outputs) + func[bpg, self._tpb, stream]( + device_input, *device_inputs, *device_outputs + ) event = cuda.event() event.record(stream=stream) events[chunk % self._stream_count] = event @@ -356,7 +338,8 @@ class GPUTraceManager(BaseTraceManager): for output_i, device_output in enumerate(device_outputs): # Allocating pinned memory for results host_output = cuda.pinned_array( - (end - start,), dtype=pinned_input.dtype) + (end - start,), dtype=pinned_input.dtype + ) device_output.copy_to_host(host_output, stream=stream) chunk_results[output_i].append(host_output) @@ -367,9 +350,9 @@ class GPUTraceManager(BaseTraceManager): def average(self) -> CombinedTrace: return cast(CombinedTrace, self._gpu_combine1D(gpu_average)) - def conditional_average(self, - cond: Callable[[npt.NDArray[np.number]], bool]) \ - -> CombinedTrace: + def conditional_average( + self, cond: Callable[[npt.NDArray[np.number]], bool] + ) -> CombinedTrace: raise NotImplementedError() def standard_deviation(self) -> CombinedTrace: @@ -385,34 +368,42 @@ class GPUTraceManager(BaseTraceManager): def add(self) -> CombinedTrace: return cast(CombinedTrace, self._gpu_combine1D(gpu_add)) - def pearson_corr(self, - intermediate_values: npt.NDArray[np.number]) \ - -> CombinedTrace: - if (len(intermediate_values.shape) != 1 - or (intermediate_values.shape[0] != self.traces_shape[0])): - raise ValueError("Intermediate values have to be a vector " - "as long as trace_count") - + def pearson_corr( + self, intermediate_values: npt.NDArray[np.number] + ) -> CombinedTrace: + if len(intermediate_values.shape) != 1 or ( + intermediate_values.shape[0] != self.traces_shape[0] + ): + raise ValueError( + "Intermediate values have to be a vector " "as long as trace_count" + ) + if np.all(intermediate_values == intermediate_values[0]): + raise ValueError( + "Constant intermediate value array, correlation undefined." + ) intermed_sum: np.number = np.sum(intermediate_values) intermed_sq_sum: np.number = np.sum(np.square(intermediate_values)) - inputs: List[InputType] = [intermediate_values, - np.array([intermed_sum]), - np.array([intermed_sq_sum])] + inputs: List[InputType] = [ + intermediate_values, + np.array([intermed_sum]), + np.array([intermed_sq_sum]), + ] - return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, - inputs)) + return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs)) - def run(self, - func: Callable, - inputs: Optional[List[InputType]] = None, - output_count: int = 1) \ - -> Union[CombinedTrace, List[CombinedTrace]]: + def run( + self, + func: Callable, + inputs: Optional[List[InputType]] = None, + output_count: int = 1, + ) -> Union[CombinedTrace, List[CombinedTrace]]: return self._gpu_combine1D(func, inputs, output_count) @cuda.jit(device=True, cache=True) -def _gpu_average(col: int, samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def _gpu_average( + col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): """ Cuda device thread function computing the average of a sample of stacked traces. @@ -420,15 +411,14 @@ def _gpu_average(col: int, samples: npt.NDArray[np.number], :param samples: Shared array of the samples of stacked traces. :param result: Result output array. """ - acc = 0. + acc = 0.0 for row in range(samples.shape[0]): acc += samples[row, col] result[col] = acc / samples.shape[0] @cuda.jit(cache=True) -def gpu_average(samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): """ Sample average of stacked traces, sample-wise. @@ -444,10 +434,12 @@ def gpu_average(samples: npt.NDArray[np.number], @cuda.jit(device=True, cache=True) -def _gpu_var_from_avg(col: int, - samples: npt.NDArray[np.number], - averages: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def _gpu_var_from_avg( + col: int, + samples: npt.NDArray[np.number], + averages: npt.NDArray[np.number], + result: npt.NDArray[np.number], +): """ Cuda device thread function computing the variance from the average of a sample of stacked traces. @@ -456,7 +448,7 @@ def _gpu_var_from_avg(col: int, :param averages: Array of averages of samples. :param result: Result output array. """ - var = 0. + var = 0.0 for row in range(samples.shape[0]): current = samples[row, col] - averages[col] var += current * current @@ -464,8 +456,9 @@ def _gpu_var_from_avg(col: int, @cuda.jit(device=True, cache=True) -def _gpu_variance(col: int, samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def _gpu_variance( + col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): """ Cuda device thread function computing the variance of a sample of stacked traces. @@ -478,8 +471,7 @@ def _gpu_variance(col: int, samples: npt.NDArray[np.number], @cuda.jit(cache=True) -def gpu_std_dev(samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): """ Sample standard deviation of stacked traces, sample-wise. @@ -497,8 +489,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number], @cuda.jit(cache=True) -def gpu_variance(samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): """ Sample variance of stacked traces, sample-wise. @@ -514,9 +505,11 @@ def gpu_variance(samples: npt.NDArray[np.number], @cuda.jit(cache=True) -def gpu_avg_var(samples: npt.NDArray[np.number], - result_avg: npt.NDArray[np.number], - result_var: npt.NDArray[np.number]): +def gpu_avg_var( + samples: npt.NDArray[np.number], + result_avg: npt.NDArray[np.number], + result_var: npt.NDArray[np.number], +): """ Sample average and variance of stacked traces, sample-wise. @@ -534,8 +527,7 @@ def gpu_avg_var(samples: npt.NDArray[np.number], @cuda.jit(cache=True) -def gpu_add(samples: npt.NDArray[np.number], - result: npt.NDArray[np.number]): +def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): """ Add samples of stacked traces, sample-wise. @@ -547,18 +539,20 @@ def gpu_add(samples: npt.NDArray[np.number], if col >= samples.shape[1]: return - res = 0. + res = 0.0 for row in range(samples.shape[0]): res += samples[row, col] result[col] = res @cuda.jit(cache=True) -def gpu_pearson_corr(samples: DeviceNDArray, - intermediate_values: DeviceNDArray, - intermed_sum: DeviceNDArray, - intermed_sq_sum: DeviceNDArray, - result: DeviceNDArray): +def gpu_pearson_corr( + samples: DeviceNDArray, + intermediate_values: DeviceNDArray, + intermed_sum: DeviceNDArray, + intermed_sq_sum: DeviceNDArray, + result: DeviceNDArray, +): """ Calculates the Pearson correlation coefficient between the given samples and intermediate values using GPU acceleration. @@ -578,9 +572,9 @@ def gpu_pearson_corr(samples: DeviceNDArray, return n = samples.shape[0] - samples_sum = 0. - samples_sq_sum = 0. - product_sum = 0. + samples_sum = 0.0 + samples_sq_sum = 0.0 + product_sum = 0.0 for row in range(n): samples_sum += samples[row, col] @@ -588,7 +582,7 @@ def gpu_pearson_corr(samples: DeviceNDArray, product_sum += samples[row, col] * intermediate_values[row] numerator = float(n) * product_sum - samples_sum * intermed_sum[0] - denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2) + denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum**2) denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2) denominator = denom_samp * denom_int @@ -611,15 +605,11 @@ class CPUTraceManager: :param traces: :return: """ - return CombinedTrace( - np.average(self.traces.samples, 0), - self.traces.meta - ) + return CombinedTrace(np.average(self.traces.samples, 0), self.traces.meta) - def conditional_average(self, - condition: Callable[[npt.NDArray[np.number]], - bool]) \ - -> CombinedTrace: + def conditional_average( + self, condition: Callable[[npt.NDArray[np.number]], bool] + ) -> CombinedTrace: """ Compute the conditional average of the :paramref:`~.conditional_average.traces`, sample-wise. @@ -628,9 +618,13 @@ class CPUTraceManager: """ # TODO: Consider other ways to implement this return CombinedTrace( - np.average(self.traces.samples[np.apply_along_axis( - condition, 1, self.traces.samples)], 1), - self.traces.meta + np.average( + self.traces.samples[ + np.apply_along_axis(condition, 1, self.traces.samples) + ], + 1, + ), + self.traces.meta, ) def standard_deviation(self) -> CombinedTrace: @@ -640,10 +634,7 @@ class CPUTraceManager: :param traces: :return: """ - return CombinedTrace( - np.std(self.traces.samples, 0), - self.traces.meta - ) + return CombinedTrace(np.std(self.traces.samples, 0), self.traces.meta) def variance(self) -> CombinedTrace: """ @@ -652,10 +643,7 @@ class CPUTraceManager: :param traces: :return: """ - return CombinedTrace( - np.var(self.traces.samples, 0), - self.traces.meta - ) + return CombinedTrace(np.var(self.traces.samples, 0), self.traces.meta) def average_and_variance(self) -> List[CombinedTrace]: """ @@ -664,10 +652,7 @@ class CPUTraceManager: :param traces: :return: """ - return [ - self.average(), - self.variance() - ] + return [self.average(), self.variance()] def add(self) -> CombinedTrace: """ @@ -676,14 +661,11 @@ class CPUTraceManager: :param traces: :return: """ - return CombinedTrace( - np.sum(self.traces.samples, 0), - self.traces.meta - ) + return CombinedTrace(np.sum(self.traces.samples, 0), self.traces.meta) - def pearson_corr(self, - intermediate_values: npt.NDArray[np.number]) \ - -> CombinedTrace: + def pearson_corr( + self, intermediate_values: npt.NDArray[np.number] + ) -> CombinedTrace: """ Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise. @@ -702,10 +684,18 @@ class CPUTraceManager: samples = self.traces.samples n = samples.shape[0] if intermediate_values.shape != (n,): - raise ValueError("Invalid shape of intermediate_values, " - f"expected ({n},), " - f"got {intermediate_values.shape}") - new_size = str(samples.dtype.itemsize * 2) if samples.dtype.itemsize != 8 else "8" + raise ValueError( + "Invalid shape of intermediate_values, " + f"expected ({n},), " + f"got {intermediate_values.shape}" + ) + if np.all(intermediate_values == intermediate_values[0]): + raise ValueError( + "Constant intermediate value array, correlation undefined." + ) + new_size = ( + str(samples.dtype.itemsize * 2) if samples.dtype.itemsize != 8 else "8" + ) dtype = np.dtype(samples.dtype.kind + new_size) sam_sum = np.sum(samples, axis=0) sam_sq_sum = np.sum(np.square(samples, dtype=dtype), axis=0) @@ -716,7 +706,7 @@ class CPUTraceManager: prod_sum = intermediate_values @ samples numerator = n * prod_sum - sam_sum * iv_sum - denom_samp = np.sqrt(n * sam_sq_sum - sam_sum ** 2) - denom_int = np.sqrt(n * iv_sq_sum - iv_sum ** 2) + denom_samp = np.sqrt(n * sam_sq_sum - sam_sum**2) + denom_int = np.sqrt(n * iv_sq_sum - iv_sum**2) denominator = denom_samp * denom_int - return numerator / denominator + return CombinedTrace(numerator / denominator, self.traces.meta) |
