diff options
| -rw-r--r-- | pyecsca/sca/attack/DPA.py | 88 | ||||
| -rw-r--r-- | pyecsca/sca/target/emulator.py | 123 |
2 files changed, 211 insertions, 0 deletions
diff --git a/pyecsca/sca/attack/DPA.py b/pyecsca/sca/attack/DPA.py new file mode 100644 index 0000000..99e6472 --- /dev/null +++ b/pyecsca/sca/attack/DPA.py @@ -0,0 +1,88 @@ +from pyecsca.ec.mult import ScalarMultiplier +from pyecsca.ec.point import Point +from pyecsca.ec.context import DefaultContext, local +from pyecsca.ec.params import DomainParameters +from pyecsca.sca.trace import Trace, average, subtract, absolute +from pyecsca.sca.trace.plot import plot_trace +from typing import Tuple, Dict +from public import public + +@public +class DPA(): + + traces: list[Trace] + points: list[Point] + mult: ScalarMultiplier + params: DomainParameters + doms: Dict[str, list[Trace]] + + def __init__(self, points: list[Point], traces: list[Trace], mult: ScalarMultiplier, params: DomainParameters): + ''' + :param points: Points on which scalar multiplication with secret scalar was performed + :param traces: Power traces corresponding to the scalar multiplication for each of the points + :param mult: Scalar multiplier used + :param params: Domain parameters used + ''' + self.points = points + self.traces = traces + self.mult = mult + self.params = params + self.doms = {'guess_one' : [], 'guess_zero' : []} + + def compute_split_point(self, guessed_scalar: int, target_bit: int, point: Point) -> Point: + with(local(DefaultContext())) as ctx: + self.mult.init(self.params, point) + self.mult.multiply(guessed_scalar) + action_index = -1 + for bit in bin(guessed_scalar)[2:target_bit + 2]: + if bit == '1': + action_index += 2 + elif bit == '0': + action_index += 1 + result = ctx.actions.get_by_index([0, action_index])[0] + return result.output_points[0] + + def split_traces(self, guessed_scalar: int, target_bit: int) -> Tuple[list[Trace], list[Trace]]: + one_traces = [] + zero_traces = [] + for i in range(len(self.points)): + #TODO: works only if the computed split point has "X" coordinate + split_value = self.compute_split_point(guessed_scalar, target_bit, self.points[i]).X + if int(split_value) & 1 == 1: + one_traces.append(self.traces[i]) + elif int(split_value) & 1 == 0: + zero_traces.append(self.traces[i]) + return one_traces, zero_traces + + def calculate_difference_of_means(self, one_traces: list[Trace], zero_traces: list[Trace]) -> Trace: + avg_ones = average(*one_traces) + avg_zeros = average(*zero_traces) + return subtract(avg_ones, avg_zeros) + + def plot_difference_of_means(self, dom): + return plot_trace(dom).opts(width=950, height=600) + + def recover_bit(self, recovered_scalar: int, target_bit: int, scalar_bit_length: int, real_pub_key: Point) -> int: + if target_bit == scalar_bit_length - 1: + self.mult.init(self.params, self.params.generator) + if real_pub_key == self.mult.multiply(recovered_scalar): + return recovered_scalar + return recovered_scalar | 1 + mask = 1 << (scalar_bit_length - target_bit - 1) + guessed_scalar_0 = recovered_scalar + guessed_scalar_1 = recovered_scalar | mask + ones_0, zeros_0 = self.split_traces(guessed_scalar_0, target_bit) + ones_1, zeros_1 = self.split_traces(guessed_scalar_1, target_bit) + dom_0 = self.calculate_difference_of_means(ones_0, zeros_0) + dom_1 = self.calculate_difference_of_means(ones_1, zeros_1) + self.doms['guess_zero'].append(dom_0) + self.doms['guess_one'].append(dom_1) + if max(absolute(dom_0)) > max(absolute(dom_1)): + return guessed_scalar_0 + return guessed_scalar_1 + + def perform(self, scalar_bit_length: int, real_pub_key: Point) -> int: + recovered_scalar = 1 << (scalar_bit_length - 1) + for target_bit in range(1, scalar_bit_length): + recovered_scalar = self.recover_bit(recovered_scalar, target_bit, scalar_bit_length, real_pub_key) + return recovered_scalar
\ No newline at end of file diff --git a/pyecsca/sca/target/emulator.py b/pyecsca/sca/target/emulator.py new file mode 100644 index 0000000..0f097e5 --- /dev/null +++ b/pyecsca/sca/target/emulator.py @@ -0,0 +1,123 @@ +from pyecsca.ec.coordinates import CoordinateModel +from pyecsca.ec.mod import Mod +from pyecsca.ec.model import CurveModel +from pyecsca.ec.params import DomainParameters +from pyecsca.ec.point import Point, InfinityPoint +from pyecsca.ec.mult import ScalarMultiplier +from pyecsca.ec.key_generation import KeyGeneration +from pyecsca.ec.key_agreement import KeyAgreement +from pyecsca.ec.signature import Signature, SignatureResult +from pyecsca.ec.formula import FormulaAction +from pyecsca.ec.context import Context, DefaultContext, local +from pyecsca.sca.attack import LeakageModel +from pyecsca.sca.trace import Trace, average, subtract +from typing import Mapping, Union, Optional, Tuple +from public import public +from .base import Target +import numpy as np + +@public +class EmulatorTarget(Target): + + model: CurveModel + coords: CoordinateModel + mult: ScalarMultiplier + params: Optional[DomainParameters] + leakage_model: LeakageModel + privkey: Optional[Mod] + pubkey: Optional[Point] + + def __init__(self, model: CurveModel, coords: CoordinateModel, mult: ScalarMultiplier): + super().__init__() + self.model = model + self.coords = coords + self.mult = mult + self.params = None + self.leakage_model = None + self.privkey = None + self.pubkey = None + + def get_trace(self, context: DefaultContext) -> Trace: + def callback(action): + if isinstance(action, FormulaAction): + for intermediate in action.op_results: + leak = self.leakage_model(intermediate.value) + temp_trace.append(leak) + temp_trace = [] + context.actions.walk(callback) + return Trace(np.array(temp_trace)) + + def emulate_scalar_mult_traces(self, num_of_traces: int, scalar: int) -> Tuple[list[Point], list[Trace]]: + points = [self.params.curve.affine_random().to_model(self.coords, self.params.curve) for _ in range(num_of_traces)] + traces = [] + for point in points: + _, trace = self.scalar_mult(scalar, point) + traces.append(trace) + return points, traces + + def emulate_ecdh_traces(self, num_of_traces: int) -> Tuple[list[Trace], list[Trace]]: + other_pubs = [self.params.curve.affine_random().to_model(self.coords, self.params.curve) for _ in range(num_of_traces)] + traces = [] + for pub in other_pubs: + _, trace = self.ecdh(pub, None) + traces.append(trace) + return other_pubs, traces + + def set_params(self, params: DomainParameters) -> None: + self.params = params + + def set_leakage_model(self, leakage_model: LeakageModel) -> None: + self.leakage_model = leakage_model + + def scalar_mult(self, scalar: int, point: Point) -> Tuple[Point, Trace]: + with local(DefaultContext()) as ctx: + self.mult.init(self.params, point) + res_point = self.mult.multiply(scalar) + return res_point, self.get_trace(ctx) + + def generate(self) -> Tuple[Tuple[Mod, Point], Trace]: + with local(DefaultContext()) as ctx: + keygen = KeyGeneration(self.mult, self.params, False) + priv, pub = keygen.generate() + return (priv, pub), self.get_trace(ctx) + + def set_privkey(self, privkey: Mod) -> None: + self.privkey = privkey + + def set_pubkey(self, pubkey: Point) -> None: + self.pubkey = pubkey + + def ecdh(self, other_pubkey: Point, hash_algo=None) -> Tuple[bytes, Trace]: + with local(DefaultContext()) as ctx: + ecdh = KeyAgreement(self.mult, self.params, other_pubkey, self.privkey, hash_algo) + shared_secret = ecdh.perform() + return shared_secret, self.get_trace(ctx) + + def ecdsa_sign(self, data: bytes, hash_algo=None) -> Tuple[SignatureResult, Trace]: + with local(DefaultContext()) as ctx: + ecdsa = Signature(self.mult, self.params, self.mult.formulas["add"], self.pubkey, self.privkey, hash_algo) + signed_data = ecdsa.sign_data(data) + return signed_data, self.get_trace(ctx) + + def ecdsa_verify(self, data: bytes, signature: SignatureResult, hash_algo=None) -> Tuple[bool, Trace]: + with local(DefaultContext()) as ctx: + ecdsa = Signature(self.mult, self.params, self.mult.formulas["add"], self.pubkey, hash_algo) + verified = ecdsa.verify_data(signature, data) + return verified, self.get_trace(ctx) + + def debug(self): + return self.model.shortname, self.coords.name + + def connect(self): + pass + + def disconnect(self): + pass + + def set_trigger(self): + pass + + def quit(self): + pass + + |
