diff options
Diffstat (limited to 'pyecsca/sca/trace_set/inspector.py')
| -rw-r--r-- | pyecsca/sca/trace_set/inspector.py | 153 |
1 files changed, 74 insertions, 79 deletions
diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py index b07313d..4f45aa0 100644 --- a/pyecsca/sca/trace_set/inspector.py +++ b/pyecsca/sca/trace_set/inspector.py @@ -1,10 +1,11 @@ -import numpy as np import struct from enum import IntEnum from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation from pathlib import Path +from typing import Union, Optional + +import numpy as np from public import public -from typing import Union, Optional, BinaryIO, List from .base import TraceSet from ..trace import Trace @@ -100,7 +101,6 @@ class InspectorTraceSet(TraceSet): external_clock_frequencty: float = 0 external_clock_time_base: int = 0 - _raw_traces: Optional[List[Trace]] = None _tag_parsers: dict = { 0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int), 0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int), @@ -136,39 +136,34 @@ class InspectorTraceSet(TraceSet): 0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float), 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int) } - _set_tags: set = set() - def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None, - keep_raw_traces: bool = True): + @classmethod + def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet": """ Read Inspector trace set from file path, bytes or file-like object. :param input: Input file path, bytes or file-like object. - :param keep_raw_traces: Whether to store the raw (unscaled) traces as well. + :return: """ - traces = None if isinstance(input, bytes): - with BytesIO(input) as f: - traces = self.__read(f) - elif isinstance(input, (Path, str)): - with open(input, "rb") as r: - traces = self.__read(r) + with BytesIO(input) as r: + traces, tags = InspectorTraceSet.__read(r) + elif isinstance(input, (str, Path)): + with open(input, "rb") as f: + traces, tags = InspectorTraceSet.__read(f) elif isinstance(input, (RawIOBase, BufferedIOBase)): - traces = self.__read(input) - elif input is not None: - raise ValueError( - "Cannot parse data, unknown input: {}".format(input)) - if traces is not None: - super().__init__(*self.__scale(traces)) - else: - super().__init__() - if keep_raw_traces: - self._raw_traces = traces + traces, tags = InspectorTraceSet.__read(input) else: - del traces + raise ValueError + for trace in traces: + new = InspectorTraceSet.__scale(trace.samples, tags["y_scale"]) + del trace.samples + trace.samples = new + return InspectorTraceSet(*traces, **tags) - def __read(self, file): - self._set_tags = set() + @classmethod + def __read(cls, file): + tags = {} while True: tag = ord(file.read(1)) length = ord(file.read(1)) @@ -176,35 +171,52 @@ class InspectorTraceSet(TraceSet): length = Parsers.read_int(file.read(length & 0x7f)) value = file.read(length) if tag in InspectorTraceSet._tag_parsers: - tag_name, tag_len, tag_reader, _ = \ - InspectorTraceSet._tag_parsers[tag] + tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag] if tag_len is None or length == tag_len: - setattr(self, tag_name, tag_reader(value)) - self._set_tags.add(tag) + tags[tag_name] = tag_reader(value) elif tag == 0x5f and length == 0: break else: continue result = [] - for _ in range(self.num_traces): - title = None if self.title_space == 0 else Parsers.read_str( - file.read(self.title_space)) - data = None if self.data_space == 0 else file.read(self.data_space) - dtype = self.sample_coding.dtype() + for _ in range(tags["num_traces"]): + title = None if "title_space" not in tags else Parsers.read_str( + file.read(tags["title_space"])) + data = None if "data_space" not in tags else file.read(tags["data_space"]) + dtype = tags["sample_coding"].dtype() try: - samples = np.fromfile(file, dtype, self.num_samples) + samples = np.fromfile(file, dtype, tags["num_samples"]) except UnsupportedOperation: samples = np.frombuffer( - file.read(dtype.itemsize * self.num_samples), dtype, - self.num_samples) - result.append(Trace(samples, title, data, trace_set=self)) - return result + file.read(dtype.itemsize * tags["num_samples"]), dtype, + tags["num_samples"]) + result.append(Trace(samples, title, data)) + return result, tags + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet": + raise NotImplementedError + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + """ + Save this trace set into a file. + + :param output: An output path or file-like object. + """ + if isinstance(output, (str, Path)): + with open(output, "wb") as f: + self.__write(f) + elif isinstance(output, (RawIOBase, BufferedIOBase)): + self.__write(output) + else: + raise ValueError def __write(self, file): - for set_tag in self._set_tags: - tag_name, tag_len, _, tag_writer = InspectorTraceSet._tag_parsers[ - set_tag] - tag_byte = Parsers.write_int(set_tag, length=1) + for tag, tag_tuple in self._tag_parsers.items(): + tag_name, tag_len, _, tag_writer = tag_tuple + if tag_name not in self._keys: + continue + tag_byte = Parsers.write_int(tag, length=1) value_bytes = tag_writer(getattr(self, tag_name), tag_len) length = len(value_bytes) if length <= 0x7f: @@ -218,47 +230,32 @@ class InspectorTraceSet(TraceSet): file.write(value_bytes) file.write(b"\x5f\x00") - for trace in self._raw_traces: + for trace in self._traces: if self.title_space != 0 and trace.title is not None: file.write(Parsers.write_str(trace.title)) if self.data_space != 0 and trace.data is not None: file.write(trace.data) + unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding) try: - trace.samples.tofile(file) + unscaled.tofile(file) except UnsupportedOperation: - file.write(trace.samples.tobytes()) - - def __scale(self, traces): - return list(map(lambda trace: Trace(trace.samples.astype("f4") * self.y_scale, trace.title, - trace.data, trace_set=self), - traces)) + file.write(unscaled.tobytes()) + del unscaled - def save(self, output: Union[Path, str, BinaryIO]): - """ - Save this trace set into a file. + def __setitem__(self, key, value): + if not isinstance(value, Trace): + raise TypeError + if len(value) != self.num_samples or value.samples.dtype != self.sample_coding.dtype(): + raise ValueError + super().__setitem__(key, value) - :param output: An output path or file-like object. - """ - if isinstance(output, (Path, str)): - with open(output, "wb") as f: - self.__write(f) - elif isinstance(output, (RawIOBase, BufferedIOBase)): - self.__write(output) - else: - raise ValueError("Cannot save data, unknown output: {}".format(output)) - - def __bytes__(self): - """Return the byte-representation of this trace set file.""" - with BytesIO() as b: - self.save(b) - return b.getvalue() + @staticmethod + def __scale(samples: np.ndarray, factor: float): + return samples.astype("f4") * factor - @property - def raw(self) -> Optional[List[Trace]]: - """The raw (unscaled) traces, as read from the trace set file.""" - if self._raw_traces is None: - return None - return list(self._raw_traces) + @staticmethod + def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding): + return (samples * (1 / factor)).astype(coding.dtype()) @property def sampling_frequency(self) -> int: @@ -266,7 +263,5 @@ class InspectorTraceSet(TraceSet): return int(1 / self.x_scale) def __repr__(self): - args = ", ".join( - [f"{self._tag_parsers[set_tag][0]}={getattr(self, self._tag_parsers[set_tag][0])!r}" - for set_tag in self._set_tags]) + args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys]) return f"InspectorTraceSet({args})" |
