diff options
| author | J08nY | 2025-03-13 19:38:48 +0100 |
|---|---|---|
| committer | J08nY | 2025-03-13 19:38:48 +0100 |
| commit | 7c2f12a0111de33330870b2179b71281b59ada29 (patch) | |
| tree | 80beb7f4e3090a4805d7aa20ffba5cbcc0078902 /pyecsca/sca | |
| parent | eccc58127b4c0c10f50e4d05e699d3585391e8a1 (diff) | |
| download | pyecsca-7c2f12a0111de33330870b2179b71281b59ada29.tar.gz pyecsca-7c2f12a0111de33330870b2179b71281b59ada29.tar.zst pyecsca-7c2f12a0111de33330870b2179b71281b59ada29.zip | |
Diffstat (limited to 'pyecsca/sca')
36 files changed, 279 insertions, 143 deletions
diff --git a/pyecsca/sca/attack/leakage_model.py b/pyecsca/sca/attack/leakage_model.py index c1691d8..fa9a2e3 100644 --- a/pyecsca/sca/attack/leakage_model.py +++ b/pyecsca/sca/attack/leakage_model.py @@ -1,6 +1,7 @@ """ Provides leakage models to simulate leakage. """ + import abc import sys from typing import Literal, ClassVar @@ -11,9 +12,12 @@ from public import public from pyecsca.sca.trace import Trace if sys.version_info[0] < 3 or sys.version_info[0] == 3 and sys.version_info[1] < 10: + def hw(i): return bin(i).count("1") + else: + def hw(i): return i.bit_count() @@ -43,13 +47,16 @@ class NormalNoice(Noise): def __call__(self, *args, **kwargs): arg = args[0] if isinstance(arg, Trace): - return Trace(arg.samples + self.rng.normal(self.mean, self.sdev, len(arg.samples))) + return Trace( + arg.samples + self.rng.normal(self.mean, self.sdev, len(arg.samples)) + ) return arg + self.rng.normal(self.mean, self.sdev) @public class LeakageModel(abc.ABC): """An abstract leakage model.""" + num_args: ClassVar[int] @abc.abstractmethod @@ -61,6 +68,7 @@ class LeakageModel(abc.ABC): @public class Identity(LeakageModel): """Identity leakage model, leaks the thing itself.""" + num_args = 1 def __call__(self, *args, **kwargs) -> int: @@ -70,6 +78,7 @@ class Identity(LeakageModel): @public class Bit(LeakageModel): """Bit leakage model, leaks a selected bit.""" + num_args = 1 def __init__(self, which: int): @@ -85,6 +94,7 @@ class Bit(LeakageModel): @public class Slice(LeakageModel): """Slice leakage model, leaks a slice of bits.""" + num_args = 1 def __init__(self, begin: int, end: int): @@ -103,6 +113,7 @@ class Slice(LeakageModel): @public class HammingWeight(LeakageModel): """Hamming-weight leakage model, leaks the Hamming-weight of the thing.""" + num_args = 1 def __call__(self, *args, **kwargs) -> int: @@ -112,6 +123,7 @@ class HammingWeight(LeakageModel): @public class HammingDistance(LeakageModel): """Hamming-distance leakage model, leaks the Hamming-distance between the two things.""" + num_args = 2 def __call__(self, *args, **kwargs) -> int: @@ -121,6 +133,7 @@ class HammingDistance(LeakageModel): @public class BitLength(LeakageModel): """Bit-length leakage model, leaks the bit-length of the thing.""" + num_args = 1 def __call__(self, *args, **kwargs) -> int: diff --git a/pyecsca/sca/re/rpa.py b/pyecsca/sca/re/rpa.py index 4f30d07..10071b0 100644 --- a/pyecsca/sca/re/rpa.py +++ b/pyecsca/sca/re/rpa.py @@ -6,7 +6,17 @@ from copy import copy, deepcopy from functools import lru_cache from public import public -from typing import MutableMapping, Optional, Callable, List, Set, cast, Type, Literal, Union +from typing import ( + MutableMapping, + Optional, + Callable, + List, + Set, + cast, + Type, + Literal, + Union, +) from sympy import FF, sympify, Poly, symbols @@ -404,7 +414,9 @@ class RPA(RE): @lru_cache(maxsize=256, typed=True) -def _cached_fake_mult(mult_class: Type[ScalarMultiplier], mult_factory: Callable, params: DomainParameters) -> ScalarMultiplier: +def _cached_fake_mult( + mult_class: Type[ScalarMultiplier], mult_factory: Callable, params: DomainParameters +) -> ScalarMultiplier: return fake_mult(mult_class, mult_factory, params) @@ -416,7 +428,12 @@ def multiples_computed( mult_factory: Callable, use_init: bool = False, use_multiply: bool = True, - kind: Union[Literal["all"], Literal["input"], Literal["necessary"], Literal["precomp+necessary"]] = "all", + kind: Union[ + Literal["all"], + Literal["input"], + Literal["necessary"], + Literal["precomp+necessary"], + ] = "all", ) -> set[int]: """ Compute the multiples computed for a given scalar and multiplier (quickly). diff --git a/pyecsca/sca/re/tree.py b/pyecsca/sca/re/tree.py index bab749e..152bb3d 100644 --- a/pyecsca/sca/re/tree.py +++ b/pyecsca/sca/re/tree.py @@ -42,6 +42,7 @@ Here we grow the trees. ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⠾⠛⣿⠙⠛⠶⢦⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀ """ + from math import ceil, log2 from copy import deepcopy from typing import Mapping, Any, Set, List, Tuple, Optional, Dict diff --git a/pyecsca/sca/re/zvp.py b/pyecsca/sca/re/zvp.py index 9c12e56..956d0c2 100644 --- a/pyecsca/sca/re/zvp.py +++ b/pyecsca/sca/re/zvp.py @@ -3,6 +3,7 @@ Provides functionality inspired by the Zero-value point attack [ZVP]_. Implements ZVP point construction from [FFD]_. """ + from functools import lru_cache from typing import List, Set, Tuple, Dict, Type, Callable from public import public @@ -518,7 +519,9 @@ def solve_hard_dcp(xonly_polynomial: Poly, curve: EllipticCurve, k: int) -> Set[ if has_pari: roots = solve_hard_dcp_cypari(xonly_polynomial, curve, k) else: - warnings.warn("Falling-back to slow hard-DCP computation due to missing [pari] (cypari2 and libpari) dependency.") + warnings.warn( + "Falling-back to slow hard-DCP computation due to missing [pari] (cypari2 and libpari) dependency." + ) # Substitute in the mult-by-k map dlog = subs_dlog(xonly_polynomial, k, curve) # Put in concrete curve parameters @@ -582,7 +585,9 @@ def solve_hard_dcp_cypari( @lru_cache(maxsize=256, typed=True) -def _cached_fake_mult(mult_class: Type[ScalarMultiplier], mult_factory: Callable, params: DomainParameters) -> ScalarMultiplier: +def _cached_fake_mult( + mult_class: Type[ScalarMultiplier], mult_factory: Callable, params: DomainParameters +) -> ScalarMultiplier: return fake_mult(mult_class, mult_factory, params) @@ -593,7 +598,7 @@ def addition_chain( mult_class: Type[ScalarMultiplier], mult_factory: Callable, use_init: bool = False, - use_multiply: bool = True + use_multiply: bool = True, ) -> List[Tuple[str, Tuple[int, ...]]]: """ Compute the addition chain for a given scalar and multiplier. diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py index b9d43af..5c751e9 100644 --- a/pyecsca/sca/scope/base.py +++ b/pyecsca/sca/scope/base.py @@ -1,4 +1,5 @@ """Provides an abstract base class for oscilloscopes.""" + from enum import Enum, auto from typing import Tuple, Sequence, Optional diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py index ba11525..24b4ba1 100644 --- a/pyecsca/sca/scope/chipwhisperer.py +++ b/pyecsca/sca/scope/chipwhisperer.py @@ -1,4 +1,5 @@ """Provides an oscilloscope class using the ChipWhisperer-Lite scope.""" + from typing import Optional, Tuple, Sequence, Set import numpy as np diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py index 86332c0..5a34b0e 100644 --- a/pyecsca/sca/scope/picoscope_alt.py +++ b/pyecsca/sca/scope/picoscope_alt.py @@ -1,4 +1,5 @@ """Provides an oscilloscope class for the PicoScope branded oscilloscopes using the alternative `pico-python <https://github.com/colinoflynn/pico-python>`_ bindings.""" + from time import time_ns, sleep import numpy as np from typing import Optional, Tuple, Sequence, Union diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py index ebb11e5..133b9a2 100644 --- a/pyecsca/sca/scope/picoscope_sdk.py +++ b/pyecsca/sca/scope/picoscope_sdk.py @@ -1,4 +1,5 @@ """Provides an oscilloscope class for PicoScope branded oscilloscopes using the official `picosdk-python-wrappers <https://github.com/picotech/picosdk-python-wrappers>`_.""" + import ctypes from math import log2, floor from time import time_ns, sleep @@ -36,10 +37,10 @@ from pyecsca.sca.trace import Trace def adc2volt( - adc: Union[np.ndarray, ctypes.c_int16], - volt_range: float, - adc_minmax: int, - dtype=np.float32, + adc: Union[np.ndarray, ctypes.c_int16], + volt_range: float, + adc_minmax: int, + dtype=np.float32, ) -> Union[np.ndarray, float]: # pragma: no cover """ Convert raw adc values to volts. @@ -58,7 +59,7 @@ def adc2volt( def volt2adc( - volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32 + volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32 ) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover """ Convert volt values to raw adc values. @@ -117,28 +118,39 @@ class PicoScopeSdk(Scope): # pragma: no cover size = ctypes.c_int16() assert_pico_ok( self._dispatch_call( - "GetUnitInfo", self.handle, info, ctypes.c_int16(6), ctypes.byref(size), ctypes.c_uint(3) + "GetUnitInfo", + self.handle, + info, + ctypes.c_int16(6), + ctypes.byref(size), + ctypes.c_uint(3), ) ) self._variant = "".join(chr(i) for i in info[: size.value - 1]) # type: ignore return self._variant def setup_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ) -> Tuple[int, int]: return self.set_frequency(frequency, pretrig, posttrig) def set_channel( - self, channel: str, enabled: bool, coupling: str, range: float, offset: float + self, channel: str, enabled: bool, coupling: str, range: float, offset: float ): if offset != 0.0: raise ValueError("Nonzero offset not supported.") if channel not in self.CHANNELS: - raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}") + raise ValueError( + f"Channel {channel} not in available channels: {self.CHANNELS.keys()}" + ) if coupling not in self.COUPLING: - raise ValueError(f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}") + raise ValueError( + f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}" + ) if range not in self.RANGES: - raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}") + raise ValueError( + f"Range {range} not in available ranges: {self.RANGES.keys()}" + ) assert_pico_ok( self._dispatch_call( "SetChannel", @@ -152,20 +164,20 @@ class PicoScopeSdk(Scope): # pragma: no cover self.ranges[channel] = range def setup_channel( - self, channel: str, coupling: str, range: float, offset: float, enable: bool + self, channel: str, coupling: str, range: float, offset: float, enable: bool ): self.set_channel(channel, enable, coupling, range, offset) def _set_freq( - self, - frequency: int, - pretrig: int, - posttrig: int, - period_bound: float, - timebase_bound: int, - low_freq: int, - high_freq: int, - high_subtract: int, + self, + frequency: int, + pretrig: int, + posttrig: int, + period_bound: float, + timebase_bound: int, + low_freq: int, + high_freq: int, + high_subtract: int, ) -> Tuple[int, int]: samples = pretrig + posttrig period = 1 / frequency @@ -174,7 +186,7 @@ class PicoScopeSdk(Scope): # pragma: no cover actual_frequency = high_freq // (tb - high_subtract) else: tb = min(floor(log2(low_freq) - log2(frequency)), timebase_bound) - actual_frequency = low_freq // 2 ** tb + actual_frequency = low_freq // 2**tb max_samples = ctypes.c_int32() interval_nanoseconds = ctypes.c_int32() assert_pico_ok( @@ -186,7 +198,7 @@ class PicoScopeSdk(Scope): # pragma: no cover ctypes.byref(interval_nanoseconds), 0, ctypes.byref(max_samples), - 0 + 0, ) ) if max_samples.value < samples: @@ -201,29 +213,29 @@ class PicoScopeSdk(Scope): # pragma: no cover return actual_frequency, samples def set_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ) -> Tuple[int, int]: raise NotImplementedError def setup_trigger( - self, - channel: str, - threshold: float, - direction: str, - delay: int, - timeout: int, - enable: bool, + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, ): self.set_trigger(direction, enable, threshold, channel, delay, timeout) def set_trigger( - self, - type: str, - enabled: bool, - value: float, - channel: str, - delay: int, - timeout: int, + self, + type: str, + enabled: bool, + value: float, + channel: str, + delay: int, + timeout: int, ): assert_pico_ok( self._dispatch_call( @@ -304,7 +316,7 @@ class PicoScopeSdk(Scope): # pragma: no cover return True def retrieve( - self, channel: str, type: SampleType, dtype=np.float32 + self, channel: str, type: SampleType, dtype=np.float32 ) -> Optional[Trace]: if self.samples is None: raise ValueError @@ -365,7 +377,6 @@ if isinstance(ps3000, CannotFindPicoSDKError): super().__init__(variant) raise ps3000 - else: # pragma: no cover @public @@ -404,7 +415,9 @@ else: # pragma: no cover COUPLING = {"AC": ps3000.PICO_COUPLING["AC"], "DC": ps3000.PICO_COUPLING["DC"]} def open(self) -> None: - assert_pico_ok(self._dispatch_call("_open_unit")) # , ctypes.byref(self.handle) + assert_pico_ok( + self._dispatch_call("_open_unit") + ) # , ctypes.byref(self.handle) def stop(self): assert_pico_ok(self._dispatch_call("_stop")) @@ -427,10 +440,11 @@ else: # pragma: no cover return self._variant def set_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ): # TODO: fix raise NotImplementedError + if isinstance(ps3000a, CannotFindPicoSDKError): @public @@ -441,7 +455,6 @@ if isinstance(ps3000a, CannotFindPicoSDKError): super().__init__(variant) raise ps3000a - else: # pragma: no cover @public @@ -455,7 +468,7 @@ else: # pragma: no cover "B": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_B"], "C": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_C"], "D": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_D"], - "AUX": ps3000a.PS3000A_CHANNEL["PS3000A_TRIGGER_AUX"] + "AUX": ps3000a.PS3000A_CHANNEL["PS3000A_TRIGGER_AUX"], } RANGES = { @@ -470,31 +483,40 @@ else: # pragma: no cover 5.00: ps3000a.PS3000A_RANGE["PS3000A_5V"], 10.0: ps3000a.PS3000A_RANGE["PS3000A_10V"], 20.0: ps3000a.PS3000A_RANGE["PS3000A_20V"], - 50.0: ps3000a.PS3000A_RANGE["PS3000A_50V"] + 50.0: ps3000a.PS3000A_RANGE["PS3000A_50V"], } MAX_ADC_VALUE = 32767 MIN_ADC_VALUE = -32767 - COUPLING = {"AC": ps3000a.PICO_COUPLING["AC"], "DC": ps3000a.PICO_COUPLING["DC"]} + COUPLING = { + "AC": ps3000a.PICO_COUPLING["AC"], + "DC": ps3000a.PICO_COUPLING["DC"], + } def open(self) -> None: assert_pico_ok(ps3000a.ps3000aOpenUnit(ctypes.byref(self.handle), None)) def set_channel( - self, - channel: str, - enabled: bool, - coupling: str, - range: float, - offset: float, + self, + channel: str, + enabled: bool, + coupling: str, + range: float, + offset: float, ): if channel not in self.CHANNELS: - raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}") + raise ValueError( + f"Channel {channel} not in available channels: {self.CHANNELS.keys()}" + ) if coupling not in self.COUPLING: - raise ValueError(f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}") + raise ValueError( + f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}" + ) if range not in self.RANGES: - raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}") + raise ValueError( + f"Range {range} not in available ranges: {self.RANGES.keys()}" + ) assert_pico_ok( ps3000a.ps3000aSetChannel( self.handle, @@ -502,7 +524,7 @@ else: # pragma: no cover enabled, self.COUPLING[coupling], self.RANGES[range], - offset + offset, ) ) self.ranges[channel] = range @@ -521,15 +543,19 @@ else: # pragma: no cover ctypes.byref(buffer), self.samples, 0, - ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"] + ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"], ) ) self.buffers[channel] = buffer else: assert_pico_ok( ps3000a.ps3000aSetDataBuffer( - self.handle, self.CHANNELS[channel], None, self.samples, 0, - ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"] + self.handle, + self.CHANNELS[channel], + None, + self.samples, + 0, + ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"], ) ) del self.buffers[channel] @@ -539,13 +565,20 @@ else: # pragma: no cover if variant in ("3000A", "3000B"): # This only holds for the 2-channel versions # 4-channel versions have the settings from branch "D". - return self._set_freq(frequency, pretrig, posttrig, 8e-9, 2, 500_000_000, 62_500_000, 2) + return self._set_freq( + frequency, pretrig, posttrig, 8e-9, 2, 500_000_000, 62_500_000, 2 + ) elif variant == "3000": - return self._set_freq(frequency, pretrig, posttrig, 4e-9, 1, 500_000_000, 125_000_000, 1) + return self._set_freq( + frequency, pretrig, posttrig, 4e-9, 1, 500_000_000, 125_000_000, 1 + ) elif variant.endswith("D"): - return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2) + return self._set_freq( + frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2 + ) # TODO: Needs more per-device settings to be generic. + if isinstance(ps4000, CannotFindPicoSDKError): @public @@ -556,7 +589,6 @@ if isinstance(ps4000, CannotFindPicoSDKError): super().__init__(variant) raise ps4000 - else: # pragma: no cover @public @@ -570,7 +602,7 @@ else: # pragma: no cover "B": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_B"], "C": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_C"], "D": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_D"], - "AUX": ps4000.PS4000_CHANNEL["PS4000_TRIGGER_AUX"] + "AUX": ps4000.PS4000_CHANNEL["PS4000_TRIGGER_AUX"], } RANGES = { @@ -611,6 +643,7 @@ else: # pragma: no cover else: raise ValueError(f"Unknown variant: {variant}") + if isinstance(ps5000, CannotFindPicoSDKError): @public @@ -621,7 +654,6 @@ if isinstance(ps5000, CannotFindPicoSDKError): super().__init__(variant) raise ps5000 - else: # pragma: no cover @public @@ -635,7 +667,7 @@ else: # pragma: no cover "B": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_B"], "C": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_C"], "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"], - "AUX": ps5000.PS5000_CHANNEL["PS5000_TRIGGER_AUX"] + "AUX": ps5000.PS5000_CHANNEL["PS5000_TRIGGER_AUX"], } RANGES = { @@ -663,6 +695,7 @@ else: # pragma: no cover frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2 ) + if isinstance(ps6000, CannotFindPicoSDKError): @public @@ -673,7 +706,6 @@ if isinstance(ps6000, CannotFindPicoSDKError): super().__init__(variant) raise ps6000 - else: # pragma: no cover @public @@ -718,19 +750,25 @@ else: # pragma: no cover assert_pico_ok(ps6000.ps6000OpenUnit(ctypes.byref(self.handle), None)) def set_channel( - self, - channel: str, - enabled: bool, - coupling: str, - range: float, - offset: float, + self, + channel: str, + enabled: bool, + coupling: str, + range: float, + offset: float, ): if channel not in self.CHANNELS: - raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}") + raise ValueError( + f"Channel {channel} not in available channels: {self.CHANNELS.keys()}" + ) if coupling not in self.COUPLING: - raise ValueError(f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}") + raise ValueError( + f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}" + ) if range not in self.RANGES: - raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}") + raise ValueError( + f"Range {range} not in available ranges: {self.RANGES.keys()}" + ) assert_pico_ok( ps6000.ps6000SetChannel( self.handle, diff --git a/pyecsca/sca/stacked_traces/combine.py b/pyecsca/sca/stacked_traces/combine.py index 3659613..33bb1a6 100644 --- a/pyecsca/sca/stacked_traces/combine.py +++ b/pyecsca/sca/stacked_traces/combine.py @@ -163,9 +163,11 @@ class GPUTraceManager(BaseTraceManager): # pragma: no cover self._chunk_size = chunk_size else: self._chunk_size = self.chunk_size_from_ratio( - chunk_memory_ratio - if chunk_memory_ratio is not None - else CHUNK_MEMORY_RATIO, + ( + chunk_memory_ratio + if chunk_memory_ratio is not None + else CHUNK_MEMORY_RATIO + ), item_size=self._traces.samples.itemsize, chunk_item_count=self._traces.samples.shape[0], ) @@ -247,7 +249,10 @@ class GPUTraceManager(BaseTraceManager): # pragma: no cover return self._traces.samples.shape def _gpu_combine1D( - self, func: CUDADispatcher, inputs: Optional[List[InputType]] = None, output_count: int = 1 + self, + func: CUDADispatcher, + inputs: Optional[List[InputType]] = None, + output_count: int = 1, ) -> Union[CombinedTrace, List[CombinedTrace]]: inputs = [] if inputs is None else inputs results = self._combine_func(func, inputs, output_count) @@ -431,7 +436,9 @@ def _gpu_average( # pragma: no cover @cuda.jit(cache=True) -def gpu_average(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover +def gpu_average( + samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): # pragma: no cover """ Sample average of stacked traces, sample-wise. @@ -484,7 +491,9 @@ def _gpu_variance( # pragma: no cover @cuda.jit(cache=True) -def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover +def gpu_std_dev( + samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): # pragma: no cover """ Sample standard deviation of stacked traces, sample-wise. @@ -502,7 +511,9 @@ def gpu_std_dev(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]) @cuda.jit(cache=True) -def gpu_variance(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover +def gpu_variance( + samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): # pragma: no cover """ Sample variance of stacked traces, sample-wise. @@ -540,7 +551,9 @@ def gpu_avg_var( # pragma: no cover @cuda.jit(cache=True) -def gpu_add(samples: npt.NDArray[np.number], result: npt.NDArray[np.number]): # pragma: no cover +def gpu_add( + samples: npt.NDArray[np.number], result: npt.NDArray[np.number] +): # pragma: no cover """ Add samples of stacked traces, sample-wise. diff --git a/pyecsca/sca/stacked_traces/stacked_traces.py b/pyecsca/sca/stacked_traces/stacked_traces.py index 09169bd..476375e 100644 --- a/pyecsca/sca/stacked_traces/stacked_traces.py +++ b/pyecsca/sca/stacked_traces/stacked_traces.py @@ -16,16 +16,17 @@ class StackedTraces: # TODO: Split metadata into common and per-trace def __init__( - self, samples: np.ndarray, - meta: Mapping[str, Any] | None = None) -> None: + self, samples: np.ndarray, meta: Mapping[str, Any] | None = None + ) -> None: if meta is None: meta = {} self.meta = meta self.samples = samples @classmethod - def fromarray(cls, traces: Sequence[np.ndarray], - meta: Mapping[str, Any] | None = None) -> 'StackedTraces': + def fromarray( + cls, traces: Sequence[np.ndarray], meta: Mapping[str, Any] | None = None + ) -> "StackedTraces": if meta is None: meta = {} ts = list(traces) @@ -36,7 +37,7 @@ class StackedTraces: return cls(stacked, meta) @classmethod - def fromtraceset(cls, traceset: TraceSet) -> 'StackedTraces': + def fromtraceset(cls, traceset: TraceSet) -> "StackedTraces": traces = [t.samples for t in traceset] return cls.fromarray(traces) diff --git a/pyecsca/sca/target/ISO7816.py b/pyecsca/sca/target/ISO7816.py index 0af78db..950aa05 100644 --- a/pyecsca/sca/target/ISO7816.py +++ b/pyecsca/sca/target/ISO7816.py @@ -1,4 +1,5 @@ """Provides classes for working with ISO7816-4 APDUs and an abstract base class for an ISO7816-4 based target.""" + from abc import abstractmethod, ABC from dataclasses import dataclass from enum import IntEnum @@ -12,12 +13,14 @@ from pyecsca.sca.target.base import Target @public class CardConnectionException(Exception): """Card could not be connected.""" + pass @public class CardProtocol(IntEnum): """Card protocol to use/negotiate.""" + T0 = 0 T1 = 1 @@ -59,35 +62,35 @@ class CommandAPDU: # pragma: no cover if len(self.data) <= 255: # Case 3s return ( - bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) - + self.data + bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + + self.data ) else: # Case 3e return ( - bytes([self.cls, self.ins, self.p1, self.p2, 0]) - + len(self.data).to_bytes(2, "big") - + self.data + bytes([self.cls, self.ins, self.p1, self.p2, 0]) + + len(self.data).to_bytes(2, "big") + + self.data ) else: if len(self.data) <= 255 and self.ne <= 256: # Case 4s return ( - bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) - + self.data - + bytes([self.ne if self.ne != 256 else 0]) + bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + + self.data + + bytes([self.ne if self.ne != 256 else 0]) ) else: # Case 4e return ( - bytes([self.cls, self.ins, self.p1, self.p2, 0]) - + len(self.data).to_bytes(2, "big") - + self.data - + ( - self.ne.to_bytes(2, "big") - if self.ne != 65536 - else bytes([0, 0]) - ) + bytes([self.cls, self.ins, self.p1, self.p2, 0]) + + len(self.data).to_bytes(2, "big") + + self.data + + ( + self.ne.to_bytes(2, "big") + if self.ne != 65536 + else bytes([0, 0]) + ) ) diff --git a/pyecsca/sca/target/PCSC.py b/pyecsca/sca/target/PCSC.py index cd74149..53b04fa 100644 --- a/pyecsca/sca/target/PCSC.py +++ b/pyecsca/sca/target/PCSC.py @@ -1,4 +1,5 @@ """Provides a smartcard target communicating via PC/SC (Personal Computer/Smart Card).""" + from typing import Union, Optional from public import public @@ -7,8 +8,14 @@ from smartcard.System import readers from smartcard.pcsc.PCSCCardConnection import PCSCCardConnection from smartcard.pcsc.PCSCReader import PCSCReader -from pyecsca.sca.target.ISO7816 import ISO7816Target, CommandAPDU, ResponseAPDU, ISO7816, CardProtocol, \ - CardConnectionException +from pyecsca.sca.target.ISO7816 import ( + ISO7816Target, + CommandAPDU, + ResponseAPDU, + ISO7816, + CardProtocol, + CardConnectionException, +) @public diff --git a/pyecsca/sca/target/base.py b/pyecsca/sca/target/base.py index 53106d5..7cb1a75 100644 --- a/pyecsca/sca/target/base.py +++ b/pyecsca/sca/target/base.py @@ -1,4 +1,5 @@ """Provides an abstract base class for targets.""" + from abc import ABC, abstractmethod from public import public diff --git a/pyecsca/sca/target/binary.py b/pyecsca/sca/target/binary.py index 0dd22d5..dcfe704 100644 --- a/pyecsca/sca/target/binary.py +++ b/pyecsca/sca/target/binary.py @@ -1,4 +1,5 @@ """Provides a binary target class which represents a target that is a runnable binary on the host.""" + import subprocess from subprocess import Popen from typing import Optional, Union, List diff --git a/pyecsca/sca/target/chipwhisperer.py b/pyecsca/sca/target/chipwhisperer.py index 28a2cb8..30b4a86 100644 --- a/pyecsca/sca/target/chipwhisperer.py +++ b/pyecsca/sca/target/chipwhisperer.py @@ -5,6 +5,7 @@ ChipWhisperer is a side-channel analysis tool and framework. A ChipWhisperer tar that uses the ChipWhisperer's SimpleSerial communication protocol and is communicated with using ChipWhisperer-Lite or Pro. """ + from time import sleep import chipwhisperer as cw @@ -20,9 +21,7 @@ from pyecsca.sca.target.simpleserial import SimpleSerialTarget class ChipWhispererTarget(Flashable, SimpleSerialTarget): # pragma: no cover """ChipWhisperer-based target, using the SimpleSerial-ish protocol and communicating using ChipWhisperer-Lite/Pro.""" - def __init__( - self, target: SimpleSerial, scope: ScopeTypes, programmer, **kwargs - ): + def __init__(self, target: SimpleSerial, scope: ScopeTypes, programmer, **kwargs): super().__init__() self.target = target self.scope = scope diff --git a/pyecsca/sca/target/ectester.py b/pyecsca/sca/target/ectester.py index ec00502..5ab5d7e 100644 --- a/pyecsca/sca/target/ectester.py +++ b/pyecsca/sca/target/ectester.py @@ -1,4 +1,5 @@ """Provides an `ECTester <https://github.com/crocs-muni/ECTester/>`_ target class.""" + from abc import ABC from binascii import hexlify from enum import IntEnum, IntFlag diff --git a/pyecsca/sca/target/flash.py b/pyecsca/sca/target/flash.py index 95c078d..8ab1883 100644 --- a/pyecsca/sca/target/flash.py +++ b/pyecsca/sca/target/flash.py @@ -1,4 +1,5 @@ """Provides a mix-in class of a flashable target (e.g. one where the code gets flashed to it before running).""" + from public import public from abc import ABC, abstractmethod diff --git a/pyecsca/sca/target/leakage.py b/pyecsca/sca/target/leakage.py index 6487c56..a44f48f 100644 --- a/pyecsca/sca/target/leakage.py +++ b/pyecsca/sca/target/leakage.py @@ -121,9 +121,7 @@ class LeakageTarget(Target): if self.privkey is None: raise ValueError("Missing privkey") with local(DefaultContext()) as ctx: - ecdh = ECDH( - self.mult, self.params, other_pubkey, self.privkey, hash_algo - ) + ecdh = ECDH(self.mult, self.params, other_pubkey, self.privkey, hash_algo) shared_secret = ecdh.perform() return shared_secret, self.get_trace(ctx) diff --git a/pyecsca/sca/target/leia.py b/pyecsca/sca/target/leia.py index 59336b6..5bd19af 100644 --- a/pyecsca/sca/target/leia.py +++ b/pyecsca/sca/target/leia.py @@ -1,10 +1,17 @@ """Provides a smartcard target communicating via the LEIA board in solo mode.""" + from typing import Optional from smartleia import LEIA, create_APDU_from_bytes, T -from pyecsca.sca.target.ISO7816 import ISO7816Target, CommandAPDU, ResponseAPDU, ISO7816, CardProtocol, \ - CardConnectionException +from pyecsca.sca.target.ISO7816 import ( + ISO7816Target, + CommandAPDU, + ResponseAPDU, + ISO7816, + CardProtocol, + CardConnectionException, +) class LEIATarget(ISO7816Target): # pragma: no cover diff --git a/pyecsca/sca/target/serial.py b/pyecsca/sca/target/serial.py index 62dea45..a9dbdac 100644 --- a/pyecsca/sca/target/serial.py +++ b/pyecsca/sca/target/serial.py @@ -1,4 +1,5 @@ """Provides an abstract serial target, that communicates by writing and reading a stream of bytes.""" + from abc import abstractmethod from public import public diff --git a/pyecsca/sca/target/simpleserial.py b/pyecsca/sca/target/simpleserial.py index f13a0a6..d28f697 100644 --- a/pyecsca/sca/target/simpleserial.py +++ b/pyecsca/sca/target/simpleserial.py @@ -1,4 +1,5 @@ """Provides an abstract target class communicating using the `ChipWhisperer's <https://github.com/newaetech/chipwhisperer/>`_ SimpleSerial protocol.""" + from abc import ABC from time import time_ns, sleep from typing import Mapping, Union diff --git a/pyecsca/sca/trace/align.py b/pyecsca/sca/trace/align.py index e1d3669..71c84cb 100644 --- a/pyecsca/sca/trace/align.py +++ b/pyecsca/sca/trace/align.py @@ -1,4 +1,5 @@ """Provides functions for aligning traces in a trace set to a reference trace within it.""" + import numpy as np from copy import deepcopy from fastdtw import fastdtw, dtw diff --git a/pyecsca/sca/trace/combine.py b/pyecsca/sca/trace/combine.py index 9997a83..7d751e2 100644 --- a/pyecsca/sca/trace/combine.py +++ b/pyecsca/sca/trace/combine.py @@ -1,4 +1,5 @@ """Provides functions for combining traces sample-wise.""" + from typing import Callable, Tuple import numpy as np diff --git a/pyecsca/sca/trace/edit.py b/pyecsca/sca/trace/edit.py index 7d50188..7c8db61 100644 --- a/pyecsca/sca/trace/edit.py +++ b/pyecsca/sca/trace/edit.py @@ -1,4 +1,5 @@ """Provides functions for editing traces as if they were tapes you can trim, reverse, etc.""" + import numpy as np from public import public from typing import Union, Tuple, Any, Optional diff --git a/pyecsca/sca/trace/filter.py b/pyecsca/sca/trace/filter.py index 17361b9..8bfe01e 100644 --- a/pyecsca/sca/trace/filter.py +++ b/pyecsca/sca/trace/filter.py @@ -1,4 +1,5 @@ """Provides functions for filtering traces using digital (low/high/band)-pass and bandstop filters.""" + from public import public from scipy.signal import butter, lfilter from typing import Union, Tuple diff --git a/pyecsca/sca/trace/match.py b/pyecsca/sca/trace/match.py index 8a1b9bb..55a93c1 100644 --- a/pyecsca/sca/trace/match.py +++ b/pyecsca/sca/trace/match.py @@ -1,4 +1,5 @@ """Provides functions for matching a pattern within a trace to it.""" + import numpy as np from scipy.signal import find_peaks from public import public diff --git a/pyecsca/sca/trace/plot.py b/pyecsca/sca/trace/plot.py index c12840b..49b6165 100644 --- a/pyecsca/sca/trace/plot.py +++ b/pyecsca/sca/trace/plot.py @@ -1,4 +1,5 @@ """Provides functions for plotting traces.""" + from functools import reduce import holoviews as hv diff --git a/pyecsca/sca/trace/process.py b/pyecsca/sca/trace/process.py index 1851e10..5540e7a 100644 --- a/pyecsca/sca/trace/process.py +++ b/pyecsca/sca/trace/process.py @@ -1,4 +1,5 @@ """Provides functions for sample-wise processing of single traces.""" + from typing import Any import numpy as np @@ -56,7 +57,10 @@ def rolling_mean(trace: Trace, window: int) -> Trace: :param window: :return: """ - return trace.with_samples(convolve(trace.samples, np.ones(window, dtype=trace.samples.dtype), "valid") / window) + return trace.with_samples( + convolve(trace.samples, np.ones(window, dtype=trace.samples.dtype), "valid") + / window + ) @public @@ -130,4 +134,4 @@ def transform(trace: Trace, min_value: Any = 0, max_value: Any = 1) -> Trace: t_max = np.max(trace.samples) t_range = t_max - t_min d = max_value - min_value - return trace.with_samples(((trace.samples - t_min) * (d/t_range)) + min_value) + return trace.with_samples(((trace.samples - t_min) * (d / t_range)) + min_value) diff --git a/pyecsca/sca/trace/sampling.py b/pyecsca/sca/trace/sampling.py index 2c39e4d..701d22a 100644 --- a/pyecsca/sca/trace/sampling.py +++ b/pyecsca/sca/trace/sampling.py @@ -1,4 +1,5 @@ """Provides downsampling functions for traces.""" + from typing import cast import numpy as np diff --git a/pyecsca/sca/trace/test.py b/pyecsca/sca/trace/test.py index c7bbd95..8970e5b 100644 --- a/pyecsca/sca/trace/test.py +++ b/pyecsca/sca/trace/test.py @@ -1,4 +1,5 @@ """Provides statistical tests usable on groups of traces sample-wise (Welch's and Student's t-test, ...).""" + from typing import Sequence, Tuple import numpy as np @@ -58,7 +59,7 @@ def welch_ttest( result = [CombinedTrace(tval)] if dof or p_value: top = (varn_0 + varn_1) ** 2 - bot = (varn_0 ** 2 / (n0 - 1)) + (varn_1 ** 2 / (n1 - 1)) + bot = (varn_0**2 / (n0 - 1)) + (varn_1**2 / (n1 - 1)) df = top / bot del top del bot @@ -88,9 +89,7 @@ def student_ttest( @public -def ks_test( - first_set: Sequence[Trace], second_set: Sequence[Trace] -) -> CombinedTrace: +def ks_test(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> CombinedTrace: """ Perform the Kolmogorov-Smirnov two sample test on equality of distributions sample wise on two sets of traces :paramref:`~.ks_test.first_set` and :paramref:`~.ks_test.second_set`. diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py index c824967..beb9247 100644 --- a/pyecsca/sca/trace/trace.py +++ b/pyecsca/sca/trace/trace.py @@ -1,4 +1,5 @@ """Provides the Trace class.""" + import weakref from typing import Any, Mapping, Sequence, Optional from copy import copy, deepcopy @@ -108,9 +109,11 @@ class Trace: def __deepcopy__(self, memodict): return Trace( - deepcopy(self.samples, memo=memodict) - if isinstance(self.samples, np.ndarray) - else np.array(self.samples), + ( + deepcopy(self.samples, memo=memodict) + if isinstance(self.samples, np.ndarray) + else np.array(self.samples) + ), deepcopy(self.meta, memo=memodict), ) diff --git a/pyecsca/sca/trace_set/base.py b/pyecsca/sca/trace_set/base.py index 0f605e4..83a3a1a 100644 --- a/pyecsca/sca/trace_set/base.py +++ b/pyecsca/sca/trace_set/base.py @@ -1,4 +1,5 @@ """Provides a base traceset class.""" + from pathlib import Path from typing import List, Union, BinaryIO @@ -45,7 +46,5 @@ class TraceSet: raise NotImplementedError def __repr__(self): - args = ", ".join( - [f"{key}={getattr(self, key)!r}" for key in self._keys] - ) + args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys]) return f"{self.__class__.__name__}({args})" diff --git a/pyecsca/sca/trace_set/chipwhisperer.py b/pyecsca/sca/trace_set/chipwhisperer.py index 564e1cd..1a66db8 100644 --- a/pyecsca/sca/trace_set/chipwhisperer.py +++ b/pyecsca/sca/trace_set/chipwhisperer.py @@ -16,7 +16,9 @@ class ChipWhispererTraceSet(TraceSet): """ChipWhisperer trace set (native) format.""" @classmethod - def read(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "ChipWhispererTraceSet": + def read( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "ChipWhispererTraceSet": if isinstance(input, (str, Path)): traces, kws = ChipWhispererTraceSet.__read(input) return ChipWhispererTraceSet(*traces, **kws) diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py index 62690ae..2c2594c 100644 --- a/pyecsca/sca/trace_set/hdf5.py +++ b/pyecsca/sca/trace_set/hdf5.py @@ -4,6 +4,7 @@ Provides a traceset implemented on top of the Hierarchical Data Format (HDF5). This traceset can be loaded "inplace" which means that it is not fully loaded into memory, and only parts of traces that are operated on are in memory. This is very useful for working with huge sets of traces that do not fit in memory. """ + import pickle import uuid from collections.abc import MutableMapping @@ -98,7 +99,9 @@ class HDF5TraceSet(TraceSet): return HDF5TraceSet(*traces, **kws, _ordering=ordering) @classmethod - def inplace(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "HDF5TraceSet": + def inplace( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "HDF5TraceSet": if isinstance(input, (str, Path)): hdf5 = h5py.File(str(input), mode="a") elif isinstance(input, (RawIOBase, BufferedIOBase, BinaryIO)): diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py index 3ed4e2b..71802d0 100644 --- a/pyecsca/sca/trace_set/inspector.py +++ b/pyecsca/sca/trace_set/inspector.py @@ -1,4 +1,5 @@ """Provides a traceset implementation based on Riscure's Inspector traceset format (``.trs``).""" + import struct from enum import IntEnum from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation @@ -148,7 +149,9 @@ class InspectorTraceSet(TraceSet): _scaled: bool = False @classmethod - def read(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "InspectorTraceSet": + def read( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "InspectorTraceSet": """ Read Inspector trace set from file path, bytes or file-like object. @@ -213,7 +216,9 @@ class InspectorTraceSet(TraceSet): return result, tags @classmethod - def inplace(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "InspectorTraceSet": + def inplace( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "InspectorTraceSet": raise NotImplementedError def write(self, output: Union[str, Path, BinaryIO]): diff --git a/pyecsca/sca/trace_set/pickle.py b/pyecsca/sca/trace_set/pickle.py index 0134c63..05bc5d7 100644 --- a/pyecsca/sca/trace_set/pickle.py +++ b/pyecsca/sca/trace_set/pickle.py @@ -3,6 +3,7 @@ Provides a traceset implementation based on Python's pickle format. This implementation of the traceset is thus very generic. """ + import pickle from io import BufferedIOBase, RawIOBase from pathlib import Path @@ -18,7 +19,9 @@ class PickleTraceSet(TraceSet): """Pickle-based traceset format.""" @classmethod - def read(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "PickleTraceSet": + def read( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "PickleTraceSet": if isinstance(input, bytes): return pickle.loads(input) # pickle is OK here, skipcq: BAN-B301 elif isinstance(input, (str, Path)): @@ -29,7 +32,9 @@ class PickleTraceSet(TraceSet): raise TypeError @classmethod - def inplace(cls, input: Union[str, Path, bytes, BinaryIO], **kwargs) -> "PickleTraceSet": + def inplace( + cls, input: Union[str, Path, bytes, BinaryIO], **kwargs + ) -> "PickleTraceSet": raise NotImplementedError def write(self, output: Union[str, Path, BinaryIO]): @@ -42,7 +47,5 @@ class PickleTraceSet(TraceSet): raise TypeError def __repr__(self): - args = ", ".join( - [f"{key}={getattr(self, key)!r}" for key in self._keys] - ) + args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys]) return f"PickleTraceSet(num_traces={len(self)}, {args})" |
