diff options
| author | J08nY | 2023-02-16 18:29:48 +0100 |
|---|---|---|
| committer | J08nY | 2023-02-16 18:29:48 +0100 |
| commit | ccb2aff2c36785b3cbc84138dc030453bb35657e (patch) | |
| tree | 2593112bb55a8b5132e13313f7ee06b5dfdd89d4 | |
| parent | 6db49a89270b4bf2b062bf1384a425d14ee63f7c (diff) | |
| download | pyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.tar.gz pyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.tar.zst pyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.zip | |
Add support for ps3000A API.
| -rw-r--r-- | pyecsca/sca/scope/picoscope_sdk.py | 245 |
1 files changed, 186 insertions, 59 deletions
diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py index 8728946..e881e49 100644 --- a/pyecsca/sca/scope/picoscope_sdk.py +++ b/pyecsca/sca/scope/picoscope_sdk.py @@ -14,6 +14,10 @@ try: except CannotFindPicoSDKError as exc: ps3000 = exc try: + from picosdk.ps3000a import ps3000a +except CannotFindPicoSDKError as exc: + ps3000a = exc +try: from picosdk.ps4000 import ps4000 except CannotFindPicoSDKError as exc: ps4000 = exc @@ -32,10 +36,10 @@ from ..trace import Trace def adc2volt( - adc: Union[np.ndarray, ctypes.c_int16], - volt_range: float, - adc_minmax: int, - dtype=np.float32, + adc: Union[np.ndarray, ctypes.c_int16], + volt_range: float, + adc_minmax: int, + dtype=np.float32, ) -> Union[np.ndarray, float]: # pragma: no cover """ Convert raw adc values to volts. @@ -54,7 +58,7 @@ def adc2volt( def volt2adc( - volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32 + volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32 ) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover """ Convert volt values to raw adc values. @@ -100,7 +104,7 @@ class PicoScopeSdk(Scope): # pragma: no cover self._variant = variant def open(self) -> None: - assert_pico_ok(self.__dispatch_call("OpenUnit", ctypes.byref(self.handle))) + assert_pico_ok(self._dispatch_call("OpenUnit", ctypes.byref(self.handle))) @property def channels(self): @@ -112,7 +116,7 @@ class PicoScopeSdk(Scope): # pragma: no cover info = ctypes.create_string_buffer(6) size = ctypes.c_int16() assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "GetUnitInfo", self.handle, info, ctypes.c_int16(6), ctypes.byref(size), ctypes.c_uint(3) ) ) @@ -120,12 +124,12 @@ class PicoScopeSdk(Scope): # pragma: no cover return self._variant def setup_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ) -> Tuple[int, int]: return self.set_frequency(frequency, pretrig, posttrig) def set_channel( - self, channel: str, enabled: bool, coupling: str, range: float, offset: float + self, channel: str, enabled: bool, coupling: str, range: float, offset: float ): if offset != 0.0: raise ValueError("Nonzero offset not supported.") @@ -136,7 +140,7 @@ class PicoScopeSdk(Scope): # pragma: no cover if range not in self.RANGES: raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}") assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "SetChannel", self.handle, self.CHANNELS[channel], @@ -148,20 +152,20 @@ class PicoScopeSdk(Scope): # pragma: no cover self.ranges[channel] = range def setup_channel( - self, channel: str, coupling: str, range: float, offset: float, enable: bool + self, channel: str, coupling: str, range: float, offset: float, enable: bool ): self.set_channel(channel, enable, coupling, range, offset) def _set_freq( - self, - frequency: int, - pretrig: int, - posttrig: int, - period_bound: float, - timebase_bound: int, - low_freq: int, - high_freq: int, - high_subtract: int, + self, + frequency: int, + pretrig: int, + posttrig: int, + period_bound: float, + timebase_bound: int, + low_freq: int, + high_freq: int, + high_subtract: int, ) -> Tuple[int, int]: samples = pretrig + posttrig period = 1 / frequency @@ -172,16 +176,17 @@ class PicoScopeSdk(Scope): # pragma: no cover tb = min(floor(log2(low_freq) - log2(frequency)), timebase_bound) actual_frequency = low_freq // 2 ** tb max_samples = ctypes.c_int32() + interval_nanoseconds = ctypes.c_int32() assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "GetTimebase", self.handle, tb, samples, - None, + ctypes.byref(interval_nanoseconds), 0, ctypes.byref(max_samples), - 0, + 0 ) ) if max_samples.value < samples: @@ -196,32 +201,32 @@ class PicoScopeSdk(Scope): # pragma: no cover return actual_frequency, samples def set_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ) -> Tuple[int, int]: raise NotImplementedError def setup_trigger( - self, - channel: str, - threshold: float, - direction: str, - delay: int, - timeout: int, - enable: bool, + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, ): self.set_trigger(direction, enable, threshold, channel, delay, timeout) def set_trigger( - self, - type: str, - enabled: bool, - value: float, - channel: str, - delay: int, - timeout: int, + self, + type: str, + enabled: bool, + value: float, + channel: str, + delay: int, + timeout: int, ): assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "SetSimpleTrigger", self.handle, enabled, @@ -244,7 +249,7 @@ class PicoScopeSdk(Scope): # pragma: no cover del self.buffers[channel] buffer = (ctypes.c_int16 * self.samples)() assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "SetDataBuffer", self.handle, self.CHANNELS[channel], @@ -255,7 +260,7 @@ class PicoScopeSdk(Scope): # pragma: no cover self.buffers[channel] = buffer else: assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "SetDataBuffer", self.handle, self.CHANNELS[channel], @@ -269,7 +274,7 @@ class PicoScopeSdk(Scope): # pragma: no cover if self.samples is None or self.timebase is None: raise ValueError assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "RunBlock", self.handle, self.pretrig, @@ -292,21 +297,21 @@ class PicoScopeSdk(Scope): # pragma: no cover while ready.value == check.value: sleep(0.001) assert_pico_ok( - self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready)) + self._dispatch_call("IsReady", self.handle, ctypes.byref(ready)) ) if timeout is not None and (time_ns() - start) / 1e6 >= timeout: return False return True def retrieve( - self, channel: str, type: SampleType, dtype=np.float32 + self, channel: str, type: SampleType, dtype=np.float32 ) -> Optional[Trace]: if self.samples is None: raise ValueError actual_samples = ctypes.c_int32(self.samples) overflow = ctypes.c_int16() assert_pico_ok( - self.__dispatch_call( + self._dispatch_call( "GetValues", self.handle, 0, @@ -335,12 +340,12 @@ class PicoScopeSdk(Scope): # pragma: no cover ) def stop(self): - assert_pico_ok(self.__dispatch_call("Stop")) + assert_pico_ok(self._dispatch_call("Stop")) def close(self): - assert_pico_ok(self.__dispatch_call("CloseUnit", self.handle)) + assert_pico_ok(self._dispatch_call("CloseUnit", self.handle)) - def __dispatch_call(self, name, *args, **kwargs): + def _dispatch_call(self, name, *args, **kwargs): """ A unit-generic call of a picoscope SDK method. """ @@ -398,24 +403,147 @@ else: # pragma: no cover COUPLING = {"AC": ps3000.PICO_COUPLING["AC"], "DC": ps3000.PICO_COUPLING["DC"]} + def open(self) -> None: + assert_pico_ok(self._dispatch_call("_open_unit")) # , ctypes.byref(self.handle) + + def stop(self): + assert_pico_ok(self._dispatch_call("_stop")) + + def close(self): + assert_pico_ok(self._dispatch_call("_close_unit", self.handle)) + def get_variant(self): if self._variant is not None: return self._variant info = ctypes.create_string_buffer(6) size = ctypes.c_int16(6) + info_variant = ctypes.c_int16(3) assert_pico_ok( - self.__dispatch_call( - "GetUnitInfo", self.handle, ctypes.byref(info), size, ctypes.c_int16(3) + self._dispatch_call( + "_get_unit_info", self.handle, info, size, info_variant ) ) self._variant = "".join(chr(i) for i in info[: size.value - 1]) # type: ignore return self._variant def set_frequency( - self, frequency: int, pretrig: int, posttrig: int + self, frequency: int, pretrig: int, posttrig: int ): # TODO: fix raise NotImplementedError +if isinstance(ps3000a, CannotFindPicoSDKError): + + @public + class PS3000aScope(PicoScopeSdk): # noqa, pragma: no cover + """PicoScope 3000 series (A API) oscilloscope is not available (Install `libps3000a`).""" + + def __init__(self, variant: Optional[str] = None): + super().__init__(variant) + raise ps3000a + + +else: # pragma: no cover + + @public + class PS3000aScope(PicoScopeSdk): # type: ignore + """PicoScope 3000 series oscilloscope (A API).""" + + MODULE = ps3000a + PREFIX = "ps3000a" + CHANNELS = { + "A": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_A"], + "B": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_B"], + "C": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_C"], + "D": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_D"], + } + + RANGES = { + 0.01: ps3000a.PS3000A_RANGE["PS3000A_10MV"], + 0.02: ps3000a.PS3000A_RANGE["PS3000A_20MV"], + 0.05: ps3000a.PS3000A_RANGE["PS3000A_50MV"], + 0.10: ps3000a.PS3000A_RANGE["PS3000A_100MV"], + 0.20: ps3000a.PS3000A_RANGE["PS3000A_200MV"], + 0.50: ps3000a.PS3000A_RANGE["PS3000A_500MV"], + 1.00: ps3000a.PS3000A_RANGE["PS3000A_1V"], + 2.00: ps3000a.PS3000A_RANGE["PS3000A_2V"], + 5.00: ps3000a.PS3000A_RANGE["PS3000A_5V"], + 10.0: ps3000a.PS3000A_RANGE["PS3000A_10V"], + 20.0: ps3000a.PS3000A_RANGE["PS3000A_20V"], + 50.0: ps3000a.PS3000A_RANGE["PS3000A_50V"] + } + + MAX_ADC_VALUE = 32767 + MIN_ADC_VALUE = -32767 + + COUPLING = {"AC": ps3000a.PICO_COUPLING["AC"], "DC": ps3000a.PICO_COUPLING["DC"]} + + def open(self) -> None: + assert_pico_ok(ps3000a.ps3000aOpenUnit(ctypes.byref(self.handle), None)) + + def set_channel( + self, + channel: str, + enabled: bool, + coupling: str, + range: float, + offset: float, + ): + if channel not in self.CHANNELS: + raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}") + if coupling not in self.COUPLING: + raise ValueError(f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}") + if range not in self.RANGES: + raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}") + assert_pico_ok( + ps3000a.ps3000aSetChannel( + self.handle, + self.CHANNELS[channel], + enabled, + self.COUPLING[coupling], + self.RANGES[range], + offset + ) + ) + self.ranges[channel] = range + + def set_buffer(self, channel: str, enable: bool): + if self.samples is None: + raise ValueError + if enable: + if channel in self.buffers: + del self.buffers[channel] + buffer = (ctypes.c_int16 * self.samples)() + assert_pico_ok( + ps3000a.ps3000aSetDataBuffer( + self.handle, + self.CHANNELS[channel], + ctypes.byref(buffer), + self.samples, + 0, + ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"] + ) + ) + self.buffers[channel] = buffer + else: + assert_pico_ok( + ps3000a.ps3000aSetDataBuffer( + self.handle, self.CHANNELS[channel], None, self.samples, 0, + ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"] + ) + ) + del self.buffers[channel] + + def set_frequency(self, frequency: int, pretrig: int, posttrig: int): + variant = self.get_variant() + if variant in ("3000A", "3000B"): + # This only holds for the 2-channel versions + # 4-channel versions have the settings from branch "D". + return self._set_freq(frequency, pretrig, posttrig, 8e-9, 2, 500_000_000, 62_500_000, 2) + elif variant == "3000": + return self._set_freq(frequency, pretrig, posttrig, 4e-9, 1, 500_000_000, 125_000_000, 1) + elif variant.endswith("D"): + return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2) + # TODO: Needs more per-device settings to be generic. if isinstance(ps4000, CannotFindPicoSDKError): @@ -481,7 +609,6 @@ else: # pragma: no cover else: raise ValueError(f"Unknown variant: {variant}") - if isinstance(ps5000, CannotFindPicoSDKError): @public @@ -533,7 +660,6 @@ else: # pragma: no cover frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2 ) - if isinstance(ps6000, CannotFindPicoSDKError): @public @@ -588,12 +714,12 @@ else: # pragma: no cover assert_pico_ok(ps6000.ps6000OpenUnit(ctypes.byref(self.handle), None)) def set_channel( - self, - channel: str, - enabled: bool, - coupling: str, - range: float, - offset: float, + self, + channel: str, + enabled: bool, + coupling: str, + range: float, + offset: float, ): if channel not in self.CHANNELS: raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}") @@ -612,6 +738,7 @@ else: # pragma: no cover ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"], ) ) + self.ranges[channel] = range def set_buffer(self, channel: str, enable: bool): if self.samples is None: |
