diff options
| author | J08nY | 2025-10-25 16:07:31 +0200 |
|---|---|---|
| committer | J08nY | 2025-10-25 16:07:31 +0200 |
| commit | 4bf6a31c00a90cb5aea54c7a37c95f8f3413faaf (patch) | |
| tree | 3c6b81eb26563919eb927c7bf46ba38d59be0e9e | |
| parent | 1bf4c680fed1eab512e65ec6b7a0b4eb6ce8ca74 (diff) | |
| download | ECTester-4bf6a31c00a90cb5aea54c7a37c95f8f3413faaf.tar.gz ECTester-4bf6a31c00a90cb5aea54c7a37c95f8f3413faaf.tar.zst ECTester-4bf6a31c00a90cb5aea54c7a37c95f8f3413faaf.zip | |
| -rw-r--r-- | analysis/scalarmults/common.py | 448 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/__init__.py | 206 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/config.py | 206 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/error_model.py | 109 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/mult_results.py | 40 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/prob_map.py | 65 | ||||
| -rw-r--r-- | analysis/scalarmults/epare/simulate.py | 102 | ||||
| -rw-r--r-- | analysis/scalarmults/probmaps.py | 78 | ||||
| -rw-r--r-- | analysis/scalarmults/simulate.ipynb | 268 | ||||
| -rw-r--r-- | analysis/scalarmults/visualize.ipynb | 2 |
10 files changed, 830 insertions, 694 deletions
diff --git a/analysis/scalarmults/common.py b/analysis/scalarmults/common.py deleted file mode 100644 index b307278..0000000 --- a/analysis/scalarmults/common.py +++ /dev/null @@ -1,448 +0,0 @@ -import itertools -import hashlib -from datetime import timedelta -from enum import Enum -from operator import itemgetter - -from dataclasses import dataclass -from functools import partial, cached_property, total_ordering -from typing import Any, Optional, Type, Union, Literal - -from statsmodels.stats.proportion import proportion_confint - -from pyecsca.sca.re.rpa import MultipleContext -from pyecsca.ec.mult import * -from pyecsca.ec.point import Point -from pyecsca.ec.countermeasures import GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri - - -def check_equal_multiples(k, l, q): - """Checks whether the two multiples input into the formula are equal modulo q (the order of the base).""" - return (k % q) == (l % q) - - -def check_divides(k, l, q): - """Checks whether q (the order of the base) divides any of the multiples input into the formula.""" - return (k != 0) and (l != 0) and (k % q == 0) or (l % q == 0) - - -def check_half_add(k, l, q): - return (q % 2 == 0) and ((k-l) % (q//2)) == 0 - - -def check_affine(k, q): - """Checks whether q (the order of the base) divides the multiple that is to be converted to affine.""" - return k % q == 0 - - -def check_any(*checks, q=None): - """Merge multiple checks together. The returned check function no longer takes the `q` parameter.""" - def check_func(k, l): - for check in checks: - if check(k, l, q): - return True - return False - return check_func - - -# These checks can be applied to add formulas. See the formulas notebook for background on them. -checks_add = { - "equal_multiples": check_equal_multiples, - "divides": check_divides, - "half_add": check_half_add -} - -# This check can be applied to conversion to affine. -checks_affine = { - "affine": check_affine -} - -def powerset(iterable): - """Take an iterable and create a powerset of its elements.""" - s = list(iterable) - return map(set, itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1))) - - -@dataclass(frozen=True) -@total_ordering -class ErrorModel: - """ - An ErrorModel describes the behavior of an implementation with regards to errors on exceptional - inputs to its addition formulas, to-affine conversion or general scalar multiplication. - - :param checks: A set of names of checks (from checks_add and checks_affine) that the implementation performs. - Note that these may not be checks that the implementation explicitly performs, only that it behaves w.r.t. - errors as if it were doing these checks, due to the formulas it chose and any actual checks it has. - :param check_condition: Either "all" or "necessary". Specifies whether the checks are applied to all points - that the implementation computes during a scalar multiplication or only those that end up being used -- thus - affect -- the final result. If an implementation does not perform any dummy operations, these two are the same. - :param precomp_to_affine: Specifies whether the implementation converts all results of the precomputation step - to affine form. If it does, it means that additional checks on all outputs of the precomputation are done as - they have to be "convertible" to affine form. - """ - checks: set[str] - check_condition: Union[Literal["all"], Literal["necessary"]] - precomp_to_affine: bool - - def __init__(self, checks: set[str], check_condition: Union[Literal["all"], Literal["necessary"]], precomp_to_affine: bool): - for check in checks: - if check not in checks_add: - raise ValueError(f"Unknown check: {check}") - checks = set(checks) - checks.add("affine") # always done in our model - object.__setattr__(self, "checks", checks) - if check_condition not in ("all", "necessary"): - raise ValueError("Wrong check_condition") - object.__setattr__(self, "check_condition", check_condition) - object.__setattr__(self, "precomp_to_affine", precomp_to_affine) - - def check_add(self, q): - """Get the add formula check function for the given q.""" - if self.checks == {"affine"}: - return lambda k, l: False - return check_any(*map(lambda name: checks_add[name], filter(lambda check: check in checks_add, self.checks)), q=q) - - def check_affine(self, q): - """Get the to-affine check function for the given q.""" - return partial(check_affine, q=q) - - def __lt__(self, other): - if not isinstance(other, ErrorModel): - return NotImplemented - return str(self) < str(other) - - def __str__(self): - cs = [] - if "equal_multiples" in self.checks: - cs.append("em") - if "divides" in self.checks: - cs.append("d") - if "half_add" in self.checks: - cs.append("ha") - if "affine" in self.checks: - cs.append("a") - precomp = "+pre" if self.precomp_to_affine else "" - return f"({','.join(cs)}+{self.check_condition}{precomp})" - - def __hash__(self): - return hash((tuple(sorted(self.checks)), self.check_condition, self.precomp_to_affine)) - - -# All error models are a simple cartesian product of the individual options. -all_error_models = [] -for checks in powerset(checks_add): - for precomp_to_affine in (True, False): - for check_condition in ("all", "necessary"): - error_model = ErrorModel(checks, check_condition=check_condition, precomp_to_affine=precomp_to_affine) - all_error_models.append(error_model) - - -@dataclass(frozen=True) -@total_ordering -class MultIdent: - """ - A MultIdent is a description of a scalar multiplication implementation, consisting of a scalar multiplier, - (optionally) a countermeasure, and (optionally) an error model. - - The scalar multiplier is defined by the `klass` attribute, along with the `args` and `kwargs` attributes. - One can reconstruct the raw multiplier (without the countermeasure) by doing: - - klass(*args, **kwargs) - - The countermeasure is simply in the `countermeasure` attribute and may be `None`. - - The error model is simply in the `error_model` attribute and may be `None`. If it is `None`, the MultIdent - is not suitable for error simulation and merely represents the description of a scalar multiplication - implementation we care about when reverse-engineering: the multiplier and the countermeasure, we do not - really care about the error model, yet need it when simulating. - """ - klass: Type[ScalarMultiplier] - args: list[Any] - kwargs: dict[str, Any] - countermeasure: Optional[str] = None - error_model: Optional[ErrorModel] = None - - def __init__(self, klass: Type[ScalarMultiplier], *args, **kwargs): - object.__setattr__(self, "klass", klass) - object.__setattr__(self, "args", args if args is not None else []) - if kwargs is not None and "countermeasure" in kwargs: - object.__setattr__(self, "countermeasure", kwargs["countermeasure"]) - del kwargs["countermeasure"] - if kwargs is not None and "error_model" in kwargs: - object.__setattr__(self, "error_model", kwargs["error_model"]) - del kwargs["error_model"] - object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {}) - - @cached_property - def partial(self): - """Get the callable that constructs the scalar multiplier (with countermeasure if any).""" - func = partial(self.klass, *self.args, **self.kwargs) - if self.countermeasure is None: - return func - if self.countermeasure == "gsr": - return lambda *args, **kwargs: GroupScalarRandomization(func(*args, **kwargs)) - elif self.countermeasure == "additive": - return lambda *args, **kwargs: AdditiveSplitting(func(*args, **kwargs)) - elif self.countermeasure == "multiplicative": - return lambda *args, **kwargs: MultiplicativeSplitting(func(*args, **kwargs)) - elif self.countermeasure == "euclidean": - return lambda *args, **kwargs: EuclideanSplitting(func(*args, **kwargs)) - elif self.countermeasure == "bt": - return lambda *args, **kwargs: BrumleyTuveri(func(*args, **kwargs)) - - def with_countermeasure(self, countermeasure: str | None): - """Return a new MultIdent with a given countermeasure.""" - if countermeasure not in (None, "gsr", "additive", "multiplicative", "euclidean", "bt"): - raise ValueError(f"Unknown countermeasure: {countermeasure}") - return MultIdent(self.klass, *self.args, **self.kwargs, countermeasure=countermeasure) - - def with_error_model(self, error_model: ErrorModel | None): - """Return a new MultIdent with a given error model.""" - if not (isinstance(error_model, ErrorModel) or error_model is None): - raise ValueError("Unknown error model.") - return MultIdent(self.klass, *self.args, **self.kwargs, countermeasure=self.countermeasure, error_model=error_model) - - def __str__(self): - name = self.klass.__name__.replace("Multiplier", "") - args = ("_" + ",".join(list(map(str, self.args)))) if self.args else "" - kwmap = {"recoding_direction": "recode", - "direction": "dir", - "width": "w"} - kwargs = ("_" + ",".join(f"{kwmap.get(k, k)}:{v.name if isinstance(v, Enum) else str(v)}" for k,v in self.kwargs.items())) if self.kwargs else "" - countermeasure = f"+{self.countermeasure}" if self.countermeasure is not None else "" - error_model = f"+{self.error_model}" if self.error_model is not None else "" - return f"{name}{args}{kwargs}{countermeasure}{error_model}" - - def __getstate__(self): - state = self.__dict__.copy() - # Remove cached properties - state.pop("partial", None) - return state - - def __lt__(self, other): - if not isinstance(other, MultIdent): - return NotImplemented - return str(self) < str(other) - - def __repr__(self): - return str(self) - - def __hash__(self): - return hash((self.klass, self.countermeasure, self.error_model, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values()))) - - -@dataclass -class MultResults: - """ - A MultResults instance represents many simulated scalar multiplciation computations, which were tracked - using a `MultipleContext` (i.e. the outputs of the :func:`pyecsca.sca.re.rpa.multiple_graph` function). - Generally, these would be for one MultIdent only, but that should be handled separately, for example - in a dict[MultIdent, MultResults]. The `samples` describe how many computations - are contained and must correspond to the length of the `multiplications` list. - """ - multiplications: list[tuple[MultipleContext, MultipleContext, Point]] - samples: int - duration: Optional[float] = None - - def merge(self, other: "MultResults"): - self.multiplications.extend(other.multiplications) - self.samples += other.samples - - def __len__(self): - return self.samples - - def __iter__(self): - yield from self.multiplications - - def __getitem__(self, i): - return self.multiplications[i] - - def __str__(self): - duration = timedelta(seconds=int(self.duration)) if self.duration is not None else "" - return f"MultResults({self.samples},{duration})" - - def __repr__(self): - return str(self) - - -@dataclass -class ProbMap: - """ - A ProbMap is a mapping from integers (base point order q) to floats (error probability for some scalar - multiplication implementation, i.e. MultIdent). The probability map is constructed for a given set of - `divisors` (the base point orders q). Probability maps can be narrowed or merged. A narrowing restricts - the probability map to a smaller set of `divisors`. A merging takes another probability map using the - same divisor set and updates the probabilities to a weighted average of the two probability maps - (the weight is the number of samples). - """ - probs: dict[int, float] - divisors_hash: bytes - samples: int - - def __len__(self): - return len(self.probs) - - def __iter__(self): - yield from self.probs - - def __getitem__(self, i): - return self.probs[i] if i in self.probs else 0.0 - - def __contains__(self, item): - return item in self.probs - - def keys(self): - return self.probs.keys() - - def values(self): - return self.probs.values() - - def items(self): - return self.probs.items() - - def narrow(self, divisors: set[int]): - """Narrow the probability map to the new set of divisors (must be a subset of the current set).""" - divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest() - if self.divisors_hash == divisors_hash: - # Already narrow. - return - for kdel in set(self.probs.keys()).difference(divisors): - del self.probs[kdel] - self.divisors_hash = divisors_hash - - def merge(self, other: "ProbMap") -> None: - """Merge the `other` probability map into this one (must share the divisor set).""" - if self.divisors_hash != other.divisors_hash: - raise ValueError("Merging can only work on probmaps created for same divisors.") - new_keys = set(self.keys()).union(other.keys()) - result = {} - for key in new_keys: - sk = self[key] - ok = other[key] - result[key] = (sk * self.samples + ok * other.samples) / (self.samples + other.samples) - self.probs = result - self.samples += other.samples - - -# All dbl-and-add multipliers from https://github.com/J08nY/pyecsca/blob/master/pyecsca/ec/mult -window_mults = [ - MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.LTR), - MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.LTR), - MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.LTR), - MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.LTR), - MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.LTR), - MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.RTL), - MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.RTL), - MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.RTL), - MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.RTL), - MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.RTL), - MultIdent(FixedWindowLTRMultiplier, m=2**1), - MultIdent(FixedWindowLTRMultiplier, m=2**2), - MultIdent(FixedWindowLTRMultiplier, m=2**3), - MultIdent(FixedWindowLTRMultiplier, m=2**4), - MultIdent(FixedWindowLTRMultiplier, m=2**5), - MultIdent(FixedWindowLTRMultiplier, m=2**6), - MultIdent(WindowBoothMultiplier, width=2), - MultIdent(WindowBoothMultiplier, width=3), - MultIdent(WindowBoothMultiplier, width=4), - MultIdent(WindowBoothMultiplier, width=5), - MultIdent(WindowBoothMultiplier, width=6) -] -naf_mults = [ - MultIdent(WindowNAFMultiplier, width=2), - MultIdent(WindowNAFMultiplier, width=3), - MultIdent(WindowNAFMultiplier, width=4), - MultIdent(WindowNAFMultiplier, width=5), - MultIdent(WindowNAFMultiplier, width=6), - MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.LTR), - MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.RTL), - MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.LTR), - MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.RTL) -] -comb_mults = [ - MultIdent(CombMultiplier, width=2, always=True), - MultIdent(CombMultiplier, width=3, always=True), - MultIdent(CombMultiplier, width=4, always=True), - MultIdent(CombMultiplier, width=5, always=True), - MultIdent(CombMultiplier, width=6, always=True), - MultIdent(CombMultiplier, width=2, always=False), - MultIdent(CombMultiplier, width=3, always=False), - MultIdent(CombMultiplier, width=4, always=False), - MultIdent(CombMultiplier, width=5, always=False), - MultIdent(CombMultiplier, width=6, always=False), - MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.LTR), - MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.LTR), - MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.LTR), - MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.LTR), - MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.LTR), - MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.RTL), - MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.RTL), - MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.RTL), - MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.RTL), - MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.RTL) -] -binary_mults = [ - MultIdent(LTRMultiplier, always=False, complete=True), - MultIdent(LTRMultiplier, always=True, complete=True), - MultIdent(LTRMultiplier, always=False, complete=False), - MultIdent(LTRMultiplier, always=True, complete=False), - MultIdent(RTLMultiplier, always=False, complete=True), - MultIdent(RTLMultiplier, always=True, complete=True), - MultIdent(RTLMultiplier, always=False, complete=False), - MultIdent(RTLMultiplier, always=True, complete=False), - MultIdent(CoronMultiplier) -] -other_mults = [ - MultIdent(FullPrecompMultiplier, always=False, complete=True), - MultIdent(FullPrecompMultiplier, always=True, complete=True), - MultIdent(FullPrecompMultiplier, always=False, complete=False), - MultIdent(FullPrecompMultiplier, always=True, complete=False), - MultIdent(SimpleLadderMultiplier, complete=True), - MultIdent(SimpleLadderMultiplier, complete=False) -] - -# We can enumerate all mults and countermeasures here. -all_mults = window_mults + naf_mults + binary_mults + other_mults + comb_mults -all_mults_with_ctr = [mult.with_countermeasure(ctr) for mult in all_mults for ctr in (None, "gsr", "additive", "multiplicative", "euclidean", "bt")] - - -def powers_of(k, max_power=20): - """Take all powers of `k` up to `max_power`.""" - return [k**i for i in range(1, max_power)] - -def prod_combine(one, other): - """Multiply all pairs of elements from `one` and `other`.""" - return [a * b for a, b in itertools.product(one, other)] - - -# We have several sets of divisors, inspired by various "interesting" multiples the multipliers may compute. -small_primes = [3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199] -medium_primes = [211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397] -large_primes = [401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997] -all_integers = list(range(1, 400)) -all_even = list(range(2, 400, 2)) -all_odd = list(range(1, 400, 2)) -all_primes = small_primes + medium_primes + large_primes - -divisor_map = { - "small_primes": small_primes, - "medium_primes": medium_primes, - "large_primes": large_primes, - "all_primes": all_primes, - "all_integers": all_integers, - "all_even": all_even, - "all_odd": all_odd, - "powers_of_2": powers_of(2), - "powers_of_2_large": powers_of(2, 256), - "powers_of_2_large_3": [i * 3 for i in powers_of(2, 256)], - "powers_of_2_large_p1": [i + 1 for i in powers_of(2, 256)], - "powers_of_2_large_m1": [i - 1 for i in powers_of(2, 256)], - "powers_of_2_large_pmautobus": sorted(set([i + j for i in powers_of(2, 256) for j in range(-5,5) if i+j > 0])), - "powers_of_3": powers_of(3), -} -divisor_map["all"] = list(sorted(set().union(*[v for v in divisor_map.values()]))) - - -def conf_interval(p: float, samples: int, alpha: float = 0.05) -> tuple[float, float]: - """Compute a confidence interval for a Binomial distribution.""" - return proportion_confint(round(p*samples), samples, alpha, method="wilson") diff --git a/analysis/scalarmults/epare/__init__.py b/analysis/scalarmults/epare/__init__.py new file mode 100644 index 0000000..631c2fd --- /dev/null +++ b/analysis/scalarmults/epare/__init__.py @@ -0,0 +1,206 @@ +import itertools +from statsmodels.stats.proportion import proportion_confint + +from pyecsca.ec.mult import ( + DoubleAndAddMultiplier, + LTRMultiplier, + RTLMultiplier, + LadderMultiplier, + BinaryNAFMultiplier, + WindowNAFMultiplier, + SimpleLadderMultiplier, + DifferentialLadderMultiplier, + CoronMultiplier, + FixedWindowLTRMultiplier, + FullPrecompMultiplier, + ProcessingDirection, + AccumulationOrder, + ScalarMultiplier, + SlidingWindowMultiplier, + BGMWMultiplier, + CombMultiplier, + WindowBoothMultiplier, + ScalarMultiplier +) +from pyecsca.ec.countermeasures import ( + GroupScalarRandomization, + AdditiveSplitting, + MultiplicativeSplitting, + EuclideanSplitting, + BrumleyTuveri, + PointBlinding, + ScalarMultiplierCountermeasure +) + +from .config import MultIdent, CountermeasureIdent, Config +from .error_model import ErrorModel, checks_add +from .mult_results import MultResults +from .prob_map import ProbMap + + + +def powerset(iterable): + """Take an iterable and create a powerset of its elements.""" + s = list(iterable) + return map(set, itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1))) + + +def powers_of(k, max_power=20): + """Take all powers of `k` up to `max_power`.""" + return [k**i for i in range(1, max_power)] + + +def prod_combine(one, other): + """Multiply all pairs of elements from `one` and `other`.""" + return [a * b for a, b in itertools.product(one, other)] + + +# All dbl-and-add multipliers from https://github.com/J08nY/pyecsca/blob/master/pyecsca/ec/mult +window_mults = [ + MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.LTR), + MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.LTR), + MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.LTR), + MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.LTR), + MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.LTR), + MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.RTL), + MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.RTL), + MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.RTL), + MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.RTL), + MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.RTL), + MultIdent(FixedWindowLTRMultiplier, m=2**1), + MultIdent(FixedWindowLTRMultiplier, m=2**2), + MultIdent(FixedWindowLTRMultiplier, m=2**3), + MultIdent(FixedWindowLTRMultiplier, m=2**4), + MultIdent(FixedWindowLTRMultiplier, m=2**5), + MultIdent(FixedWindowLTRMultiplier, m=2**6), + MultIdent(WindowBoothMultiplier, width=2), + MultIdent(WindowBoothMultiplier, width=3), + MultIdent(WindowBoothMultiplier, width=4), + MultIdent(WindowBoothMultiplier, width=5), + MultIdent(WindowBoothMultiplier, width=6) +] +naf_mults = [ + MultIdent(WindowNAFMultiplier, width=2), + MultIdent(WindowNAFMultiplier, width=3), + MultIdent(WindowNAFMultiplier, width=4), + MultIdent(WindowNAFMultiplier, width=5), + MultIdent(WindowNAFMultiplier, width=6), + MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.LTR), + MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.RTL), + MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.LTR), + MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.RTL) +] +comb_mults = [ + MultIdent(CombMultiplier, width=2, always=True), + MultIdent(CombMultiplier, width=3, always=True), + MultIdent(CombMultiplier, width=4, always=True), + MultIdent(CombMultiplier, width=5, always=True), + MultIdent(CombMultiplier, width=6, always=True), + MultIdent(CombMultiplier, width=2, always=False), + MultIdent(CombMultiplier, width=3, always=False), + MultIdent(CombMultiplier, width=4, always=False), + MultIdent(CombMultiplier, width=5, always=False), + MultIdent(CombMultiplier, width=6, always=False), + MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.LTR), + MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.LTR), + MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.LTR), + MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.LTR), + MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.LTR), + MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.RTL), + MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.RTL), + MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.RTL), + MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.RTL), + MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.RTL) +] +binary_mults = [ + MultIdent(LTRMultiplier, always=False, complete=True), + MultIdent(LTRMultiplier, always=True, complete=True), + MultIdent(LTRMultiplier, always=False, complete=False), + MultIdent(LTRMultiplier, always=True, complete=False), + MultIdent(RTLMultiplier, always=False, complete=True), + MultIdent(RTLMultiplier, always=True, complete=True), + MultIdent(RTLMultiplier, always=False, complete=False), + MultIdent(RTLMultiplier, always=True, complete=False), + MultIdent(CoronMultiplier) +] +other_mults = [ + MultIdent(FullPrecompMultiplier, always=False, complete=True), + MultIdent(FullPrecompMultiplier, always=True, complete=True), + MultIdent(FullPrecompMultiplier, always=False, complete=False), + MultIdent(FullPrecompMultiplier, always=True, complete=False), + MultIdent(SimpleLadderMultiplier, complete=True), + MultIdent(SimpleLadderMultiplier, complete=False) +] + +# All error models are a simple cartesian product of the individual options. +def _all_error_models(): + result = [] + for checks in powerset(checks_add): + for precomp_to_affine in (True, False): + for check_condition in ("all", "necessary"): + error_model = ErrorModel(checks, check_condition=check_condition, precomp_to_affine=precomp_to_affine) + result.append(error_model) + return result +all_error_models = _all_error_models() + + +# We can enumerate all mults and countermeasures here. +all_mults = window_mults + naf_mults + binary_mults + other_mults + comb_mults +def _all_mults_with_ctr(): + result = [] + for mult in all_mults: + for one_ctr_class, other_ctr_class in itertools.product((GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri, None), repeat=2): + if one_ctr_class is None and other_ctr_class is None: + result.append(mult) + continue + if other_ctr_class is None: + continue + if one_ctr_class is None: + mults = [mult] * other_ctr_class.nmults + other_ctr = CountermeasureIdent(other_ctr_class, *mults) + result.append(other_ctr) + continue + + mults = [mult] * other_ctr_class.nmults + other_ctr = CountermeasureIdent(other_ctr_class, *mults) + for i in range(1, 2**one_ctr_class.nmults): + bits = format(i, f"0{one_ctr_class.nmults}b") + args = [other_ctr if bit == "1" else mult for bit in bits] + ctr = CountermeasureIdent(one_ctr_class, *args) + result.append(ctr) + return result +all_mults_with_ctr = _all_mults_with_ctr() +all_configs = [Config(mult, None) for mult in all_mults_with_ctr] + + +# We have several sets of divisors, inspired by various "interesting" multiples the multipliers may compute. +small_primes = [3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199] +medium_primes = [211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397] +large_primes = [401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997] +all_integers = list(range(1, 400)) +all_even = list(range(2, 400, 2)) +all_odd = list(range(1, 400, 2)) +all_primes = small_primes + medium_primes + large_primes + +divisor_map = { + "small_primes": small_primes, + "medium_primes": medium_primes, + "large_primes": large_primes, + "all_primes": all_primes, + "all_integers": all_integers, + "all_even": all_even, + "all_odd": all_odd, + "powers_of_2": powers_of(2), + "powers_of_2_large": powers_of(2, 256), + "powers_of_2_large_3": [i * 3 for i in powers_of(2, 256)], + "powers_of_2_large_p1": [i + 1 for i in powers_of(2, 256)], + "powers_of_2_large_m1": [i - 1 for i in powers_of(2, 256)], + "powers_of_2_large_pmautobus": sorted(set([i + j for i in powers_of(2, 256) for j in range(-5,5) if i+j > 0])), + "powers_of_3": powers_of(3), +} +divisor_map["all"] = list(sorted(set().union(*[v for v in divisor_map.values()]))) + + +def conf_interval(p: float, samples: int, alpha: float = 0.05) -> tuple[float, float]: + """Compute a confidence interval for a Binomial distribution.""" + return proportion_confint(round(p*samples), samples, alpha, method="wilson") diff --git a/analysis/scalarmults/epare/config.py b/analysis/scalarmults/epare/config.py new file mode 100644 index 0000000..569a1fa --- /dev/null +++ b/analysis/scalarmults/epare/config.py @@ -0,0 +1,206 @@ +from dataclasses import dataclass +from enum import Enum +from functools import partial, total_ordering +from typing import Any, Optional, Type + +from pyecsca.ec.countermeasures import GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri, PointBlinding, ScalarMultiplierCountermeasure +from pyecsca.ec.mult import ScalarMultiplier +from .error_model import ErrorModel + +class Composable: + klass: Type + args: list[Any] + kwargs: dict[str, Any] + + @property + def mult(self): + """ + Extract the MultIdent out of the Composable. + + We assume there is only one somewhere in the tree (at the leafs). + """ + if isinstance(self, MultIdent): + return self + for arg in self.args: + if isinstance(arg, Composable): + return arg.mult + for kwarg in self.kwargs.values(): + if isinstance(kwarg, Composable): + return kwarg.mult + + def construct(self, *mult_args, **mult_kwargs): + """Recursively construct this composable.""" + args, kwargs = self._build_args_kwargs(mult_args, mult_kwargs) + return self._instantiate(args, kwargs, mult_args, mult_kwargs) + + def _build_args_kwargs(self, mult_args, mult_kwargs): + args = [] + for arg in self.args: + if isinstance(arg, Composable): + args.append(arg.construct(*mult_args, **mult_kwargs)) + else: + args.append(arg) + kwargs = {} + for key, value in self.kwargs.items(): + if isinstance(value, Composable): + kwargs[key] = value.construct(*mult_args, **mult_kwargs) + else: + kwargs[key] = value + return args, kwargs + + def _instantiate(self, c_args, c_kwargs, mult_args, mult_kwargs): + return self.klass(*c_args, **c_kwargs) + + def __lt__(self, other): + if not isinstance(other, Composable): + return NotImplemented + return str(self) < str(other) + + def __hash__(self): + return hash((self.klass, tuple(self.args)), tuple(self.kwargs.keys()), tuple(self.kwargs.values())) + + +@dataclass(frozen=True) +@total_ordering +class CountermeasureIdent(Composable): + klass: Type[ScalarMultiplierCountermeasure] + args: list[Any] + kwargs: dict[str, Any] + + def __init__(self, klass: Type[ScalarMultiplier], *args, **kwargs): + object.__setattr__(self, "klass", klass) + object.__setattr__(self, "args", args if args is not None else []) + object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {}) + + def construct(self, *mult_args, **mult_kwargs): + args, kwargs = self._build_args_kwargs(mult_args, mult_kwargs) + # Capture any formula required to instantiate this countermeasure from the mult args + for arg in mult_args: + if any(map(lambda req: isinstance(arg, req), self.klass.requires)): + kwargs[arg.shortname] = arg + # Capture the rng as well. + if "rng" in mult_kwargs: + kwargs["rng"] = mult_kwargs["rng"] + return self._instantiate(args, kwargs, mult_args, mult_kwargs) + + def __str__(self): + if issubclass(self.klass, GroupScalarRandomization): + name = "gsr" + elif issubclass(self.klass, AdditiveSplitting): + name = "asplit" + elif issubclass(self.klass, MultiplicativeSplitting): + name = "msplit" + elif issubclass(self.klass, EuclideanSplitting): + name = "esplit" + elif issubclass(self.klass, BrumleyTuveri): + name = "bt" + elif issubclass(self.klass, PointBlinding): + name = "blind" + else: + name = "?" + # Only print other Composables as Countermeasures do not have interesting arguments + args = (",".join(list(map(str, filter(lambda arg: isinstance(arg, Composable), self.args))))) if self.args else "" + # Same for kwargs + kwargs = (",".join(f"{k}={v}" for k, v in self.kwargs if isinstance(v, Composable))) if self.kwargs else "" + return f"{name}({args}{',' if args and kwargs else ''}{kwargs})" + + def __repr__(self): + return str(self) + + def __hash__(self): + return hash((self.klass, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values()))) + + +@dataclass(frozen=True) +@total_ordering +class MultIdent(Composable): + klass: Type[ScalarMultiplier] + args: list[Any] + kwargs: dict[str, Any] + + def __init__(self, klass: Type[ScalarMultiplier], *args, **kwargs): + object.__setattr__(self, "klass", klass) + object.__setattr__(self, "args", args if args is not None else []) + object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {}) + + def _instantiate(self, c_args, c_kwargs, mult_args, mult_kwargs): + args = [*c_args, *mult_args] + kwargs = {**c_kwargs, **mult_kwargs} + if "rng" in kwargs: + del kwargs["rng"] + return self.klass(*args, **kwargs) + + def __str__(self): + name = self.klass.__name__.replace("Multiplier", "") + args = (",".join(list(map(str, self.args)))) if self.args else "" + kwmap = {"recoding_direction": "recode", + "direction": "dir", + "width": "w"} + kwargs = (",".join(f"{kwmap.get(k, k)}:{v.name if isinstance(v, Enum) else str(v)}" for k,v in self.kwargs.items())) if self.kwargs else "" + return f"{name}({args}{',' if args and kwargs else ''}{kwargs})" + + def __repr__(self): + return str(self) + + def __hash__(self): + return hash((self.klass, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values()))) + + +@dataclass(frozen=True) +@total_ordering +class Config: + """ + A Config is a description of a scalar multiplication implementation, consisting of a scalar multiplier, + (optionally) a countermeasure, and (optionally) an error model. + + The scalar multiplier and countermeasure combination is specified as an instance of `Composable` stored + in the `composition` attribute. + + The error model is simply in the `error_model` attribute and may be `None`. If it is `None`, the Config + is not suitable for error simulation and merely represents the description of a scalar multiplication + implementation we care about when reverse-engineering: the multiplier and the countermeasure, we do not + really care about the error model, yet need it when simulating. + """ + composition: Composable + error_model: Optional[ErrorModel] = None + + @property + def partial(self): + """Get the callable that constructs the scalar multiplier (with countermeasure if any).""" + return self.composition.construct + + @property + def mult(self): + """Get the MultIdent""" + return self.composition.mult + + @property + def has_countermeasure(self): + return isinstance(self.composition, CountermeasureIdent) + + @property + def has_error_model(self): + return self.error_model is not None + + def with_error_model(self, error_model: ErrorModel | None): + """Return a new Config with a given error model.""" + if not (isinstance(error_model, ErrorModel) or error_model is None): + raise ValueError("Unknown error model.") + return Config(self.composition, error_model=error_model) + + def __str__(self): + error_model = str(self.error_model) if self.error_model else "" + return f"<{self.composition}{error_model}>" + + def __repr__(self): + return str(self) + + def __lt__(self, other): + if not isinstance(other, Config): + return NotImplemented + me = (self.mult, self.composition) + them = (other.mult, other.composition) + return me < them + + def __hash__(self): + return hash((self.composition, self.error_model)) diff --git a/analysis/scalarmults/epare/error_model.py b/analysis/scalarmults/epare/error_model.py new file mode 100644 index 0000000..a7f5edd --- /dev/null +++ b/analysis/scalarmults/epare/error_model.py @@ -0,0 +1,109 @@ +from dataclasses import dataclass +from functools import partial, total_ordering +from typing import Any, Optional, Type, Union, Literal + + +def check_equal_multiples(k, l, q): + """Checks whether the two multiples input into the formula are equal modulo q (the order of the base).""" + return (k % q) == (l % q) + + +def check_divides(k, l, q): + """Checks whether q (the order of the base) divides any of the multiples input into the formula.""" + return (k != 0) and (l != 0) and (k % q == 0) or (l % q == 0) + + +def check_half_add(k, l, q): + return (q % 2 == 0) and ((k-l) % (q//2)) == 0 + + +def check_affine(k, q): + """Checks whether q (the order of the base) divides the multiple that is to be converted to affine.""" + return k % q == 0 + + +def check_any(*checks, q=None): + """Merge multiple checks together. The returned check function no longer takes the `q` parameter.""" + def check_func(k, l): + for check in checks: + if check(k, l, q): + return True + return False + return check_func + + +# These checks can be applied to add formulas. See the formulas notebook for background on them. +checks_add = { + "equal_multiples": check_equal_multiples, + "divides": check_divides, + "half_add": check_half_add +} + +# This check can be applied to conversion to affine. +checks_affine = { + "affine": check_affine +} + +@dataclass(frozen=True) +@total_ordering +class ErrorModel: + """ + An ErrorModel describes the behavior of an implementation with regards to errors on exceptional + inputs to its addition formulas, to-affine conversion or general scalar multiplication. + + :param checks: A set of names of checks (from checks_add and checks_affine) that the implementation performs. + Note that these may not be checks that the implementation explicitly performs, only that it behaves w.r.t. + errors as if it were doing these checks, due to the formulas it chose and any actual checks it has. + :param check_condition: Either "all" or "necessary". Specifies whether the checks are applied to all points + that the implementation computes during a scalar multiplication or only those that end up being used -- thus + affect -- the final result. If an implementation does not perform any dummy operations, these two are the same. + :param precomp_to_affine: Specifies whether the implementation converts all results of the precomputation step + to affine form. If it does, it means that additional checks on all outputs of the precomputation are done as + they have to be "convertible" to affine form. + """ + checks: set[str] + check_condition: Union[Literal["all"], Literal["necessary"]] + precomp_to_affine: bool + + def __init__(self, checks: set[str], check_condition: Union[Literal["all"], Literal["necessary"]], precomp_to_affine: bool): + for check in checks: + if check not in checks_add: + raise ValueError(f"Unknown check: {check}") + checks = set(checks) + checks.add("affine") # always done in our model + object.__setattr__(self, "checks", checks) + if check_condition not in ("all", "necessary"): + raise ValueError("Wrong check_condition") + object.__setattr__(self, "check_condition", check_condition) + object.__setattr__(self, "precomp_to_affine", precomp_to_affine) + + def check_add(self, q): + """Get the add formula check function for the given q.""" + if self.checks == {"affine"}: + return lambda k, l: False + return check_any(*map(lambda name: checks_add[name], filter(lambda check: check in checks_add, self.checks)), q=q) + + def check_affine(self, q): + """Get the to-affine check function for the given q.""" + return partial(check_affine, q=q) + + def __lt__(self, other): + if not isinstance(other, ErrorModel): + return NotImplemented + return str(self) < str(other) + + def __str__(self): + cs = [] + if "equal_multiples" in self.checks: + cs.append("em") + if "divides" in self.checks: + cs.append("d") + if "half_add" in self.checks: + cs.append("ha") + if "affine" in self.checks: + cs.append("a") + precomp = "+pre" if self.precomp_to_affine else "" + return f"({','.join(cs)}+{self.check_condition}{precomp})" + + def __hash__(self): + return hash((tuple(sorted(self.checks)), self.check_condition, self.precomp_to_affine))
\ No newline at end of file diff --git a/analysis/scalarmults/epare/mult_results.py b/analysis/scalarmults/epare/mult_results.py new file mode 100644 index 0000000..5424b01 --- /dev/null +++ b/analysis/scalarmults/epare/mult_results.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass +from datetime import timedelta +from typing import Optional + +from pyecsca.sca.re.rpa import MultipleContext +from pyecsca.ec.point import Point + + +@dataclass +class MultResults: + """ + A MultResults instance represents many simulated scalar multiplciation computations, which were tracked + using a `MultipleContext` (i.e. the outputs of the :func:`pyecsca.sca.re.rpa.multiple_graph` function). + Generally, these would be for one Config only, but that should be handled separately, for example + in a dict[Config, MultResults]. The `samples` describe how many computations + are contained and must correspond to the length of the `multiplications` list. + """ + multiplications: list[tuple[MultipleContext, MultipleContext, Point]] + samples: int + duration: Optional[float] = None + + def merge(self, other: "MultResults"): + self.multiplications.extend(other.multiplications) + self.samples += other.samples + + def __len__(self): + return self.samples + + def __iter__(self): + yield from self.multiplications + + def __getitem__(self, i): + return self.multiplications[i] + + def __str__(self): + duration = timedelta(seconds=int(self.duration)) if self.duration is not None else "" + return f"MultResults({self.samples},{duration})" + + def __repr__(self): + return str(self)
\ No newline at end of file diff --git a/analysis/scalarmults/epare/prob_map.py b/analysis/scalarmults/epare/prob_map.py new file mode 100644 index 0000000..eb96dda --- /dev/null +++ b/analysis/scalarmults/epare/prob_map.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass +import hashlib + + +def hash_divisors(divisors: set[int]) -> bytes: + return hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest() + + +@dataclass +class ProbMap: + """ + A ProbMap is a mapping from integers (base point order q) to floats (error probability for some scalar + multiplication implementation, i.e. Config). The probability map is constructed for a given set of + `divisors` (the base point orders q). Probability maps can be narrowed or merged. A narrowing restricts + the probability map to a smaller set of `divisors`. A merging takes another probability map using the + same divisor set and updates the probabilities to a weighted average of the two probability maps + (the weight is the number of samples). + """ + probs: dict[int, float] + divisors_hash: bytes + samples: int + + def __len__(self): + return len(self.probs) + + def __iter__(self): + yield from self.probs + + def __getitem__(self, i): + return self.probs[i] if i in self.probs else 0.0 + + def __contains__(self, item): + return item in self.probs + + def keys(self): + return self.probs.keys() + + def values(self): + return self.probs.values() + + def items(self): + return self.probs.items() + + def narrow(self, divisors: set[int]): + """Narrow the probability map to the new set of divisors (must be a subset of the current set).""" + divisors_hash = hash_divisors(divisors) + if self.divisors_hash == divisors_hash: + # Already narrow. + return + for kdel in set(self.probs.keys()).difference(divisors): + del self.probs[kdel] + self.divisors_hash = divisors_hash + + def merge(self, other: "ProbMap") -> None: + """Merge the `other` probability map into this one (must share the divisor set).""" + if self.divisors_hash != other.divisors_hash: + raise ValueError("Merging can only work on probmaps created for same divisors.") + new_keys = set(self.keys()).union(other.keys()) + result = {} + for key in new_keys: + sk = self[key] + ok = other[key] + result[key] = (sk * self.samples + ok * other.samples) / (self.samples + other.samples) + self.probs = result + self.samples += other.samples
\ No newline at end of file diff --git a/analysis/scalarmults/epare/simulate.py b/analysis/scalarmults/epare/simulate.py new file mode 100644 index 0000000..9b78436 --- /dev/null +++ b/analysis/scalarmults/epare/simulate.py @@ -0,0 +1,102 @@ +import random +import pickle +from functools import partial + +from .config import Config +from .mult_results import MultResults + +from pyecsca.ec.params import DomainParameters +from pyecsca.ec.mod import mod +from pyecsca.sca.re.rpa import multiple_graph + + +def simulate_multiples(mult: Config, + params: DomainParameters, + bits: int, + samples: int = 100, + seed: bytes | None = None) -> MultResults: + """ + Takes a Config, which specifies a scalar multiplier (with optional countermeasures) + and simulates `samples` scalar multiplications, while tracking which multiples of the + symbolic input point get computed. + """ + results = [] + if seed is not None: + random.seed(seed) + rng = lambda n: mod(random.randrange(n), n) + + # If no countermeasure is used, we have fully random scalars. + # Otherwise, fix one per chunk. + if not mult.has_countermeasure: + scalars = [random.randint(1, 2**bits) for _ in range(samples)] + else: + one = random.randint(1, 2**bits) + scalars = [one for _ in range(samples)] + + for scalar in scalars: + results.append(multiple_graph(scalar, params, mult.mult.klass, partial(mult.partial, rng=rng))) + return MultResults(results, samples) + + +def simulate_multiples_direct(mult: Config, + params: DomainParameters, + bits: int, + fname: str, + samples: int = 100, + seed: bytes | None = None) -> str: + """ + Like the `simulate_multiples` function above, but stores the pickled output directly + into a file named `fname`. + """ + result = simulate_multiples(mult, params, bits, samples, seed) + with open(fname, "wb") as f: + pickle.dump((mult, result), f) + return fname + + +def evaluate_multiples(mult: Config, + res: MultResults, + divisors: set[int], + use_init: bool = True, + use_multiply: bool = True): + """ + Takes MultIdent and MultResults and a set of divisors (base point orders `q`) and + evaluates them using the error model from the MultIdent. Note that the MultIdent + must have an error model in this case. Returns the ProbMap. + """ + errors = {divisor: 0 for divisor in divisors} + samples = len(res) + divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest() + for precomp_ctx, full_ctx, out in res: + check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out, + check_condition=mult.error_model.check_condition, + precomp_to_affine=mult.error_model.precomp_to_affine, + use_init=use_init, + use_multiply=use_multiply) + for q in divisors: + error = evaluate_checks(check_funcs={"add": mult.error_model.check_add(q), "affine": mult.error_model.check_affine(q)}, + check_inputs=check_inputs) + errors[q] += error + # Make probmaps smaller. Do not store zero probabilities. + probs = {} + for q, error in errors.items(): + if error != 0: + probs[q] = error / samples + return ProbMap(probs, divisors_hash, samples) + + +def evaluate_multiples_direct(mult: Config, + fname: str, + offset: int, + divisors: set[int], + use_init: bool = True, + use_multiply: bool = True): + """ + Like `evaluate_multiples`, but instead reads the MultResults from a file named `fname` + at an `offset`. Still returns the ProbMap, which is significantly smaller and easier + to pickle than the MultResults. + """ + with open(fname, "rb") as f: + f.seek(offset) + _, res = pickle.load(f) + return evaluate_multiples(mult, res, divisors, use_init, use_multiply)
\ No newline at end of file diff --git a/analysis/scalarmults/probmaps.py b/analysis/scalarmults/probmaps.py deleted file mode 100644 index 0ad13a3..0000000 --- a/analysis/scalarmults/probmaps.py +++ /dev/null @@ -1,78 +0,0 @@ -import sys -import pickle -import itertools -import glob - -from pyecsca.misc.utils import TaskExecutor -from tqdm.auto import tqdm - -from common import * - - -def divides_any(l: int, small_scalars: set[int]) -> bool: - for s in small_scalars: - if s%l==0: - return True - return False - - -def process_small_scalars(scalar_results: MultResults, divisors: set[int]) -> ProbMap: - result = {} - for divisor in divisors: - count = 0 - for smult in scalar_results.multiplications: - if divides_any(divisor, smult): - count += 1 - result[divisor] = count / scalar_results.samples - return ProbMap(result, scalar_results.samples, scalar_results.kind) - - -def load_chunk(fname: str, divisors: set[int], kind: str) -> dict[MultIdent, ProbMap]: - with open(fname, "rb") as f: - multiples = {} - while True: - try: - mult, distr = pickle.load(f) - multiples[mult] = distr - except EOFError: - break - res = {} - for mult, results in multiples.items(): - results.kind = kind - res[mult] = process_small_scalars(results, divisors) - return res - - -if __name__ == "__main__": - distributions_mults = {} - bits = 256 - num_workers = int(sys.argv[1]) if len(sys.argv) > 1 else 32 - divisor_name = sys.argv[2] if len(sys.argv) > 2 else "all" - kind = sys.argv[3] if len(sys.argv) > 3 else "precomp+necessary" - use_init = (sys.argv[4].lower() == "true") if len(sys.argv) > 4 else True - use_multiply = (sys.argv[5].lower() == "true") if len(sys.argv) > 5 else True - files = sorted(glob.glob(f"multiples_{bits}_{kind}_{'init' if use_init else 'noinit'}_{'mult' if use_multiply else 'nomult'}_chunk*.pickle")) - - selected_divisors = divisor_map[divisor_name] - - with TaskExecutor(max_workers=num_workers) as pool: - for fname in files: - pool.submit_task(fname, - load_chunk, - fname, selected_divisors, kind) - for fname, future in tqdm(pool.as_completed(), total=len(pool.tasks), smoothing=0): - if error := future.exception(): - print(f"Error {fname}, {error}") - continue - new_distrs = future.result() - for mult, prob_map in new_distrs.items(): - if mult in distributions_mults: - distributions_mults[mult].merge(prob_map) - else: - distributions_mults[mult] = prob_map - for mult, prob_map in distributions_mults.items(): - print(f"Got {prob_map.samples} for {mult}.") - - # Save - with open(f"{divisor_name}_{kind}_distrs.pickle", "wb") as f: - pickle.dump(distributions_mults, f) diff --git a/analysis/scalarmults/simulate.ipynb b/analysis/scalarmults/simulate.ipynb index c1f123a..07f0e8f 100644 --- a/analysis/scalarmults/simulate.ipynb +++ b/analysis/scalarmults/simulate.ipynb @@ -25,12 +25,11 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 15, "id": "b4386513-cc14-434b-a748-2863f8657452", "metadata": {}, "outputs": [], "source": [ - "import itertools\n", "import gc\n", "import glob\n", "import hashlib\n", @@ -52,7 +51,8 @@ "\n", "from collections import Counter\n", "from pathlib import Path\n", - "from random import randint, randbytes\n", + "from random import randint, randbytes, randrange\n", + "from functools import partial\n", "from typing import Type, Any\n", "\n", "from tqdm.auto import tqdm, trange\n", @@ -64,7 +64,7 @@ "from pyecsca.sca.re.epa import graph_to_check_inputs, evaluate_checks\n", "from pyecsca.misc.utils import TaskExecutor\n", "\n", - "from common import *" + "from epare import *" ] }, { @@ -80,7 +80,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 16, "id": "3463a7bd-34d8-458b-8ceb-dddf99de21dc", "metadata": {}, "outputs": [], @@ -97,10 +97,21 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 17, "id": "170c11fc-86cf-4eb1-bf4e-b2e44b2d7ac5", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Scalar multipliers considered: 65\n", + "Scalar multipliers (with a combination of up-to two countermeasures) considered: 5265\n", + "Error models considered: 32\n", + "Total configurations considered: 168480\n" + ] + } + ], "source": [ "nmults = len(all_mults)\n", "nmults_ctr = len(all_mults_with_ctr)\n", @@ -108,7 +119,7 @@ "ncfgs = nmults_ctr * nerror_models\n", "\n", "print(f\"Scalar multipliers considered: {nmults}\")\n", - "print(f\"Scalar multipliers (with a single countermeasure) considered: {nmults_ctr}\")\n", + "print(f\"Scalar multipliers (with a combination of up-to two countermeasures) considered: {nmults_ctr}\")\n", "print(f\"Error models considered: {nerror_models}\")\n", "print(f\"Total configurations considered: {ncfgs}\")" ] @@ -126,7 +137,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 18, "id": "4d5c7f10-618f-4612-b594-81d1607b0d1d", "metadata": {}, "outputs": [], @@ -139,170 +150,19 @@ "use_init = True\n", "use_multiply = True\n", "\n", - "num_workers = 30\n", + "num_workers = 20\n", "samples = 1000\n", "\n", "selected_mults = all_mults" ] }, { - "cell_type": "code", - "execution_count": null, - "id": "07bc266d-35eb-4f6d-bdba-e9f6f66827f1", - "metadata": {}, - "outputs": [], - "source": [ - "def simulate_multiples(mult: MultIdent,\n", - " params: DomainParameters,\n", - " bits: int,\n", - " samples: int = 100,\n", - " seed: bytes | None = None) -> MultResults:\n", - " \"\"\"\n", - " Takes a MultIdent, which specifies a scalar multiplier (with an optional countermeasure)\n", - " and simulates `samples` scalar multiplications, while tracking which multiples of the\n", - " symbolic input point get computed.\n", - " \"\"\"\n", - " results = []\n", - " if seed is not None:\n", - " random.seed(seed)\n", - "\n", - " # If no countermeasure is used, we have fully random scalars.\n", - " # Otherwise, fix one per chunk.\n", - " if mult.countermeasure is None:\n", - " scalars = [random.randint(1, 2**bits) for _ in range(samples)]\n", - " else:\n", - " one = random.randint(1, 2**bits)\n", - " scalars = [one for _ in range(samples)]\n", - "\n", - " for scalar in scalars:\n", - " results.append(multiple_graph(scalar, params, mult.klass, mult.partial))\n", - " return MultResults(results, samples)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6f3e9ffa-19e8-46b2-a6ad-1d318d5c8e17", - "metadata": {}, - "outputs": [], - "source": [ - "def simulate_multiples_direct(mult: MultIdent,\n", - " params: DomainParameters,\n", - " bits: int,\n", - " fname: str,\n", - " samples: int = 100,\n", - " seed: bytes | None = None) -> str:\n", - " \"\"\"\n", - " Like the `simulate_multiples` function above, but stores the pickled output directly\n", - " into a file named `fname`.\n", - " \"\"\"\n", - " results = []\n", - " if seed is not None:\n", - " random.seed(seed)\n", - "\n", - " # If no countermeasure is used, we have fully random scalars.\n", - " # Otherwise, fix one per chunk.\n", - " if mult.countermeasure is None:\n", - " scalars = [random.randint(1, 2**bits) for _ in range(samples)]\n", - " else:\n", - " one = random.randint(1, 2**bits)\n", - " scalars = [one for _ in range(samples)]\n", - "\n", - " for scalar in scalars:\n", - " results.append(multiple_graph(scalar, params, mult.klass, mult.partial))\n", - " result = MultResults(results, samples)\n", - " with open(fname, \"wb\") as f:\n", - " pickle.dump((mult, result), f)\n", - " return fname" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "64799c16-8113-4eff-81de-6a3e547eb5c5", - "metadata": {}, - "outputs": [], - "source": [ - "def evaluate_multiples(mult: MultIdent,\n", - " res: MultResults,\n", - " divisors: set[int],\n", - " use_init: bool = True,\n", - " use_multiply: bool = True):\n", - " \"\"\"\n", - " Takes MultIdent and MultResults and a set of divisors (base point orders `q`) and\n", - " evaluates them using the error model from the MultIdent. Note that the MultIdent\n", - " must have an error model in this case. Returns the ProbMap.\n", - " \"\"\"\n", - " errors = {divisor: 0 for divisor in divisors}\n", - " samples = len(res)\n", - " divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()\n", - " for precomp_ctx, full_ctx, out in res:\n", - " check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out,\n", - " check_condition=mult.error_model.check_condition,\n", - " precomp_to_affine=mult.error_model.precomp_to_affine,\n", - " use_init=use_init,\n", - " use_multiply=use_multiply)\n", - " for q in divisors:\n", - " error = evaluate_checks(check_funcs={\"add\": mult.error_model.check_add(q), \"affine\": mult.error_model.check_affine(q)},\n", - " check_inputs=check_inputs)\n", - " errors[q] += error\n", - " # Make probmaps smaller. Do not store zero probabilities.\n", - " probs = {}\n", - " for q, error in errors.items():\n", - " if error != 0:\n", - " probs[q] = error / samples\n", - " return ProbMap(probs, divisors_hash, samples)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ac630a08-4120-41cf-b3bb-1827ef469542", - "metadata": {}, - "outputs": [], - "source": [ - "def evaluate_multiples_direct(mult: MultIdent,\n", - " fname: str,\n", - " offset: int,\n", - " divisors: set[int],\n", - " use_init: bool = True,\n", - " use_multiply: bool = True):\n", - " \"\"\"\n", - " Like `evaluate_multiples`, but instead reads the MultResults from a file named `fname`\n", - " at an `offset`. Still returns the ProbMap, which is significantly smaller and easier\n", - " to pickle than the MultResults.\n", - " \"\"\"\n", - " with open(fname, \"rb\") as f:\n", - " f.seek(offset)\n", - " _, res = pickle.load(f)\n", - " errors = {divisor: 0 for divisor in divisors}\n", - " samples = len(res)\n", - " divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()\n", - " for precomp_ctx, full_ctx, out in res:\n", - " check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out,\n", - " check_condition=mult.error_model.check_condition,\n", - " precomp_to_affine=mult.error_model.precomp_to_affine,\n", - " use_init=use_init,\n", - " use_multiply=use_multiply)\n", - " for q in divisors:\n", - " error = evaluate_checks(check_funcs={\"add\": mult.error_model.check_add(q), \"affine\": mult.error_model.check_affine(q)},\n", - " check_inputs=check_inputs)\n", - " errors[q] += error\n", - " # Make probmaps smaller. Do not store zero probabilities.\n", - " probs = {}\n", - " for q, error in errors.items():\n", - " if error != 0:\n", - " probs[q] = error / samples\n", - " return ProbMap(probs, divisors_hash, samples)" - ] - }, - { "cell_type": "markdown", "id": "3aaf712e-5b97-4390-8dd4-e1db1dfe36a2", "metadata": {}, "source": [ "## Run\n", - "Run this cell as many times as you want. It will simulate `samples` scalar multiplications for each `MultIdent` (a scalar multiplier implementation with an optional countermeasure) and store them into the chunk." + "Run this cell as many times as you want. It will simulate `samples` scalar multiplications for each `Config` (a scalar multiplier implementation with an optional countermeasure) and store them into the chunk." ] }, { @@ -310,12 +170,35 @@ "execution_count": null, "id": "84359084-4116-436c-92cd-d43fdfeca842", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "f7b8ac781be9408db32f219b4b2290a0", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Computing multiple graphs.: 0%| | 0/5265 [00:00<?, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/xjancar/pyecsca/virt/lib/python3.12/site-packages/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.\n", + " warnings.warn(\n" + ] + } + ], "source": [ "chunk_id = randbytes(4).hex()\n", "with TaskExecutor(max_workers=num_workers, initializer=silence) as pool, tempfile.TemporaryDirectory() as tmp_dirname:\n", " tmp_path = Path(tmp_dirname)\n", - " for i, mult in enumerate(all_mults_with_ctr):\n", + " for i, mult in enumerate(all_configs):\n", " pool.submit_task(mult,\n", " simulate_multiples_direct,\n", " mult, params, bits, tmp_path / f\"{i}.pickle\", samples, seed=chunk_id)\n", @@ -324,7 +207,8 @@ " #print(f\"Got {mult}.\")\n", " if error := future.exception():\n", " print(\"Error!\", error)\n", - " continue\n", + " break\n", + " #continue\n", " fpath = future.result()\n", " with fpath.open(\"rb\") as f:\n", " h.write(f.read())\n", @@ -351,7 +235,57 @@ "execution_count": null, "id": "fbab8333-b8f1-4890-b38a-7bb34f5ffb02", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "f4bc80bd47b345e0a74ad1a94121bfb1", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Processing chunks: 0%| | 0/1 [00:00<?, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Processing chunk 316daffc, no probmaps found.\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "38046ebec4d34694a973ab1959f14905", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Loading chunk 316daffc.: 0%| | 0/5265 [00:00<?, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "76e3fb85f95e41069c2b7fbe86b27650", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Processing 316daffc.: 0%| | 0/168480 [00:00<?, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], "source": [ "with TaskExecutor(max_workers=num_workers, initializer=silence) as pool:\n", " for in_fname in tqdm(glob.glob(f\"multiples_{bits}_chunk*.pickle\"), desc=\"Processing chunks\", smoothing=0):\n", @@ -364,7 +298,7 @@ " out_file = Path(out_fname)\n", "\n", " cfgs_todo = set()\n", - " for mult in all_mults_with_ctr:\n", + " for mult in all_configs:\n", " for error_model in all_error_models:\n", " cfgs_todo.add(mult.with_error_model(error_model))\n", "\n", diff --git a/analysis/scalarmults/visualize.ipynb b/analysis/scalarmults/visualize.ipynb index d2d6b0f..a77ba9d 100644 --- a/analysis/scalarmults/visualize.ipynb +++ b/analysis/scalarmults/visualize.ipynb @@ -433,7 +433,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.5" + "version": "3.12.3" } }, "nbformat": 4, |
