aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJ08nY2023-02-16 18:29:48 +0100
committerJ08nY2023-02-16 18:29:48 +0100
commitccb2aff2c36785b3cbc84138dc030453bb35657e (patch)
tree2593112bb55a8b5132e13313f7ee06b5dfdd89d4
parent6db49a89270b4bf2b062bf1384a425d14ee63f7c (diff)
downloadpyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.tar.gz
pyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.tar.zst
pyecsca-ccb2aff2c36785b3cbc84138dc030453bb35657e.zip
Add support for ps3000A API.
-rw-r--r--pyecsca/sca/scope/picoscope_sdk.py245
1 files changed, 186 insertions, 59 deletions
diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py
index 8728946..e881e49 100644
--- a/pyecsca/sca/scope/picoscope_sdk.py
+++ b/pyecsca/sca/scope/picoscope_sdk.py
@@ -14,6 +14,10 @@ try:
except CannotFindPicoSDKError as exc:
ps3000 = exc
try:
+ from picosdk.ps3000a import ps3000a
+except CannotFindPicoSDKError as exc:
+ ps3000a = exc
+try:
from picosdk.ps4000 import ps4000
except CannotFindPicoSDKError as exc:
ps4000 = exc
@@ -32,10 +36,10 @@ from ..trace import Trace
def adc2volt(
- adc: Union[np.ndarray, ctypes.c_int16],
- volt_range: float,
- adc_minmax: int,
- dtype=np.float32,
+ adc: Union[np.ndarray, ctypes.c_int16],
+ volt_range: float,
+ adc_minmax: int,
+ dtype=np.float32,
) -> Union[np.ndarray, float]: # pragma: no cover
"""
Convert raw adc values to volts.
@@ -54,7 +58,7 @@ def adc2volt(
def volt2adc(
- volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32
+ volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32
) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover
"""
Convert volt values to raw adc values.
@@ -100,7 +104,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
self._variant = variant
def open(self) -> None:
- assert_pico_ok(self.__dispatch_call("OpenUnit", ctypes.byref(self.handle)))
+ assert_pico_ok(self._dispatch_call("OpenUnit", ctypes.byref(self.handle)))
@property
def channels(self):
@@ -112,7 +116,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
info = ctypes.create_string_buffer(6)
size = ctypes.c_int16()
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"GetUnitInfo", self.handle, info, ctypes.c_int16(6), ctypes.byref(size), ctypes.c_uint(3)
)
)
@@ -120,12 +124,12 @@ class PicoScopeSdk(Scope): # pragma: no cover
return self._variant
def setup_frequency(
- self, frequency: int, pretrig: int, posttrig: int
+ self, frequency: int, pretrig: int, posttrig: int
) -> Tuple[int, int]:
return self.set_frequency(frequency, pretrig, posttrig)
def set_channel(
- self, channel: str, enabled: bool, coupling: str, range: float, offset: float
+ self, channel: str, enabled: bool, coupling: str, range: float, offset: float
):
if offset != 0.0:
raise ValueError("Nonzero offset not supported.")
@@ -136,7 +140,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
if range not in self.RANGES:
raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}")
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"SetChannel",
self.handle,
self.CHANNELS[channel],
@@ -148,20 +152,20 @@ class PicoScopeSdk(Scope): # pragma: no cover
self.ranges[channel] = range
def setup_channel(
- self, channel: str, coupling: str, range: float, offset: float, enable: bool
+ self, channel: str, coupling: str, range: float, offset: float, enable: bool
):
self.set_channel(channel, enable, coupling, range, offset)
def _set_freq(
- self,
- frequency: int,
- pretrig: int,
- posttrig: int,
- period_bound: float,
- timebase_bound: int,
- low_freq: int,
- high_freq: int,
- high_subtract: int,
+ self,
+ frequency: int,
+ pretrig: int,
+ posttrig: int,
+ period_bound: float,
+ timebase_bound: int,
+ low_freq: int,
+ high_freq: int,
+ high_subtract: int,
) -> Tuple[int, int]:
samples = pretrig + posttrig
period = 1 / frequency
@@ -172,16 +176,17 @@ class PicoScopeSdk(Scope): # pragma: no cover
tb = min(floor(log2(low_freq) - log2(frequency)), timebase_bound)
actual_frequency = low_freq // 2 ** tb
max_samples = ctypes.c_int32()
+ interval_nanoseconds = ctypes.c_int32()
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"GetTimebase",
self.handle,
tb,
samples,
- None,
+ ctypes.byref(interval_nanoseconds),
0,
ctypes.byref(max_samples),
- 0,
+ 0
)
)
if max_samples.value < samples:
@@ -196,32 +201,32 @@ class PicoScopeSdk(Scope): # pragma: no cover
return actual_frequency, samples
def set_frequency(
- self, frequency: int, pretrig: int, posttrig: int
+ self, frequency: int, pretrig: int, posttrig: int
) -> Tuple[int, int]:
raise NotImplementedError
def setup_trigger(
- self,
- channel: str,
- threshold: float,
- direction: str,
- delay: int,
- timeout: int,
- enable: bool,
+ self,
+ channel: str,
+ threshold: float,
+ direction: str,
+ delay: int,
+ timeout: int,
+ enable: bool,
):
self.set_trigger(direction, enable, threshold, channel, delay, timeout)
def set_trigger(
- self,
- type: str,
- enabled: bool,
- value: float,
- channel: str,
- delay: int,
- timeout: int,
+ self,
+ type: str,
+ enabled: bool,
+ value: float,
+ channel: str,
+ delay: int,
+ timeout: int,
):
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"SetSimpleTrigger",
self.handle,
enabled,
@@ -244,7 +249,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
del self.buffers[channel]
buffer = (ctypes.c_int16 * self.samples)()
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"SetDataBuffer",
self.handle,
self.CHANNELS[channel],
@@ -255,7 +260,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
self.buffers[channel] = buffer
else:
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"SetDataBuffer",
self.handle,
self.CHANNELS[channel],
@@ -269,7 +274,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
if self.samples is None or self.timebase is None:
raise ValueError
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"RunBlock",
self.handle,
self.pretrig,
@@ -292,21 +297,21 @@ class PicoScopeSdk(Scope): # pragma: no cover
while ready.value == check.value:
sleep(0.001)
assert_pico_ok(
- self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready))
+ self._dispatch_call("IsReady", self.handle, ctypes.byref(ready))
)
if timeout is not None and (time_ns() - start) / 1e6 >= timeout:
return False
return True
def retrieve(
- self, channel: str, type: SampleType, dtype=np.float32
+ self, channel: str, type: SampleType, dtype=np.float32
) -> Optional[Trace]:
if self.samples is None:
raise ValueError
actual_samples = ctypes.c_int32(self.samples)
overflow = ctypes.c_int16()
assert_pico_ok(
- self.__dispatch_call(
+ self._dispatch_call(
"GetValues",
self.handle,
0,
@@ -335,12 +340,12 @@ class PicoScopeSdk(Scope): # pragma: no cover
)
def stop(self):
- assert_pico_ok(self.__dispatch_call("Stop"))
+ assert_pico_ok(self._dispatch_call("Stop"))
def close(self):
- assert_pico_ok(self.__dispatch_call("CloseUnit", self.handle))
+ assert_pico_ok(self._dispatch_call("CloseUnit", self.handle))
- def __dispatch_call(self, name, *args, **kwargs):
+ def _dispatch_call(self, name, *args, **kwargs):
"""
A unit-generic call of a picoscope SDK method.
"""
@@ -398,24 +403,147 @@ else: # pragma: no cover
COUPLING = {"AC": ps3000.PICO_COUPLING["AC"], "DC": ps3000.PICO_COUPLING["DC"]}
+ def open(self) -> None:
+ assert_pico_ok(self._dispatch_call("_open_unit")) # , ctypes.byref(self.handle)
+
+ def stop(self):
+ assert_pico_ok(self._dispatch_call("_stop"))
+
+ def close(self):
+ assert_pico_ok(self._dispatch_call("_close_unit", self.handle))
+
def get_variant(self):
if self._variant is not None:
return self._variant
info = ctypes.create_string_buffer(6)
size = ctypes.c_int16(6)
+ info_variant = ctypes.c_int16(3)
assert_pico_ok(
- self.__dispatch_call(
- "GetUnitInfo", self.handle, ctypes.byref(info), size, ctypes.c_int16(3)
+ self._dispatch_call(
+ "_get_unit_info", self.handle, info, size, info_variant
)
)
self._variant = "".join(chr(i) for i in info[: size.value - 1]) # type: ignore
return self._variant
def set_frequency(
- self, frequency: int, pretrig: int, posttrig: int
+ self, frequency: int, pretrig: int, posttrig: int
): # TODO: fix
raise NotImplementedError
+if isinstance(ps3000a, CannotFindPicoSDKError):
+
+ @public
+ class PS3000aScope(PicoScopeSdk): # noqa, pragma: no cover
+ """PicoScope 3000 series (A API) oscilloscope is not available (Install `libps3000a`)."""
+
+ def __init__(self, variant: Optional[str] = None):
+ super().__init__(variant)
+ raise ps3000a
+
+
+else: # pragma: no cover
+
+ @public
+ class PS3000aScope(PicoScopeSdk): # type: ignore
+ """PicoScope 3000 series oscilloscope (A API)."""
+
+ MODULE = ps3000a
+ PREFIX = "ps3000a"
+ CHANNELS = {
+ "A": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_A"],
+ "B": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_B"],
+ "C": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_C"],
+ "D": ps3000a.PS3000A_CHANNEL["PS3000A_CHANNEL_D"],
+ }
+
+ RANGES = {
+ 0.01: ps3000a.PS3000A_RANGE["PS3000A_10MV"],
+ 0.02: ps3000a.PS3000A_RANGE["PS3000A_20MV"],
+ 0.05: ps3000a.PS3000A_RANGE["PS3000A_50MV"],
+ 0.10: ps3000a.PS3000A_RANGE["PS3000A_100MV"],
+ 0.20: ps3000a.PS3000A_RANGE["PS3000A_200MV"],
+ 0.50: ps3000a.PS3000A_RANGE["PS3000A_500MV"],
+ 1.00: ps3000a.PS3000A_RANGE["PS3000A_1V"],
+ 2.00: ps3000a.PS3000A_RANGE["PS3000A_2V"],
+ 5.00: ps3000a.PS3000A_RANGE["PS3000A_5V"],
+ 10.0: ps3000a.PS3000A_RANGE["PS3000A_10V"],
+ 20.0: ps3000a.PS3000A_RANGE["PS3000A_20V"],
+ 50.0: ps3000a.PS3000A_RANGE["PS3000A_50V"]
+ }
+
+ MAX_ADC_VALUE = 32767
+ MIN_ADC_VALUE = -32767
+
+ COUPLING = {"AC": ps3000a.PICO_COUPLING["AC"], "DC": ps3000a.PICO_COUPLING["DC"]}
+
+ def open(self) -> None:
+ assert_pico_ok(ps3000a.ps3000aOpenUnit(ctypes.byref(self.handle), None))
+
+ def set_channel(
+ self,
+ channel: str,
+ enabled: bool,
+ coupling: str,
+ range: float,
+ offset: float,
+ ):
+ if channel not in self.CHANNELS:
+ raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}")
+ if coupling not in self.COUPLING:
+ raise ValueError(f"Coupling {coupling} not in available couplings: {self.COUPLING.keys()}")
+ if range not in self.RANGES:
+ raise ValueError(f"Range {range} not in available ranges: {self.RANGES.keys()}")
+ assert_pico_ok(
+ ps3000a.ps3000aSetChannel(
+ self.handle,
+ self.CHANNELS[channel],
+ enabled,
+ self.COUPLING[coupling],
+ self.RANGES[range],
+ offset
+ )
+ )
+ self.ranges[channel] = range
+
+ def set_buffer(self, channel: str, enable: bool):
+ if self.samples is None:
+ raise ValueError
+ if enable:
+ if channel in self.buffers:
+ del self.buffers[channel]
+ buffer = (ctypes.c_int16 * self.samples)()
+ assert_pico_ok(
+ ps3000a.ps3000aSetDataBuffer(
+ self.handle,
+ self.CHANNELS[channel],
+ ctypes.byref(buffer),
+ self.samples,
+ 0,
+ ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"]
+ )
+ )
+ self.buffers[channel] = buffer
+ else:
+ assert_pico_ok(
+ ps3000a.ps3000aSetDataBuffer(
+ self.handle, self.CHANNELS[channel], None, self.samples, 0,
+ ps3000a.PS3000A_RATIO_MODE["PS3000A_RATIO_MODE_NONE"]
+ )
+ )
+ del self.buffers[channel]
+
+ def set_frequency(self, frequency: int, pretrig: int, posttrig: int):
+ variant = self.get_variant()
+ if variant in ("3000A", "3000B"):
+ # This only holds for the 2-channel versions
+ # 4-channel versions have the settings from branch "D".
+ return self._set_freq(frequency, pretrig, posttrig, 8e-9, 2, 500_000_000, 62_500_000, 2)
+ elif variant == "3000":
+ return self._set_freq(frequency, pretrig, posttrig, 4e-9, 1, 500_000_000, 125_000_000, 1)
+ elif variant.endswith("D"):
+ return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2)
+ # TODO: Needs more per-device settings to be generic.
if isinstance(ps4000, CannotFindPicoSDKError):
@@ -481,7 +609,6 @@ else: # pragma: no cover
else:
raise ValueError(f"Unknown variant: {variant}")
-
if isinstance(ps5000, CannotFindPicoSDKError):
@public
@@ -533,7 +660,6 @@ else: # pragma: no cover
frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2
)
-
if isinstance(ps6000, CannotFindPicoSDKError):
@public
@@ -588,12 +714,12 @@ else: # pragma: no cover
assert_pico_ok(ps6000.ps6000OpenUnit(ctypes.byref(self.handle), None))
def set_channel(
- self,
- channel: str,
- enabled: bool,
- coupling: str,
- range: float,
- offset: float,
+ self,
+ channel: str,
+ enabled: bool,
+ coupling: str,
+ range: float,
+ offset: float,
):
if channel not in self.CHANNELS:
raise ValueError(f"Channel {channel} not in available channels: {self.CHANNELS.keys()}")
@@ -612,6 +738,7 @@ else: # pragma: no cover
ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"],
)
)
+ self.ranges[channel] = range
def set_buffer(self, channel: str, enable: bool):
if self.samples is None: