diff options
| -rw-r--r-- | .gitignore | 8 | ||||
| -rw-r--r-- | Makefile | 10 | ||||
| -rw-r--r-- | Pipfile | 20 | ||||
| -rw-r--r-- | README.md | 16 | ||||
| -rw-r--r-- | pyecsca/__init__.py | 10 | ||||
| -rw-r--r-- | pyecsca/align.py | 135 | ||||
| -rw-r--r-- | pyecsca/combine.py | 30 | ||||
| -rw-r--r-- | pyecsca/edit.py | 31 | ||||
| -rw-r--r-- | pyecsca/filter.py | 37 | ||||
| -rw-r--r-- | pyecsca/process.py | 59 | ||||
| -rw-r--r-- | pyecsca/sampling.py | 25 | ||||
| -rw-r--r-- | pyecsca/trace.py | 45 | ||||
| -rw-r--r-- | pyecsca/trace_set/__init__.py | 0 | ||||
| -rw-r--r-- | pyecsca/trace_set/base.py | 29 | ||||
| -rw-r--r-- | pyecsca/trace_set/inspector.py | 254 | ||||
| -rw-r--r-- | pyecsca/tvla.py | 24 | ||||
| -rw-r--r-- | test/__init__.py | 0 | ||||
| -rw-r--r-- | test/example.trs | bin | 0 -> 26250885 bytes | |||
| -rw-r--r-- | test/plots/.gitignore | 1 | ||||
| -rw-r--r-- | test/test_align.py | 75 | ||||
| -rw-r--r-- | test/test_combine.py | 35 | ||||
| -rw-r--r-- | test/test_edit.py | 28 | ||||
| -rw-r--r-- | test/test_filter.py | 35 | ||||
| -rw-r--r-- | test/test_process.py | 47 | ||||
| -rw-r--r-- | test/test_sampling.py | 37 | ||||
| -rw-r--r-- | test/test_trace.py | 12 | ||||
| -rw-r--r-- | test/test_traceset.py | 46 | ||||
| -rw-r--r-- | test/test_tvla.py | 25 | ||||
| -rw-r--r-- | test/utils.py | 24 | ||||
| -rw-r--r-- | unittest.cfg | 2 |
30 files changed, 1100 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2896e11 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/*.trs + +.coverage +.mypy_cache/ +/.pytest_cache/ +/htmlcov/ +/Pipfile.lock +__pycache__/
\ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..059abcf --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +test: + nose2 -A !slow -v + +test-plots: + env PYECSCA_TEST_PLOTS=1 nose2 -A !slow -v + +test-all: + nose2 -v + +.PHONY: test test-plots test-all
\ No newline at end of file @@ -0,0 +1,20 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] +nose2 = "*" +green = "*" +mypy = "*" + +[packages] +numpy = "*" +scipy = "*" +atpublic = "*" +matplotlib = "*" +cython = "*" +fastdtw = {path = "./../fastdtw"} + +[requires] +python_version = "3.7" diff --git a/README.md b/README.md new file mode 100644 index 0000000..169ce09 --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +# pyecsca [pɪɛtska] + +**Py**thon **EC** **S**ide-**C**hannel **A**nalysis toolkit. + +## Requirements + + - Numpy + - Scipy + - matplotlib + - atpublic + - fastdtw + +### Testing + + - nose2 + - green
\ No newline at end of file diff --git a/pyecsca/__init__.py b/pyecsca/__init__.py new file mode 100644 index 0000000..89149e4 --- /dev/null +++ b/pyecsca/__init__.py @@ -0,0 +1,10 @@ +from .align import * +from .combine import * +from .edit import * +from .filter import * +from .process import * +from .sampling import * +from .tvla import * +from .trace import * +from .trace_set.base import * +from .trace_set.inspector import * diff --git a/pyecsca/align.py b/pyecsca/align.py new file mode 100644 index 0000000..3b04ad3 --- /dev/null +++ b/pyecsca/align.py @@ -0,0 +1,135 @@ +from typing import List, Callable, Tuple +from copy import copy, deepcopy +from public import public +import numpy as np +from fastdtw import fastdtw, dtw + +from .process import normalize +from .trace import Trace + + +def align_reference(reference: Trace, *traces: Trace, + align_func: Callable[[Trace], Tuple[bool, int]]) -> List[Trace]: + result = [deepcopy(reference)] + for trace in traces: + length = len(trace.samples) + include, offset = align_func(trace) + if not include: + continue + if offset == 0: + result_samples = trace.samples.copy() + else: + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + if offset > 0: + result_samples[:length - offset] = trace.samples[offset:] + else: + result_samples[-offset:] = trace.samples[:length + offset] + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result + + +@public +def align_correlation(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, + max_offset: int, min_correlation: float = 0.5) -> List[Trace]: + reference_centered = normalize(reference) + reference_part = reference_centered.samples[ + reference_offset:reference_offset + reference_length] + + def align_func(trace): + length = len(trace.samples) + correlation_start = max(reference_offset - max_offset, 0) + correlation_end = min(reference_offset + reference_length + max_offset, length - 1) + trace_part = trace.samples[correlation_start:correlation_end] + trace_part = (trace_part - np.mean(trace_part)) / (np.std(trace_part) * len(trace_part)) + correlation = np.correlate(trace_part, reference_part, "same") + max_correlation_offset = correlation.argmax(axis=0) + max_correlation = correlation[max_correlation_offset] + if max_correlation < min_correlation: + return False, 0 + left_space = min(max_offset, reference_offset) + shift = left_space + reference_length // 2 + return True, max_correlation_offset - shift + + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_peaks(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, max_offset: int) -> List[Trace]: + reference_part = reference.samples[reference_offset: reference_offset + reference_length] + reference_peak = np.argmax(reference_part) + + def align_func(trace): + length = len(trace.samples) + window_start = max(reference_offset - max_offset, 0) + window_end = min(reference_offset + reference_length + max_offset, length - 1) + window = trace.samples[window_start: window_end] + window_peak = np.argmax(window) + left_space = min(max_offset, reference_offset) + return True, int(window_peak - reference_peak - left_space) + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_sad(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, max_offset: int) -> List[Trace]: + reference_part = reference.samples[reference_offset: reference_offset + reference_length] + + def align_func(trace): + length = len(trace.samples) + best_sad = 0 + best_offset = 0 + for offset in range(-max_offset, max_offset): + start = reference_offset + offset + stop = start + reference_length + if start < 0 or stop >= length: + continue + trace_part = trace.samples[start:stop] + # todo: add other distance functions here + sad = np.sum(np.abs(reference_part - trace_part)) + if sad > best_sad: + best_sad = sad + best_offset = offset + return True, best_offset + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True) -> List[Trace]: + result = [deepcopy(reference)] + reference_samples = reference.samples + for trace in traces: + if fast: + dist, path = fastdtw(reference_samples, trace.samples, radius=radius) + else: + dist, path = dtw(reference_samples, trace.samples) + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + scale = np.ones(len(trace.samples), dtype=trace.samples.dtype) + for x, y in path: + result_samples[x] = trace.samples[y] + scale[x] += 1 + result_samples //= scale + del scale + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result + + +@public +def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True) -> List[Trace]: + result = [deepcopy(reference)] + reference_samples = reference.samples + for trace in traces: + if fast: + dist, path = fastdtw(reference_samples, trace.samples, radius=radius) + else: + dist, path = dtw(reference_samples, trace.samples) + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + pairs = np.array(np.array(path, dtype=np.dtype("int,int")), dtype=np.dtype([("x", "int"), ("y", "int")])) + result_samples[pairs["x"]] = trace.samples[pairs["y"]] + del pairs + # or manually: + #for x, y in path: + # result_samples[x] = trace.samples[y] + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result diff --git a/pyecsca/combine.py b/pyecsca/combine.py new file mode 100644 index 0000000..4b29f8c --- /dev/null +++ b/pyecsca/combine.py @@ -0,0 +1,30 @@ +import numpy as np +from public import public +from typing import Callable, Optional + +from .trace import Trace, CombinedTrace + + +@public +def average(*traces: Trace) -> Optional[CombinedTrace]: + if not traces: + return None + if len(traces) == 1: + return CombinedTrace(None, None, traces[0].samples.copy(), parents=traces) + dtype = traces[0].samples.dtype + result_samples = np.mean(np.array([trace.samples for trace in traces]), axis=0).astype(dtype) + return CombinedTrace(None, None, result_samples, parents=traces) + + +@public +def conditional_average(*traces: Trace, condition: Callable[[Trace], bool]) -> Optional[CombinedTrace]: + return average(*filter(condition, traces)) + + +@public +def standard_deviation(*traces: Trace) -> Optional[CombinedTrace]: + if not traces: + return None + dtype = traces[0].samples.dtype + result_samples = np.std(np.array([trace.samples for trace in traces]), axis=0).astype(dtype) + return CombinedTrace(None, None, result_samples, parents=traces) diff --git a/pyecsca/edit.py b/pyecsca/edit.py new file mode 100644 index 0000000..7fe364d --- /dev/null +++ b/pyecsca/edit.py @@ -0,0 +1,31 @@ +from copy import copy +from typing import Union, Tuple, Any +from public import public +import numpy as np + +from .trace import Trace + + +@public +def trim(trace: Trace, start: int = None, end: int = None) -> Trace: + if start is None: + start = 0 + if end is None: + end = len(trace.samples) + if start > end: + raise ValueError("Invalid trim arguments.") + return Trace(copy(trace.title), copy(trace.data), trace.samples[start:end].copy()) + + +@public +def reverse(trace: Trace) -> Trace: + return Trace(copy(trace.title), copy(trace.data), np.flipud(trace.samples)) + + +@public +def pad(trace: Trace, lengths: Union[Tuple[int, int], int], values: Union[Tuple[Any, Any], Any] = (0, 0)) -> Trace: + if not isinstance(lengths, tuple): + lengths = (lengths, lengths) + if not isinstance(values, tuple): + values = (values, values) + return Trace(copy(trace.title), copy(trace.data), np.pad(trace.samples, lengths, "constant", constant_values=values)) diff --git a/pyecsca/filter.py b/pyecsca/filter.py new file mode 100644 index 0000000..8a471aa --- /dev/null +++ b/pyecsca/filter.py @@ -0,0 +1,37 @@ +from public import public +from typing import Union, Tuple +from scipy.signal import butter, lfilter +from copy import copy + +from .trace import Trace + + +def filter_any(trace: Trace, sampling_frequency: int, cutoff: Union[int, Tuple[int, int]], type: str) -> Trace: + nyq = 0.5 * sampling_frequency + if not isinstance(cutoff, int): + normal_cutoff = tuple(map(lambda x: x / nyq, cutoff)) + else: + normal_cutoff = cutoff / nyq + b, a = butter(6, normal_cutoff, btype=type, analog=False) + result_samples = lfilter(b, a, trace.samples) + return Trace(copy(trace.title), copy(trace.data), result_samples) + + +@public +def filter_lowpass(trace: Trace, sampling_frequency: int, cutoff: int) -> Trace: + return filter_any(trace, sampling_frequency, cutoff, "lowpass") + + +@public +def filter_highpass(trace: Trace, sampling_frequency: int, cutoff: int) -> Trace: + return filter_any(trace, sampling_frequency, cutoff, "highpass") + + +@public +def filter_bandpass(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace: + return filter_any(trace, sampling_frequency, (low, high), "bandpass") + + +@public +def filter_bandstop(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace: + return filter_any(trace, sampling_frequency, (low, high), "bandstop") diff --git a/pyecsca/process.py b/pyecsca/process.py new file mode 100644 index 0000000..3ca43f2 --- /dev/null +++ b/pyecsca/process.py @@ -0,0 +1,59 @@ +from copy import copy +import numpy as np +from public import public + +from .trace import Trace + + +@public +def absolute(trace: Trace) -> Trace: + return Trace(copy(trace.title), copy(trace.data), np.absolute(trace.samples)) + + +@public +def invert(trace: Trace) -> Trace: + return Trace(copy(trace.title), copy(trace.data), np.negative(trace.samples)) + + +@public +def threshold(trace: Trace, value) -> Trace: + result_samples = trace.samples.copy() + result_samples[result_samples <= value] = 0 + result_samples[np.nonzero(result_samples)] = 1 + return Trace(copy(trace.title), copy(trace.data), result_samples) + + +def rolling_window(samples: np.ndarray, window: int) -> np.ndarray: + shape = samples.shape[:-1] + (samples.shape[-1] - window + 1, window) + strides = samples.strides + (samples.strides[-1],) + return np.lib.stride_tricks.as_strided(samples, shape=shape, strides=strides) + + +@public +def rolling_mean(trace: Trace, window: int) -> Trace: + return Trace(copy(trace.title), copy(trace.data), np.mean(rolling_window(trace.samples, window), -1).astype(dtype=trace.samples.dtype)) + + +@public +def offset(trace: Trace, offset) -> Trace: + return Trace(copy(trace.title), copy(trace.data), trace.samples + offset) + + +def root_mean_square(trace: Trace): + return np.sqrt(np.mean(np.square(trace.samples))) + + +@public +def recenter(trace: Trace) -> Trace: + around = root_mean_square(trace) + return offset(trace, -around) + + +@public +def normalize(trace: Trace) -> Trace: + return Trace(copy(trace.title), copy(trace.data), (trace.samples - np.mean(trace.samples)) / np.std(trace.samples)) + + +@public +def normalize_wl(trace: Trace) -> Trace: + return Trace(copy(trace.title), copy(trace.data), (trace.samples - np.mean(trace.samples)) / (np.std(trace.samples) * len(trace.samples))) diff --git a/pyecsca/sampling.py b/pyecsca/sampling.py new file mode 100644 index 0000000..73e34b5 --- /dev/null +++ b/pyecsca/sampling.py @@ -0,0 +1,25 @@ +from copy import copy +import numpy as np +from scipy.signal import decimate +from public import public + +from .trace import Trace + + +@public +def downsample_average(trace: Trace, factor: int = 2) -> Trace: + resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor)) + result_samples = resized.reshape(-1, factor).mean(axis=1).astype(trace.samples.dtype) + return Trace(copy(trace.title), copy(trace.data), result_samples) + + +@public +def downsample_pick(trace: Trace, factor: int = 2, offset: int = 0) -> Trace: + result_samples = trace.samples[offset::factor].copy() + return Trace(copy(trace.title), copy(trace.data), result_samples) + + +@public +def downsample_decimate(trace: Trace, factor: int = 2) -> Trace: + result_samples = decimate(trace.samples, factor) + return Trace(copy(trace.title), copy(trace.data), result_samples) diff --git a/pyecsca/trace.py b/pyecsca/trace.py new file mode 100644 index 0000000..56e39d3 --- /dev/null +++ b/pyecsca/trace.py @@ -0,0 +1,45 @@ +import weakref +from numpy import ndarray +from typing import Optional + + +class Trace(object): + + def __init__(self, title: Optional[str], data: Optional[bytes], + samples: ndarray, trace_set=None): + self.title = title + self.data = data + self.samples = samples + self.trace_set = trace_set + + @property + def trace_set(self): + if self._trace_set is None: + return None + return self._trace_set() + + @trace_set.setter + def trace_set(self, trace_set): + if trace_set is None: + self._trace_set = None + else: + self._trace_set = weakref.ref(trace_set) + + def __repr__(self): + return "Trace(title={!r}, data={!r}, samples={!r}, trace_set={!r})".format( + self.title, self.data, self.samples, self.trace_set) + + +class CombinedTrace(Trace): + + def __init__(self, title: Optional[str], data: Optional[bytes], + samples: ndarray, trace_set=None, parents=None): + super().__init__(title, data, samples, trace_set=trace_set) + self.parents = None + if parents is not None: + self.parents = weakref.WeakSet(parents) + + def __repr__(self): + return "CombinedTrace(title={!r}, data={!r}, samples={!r}, trace_set={!r}, parents={})".format( + self.title, self.data, self.samples, self.trace_set, + self.parents) diff --git a/pyecsca/trace_set/__init__.py b/pyecsca/trace_set/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pyecsca/trace_set/__init__.py diff --git a/pyecsca/trace_set/base.py b/pyecsca/trace_set/base.py new file mode 100644 index 0000000..1984b88 --- /dev/null +++ b/pyecsca/trace_set/base.py @@ -0,0 +1,29 @@ +from public import public +from typing import List + +from ..trace import Trace + + +@public +class TraceSet(object): + _traces: List = [] + _keys: List = [] + + def __init__(self, *traces: Trace, **kwargs): + self._traces = list(traces) + self.__dict__.update(kwargs) + self._keys = list(kwargs.keys()) + + def __len__(self): + return len(self._traces) + + def __getitem__(self, index) -> Trace: + return self._traces[index] + + def __iter__(self): + yield from self._traces + + def __repr__(self): + args = ", ".join(["{}={!r}".format(key, getattr(self, key)) for key in + self._keys]) + return "TraceSet({})".format(args) diff --git a/pyecsca/trace_set/inspector.py b/pyecsca/trace_set/inspector.py new file mode 100644 index 0000000..fdf9552 --- /dev/null +++ b/pyecsca/trace_set/inspector.py @@ -0,0 +1,254 @@ +import numpy as np +import struct +from enum import IntEnum +from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation +from pathlib import Path +from public import public + +from .base import TraceSet +from ..trace import Trace + + +@public +class SampleCoding(IntEnum): + Int8 = 0x01 + Int16 = 0x02 + Int32 = 0x04 + Float8 = 0x11 + Float16 = 0x12 + Float32 = 0x14 + + def dtype(self): + char = "f" if self.value & 0x10 else "i" + return np.dtype("<{}{}".format(char, self.value & 0x0f)) + + +@public +class Parsers(object): + @staticmethod + def read_int(bytes): + return int.from_bytes(bytes, byteorder="little") + + @staticmethod + def read_bool(bytes): + return Parsers.read_int(bytes) == 1 + + @staticmethod + def read_float(bytes): + return struct.unpack("<f", bytes)[0] + + @staticmethod + def read_str(bytes): + return bytes.decode("ascii") + + @staticmethod + def write_int(i, length=1): + return int.to_bytes(i, length=length, byteorder="little") + + @staticmethod + def write_bool(b, length=1): + return Parsers.write_int(b, length=length) + + @staticmethod + def write_float(f, length=None): + return struct.pack("<{}".format("e" if length == 2 else "f"), f) + + @staticmethod + def write_str(s, length=None): + return s.encode("ascii") + + +@public +class InspectorTraceSet(TraceSet): + num_traces: int + num_samples: int + sample_coding: SampleCoding + data_space: int = 0 + title_space: int = 0 + + global_title: str = "title" + description: str = None + + x_offset: int = 0 + x_label: str = None + y_label: str = None + x_scale: float = 1 + y_scale: float = 1 + + trace_offset: int = 0 + log_scale: int = 0 + + scope_range: float = 0 + scope_coupling: int = 0 + scope_offset: float = 0 + scope_impedance: float = 0 + scope_id: str = None + + filter_type: int = 0 + filter_frequency: float = 0 + filter_range: float = 0 + + external_clock: bool = False + external_clock_threshold: float = 0 + external_clock_multiplier: int = 0 + external_clock_phase_shift: int = 0 + external_clock_resampler_mask: int = 0 + external_clock_resampler_enabled: bool = False + external_clock_frequencty: float = 0 + external_clock_time_base: int = 0 + + _raw_traces = None + _tag_parsers: dict = { + 0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int), + 0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int), + 0x43: ("sample_coding", 1, + lambda bytes: SampleCoding(Parsers.read_int(bytes)), + lambda coding, length: Parsers.write_int(coding.value, + length=length)), + 0x44: ("data_space", 2, Parsers.read_int, Parsers.write_int), + 0x45: ("title_space", 1, Parsers.read_int, Parsers.write_int), + 0x46: ("global_title", None, Parsers.read_str, Parsers.write_str), + 0x47: ("description", None, Parsers.read_str, Parsers.write_str), + 0x48: ("x_offset", None, Parsers.read_int, Parsers.write_int), + 0x49: ("x_label", None, Parsers.read_str, Parsers.write_str), + 0x4a: ("y_label", None, Parsers.read_str, Parsers.write_str), + 0x4b: ("x_scale", 4, Parsers.read_float, Parsers.write_float), + 0x4c: ("y_scale", 4, Parsers.read_float, Parsers.write_float), + 0x4d: ("trace_offset", 4, Parsers.read_int, Parsers.write_int), + 0x4e: ("log_scale", 1, Parsers.read_int, Parsers.write_int), + 0x55: ("scope_range", 4, Parsers.read_float, Parsers.write_float), + 0x56: ("scope_coupling", 4, Parsers.read_int, Parsers.write_int), + 0x57: ("scope_offset", 4, Parsers.read_float, Parsers.write_float), + 0x58: ("scope_impedance", 4, Parsers.read_float, Parsers.write_float), + 0x59: ("scope_id", None, Parsers.read_str, Parsers.write_str), + 0x5a: ("filter_type", 4, Parsers.read_int, Parsers.write_int), + 0x5b: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float), + 0x5c: ("filter_range", 4, Parsers.read_float, Parsers.read_float), + 0x60: ("external_clock", 1, Parsers.read_bool, Parsers.write_bool), + 0x61: ("external_clock_threshold", 4, Parsers.read_float, Parsers.write_float), + 0x62: ("external_clock_multiplier", 4, Parsers.read_int, Parsers.write_int), + 0x63: ("external_clock_phase_shift", 4, Parsers.read_int, Parsers.write_int), + 0x64: ("external_clock_resampler_mask", 4, Parsers.read_int, Parsers.write_int), + 0x65: ("external_clock_resampler_enabled", 1, Parsers.read_bool, Parsers.write_bool), + 0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float), + 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int) + } + _set_tags: set = set() + + def __init__(self, input=None, keep_raw_traces=True): + traces = None + if isinstance(input, bytes): + with BytesIO(input) as f: + traces = self._read(f) + elif isinstance(input, (Path, str)): + with open(input, "rb") as f: + traces = self._read(f) + elif isinstance(input, (RawIOBase, BufferedIOBase)): + traces = self._read(input) + elif input is not None: + raise ValueError( + "Cannot parse data, unknown input: {}".format(input)) + if traces is not None: + super().__init__(*self._scale(traces)) + else: + super().__init__() + if keep_raw_traces: + self._raw_traces = traces + else: + del traces + + def _read(self, file): + self._set_tags = set() + while True: + tag = ord(file.read(1)) + length = ord(file.read(1)) + if length & 0x80: + length = Parsers.read_int(file.read(length & 0x7f)) + value = file.read(length) + if tag in InspectorTraceSet._tag_parsers: + tag_name, tag_len, tag_reader, _ = \ + InspectorTraceSet._tag_parsers[tag] + if tag_len is None or length == tag_len: + setattr(self, tag_name, tag_reader(value)) + self._set_tags.add(tag) + elif tag == 0x5f and length == 0: + break + else: + continue + result = [] + for _ in range(self.num_traces): + title = None if self.title_space == 0 else Parsers.read_str( + file.read(self.title_space)) + data = None if self.data_space == 0 else file.read(self.data_space) + dtype = self.sample_coding.dtype() + try: + samples = np.fromfile(file, dtype, self.num_samples) + except UnsupportedOperation: + samples = np.frombuffer( + file.read(dtype.itemsize * self.num_samples), dtype, + self.num_samples) + result.append(Trace(title, data, samples, trace_set=self)) + return result + + def _write(self, file): + for set_tag in self._set_tags: + tag_name, tag_len, _, tag_writer = InspectorTraceSet._tag_parsers[ + set_tag] + tag_byte = Parsers.write_int(set_tag, length=1) + value_bytes = tag_writer(getattr(self, tag_name), tag_len) + length = len(value_bytes) + if length <= 0x7f: + length_bytes = Parsers.write_int(length, length=1) + else: + length_data = Parsers.write_int(length, length=(length.bit_length() + 7) // 8) + length_bytes = Parsers.write_int( + 0x80 | len(length_data)) + length_data + file.write(tag_byte) + file.write(length_bytes) + file.write(value_bytes) + file.write(b"\x5f\x00") + + for trace in self._raw_traces: + if self.title_space != 0 and trace.title is not None: + file.write(Parsers.write_str(trace.title)) + if self.data_space != 0 and trace.data is not None: + file.write(trace.data) + try: + trace.samples.tofile(file) + except UnsupportedOperation: + file.write(trace.samples.tobytes()) + + def _scale(self, traces): + return list(map(lambda trace: Trace(trace.title, trace.data, + trace.samples.astype("f4") * self.y_scale, trace_set=self), + traces)) + + def save(self, output): + if isinstance(output, (Path, str)): + with open(output, "wb") as f: + self._write(f) + elif isinstance(output, (RawIOBase, BufferedIOBase)): + self._write(output) + else: + raise ValueError("Cannot save data, unknown output: {}".format(output)) + + def __bytes__(self): + with BytesIO() as b: + self.save(b) + return b.getvalue() + + @property + def raw(self): + if self._raw_traces is None: + return None + return list(self._raw_traces) + + @property + def sampling_frequency(self): + return int(1/self.x_scale) + + def __repr__(self): + args = ", ".join(["{}={!r}".format(self._tag_parsers[set_tag][0], + getattr(self, self._tag_parsers[set_tag][0])) + for set_tag in self._set_tags]) + return "InspectorTraceSet({})".format(args) diff --git a/pyecsca/tvla.py b/pyecsca/tvla.py new file mode 100644 index 0000000..0a80ff9 --- /dev/null +++ b/pyecsca/tvla.py @@ -0,0 +1,24 @@ +from public import public +from scipy.stats import ttest_ind +import numpy as np +from typing import Sequence, Optional + +from .trace import Trace, CombinedTrace + + +def ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], + equal_var: bool) -> Optional[CombinedTrace]: + if not first_set or not second_set or len(first_set) == 0 or len(second_set) == 0: + return None + first_stack = np.stack([first.samples for first in first_set]) + second_stack = np.stack([second.samples for second in second_set]) + result = ttest_ind(first_stack, second_stack, axis=0, equal_var=equal_var) + return CombinedTrace(None, None, result[0], parents=[*first_set, *second_set]) + +@public +def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> CombinedTrace: + return ttest(first_set, second_set, False) + +@public +def student_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> CombinedTrace: + return ttest(first_set, second_set, True) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/__init__.py diff --git a/test/example.trs b/test/example.trs Binary files differnew file mode 100644 index 0000000..a432e69 --- /dev/null +++ b/test/example.trs diff --git a/test/plots/.gitignore b/test/plots/.gitignore new file mode 100644 index 0000000..aab52d9 --- /dev/null +++ b/test/plots/.gitignore @@ -0,0 +1 @@ +*.png
\ No newline at end of file diff --git a/test/test_align.py b/test/test_align.py new file mode 100644 index 0000000..ae7baba --- /dev/null +++ b/test/test_align.py @@ -0,0 +1,75 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import align_correlation, align_peaks, align_sad, align_dtw_scale,\ + align_dtw, Trace, InspectorTraceSet +from .utils import slow, plot + + +class AlignTests(TestCase): + + def test_align(self): + first_arr = np.array([10, 64, 120, 64, 10, 10, 10, 10, 10], dtype=np.dtype("i1")) + second_arr = np.array([10, 10, 10, 10, 50, 80, 50, 20, 10], dtype=np.dtype("i1")) + third_arr = np.array([70, 30, 42, 35, 28, 21, 15, 10, 5], dtype=np.dtype("i1")) + a = Trace(None, None, first_arr) + b = Trace(None, None, second_arr) + c = Trace(None, None, third_arr) + result = align_correlation(a, b, c, reference_offset=1, reference_length=3, max_offset=4, min_correlation=0.65) + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + np.testing.assert_equal(result[0].samples, first_arr) + np.testing.assert_equal(result[1].samples, np.array([10, 50, 80, 50, 20, 10, 0, 0, 0], dtype=np.dtype("i1"))) + + @slow + def test_large_align(self): + example = InspectorTraceSet("test/example.trs") + result = align_correlation(*example, reference_offset=100000, reference_length=20000, max_offset=15000) + self.assertIsNotNone(result) + + @slow + def test_large_dtw_align(self): + example = InspectorTraceSet("test/example.trs") + result = align_dtw(*example[:5]) + self.assertIsNotNone(result) + + def test_peak_align(self): + first_arr = np.array([10, 64, 14, 120, 15, 30, 10, 15, 20, 15, 15, 10, 10], dtype=np.dtype("i1")) + second_arr = np.array([10, 10, 10, 10, 90, 40, 50, 20, 10, 17, 16, 10, 10], dtype=np.dtype("i1")) + a = Trace(None, None, first_arr) + b = Trace(None, None, second_arr) + result = align_peaks(a, b, reference_offset=2, reference_length=5, max_offset=3) + self.assertEqual(np.argmax(result[0].samples), np.argmax(result[1].samples)) + + def test_sad_align(self): + first_arr = np.array([10, 64, 14, 120, 15, 30, 10, 15, 20, 15, 15, 10, 10], dtype=np.dtype("i1")) + second_arr = np.array([10, 10, 10, 10, 90, 40, 50, 20, 10, 17, 16, 10, 10], dtype=np.dtype("i1")) + a = Trace(None, None, first_arr) + b = Trace(None, None, second_arr) + align_sad(a, b, reference_offset=2, reference_length=5, max_offset=3) + + def test_dtw_align_scale(self): + first_arr = np.array([10, 64, 14, 120, 15, 30, 10, 15, 20, 15, 15, 10, 10, 8, 10, 12, 10, 13, 9], dtype=np.dtype("i1")) + second_arr = np.array([10, 10, 10, 10, 60, 40, 90, 20, 10, 17, 16, 10, 10, 10, 10, 10, 17, 12, 10], dtype=np.dtype("i1")) + third_arr = np.array([10, 30, 20, 21, 15, 8, 10, 37, 21, 77, 20, 28, 25, 10, 9, 10, 15, 9, 10], dtype=np.dtype("i1")) + a = Trace(None, None, first_arr) + b = Trace(None, None, second_arr) + c = Trace(None, None, third_arr) + result = align_dtw_scale(a, b, c) + + self.assertEqual(np.argmax(result[0].samples), np.argmax(result[1].samples)) + self.assertEqual(np.argmax(result[1].samples), np.argmax(result[2].samples)) + plot(self, *result) + + def test_dtw_align(self): + first_arr = np.array([10, 64, 14, 120, 15, 30, 10, 15, 20, 15, 15, 10, 10, 8, 10, 12, 10, 13, 9], dtype=np.dtype("i1")) + second_arr = np.array([10, 10, 10, 10, 60, 40, 90, 20, 10, 17, 16, 10, 10, 10, 10, 10, 17, 12, 10], dtype=np.dtype("i1")) + third_arr = np.array([10, 30, 20, 21, 15, 8, 10, 47, 21, 77, 20, 28, 25, 10, 9, 10, 15, 9, 10], dtype=np.dtype("i1")) + a = Trace(None, None, first_arr) + b = Trace(None, None, second_arr) + c = Trace(None, None, third_arr) + result = align_dtw(a, b, c) + + self.assertEqual(np.argmax(result[0].samples), np.argmax(result[1].samples)) + self.assertEqual(np.argmax(result[1].samples), np.argmax(result[2].samples)) + plot(self, *result) diff --git a/test/test_combine.py b/test/test_combine.py new file mode 100644 index 0000000..ca23197 --- /dev/null +++ b/test/test_combine.py @@ -0,0 +1,35 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import Trace, CombinedTrace, average, conditional_average, standard_deviation + + +class CombineTests(TestCase): + + def setUp(self): + self.a = Trace(None, b"\xff", np.array([20, 80], dtype=np.dtype("i1"))) + self.b = Trace(None, b"\xff", np.array([30, 42], dtype=np.dtype("i1"))) + self.c = Trace(None, b"\x00", np.array([78, 56], dtype=np.dtype("i1"))) + + def test_average(self): + self.assertIsNone(average()) + result = average(self.a, self.b) + self.assertIsNotNone(result) + self.assertIsInstance(result, CombinedTrace) + self.assertEqual(len(result.samples), 2) + self.assertEqual(result.samples[0], 25) + self.assertEqual(result.samples[1], 61) + + def test_conditional_average(self): + result = conditional_average(self.a, self.b, self.c, + condition=lambda trace: trace.data[0] == 0xff) + self.assertIsInstance(result, CombinedTrace) + self.assertEqual(len(result.samples), 2) + self.assertEqual(result.samples[0], 25) + self.assertEqual(result.samples[1], 61) + + def test_standard_deviation(self): + self.assertIsNone(standard_deviation()) + result = standard_deviation(self.a, self.b) + self.assertIsInstance(result, CombinedTrace) + self.assertEqual(len(result.samples), 2) diff --git a/test/test_edit.py b/test/test_edit.py new file mode 100644 index 0000000..b1c2e39 --- /dev/null +++ b/test/test_edit.py @@ -0,0 +1,28 @@ +from unittest import TestCase + +import numpy as np + +from pyecsca import Trace, trim, reverse, pad + + +class EditTests(TestCase): + + def setUp(self): + self._trace = Trace(None, None, np.array([10, 20, 30, 40, 50], dtype=np.dtype("i1"))) + + def test_trim(self): + result = trim(self._trace, 2) + self.assertIsNotNone(result) + np.testing.assert_equal(result.samples, np.array([30, 40, 50], dtype=np.dtype("i1"))) + + def test_reverse(self): + result = reverse(self._trace) + self.assertIsNotNone(result) + np.testing.assert_equal(result.samples, + np.array([50, 40, 30, 20, 10], dtype=np.dtype("i1"))) + + def test_pad(self): + result = pad(self._trace, 2) + self.assertIsNotNone(result) + np.testing.assert_equal(result.samples, + np.array([0, 0, 10, 20, 30, 40, 50, 0, 0], dtype=np.dtype("i1"))) diff --git a/test/test_filter.py b/test/test_filter.py new file mode 100644 index 0000000..a454d98 --- /dev/null +++ b/test/test_filter.py @@ -0,0 +1,35 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import Trace, filter_lowpass, filter_highpass, filter_bandpass, filter_bandstop +from .utils import plot + + +class FilterTests(TestCase): + + def setUp(self): + self._trace = Trace(None, None, np.array([5, 12, 15, 13, 15, 11, 7, 2, -4, -8, -10, -8, -13, -9, -11, -8, -5], dtype=np.dtype("i1"))) + + def test_lowpass(self): + result = filter_lowpass(self._trace, 100, 20) + self.assertIsNotNone(result) + self.assertEqual(len(self._trace.samples), len(result.samples)) + plot(self, self._trace, result) + + def test_highpass(self): + result = filter_highpass(self._trace, 128, 20) + self.assertIsNotNone(result) + self.assertEqual(len(self._trace.samples), len(result.samples)) + plot(self, self._trace, result) + + def test_bandpass(self): + result = filter_bandpass(self._trace, 128, 20, 60) + self.assertIsNotNone(result) + self.assertEqual(len(self._trace.samples), len(result.samples)) + plot(self, self._trace, result) + + def test_bandstop(self): + result = filter_bandstop(self._trace, 128, 20, 60) + self.assertIsNotNone(result) + self.assertEqual(len(self._trace.samples), len(result.samples)) + plot(self, self._trace, result) diff --git a/test/test_process.py b/test/test_process.py new file mode 100644 index 0000000..130d51f --- /dev/null +++ b/test/test_process.py @@ -0,0 +1,47 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import Trace, absolute, invert, threshold, rolling_mean, offset, recenter + + +class ProcessTests(TestCase): + + def setUp(self): + self._trace = Trace(None, None, np.array([30, -60, 145, 247], dtype=np.dtype("i2"))) + + def test_absolute(self): + result = absolute(self._trace) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(result.samples[1], 60) + + def test_invert(self): + result = invert(self._trace) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + np.testing.assert_equal(result.samples, [-30, 60, -145, -247]) + + def test_threshold(self): + result = threshold(self._trace, 128) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(result.samples[0], 0) + self.assertEqual(result.samples[2], 1) + + def test_rolling_mean(self): + result = rolling_mean(self._trace, 2) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(len(result.samples), 3) + self.assertEqual(result.samples[0], -15) + self.assertEqual(result.samples[1], 42) + self.assertEqual(result.samples[2], 196) + + def test_offset(self): + result = offset(self._trace, 5) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + np.testing.assert_equal(result.samples, np.array([35, -55, 150, 252], dtype=np.dtype("i2"))) + + def test_recenter(self): + self.assertIsNotNone(recenter(self._trace)) diff --git a/test/test_sampling.py b/test/test_sampling.py new file mode 100644 index 0000000..ed8eaf2 --- /dev/null +++ b/test/test_sampling.py @@ -0,0 +1,37 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import Trace, downsample_average, downsample_pick, downsample_decimate +from .utils import plot + + +class SamplingTests(TestCase): + + def setUp(self): + self._trace = Trace(None, None, np.array([20, 40, 50, 50, 10], dtype=np.dtype("i1"))) + + def test_downsample_average(self): + result = downsample_average(self._trace, 2) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(len(result.samples), 2) + self.assertEqual(result.samples[0], 30) + self.assertEqual(result.samples[1], 50) + + def test_downsample_pick(self): + result = downsample_pick(self._trace, 2) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(len(result.samples), 3) + self.assertEqual(result.samples[0], 20) + self.assertEqual(result.samples[1], 50) + + def test_downsample_decimate(self): + trace = Trace(None, None, np.array([20, 30, 55, 18, 15, 10, 35, 24, 21, 15, 10, 8, -10, -5, + -8, -12, -15, -18, -34, -21, -17, -10, -5, -12, -6, -2, + 4, 8, 21, 28], dtype=np.dtype("i1"))) + result = downsample_decimate(trace, 2) + self.assertIsNotNone(result) + self.assertIsInstance(result, Trace) + self.assertEqual(len(result.samples), 15) + plot(self, trace, result) diff --git a/test/test_trace.py b/test/test_trace.py new file mode 100644 index 0000000..9c144f6 --- /dev/null +++ b/test/test_trace.py @@ -0,0 +1,12 @@ +from unittest import TestCase +import numpy as np +from pyecsca import Trace + + +class TraceTests(TestCase): + + def test_basic(self): + trace = Trace("Name", b"\xff\xaa", np.array([10, 15, 24], dtype=np.dtype("i1"))) + self.assertIsNotNone(trace) + self.assertIn("Trace", str(trace)) + self.assertIsNone(trace.trace_set) diff --git a/test/test_traceset.py b/test/test_traceset.py new file mode 100644 index 0000000..547c4c2 --- /dev/null +++ b/test/test_traceset.py @@ -0,0 +1,46 @@ +from unittest import TestCase +import os.path +import tempfile + +from pyecsca import TraceSet, InspectorTraceSet + + +class TraceSetTests(TestCase): + + def test_create(self): + self.assertIsNotNone(TraceSet()) + self.assertIsNotNone(InspectorTraceSet()) + + def test_load_fname(self): + result = InspectorTraceSet("test/example.trs") + self.assertIsNotNone(result) + self.assertEqual(result.global_title, "Example trace set") + self.assertEqual(len(result), 10) + self.assertIn("InspectorTraceSet", str(result)) + self.assertIs(result[0].trace_set, result) + self.assertEqual(result.sampling_frequency, 12500000) + + def test_load_file(self): + with open("test/example.trs", "rb") as f: + self.assertIsNotNone(InspectorTraceSet(f)) + + def test_load_bytes(self): + with open("test/example.trs", "rb") as f: + self.assertIsNotNone(InspectorTraceSet(f.read())) + + def test_get_bytes(self): + self.assertIsNotNone(bytes(InspectorTraceSet("test/example.trs"))) + + def test_keep_traces(self): + trace_set = InspectorTraceSet("test/example.trs") + self.assertIsNotNone(trace_set.raw) + trace_set = InspectorTraceSet("test/example.trs", keep_raw_traces=False) + self.assertIsNone(trace_set.raw) + + def test_save(self): + trace_set = InspectorTraceSet("test/example.trs") + with tempfile.TemporaryDirectory() as dirname: + path = os.path.join(dirname, "out.trs") + trace_set.save(path) + self.assertTrue(os.path.exists(path)) + self.assertIsNotNone(InspectorTraceSet(path)) diff --git a/test/test_tvla.py b/test/test_tvla.py new file mode 100644 index 0000000..4ff1359 --- /dev/null +++ b/test/test_tvla.py @@ -0,0 +1,25 @@ +from unittest import TestCase + +import numpy as np +from pyecsca import Trace, welch_ttest, student_ttest + + +class TvlaTests(TestCase): + + def setUp(self): + self.a = Trace(None, b"\xff", np.array([20, 80], dtype=np.dtype("i1"))) + self.b = Trace(None, b"\xff", np.array([30, 42], dtype=np.dtype("i1"))) + self.c = Trace(None, b"\x00", np.array([78, 56], dtype=np.dtype("i1"))) + self.d = Trace(None, b"\x00", np.array([98, 36], dtype=np.dtype("i1"))) + + def test_welch_ttest(self): + self.assertIsNotNone(welch_ttest([self.a, self.b], [self.c, self.d])) + a = Trace(None, None, np.array([19.8, 20.4, 19.6, 17.8, 18.5, 18.9, 18.3, 18.9, 19.5, 22.0])) + b = Trace(None, None, np.array([28.2, 26.6, 20.1, 23.3, 25.2, 22.1, 17.7, 27.6, 20.6, 13.7])) + c = Trace(None, None, np.array([20.2, 21.6, 27.1, 13.3, 24.2, 20.1, 11.7, 25.6, 26.6, 21.4])) + + result = welch_ttest([a, b], [b, c]) + self.assertIsNotNone(result) + + def test_students_ttest(self): + self.assertIsNotNone(student_ttest([self.a, self.b], [self.c, self.d])) diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..e7d5819 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,24 @@ +import matplotlib.pyplot as plt +from unittest import TestCase +from pyecsca import Trace +from os.path import join, exists +from os import mkdir, getenv + + +def slow(func): + func.slow = 1 + return func + + +def plot(case: TestCase, *traces: Trace): + if getenv("PYECSCA_TEST_PLOTS") is None: + return + fig = plt.figure() + ax = fig.add_subplot(111) + for i, trace in enumerate(traces): + ax.plot(trace.samples, label=str(i)) + ax.legend(loc="best") + directory = join("test", "plots") + if not exists(directory): + mkdir(directory) + plt.savefig(join(directory, case.id() + ".png")) diff --git a/unittest.cfg b/unittest.cfg new file mode 100644 index 0000000..391556e --- /dev/null +++ b/unittest.cfg @@ -0,0 +1,2 @@ +[unittest] +plugins = nose2.plugins.attrib
\ No newline at end of file |
