aboutsummaryrefslogtreecommitdiff
path: root/analysis/countermeasures/test_eval.py
diff options
context:
space:
mode:
Diffstat (limited to 'analysis/countermeasures/test_eval.py')
-rw-r--r--analysis/countermeasures/test_eval.py530
1 files changed, 530 insertions, 0 deletions
diff --git a/analysis/countermeasures/test_eval.py b/analysis/countermeasures/test_eval.py
new file mode 100644
index 0000000..a5d8f93
--- /dev/null
+++ b/analysis/countermeasures/test_eval.py
@@ -0,0 +1,530 @@
+import io
+import math
+import random
+import itertools
+import warnings
+
+import cypari2
+from cysignals.alarm import alarm, AlarmInterrupt
+
+from matplotlib import pyplot as plt
+from collections import Counter
+from tqdm.auto import tqdm, trange
+
+from pyecsca.misc.cfg import TemporaryConfig
+from pyecsca.misc.utils import TaskExecutor
+from pyecsca.ec.mod import mod, RandomModAction
+from pyecsca.ec.point import Point
+from pyecsca.ec.model import ShortWeierstrassModel
+from pyecsca.ec.params import load_params_ectester, get_params
+from pyecsca.ec.mult import LTRMultiplier, RTLMultiplier, ScalarMultiplicationAction
+from pyecsca.ec.context import local, DefaultContext
+from pyecsca.ec.countermeasures import GroupScalarRandomization, AdditiveSplitting, MultiplicativeSplitting, EuclideanSplitting, BrumleyTuveri
+from utils import *
+import cypari2
+pari = cypari2.Pari(256_000_000, 2_000_000_000)
+
+
+
+
+class CounterTest:
+
+
+ def __init__(self):
+ model = ShortWeierstrassModel()
+ self.coords = model.coordinates["projective"]
+
+ add = self.coords.formulas["add-2007-bl"]
+ dbl = self.coords.formulas["dbl-2007-bl"]
+ ltr = LTRMultiplier(add, dbl, complete=False)
+ self.multiplier = ltr
+
+
+ def load_all_results(self,card,tag):
+ lines = []
+ filepath = f"results/{card}/{self.test}"
+ if os.path.exists(filepath):
+ for file in os.listdir(filepath):
+ if tag in file:
+ with open(os.path.join(filepath,file)) as f:
+ lines.extend(f.readlines()[1:])
+
+ if len(lines)==0:
+ raise Exception("Not measured or something went wrong\n")
+ return [line.strip().split(";") for line in lines]
+
+ def load_csv_signatures(self,card,tag):
+ lines = self.load_all_results(card,tag)
+ sigs = []
+ for line in lines:
+ success,error,sig,valid,_,nonce_hex,key_hex,_,curve_csv,_,_,_ = line
+ try:
+ sigs.append({"success":success, "signature":parse_ecdsa_signature(bytes.fromhex(sig)), "nonce": int(nonce_hex,16), "key":int(key_hex,16), "valid":valid, "curve":curve_csv})
+ except ValueError:
+ sigs.append({})
+ return sigs
+
+ def load_csv_ecdhs(self,card,tag):
+ lines = self.load_all_results(card,tag)
+ ecdhs = []
+ for line in lines:
+ success,error,secret,key,_,curve_csv,_,_,_ = line
+ try:
+ ecdhs.append({"success":success, "secret":int(secret,16), "key":int(key,16), "curve":curve_csv})
+ except ValueError:
+ ecdhs.append({})
+ return ecdhs
+
+ def load_csv_keygens(self,card,tag):
+ lines = self.load_all_results(card,tag)
+ keygens = []
+ for line in lines:
+ success,error,key,point,curve_csv,_,_,_ = line
+ try:
+ keygens.append({"success":success, "key":int(key,16), "point":parse_04point(point), "curve":curve_csv})
+ except ValueError:
+ keygens.append({})
+ return keygens
+
+ def existing_measurements(self,cards, tag):
+ print(f"Avaliable measurements for {tag} on:")
+ filtered = []
+ for card in cards:
+ try:
+ assert self.load_all_results(card,tag)
+ filtered.append(card)
+ except:
+ continue
+ print(", ".join(filtered))
+ print()
+
+
+
+
+
+class Test3n(CounterTest):
+
+
+ def __init__(self,curve_path,point_path):
+
+ super().__init__()
+ self.test = "test3n"
+ self.params = load_params_ectester(curve_path, "projective")
+ self.point = csv_to_point(point_path, self.params, self.coords)
+ self.cofactor = 3
+ self.n = self.params.order
+
+ self.multiplier.init(self.params,self.point)
+ self.cofactor_point = self.multiplier.multiply(int(self.n))
+
+
+ def find_mod(self, scalar, compare):
+ kP = self.multiplier.multiply(scalar)
+ for i in range(self.cofactor):
+ if compare(kP):
+ return i
+ kP = self.multiplier._add(kP,self.cofactor_point)
+ return -1
+
+ def print_statistics(self,remainders):
+ for krem, rems in remainders.items():
+ print(f"k = {krem} mod {self.cofactor}, total = {sum(rems)}")
+ for i,count in enumerate(rems[:-1]):
+ print(f"k+{i}*n: {count}")
+ if rems[-1]:
+ print(f"remaining!: {rems[-1]}")
+ print()
+
+
+
+ def print_ecdh(self,card, tag="ecdh"):
+
+ self.multiplier.init(self.params,self.point)
+ remainders = {i:[0]*(self.cofactor+1) for i in range(self.cofactor)}
+ for ecdh_result in self.load_csv_ecdhs(card,tag):
+ key, secret = ecdh_result["key"],ecdh_result["secret"]
+ compare = lambda point: sha(point.to_affine().x)==secret
+ remainders[key%self.cofactor][self.find_mod(key,compare)]+=1
+ self.print_statistics(remainders)
+
+
+
+ def print_ecdsa(self,card,tag = "ecdsa"):
+
+ self.multiplier.init(self.params,self.params.generator)
+ remainders = {i:[0]*(self.cofactor+1) for i in range(self.cofactor)}
+ for signature in self.load_csv_signatures(card,tag):
+ r,s = signature["signature"]
+ nonce = signature["nonce"]
+ compare = lambda point: int(point.to_affine().x)%self.n==r%self.n
+ remainders[nonce%self.cofactor][self.find_mod(nonce,compare)]+=1
+ self.print_statistics(remainders)
+
+
+ def print_keygen(self,card,tag = "keygen"):
+
+ self.multiplier.init(self.params,self.params.generator)
+ remainders = {i:[0]*(self.cofactor+1) for i in range(self.cofactor)}
+ for keygen in self.load_csv_keygens(card,tag):
+ xy, key = keygen["point"], keygen["key"]
+ genpoint = tuple_to_point(xy,self.params,self.coords)
+ compare = lambda point: point.to_affine()==genpoint.to_affine()
+ remainders[key%self.cofactor][self.find_mod(key,compare)]+=1
+ self.print_statistics(remainders)
+
+
+
+class Testinverse(CounterTest):
+
+ def __init__(self,curve_path,point_path,cofactor):
+
+
+ super().__init__()
+ self.test = "testinverse"
+ self.params = load_params_ectester(curve_path, "projective")
+ self.point = csv_to_point(point_path, self.params, self.coords)
+ self.cofactor = cofactor
+ self.n = self.params.order
+
+ def print_statistics(self,total,correct):
+ print(f"Total: {total}")
+ print(f"Correct: {correct}")
+ print()
+
+ def print_ecdh(self,card, tag = None):
+ if not tag: tag = f"ecdh_{self.cofactor}"
+ self.multiplier.init(self.params,self.point)
+ total,correct = 0,0
+ for ecdh_result in self.load_csv_ecdhs(card,tag):
+ total+=1
+ try:
+ secret,key = ecdh_result["secret"], ecdh_result["key"]
+ kP = self.multiplier.multiply(key).to_affine()
+ correct += (sha(kP.x)==secret)
+ except KeyError:
+ continue
+ self.print_statistics(total,correct)
+
+
+
+ def print_ecdsa(self,card, tag = None):
+ if not tag: tag = f"ecdsa_{self.cofactor}"
+ sigs = self.load_csv_signatures(card,tag)
+ total,correct = 0,0
+ for sig in sigs:
+ total+=1
+ try:
+ correct += sig["valid"]=="1"
+ except KeyError:
+ continue
+ self.print_statistics(total,correct)
+
+
+ def print_keygen(self,card, tag = None):
+ if not tag: tag = f"keygen_{self.cofactor}"
+ self.multiplier.init(self.params,self.params.generator)
+ total,correct = 0,0
+ for keygen in self.load_csv_keygens(card,f"keygen_{self.cofactor}"):
+ total+=1
+ try:
+ xy, key = keygen["point"], keygen["key"]
+ genpoint = tuple_to_point(xy,self.params,self.coords).to_affine()
+ kP = self.multiplier.multiply(key).to_affine()
+ correct += (kP==genpoint)
+ except KeyError:
+ continue
+ self.print_statistics(total,correct)
+
+
+
+
+
+class Testk10(CounterTest):
+
+ def __init__(self, curve_path, point_path, k):
+ super().__init__()
+ self.k = 10
+ self.test = "testk10"
+ self.params = load_params_ectester(curve_path, "projective")
+ self.point = csv_to_point(point_path, self.params, self.coords)
+ self.n = self.params.order
+
+ self.multiplier.init(self.params,self.point)
+ kP = self.multiplier.multiply(k).to_affine()
+ self.correct_secret = sha(kP.x)
+
+ def print_statistics(self,total,correct):
+ print(f"Total computed: {total}")
+ print(f"Correct: {correct}")
+ print()
+
+ def print_ecdh(self, card, tag = "ecdh"):
+
+ total, correct = 0,0
+ for ecdh_result in self.load_csv_ecdhs(card, tag):
+ total+=1
+ if self.correct_secret==ecdh_result["secret"]:
+ correct+=1
+ self.print_statistics(total,correct)
+
+
+
+
+def divisors(primes, powers):
+ for comb in itertools.product(*[range(power+1) for power in powers]):
+ value = 1
+ for prime, power in zip(primes, comb):
+ value *= prime**power
+ yield value
+
+def pari_factor(number):
+ pari = cypari2.Pari(256_000_000, 2_000_000_000)
+ factors = pari.factor(number)
+ primes = list(map(int, factors[0]))
+ powers = list(map(int, factors[1]))
+ return primes, powers
+
+def pari_dlog(e, P, G, real_n, facts_str):
+ e[15][0] = real_n
+ facts = pari(facts_str)
+ dlog = pari.elllog(e, P, G, facts)
+ return int(dlog)
+
+
+
+class TestEpsilon_GSR(CounterTest):
+
+ def __init__(self, curve_path, point_path, realn_path):
+
+ super().__init__()
+ self.test = "testdn"
+
+ self.params = load_params_ectester(curve_path, "projective")
+ self.point = csv_to_point(point_path, self.params, self.coords)
+ self.n = self.params.order
+
+ with open(realn_path) as f:
+ self.real_n = int(f.read(),16)
+ self.epsilon = self.n-self.real_n
+
+ self.a = int(self.params.curve.parameters["a"])
+ self.b = int(self.params.curve.parameters["b"])
+ self.p = int(self.params.curve.prime)
+ self.x = int(self.params.generator.X)
+ self.y = int(self.params.generator.Y)
+ self.pari_real_n_facts = repr(pari.factor(self.real_n))
+ self.pari_curve = pari.ellinit([self.a,self.b],self.p)
+
+
+
+
+ def compute_mask(self,scalar,point_candidates,G):
+ for P in point_candidates:
+ d = pari_dlog(self.pari_curve, [int(P.x),int(P.y)], [int(G.X),int(G.Y)], self.real_n, self.pari_real_n_facts)
+ for dp in [d,self.real_n-d]:
+ scalar = int(scalar)
+ if (dp-scalar)%self.epsilon==0:
+ mask = int((dp-scalar)//self.epsilon)
+ return mask
+ raise Exception("No mask found")
+
+ def is_correct_curve(self,result):
+ p,a,b,x,y,n,h = map(lambda x: int(x,16), result["curve"].split(","))
+ return (p,a,b,x,y,n) == (self.p,self.a,self.b,self.x,self.y,self.n)
+
+ def print_statistics(self,masks):
+ for mask in masks:
+ print(f"Mask size: {mask.bit_length()}, mask value: {mask}")
+ print()
+
+ def lift_x(self,x):
+ return self.params.curve.affine_lift_x(mod(x,self.p)).pop()
+
+
+ def test_masks(self,masks,scalars,point,results,compare):
+
+ for mask,result,scalar in zip(masks,results,scalars):
+ masked_key = int(scalar+mask*self.params.order)
+ self.multiplier.init(self.params,point,bits= masked_key.bit_length())
+ kP = self.multiplier.multiply(masked_key).to_affine()
+ assert compare(result,kP)
+
+
+ def recover_ecdh_plain(self,card, tag = "ecdh", N = 5):
+ filtered_results = [res for res in self.load_csv_ecdhs(card,tag) if self.is_correct_curve(res)]
+ masks,results,keys = [],[],[]
+ for ecdh_result in filtered_results[:N]:
+ key, secret = ecdh_result["key"],ecdh_result["secret"]
+ R = self.lift_x(secret)
+ masks.append(self.compute_mask(key,[R],self.point))
+ results.append(R)
+ keys.append(key)
+
+ compare = lambda p,q: p.x==q.x
+ self.test_masks(masks,keys,self.point,results,compare)
+ self.print_statistics(masks)
+
+
+ def recover_ecdsa(self,card, tag = "ecdsa", N = 5):
+
+ filtered_results = [res for res in self.load_csv_signatures(card,tag) if self.is_correct_curve(res)]
+ masks,results,nonces = [],[],[]
+ for line in filtered_results[:N]:
+ nonce = line["nonce"]
+ r, s = line["signature"]
+ r = r%self.n
+ candidates = [self.lift_x(r)]+[self.lift_x(r+i*self.n) for i in range(1,(self.p-r)//self.n)]
+ mask = self.compute_mask(nonce,candidates,self.params.generator)
+ masks.append(mask)
+ nonces.append(nonce)
+ results.append(r)
+
+ compare = lambda r,q: r%self.n==int(q.x)%self.n
+ self.test_masks(masks,nonces,self.params.generator,results,compare)
+ self.print_statistics(masks)
+
+
+
+ def recover_keygen(self,card,tag="keygen", N=5):
+ filtered_results = [res for res in self.load_csv_keygens(card,tag) if self.is_correct_curve(res)]
+ masks,results,keys = [],[],[]
+ for keygen_result in filtered_results[:N]:
+ key, xy = keygen_result["key"],keygen_result["point"]
+ R = tuple_to_point(xy,self.params,self.coords).to_affine()
+ masks.append(self.compute_mask(key,[R],self.params.generator))
+ keys.append(key)
+ results.append(R)
+ compare = lambda p,q: p==q
+ self.test_masks(masks,keys,self.params.generator,results,compare)
+ self.print_statistics(masks)
+
+
+
+class TestEpsilon_Multiplicative(CounterTest):
+
+ def __init__(self, curve_path, point_path, realn_path):
+
+ super().__init__()
+ self.test = "testdn"
+
+ self.params = load_params_ectester(curve_path, "projective")
+ self.point = csv_to_point(point_path, self.params, self.coords)
+ self.n = self.params.order
+
+ with open(realn_path) as f:
+ self.real_n = int(f.read(),16)
+ self.epsilon = self.n-self.real_n
+
+ self.a = int(self.params.curve.parameters["a"])
+ self.b = int(self.params.curve.parameters["b"])
+ self.p = int(self.params.curve.prime)
+ self.x = int(self.params.generator.X)
+ self.y = int(self.params.generator.Y)
+ self.pari_real_n_facts = repr(pari.factor(self.real_n))
+ self.pari_curve = pari.ellinit([self.a,self.b],self.p)
+
+ #
+ self.candidates = None
+
+
+ def compute_t(self,scalar,point_candidates,G):
+ for P in point_candidates:
+ d = pari_dlog(self.pari_curve, [int(P.x),int(P.y)], [int(G.X),int(G.Y)], self.real_n, self.pari_real_n_facts)
+ for dp in [d,self.real_n-d]:
+ scalar = int(scalar)
+ if (dp-scalar)%self.epsilon==0:
+ mask = int((dp-scalar)//self.epsilon)
+ return mask
+ raise Exception("No mask found")
+
+ def is_correct_curve(self,result):
+ p,a,b,x,y,n,h = map(lambda x: int(x,16), result["curve"].split(","))
+ return (p,a,b,x,y,n) == (self.p,self.a,self.b,self.x,self.y,self.n)
+
+ def print_statistics(self,masks):
+ for mask in masks:
+ print(f"Mask size: {mask.bit_length()}, mask value: {mask}")
+ print()
+
+ def compute_candidates(self,lower_bound, upper_bound):
+ all_divisors = [list(divisors(*pari_factor(k+t*self.n))) for t,k in self.ts]
+ filtered = []
+ for divs in all_divisors:
+ filtered.append([d for d in divs if lower_bound<=d.bit_length()<=upper_bound])
+ print(f"Found {len(filtered[-1])} candidates")
+ self.candidates = filtered
+
+ def plot_divisor_histogram(self,candidates):
+ candidate_bits = [c.bit_length() for c in candidates]
+ max_amount = max(candidate_amounts)
+ fig = plt.subplots()
+ plt.hist(candidate_amounts, range=(1, max_amount), align="left", density=True, bins=range(1, max_amount))
+ plt.xlabel("number of candidate masks")
+ plt.ylabel("proportion")
+ plt.xticks(range(max_amount))
+ plt.xlim(0, 20);
+ plt.show()
+
+ def lift_x(self,x):
+ return self.params.curve.affine_lift_x(mod(x,self.p)).pop()
+
+
+ def test_ts(self,ts,scalars,point,results,compare):
+
+ for t,result,scalar in zip(ts,results,scalars):
+ masked_key = int(scalar+t*self.params.order)
+ self.multiplier.init(self.params,point,bits= masked_key.bit_length())
+ kP = self.multiplier.multiply(masked_key).to_affine()
+ assert compare(result,kP)
+
+
+ def recover_ecdh_plain_size(self,card, tag = "ecdh", N = 5):
+ filtered_results = [res for res in self.load_csv_ecdhs(card,tag) if self.is_correct_curve(res)]
+ ts,results,keys = [],[],[]
+ for ecdh_result in filtered_results[:N]:
+ key, secret = ecdh_result["key"],ecdh_result["secret"]
+ R = self.lift_x(secret)
+ ts.append(self.compute_t(key,[R],self.point))
+ results.append(R)
+ keys.append(key)
+
+ compare = lambda p,q: p.x==q.x
+ self.test_ts(ts,keys,self.point,results,compare)
+ self.ts = zip(ts,keys)
+ print(set(t.bit_length() for t in ts))
+
+
+ def recover_ecdsa_size(self,card, tag = "ecdsa", N = 5):
+
+ filtered_results = [res for res in self.load_csv_signatures(card,tag) if self.is_correct_curve(res)]
+ ts,results,nonces = [],[],[]
+ for line in filtered_results[:N]:
+ nonce = line["nonce"]
+ r, s = line["signature"]
+ r = r%self.n
+ candidates = [self.lift_x(r)]+[self.lift_x(r+i*self.n) for i in range(1,(self.p-r)//self.n)]
+ print(nonce,self.p,r,self.n,candidates)
+ t = self.compute_t(nonce,candidates,self.params.generator)
+ ts.append(t)
+ nonces.append(nonce)
+ results.append(r)
+
+ compare = lambda r,q: r%self.n==int(q.x)%self.n
+ self.test_ts(ts,nonces,self.params.generator,results,compare)
+ self.ts = zip(ts,nonces)
+ print(set(t.bit_length() for t in ts))
+
+
+ def recover_keygen_size(self,card,tag="keygen", N=5):
+ filtered_results = [res for res in self.load_csv_keygens(card,tag) if self.is_correct_curve(res)]
+ ts,results,keys = [],[],[]
+ for keygen_result in filtered_results[:N]:
+ key, xy = keygen_result["key"],keygen_result["point"]
+ R = tuple_to_point(xy,self.params,self.coords).to_affine()
+ masks.append(self.compute_t(key,[R],self.params.generator))
+ ts.append(key)
+ results.append(R)
+ compare = lambda p,q: p==q
+ self.test_ts(ts,keys,self.params.generator,results,compare)
+ self.ts = zip(ts,keys)
+ print(set(t.bit_length() for t in ts))