aboutsummaryrefslogtreecommitdiff
path: root/analysis/countermeasures/simulate.py
blob: 7a0d4b0f3f978a3a6a81973ef12ba7e99eaaac23 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
# coding: utf-8

# # Simulating EPA-RE using points of low order


import pickle
import itertools
import glob
import random
import sys
import time
import os

import matplotlib
import matplotlib.pyplot as plt
import numpy as np

from collections import Counter

from pathlib import Path
from random import randint, randbytes, shuffle
from typing import Type, Any, Tuple

from bs4 import BeautifulSoup
from tqdm.auto import tqdm, trange

from pyecsca.ec.params import DomainParameters, get_params
from pyecsca.ec.mult import *
from pyecsca.sca.re.rpa import multiples_computed
from pyecsca.misc.utils import TaskExecutor

from common import *


def get_general_multiples(bits: int, samples: int = 1000) -> MultResults:
    """Draw random scalars and wrap each one in its own single-element set.

    Every scalar comes from ``randint(1, 2**bits)``, so both endpoints are
    possible values.

    :param bits: Exponent of the inclusive upper bound ``2**bits``.
    :param samples: How many scalars (and therefore sets) to produce.
    :return: ``MultResults`` built from the list of singleton sets and ``samples``.
    """
    upper_bound = 2**bits
    singletons = [{random.randint(1, upper_bound)} for _ in range(samples)]
    return MultResults(singletons, samples)


def get_general_n_multiples(bits: int, n: int, samples: int = 1000) -> MultResults:
    """Build ``samples`` sets, each filled from ``n`` random scalar draws.

    A single draw first picks an exponent ``b`` uniformly from 1..256 and then
    a scalar uniformly from ``2**b`` to ``2**(b+1)`` (inclusive). The draws are
    collected in a set, so equal values collapse and a set can end up with
    fewer than ``n`` elements.

    NOTE(review): ``bits`` is accepted but never read; the exponent range is
    the literal 256. Confirm whether that is intended.

    :param bits: Currently unused.
    :param n: Number of draws per set.
    :param samples: Number of sets to produce.
    :return: ``MultResults`` built from the list of sets and ``samples``.
    """
    def draw_scalar() -> int:
        # Exponent first, then the scalar: the order of the two draws matters
        # for reproducibility under a fixed seed.
        exponent = random.randint(1, 256)
        return random.randint(1 << exponent, 1 << (exponent + 1))

    scalar_sets = [{draw_scalar() for _ in range(n)} for _ in range(samples)]
    return MultResults(scalar_sets, samples)


def get_small_scalar_multiples(mult: MultIdent,
                               params: DomainParameters,
                               bits: int,
                               samples: int = 1000,
                               use_init: bool = True,
                               use_multiply: bool = True,
                               seed: bytes | str | None = None,
                               kind: str = "precomp+necessary") -> MultResults:
    """Collect the output of ``multiples_computed`` for ``samples`` scalars.

    Scalars are drawn with ``random.randint(1, 2**bits)``. When
    ``mult.countermeasure`` is ``None`` every sample gets its own scalar;
    otherwise a single scalar is drawn and reused for all samples.

    :param mult: Multiplier description; ``klass``, ``partial`` and
        ``countermeasure`` are read from it.
    :param params: Domain parameters forwarded to ``multiples_computed``.
    :param bits: Exponent of the inclusive upper bound ``2**bits`` for scalars.
    :param samples: Number of scalars to process.
    :param use_init: Forwarded to ``multiples_computed``.
    :param use_multiply: Forwarded to ``multiples_computed``.
    :param seed: If not ``None``, passed to ``random.seed``. This reseeds the
        module-global generator, not a private one.
    :param kind: Forwarded to ``multiples_computed`` and stored in the result.
    :return: A single ``MultResults`` (not a tuple) holding one list per
        sample, with the elapsed wall-clock time in its ``duration`` field.
    """
    started = time.perf_counter()
    if seed is not None:
        random.seed(seed)

    # If no countermeasure is used, we have fully random scalars.
    # Otherwise, fix one per chunk.
    if mult.countermeasure is None:
        scalars = [random.randint(1, 2**bits) for _ in range(samples)]
    else:
        scalars = [random.randint(1, 2**bits)] * samples

    # Use a list for less memory usage.
    results = [
        list(multiples_computed(scalar, params, mult.klass, mult.partial, use_init, use_multiply, kind=kind))
        for scalar in scalars
    ]
    duration = time.perf_counter() - started
    return MultResults(results, samples, duration=duration, kind=kind)


if __name__ == "__main__":
    # Target curve is fixed; the scalar size is taken from its group order.
    category = "secg"
    curve = "secp256r1"
    params = get_params(category, curve, "projective")

    # Optional positional arguments:
    #   1: worker count, 2: samples per task, 3: kind, 4: use_init, 5: use_multiply
    # Mapping index -> value lets a missing argument fall back to its default.
    cli_args = dict(enumerate(sys.argv))
    num_workers = int(cli_args.get(1, 32))
    bits = params.order.bit_length()
    samples = int(cli_args.get(2, 100))
    kind = cli_args.get(3, "precomp+necessary")
    use_init = cli_args.get(4, "true").lower() == "true"
    use_multiply = cli_args.get(5, "true").lower() == "true"

    # selected_mults is the same object as all_mults, so this shuffles it in place.
    selected_mults = all_mults
    shuffle(selected_mults)

    # Write into $SCRATCHDIR when it is set, otherwise into the working directory.
    scratch = os.environ.get("SCRATCHDIR")
    outdir = Path.cwd() if scratch is None else Path(scratch)

    print(f"Running on {num_workers} cores, doing {samples} samples.")

    # One random id per run; it names the output file and seeds every task.
    chunk_id = randbytes(6).hex()
    chunk_path = outdir / f"multiples_{bits}_{kind}_{'init' if use_init else 'noinit'}_{'mult' if use_multiply else 'nomult'}_chunk{chunk_id}.pickle"

    countermeasures = (None, "gsr", "additive", "multiplicative", "euclidean", "bt")
    with TaskExecutor(max_workers=num_workers) as pool:
        # Submit one task per (multiplier, countermeasure) pair, keyed by the
        # multiplier with that countermeasure applied.
        for mult, countermeasure in itertools.product(selected_mults, countermeasures):
            mwc = mult.with_countermeasure(countermeasure)
            pool.submit_task(
                mwc,
                get_small_scalar_multiples,
                mwc, params, bits, samples,
                use_init=use_init, use_multiply=use_multiply, seed=chunk_id, kind=kind,
            )

        progress = tqdm(pool.as_completed(), desc="Computing small scalar distributions.", total=len(pool.tasks), smoothing=0)
        for mult, future in progress:
            error = future.exception()
            if error:
                # Report which task failed, then abort the whole run.
                print("Error", mult, error)
                raise error
            res = future.result()
            print(f"Got {mult} in {res.duration}.")
            # Append mode: each finished task adds one pickled (mult, result)
            # record to the same chunk file.
            with chunk_path.open("ab") as f:
                pickle.dump((mult, res), f)