aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pyecsca/sca/attack/DPA.py88
-rw-r--r--pyecsca/sca/target/emulator.py123
2 files changed, 211 insertions, 0 deletions
diff --git a/pyecsca/sca/attack/DPA.py b/pyecsca/sca/attack/DPA.py
new file mode 100644
index 0000000..99e6472
--- /dev/null
+++ b/pyecsca/sca/attack/DPA.py
@@ -0,0 +1,88 @@
+from pyecsca.ec.mult import ScalarMultiplier
+from pyecsca.ec.point import Point
+from pyecsca.ec.context import DefaultContext, local
+from pyecsca.ec.params import DomainParameters
+from pyecsca.sca.trace import Trace, average, subtract, absolute
+from pyecsca.sca.trace.plot import plot_trace
+from typing import Tuple, Dict
+from public import public
+
+@public
+class DPA():
+
+ traces: list[Trace]
+ points: list[Point]
+ mult: ScalarMultiplier
+ params: DomainParameters
+ doms: Dict[str, list[Trace]]
+
+ def __init__(self, points: list[Point], traces: list[Trace], mult: ScalarMultiplier, params: DomainParameters):
+ '''
+ :param points: Points on which scalar multiplication with secret scalar was performed
+ :param traces: Power traces corresponding to the scalar multiplication for each of the points
+ :param mult: Scalar multiplier used
+ :param params: Domain parameters used
+ '''
+ self.points = points
+ self.traces = traces
+ self.mult = mult
+ self.params = params
+ self.doms = {'guess_one' : [], 'guess_zero' : []}
+
+ def compute_split_point(self, guessed_scalar: int, target_bit: int, point: Point) -> Point:
+ with(local(DefaultContext())) as ctx:
+ self.mult.init(self.params, point)
+ self.mult.multiply(guessed_scalar)
+ action_index = -1
+ for bit in bin(guessed_scalar)[2:target_bit + 2]:
+ if bit == '1':
+ action_index += 2
+ elif bit == '0':
+ action_index += 1
+ result = ctx.actions.get_by_index([0, action_index])[0]
+ return result.output_points[0]
+
+ def split_traces(self, guessed_scalar: int, target_bit: int) -> Tuple[list[Trace], list[Trace]]:
+ one_traces = []
+ zero_traces = []
+ for i in range(len(self.points)):
+ #TODO: works only if the computed split point has "X" coordinate
+ split_value = self.compute_split_point(guessed_scalar, target_bit, self.points[i]).X
+ if int(split_value) & 1 == 1:
+ one_traces.append(self.traces[i])
+ elif int(split_value) & 1 == 0:
+ zero_traces.append(self.traces[i])
+ return one_traces, zero_traces
+
+ def calculate_difference_of_means(self, one_traces: list[Trace], zero_traces: list[Trace]) -> Trace:
+ avg_ones = average(*one_traces)
+ avg_zeros = average(*zero_traces)
+ return subtract(avg_ones, avg_zeros)
+
+ def plot_difference_of_means(self, dom):
+ return plot_trace(dom).opts(width=950, height=600)
+
+ def recover_bit(self, recovered_scalar: int, target_bit: int, scalar_bit_length: int, real_pub_key: Point) -> int:
+ if target_bit == scalar_bit_length - 1:
+ self.mult.init(self.params, self.params.generator)
+ if real_pub_key == self.mult.multiply(recovered_scalar):
+ return recovered_scalar
+ return recovered_scalar | 1
+ mask = 1 << (scalar_bit_length - target_bit - 1)
+ guessed_scalar_0 = recovered_scalar
+ guessed_scalar_1 = recovered_scalar | mask
+ ones_0, zeros_0 = self.split_traces(guessed_scalar_0, target_bit)
+ ones_1, zeros_1 = self.split_traces(guessed_scalar_1, target_bit)
+ dom_0 = self.calculate_difference_of_means(ones_0, zeros_0)
+ dom_1 = self.calculate_difference_of_means(ones_1, zeros_1)
+ self.doms['guess_zero'].append(dom_0)
+ self.doms['guess_one'].append(dom_1)
+ if max(absolute(dom_0)) > max(absolute(dom_1)):
+ return guessed_scalar_0
+ return guessed_scalar_1
+
+ def perform(self, scalar_bit_length: int, real_pub_key: Point) -> int:
+ recovered_scalar = 1 << (scalar_bit_length - 1)
+ for target_bit in range(1, scalar_bit_length):
+ recovered_scalar = self.recover_bit(recovered_scalar, target_bit, scalar_bit_length, real_pub_key)
+ return recovered_scalar \ No newline at end of file
diff --git a/pyecsca/sca/target/emulator.py b/pyecsca/sca/target/emulator.py
new file mode 100644
index 0000000..0f097e5
--- /dev/null
+++ b/pyecsca/sca/target/emulator.py
@@ -0,0 +1,123 @@
+from pyecsca.ec.coordinates import CoordinateModel
+from pyecsca.ec.mod import Mod
+from pyecsca.ec.model import CurveModel
+from pyecsca.ec.params import DomainParameters
+from pyecsca.ec.point import Point, InfinityPoint
+from pyecsca.ec.mult import ScalarMultiplier
+from pyecsca.ec.key_generation import KeyGeneration
+from pyecsca.ec.key_agreement import KeyAgreement
+from pyecsca.ec.signature import Signature, SignatureResult
+from pyecsca.ec.formula import FormulaAction
+from pyecsca.ec.context import Context, DefaultContext, local
+from pyecsca.sca.attack import LeakageModel
+from pyecsca.sca.trace import Trace, average, subtract
+from typing import Mapping, Union, Optional, Tuple
+from public import public
+from .base import Target
+import numpy as np
+
+@public
+class EmulatorTarget(Target):
+
+ model: CurveModel
+ coords: CoordinateModel
+ mult: ScalarMultiplier
+ params: Optional[DomainParameters]
+ leakage_model: LeakageModel
+ privkey: Optional[Mod]
+ pubkey: Optional[Point]
+
+ def __init__(self, model: CurveModel, coords: CoordinateModel, mult: ScalarMultiplier):
+ super().__init__()
+ self.model = model
+ self.coords = coords
+ self.mult = mult
+ self.params = None
+ self.leakage_model = None
+ self.privkey = None
+ self.pubkey = None
+
+ def get_trace(self, context: DefaultContext) -> Trace:
+ def callback(action):
+ if isinstance(action, FormulaAction):
+ for intermediate in action.op_results:
+ leak = self.leakage_model(intermediate.value)
+ temp_trace.append(leak)
+ temp_trace = []
+ context.actions.walk(callback)
+ return Trace(np.array(temp_trace))
+
+ def emulate_scalar_mult_traces(self, num_of_traces: int, scalar: int) -> Tuple[list[Point], list[Trace]]:
+ points = [self.params.curve.affine_random().to_model(self.coords, self.params.curve) for _ in range(num_of_traces)]
+ traces = []
+ for point in points:
+ _, trace = self.scalar_mult(scalar, point)
+ traces.append(trace)
+ return points, traces
+
+ def emulate_ecdh_traces(self, num_of_traces: int) -> Tuple[list[Trace], list[Trace]]:
+ other_pubs = [self.params.curve.affine_random().to_model(self.coords, self.params.curve) for _ in range(num_of_traces)]
+ traces = []
+ for pub in other_pubs:
+ _, trace = self.ecdh(pub, None)
+ traces.append(trace)
+ return other_pubs, traces
+
+ def set_params(self, params: DomainParameters) -> None:
+ self.params = params
+
+ def set_leakage_model(self, leakage_model: LeakageModel) -> None:
+ self.leakage_model = leakage_model
+
+ def scalar_mult(self, scalar: int, point: Point) -> Tuple[Point, Trace]:
+ with local(DefaultContext()) as ctx:
+ self.mult.init(self.params, point)
+ res_point = self.mult.multiply(scalar)
+ return res_point, self.get_trace(ctx)
+
+ def generate(self) -> Tuple[Tuple[Mod, Point], Trace]:
+ with local(DefaultContext()) as ctx:
+ keygen = KeyGeneration(self.mult, self.params, False)
+ priv, pub = keygen.generate()
+ return (priv, pub), self.get_trace(ctx)
+
+ def set_privkey(self, privkey: Mod) -> None:
+ self.privkey = privkey
+
+ def set_pubkey(self, pubkey: Point) -> None:
+ self.pubkey = pubkey
+
+ def ecdh(self, other_pubkey: Point, hash_algo=None) -> Tuple[bytes, Trace]:
+ with local(DefaultContext()) as ctx:
+ ecdh = KeyAgreement(self.mult, self.params, other_pubkey, self.privkey, hash_algo)
+ shared_secret = ecdh.perform()
+ return shared_secret, self.get_trace(ctx)
+
+ def ecdsa_sign(self, data: bytes, hash_algo=None) -> Tuple[SignatureResult, Trace]:
+ with local(DefaultContext()) as ctx:
+ ecdsa = Signature(self.mult, self.params, self.mult.formulas["add"], self.pubkey, self.privkey, hash_algo)
+ signed_data = ecdsa.sign_data(data)
+ return signed_data, self.get_trace(ctx)
+
+ def ecdsa_verify(self, data: bytes, signature: SignatureResult, hash_algo=None) -> Tuple[bool, Trace]:
+ with local(DefaultContext()) as ctx:
+ ecdsa = Signature(self.mult, self.params, self.mult.formulas["add"], self.pubkey, hash_algo)
+ verified = ecdsa.verify_data(signature, data)
+ return verified, self.get_trace(ctx)
+
+ def debug(self):
+ return self.model.shortname, self.coords.name
+
+ def connect(self):
+ pass
+
+ def disconnect(self):
+ pass
+
+ def set_trigger(self):
+ pass
+
+ def quit(self):
+ pass
+
+