aboutsummaryrefslogtreecommitdiffhomepage
path: root/pyecsca/sca
diff options
context:
space:
mode:
authorJ08nY2024-06-02 09:54:10 +0200
committerJ08nY2024-06-02 09:54:10 +0200
commit6302e569e92baabc9ca7b18f152c8ce6331a1724 (patch)
treededb06a81e495e1e75ba3acacf3097c7c804c289 /pyecsca/sca
parent64ed0caf1272eaa73433a74ac2f12cfe2aa26271 (diff)
downloadpyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.tar.gz
pyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.tar.zst
pyecsca-6302e569e92baabc9ca7b18f152c8ce6331a1724.zip
Tests for CPUTraceManager and tests for constant input.
Diffstat (limited to 'pyecsca/sca')
-rw-r--r--pyecsca/sca/stacked_traces/combine.py392
1 files changed, 191 insertions, 201 deletions
diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py
index 8016862..d43cd10 100644
--- a/pyecsca/sca/stacked_traces/combine.py
+++ b/pyecsca/sca/stacked_traces/combine.py
@@ -35,9 +35,9 @@ class BaseTraceManager:
"""
raise NotImplementedError
- def conditional_average(self,
- cond: Callable[[npt.NDArray[np.number]], bool]) \
- -> CombinedTrace:
+ def conditional_average(
+ self, cond: Callable[[npt.NDArray[np.number]], bool]
+ ) -> CombinedTrace:
"""
Average :paramref:`~.conditional_average.traces` for which the
:paramref:`~.conditional_average.condition` is ``True``, sample-wise.
@@ -103,13 +103,15 @@ class GPUTraceManager(BaseTraceManager):
_chunk_size: Optional[int]
_stream_count: Optional[int]
- def __init__(self,
- traces: StackedTraces,
- tpb: TPB = 128,
- chunk: bool = False,
- chunk_size: Optional[int] = None,
- chunk_memory_ratio: Optional[float] = None,
- stream_count: Optional[int] = None) -> None:
+ def __init__(
+ self,
+ traces: StackedTraces,
+ tpb: TPB = 128,
+ chunk: bool = False,
+ chunk_size: Optional[int] = None,
+ chunk_memory_ratio: Optional[float] = None,
+ stream_count: Optional[int] = None,
+ ) -> None:
"""
:param traces: Stacked traces on which to operate.
:param tpb: Threads per block to use for GPU operations.
@@ -119,14 +121,14 @@ class GPUTraceManager(BaseTraceManager):
:param chunk_memory_ratio: Part of available memory to use for chunking.
:param stream_count: Number of streams to use for chunking.
"""
- self._check_init_args(chunk_size,
- chunk_memory_ratio,
- tpb)
+ self._check_init_args(chunk_size, chunk_memory_ratio, tpb)
- chunk = (chunk
- or stream_count is not None
- or chunk_size is not None
- or chunk_memory_ratio is not None)
+ chunk = (
+ chunk
+ or stream_count is not None
+ or chunk_size is not None
+ or chunk_memory_ratio is not None
+ )
super().__init__(traces)
# If chunking is used, the samples are stored in Fortran order
@@ -141,9 +143,9 @@ class GPUTraceManager(BaseTraceManager):
self._stream_count = None
else:
self._combine_func = self._gpu_combine1D_chunked
- self._stream_count = (stream_count
- if stream_count is not None
- else STREAM_COUNT)
+ self._stream_count = (
+ stream_count if stream_count is not None else STREAM_COUNT
+ )
if chunk_size is not None:
self._chunk_size = chunk_size
else:
@@ -152,7 +154,8 @@ class GPUTraceManager(BaseTraceManager):
if chunk_memory_ratio is not None
else CHUNK_MEMORY_RATIO,
item_size=self._traces.samples.itemsize,
- chunk_item_count=self._traces.samples.shape[0])
+ chunk_item_count=self._traces.samples.shape[0],
+ )
@staticmethod
def _check_tpb(tpb: TPB) -> None:
@@ -161,57 +164,59 @@ class GPUTraceManager(BaseTraceManager):
max_tpb = dev.MAX_THREADS_PER_BLOCK
if isinstance(tpb, int):
if tpb % warp_size != 0:
- raise ValueError(
- f'TPB should be a multiple of WARP_SIZE ({warp_size})'
- )
+ raise ValueError(f"TPB should be a multiple of WARP_SIZE ({warp_size})")
if tpb > max_tpb:
raise ValueError(
- 'TPB should be smaller than '
- 'MAX_THREADS_PER_BLOCK ({max_tpb})'
+ "TPB should be smaller than " "MAX_THREADS_PER_BLOCK ({max_tpb})"
)
if isinstance(tpb, tuple) and any(
t > max_tpb or t % warp_size != 0 for t in tpb
):
raise ValueError(
- f'TPB should be a multiple of WARP_SIZE ({warp_size}) '
- f'and smaller than MAX_THREADS_PER_BLOCK ({max_tpb})'
- 'in each dimension'
+ f"TPB should be a multiple of WARP_SIZE ({warp_size}) "
+ f"and smaller than MAX_THREADS_PER_BLOCK ({max_tpb})"
+ "in each dimension"
)
@staticmethod
- def _check_chunk_sizing(chunk_size: Optional[int],
- chunk_memory_ratio: Optional[float]) -> None:
+ def _check_chunk_sizing(
+ chunk_size: Optional[int], chunk_memory_ratio: Optional[float]
+ ) -> None:
if chunk_size and chunk_memory_ratio:
- raise ValueError("Only one of chunk_size and chunk_memory_ratio "
- "can be specified")
+ raise ValueError(
+ "Only one of chunk_size and chunk_memory_ratio " "can be specified"
+ )
- if chunk_memory_ratio is not None \
- and (chunk_memory_ratio <= 0 or chunk_memory_ratio > 0.5):
- raise ValueError("Chunk memory ratio should be in (0, 0.5], "
- "because two chunks are stored in memory "
- "at once")
+ if chunk_memory_ratio is not None and (
+ chunk_memory_ratio <= 0 or chunk_memory_ratio > 0.5
+ ):
+ raise ValueError(
+ "Chunk memory ratio should be in (0, 0.5], "
+ "because two chunks are stored in memory "
+ "at once"
+ )
if chunk_size is not None and chunk_size <= 0:
raise ValueError("Chunk size should be positive")
@staticmethod
- def _check_init_args(chunk_size: Optional[int],
- chunk_memory_ratio: Optional[float],
- tpb: TPB) -> None:
+ def _check_init_args(
+ chunk_size: Optional[int], chunk_memory_ratio: Optional[float], tpb: TPB
+ ) -> None:
if not cuda.is_available():
- raise RuntimeError("CUDA is not available, "
- "use CPUTraceManager instead")
+ raise RuntimeError("CUDA is not available, " "use CPUTraceManager instead")
GPUTraceManager._check_chunk_sizing(chunk_size, chunk_memory_ratio)
GPUTraceManager._check_tpb(tpb)
@staticmethod
- def chunk_size_from_ratio(chunk_memory_ratio: float,
- element_size: int | None = None,
- item_size: int | None = None,
- chunk_item_count: int | None = None) -> int:
- if ((element_size is None)
- == (item_size is None and chunk_item_count is None)):
+ def chunk_size_from_ratio(
+ chunk_memory_ratio: float,
+ element_size: int | None = None,
+ item_size: int | None = None,
+ chunk_item_count: int | None = None,
+ ) -> int:
+ if (element_size is None) == (item_size is None and chunk_item_count is None):
raise ValueError(
"Either element_size or item_size and chunk_item_count "
"should be specified"
@@ -222,38 +227,26 @@ class GPUTraceManager(BaseTraceManager):
element_size = item_size * chunk_item_count
mem_size = cuda.current_context().get_memory_info().free
- return int(
- chunk_memory_ratio * mem_size / element_size)
+ return int(chunk_memory_ratio * mem_size / element_size)
@property
def traces_shape(self) -> Tuple[int, ...]:
return self._traces.samples.shape
- def _gpu_combine1D(self,
- func,
- inputs: Optional[List[InputType]] = None,
- output_count: int = 1) \
- -> Union[CombinedTrace, List[CombinedTrace]]:
+ def _gpu_combine1D(
+ self, func, inputs: Optional[List[InputType]] = None, output_count: int = 1
+ ) -> Union[CombinedTrace, List[CombinedTrace]]:
inputs = [] if inputs is None else inputs
results = self._combine_func(func, inputs, output_count)
if output_count == 1:
- return CombinedTrace(
- results[0],
- self._traces.meta
- )
+ return CombinedTrace(results[0], self._traces.meta)
- return [
- CombinedTrace(result, self._traces.meta)
- for result
- in results
- ]
+ return [CombinedTrace(result, self._traces.meta) for result in results]
- def _gpu_combine1D_all(self,
- func,
- inputs: List[InputType],
- output_count: int = 1) \
- -> List[npt.NDArray[np.number]]:
+ def _gpu_combine1D_all(
+ self, func, inputs: List[InputType], output_count: int = 1
+ ) -> List[npt.NDArray[np.number]]:
"""
Runs a combination function on the samples column-wise.
@@ -263,70 +256,59 @@ class GPUTraceManager(BaseTraceManager):
:return: Combined trace output from the GPU function
"""
if not isinstance(self._tpb, int):
- raise ValueError("Something went wrong. "
- "TPB should be an int")
+ raise ValueError("Something went wrong. " "TPB should be an int")
samples_input = cuda.to_device(self._traces.samples)
- device_inputs = [
- cuda.to_device(inp) # type: ignore
- for inp in inputs
- ]
+ device_inputs = [cuda.to_device(inp) for inp in inputs] # type: ignore
device_outputs = [
cuda.device_array(self._traces.samples.shape[1])
for _ in range(output_count)
]
bpg = (self._traces.samples.shape[1] + self._tpb - 1) // self._tpb
- func[bpg, self._tpb](samples_input,
- *device_inputs,
- *device_outputs)
- return [device_output.copy_to_host()
- for device_output in device_outputs]
+ func[bpg, self._tpb](samples_input, *device_inputs, *device_outputs)
+ return [device_output.copy_to_host() for device_output in device_outputs]
- def _gpu_combine1D_chunked(self,
- func,
- inputs: List[InputType],
- output_count: int = 1) \
- -> List[npt.NDArray[np.number]]:
+ def _gpu_combine1D_chunked(
+ self, func, inputs: List[InputType], output_count: int = 1
+ ) -> List[npt.NDArray[np.number]]:
if self._chunk_size is None:
- raise ValueError("Something went wrong. "
- "Chunk size should be specified")
+ raise ValueError("Something went wrong. " "Chunk size should be specified")
if self._stream_count is None:
- raise ValueError("Something went wrong. "
- "Stream count should be specified")
+ raise ValueError(
+ "Something went wrong. " "Stream count should be specified"
+ )
if not isinstance(self._tpb, int):
- raise ValueError("Something went wrong. "
- "TPB should be an int")
+ raise ValueError("Something went wrong. " "TPB should be an int")
chunk_count = (
self._traces.samples.shape[1] + self._chunk_size - 1
) // self._chunk_size
streams = [cuda.stream() for _ in range(self._stream_count)]
events: List[Union[None, cuda.Event]] = [
- None for _ in range(self._stream_count)]
+ None for _ in range(self._stream_count)
+ ]
# Pre-allocate pinned memory for each stream
pinned_input_buffers = [
- cuda.pinned_array((self._traces.samples.shape[0],
- self._chunk_size),
- dtype=self._traces.samples.dtype,
- order="F")
+ cuda.pinned_array(
+ (self._traces.samples.shape[0], self._chunk_size),
+ dtype=self._traces.samples.dtype,
+ order="F",
+ )
for _ in range(self._stream_count)
]
- device_inputs = [
- cuda.to_device(inp) # type: ignore
- for inp in inputs
- ]
+ device_inputs = [cuda.to_device(inp) for inp in inputs] # type: ignore
chunk_results: List[List[npt.NDArray[np.number]]] = [
- [] for _ in range(output_count)]
+ [] for _ in range(output_count)
+ ]
with cuda.defer_cleanup():
for chunk in range(chunk_count):
start = chunk * self._chunk_size
- end = min((chunk + 1) * self._chunk_size,
- self._traces.samples.shape[1])
+ end = min((chunk + 1) * self._chunk_size, self._traces.samples.shape[1])
stream = streams[chunk % self._stream_count]
event = events[chunk % self._stream_count]
if event is not None:
@@ -336,19 +318,19 @@ class GPUTraceManager(BaseTraceManager):
np.copyto(pinned_input, self._traces.samples[:, start:end])
device_input = cuda.to_device(
- pinned_input[:, :end-start], stream=stream)
+ pinned_input[:, : end - start], stream=stream
+ )
device_outputs = [
cuda.device_array(
- (end - start,),
- dtype=pinned_input.dtype,
- stream=stream)
+ (end - start,), dtype=pinned_input.dtype, stream=stream
+ )
for _ in range(output_count)
]
bpg = (end - start + self._tpb - 1) // self._tpb
- func[bpg, self._tpb, stream](device_input,
- *device_inputs,
- *device_outputs)
+ func[bpg, self._tpb, stream](
+ device_input, *device_inputs, *device_outputs
+ )
event = cuda.event()
event.record(stream=stream)
events[chunk % self._stream_count] = event
@@ -356,7 +338,8 @@ class GPUTraceManager(BaseTraceManager):
for output_i, device_output in enumerate(device_outputs):
# Allocating pinned memory for results
host_output = cuda.pinned_array(
- (end - start,), dtype=pinned_input.dtype)
+ (end - start,), dtype=pinned_input.dtype
+ )
device_output.copy_to_host(host_output, stream=stream)
chunk_results[output_i].append(host_output)
@@ -367,9 +350,9 @@ class GPUTraceManager(BaseTraceManager):
def average(self) -> CombinedTrace:
return cast(CombinedTrace, self._gpu_combine1D(gpu_average))
- def conditional_average(self,
- cond: Callable[[npt.NDArray[np.number]], bool]) \
- -> CombinedTrace:
+ def conditional_average(
+ self, cond: Callable[[npt.NDArray[np.number]], bool]
+ ) -> CombinedTrace:
raise NotImplementedError()
def standard_deviation(self) -> CombinedTrace:
@@ -385,34 +368,42 @@ class GPUTraceManager(BaseTraceManager):
def add(self) -> CombinedTrace:
return cast(CombinedTrace, self._gpu_combine1D(gpu_add))
- def pearson_corr(self,
- intermediate_values: npt.NDArray[np.number]) \
- -> CombinedTrace:
- if (len(intermediate_values.shape) != 1
- or (intermediate_values.shape[0] != self.traces_shape[0])):
- raise ValueError("Intermediate values have to be a vector "
- "as long as trace_count")
-
+ def pearson_corr(
+ self, intermediate_values: npt.NDArray[np.number]
+ ) -> CombinedTrace:
+ if len(intermediate_values.shape) != 1 or (
+ intermediate_values.shape[0] != self.traces_shape[0]
+ ):
+ raise ValueError(
+ "Intermediate values have to be a vector " "as long as trace_count"
+ )
+ if np.all(intermediate_values == intermediate_values[0]):
+ raise ValueError(
+ "Constant intermediate value array, correlation undefined."
+ )
intermed_sum: np.number = np.sum(intermediate_values)
intermed_sq_sum: np.number = np.sum(np.square(intermediate_values))
- inputs: List[InputType] = [intermediate_values,
- np.array([intermed_sum]),
- np.array([intermed_sq_sum])]
+ inputs: List[InputType] = [
+ intermediate_values,
+ np.array([intermed_sum]),
+ np.array([intermed_sq_sum]),
+ ]
- return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr,
- inputs))
+ return cast(CombinedTrace, self._gpu_combine1D(gpu_pearson_corr, inputs))
- def run(self,
- func: Callable,
- inputs: Optional[List[InputType]] = None,
- output_count: int = 1) \
- -> Union[CombinedTrace, List[CombinedTrace]]:
+ def run(
+ self,
+ func: Callable,
+ inputs: Optional[List[InputType]] = None,
+ output_count: int = 1,
+ ) -> Union[CombinedTrace, List[CombinedTrace]]:
return self._gpu_combine1D(func, inputs, output_count)
@cuda.jit(device=True, cache=True)
-def _gpu_average(col: int, samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def _gpu_average(
+ col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number]
+):
"""
Cuda device thread function computing the average of a sample of stacked traces.
@@ -420,15 +411,14 @@ def _gpu_average(col: int, samples: npt.NDArray[np.number],
:param samples: Shared array of the samples of stacked traces.
:param result: Result output array.
"""
- acc = 0.
+ acc = 0.0
for row in range(samples.shape[0]):
acc += samples[row, col]
result[col] = acc / samples.shape[0]
@cuda.jit(cache=True)
-def gpu_average(samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
"""
Sample average of stacked traces, sample-wise.
@@ -444,10 +434,12 @@ def gpu_average(samples: npt.NDArray[np.number],
@cuda.jit(device=True, cache=True)
-def _gpu_var_from_avg(col: int,
- samples: npt.NDArray[np.number],
- averages: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def _gpu_var_from_avg(
+ col: int,
+ samples: npt.NDArray[np.number],
+ averages: npt.NDArray[np.number],
+ result: npt.NDArray[np.number],
+):
"""
Cuda device thread function computing the variance from the average of a sample of stacked traces.
@@ -456,7 +448,7 @@ def _gpu_var_from_avg(col: int,
:param averages: Array of averages of samples.
:param result: Result output array.
"""
- var = 0.
+ var = 0.0
for row in range(samples.shape[0]):
current = samples[row, col] - averages[col]
var += current * current
@@ -464,8 +456,9 @@ def _gpu_var_from_avg(col: int,
@cuda.jit(device=True, cache=True)
-def _gpu_variance(col: int, samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def _gpu_variance(
+ col: int, samples: npt.NDArray[np.number], result: npt.NDArray[np.number]
+):
"""
Cuda device thread function computing the variance of a sample of stacked traces.
@@ -478,8 +471,7 @@ def _gpu_variance(col: int, samples: npt.NDArray[np.number],
@cuda.jit(cache=True)
-def gpu_std_dev(samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
"""
Sample standard deviation of stacked traces, sample-wise.
@@ -497,8 +489,7 @@ def gpu_std_dev(samples: npt.NDArray[np.number],
@cuda.jit(cache=True)
-def gpu_variance(samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
"""
Sample variance of stacked traces, sample-wise.
@@ -514,9 +505,11 @@ def gpu_variance(samples: npt.NDArray[np.number],
@cuda.jit(cache=True)
-def gpu_avg_var(samples: npt.NDArray[np.number],
- result_avg: npt.NDArray[np.number],
- result_var: npt.NDArray[np.number]):
+def gpu_avg_var(
+ samples: npt.NDArray[np.number],
+ result_avg: npt.NDArray[np.number],
+ result_var: npt.NDArray[np.number],
+):
"""
Sample average and variance of stacked traces, sample-wise.
@@ -534,8 +527,7 @@ def gpu_avg_var(samples: npt.NDArray[np.number],
@cuda.jit(cache=True)
-def gpu_add(samples: npt.NDArray[np.number],
- result: npt.NDArray[np.number]):
+def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]):
"""
Add samples of stacked traces, sample-wise.
@@ -547,18 +539,20 @@ def gpu_add(samples: npt.NDArray[np.number],
if col >= samples.shape[1]:
return
- res = 0.
+ res = 0.0
for row in range(samples.shape[0]):
res += samples[row, col]
result[col] = res
@cuda.jit(cache=True)
-def gpu_pearson_corr(samples: DeviceNDArray,
- intermediate_values: DeviceNDArray,
- intermed_sum: DeviceNDArray,
- intermed_sq_sum: DeviceNDArray,
- result: DeviceNDArray):
+def gpu_pearson_corr(
+ samples: DeviceNDArray,
+ intermediate_values: DeviceNDArray,
+ intermed_sum: DeviceNDArray,
+ intermed_sq_sum: DeviceNDArray,
+ result: DeviceNDArray,
+):
"""
Calculates the Pearson correlation coefficient between the given samples and intermediate values using GPU acceleration.
@@ -578,9 +572,9 @@ def gpu_pearson_corr(samples: DeviceNDArray,
return
n = samples.shape[0]
- samples_sum = 0.
- samples_sq_sum = 0.
- product_sum = 0.
+ samples_sum = 0.0
+ samples_sq_sum = 0.0
+ product_sum = 0.0
for row in range(n):
samples_sum += samples[row, col]
@@ -588,7 +582,7 @@ def gpu_pearson_corr(samples: DeviceNDArray,
product_sum += samples[row, col] * intermediate_values[row]
numerator = float(n) * product_sum - samples_sum * intermed_sum[0]
- denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum ** 2)
+ denom_samp = sqrt(float(n) * samples_sq_sum - samples_sum**2)
denom_int = sqrt(float(n) * intermed_sq_sum[0] - intermed_sum[0] ** 2)
denominator = denom_samp * denom_int
@@ -611,15 +605,11 @@ class CPUTraceManager:
:param traces:
:return:
"""
- return CombinedTrace(
- np.average(self.traces.samples, 0),
- self.traces.meta
- )
+ return CombinedTrace(np.average(self.traces.samples, 0), self.traces.meta)
- def conditional_average(self,
- condition: Callable[[npt.NDArray[np.number]],
- bool]) \
- -> CombinedTrace:
+ def conditional_average(
+ self, condition: Callable[[npt.NDArray[np.number]], bool]
+ ) -> CombinedTrace:
"""
Compute the conditional average of the :paramref:`~.conditional_average.traces`, sample-wise.
@@ -628,9 +618,13 @@ class CPUTraceManager:
"""
# TODO: Consider other ways to implement this
return CombinedTrace(
- np.average(self.traces.samples[np.apply_along_axis(
- condition, 1, self.traces.samples)], 1),
- self.traces.meta
+ np.average(
+ self.traces.samples[
+ np.apply_along_axis(condition, 1, self.traces.samples)
+ ],
+ 1,
+ ),
+ self.traces.meta,
)
def standard_deviation(self) -> CombinedTrace:
@@ -640,10 +634,7 @@ class CPUTraceManager:
:param traces:
:return:
"""
- return CombinedTrace(
- np.std(self.traces.samples, 0),
- self.traces.meta
- )
+ return CombinedTrace(np.std(self.traces.samples, 0), self.traces.meta)
def variance(self) -> CombinedTrace:
"""
@@ -652,10 +643,7 @@ class CPUTraceManager:
:param traces:
:return:
"""
- return CombinedTrace(
- np.var(self.traces.samples, 0),
- self.traces.meta
- )
+ return CombinedTrace(np.var(self.traces.samples, 0), self.traces.meta)
def average_and_variance(self) -> List[CombinedTrace]:
"""
@@ -664,10 +652,7 @@ class CPUTraceManager:
:param traces:
:return:
"""
- return [
- self.average(),
- self.variance()
- ]
+ return [self.average(), self.variance()]
def add(self) -> CombinedTrace:
"""
@@ -676,14 +661,11 @@ class CPUTraceManager:
:param traces:
:return:
"""
- return CombinedTrace(
- np.sum(self.traces.samples, 0),
- self.traces.meta
- )
+ return CombinedTrace(np.sum(self.traces.samples, 0), self.traces.meta)
- def pearson_corr(self,
- intermediate_values: npt.NDArray[np.number]) \
- -> CombinedTrace:
+ def pearson_corr(
+ self, intermediate_values: npt.NDArray[np.number]
+ ) -> CombinedTrace:
"""
Calculates the Pearson correlation coefficient between the given samples and intermediate values sample-wise.
@@ -702,10 +684,18 @@ class CPUTraceManager:
samples = self.traces.samples
n = samples.shape[0]
if intermediate_values.shape != (n,):
- raise ValueError("Invalid shape of intermediate_values, "
- f"expected ({n},), "
- f"got {intermediate_values.shape}")
- new_size = str(samples.dtype.itemsize * 2) if samples.dtype.itemsize != 8 else "8"
+ raise ValueError(
+ "Invalid shape of intermediate_values, "
+ f"expected ({n},), "
+ f"got {intermediate_values.shape}"
+ )
+ if np.all(intermediate_values == intermediate_values[0]):
+ raise ValueError(
+ "Constant intermediate value array, correlation undefined."
+ )
+ new_size = (
+ str(samples.dtype.itemsize * 2) if samples.dtype.itemsize != 8 else "8"
+ )
dtype = np.dtype(samples.dtype.kind + new_size)
sam_sum = np.sum(samples, axis=0)
sam_sq_sum = np.sum(np.square(samples, dtype=dtype), axis=0)
@@ -716,7 +706,7 @@ class CPUTraceManager:
prod_sum = intermediate_values @ samples
numerator = n * prod_sum - sam_sum * iv_sum
- denom_samp = np.sqrt(n * sam_sq_sum - sam_sum ** 2)
- denom_int = np.sqrt(n * iv_sq_sum - iv_sum ** 2)
+ denom_samp = np.sqrt(n * sam_sq_sum - sam_sum**2)
+ denom_int = np.sqrt(n * iv_sq_sum - iv_sum**2)
denominator = denom_samp * denom_int
- return numerator / denominator
+ return CombinedTrace(numerator / denominator, self.traces.meta)