author    J08nY 2025-10-25 16:07:31 +0200
committer J08nY 2025-10-25 16:07:31 +0200
commit    4bf6a31c00a90cb5aea54c7a37c95f8f3413faaf (patch)
tree      3c6b81eb26563919eb927c7bf46ba38d59be0e9e
parent    1bf4c680fed1eab512e65ec6b7a0b4eb6ce8ca74 (diff)
-rw-r--r--  analysis/scalarmults/common.py              448
-rw-r--r--  analysis/scalarmults/epare/__init__.py      205
-rw-r--r--  analysis/scalarmults/epare/config.py        206
-rw-r--r--  analysis/scalarmults/epare/error_model.py   110
-rw-r--r--  analysis/scalarmults/epare/mult_results.py   40
-rw-r--r--  analysis/scalarmults/epare/prob_map.py       65
-rw-r--r--  analysis/scalarmults/epare/simulate.py      104
-rw-r--r--  analysis/scalarmults/probmaps.py             78
-rw-r--r--  analysis/scalarmults/simulate.ipynb         268
-rw-r--r--  analysis/scalarmults/visualize.ipynb          2
10 files changed, 832 insertions, 694 deletions
diff --git a/analysis/scalarmults/common.py b/analysis/scalarmults/common.py
deleted file mode 100644
index b307278..0000000
--- a/analysis/scalarmults/common.py
+++ /dev/null
@@ -1,448 +0,0 @@
-import itertools
-import hashlib
-from datetime import timedelta
-from enum import Enum
-from operator import itemgetter
-
-from dataclasses import dataclass
-from functools import partial, cached_property, total_ordering
-from typing import Any, Optional, Type, Union, Literal
-
-from statsmodels.stats.proportion import proportion_confint
-
-from pyecsca.sca.re.rpa import MultipleContext
-from pyecsca.ec.mult import *
-from pyecsca.ec.point import Point
-from pyecsca.ec.countermeasures import GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri
-
-
-def check_equal_multiples(k, l, q):
- """Checks whether the two multiples input into the formula are equal modulo q (the order of the base)."""
- return (k % q) == (l % q)
-
-
-def check_divides(k, l, q):
- """Checks whether q (the order of the base) divides any of the multiples input into the formula."""
-    return (k != 0) and (l != 0) and ((k % q == 0) or (l % q == 0))
-
-
-def check_half_add(k, l, q):
- return (q % 2 == 0) and ((k-l) % (q//2)) == 0
-
-
-def check_affine(k, q):
- """Checks whether q (the order of the base) divides the multiple that is to be converted to affine."""
- return k % q == 0
-
-
-def check_any(*checks, q=None):
- """Merge multiple checks together. The returned check function no longer takes the `q` parameter."""
- def check_func(k, l):
- for check in checks:
- if check(k, l, q):
- return True
- return False
- return check_func
-
-
-# These checks can be applied to add formulas. See the formulas notebook for background on them.
-checks_add = {
- "equal_multiples": check_equal_multiples,
- "divides": check_divides,
- "half_add": check_half_add
-}
-
-# This check can be applied to conversion to affine.
-checks_affine = {
- "affine": check_affine
-}
-
-def powerset(iterable):
- """Take an iterable and create a powerset of its elements."""
- s = list(iterable)
- return map(set, itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1)))
-
-
-@dataclass(frozen=True)
-@total_ordering
-class ErrorModel:
- """
- An ErrorModel describes the behavior of an implementation with regards to errors on exceptional
- inputs to its addition formulas, to-affine conversion or general scalar multiplication.
-
- :param checks: A set of names of checks (from checks_add and checks_affine) that the implementation performs.
- Note that these may not be checks that the implementation explicitly performs, only that it behaves w.r.t.
- errors as if it were doing these checks, due to the formulas it chose and any actual checks it has.
-    :param check_condition: Either "all" or "necessary". Specifies whether the checks are applied to all points
-    that the implementation computes during a scalar multiplication, or only to those that end up being used in
-    -- and thus affect -- the final result. If an implementation does not perform any dummy operations, these two are the same.
- :param precomp_to_affine: Specifies whether the implementation converts all results of the precomputation step
- to affine form. If it does, it means that additional checks on all outputs of the precomputation are done as
- they have to be "convertible" to affine form.
- """
- checks: set[str]
- check_condition: Union[Literal["all"], Literal["necessary"]]
- precomp_to_affine: bool
-
- def __init__(self, checks: set[str], check_condition: Union[Literal["all"], Literal["necessary"]], precomp_to_affine: bool):
- for check in checks:
- if check not in checks_add:
- raise ValueError(f"Unknown check: {check}")
- checks = set(checks)
- checks.add("affine") # always done in our model
- object.__setattr__(self, "checks", checks)
- if check_condition not in ("all", "necessary"):
- raise ValueError("Wrong check_condition")
- object.__setattr__(self, "check_condition", check_condition)
- object.__setattr__(self, "precomp_to_affine", precomp_to_affine)
-
- def check_add(self, q):
- """Get the add formula check function for the given q."""
- if self.checks == {"affine"}:
- return lambda k, l: False
- return check_any(*map(lambda name: checks_add[name], filter(lambda check: check in checks_add, self.checks)), q=q)
-
- def check_affine(self, q):
- """Get the to-affine check function for the given q."""
- return partial(check_affine, q=q)
-
- def __lt__(self, other):
- if not isinstance(other, ErrorModel):
- return NotImplemented
- return str(self) < str(other)
-
- def __str__(self):
- cs = []
- if "equal_multiples" in self.checks:
- cs.append("em")
- if "divides" in self.checks:
- cs.append("d")
- if "half_add" in self.checks:
- cs.append("ha")
- if "affine" in self.checks:
- cs.append("a")
- precomp = "+pre" if self.precomp_to_affine else ""
- return f"({','.join(cs)}+{self.check_condition}{precomp})"
-
- def __hash__(self):
- return hash((tuple(sorted(self.checks)), self.check_condition, self.precomp_to_affine))
-
-
-# All error models are a simple cartesian product of the individual options.
-all_error_models = []
-for checks in powerset(checks_add):
- for precomp_to_affine in (True, False):
- for check_condition in ("all", "necessary"):
- error_model = ErrorModel(checks, check_condition=check_condition, precomp_to_affine=precomp_to_affine)
- all_error_models.append(error_model)
-
-
-@dataclass(frozen=True)
-@total_ordering
-class MultIdent:
- """
- A MultIdent is a description of a scalar multiplication implementation, consisting of a scalar multiplier,
- (optionally) a countermeasure, and (optionally) an error model.
-
- The scalar multiplier is defined by the `klass` attribute, along with the `args` and `kwargs` attributes.
- One can reconstruct the raw multiplier (without the countermeasure) by doing:
-
- klass(*args, **kwargs)
-
- The countermeasure is simply in the `countermeasure` attribute and may be `None`.
-
-    The error model is simply in the `error_model` attribute and may be `None`. If it is `None`, the MultIdent
-    is not suitable for error simulation and merely represents the description of a scalar multiplication
-    implementation we care about when reverse-engineering: the multiplier and the countermeasure. We do not
-    really care about the error model when reverse-engineering, yet we need it when simulating.
- """
- klass: Type[ScalarMultiplier]
- args: list[Any]
- kwargs: dict[str, Any]
- countermeasure: Optional[str] = None
- error_model: Optional[ErrorModel] = None
-
- def __init__(self, klass: Type[ScalarMultiplier], *args, **kwargs):
- object.__setattr__(self, "klass", klass)
- object.__setattr__(self, "args", args if args is not None else [])
- if kwargs is not None and "countermeasure" in kwargs:
- object.__setattr__(self, "countermeasure", kwargs["countermeasure"])
- del kwargs["countermeasure"]
- if kwargs is not None and "error_model" in kwargs:
- object.__setattr__(self, "error_model", kwargs["error_model"])
- del kwargs["error_model"]
- object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {})
-
- @cached_property
- def partial(self):
- """Get the callable that constructs the scalar multiplier (with countermeasure if any)."""
- func = partial(self.klass, *self.args, **self.kwargs)
- if self.countermeasure is None:
- return func
- if self.countermeasure == "gsr":
- return lambda *args, **kwargs: GroupScalarRandomization(func(*args, **kwargs))
- elif self.countermeasure == "additive":
- return lambda *args, **kwargs: AdditiveSplitting(func(*args, **kwargs))
- elif self.countermeasure == "multiplicative":
- return lambda *args, **kwargs: MultiplicativeSplitting(func(*args, **kwargs))
- elif self.countermeasure == "euclidean":
- return lambda *args, **kwargs: EuclideanSplitting(func(*args, **kwargs))
- elif self.countermeasure == "bt":
- return lambda *args, **kwargs: BrumleyTuveri(func(*args, **kwargs))
-
- def with_countermeasure(self, countermeasure: str | None):
- """Return a new MultIdent with a given countermeasure."""
- if countermeasure not in (None, "gsr", "additive", "multiplicative", "euclidean", "bt"):
- raise ValueError(f"Unknown countermeasure: {countermeasure}")
- return MultIdent(self.klass, *self.args, **self.kwargs, countermeasure=countermeasure)
-
- def with_error_model(self, error_model: ErrorModel | None):
- """Return a new MultIdent with a given error model."""
- if not (isinstance(error_model, ErrorModel) or error_model is None):
- raise ValueError("Unknown error model.")
- return MultIdent(self.klass, *self.args, **self.kwargs, countermeasure=self.countermeasure, error_model=error_model)
-
- def __str__(self):
- name = self.klass.__name__.replace("Multiplier", "")
- args = ("_" + ",".join(list(map(str, self.args)))) if self.args else ""
- kwmap = {"recoding_direction": "recode",
- "direction": "dir",
- "width": "w"}
- kwargs = ("_" + ",".join(f"{kwmap.get(k, k)}:{v.name if isinstance(v, Enum) else str(v)}" for k,v in self.kwargs.items())) if self.kwargs else ""
- countermeasure = f"+{self.countermeasure}" if self.countermeasure is not None else ""
- error_model = f"+{self.error_model}" if self.error_model is not None else ""
- return f"{name}{args}{kwargs}{countermeasure}{error_model}"
-
- def __getstate__(self):
- state = self.__dict__.copy()
- # Remove cached properties
- state.pop("partial", None)
- return state
-
- def __lt__(self, other):
- if not isinstance(other, MultIdent):
- return NotImplemented
- return str(self) < str(other)
-
- def __repr__(self):
- return str(self)
-
- def __hash__(self):
- return hash((self.klass, self.countermeasure, self.error_model, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values())))
-
-
-@dataclass
-class MultResults:
- """
-    A MultResults instance represents many simulated scalar multiplication computations, which were tracked
- using a `MultipleContext` (i.e. the outputs of the :func:`pyecsca.sca.re.rpa.multiple_graph` function).
- Generally, these would be for one MultIdent only, but that should be handled separately, for example
- in a dict[MultIdent, MultResults]. The `samples` describe how many computations
- are contained and must correspond to the length of the `multiplications` list.
- """
- multiplications: list[tuple[MultipleContext, MultipleContext, Point]]
- samples: int
- duration: Optional[float] = None
-
- def merge(self, other: "MultResults"):
- self.multiplications.extend(other.multiplications)
- self.samples += other.samples
-
- def __len__(self):
- return self.samples
-
- def __iter__(self):
- yield from self.multiplications
-
- def __getitem__(self, i):
- return self.multiplications[i]
-
- def __str__(self):
- duration = timedelta(seconds=int(self.duration)) if self.duration is not None else ""
- return f"MultResults({self.samples},{duration})"
-
- def __repr__(self):
- return str(self)
-
-
-@dataclass
-class ProbMap:
- """
- A ProbMap is a mapping from integers (base point order q) to floats (error probability for some scalar
- multiplication implementation, i.e. MultIdent). The probability map is constructed for a given set of
- `divisors` (the base point orders q). Probability maps can be narrowed or merged. A narrowing restricts
- the probability map to a smaller set of `divisors`. A merging takes another probability map using the
- same divisor set and updates the probabilities to a weighted average of the two probability maps
- (the weight is the number of samples).
- """
- probs: dict[int, float]
- divisors_hash: bytes
- samples: int
-
- def __len__(self):
- return len(self.probs)
-
- def __iter__(self):
- yield from self.probs
-
- def __getitem__(self, i):
- return self.probs[i] if i in self.probs else 0.0
-
- def __contains__(self, item):
- return item in self.probs
-
- def keys(self):
- return self.probs.keys()
-
- def values(self):
- return self.probs.values()
-
- def items(self):
- return self.probs.items()
-
- def narrow(self, divisors: set[int]):
- """Narrow the probability map to the new set of divisors (must be a subset of the current set)."""
- divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()
- if self.divisors_hash == divisors_hash:
- # Already narrow.
- return
- for kdel in set(self.probs.keys()).difference(divisors):
- del self.probs[kdel]
- self.divisors_hash = divisors_hash
-
- def merge(self, other: "ProbMap") -> None:
- """Merge the `other` probability map into this one (must share the divisor set)."""
- if self.divisors_hash != other.divisors_hash:
- raise ValueError("Merging can only work on probmaps created for same divisors.")
- new_keys = set(self.keys()).union(other.keys())
- result = {}
- for key in new_keys:
- sk = self[key]
- ok = other[key]
- result[key] = (sk * self.samples + ok * other.samples) / (self.samples + other.samples)
- self.probs = result
- self.samples += other.samples
-
-
-# All dbl-and-add multipliers from https://github.com/J08nY/pyecsca/blob/master/pyecsca/ec/mult
-window_mults = [
- MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.LTR),
- MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.LTR),
- MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.LTR),
- MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.LTR),
- MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.LTR),
- MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.RTL),
- MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.RTL),
- MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.RTL),
- MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.RTL),
- MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.RTL),
- MultIdent(FixedWindowLTRMultiplier, m=2**1),
- MultIdent(FixedWindowLTRMultiplier, m=2**2),
- MultIdent(FixedWindowLTRMultiplier, m=2**3),
- MultIdent(FixedWindowLTRMultiplier, m=2**4),
- MultIdent(FixedWindowLTRMultiplier, m=2**5),
- MultIdent(FixedWindowLTRMultiplier, m=2**6),
- MultIdent(WindowBoothMultiplier, width=2),
- MultIdent(WindowBoothMultiplier, width=3),
- MultIdent(WindowBoothMultiplier, width=4),
- MultIdent(WindowBoothMultiplier, width=5),
- MultIdent(WindowBoothMultiplier, width=6)
-]
-naf_mults = [
- MultIdent(WindowNAFMultiplier, width=2),
- MultIdent(WindowNAFMultiplier, width=3),
- MultIdent(WindowNAFMultiplier, width=4),
- MultIdent(WindowNAFMultiplier, width=5),
- MultIdent(WindowNAFMultiplier, width=6),
- MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.LTR),
- MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.RTL),
- MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.LTR),
- MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.RTL)
-]
-comb_mults = [
- MultIdent(CombMultiplier, width=2, always=True),
- MultIdent(CombMultiplier, width=3, always=True),
- MultIdent(CombMultiplier, width=4, always=True),
- MultIdent(CombMultiplier, width=5, always=True),
- MultIdent(CombMultiplier, width=6, always=True),
- MultIdent(CombMultiplier, width=2, always=False),
- MultIdent(CombMultiplier, width=3, always=False),
- MultIdent(CombMultiplier, width=4, always=False),
- MultIdent(CombMultiplier, width=5, always=False),
- MultIdent(CombMultiplier, width=6, always=False),
- MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.LTR),
- MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.LTR),
- MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.LTR),
- MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.LTR),
- MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.LTR),
- MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.RTL),
- MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.RTL),
- MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.RTL),
- MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.RTL),
- MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.RTL)
-]
-binary_mults = [
- MultIdent(LTRMultiplier, always=False, complete=True),
- MultIdent(LTRMultiplier, always=True, complete=True),
- MultIdent(LTRMultiplier, always=False, complete=False),
- MultIdent(LTRMultiplier, always=True, complete=False),
- MultIdent(RTLMultiplier, always=False, complete=True),
- MultIdent(RTLMultiplier, always=True, complete=True),
- MultIdent(RTLMultiplier, always=False, complete=False),
- MultIdent(RTLMultiplier, always=True, complete=False),
- MultIdent(CoronMultiplier)
-]
-other_mults = [
- MultIdent(FullPrecompMultiplier, always=False, complete=True),
- MultIdent(FullPrecompMultiplier, always=True, complete=True),
- MultIdent(FullPrecompMultiplier, always=False, complete=False),
- MultIdent(FullPrecompMultiplier, always=True, complete=False),
- MultIdent(SimpleLadderMultiplier, complete=True),
- MultIdent(SimpleLadderMultiplier, complete=False)
-]
-
-# We can enumerate all mults and countermeasures here.
-all_mults = window_mults + naf_mults + binary_mults + other_mults + comb_mults
-all_mults_with_ctr = [mult.with_countermeasure(ctr) for mult in all_mults for ctr in (None, "gsr", "additive", "multiplicative", "euclidean", "bt")]
-
-
-def powers_of(k, max_power=20):
- """Take all powers of `k` up to `max_power`."""
- return [k**i for i in range(1, max_power)]
-
-def prod_combine(one, other):
- """Multiply all pairs of elements from `one` and `other`."""
- return [a * b for a, b in itertools.product(one, other)]
-
-
-# We have several sets of divisors, inspired by various "interesting" multiples the multipliers may compute.
-small_primes = [3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199]
-medium_primes = [211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397]
-large_primes = [401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997]
-all_integers = list(range(1, 400))
-all_even = list(range(2, 400, 2))
-all_odd = list(range(1, 400, 2))
-all_primes = small_primes + medium_primes + large_primes
-
-divisor_map = {
- "small_primes": small_primes,
- "medium_primes": medium_primes,
- "large_primes": large_primes,
- "all_primes": all_primes,
- "all_integers": all_integers,
- "all_even": all_even,
- "all_odd": all_odd,
- "powers_of_2": powers_of(2),
- "powers_of_2_large": powers_of(2, 256),
- "powers_of_2_large_3": [i * 3 for i in powers_of(2, 256)],
- "powers_of_2_large_p1": [i + 1 for i in powers_of(2, 256)],
- "powers_of_2_large_m1": [i - 1 for i in powers_of(2, 256)],
- "powers_of_2_large_pmautobus": sorted(set([i + j for i in powers_of(2, 256) for j in range(-5,5) if i+j > 0])),
- "powers_of_3": powers_of(3),
-}
-divisor_map["all"] = list(sorted(set().union(*[v for v in divisor_map.values()])))
-
-
-def conf_interval(p: float, samples: int, alpha: float = 0.05) -> tuple[float, float]:
- """Compute a confidence interval for a Binomial distribution."""
- return proportion_confint(round(p*samples), samples, alpha, method="wilson")
diff --git a/analysis/scalarmults/epare/__init__.py b/analysis/scalarmults/epare/__init__.py
new file mode 100644
index 0000000..631c2fd
--- /dev/null
+++ b/analysis/scalarmults/epare/__init__.py
@@ -0,0 +1,205 @@
+import itertools
+from statsmodels.stats.proportion import proportion_confint
+
+from pyecsca.ec.mult import (
+ DoubleAndAddMultiplier,
+ LTRMultiplier,
+ RTLMultiplier,
+ LadderMultiplier,
+ BinaryNAFMultiplier,
+ WindowNAFMultiplier,
+ SimpleLadderMultiplier,
+ DifferentialLadderMultiplier,
+ CoronMultiplier,
+ FixedWindowLTRMultiplier,
+ FullPrecompMultiplier,
+ ProcessingDirection,
+ AccumulationOrder,
+ ScalarMultiplier,
+ SlidingWindowMultiplier,
+ BGMWMultiplier,
+ CombMultiplier,
+    WindowBoothMultiplier
+)
+from pyecsca.ec.countermeasures import (
+ GroupScalarRandomization,
+ AdditiveSplitting,
+ MultiplicativeSplitting,
+ EuclideanSplitting,
+ BrumleyTuveri,
+ PointBlinding,
+ ScalarMultiplierCountermeasure
+)
+
+from .config import MultIdent, CountermeasureIdent, Config
+from .error_model import ErrorModel, checks_add
+from .mult_results import MultResults
+from .prob_map import ProbMap
+
+
+
+def powerset(iterable):
+ """Take an iterable and create a powerset of its elements."""
+ s = list(iterable)
+ return map(set, itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1)))
+
+
+def powers_of(k, max_power=20):
+    """Take all powers of `k` with exponents from 1 up to (but excluding) `max_power`."""
+    return [k**i for i in range(1, max_power)]
+
+
+def prod_combine(one, other):
+ """Multiply all pairs of elements from `one` and `other`."""
+ return [a * b for a, b in itertools.product(one, other)]
+
+
+# All dbl-and-add multipliers from https://github.com/J08nY/pyecsca/blob/master/pyecsca/ec/mult
+window_mults = [
+ MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.LTR),
+ MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.LTR),
+ MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.LTR),
+ MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.LTR),
+ MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.LTR),
+ MultIdent(SlidingWindowMultiplier, width=2, recoding_direction=ProcessingDirection.RTL),
+ MultIdent(SlidingWindowMultiplier, width=3, recoding_direction=ProcessingDirection.RTL),
+ MultIdent(SlidingWindowMultiplier, width=4, recoding_direction=ProcessingDirection.RTL),
+ MultIdent(SlidingWindowMultiplier, width=5, recoding_direction=ProcessingDirection.RTL),
+ MultIdent(SlidingWindowMultiplier, width=6, recoding_direction=ProcessingDirection.RTL),
+ MultIdent(FixedWindowLTRMultiplier, m=2**1),
+ MultIdent(FixedWindowLTRMultiplier, m=2**2),
+ MultIdent(FixedWindowLTRMultiplier, m=2**3),
+ MultIdent(FixedWindowLTRMultiplier, m=2**4),
+ MultIdent(FixedWindowLTRMultiplier, m=2**5),
+ MultIdent(FixedWindowLTRMultiplier, m=2**6),
+ MultIdent(WindowBoothMultiplier, width=2),
+ MultIdent(WindowBoothMultiplier, width=3),
+ MultIdent(WindowBoothMultiplier, width=4),
+ MultIdent(WindowBoothMultiplier, width=5),
+ MultIdent(WindowBoothMultiplier, width=6)
+]
+naf_mults = [
+ MultIdent(WindowNAFMultiplier, width=2),
+ MultIdent(WindowNAFMultiplier, width=3),
+ MultIdent(WindowNAFMultiplier, width=4),
+ MultIdent(WindowNAFMultiplier, width=5),
+ MultIdent(WindowNAFMultiplier, width=6),
+ MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.LTR),
+ MultIdent(BinaryNAFMultiplier, always=False, direction=ProcessingDirection.RTL),
+ MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.LTR),
+ MultIdent(BinaryNAFMultiplier, always=True, direction=ProcessingDirection.RTL)
+]
+comb_mults = [
+ MultIdent(CombMultiplier, width=2, always=True),
+ MultIdent(CombMultiplier, width=3, always=True),
+ MultIdent(CombMultiplier, width=4, always=True),
+ MultIdent(CombMultiplier, width=5, always=True),
+ MultIdent(CombMultiplier, width=6, always=True),
+ MultIdent(CombMultiplier, width=2, always=False),
+ MultIdent(CombMultiplier, width=3, always=False),
+ MultIdent(CombMultiplier, width=4, always=False),
+ MultIdent(CombMultiplier, width=5, always=False),
+ MultIdent(CombMultiplier, width=6, always=False),
+ MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.LTR),
+ MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.LTR),
+ MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.LTR),
+ MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.LTR),
+ MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.LTR),
+ MultIdent(BGMWMultiplier, width=2, direction=ProcessingDirection.RTL),
+ MultIdent(BGMWMultiplier, width=3, direction=ProcessingDirection.RTL),
+ MultIdent(BGMWMultiplier, width=4, direction=ProcessingDirection.RTL),
+ MultIdent(BGMWMultiplier, width=5, direction=ProcessingDirection.RTL),
+ MultIdent(BGMWMultiplier, width=6, direction=ProcessingDirection.RTL)
+]
+binary_mults = [
+ MultIdent(LTRMultiplier, always=False, complete=True),
+ MultIdent(LTRMultiplier, always=True, complete=True),
+ MultIdent(LTRMultiplier, always=False, complete=False),
+ MultIdent(LTRMultiplier, always=True, complete=False),
+ MultIdent(RTLMultiplier, always=False, complete=True),
+ MultIdent(RTLMultiplier, always=True, complete=True),
+ MultIdent(RTLMultiplier, always=False, complete=False),
+ MultIdent(RTLMultiplier, always=True, complete=False),
+ MultIdent(CoronMultiplier)
+]
+other_mults = [
+ MultIdent(FullPrecompMultiplier, always=False, complete=True),
+ MultIdent(FullPrecompMultiplier, always=True, complete=True),
+ MultIdent(FullPrecompMultiplier, always=False, complete=False),
+ MultIdent(FullPrecompMultiplier, always=True, complete=False),
+ MultIdent(SimpleLadderMultiplier, complete=True),
+ MultIdent(SimpleLadderMultiplier, complete=False)
+]
+
+# All error models are a simple cartesian product of the individual options.
+def _all_error_models():
+ result = []
+ for checks in powerset(checks_add):
+ for precomp_to_affine in (True, False):
+ for check_condition in ("all", "necessary"):
+ error_model = ErrorModel(checks, check_condition=check_condition, precomp_to_affine=precomp_to_affine)
+ result.append(error_model)
+ return result
+all_error_models = _all_error_models()
+
+
+# We can enumerate all mults and countermeasures here.
+all_mults = window_mults + naf_mults + binary_mults + other_mults + comb_mults
+def _all_mults_with_ctr():
+ result = []
+ for mult in all_mults:
+ for one_ctr_class, other_ctr_class in itertools.product((GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri, None), repeat=2):
+ if one_ctr_class is None and other_ctr_class is None:
+ result.append(mult)
+ continue
+ if other_ctr_class is None:
+ continue
+ if one_ctr_class is None:
+ mults = [mult] * other_ctr_class.nmults
+ other_ctr = CountermeasureIdent(other_ctr_class, *mults)
+ result.append(other_ctr)
+ continue
+
+ mults = [mult] * other_ctr_class.nmults
+ other_ctr = CountermeasureIdent(other_ctr_class, *mults)
+ for i in range(1, 2**one_ctr_class.nmults):
+ bits = format(i, f"0{one_ctr_class.nmults}b")
+ args = [other_ctr if bit == "1" else mult for bit in bits]
+ ctr = CountermeasureIdent(one_ctr_class, *args)
+ result.append(ctr)
+ return result
+all_mults_with_ctr = _all_mults_with_ctr()
+all_configs = [Config(mult, None) for mult in all_mults_with_ctr]
+
+
+# We have several sets of divisors, inspired by various "interesting" multiples the multipliers may compute.
+small_primes = [3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199]
+medium_primes = [211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397]
+large_primes = [401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997]
+all_integers = list(range(1, 400))
+all_even = list(range(2, 400, 2))
+all_odd = list(range(1, 400, 2))
+all_primes = small_primes + medium_primes + large_primes
+
+divisor_map = {
+ "small_primes": small_primes,
+ "medium_primes": medium_primes,
+ "large_primes": large_primes,
+ "all_primes": all_primes,
+ "all_integers": all_integers,
+ "all_even": all_even,
+ "all_odd": all_odd,
+ "powers_of_2": powers_of(2),
+ "powers_of_2_large": powers_of(2, 256),
+ "powers_of_2_large_3": [i * 3 for i in powers_of(2, 256)],
+ "powers_of_2_large_p1": [i + 1 for i in powers_of(2, 256)],
+ "powers_of_2_large_m1": [i - 1 for i in powers_of(2, 256)],
+ "powers_of_2_large_pmautobus": sorted(set([i + j for i in powers_of(2, 256) for j in range(-5,5) if i+j > 0])),
+ "powers_of_3": powers_of(3),
+}
+divisor_map["all"] = list(sorted(set().union(*[v for v in divisor_map.values()])))
+
+
+def conf_interval(p: float, samples: int, alpha: float = 0.05) -> tuple[float, float]:
+ """Compute a confidence interval for a Binomial distribution."""
+ return proportion_confint(round(p*samples), samples, alpha, method="wilson")
diff --git a/analysis/scalarmults/epare/config.py b/analysis/scalarmults/epare/config.py
new file mode 100644
index 0000000..569a1fa
--- /dev/null
+++ b/analysis/scalarmults/epare/config.py
@@ -0,0 +1,206 @@
+from dataclasses import dataclass
+from enum import Enum
+from functools import partial, total_ordering
+from typing import Any, Optional, Type
+
+from pyecsca.ec.countermeasures import GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri, PointBlinding, ScalarMultiplierCountermeasure
+from pyecsca.ec.mult import ScalarMultiplier
+from .error_model import ErrorModel
+
+class Composable:
+ klass: Type
+ args: list[Any]
+ kwargs: dict[str, Any]
+
+ @property
+ def mult(self):
+ """
+ Extract the MultIdent out of the Composable.
+
+        We assume there is only one somewhere in the tree (at the leaves).
+ """
+ if isinstance(self, MultIdent):
+ return self
+ for arg in self.args:
+ if isinstance(arg, Composable):
+ return arg.mult
+ for kwarg in self.kwargs.values():
+ if isinstance(kwarg, Composable):
+ return kwarg.mult
+
+ def construct(self, *mult_args, **mult_kwargs):
+ """Recursively construct this composable."""
+ args, kwargs = self._build_args_kwargs(mult_args, mult_kwargs)
+ return self._instantiate(args, kwargs, mult_args, mult_kwargs)
+
+ def _build_args_kwargs(self, mult_args, mult_kwargs):
+ args = []
+ for arg in self.args:
+ if isinstance(arg, Composable):
+ args.append(arg.construct(*mult_args, **mult_kwargs))
+ else:
+ args.append(arg)
+ kwargs = {}
+ for key, value in self.kwargs.items():
+ if isinstance(value, Composable):
+ kwargs[key] = value.construct(*mult_args, **mult_kwargs)
+ else:
+ kwargs[key] = value
+ return args, kwargs
+
+ def _instantiate(self, c_args, c_kwargs, mult_args, mult_kwargs):
+ return self.klass(*c_args, **c_kwargs)
+
+ def __lt__(self, other):
+ if not isinstance(other, Composable):
+ return NotImplemented
+ return str(self) < str(other)
+
+ def __hash__(self):
+        return hash((self.klass, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values())))
+
+
+@dataclass(frozen=True)
+@total_ordering
+class CountermeasureIdent(Composable):
+ klass: Type[ScalarMultiplierCountermeasure]
+ args: list[Any]
+ kwargs: dict[str, Any]
+
+    def __init__(self, klass: Type[ScalarMultiplierCountermeasure], *args, **kwargs):
+ object.__setattr__(self, "klass", klass)
+ object.__setattr__(self, "args", args if args is not None else [])
+ object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {})
+
+ def construct(self, *mult_args, **mult_kwargs):
+ args, kwargs = self._build_args_kwargs(mult_args, mult_kwargs)
+ # Capture any formula required to instantiate this countermeasure from the mult args
+ for arg in mult_args:
+ if any(map(lambda req: isinstance(arg, req), self.klass.requires)):
+ kwargs[arg.shortname] = arg
+ # Capture the rng as well.
+ if "rng" in mult_kwargs:
+ kwargs["rng"] = mult_kwargs["rng"]
+ return self._instantiate(args, kwargs, mult_args, mult_kwargs)
+
+ def __str__(self):
+ if issubclass(self.klass, GroupScalarRandomization):
+ name = "gsr"
+ elif issubclass(self.klass, AdditiveSplitting):
+ name = "asplit"
+ elif issubclass(self.klass, MultiplicativeSplitting):
+ name = "msplit"
+ elif issubclass(self.klass, EuclideanSplitting):
+ name = "esplit"
+ elif issubclass(self.klass, BrumleyTuveri):
+ name = "bt"
+ elif issubclass(self.klass, PointBlinding):
+ name = "blind"
+ else:
+ name = "?"
+        # Only print Composable arguments, as countermeasures have no other interesting arguments.
+        args = (",".join(list(map(str, filter(lambda arg: isinstance(arg, Composable), self.args))))) if self.args else ""
+        # Same for kwargs.
+        kwargs = (",".join(f"{k}={v}" for k, v in self.kwargs.items() if isinstance(v, Composable))) if self.kwargs else ""
+ return f"{name}({args}{',' if args and kwargs else ''}{kwargs})"
+
+ def __repr__(self):
+ return str(self)
+
+ def __hash__(self):
+ return hash((self.klass, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values())))
+
+
+@dataclass(frozen=True)
+@total_ordering
+class MultIdent(Composable):
+ klass: Type[ScalarMultiplier]
+ args: list[Any]
+ kwargs: dict[str, Any]
+
+ def __init__(self, klass: Type[ScalarMultiplier], *args, **kwargs):
+ object.__setattr__(self, "klass", klass)
+ object.__setattr__(self, "args", args if args is not None else [])
+ object.__setattr__(self, "kwargs", kwargs if kwargs is not None else {})
+
+ def _instantiate(self, c_args, c_kwargs, mult_args, mult_kwargs):
+ args = [*c_args, *mult_args]
+ kwargs = {**c_kwargs, **mult_kwargs}
+ if "rng" in kwargs:
+ del kwargs["rng"]
+ return self.klass(*args, **kwargs)
+
+ def __str__(self):
+ name = self.klass.__name__.replace("Multiplier", "")
+ args = (",".join(list(map(str, self.args)))) if self.args else ""
+ kwmap = {"recoding_direction": "recode",
+ "direction": "dir",
+ "width": "w"}
+ kwargs = (",".join(f"{kwmap.get(k, k)}:{v.name if isinstance(v, Enum) else str(v)}" for k,v in self.kwargs.items())) if self.kwargs else ""
+ return f"{name}({args}{',' if args and kwargs else ''}{kwargs})"
+
+ def __repr__(self):
+ return str(self)
+
+ def __hash__(self):
+ return hash((self.klass, tuple(self.args), tuple(self.kwargs.keys()), tuple(self.kwargs.values())))
+
+
+@dataclass(frozen=True)
+@total_ordering
+class Config:
+ """
+ A Config is a description of a scalar multiplication implementation, consisting of a scalar multiplier,
+ (optionally) a countermeasure, and (optionally) an error model.
+
+ The scalar multiplier and countermeasure combination is specified as an instance of `Composable` stored
+ in the `composition` attribute.
+
+    The error model is simply in the `error_model` attribute and may be `None`. If it is `None`, the Config
+    is not suitable for error simulation and merely represents the description of a scalar multiplication
+    implementation we care about when reverse-engineering: the multiplier and the countermeasure. We do not
+    really care about the error model when reverse-engineering, yet we need it when simulating.
+ """
+ composition: Composable
+ error_model: Optional[ErrorModel] = None
+
+ @property
+ def partial(self):
+ """Get the callable that constructs the scalar multiplier (with countermeasure if any)."""
+ return self.composition.construct
+
+ @property
+ def mult(self):
+ """Get the MultIdent"""
+ return self.composition.mult
+
+ @property
+ def has_countermeasure(self):
+ return isinstance(self.composition, CountermeasureIdent)
+
+ @property
+ def has_error_model(self):
+ return self.error_model is not None
+
+ def with_error_model(self, error_model: ErrorModel | None):
+ """Return a new Config with a given error model."""
+ if not (isinstance(error_model, ErrorModel) or error_model is None):
+ raise ValueError("Unknown error model.")
+ return Config(self.composition, error_model=error_model)
+
+ def __str__(self):
+ error_model = str(self.error_model) if self.error_model else ""
+ return f"<{self.composition}{error_model}>"
+
+ def __repr__(self):
+ return str(self)
+
+ def __lt__(self, other):
+ if not isinstance(other, Config):
+ return NotImplemented
+ me = (self.mult, self.composition)
+ them = (other.mult, other.composition)
+ return me < them
+
+ def __hash__(self):
+ return hash((self.composition, self.error_model))
diff --git a/analysis/scalarmults/epare/error_model.py b/analysis/scalarmults/epare/error_model.py
new file mode 100644
index 0000000..a7f5edd
--- /dev/null
+++ b/analysis/scalarmults/epare/error_model.py
@@ -0,0 +1,110 @@
+from dataclasses import dataclass
+from functools import partial, total_ordering
+from typing import Any, Optional, Type, Union, Literal
+
+
+def check_equal_multiples(k, l, q):
+ """Checks whether the two multiples input into the formula are equal modulo q (the order of the base)."""
+ return (k % q) == (l % q)
+
+
+def check_divides(k, l, q):
+ """Checks whether q (the order of the base) divides any of the multiples input into the formula."""
+    return (k != 0) and (l != 0) and ((k % q == 0) or (l % q == 0))
+
+
+def check_half_add(k, l, q):
+    """Checks whether the two multiples input into the formula differ by a multiple of q/2 (only possible for even q)."""
+    return (q % 2 == 0) and ((k-l) % (q//2)) == 0
+
+
+def check_affine(k, q):
+ """Checks whether q (the order of the base) divides the multiple that is to be converted to affine."""
+ return k % q == 0
+
+
+def check_any(*checks, q=None):
+ """Merge multiple checks together. The returned check function no longer takes the `q` parameter."""
+ def check_func(k, l):
+ for check in checks:
+ if check(k, l, q):
+ return True
+ return False
+ return check_func
+
+
+# These checks can be applied to add formulas. See the formulas notebook for background on them.
+checks_add = {
+ "equal_multiples": check_equal_multiples,
+ "divides": check_divides,
+ "half_add": check_half_add
+}
+
+# This check can be applied to conversion to affine.
+checks_affine = {
+ "affine": check_affine
+}
+
+@dataclass(frozen=True)
+@total_ordering
+class ErrorModel:
+ """
+ An ErrorModel describes the behavior of an implementation with regards to errors on exceptional
+ inputs to its addition formulas, to-affine conversion or general scalar multiplication.
+
+ :param checks: A set of names of checks (from checks_add and checks_affine) that the implementation performs.
+ Note that these may not be checks that the implementation explicitly performs, only that it behaves w.r.t.
+ errors as if it were doing these checks, due to the formulas it chose and any actual checks it has.
+    :param check_condition: Either "all" or "necessary". Specifies whether the checks are applied to all points
+    that the implementation computes during a scalar multiplication, or only to those that end up being used in
+    -- and thus affect -- the final result. If an implementation does not perform any dummy operations, these two are the same.
+ :param precomp_to_affine: Specifies whether the implementation converts all results of the precomputation step
+ to affine form. If it does, it means that additional checks on all outputs of the precomputation are done as
+ they have to be "convertible" to affine form.
+ """
+ checks: set[str]
+ check_condition: Union[Literal["all"], Literal["necessary"]]
+ precomp_to_affine: bool
+
+ def __init__(self, checks: set[str], check_condition: Union[Literal["all"], Literal["necessary"]], precomp_to_affine: bool):
+ for check in checks:
+ if check not in checks_add:
+ raise ValueError(f"Unknown check: {check}")
+ checks = set(checks)
+ checks.add("affine") # always done in our model
+ object.__setattr__(self, "checks", checks)
+ if check_condition not in ("all", "necessary"):
+ raise ValueError("Wrong check_condition")
+ object.__setattr__(self, "check_condition", check_condition)
+ object.__setattr__(self, "precomp_to_affine", precomp_to_affine)
+
+ def check_add(self, q):
+ """Get the add formula check function for the given q."""
+ if self.checks == {"affine"}:
+ return lambda k, l: False
+ return check_any(*map(lambda name: checks_add[name], filter(lambda check: check in checks_add, self.checks)), q=q)
+
+ def check_affine(self, q):
+ """Get the to-affine check function for the given q."""
+ return partial(check_affine, q=q)
+
+ def __lt__(self, other):
+ if not isinstance(other, ErrorModel):
+ return NotImplemented
+ return str(self) < str(other)
+
+ def __str__(self):
+ cs = []
+ if "equal_multiples" in self.checks:
+ cs.append("em")
+ if "divides" in self.checks:
+ cs.append("d")
+ if "half_add" in self.checks:
+ cs.append("ha")
+ if "affine" in self.checks:
+ cs.append("a")
+ precomp = "+pre" if self.precomp_to_affine else ""
+ return f"({','.join(cs)}+{self.check_condition}{precomp})"
+
+ def __hash__(self):
+        return hash((tuple(sorted(self.checks)), self.check_condition, self.precomp_to_affine))
\ No newline at end of file
diff --git a/analysis/scalarmults/epare/mult_results.py b/analysis/scalarmults/epare/mult_results.py
new file mode 100644
index 0000000..5424b01
--- /dev/null
+++ b/analysis/scalarmults/epare/mult_results.py
@@ -0,0 +1,40 @@
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import Optional
+
+from pyecsca.sca.re.rpa import MultipleContext
+from pyecsca.ec.point import Point
+
+
+@dataclass
+class MultResults:
+ """
+    A MultResults instance represents many simulated scalar multiplication computations, which were tracked
+    using a `MultipleContext` (i.e. the outputs of the :func:`pyecsca.sca.re.rpa.multiple_graph` function).
+    Generally, these would be for one Config only, but that should be handled separately, for example
+    in a dict[Config, MultResults]. The `samples` attribute describes how many computations
+    are contained and must correspond to the length of the `multiplications` list.
+ """
+ multiplications: list[tuple[MultipleContext, MultipleContext, Point]]
+ samples: int
+ duration: Optional[float] = None
+
+ def merge(self, other: "MultResults"):
+ self.multiplications.extend(other.multiplications)
+ self.samples += other.samples
+
+ def __len__(self):
+ return self.samples
+
+ def __iter__(self):
+ yield from self.multiplications
+
+ def __getitem__(self, i):
+ return self.multiplications[i]
+
+ def __str__(self):
+ duration = timedelta(seconds=int(self.duration)) if self.duration is not None else ""
+ return f"MultResults({self.samples},{duration})"
+
+ def __repr__(self):
+        return str(self)
\ No newline at end of file
diff --git a/analysis/scalarmults/epare/prob_map.py b/analysis/scalarmults/epare/prob_map.py
new file mode 100644
index 0000000..eb96dda
--- /dev/null
+++ b/analysis/scalarmults/epare/prob_map.py
@@ -0,0 +1,65 @@
+from dataclasses import dataclass
+import hashlib
+
+
+def hash_divisors(divisors: set[int]) -> bytes:
+ return hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()
+
+
+@dataclass
+class ProbMap:
+ """
+    A ProbMap is a mapping from integers (base point order q) to floats (the error probability for some scalar
+    multiplication implementation, i.e. Config). The probability map is constructed for a given set of
+    `divisors` (the base point orders q). Probability maps can be narrowed or merged. Narrowing restricts
+    the probability map to a smaller set of `divisors`. Merging takes another probability map built over the
+    same divisor set and updates the probabilities to a weighted average of the two maps
+    (weighted by the number of samples).
+ """
+ probs: dict[int, float]
+ divisors_hash: bytes
+ samples: int
+
+ def __len__(self):
+ return len(self.probs)
+
+ def __iter__(self):
+ yield from self.probs
+
+ def __getitem__(self, i):
+ return self.probs[i] if i in self.probs else 0.0
+
+ def __contains__(self, item):
+ return item in self.probs
+
+ def keys(self):
+ return self.probs.keys()
+
+ def values(self):
+ return self.probs.values()
+
+ def items(self):
+ return self.probs.items()
+
+ def narrow(self, divisors: set[int]):
+ """Narrow the probability map to the new set of divisors (must be a subset of the current set)."""
+ divisors_hash = hash_divisors(divisors)
+ if self.divisors_hash == divisors_hash:
+ # Already narrow.
+ return
+ for kdel in set(self.probs.keys()).difference(divisors):
+ del self.probs[kdel]
+ self.divisors_hash = divisors_hash
+
+ def merge(self, other: "ProbMap") -> None:
+ """Merge the `other` probability map into this one (must share the divisor set)."""
+ if self.divisors_hash != other.divisors_hash:
+ raise ValueError("Merging can only work on probmaps created for same divisors.")
+ new_keys = set(self.keys()).union(other.keys())
+ result = {}
+ for key in new_keys:
+ sk = self[key]
+ ok = other[key]
+ result[key] = (sk * self.samples + ok * other.samples) / (self.samples + other.samples)
+ self.probs = result
+        self.samples += other.samples
\ No newline at end of file
diff --git a/analysis/scalarmults/epare/simulate.py b/analysis/scalarmults/epare/simulate.py
new file mode 100644
index 0000000..9b78436
--- /dev/null
+++ b/analysis/scalarmults/epare/simulate.py
@@ -0,0 +1,104 @@
+import random
+import pickle
+from functools import partial
+
+from .config import Config
+from .mult_results import MultResults
+from .prob_map import ProbMap, hash_divisors
+
+from pyecsca.ec.params import DomainParameters
+from pyecsca.ec.mod import mod
+from pyecsca.sca.re.rpa import multiple_graph
+from pyecsca.sca.re.epa import graph_to_check_inputs, evaluate_checks
+
+
+def simulate_multiples(mult: Config,
+ params: DomainParameters,
+ bits: int,
+ samples: int = 100,
+ seed: bytes | None = None) -> MultResults:
+ """
+ Takes a Config, which specifies a scalar multiplier (with optional countermeasures)
+ and simulates `samples` scalar multiplications, while tracking which multiples of the
+ symbolic input point get computed.
+ """
+ results = []
+ if seed is not None:
+ random.seed(seed)
+ rng = lambda n: mod(random.randrange(n), n)
+
+ # If no countermeasure is used, we have fully random scalars.
+ # Otherwise, fix one per chunk.
+ if not mult.has_countermeasure:
+ scalars = [random.randint(1, 2**bits) for _ in range(samples)]
+ else:
+ one = random.randint(1, 2**bits)
+ scalars = [one for _ in range(samples)]
+
+ for scalar in scalars:
+ results.append(multiple_graph(scalar, params, mult.mult.klass, partial(mult.partial, rng=rng)))
+ return MultResults(results, samples)
+
+
+def simulate_multiples_direct(mult: Config,
+ params: DomainParameters,
+ bits: int,
+ fname: str,
+ samples: int = 100,
+ seed: bytes | None = None) -> str:
+ """
+ Like the `simulate_multiples` function above, but stores the pickled output directly
+ into a file named `fname`.
+ """
+ result = simulate_multiples(mult, params, bits, samples, seed)
+ with open(fname, "wb") as f:
+ pickle.dump((mult, result), f)
+ return fname
+
+
+def evaluate_multiples(mult: Config,
+ res: MultResults,
+ divisors: set[int],
+ use_init: bool = True,
+ use_multiply: bool = True):
+ """
+    Takes a Config and MultResults and a set of divisors (base point orders `q`) and
+    evaluates them using the error model from the Config. Note that the Config
+    must have an error model in this case. Returns the ProbMap.
+ """
+ errors = {divisor: 0 for divisor in divisors}
+ samples = len(res)
+    divisors_hash = hash_divisors(divisors)
+ for precomp_ctx, full_ctx, out in res:
+ check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out,
+ check_condition=mult.error_model.check_condition,
+ precomp_to_affine=mult.error_model.precomp_to_affine,
+ use_init=use_init,
+ use_multiply=use_multiply)
+ for q in divisors:
+ error = evaluate_checks(check_funcs={"add": mult.error_model.check_add(q), "affine": mult.error_model.check_affine(q)},
+ check_inputs=check_inputs)
+ errors[q] += error
+ # Make probmaps smaller. Do not store zero probabilities.
+ probs = {}
+ for q, error in errors.items():
+ if error != 0:
+ probs[q] = error / samples
+ return ProbMap(probs, divisors_hash, samples)
+
+
+def evaluate_multiples_direct(mult: Config,
+ fname: str,
+ offset: int,
+ divisors: set[int],
+ use_init: bool = True,
+ use_multiply: bool = True):
+ """
+ Like `evaluate_multiples`, but instead reads the MultResults from a file named `fname`
+ at an `offset`. Still returns the ProbMap, which is significantly smaller and easier
+ to pickle than the MultResults.
+ """
+ with open(fname, "rb") as f:
+ f.seek(offset)
+ _, res = pickle.load(f)
+    return evaluate_multiples(mult, res, divisors, use_init, use_multiply)
\ No newline at end of file
diff --git a/analysis/scalarmults/probmaps.py b/analysis/scalarmults/probmaps.py
deleted file mode 100644
index 0ad13a3..0000000
--- a/analysis/scalarmults/probmaps.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import sys
-import pickle
-import itertools
-import glob
-
-from pyecsca.misc.utils import TaskExecutor
-from tqdm.auto import tqdm
-
-from common import *
-
-
-def divides_any(l: int, small_scalars: set[int]) -> bool:
- for s in small_scalars:
- if s%l==0:
- return True
- return False
-
-
-def process_small_scalars(scalar_results: MultResults, divisors: set[int]) -> ProbMap:
- result = {}
- for divisor in divisors:
- count = 0
- for smult in scalar_results.multiplications:
- if divides_any(divisor, smult):
- count += 1
- result[divisor] = count / scalar_results.samples
- return ProbMap(result, scalar_results.samples, scalar_results.kind)
-
-
-def load_chunk(fname: str, divisors: set[int], kind: str) -> dict[MultIdent, ProbMap]:
- with open(fname, "rb") as f:
- multiples = {}
- while True:
- try:
- mult, distr = pickle.load(f)
- multiples[mult] = distr
- except EOFError:
- break
- res = {}
- for mult, results in multiples.items():
- results.kind = kind
- res[mult] = process_small_scalars(results, divisors)
- return res
-
-
-if __name__ == "__main__":
- distributions_mults = {}
- bits = 256
- num_workers = int(sys.argv[1]) if len(sys.argv) > 1 else 32
- divisor_name = sys.argv[2] if len(sys.argv) > 2 else "all"
- kind = sys.argv[3] if len(sys.argv) > 3 else "precomp+necessary"
- use_init = (sys.argv[4].lower() == "true") if len(sys.argv) > 4 else True
- use_multiply = (sys.argv[5].lower() == "true") if len(sys.argv) > 5 else True
- files = sorted(glob.glob(f"multiples_{bits}_{kind}_{'init' if use_init else 'noinit'}_{'mult' if use_multiply else 'nomult'}_chunk*.pickle"))
-
- selected_divisors = divisor_map[divisor_name]
-
- with TaskExecutor(max_workers=num_workers) as pool:
- for fname in files:
- pool.submit_task(fname,
- load_chunk,
- fname, selected_divisors, kind)
- for fname, future in tqdm(pool.as_completed(), total=len(pool.tasks), smoothing=0):
- if error := future.exception():
- print(f"Error {fname}, {error}")
- continue
- new_distrs = future.result()
- for mult, prob_map in new_distrs.items():
- if mult in distributions_mults:
- distributions_mults[mult].merge(prob_map)
- else:
- distributions_mults[mult] = prob_map
- for mult, prob_map in distributions_mults.items():
- print(f"Got {prob_map.samples} for {mult}.")
-
- # Save
- with open(f"{divisor_name}_{kind}_distrs.pickle", "wb") as f:
- pickle.dump(distributions_mults, f)
diff --git a/analysis/scalarmults/simulate.ipynb b/analysis/scalarmults/simulate.ipynb
index c1f123a..07f0e8f 100644
--- a/analysis/scalarmults/simulate.ipynb
+++ b/analysis/scalarmults/simulate.ipynb
@@ -25,12 +25,11 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 15,
"id": "b4386513-cc14-434b-a748-2863f8657452",
"metadata": {},
"outputs": [],
"source": [
- "import itertools\n",
"import gc\n",
"import glob\n",
"import hashlib\n",
@@ -52,7 +51,8 @@
"\n",
"from collections import Counter\n",
"from pathlib import Path\n",
- "from random import randint, randbytes\n",
+ "from random import randint, randbytes, randrange\n",
+ "from functools import partial\n",
"from typing import Type, Any\n",
"\n",
"from tqdm.auto import tqdm, trange\n",
@@ -64,7 +64,7 @@
"from pyecsca.sca.re.epa import graph_to_check_inputs, evaluate_checks\n",
"from pyecsca.misc.utils import TaskExecutor\n",
"\n",
- "from common import *"
+ "from epare import *"
]
},
{
@@ -80,7 +80,7 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 16,
"id": "3463a7bd-34d8-458b-8ceb-dddf99de21dc",
"metadata": {},
"outputs": [],
@@ -97,10 +97,21 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 17,
"id": "170c11fc-86cf-4eb1-bf4e-b2e44b2d7ac5",
"metadata": {},
- "outputs": [],
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Scalar multipliers considered: 65\n",
+       "Scalar multipliers (with a combination of up to two countermeasures) considered: 5265\n",
+ "Error models considered: 32\n",
+ "Total configurations considered: 168480\n"
+ ]
+ }
+ ],
"source": [
"nmults = len(all_mults)\n",
"nmults_ctr = len(all_mults_with_ctr)\n",
@@ -108,7 +119,7 @@
"ncfgs = nmults_ctr * nerror_models\n",
"\n",
"print(f\"Scalar multipliers considered: {nmults}\")\n",
- "print(f\"Scalar multipliers (with a single countermeasure) considered: {nmults_ctr}\")\n",
+    "print(f\"Scalar multipliers (with a combination of up to two countermeasures) considered: {nmults_ctr}\")\n",
"print(f\"Error models considered: {nerror_models}\")\n",
"print(f\"Total configurations considered: {ncfgs}\")"
]
@@ -126,7 +137,7 @@
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 18,
"id": "4d5c7f10-618f-4612-b594-81d1607b0d1d",
"metadata": {},
"outputs": [],
@@ -139,170 +150,19 @@
"use_init = True\n",
"use_multiply = True\n",
"\n",
- "num_workers = 30\n",
+ "num_workers = 20\n",
"samples = 1000\n",
"\n",
"selected_mults = all_mults"
]
},
{
- "cell_type": "code",
- "execution_count": null,
- "id": "07bc266d-35eb-4f6d-bdba-e9f6f66827f1",
- "metadata": {},
- "outputs": [],
- "source": [
- "def simulate_multiples(mult: MultIdent,\n",
- " params: DomainParameters,\n",
- " bits: int,\n",
- " samples: int = 100,\n",
- " seed: bytes | None = None) -> MultResults:\n",
- " \"\"\"\n",
- " Takes a MultIdent, which specifies a scalar multiplier (with an optional countermeasure)\n",
- " and simulates `samples` scalar multiplications, while tracking which multiples of the\n",
- " symbolic input point get computed.\n",
- " \"\"\"\n",
- " results = []\n",
- " if seed is not None:\n",
- " random.seed(seed)\n",
- "\n",
- " # If no countermeasure is used, we have fully random scalars.\n",
- " # Otherwise, fix one per chunk.\n",
- " if mult.countermeasure is None:\n",
- " scalars = [random.randint(1, 2**bits) for _ in range(samples)]\n",
- " else:\n",
- " one = random.randint(1, 2**bits)\n",
- " scalars = [one for _ in range(samples)]\n",
- "\n",
- " for scalar in scalars:\n",
- " results.append(multiple_graph(scalar, params, mult.klass, mult.partial))\n",
- " return MultResults(results, samples)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "6f3e9ffa-19e8-46b2-a6ad-1d318d5c8e17",
- "metadata": {},
- "outputs": [],
- "source": [
- "def simulate_multiples_direct(mult: MultIdent,\n",
- " params: DomainParameters,\n",
- " bits: int,\n",
- " fname: str,\n",
- " samples: int = 100,\n",
- " seed: bytes | None = None) -> str:\n",
- " \"\"\"\n",
- " Like the `simulate_multiples` function above, but stores the pickled output directly\n",
- " into a file named `fname`.\n",
- " \"\"\"\n",
- " results = []\n",
- " if seed is not None:\n",
- " random.seed(seed)\n",
- "\n",
- " # If no countermeasure is used, we have fully random scalars.\n",
- " # Otherwise, fix one per chunk.\n",
- " if mult.countermeasure is None:\n",
- " scalars = [random.randint(1, 2**bits) for _ in range(samples)]\n",
- " else:\n",
- " one = random.randint(1, 2**bits)\n",
- " scalars = [one for _ in range(samples)]\n",
- "\n",
- " for scalar in scalars:\n",
- " results.append(multiple_graph(scalar, params, mult.klass, mult.partial))\n",
- " result = MultResults(results, samples)\n",
- " with open(fname, \"wb\") as f:\n",
- " pickle.dump((mult, result), f)\n",
- " return fname"
- ]
- },
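Each file written by `simulate_multiples_direct` holds exactly one pickled `(mult, result)` tuple, so reading a single worker's output back is one load. A minimal sketch (the file name is assumed for illustration):

```python
import pickle

# Read back one per-multiplier result file (name assumed for illustration).
with open("0.pickle", "rb") as f:
    mult, result = pickle.load(f)
print(mult, len(result))  # MultResults supports len(), as evaluate_multiples relies on
```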
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "64799c16-8113-4eff-81de-6a3e547eb5c5",
- "metadata": {},
- "outputs": [],
- "source": [
- "def evaluate_multiples(mult: MultIdent,\n",
- " res: MultResults,\n",
- " divisors: set[int],\n",
- " use_init: bool = True,\n",
- " use_multiply: bool = True):\n",
- " \"\"\"\n",
- " Takes MultIdent and MultResults and a set of divisors (base point orders `q`) and\n",
- " evaluates them using the error model from the MultIdent. Note that the MultIdent\n",
- " must have an error model in this case. Returns the ProbMap.\n",
- " \"\"\"\n",
- " errors = {divisor: 0 for divisor in divisors}\n",
- " samples = len(res)\n",
- " divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()\n",
- " for precomp_ctx, full_ctx, out in res:\n",
- " check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out,\n",
- " check_condition=mult.error_model.check_condition,\n",
- " precomp_to_affine=mult.error_model.precomp_to_affine,\n",
- " use_init=use_init,\n",
- " use_multiply=use_multiply)\n",
- " for q in divisors:\n",
- " error = evaluate_checks(check_funcs={\"add\": mult.error_model.check_add(q), \"affine\": mult.error_model.check_affine(q)},\n",
- " check_inputs=check_inputs)\n",
- " errors[q] += error\n",
- " # Make probmaps smaller. Do not store zero probabilities.\n",
- " probs = {}\n",
- " for q, error in errors.items():\n",
- " if error != 0:\n",
- " probs[q] = error / samples\n",
- " return ProbMap(probs, divisors_hash, samples)"
- ]
- },
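Each stored probability is an empirical proportion over `samples` Bernoulli trials, so a binomial confidence interval is cheap to attach when comparing configurations. A sketch using statsmodels, as an add-on illustration rather than part of this cell:

```python
from statsmodels.stats.proportion import proportion_confint

# Clopper-Pearson interval for a hypothetical error count out of `samples` trials.
errors, samples = 37, 1000
low, high = proportion_confint(errors, samples, alpha=0.05, method="beta")
print(f"p = {errors / samples:.3f}, 95% CI = [{low:.3f}, {high:.3f}]")
```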
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "ac630a08-4120-41cf-b3bb-1827ef469542",
- "metadata": {},
- "outputs": [],
- "source": [
- "def evaluate_multiples_direct(mult: MultIdent,\n",
- " fname: str,\n",
- " offset: int,\n",
- " divisors: set[int],\n",
- " use_init: bool = True,\n",
- " use_multiply: bool = True):\n",
- " \"\"\"\n",
- " Like `evaluate_multiples`, but instead reads the MultResults from a file named `fname`\n",
- " at an `offset`. Still returns the ProbMap, which is significantly smaller and easier\n",
- " to pickle than the MultResults.\n",
- " \"\"\"\n",
- " with open(fname, \"rb\") as f:\n",
- " f.seek(offset)\n",
- " _, res = pickle.load(f)\n",
- " errors = {divisor: 0 for divisor in divisors}\n",
- " samples = len(res)\n",
- " divisors_hash = hashlib.blake2b(str(sorted(divisors)).encode(), digest_size=8).digest()\n",
- " for precomp_ctx, full_ctx, out in res:\n",
- " check_inputs = graph_to_check_inputs(precomp_ctx, full_ctx, out,\n",
- " check_condition=mult.error_model.check_condition,\n",
- " precomp_to_affine=mult.error_model.precomp_to_affine,\n",
- " use_init=use_init,\n",
- " use_multiply=use_multiply)\n",
- " for q in divisors:\n",
- " error = evaluate_checks(check_funcs={\"add\": mult.error_model.check_add(q), \"affine\": mult.error_model.check_affine(q)},\n",
- " check_inputs=check_inputs)\n",
- " errors[q] += error\n",
- " # Make probmaps smaller. Do not store zero probabilities.\n",
- " probs = {}\n",
- " for q, error in errors.items():\n",
- " if error != 0:\n",
- " probs[q] = error / samples\n",
- " return ProbMap(probs, divisors_hash, samples)"
- ]
- },
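The `offset` argument implies that the chunk file is a plain concatenation of pickle records, one `(mult, result)` tuple after another; valid offsets can therefore be recovered by walking the file once. A sketch of that scan:

```python
import pickle

def scan_offsets(path):
    """Yield (offset, mult) for each record in a concatenated-pickle chunk file."""
    with open(path, "rb") as f:
        while True:
            offset = f.tell()
            try:
                mult, _result = pickle.load(f)
            except EOFError:
                break  # clean end of the concatenated stream
            yield offset, mult
```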
- {
"cell_type": "markdown",
"id": "3aaf712e-5b97-4390-8dd4-e1db1dfe36a2",
"metadata": {},
"source": [
"## Run\n",
- "Run this cell as many times as you want. It will simulate `samples` scalar multiplications for each `MultIdent` (a scalar multiplier implementation with an optional countermeasure) and store them into the chunk."
+ "Run this cell as many times as you want. It will simulate `samples` scalar multiplications for each `Config` (a scalar multiplier implementation with an optional countermeasure) and store them into the chunk."
]
},
{
@@ -310,12 +170,35 @@
"execution_count": null,
"id": "84359084-4116-436c-92cd-d43fdfeca842",
"metadata": {},
- "outputs": [],
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "f7b8ac781be9408db32f219b4b2290a0",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Computing multiple graphs.: 0%| | 0/5265 [00:00<?, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/home/xjancar/pyecsca/virt/lib/python3.12/site-packages/loky/process_executor.py:782: UserWarning: A worker stopped while some jobs were given to the executor. This can be caused by a too short worker timeout or by a memory leak.\n",
+ " warnings.warn(\n"
+ ]
+ }
+ ],
"source": [
"chunk_id = randbytes(4).hex()\n",
"with TaskExecutor(max_workers=num_workers, initializer=silence) as pool, tempfile.TemporaryDirectory() as tmp_dirname:\n",
" tmp_path = Path(tmp_dirname)\n",
- " for i, mult in enumerate(all_mults_with_ctr):\n",
+ " for i, mult in enumerate(all_configs):\n",
" pool.submit_task(mult,\n",
" simulate_multiples_direct,\n",
" mult, params, bits, tmp_path / f\"{i}.pickle\", samples, seed=chunk_id)\n",
@@ -324,7 +207,8 @@
" #print(f\"Got {mult}.\")\n",
" if error := future.exception():\n",
" print(\"Error!\", error)\n",
- " continue\n",
+ " break\n",
+ " #continue\n",
" fpath = future.result()\n",
" with fpath.open(\"rb\") as f:\n",
" h.write(f.read())\n",
@@ -351,7 +235,57 @@
"execution_count": null,
"id": "fbab8333-b8f1-4890-b38a-7bb34f5ffb02",
"metadata": {},
- "outputs": [],
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "f4bc80bd47b345e0a74ad1a94121bfb1",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Processing chunks: 0%| | 0/1 [00:00<?, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Processing chunk 316daffc, no probmaps found.\n"
+ ]
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "38046ebec4d34694a973ab1959f14905",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Loading chunk 316daffc.: 0%| | 0/5265 [00:00<?, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "76e3fb85f95e41069c2b7fbe86b27650",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "Processing 316daffc.: 0%| | 0/168480 [00:00<?, ?it/s]"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
"source": [
"with TaskExecutor(max_workers=num_workers, initializer=silence) as pool:\n",
" for in_fname in tqdm(glob.glob(f\"multiples_{bits}_chunk*.pickle\"), desc=\"Processing chunks\", smoothing=0):\n",
@@ -364,7 +298,7 @@
" out_file = Path(out_fname)\n",
"\n",
" cfgs_todo = set()\n",
- " for mult in all_mults_with_ctr:\n",
+ " for mult in all_configs:\n",
" for error_model in all_error_models:\n",
" cfgs_todo.add(mult.with_error_model(error_model))\n",
"\n",
diff --git a/analysis/scalarmults/visualize.ipynb b/analysis/scalarmults/visualize.ipynb
index d2d6b0f..a77ba9d 100644
--- a/analysis/scalarmults/visualize.ipynb
+++ b/analysis/scalarmults/visualize.ipynb
@@ -433,7 +433,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.13.5"
+ "version": "3.12.3"
}
},
"nbformat": 4,