"""Provides a traceset implementation based on Riscure's Inspector traceset format (``.trs``).""" import struct from enum import IntEnum from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation from pathlib import Path from typing import Union, Optional, BinaryIO import numpy as np from public import public from pyecsca.sca.trace_set.base import TraceSet from pyecsca.sca.trace import Trace @public class SampleCoding(IntEnum): Int8 = 0x01 Int16 = 0x02 Int32 = 0x04 Float8 = 0x11 Float16 = 0x12 Float32 = 0x14 def dtype(self): char = "f" if self.value & 0x10 else "i" return np.dtype(f"<{char}{self.value & 0x0F}") @public class Parsers: @staticmethod def read_int(bytes): return int.from_bytes(bytes, byteorder="little") @staticmethod def read_bool(bytes): return Parsers.read_int(bytes) == 1 @staticmethod def read_float(bytes): return struct.unpack(" "InspectorTraceSet": """ Read Inspector trace set from file path, bytes or file-like object. :param input: Input file path, bytes or file-like object. :return: """ if isinstance(input, bytes): with BytesIO(input) as r: traces, tags = InspectorTraceSet.__read(r) elif isinstance(input, (str, Path)): with open(input, "rb") as f: traces, tags = InspectorTraceSet.__read(f) elif isinstance(input, (RawIOBase, BufferedIOBase, BinaryIO)): traces, tags = InspectorTraceSet.__read(input) else: raise TypeError if kwargs.get("scale"): tags["_scaled"] = True for trace in traces: new = InspectorTraceSet.__scale(trace.samples, tags["y_scale"]) del trace.samples trace.samples = new else: tags["_scaled"] = False return InspectorTraceSet(*traces, **tags) @classmethod def __read(cls, file): tags = {} while True: tag = ord(file.read(1)) length = ord(file.read(1)) if length & 0x80: length = Parsers.read_int(file.read(length & 0x7F)) value = file.read(length) if tag in InspectorTraceSet._tag_parsers: tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag] if tag_len is None or length == tag_len: tags[tag_name] = tag_reader(value) elif tag == 0x5F and length == 0: break else: continue result = [] for _ in range(tags["num_traces"]): title = ( None if "title_space" not in tags else Parsers.read_str(file.read(tags["title_space"])) ) data = None if "data_space" not in tags else file.read(tags["data_space"]) dtype = tags["sample_coding"].dtype() try: samples = np.fromfile(file, dtype, tags["num_samples"]) except UnsupportedOperation: samples = np.frombuffer( file.read(dtype.itemsize * tags["num_samples"]), dtype, tags["num_samples"], ) result.append(Trace(samples, {"title": title, "data": data})) return result, tags @classmethod def inplace( cls, input: Union[str, Path, bytes, BinaryIO], **kwargs ) -> "InspectorTraceSet": raise NotImplementedError def write(self, output: Union[str, Path, BinaryIO]): """ Save this trace set into a file. :param output: An output path or file-like object. """ if isinstance(output, (str, Path)): with open(output, "wb") as f: self.__write(f) elif isinstance(output, (RawIOBase, BufferedIOBase, BinaryIO)): self.__write(output) else: raise TypeError def __write(self, file): for tag, tag_tuple in self._tag_parsers.items(): tag_name, tag_len, _, tag_writer = tag_tuple if tag_name not in self._keys: continue tag_byte = Parsers.write_int(tag, length=1) value_bytes = tag_writer(getattr(self, tag_name), tag_len) length = len(value_bytes) if length <= 0x7F: length_bytes = Parsers.write_int(length, length=1) else: length_data = Parsers.write_int( length, length=(length.bit_length() + 7) // 8 ) length_bytes = Parsers.write_int(0x80 | len(length_data)) + length_data file.write(tag_byte) file.write(length_bytes) file.write(value_bytes) file.write(b"\x5f\x00") for trace in self._traces: if self.title_space != 0 and trace.meta["title"] is not None: file.write(Parsers.write_str(trace.meta["title"])) if self.data_space != 0 and trace.meta["data"] is not None: file.write(trace.meta["data"]) if self._scaled: unscaled = InspectorTraceSet.__unscale( trace.samples, self.y_scale, self.sample_coding ) else: unscaled = trace.samples try: unscaled.tofile(file) except UnsupportedOperation: file.write(unscaled.tobytes()) del unscaled @staticmethod def __scale(samples: np.ndarray, factor: float): return samples.astype("f4") * factor @staticmethod def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding): return (samples * (1 / factor)).astype(coding.dtype()) @property def sampling_frequency(self) -> int: """Return the sampling frequency of the trace set.""" return int(1 / self.x_scale) def __repr__(self): args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys]) return f"InspectorTraceSet({args})"