diff options
| -rw-r--r-- | pyecsca/ec/efd/shortw/w12-0/scaling/z | 5 | ||||
| -rw-r--r-- | pyecsca/ec/efd/shortw/w12-0/scaling/z.op3 | 5 | ||||
| -rw-r--r-- | pyecsca/sca/trace/__init__.py | 1 | ||||
| -rw-r--r-- | pyecsca/sca/trace/plot.py | 0 | ||||
| -rw-r--r-- | pyecsca/sca/trace/trace.py | 53 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/__init__.py | 2 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/base.py | 28 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/chipwhisperer.py | 64 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/hdf5.py | 84 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/inspector.py | 153 | ||||
| -rw-r--r-- | pyecsca/sca/trace_set/pickle.py | 35 | ||||
| -rw-r--r-- | test/data/test.h5 | bin | 0 -> 6712 bytes | |||
| -rw-r--r-- | test/data/test.pickle | bin | 0 -> 463 bytes | |||
| -rw-r--r-- | test/sca/test_align.py | 4 | ||||
| -rw-r--r-- | test/sca/test_combine.py | 6 | ||||
| -rw-r--r-- | test/sca/test_edit.py | 2 | ||||
| -rw-r--r-- | test/sca/test_filter.py | 2 | ||||
| -rw-r--r-- | test/sca/test_match.py | 8 | ||||
| -rw-r--r-- | test/sca/test_process.py | 2 | ||||
| -rw-r--r-- | test/sca/test_sampling.py | 2 | ||||
| -rw-r--r-- | test/sca/test_test.py | 22 | ||||
| -rw-r--r-- | test/sca/test_trace.py | 2 | ||||
| -rw-r--r-- | test/sca/test_traceset.py | 93 |
23 files changed, 423 insertions, 150 deletions
diff --git a/pyecsca/ec/efd/shortw/w12-0/scaling/z b/pyecsca/ec/efd/shortw/w12-0/scaling/z new file mode 100644 index 0000000..02907a4 --- /dev/null +++ b/pyecsca/ec/efd/shortw/w12-0/scaling/z @@ -0,0 +1,5 @@ +compute A = 1/Z1 +compute AA = A^2 +compute X3 = X1*A +compute Y3 = Y1*AA +compute Z3 = 1 diff --git a/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3 b/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3 new file mode 100644 index 0000000..67652e5 --- /dev/null +++ b/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3 @@ -0,0 +1,5 @@ +A = 1/Z1 +AA = A^2 +X3 = X1*A +Y3 = Y1*AA +Z3 = 1 diff --git a/pyecsca/sca/trace/__init__.py b/pyecsca/sca/trace/__init__.py index 696e387..4190807 100644 --- a/pyecsca/sca/trace/__init__.py +++ b/pyecsca/sca/trace/__init__.py @@ -5,6 +5,7 @@ from .combine import * from .edit import * from .filter import * from .match import * +from .plot import * from .process import * from .sampling import * from .test import * diff --git a/pyecsca/sca/trace/plot.py b/pyecsca/sca/trace/plot.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pyecsca/sca/trace/plot.py diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py index 18df978..99a6814 100644 --- a/pyecsca/sca/trace/trace.py +++ b/pyecsca/sca/trace/trace.py @@ -1,23 +1,44 @@ import weakref +from typing import Optional, Any, Mapping, Sequence +from copy import copy, deepcopy + from numpy import ndarray +import numpy as np from public import public -from typing import Optional, Sequence, Any @public class Trace(object): - """A power trace, which has an optional title, optional data bytes and mandatory samples.""" + """A trace, which has an optional title, optional data bytes and mandatory samples.""" title: Optional[str] data: Optional[bytes] + meta: Mapping[str, Any] samples: ndarray def __init__(self, samples: ndarray, title: Optional[str], data: Optional[bytes], - trace_set: Any = None): + meta: Mapping[str, Any] = None, trace_set: Any = None): self.title = title self.data = data + self.meta = meta self.samples = samples self.trace_set = trace_set + def __len__(self): + """Length of the trace, in samples.""" + return len(self.samples) + + def __getitem__(self, index): + """Get the sample at `index`.""" + return self.samples[index] + + def __setitem__(self, key, value): + """Set the sample at `key`.""" + self.samples[key] = value + + def __iter__(self): + """Iterate over the samples.""" + yield from self.samples + @property def trace_set(self) -> Any: if self._trace_set is None: @@ -31,17 +52,37 @@ class Trace(object): else: self._trace_set = weakref.ref(trace_set) + def __getstate__(self): + state = self.__dict__.copy() + del state["_trace_set"] + return state + + def __eq__(self, other): + if not isinstance(other, Trace): + return False + return np.array_equal(self.samples, other.samples) and self.title == other.title and self.data == other.data and self.meta == other.meta + + def with_samples(self, samples: ndarray) -> "Trace": + return Trace(samples, deepcopy(self.title), deepcopy(self.data), deepcopy(self.meta), deepcopy(self.trace_set)) + + def __copy__(self): + return Trace(copy(self.samples), copy(self.title), copy(self.data), copy(self.meta), copy(self.trace_set)) + + def __deepcopy__(self, memodict={}): + return Trace(deepcopy(self.samples, memo=memodict), deepcopy(self.title, memo=memodict), deepcopy(self.data, memo=memodict), deepcopy(self.meta, memo=memodict), deepcopy(self.trace_set, memo=memodict)) + def __repr__(self): return f"Trace(title={self.title!r}, data={self.data!r}, samples={self.samples!r}, trace_set={self.trace_set!r})" @public class CombinedTrace(Trace): - """A power trace that was combined from other traces, `parents`.""" + """A trace that was combined from other traces, `parents`.""" def __init__(self, samples: ndarray, title: Optional[str], data: Optional[bytes], - trace_set=None, parents: Sequence[Trace] = None): - super().__init__(samples, title, data, trace_set=trace_set) + meta: Mapping[str, Any] = None, trace_set: Any = None, + parents: Sequence[Trace] = None): + super().__init__(samples, title, data, meta, trace_set=trace_set) self.parents = None if parents is not None: self.parents = weakref.WeakSet(parents) diff --git a/pyecsca/sca/trace_set/__init__.py b/pyecsca/sca/trace_set/__init__.py index cdede67..ee5e08a 100644 --- a/pyecsca/sca/trace_set/__init__.py +++ b/pyecsca/sca/trace_set/__init__.py @@ -3,3 +3,5 @@ from .base import * from .inspector import * from .chipwhisperer import * +from .pickle import * +from .hdf5 import * diff --git a/pyecsca/sca/trace_set/base.py b/pyecsca/sca/trace_set/base.py index 2221933..0bab76d 100644 --- a/pyecsca/sca/trace_set/base.py +++ b/pyecsca/sca/trace_set/base.py @@ -1,16 +1,21 @@ +from io import RawIOBase, BufferedIOBase +from pathlib import Path +from typing import List, Union + from public import public -from typing import List from ..trace import Trace @public class TraceSet(object): - _traces: List = [] - _keys: List = [] + _traces: List[Trace] + _keys: List def __init__(self, *traces: Trace, **kwargs): self._traces = list(traces) + for trace in self._traces: + trace.trace_set = self self.__dict__.update(kwargs) self._keys = list(kwargs.keys()) @@ -22,10 +27,27 @@ class TraceSet(object): """Get the trace at `index`.""" return self._traces[index] + def __setitem__(self, key, value): + if not isinstance(value, Trace): + raise TypeError + self._traces[key] = value + value.trace_set = self + def __iter__(self): """Iterate over the traces.""" yield from self._traces + @classmethod + def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet": + raise NotImplementedError + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet": + raise NotImplementedError + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + raise NotImplementedError + def __repr__(self): args = ", ".join(["{}={!r}".format(key, getattr(self, key)) for key in self._keys]) diff --git a/pyecsca/sca/trace_set/chipwhisperer.py b/pyecsca/sca/trace_set/chipwhisperer.py index 2b37c6f..e70c457 100644 --- a/pyecsca/sca/trace_set/chipwhisperer.py +++ b/pyecsca/sca/trace_set/chipwhisperer.py @@ -1,9 +1,14 @@ -import numpy as np -from .base import TraceSet -from os.path import exists, isfile, join from configparser import ConfigParser +from io import RawIOBase, BufferedIOBase +from itertools import zip_longest +from os.path import exists, isfile, join, basename, dirname +from pathlib import Path +from typing import Union + +import numpy as np from public import public +from .base import TraceSet from ..trace import Trace @@ -11,27 +16,54 @@ from ..trace import Trace class ChipWhispererTraceSet(TraceSet): """ChipWhisperer trace set (native) format.""" - def __init__(self, path: str = None, name: str = None): - if path is None and name is None: - super().__init__() + @classmethod + def read(cls, + input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "ChipWhispererTraceSet": + if isinstance(input, (str, Path)): + traces, kwargs = ChipWhispererTraceSet.__read(input) + return ChipWhispererTraceSet(*traces, **kwargs) else: - data = self.__read_data(path, name) - trace_data = data["traces"] - traces = [Trace(trace_samples, None, None, trace_set=self) for trace_samples in trace_data] - del data["traces"] - config = self.__read_config(path, name) - super().__init__(*traces, **data, **config) + raise ValueError + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "ChipWhispererTraceSet": + raise NotImplementedError + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + raise NotImplementedError + + @classmethod + def __read(cls, full_path): + file_name = basename(full_path) + if not file_name.startswith("config_") or not file_name.endswith(".cfg"): + raise ValueError + path = dirname(full_path) + name = file_name[7:-4] + data = ChipWhispererTraceSet.__read_data(path, name) + traces = [] + for samples, key, textin, textout in zip_longest(data["traces"], data["keylist"], + data["textin"], data["textout"]): + traces.append( + Trace(samples, None, None, {"key": key, "textin": textin, "textout": textout})) + del data["traces"] + del data["keylist"] + del data["textin"] + del data["textout"] + config = ChipWhispererTraceSet.__read_config(path, name) + return traces, {**data, **config} - def __read_data(self, path, name): + @classmethod + def __read_data(cls, path, name): types = {"keylist": None, "knownkey": None, "textin": None, "textout": None, "traces": None} for type in types.keys(): - type_path = join(path, name + "_" + type + ".npy") + type_path = join(path, name + type + ".npy") if exists(type_path) and isfile(type_path): types[type] = np.load(type_path, allow_pickle=True) return types - def __read_config(self, path, name): - config_path = join(path, "config_" + name + "_.cfg") + @classmethod + def __read_config(cls, path, name): + config_path = join(path, "config_" + name + ".cfg") if exists(config_path) and isfile(config_path): config = ConfigParser() config.read(config_path) diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py new file mode 100644 index 0000000..0a24393 --- /dev/null +++ b/pyecsca/sca/trace_set/hdf5.py @@ -0,0 +1,84 @@ +from copy import copy +from io import RawIOBase, BufferedIOBase, IOBase +from pathlib import Path +from typing import Union, Optional +import numpy as np +import h5py +from public import public + +from .base import TraceSet +from .. import Trace + + +@public +class HDF5TraceSet(TraceSet): + _file: Optional[h5py.File] + + @classmethod + def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet": + if isinstance(input, (str, Path)): + hdf5 = h5py.File(str(input), mode="r") + elif isinstance(input, IOBase): + hdf5 = h5py.File(input, mode="r") + else: + raise ValueError + kwargs = dict(hdf5.attrs) + traces = [] + for k, v in hdf5.items(): + meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None + samples = hdf5[k] + traces.append(Trace(np.array(samples, dtype=samples.dtype), None, None, meta)) + hdf5.close() + return HDF5TraceSet(*traces, **kwargs) + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet": + if isinstance(input, (str, Path)): + hdf5 = h5py.File(str(input), mode="a") + elif isinstance(input, IOBase): + hdf5 = h5py.File(input, mode="a") + else: + raise ValueError + kwargs = dict(hdf5.attrs) + traces = [] + for k, v in hdf5.items(): + meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None + samples = hdf5[k] + traces.append(Trace(samples, k, None, meta)) + return HDF5TraceSet(*traces, **kwargs, _file=hdf5) + + def __setitem__(self, key, value): + if not isinstance(value, Trace): + raise TypeError + if self._file is not None: + if str(key) in self._file: + del self._file[str(key)] + self._file[str(key)] = value.samples + if value.meta: + for k, v in value.meta.items(): + self._file[str(key)].attrs[k] = v + super().__setitem__(key, value) + + def save(self): + if self._file is not None: + self._file.flush() + + def close(self): + if self._file is not None: + self._file.close() + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + if isinstance(output, (str, Path)): + hdf5 = h5py.File(str(output), "w") + elif isinstance(output, IOBase): + hdf5 = h5py.File(output, "w") + else: + raise ValueError + for k in self._keys: + hdf5[k] = getattr(self, k) + for i, trace in enumerate(self._traces): + dset = hdf5.create_dataset(str(i), trace.samples) + if trace.meta: + for k, v in trace.meta.items(): + dset.attrs[k] = v + hdf5.close() diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py index b07313d..4f45aa0 100644 --- a/pyecsca/sca/trace_set/inspector.py +++ b/pyecsca/sca/trace_set/inspector.py @@ -1,10 +1,11 @@ -import numpy as np import struct from enum import IntEnum from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation from pathlib import Path +from typing import Union, Optional + +import numpy as np from public import public -from typing import Union, Optional, BinaryIO, List from .base import TraceSet from ..trace import Trace @@ -100,7 +101,6 @@ class InspectorTraceSet(TraceSet): external_clock_frequencty: float = 0 external_clock_time_base: int = 0 - _raw_traces: Optional[List[Trace]] = None _tag_parsers: dict = { 0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int), 0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int), @@ -136,39 +136,34 @@ class InspectorTraceSet(TraceSet): 0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float), 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int) } - _set_tags: set = set() - def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None, - keep_raw_traces: bool = True): + @classmethod + def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet": """ Read Inspector trace set from file path, bytes or file-like object. :param input: Input file path, bytes or file-like object. - :param keep_raw_traces: Whether to store the raw (unscaled) traces as well. + :return: """ - traces = None if isinstance(input, bytes): - with BytesIO(input) as f: - traces = self.__read(f) - elif isinstance(input, (Path, str)): - with open(input, "rb") as r: - traces = self.__read(r) + with BytesIO(input) as r: + traces, tags = InspectorTraceSet.__read(r) + elif isinstance(input, (str, Path)): + with open(input, "rb") as f: + traces, tags = InspectorTraceSet.__read(f) elif isinstance(input, (RawIOBase, BufferedIOBase)): - traces = self.__read(input) - elif input is not None: - raise ValueError( - "Cannot parse data, unknown input: {}".format(input)) - if traces is not None: - super().__init__(*self.__scale(traces)) - else: - super().__init__() - if keep_raw_traces: - self._raw_traces = traces + traces, tags = InspectorTraceSet.__read(input) else: - del traces + raise ValueError + for trace in traces: + new = InspectorTraceSet.__scale(trace.samples, tags["y_scale"]) + del trace.samples + trace.samples = new + return InspectorTraceSet(*traces, **tags) - def __read(self, file): - self._set_tags = set() + @classmethod + def __read(cls, file): + tags = {} while True: tag = ord(file.read(1)) length = ord(file.read(1)) @@ -176,35 +171,52 @@ class InspectorTraceSet(TraceSet): length = Parsers.read_int(file.read(length & 0x7f)) value = file.read(length) if tag in InspectorTraceSet._tag_parsers: - tag_name, tag_len, tag_reader, _ = \ - InspectorTraceSet._tag_parsers[tag] + tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag] if tag_len is None or length == tag_len: - setattr(self, tag_name, tag_reader(value)) - self._set_tags.add(tag) + tags[tag_name] = tag_reader(value) elif tag == 0x5f and length == 0: break else: continue result = [] - for _ in range(self.num_traces): - title = None if self.title_space == 0 else Parsers.read_str( - file.read(self.title_space)) - data = None if self.data_space == 0 else file.read(self.data_space) - dtype = self.sample_coding.dtype() + for _ in range(tags["num_traces"]): + title = None if "title_space" not in tags else Parsers.read_str( + file.read(tags["title_space"])) + data = None if "data_space" not in tags else file.read(tags["data_space"]) + dtype = tags["sample_coding"].dtype() try: - samples = np.fromfile(file, dtype, self.num_samples) + samples = np.fromfile(file, dtype, tags["num_samples"]) except UnsupportedOperation: samples = np.frombuffer( - file.read(dtype.itemsize * self.num_samples), dtype, - self.num_samples) - result.append(Trace(samples, title, data, trace_set=self)) - return result + file.read(dtype.itemsize * tags["num_samples"]), dtype, + tags["num_samples"]) + result.append(Trace(samples, title, data)) + return result, tags + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet": + raise NotImplementedError + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + """ + Save this trace set into a file. + + :param output: An output path or file-like object. + """ + if isinstance(output, (str, Path)): + with open(output, "wb") as f: + self.__write(f) + elif isinstance(output, (RawIOBase, BufferedIOBase)): + self.__write(output) + else: + raise ValueError def __write(self, file): - for set_tag in self._set_tags: - tag_name, tag_len, _, tag_writer = InspectorTraceSet._tag_parsers[ - set_tag] - tag_byte = Parsers.write_int(set_tag, length=1) + for tag, tag_tuple in self._tag_parsers.items(): + tag_name, tag_len, _, tag_writer = tag_tuple + if tag_name not in self._keys: + continue + tag_byte = Parsers.write_int(tag, length=1) value_bytes = tag_writer(getattr(self, tag_name), tag_len) length = len(value_bytes) if length <= 0x7f: @@ -218,47 +230,32 @@ class InspectorTraceSet(TraceSet): file.write(value_bytes) file.write(b"\x5f\x00") - for trace in self._raw_traces: + for trace in self._traces: if self.title_space != 0 and trace.title is not None: file.write(Parsers.write_str(trace.title)) if self.data_space != 0 and trace.data is not None: file.write(trace.data) + unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding) try: - trace.samples.tofile(file) + unscaled.tofile(file) except UnsupportedOperation: - file.write(trace.samples.tobytes()) - - def __scale(self, traces): - return list(map(lambda trace: Trace(trace.samples.astype("f4") * self.y_scale, trace.title, - trace.data, trace_set=self), - traces)) + file.write(unscaled.tobytes()) + del unscaled - def save(self, output: Union[Path, str, BinaryIO]): - """ - Save this trace set into a file. + def __setitem__(self, key, value): + if not isinstance(value, Trace): + raise TypeError + if len(value) != self.num_samples or value.samples.dtype != self.sample_coding.dtype(): + raise ValueError + super().__setitem__(key, value) - :param output: An output path or file-like object. - """ - if isinstance(output, (Path, str)): - with open(output, "wb") as f: - self.__write(f) - elif isinstance(output, (RawIOBase, BufferedIOBase)): - self.__write(output) - else: - raise ValueError("Cannot save data, unknown output: {}".format(output)) - - def __bytes__(self): - """Return the byte-representation of this trace set file.""" - with BytesIO() as b: - self.save(b) - return b.getvalue() + @staticmethod + def __scale(samples: np.ndarray, factor: float): + return samples.astype("f4") * factor - @property - def raw(self) -> Optional[List[Trace]]: - """The raw (unscaled) traces, as read from the trace set file.""" - if self._raw_traces is None: - return None - return list(self._raw_traces) + @staticmethod + def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding): + return (samples * (1 / factor)).astype(coding.dtype()) @property def sampling_frequency(self) -> int: @@ -266,7 +263,5 @@ class InspectorTraceSet(TraceSet): return int(1 / self.x_scale) def __repr__(self): - args = ", ".join( - [f"{self._tag_parsers[set_tag][0]}={getattr(self, self._tag_parsers[set_tag][0])!r}" - for set_tag in self._set_tags]) + args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys]) return f"InspectorTraceSet({args})" diff --git a/pyecsca/sca/trace_set/pickle.py b/pyecsca/sca/trace_set/pickle.py new file mode 100644 index 0000000..42bc3bb --- /dev/null +++ b/pyecsca/sca/trace_set/pickle.py @@ -0,0 +1,35 @@ +import pickle +from io import BufferedIOBase, RawIOBase, IOBase +from pathlib import Path +from typing import Union + +from public import public + +from .base import TraceSet + + +@public +class PickleTraceSet(TraceSet): + @classmethod + def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet": + if isinstance(input, bytes): + return pickle.loads(input) + elif isinstance(input, (str, Path)): + with open(input, "rb") as f: + return pickle.load(f) + elif isinstance(input, IOBase): + return pickle.load(input) + raise ValueError + + @classmethod + def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet": + raise NotImplementedError + + def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]): + if isinstance(output, (str, Path)): + with open(output, "wb") as f: + pickle.dump(self, f) + elif isinstance(output, IOBase): + pickle.dump(self, output) + else: + raise ValueError diff --git a/test/data/test.h5 b/test/data/test.h5 Binary files differnew file mode 100644 index 0000000..78df1fa --- /dev/null +++ b/test/data/test.h5 diff --git a/test/data/test.pickle b/test/data/test.pickle Binary files differnew file mode 100644 index 0000000..43f7f07 --- /dev/null +++ b/test/data/test.pickle diff --git a/test/sca/test_align.py b/test/sca/test_align.py index 96cc458..b595058 100644 --- a/test/sca/test_align.py +++ b/test/sca/test_align.py @@ -23,13 +23,13 @@ class AlignTests(TestCase): @slow def test_large_align(self): - example = InspectorTraceSet("test/data/example.trs") + example = InspectorTraceSet.read("test/data/example.trs") result = align_correlation(*example, reference_offset=100000, reference_length=20000, max_offset=15000) self.assertIsNotNone(result) @slow def test_large_dtw_align(self): - example = InspectorTraceSet("test/data/example.trs") + example = InspectorTraceSet.read("test/data/example.trs") result = align_dtw(*example[:5]) self.assertIsNotNone(result) diff --git a/test/sca/test_combine.py b/test/sca/test_combine.py index 7fda1fa..41df9fb 100644 --- a/test/sca/test_combine.py +++ b/test/sca/test_combine.py @@ -7,9 +7,9 @@ from pyecsca.sca import Trace, CombinedTrace, average, conditional_average, stan class CombineTests(TestCase): def setUp(self): - self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff") - self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff") - self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00") + self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None) + self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None) + self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None) def test_average(self): self.assertIsNone(average()) diff --git a/test/sca/test_edit.py b/test/sca/test_edit.py index 637e9b7..f5b529e 100644 --- a/test/sca/test_edit.py +++ b/test/sca/test_edit.py @@ -8,7 +8,7 @@ from pyecsca.sca import Trace, trim, reverse, pad class EditTests(TestCase): def setUp(self): - self._trace = Trace(np.array([10, 20, 30, 40, 50], dtype=np.dtype("i1")), None, None) + self._trace = Trace(np.array([10, 20, 30, 40, 50], dtype=np.dtype("i1")), None, None, None) def test_trim(self): result = trim(self._trace, 2) diff --git a/test/sca/test_filter.py b/test/sca/test_filter.py index 9194fa4..73516d8 100644 --- a/test/sca/test_filter.py +++ b/test/sca/test_filter.py @@ -10,7 +10,7 @@ class FilterTests(TestCase): def setUp(self): self._trace = Trace( np.array([5, 12, 15, 13, 15, 11, 7, 2, -4, -8, -10, -8, -13, -9, -11, -8, -5], - dtype=np.dtype("i1")), None, None) + dtype=np.dtype("i1")), None, None, None) def test_lowpass(self): result = filter_lowpass(self._trace, 100, 20) diff --git a/test/sca/test_match.py b/test/sca/test_match.py index b8663c7..549f143 100644 --- a/test/sca/test_match.py +++ b/test/sca/test_match.py @@ -10,21 +10,21 @@ class MatchingTests(TestCase): def test_simple_match(self): pattern = Trace(np.array([1, 15, 12, -10, 0, 13, 17, -1, 0], dtype=np.dtype("i1")), None, - None) + None, None) base = Trace(np.array( [0, 1, 3, 1, 2, -2, -3, 1, 15, 12, -10, 0, 13, 17, -1, 0, 3, 1], - dtype=np.dtype("i1")), None, None) + dtype=np.dtype("i1")), None, None, None) filtered = match_part(base, 7, 9) self.assertListEqual(filtered, [7]) plot(self, base=base, pattern=pad(pattern, (filtered[0], 0))) def test_multiple_match(self): pattern = Trace(np.array([1, 15, 12, -10, 0, 13, 17, -1, 0], dtype=np.dtype("i1")), None, - None) + None, None) base = Trace(np.array( [0, 1, 3, 1, 2, -2, -3, 1, 18, 10, -5, 0, 13, 17, -1, 0, 3, 1, 2, 5, 13, 8, -8, 1, 11, 15, 0, 1, 5, 2, 4], - dtype=np.dtype("i1")), None, None) + dtype=np.dtype("i1")), None, None, None) filtered = match_pattern(base, pattern, 0.9) self.assertListEqual(filtered, [7, 19]) plot(self, base=base, pattern1=pad(pattern, (filtered[0], 0)), pattern2=pad(pattern, (filtered[1], 0))) diff --git a/test/sca/test_process.py b/test/sca/test_process.py index f039a95..49d520f 100644 --- a/test/sca/test_process.py +++ b/test/sca/test_process.py @@ -7,7 +7,7 @@ from pyecsca.sca import Trace, absolute, invert, threshold, rolling_mean, offset class ProcessTests(TestCase): def setUp(self): - self._trace = Trace(np.array([30, -60, 145, 247], dtype=np.dtype("i2")), None, None) + self._trace = Trace(np.array([30, -60, 145, 247], dtype=np.dtype("i2")), None, None, None) def test_absolute(self): result = absolute(self._trace) diff --git a/test/sca/test_sampling.py b/test/sca/test_sampling.py index 40eaa7d..c08128c 100644 --- a/test/sca/test_sampling.py +++ b/test/sca/test_sampling.py @@ -29,7 +29,7 @@ class SamplingTests(TestCase): def test_downsample_decimate(self): trace = Trace(np.array([20, 30, 55, 18, 15, 10, 35, 24, 21, 15, 10, 8, -10, -5, -8, -12, -15, -18, -34, -21, -17, -10, -5, -12, -6, -2, - 4, 8, 21, 28], dtype=np.dtype("i1")), None, None) + 4, 8, 21, 28], dtype=np.dtype("i1")), None, None, None) result = downsample_decimate(trace, 2) self.assertIsNotNone(result) self.assertIsInstance(result, Trace) diff --git a/test/sca/test_test.py b/test/sca/test_test.py index 5277c20..f012f7f 100644 --- a/test/sca/test_test.py +++ b/test/sca/test_test.py @@ -8,19 +8,19 @@ from pyecsca.sca import Trace, welch_ttest, student_ttest, ks_test class TTestTests(TestCase): def setUp(self): - self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff") - self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff") - self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00") - self.d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00") + self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None) + self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None) + self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None) + self.d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00", None) def test_welch_ttest(self): self.assertIsNotNone(welch_ttest([self.a, self.b], [self.c, self.d])) a = Trace(np.array([19.8, 20.4, 19.6, 17.8, 18.5, 18.9, 18.3, 18.9, 19.5, 22.0]), None, - None) + None, None) b = Trace(np.array([28.2, 26.6, 20.1, 23.3, 25.2, 22.1, 17.7, 27.6, 20.6, 13.7]), None, - None) + None, None) c = Trace(np.array([20.2, 21.6, 27.1, 13.3, 24.2, 20.1, 11.7, 25.6, 26.6, 21.4]), None, - None) + None, None) result = welch_ttest([a, b], [b, c]) self.assertIsNotNone(result) @@ -35,8 +35,8 @@ class KolmogorovSmirnovTests(TestCase): def test_ks_test(self): self.assertIsNone(ks_test([], [])) - a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff") - b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff") - c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00") - d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00") + a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None) + b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None) + c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None) + d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00", None) self.assertIsNotNone(ks_test([a, b], [c, d])) diff --git a/test/sca/test_trace.py b/test/sca/test_trace.py index 3399c09..d09c4f5 100644 --- a/test/sca/test_trace.py +++ b/test/sca/test_trace.py @@ -6,7 +6,7 @@ from pyecsca.sca import Trace class TraceTests(TestCase): def test_basic(self): - trace = Trace(np.array([10, 15, 24], dtype=np.dtype("i1")), "Name", b"\xff\xaa") + trace = Trace(np.array([10, 15, 24], dtype=np.dtype("i1")), "Name", b"\xff\xaa", None) self.assertIsNotNone(trace) self.assertIn("Trace", str(trace)) self.assertIsNone(trace.trace_set) diff --git a/test/sca/test_traceset.py b/test/sca/test_traceset.py index 6d48707..a3ff526 100644 --- a/test/sca/test_traceset.py +++ b/test/sca/test_traceset.py @@ -1,8 +1,17 @@ import os.path +import shutil import tempfile from unittest import TestCase -from pyecsca.sca import TraceSet, InspectorTraceSet, ChipWhispererTraceSet +import numpy as np + +from pyecsca.sca import (TraceSet, InspectorTraceSet, ChipWhispererTraceSet, PickleTraceSet, + HDF5TraceSet, Trace) + +EXAMPLE_TRACES = [Trace(np.array([20, 40, 50, 50, 10], dtype=np.dtype("i1")), None, None), + Trace(np.array([1, 2, 3, 4, 5], dtype=np.dtype("i1")), None, None), + Trace(np.array([6, 7, 8, 9, 10], dtype=np.dtype("i1")), None, None)] +EXAMPLE_KWARGS = {"num_traces": 3, "thingy": "abc"} class TraceSetTests(TestCase): @@ -11,12 +20,14 @@ class TraceSetTests(TestCase): self.assertIsNotNone(TraceSet()) self.assertIsNotNone(InspectorTraceSet()) self.assertIsNotNone(ChipWhispererTraceSet()) + self.assertIsNotNone(PickleTraceSet()) + self.assertIsNotNone(HDF5TraceSet()) class InspectorTraceSetTests(TestCase): def test_load_fname(self): - result = InspectorTraceSet("test/data/example.trs") + result = InspectorTraceSet.read("test/data/example.trs") self.assertIsNotNone(result) self.assertEqual(result.global_title, "Example trace set") self.assertEqual(len(result), 10) @@ -27,36 +38,76 @@ class InspectorTraceSetTests(TestCase): def test_load_file(self): with open("test/data/example.trs", "rb") as f: - self.assertIsNotNone(InspectorTraceSet(f)) + self.assertIsNotNone(InspectorTraceSet.read(f)) def test_load_bytes(self): with open("test/data/example.trs", "rb") as f: - self.assertIsNotNone(InspectorTraceSet(f.read())) - - def test_get_bytes(self): - self.assertIsNotNone(bytes(InspectorTraceSet("test/data/example.trs"))) - - def test_keep_traces(self): - trace_set = InspectorTraceSet("test/data/example.trs") - self.assertIsNotNone(trace_set.raw) - trace_set = InspectorTraceSet("test/data/example.trs", keep_raw_traces=False) - self.assertIsNone(trace_set.raw) + self.assertIsNotNone(InspectorTraceSet.read(f.read())) def test_save(self): - trace_set = InspectorTraceSet("test/data/example.trs") + trace_set = InspectorTraceSet.read("test/data/example.trs") with tempfile.TemporaryDirectory() as dirname: path = os.path.join(dirname, "out.trs") - trace_set.save(path) + trace_set.write(path) self.assertTrue(os.path.exists(path)) - self.assertIsNotNone(InspectorTraceSet(path)) + self.assertIsNotNone(InspectorTraceSet.read(path)) - with self.assertRaises(ValueError): - trace_set.save(None) - -class ChipWhispererTraceSetTest(TestCase): +class ChipWhispererTraceSetTests(TestCase): def test_load_fname(self): - result = ChipWhispererTraceSet("test/data/", "chipwhisperer") + result = ChipWhispererTraceSet.read("test/data/config_chipwhisperer_.cfg") self.assertIsNotNone(result) self.assertEqual(len(result), 2) + + +class PickleTraceSetTests(TestCase): + + def test_load_fname(self): + result = PickleTraceSet.read("test/data/test.pickle") + self.assertIsNotNone(result) + + def test_load_file(self): + with open("test/data/test.pickle", "rb") as f: + self.assertIsNotNone(PickleTraceSet.read(f)) + + def test_save(self): + trace_set = PickleTraceSet(*EXAMPLE_TRACES, **EXAMPLE_KWARGS) + with tempfile.TemporaryDirectory() as dirname: + path = os.path.join(dirname, "out.pickle") + trace_set.write(path) + self.assertTrue(os.path.exists(path)) + self.assertIsNotNone(PickleTraceSet.read(path)) + + +class HDF5TraceSetTests(TestCase): + + def test_load_fname(self): + result = HDF5TraceSet.read("test/data/test.h5") + self.assertIsNotNone(result) + + def test_load_file(self): + with open("test/data/test.h5", "rb") as f: + self.assertIsNotNone(HDF5TraceSet.read(f)) + + def test_inplace(self): + with tempfile.TemporaryDirectory() as dirname: + path = os.path.join(dirname, "test.h5") + shutil.copy("test/data/test.h5", path) + trace_set = HDF5TraceSet.inplace(path) + self.assertIsNotNone(trace_set) + test_trace = Trace(np.array([6, 7], dtype=np.dtype("i1")), None, None, meta={"thing": "ring"}) + trace_set[0] = test_trace + trace_set.save() + trace_set.close() + + test_set = HDF5TraceSet.read(path) + self.assertEquals(test_set[0], test_trace) + + def test_save(self): + trace_set = HDF5TraceSet(*EXAMPLE_TRACES, **EXAMPLE_KWARGS) + with tempfile.TemporaryDirectory() as dirname: + path = os.path.join(dirname, "out.h5") + trace_set.write(path) + self.assertTrue(os.path.exists(path)) + self.assertIsNotNone(HDF5TraceSet.read(path)) |
