aboutsummaryrefslogtreecommitdiffhomepage
path: root/pyecsca/sca/trace_set/inspector.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyecsca/sca/trace_set/inspector.py')
-rw-r--r--pyecsca/sca/trace_set/inspector.py153
1 files changed, 74 insertions, 79 deletions
diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py
index b07313d..4f45aa0 100644
--- a/pyecsca/sca/trace_set/inspector.py
+++ b/pyecsca/sca/trace_set/inspector.py
@@ -1,10 +1,11 @@
-import numpy as np
import struct
from enum import IntEnum
from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation
from pathlib import Path
+from typing import Union, Optional
+
+import numpy as np
from public import public
-from typing import Union, Optional, BinaryIO, List
from .base import TraceSet
from ..trace import Trace
@@ -100,7 +101,6 @@ class InspectorTraceSet(TraceSet):
external_clock_frequencty: float = 0
external_clock_time_base: int = 0
- _raw_traces: Optional[List[Trace]] = None
_tag_parsers: dict = {
0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int),
0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int),
@@ -136,39 +136,34 @@ class InspectorTraceSet(TraceSet):
0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float),
0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int)
}
- _set_tags: set = set()
- def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None,
- keep_raw_traces: bool = True):
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet":
"""
Read Inspector trace set from file path, bytes or file-like object.
:param input: Input file path, bytes or file-like object.
- :param keep_raw_traces: Whether to store the raw (unscaled) traces as well.
+ :return:
"""
- traces = None
if isinstance(input, bytes):
- with BytesIO(input) as f:
- traces = self.__read(f)
- elif isinstance(input, (Path, str)):
- with open(input, "rb") as r:
- traces = self.__read(r)
+ with BytesIO(input) as r:
+ traces, tags = InspectorTraceSet.__read(r)
+ elif isinstance(input, (str, Path)):
+ with open(input, "rb") as f:
+ traces, tags = InspectorTraceSet.__read(f)
elif isinstance(input, (RawIOBase, BufferedIOBase)):
- traces = self.__read(input)
- elif input is not None:
- raise ValueError(
- "Cannot parse data, unknown input: {}".format(input))
- if traces is not None:
- super().__init__(*self.__scale(traces))
- else:
- super().__init__()
- if keep_raw_traces:
- self._raw_traces = traces
+ traces, tags = InspectorTraceSet.__read(input)
else:
- del traces
+ raise ValueError
+ for trace in traces:
+ new = InspectorTraceSet.__scale(trace.samples, tags["y_scale"])
+ del trace.samples
+ trace.samples = new
+ return InspectorTraceSet(*traces, **tags)
- def __read(self, file):
- self._set_tags = set()
+ @classmethod
+ def __read(cls, file):
+ tags = {}
while True:
tag = ord(file.read(1))
length = ord(file.read(1))
@@ -176,35 +171,52 @@ class InspectorTraceSet(TraceSet):
length = Parsers.read_int(file.read(length & 0x7f))
value = file.read(length)
if tag in InspectorTraceSet._tag_parsers:
- tag_name, tag_len, tag_reader, _ = \
- InspectorTraceSet._tag_parsers[tag]
+ tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag]
if tag_len is None or length == tag_len:
- setattr(self, tag_name, tag_reader(value))
- self._set_tags.add(tag)
+ tags[tag_name] = tag_reader(value)
elif tag == 0x5f and length == 0:
break
else:
continue
result = []
- for _ in range(self.num_traces):
- title = None if self.title_space == 0 else Parsers.read_str(
- file.read(self.title_space))
- data = None if self.data_space == 0 else file.read(self.data_space)
- dtype = self.sample_coding.dtype()
+ for _ in range(tags["num_traces"]):
+ title = None if "title_space" not in tags else Parsers.read_str(
+ file.read(tags["title_space"]))
+ data = None if "data_space" not in tags else file.read(tags["data_space"])
+ dtype = tags["sample_coding"].dtype()
try:
- samples = np.fromfile(file, dtype, self.num_samples)
+ samples = np.fromfile(file, dtype, tags["num_samples"])
except UnsupportedOperation:
samples = np.frombuffer(
- file.read(dtype.itemsize * self.num_samples), dtype,
- self.num_samples)
- result.append(Trace(samples, title, data, trace_set=self))
- return result
+ file.read(dtype.itemsize * tags["num_samples"]), dtype,
+ tags["num_samples"])
+ result.append(Trace(samples, title, data))
+ return result, tags
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet":
+ raise NotImplementedError
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ """
+ Save this trace set into a file.
+
+ :param output: An output path or file-like object.
+ """
+ if isinstance(output, (str, Path)):
+ with open(output, "wb") as f:
+ self.__write(f)
+ elif isinstance(output, (RawIOBase, BufferedIOBase)):
+ self.__write(output)
+ else:
+ raise ValueError
def __write(self, file):
- for set_tag in self._set_tags:
- tag_name, tag_len, _, tag_writer = InspectorTraceSet._tag_parsers[
- set_tag]
- tag_byte = Parsers.write_int(set_tag, length=1)
+ for tag, tag_tuple in self._tag_parsers.items():
+ tag_name, tag_len, _, tag_writer = tag_tuple
+ if tag_name not in self._keys:
+ continue
+ tag_byte = Parsers.write_int(tag, length=1)
value_bytes = tag_writer(getattr(self, tag_name), tag_len)
length = len(value_bytes)
if length <= 0x7f:
@@ -218,47 +230,32 @@ class InspectorTraceSet(TraceSet):
file.write(value_bytes)
file.write(b"\x5f\x00")
- for trace in self._raw_traces:
+ for trace in self._traces:
if self.title_space != 0 and trace.title is not None:
file.write(Parsers.write_str(trace.title))
if self.data_space != 0 and trace.data is not None:
file.write(trace.data)
+ unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding)
try:
- trace.samples.tofile(file)
+ unscaled.tofile(file)
except UnsupportedOperation:
- file.write(trace.samples.tobytes())
-
- def __scale(self, traces):
- return list(map(lambda trace: Trace(trace.samples.astype("f4") * self.y_scale, trace.title,
- trace.data, trace_set=self),
- traces))
+ file.write(unscaled.tobytes())
+ del unscaled
- def save(self, output: Union[Path, str, BinaryIO]):
- """
- Save this trace set into a file.
+ def __setitem__(self, key, value):
+ if not isinstance(value, Trace):
+ raise TypeError
+ if len(value) != self.num_samples or value.samples.dtype != self.sample_coding.dtype():
+ raise ValueError
+ super().__setitem__(key, value)
- :param output: An output path or file-like object.
- """
- if isinstance(output, (Path, str)):
- with open(output, "wb") as f:
- self.__write(f)
- elif isinstance(output, (RawIOBase, BufferedIOBase)):
- self.__write(output)
- else:
- raise ValueError("Cannot save data, unknown output: {}".format(output))
-
- def __bytes__(self):
- """Return the byte-representation of this trace set file."""
- with BytesIO() as b:
- self.save(b)
- return b.getvalue()
+ @staticmethod
+ def __scale(samples: np.ndarray, factor: float):
+ return samples.astype("f4") * factor
- @property
- def raw(self) -> Optional[List[Trace]]:
- """The raw (unscaled) traces, as read from the trace set file."""
- if self._raw_traces is None:
- return None
- return list(self._raw_traces)
+ @staticmethod
+ def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding):
+ return (samples * (1 / factor)).astype(coding.dtype())
@property
def sampling_frequency(self) -> int:
@@ -266,7 +263,5 @@ class InspectorTraceSet(TraceSet):
return int(1 / self.x_scale)
def __repr__(self):
- args = ", ".join(
- [f"{self._tag_parsers[set_tag][0]}={getattr(self, self._tag_parsers[set_tag][0])!r}"
- for set_tag in self._set_tags])
+ args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys])
return f"InspectorTraceSet({args})"