diff options
| author | J08nY | 2020-03-05 17:01:55 +0100 |
|---|---|---|
| committer | J08nY | 2020-03-05 17:01:55 +0100 |
| commit | cd3a7e66257dc0940b9c385d38029c3152234007 (patch) | |
| tree | 711b602f3fc0e6f71cf176a2b1bcee0a59753327 | |
| parent | 79de2bbc6f099b0bd1d79427aabd6113e87c31d3 (diff) | |
| download | pyecsca-cd3a7e66257dc0940b9c385d38029c3152234007.tar.gz pyecsca-cd3a7e66257dc0940b9c385d38029c3152234007.tar.zst pyecsca-cd3a7e66257dc0940b9c385d38029c3152234007.zip | |
| -rw-r--r-- | pyecsca/sca/scope/base.py | 5 | ||||
| -rw-r--r-- | pyecsca/sca/scope/chipwhisperer.py | 6 | ||||
| -rw-r--r-- | pyecsca/sca/scope/picoscope_alt.py | 5 | ||||
| -rw-r--r-- | pyecsca/sca/scope/picoscope_sdk.py | 125 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/hdf5.py | 12 | ||||
| -rw-r--r-- | test/sca/test_traceset.py | 3 |
6 files changed, 92 insertions, 64 deletions
diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py index f88bc35..78a6431 100644 --- a/pyecsca/sca/scope/base.py +++ b/pyecsca/sca/scope/base.py @@ -17,14 +17,15 @@ class Scope(object): """A list of channels available on this scope.""" raise NotImplementedError - def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: """ Setup the frequency and sample count for the measurement. The scope might not support the requested values and will adjust them to get the next best frequency and the largest supported number of samples (or the number of samples requested). :param frequency: The requested frequency in Hz. - :param samples: The requested number of samples to measure. + :param pretrig: The requested number of samples to measure before the trigger. + :param posttrig: The requested number of samples to measure after the trigger. :return: A tuple of the actual set frequency and the actual number of samples. """ raise NotImplementedError diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py index c1a5f48..f8a8b7b 100644 --- a/pyecsca/sca/scope/chipwhisperer.py +++ b/pyecsca/sca/scope/chipwhisperer.py @@ -23,9 +23,11 @@ class ChipWhispererScope(Scope): # pragma: no cover def channels(self) -> Sequence[str]: return [] - def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + if pretrig != 0: + raise ValueError("ChipWhisperer does not support pretrig samples.") self.scope.clock.clkgen_freq = frequency - self.scope.samples = samples + self.scope.samples = posttrig return self.scope.clock.freq_ctr, self.scope.samples def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None: diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py index 52cbf2e..dfa69a9 100644 --- a/pyecsca/sca/scope/picoscope_alt.py +++ b/pyecsca/sca/scope/picoscope_alt.py @@ -16,6 +16,7 @@ class PicoScopeAlt(Scope): # pragma: no cover def __init__(self, ps: Union[PS4000, PS5000, PS6000]): super().__init__() self.ps = ps + self.trig_ratio: float = 0.0 def open(self) -> None: self.ps.open() @@ -24,9 +25,11 @@ class PicoScopeAlt(Scope): # pragma: no cover def channels(self) -> Sequence[str]: return list(self.ps.CHANNELS.keys()) - def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + samples = pretrig + posttrig actual_frequency, max_samples = self.ps.setSamplingFrequency(frequency, samples) if max_samples < samples: + self.trig_ratio = (pretrig / samples) samples = max_samples return actual_frequency, samples diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py index a198367..e1d4a52 100644 --- a/pyecsca/sca/scope/picoscope_sdk.py +++ b/pyecsca/sca/scope/picoscope_sdk.py @@ -23,7 +23,7 @@ def adc2volt(adc: Union[np.ndarray, ctypes.c_int16], def volt2adc(volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int) -> Union[ - np.ndarray, ctypes.c_int16]: # pragma: no cover + np.ndarray, ctypes.c_int16]: # pragma: no cover if isinstance(volt, float): return ctypes.c_int16(int((volt / volt_range) * adc_minmax)) return (volt / volt_range) * adc_minmax @@ -51,6 +51,8 @@ class PicoScopeSdk(Scope): # pragma: no cover super().__init__() self.handle: ctypes.c_int16 = ctypes.c_int16() self.frequency: Optional[int] = None + self.pretrig: Optional[int] = None + self.posttrig: Optional[int] = None self.samples: Optional[int] = None self.timebase: Optional[int] = None self.buffers: MutableMapping = {} @@ -70,8 +72,8 @@ class PicoScopeSdk(Scope): # pragma: no cover ctypes.byref(size), 3)) return "".join(chr(i) for i in info[:size.value]) - def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: - return self.set_frequency(frequency, samples) + def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + return self.set_frequency(frequency, pretrig, posttrig) def set_channel(self, channel: str, enabled: bool, coupling: str, range: float): assert_pico_ok( @@ -82,8 +84,10 @@ class PicoScopeSdk(Scope): # pragma: no cover def setup_channel(self, channel: str, coupling: str, range: float, enable: bool): self.set_channel(channel, enable, coupling, range) - def _set_freq(self, frequency: int, samples: int, period_bound: float, timebase_bound: int, + def _set_freq(self, frequency: int, pretrig: int, posttrig: int, period_bound: float, + timebase_bound: int, low_freq: int, high_freq: int, high_subtract: int) -> Tuple[int, int]: + samples = pretrig + posttrig period = 1 / frequency if low_freq == 0 or period > period_bound: tb = floor(high_freq / frequency + high_subtract) @@ -97,13 +101,17 @@ class PicoScopeSdk(Scope): # pragma: no cover assert_pico_ok(self.__dispatch_call("GetTimebase", self.handle, tb, samples, None, 0, ctypes.byref(max_samples), 0)) if max_samples.value < samples: + pretrig = max_samples.value * (pretrig // samples) + posttrig = max_samples.value - pretrig samples = max_samples.value self.frequency = actual_frequency self.samples = samples + self.pretrig = pretrig + self.posttrig = posttrig self.timebase = tb return actual_frequency, samples - def set_frequency(self, frequency: int, samples: int) -> Tuple[int, int]: + def set_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: raise NotImplementedError def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, @@ -129,20 +137,21 @@ class PicoScopeSdk(Scope): # pragma: no cover del self.buffers[channel] buffer = (ctypes.c_int16 * self.samples)() assert_pico_ok( - self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), self.samples)) + self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], + ctypes.byref(buffer), self.samples)) self.buffers[channel] = buffer else: assert_pico_ok( - self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], - None, self.samples)) + self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], + None, self.samples)) del self.buffers[channel] def arm(self): if self.samples is None or self.timebase is None: raise ValueError assert_pico_ok( - self.__dispatch_call("RunBlock", self.handle, 0, self.samples, self.timebase, 0, + self.__dispatch_call("RunBlock", self.handle, self.pretrig, self.posttrig, + self.timebase, 0, None, 0, None, None)) def capture(self, timeout: Optional[int] = None) -> bool: @@ -183,44 +192,6 @@ class PicoScopeSdk(Scope): # pragma: no cover @public -class PS5000Scope(PicoScopeSdk): # pragma: no cover - MODULE = ps5000 - PREFIX = "ps5000" - CHANNELS = { - "A": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_A"], - "B": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_B"], - "C": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_C"], - "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"] - } - - RANGES = { - 0.01: 0, - 0.02: 1, - 0.05: 2, - 0.10: 3, - 0.20: 4, - 0.50: 5, - 1.00: 6, - 2.00: 7, - 5.00: 8, - 10.0: 9, - 20.0: 10, - 50.0: 11 - } - - MAX_ADC_VALUE = 32512 - MIN_ADC_VALUE = -32512 - - COUPLING = { - "AC": 0, - "DC": 1 - } - - def set_frequency(self, frequency: int, samples: int): - return self._set_freq(frequency, samples, 4e-9, 2, 1_000_000_000, 125_000_000, 2) - - -@public class PS4000Scope(PicoScopeSdk): # pragma: no cover MODULE = ps4000 PREFIX = "ps4000" @@ -255,14 +226,53 @@ class PS4000Scope(PicoScopeSdk): # pragma: no cover "DC": ps4000.PICO_COUPLING["DC"] } - def set_frequency(self, frequency: int, samples: int): + def set_frequency(self, frequency: int, pretrig: int, posttrig: int): variant = self.get_variant() if variant in ("4223", "4224", "4423", "4424"): - return self._set_freq(frequency, samples, 50e-9, 2, 80_000_000, 20_000_000, 1) + return self._set_freq(frequency, pretrig, posttrig, 50e-9, 2, 80_000_000, 20_000_000, 1) elif variant in ("4226", "4227"): - return self._set_freq(frequency, samples, 32e-9, 3, 250_000_000, 31_250_000, 2) + return self._set_freq(frequency, pretrig, posttrig, 32e-9, 3, 250_000_000, 31_250_000, + 2) elif variant == "4262": - return self._set_freq(frequency, samples, 0, 0, 0, 10_000_000, -1) + return self._set_freq(frequency, pretrig, posttrig, 0, 0, 0, 10_000_000, -1) + + +@public +class PS5000Scope(PicoScopeSdk): # pragma: no cover + MODULE = ps5000 + PREFIX = "ps5000" + CHANNELS = { + "A": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_A"], + "B": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_B"], + "C": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_C"], + "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"] + } + + RANGES = { + 0.01: 0, + 0.02: 1, + 0.05: 2, + 0.10: 3, + 0.20: 4, + 0.50: 5, + 1.00: 6, + 2.00: 7, + 5.00: 8, + 10.0: 9, + 20.0: 10, + 50.0: 11 + } + + MAX_ADC_VALUE = 32512 + MIN_ADC_VALUE = -32512 + + COUPLING = { + "AC": 0, + "DC": 1 + } + + def set_frequency(self, frequency: int, pretrig: int, posttrig: int): + return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2) @public @@ -316,15 +326,14 @@ class PS6000Scope(PicoScopeSdk): # pragma: no cover buffer = (ctypes.c_int16 * self.samples)() assert_pico_ok( ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), - self.samples, 0)) + ctypes.byref(buffer), self.samples, 0)) self.buffers[channel] = buffer else: assert_pico_ok( ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], - None, - self.samples, 0)) + None, self.samples, 0)) del self.buffers[channel] - def set_frequency(self, frequency: int, samples: int): - return self._set_freq(frequency, samples, 3.2e-9, 4, 5_000_000_000, 156_250_000, 4) + def set_frequency(self, frequency: int, pretrig: int, posttrig: int): + return self._set_freq(frequency, pretrig, posttrig, 3.2e-9, 4, 5_000_000_000, 156_250_000, + 4) diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py index 0a24393..01f8303 100644 --- a/pyecsca/sca/trace_set/hdf5.py +++ b/pyecsca/sca/trace_set/hdf5.py @@ -54,11 +54,23 @@ class HDF5TraceSet(TraceSet): if str(key) in self._file: del self._file[str(key)] self._file[str(key)] = value.samples + value.samples = self._file[str(key)] if value.meta: for k, v in value.meta.items(): self._file[str(key)].attrs[k] = v super().__setitem__(key, value) + def append(self, value: Trace): + if self._file is not None: + last = sorted(list(map(int, self._file.keys())))[-1] + key = last + 1 + self._file[str(key)] = value.samples + value.samples = self._file[str(key)] + if value.meta: + for k, v in value.meta.items(): + self._file[str(key)].attrs[k] = v + self._traces.append(value) + def save(self): if self._file is not None: self._file.flush() diff --git a/test/sca/test_traceset.py b/test/sca/test_traceset.py index a3ff526..72eac47 100644 --- a/test/sca/test_traceset.py +++ b/test/sca/test_traceset.py @@ -1,6 +1,7 @@ import os.path import shutil import tempfile +from copy import deepcopy from unittest import TestCase import numpy as np @@ -97,7 +98,7 @@ class HDF5TraceSetTests(TestCase): trace_set = HDF5TraceSet.inplace(path) self.assertIsNotNone(trace_set) test_trace = Trace(np.array([6, 7], dtype=np.dtype("i1")), None, None, meta={"thing": "ring"}) - trace_set[0] = test_trace + trace_set[0] = deepcopy(test_trace) trace_set.save() trace_set.close() |
