diff options
Diffstat (limited to 'pyecsca/sca/align.py')
| -rw-r--r-- | pyecsca/sca/align.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/pyecsca/sca/align.py b/pyecsca/sca/align.py new file mode 100644 index 0000000..ff586ab --- /dev/null +++ b/pyecsca/sca/align.py @@ -0,0 +1,236 @@ +""" +This module provides functions for aligning traces in a trace set to a reference trace within it. +""" +import numpy as np +from copy import copy, deepcopy +from fastdtw import fastdtw, dtw +from public import public +from typing import List, Callable, Tuple + +from .process import normalize +from .trace import Trace + + +def align_reference(reference: Trace, *traces: Trace, + align_func: Callable[[Trace], Tuple[bool, int]]) -> List[Trace]: + result = [deepcopy(reference)] + for trace in traces: + length = len(trace.samples) + include, offset = align_func(trace) + if not include: + continue + if offset == 0: + result_samples = trace.samples.copy() + else: + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + if offset > 0: + result_samples[:length - offset] = trace.samples[offset:] + else: + result_samples[-offset:] = trace.samples[:length + offset] + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result + + +@public +def align_correlation(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, + max_offset: int, min_correlation: float = 0.5) -> List[Trace]: + """ + Align `traces` to the reference `trace`. Using the cross-correlation of a part of the reference + trace starting at `reference_offset` with `reference_length` and try to match it to a part of + the trace being matched that is at most `max_offset` mis-aligned from the reference, pick the + alignment offset with the largest cross-correlation. If the maximum cross-correlation of the + trace parts being matched is below `min_correlation`, do not include the trace. + + :param reference: + :param traces: + :param reference_offset: + :param reference_length: + :param max_offset: + :param min_correlation: + :return: + """ + reference_centered = normalize(reference) + reference_part = reference_centered.samples[ + reference_offset:reference_offset + reference_length] + + def align_func(trace): + length = len(trace.samples) + correlation_start = max(reference_offset - max_offset, 0) + correlation_end = min(reference_offset + reference_length + max_offset, length - 1) + trace_part = trace.samples[correlation_start:correlation_end] + trace_part = (trace_part - np.mean(trace_part)) / (np.std(trace_part) * len(trace_part)) + correlation = np.correlate(trace_part, reference_part, "same") + max_correlation_offset = correlation.argmax(axis=0) + max_correlation = correlation[max_correlation_offset] + del trace_part + if max_correlation < min_correlation: + return False, 0 + left_space = min(max_offset, reference_offset) + shift = left_space + reference_length // 2 + return True, max_correlation_offset - shift + + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_peaks(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, max_offset: int) -> List[Trace]: + """ + Align `traces` to the reference `trace` so that the maximum value within the reference trace + window from `reference_offset` of `reference_length` aligns with the maximum + value of the trace being aligned within `max_offset` of the reference window. + + :param reference: + :param traces: + :param reference_offset: + :param reference_length: + :param max_offset: + :return: + """ + reference_part = reference.samples[reference_offset: reference_offset + reference_length] + reference_peak = np.argmax(reference_part) + + def align_func(trace): + length = len(trace.samples) + window_start = max(reference_offset - max_offset, 0) + window_end = min(reference_offset + reference_length + max_offset, length - 1) + window = trace.samples[window_start: window_end] + window_peak = np.argmax(window) + left_space = min(max_offset, reference_offset) + return True, int(window_peak - reference_peak - left_space) + + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_offset(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, max_offset: int, + dist_func: Callable[[np.ndarray, np.ndarray], float], max_dist: float = float("inf")) -> List[Trace]: + """ + Align `traces` to the reference `trace` so that the value of the `dist_func` is minimized + between the reference trace window from `reference_offset` of `reference_length` and the trace + being aligned within `max_offset` of the reference window. + + :param reference: + :param traces: + :param reference_offset: + :param reference_length: + :param max_offset: + :param dist_func: + :return: + """ + reference_part = reference.samples[reference_offset: reference_offset + reference_length] + def align_func(trace): + length = len(trace.samples) + best_distance = 0 + best_offset = 0 + for offset in range(-max_offset, max_offset): + start = reference_offset + offset + stop = start + reference_length + if start < 0 or stop >= length: + continue + trace_part = trace.samples[start:stop] + distance = dist_func(reference_part, trace_part) + if distance < best_distance: + best_distance = distance + best_offset = offset + if best_distance < max_dist: + return True, best_offset + else: + return False, 0 + return align_reference(reference, *traces, align_func=align_func) + + +@public +def align_sad(reference: Trace, *traces: Trace, + reference_offset: int, reference_length: int, max_offset: int) -> List[Trace]: + """ + Align `traces` to the reference `trace` so that the Sum Of Absolute Differences between the + reference trace window from `reference_offset` of `reference_length` and the trace being aligned + within `max_offset` of the reference window is maximized. + + :param reference: + :param traces: + :param reference_offset: + :param reference_length: + :param max_offset: + :return: + """ + def sad(reference_part, trace_part): + return float(np.sum(np.abs(reference_part - trace_part))) + + return align_offset(reference, *traces, + reference_offset=reference_offset, reference_length=reference_length, + max_offset=max_offset, dist_func=sad) + + +@public +def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1, + fast: bool = True) -> List[Trace]: + """ + Align `traces` to the reference `trace`. + Using fastdtw (Dynamic Time Warping) with scaling as per: + + Jasper G. J. van Woudenberg, Marc F. Witteman, Bram Bakker: + Improving Differential Power Analysis by Elastic Alignment + + https://pdfs.semanticscholar.org/aceb/7c307098a414d7c384d6189226e4375cf02d.pdf + + :param reference: + :param traces: + :param radius: + :param fast: + :return: + """ + result = [deepcopy(reference)] + reference_samples = reference.samples + for trace in traces: + if fast: + dist, path = fastdtw(reference_samples, trace.samples, radius=radius) + else: + dist, path = dtw(reference_samples, trace.samples) + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + scale = np.ones(len(trace.samples), dtype=trace.samples.dtype) + for x, y in path: + result_samples[x] = trace.samples[y] + scale[x] += 1 + result_samples //= scale + del scale + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result + + +@public +def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True) -> List[Trace]: + """ + Align `traces` to the reference `trace`. Using fastdtw (Dynamic Time Warping) as per: + + Stan Salvador, Philip Chan: + FastDTW: Toward Accurate Dynamic Time Warping in Linear Time and Space + + https://cs.fit.edu/~pkc/papers/tdm04.pdf + + :param reference: + :param traces: + :param radius: + :param fast: + :return: + """ + result = [deepcopy(reference)] + reference_samples = reference.samples + for trace in traces: + if fast: + dist, path = fastdtw(reference_samples, trace.samples, radius=radius) + else: + dist, path = dtw(reference_samples, trace.samples) + result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) + pairs = np.array(np.array(path, dtype=np.dtype("int,int")), + dtype=np.dtype([("x", "int"), ("y", "int")])) + result_samples[pairs["x"]] = trace.samples[pairs["y"]] + del pairs + # or manually: + # for x, y in path: + # result_samples[x] = trace.samples[y] + result.append(Trace(copy(trace.title), copy(trace.data), result_samples)) + return result |
