diff options
| -rw-r--r-- | .travis.yml | 6 | ||||
| -rw-r--r-- | pyecsca/sca/scope/__init__.py | 32 | ||||
| -rw-r--r-- | pyecsca/sca/scope/base.py | 86 | ||||
| -rw-r--r-- | pyecsca/sca/scope/chipwhisperer.py | 44 | ||||
| -rw-r--r-- | pyecsca/sca/scope/picoscope_alt.py | 57 | ||||
| -rw-r--r-- | pyecsca/sca/scope/picoscope_sdk.py (renamed from pyecsca/sca/scope/picoscope.py) | 116 | ||||
| -rw-r--r-- | pyecsca/sca/trace/test.py | 1 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/inspector.py | 6 | ||||
| -rw-r--r-- | setup.py | 3 |
9 files changed, 302 insertions, 49 deletions
diff --git a/.travis.yml b/.travis.yml index b171891..0bac320 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,10 @@ addons: - libps6000 before_install: + - git clone https://github.com/colinoflynn/pico-python + - cd pico-python + - python setup.py install + - cd .. - git clone https://github.com/picotech/picosdk-python-wrappers - cd picosdk-python-wrappers - python setup.py install @@ -22,7 +26,7 @@ before_install: install: - pip install codecov - - pip install -e ".[picoscope, chipwhisperer, test, typecheck]" + - pip install -e ".[picoscope_sdk, picoscope_alt, chipwhisperer, test, typecheck]" script: - make -i typecheck diff --git a/pyecsca/sca/scope/__init__.py b/pyecsca/sca/scope/__init__.py index 25cd0ca..d11ed82 100644 --- a/pyecsca/sca/scope/__init__.py +++ b/pyecsca/sca/scope/__init__.py @@ -1,11 +1,39 @@ +from typing import Type + +from .base import * + +has_picoscope = False +has_picosdk = False +has_chipwhisperer = False + +try: + import picoscope + + has_picoscope = True +except ImportError: + pass + try: import picosdk - from .picoscope import * + + has_picosdk = True except ImportError: pass try: import chipwhisperer - from .chipwhisperer import * + + has_chipwhisperer = True except ImportError: pass + +PicoScope: Type[Scope] +if has_picoscope: + from .picoscope_alt import * + PicoScope = PicoScopeAlt +elif has_picosdk: + from .picoscope_sdk import * + PicoScope = PicoScopeSdk + +if has_chipwhisperer: + from .chipwhisperer import * diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py index 3bade40..2979a7d 100644 --- a/pyecsca/sca/scope/base.py +++ b/pyecsca/sca/scope/base.py @@ -1,5 +1,89 @@ +from typing import Tuple, Sequence, Optional +import numpy as np +from public import public + +@public class Scope(object): """An oscilloscope.""" - pass + + def open(self) -> None: + """Open the connection to the scope.""" + raise NotImplementedError + + @property + def channels(self) -> Sequence[str]: + """A list of channels available on this scope.""" + raise NotImplementedError + + def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + """ + Setup the frequency and sample count for the measurement. The scope might not support + the requested values and will adjust them to get the next best frequency and the largest + supported number of samples (or the number of samples requested). + + :param frequency: The requested frequency in Hz. + :param samples: The requested number of samples to measure. + :return: A tuple of the actual set frequency and the actual number of samples. + """ + raise NotImplementedError + + def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None: + """ + Setup a channel to use the coupling method and measure the given voltage range. + + :param channel: The channel to measure. + :param coupling: The coupling method ("AC" or "DC). + :param range: The voltage range to measure. + :param enable: Whether to enable or disable the channel. + """ + raise NotImplementedError + + def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, + timeout: int, enable: bool) -> None: + """ + Setup a trigger on a particular `channel`, the channel has to be set up and enabled. + The trigger will fire based on the `threshold` and `direction`, if enabled, the trigger + will capture after `delay` ticks pass. If trigger conditions do not hold it will fire + automatically after `timeout` milliseconds. + + :param channel: The channel to trigger on. + :param threshold: The value to trigger on. + :param direction: The direction to trigger on ("above", "below", "rising", "falling"). + :param delay: The delay for capture after trigger (clock ticks). + :param timeout: The timeout in milliseconds. + :param enable: Whether to enable or disable this trigger. + """ + raise NotImplementedError + + def setup_capture(self, channel: str, enable: bool) -> None: + """ + Setup the capture for a channel. + + :param channel: The channel to capture. + :param enable: Whether to enable or disable capture. + """ + raise NotImplementedError + + def arm(self) -> None: + """Arm the scope, it will listen for the trigger after this point.""" + raise NotImplementedError + + def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]: + """ + Wait for the trace to capture, this will block until the scope has a trace. + + :param channel: The channel to retrieve the trace from. + :param timeout: A time in milliseconds to wait for the trace, returns `None` if it runs out. + :return: The trace, or if timed out, None. + """ + raise NotImplementedError + + def stop(self) -> None: + """Stop the capture, if any.""" + raise NotImplementedError + + def close(self) -> None: + """Close the connection to the scope.""" + raise NotImplementedError diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py index 2eb2747..60fe7cf 100644 --- a/pyecsca/sca/scope/chipwhisperer.py +++ b/pyecsca/sca/scope/chipwhisperer.py @@ -1,5 +1,47 @@ +from typing import Optional, Tuple, Sequence + +import numpy as np +from chipwhisperer.capture.scopes.base import ScopeTemplate +from public import public + from .base import Scope +@public class ChipWhispererScope(Scope): """A ChipWhisperer based scope.""" - pass
\ No newline at end of file + + def __init__(self, scope: ScopeTemplate): + self.scope = scope + + def open(self) -> None: + self.scope.con() + + @property + def channels(self) -> Sequence[str]: + return ["tio1", "tio2", "tio3", "tio4"] + + def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + pass + + def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None: + pass + + def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, + timeout: int, enable: bool) -> None: + pass + + def setup_capture(self, channel: str, enable: bool) -> None: + pass + + def arm(self) -> None: + pass + + def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]: + pass + + def stop(self) -> None: + pass + + def close(self) -> None: + pass + diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py new file mode 100644 index 0000000..69b705a --- /dev/null +++ b/pyecsca/sca/scope/picoscope_alt.py @@ -0,0 +1,57 @@ +from time import time_ns, sleep +from typing import Optional, Tuple, Sequence, Union + +import numpy as np +from picoscope.ps4000 import PS4000 +from picoscope.ps6000 import PS6000 +from public import public + +from .base import Scope + + +@public +class PicoScopeAlt(Scope): + + def __init__(self, ps: Union[PS4000, PS6000]): + self.ps = ps + + def open(self) -> None: + self.ps.open() + + @property + def channels(self) -> Sequence[str]: + return list(self.ps.CHANNELS.keys()) + + def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + actual_frequency, max_samples = self.ps.setSamplingFrequency(frequency, samples) + if max_samples < samples: + samples = max_samples + return actual_frequency, samples + + def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None: + self.ps.setChannel(channel, coupling, range, 0.0, enable) + + def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, + timeout: int, enable: bool) -> None: + self.ps.setSimpleTrigger(channel, threshold, direction, delay, timeout, enable) + + def setup_capture(self, channel: str, enable: bool) -> None: + pass + + def arm(self) -> None: + self.ps.runBlock() + + def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]: + start = time_ns() + while not self.ps.isReady(): + sleep(0.001) + if timeout is not None and (time_ns() - start) / 1e6 >= timeout: + return None + + return self.ps.getDataV(channel) + + def stop(self) -> None: + self.ps.stop() + + def close(self) -> None: + self.ps.close() diff --git a/pyecsca/sca/scope/picoscope.py b/pyecsca/sca/scope/picoscope_sdk.py index 8497c64..d44d8c8 100644 --- a/pyecsca/sca/scope/picoscope.py +++ b/pyecsca/sca/scope/picoscope_sdk.py @@ -1,7 +1,7 @@ import ctypes -from enum import IntEnum +from time import time_ns, sleep from math import log2, floor -from typing import Mapping, Optional, MutableMapping, Union +from typing import Mapping, Optional, MutableMapping, Union, Tuple import numpy as np from picosdk.functions import assert_pico_ok @@ -13,13 +13,6 @@ from public import public from .base import Scope -class TriggerType(IntEnum): # pragma: no cover - ABOVE = 1 - BELOW = 2 - RISING = 3 - FALLING = 4 - - def adc2volt(adc: Union[np.ndarray, ctypes.c_int16], volt_range: float, adc_minmax: int) -> Union[np.ndarray, float]: # pragma: no cover if isinstance(adc, ctypes.c_int16): @@ -35,7 +28,8 @@ def volt2adc(volt: Union[np.ndarray, float], return (volt / volt_range) * adc_minmax -class PicoScope(Scope): # pragma: no cover +@public +class PicoScopeSdk(Scope): # pragma: no cover """A PicoScope based scope.""" MODULE: Library PREFIX: str @@ -45,18 +39,28 @@ class PicoScope(Scope): # pragma: no cover MIN_ADC_VALUE: int COUPLING: Mapping TIME_UNITS: Mapping + TRIGGERS: Mapping = { + "above": 0, + "below": 1, + "rising": 2, + "falling": 3 + } def __init__(self): self.handle: ctypes.c_int16 = ctypes.c_int16() - self.frequency: Optional[float] = None + self.frequency: Optional[int] = None self.samples: Optional[int] = None self.timebase: Optional[int] = None self.buffers: MutableMapping = {} self.ranges: MutableMapping = {} - def open(self): + def open(self) -> None: assert_pico_ok(self.__dispatch_call("OpenUnit", ctypes.byref(self.handle))) + @property + def channels(self): + return list(self.CHANNELS.keys()) + def get_variant(self): info = (ctypes.c_int8 * 6)() size = ctypes.c_int16() @@ -64,6 +68,9 @@ class PicoScope(Scope): # pragma: no cover ctypes.byref(size), 3)) return "".join(chr(i) for i in info[:size]) + def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + return self.set_frequency(frequency, samples) + # channel setup (ranges, coupling, which channel is scope vs trigger) def set_channel(self, channel: str, enabled: bool, coupling: str, range: float): assert_pico_ok( @@ -71,17 +78,20 @@ class PicoScope(Scope): # pragma: no cover self.COUPLING[coupling], self.RANGES[range])) self.ranges[channel] = range + def setup_channel(self, channel: str, coupling: str, range: float, enable: bool): + self.set_channel(channel, enable, coupling, range) + def _set_freq(self, frequency: int, samples: int, period_bound: float, timebase_bound: int, - low_freq: int, high_freq: int, high_subtract: int): + low_freq: int, high_freq: int, high_subtract: int) -> Tuple[int, int]: period = 1 / frequency if low_freq == 0 or period > period_bound: tb = floor(high_freq / frequency + high_subtract) - actual_frequency = high_freq / (tb - high_subtract) + actual_frequency = high_freq // (tb - high_subtract) else: tb = floor(log2(low_freq) - log2(frequency)) if tb > timebase_bound: tb = timebase_bound - actual_frequency = low_freq / 2 ** tb + actual_frequency = low_freq // 2 ** tb max_samples = ctypes.c_int32() assert_pico_ok(self.__dispatch_call("GetTimebase", self.handle, tb, samples, None, 0, ctypes.byref(max_samples), 0)) @@ -93,44 +103,61 @@ class PicoScope(Scope): # pragma: no cover return actual_frequency, samples # frequency setup - def set_frequency(self, frequency: int, samples: int): + def set_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: raise NotImplementedError + def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, + timeout: int, enable: bool): + self.set_trigger(direction, enable, threshold, channel, delay, timeout) + # triggering setup - def set_trigger(self, type: TriggerType, enabled: bool, value: float, channel: str, - range: float, delay: int, timeout: int): + def set_trigger(self, type: str, enabled: bool, value: float, channel: str, + delay: int, timeout: int): assert_pico_ok( self.__dispatch_call("SetSimpleTrigger", self.handle, enabled, self.CHANNELS[channel], - volt2adc(value, range, self.MAX_ADC_VALUE), - type.value, delay, timeout)) + volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE), + self.TRIGGERS[type], delay, timeout)) + + def setup_capture(self, channel: str, enable: bool): + self.set_buffer(channel, enable) # buffer setup - def set_buffer(self, channel: str): + def set_buffer(self, channel: str, enable: bool): if self.samples is None: raise ValueError - buffer = (ctypes.c_int16 * self.samples)() - self.buffers[channel] = buffer - assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), self.samples)) + if enable: + if channel in self.buffers: + del self.buffers[channel] + buffer = (ctypes.c_int16 * self.samples)() + assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], + ctypes.byref(buffer), self.samples)) + self.buffers[channel] = buffer + else: + assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], + None, self.samples)) + del self.buffers[channel] - # collection - def collect(self): + def arm(self): if self.samples is None or self.timebase is None: raise ValueError assert_pico_ok( self.__dispatch_call("RunBlock", self.handle, 0, self.samples, self.timebase, 0, None, 0, None, None)) + + def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]: + start = time_ns() + if self.samples is None: + raise ValueError ready = ctypes.c_int16(0) check = ctypes.c_int16(0) while ready.value == check.value: + sleep(0.001) assert_pico_ok(self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready))) + if timeout is not None and (time_ns() - start) / 1e6 >= timeout: + return None - # get the data - def retrieve(self, channel: str) -> np.ndarray: - if self.samples is None: - raise ValueError actual_samples = ctypes.c_int32(self.samples) overflow = ctypes.c_int16() assert_pico_ok( @@ -155,7 +182,7 @@ class PicoScope(Scope): # pragma: no cover @public -class PS4000Scope(PicoScope): # pragma: no cover +class PS4000Scope(PicoScopeSdk): # pragma: no cover MODULE = ps4000 PREFIX = "ps4000" CHANNELS = { @@ -200,7 +227,7 @@ class PS4000Scope(PicoScope): # pragma: no cover @public -class PS6000Scope(PicoScope): # pragma: no cover +class PS6000Scope(PicoScopeSdk): # pragma: no cover MODULE = ps6000 PREFIX = "ps6000" CHANNELS = { @@ -241,15 +268,24 @@ class PS6000Scope(PicoScope): # pragma: no cover self.COUPLING[coupling], self.RANGES[range], 0, ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"])) - def set_buffer(self, channel: str): + def set_buffer(self, channel: str, enable: bool): if self.samples is None: raise ValueError - buffer = (ctypes.c_int16 * self.samples)() - self.buffers[channel] = buffer - assert_pico_ok( - ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), - self.samples, 0)) + if enable: + if channel in self.buffers: + del self.buffers[channel] + buffer = (ctypes.c_int16 * self.samples)() + assert_pico_ok( + ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], + ctypes.byref(buffer), + self.samples, 0)) + self.buffers[channel] = buffer + else: + assert_pico_ok( + ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], + None, + self.samples, 0)) + del self.buffers[channel] def set_frequency(self, frequency: int, samples: int): return self._set_freq(frequency, samples, 3.2e-9, 4, 5_000_000_000, 156_250_000, 4) diff --git a/pyecsca/sca/trace/test.py b/pyecsca/sca/trace/test.py index e192048..92cda68 100644 --- a/pyecsca/sca/trace/test.py +++ b/pyecsca/sca/trace/test.py @@ -49,6 +49,7 @@ def ks_test(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional """ Perform the Kolmogorov-Smirnov two sample test on equality of distributions sample wise on two sets of traces `first_set` and `second_set`. + :param first_set: :param second_set: :return: Kolmogorov-Smirnov test statistic values (samplewise) diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py index 4964f85..f9475c0 100644 --- a/pyecsca/sca/trace_set/inspector.py +++ b/pyecsca/sca/trace_set/inspector.py @@ -138,7 +138,7 @@ class InspectorTraceSet(TraceSet): } _set_tags: set = set() - def __init__(self, input: Optional[Union[str, Path, bytes, BinaryIO]] = None, + def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None, keep_raw_traces: bool = True): """ Read Inspector trace set from file path, bytes or file-like object. @@ -151,8 +151,8 @@ class InspectorTraceSet(TraceSet): with BytesIO(input) as f: traces = self.__read(f) elif isinstance(input, (Path, str)): - with open(input, "rb") as f: - traces = self.__read(f) + with open(input, "rb") as r: + traces = self.__read(r) elif isinstance(input, (RawIOBase, BufferedIOBase)): traces = self.__read(input) elif input is not None: @@ -34,7 +34,8 @@ setup( "asn1crypto" ], extras_require={ - "picoscope": ["picosdk"], + "picoscope_sdk": ["picosdk"], + "picoscope_alt": ["picoscope"], "chipwhisperer": ["chipwhisperer"], "typecheck": ["mypy"], "test": ["nose2", "parameterized", "green", "coverage"] |
