diff options
| author | J08nY | 2021-04-10 17:50:05 +0200 |
|---|---|---|
| committer | J08nY | 2021-04-10 17:50:05 +0200 |
| commit | b76ec0890e4cf997ce5a0b4494722931094683f7 (patch) | |
| tree | 2dc049de8a79adc7b3b23d746ef4f28d58e33fbc /pyecsca | |
| parent | a7ad11f7cd917be55dbd036a516fefda4d19dd4a (diff) | |
| download | pyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.tar.gz pyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.tar.zst pyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.zip | |
Use black.
Diffstat (limited to 'pyecsca')
41 files changed, 1551 insertions, 514 deletions
diff --git a/pyecsca/ec/configuration.py b/pyecsca/ec/configuration.py index 04f8fc7..bf20ec7 100644 --- a/pyecsca/ec/configuration.py +++ b/pyecsca/ec/configuration.py @@ -34,6 +34,7 @@ class EnumDefine(Enum): @public class Multiplication(EnumDefine): """Base multiplication algorithm to use.""" + TOOM_COOK = "MUL_TOOM_COOK" KARATSUBA = "MUL_KARATSUBA" COMBA = "MUL_COMBA" @@ -43,6 +44,7 @@ class Multiplication(EnumDefine): @public class Squaring(EnumDefine): """Base squaring algorithm to use.""" + TOOM_COOK = "SQR_TOOM_COOK" KARATSUBA = "SQR_KARATSUBA" COMBA = "SQR_COMBA" @@ -52,6 +54,7 @@ class Squaring(EnumDefine): @public class Reduction(EnumDefine): """Modular reduction method used.""" + BARRETT = "RED_BARRETT" MONTGOMERY = "RED_MONTGOMERY" BASE = "RED_BASE" @@ -60,6 +63,7 @@ class Reduction(EnumDefine): @public class Inversion(EnumDefine): """Inversion algorithm used.""" + GCD = "INV_GCD" EULER = "INV_EULER" @@ -67,6 +71,7 @@ class Inversion(EnumDefine): @public class HashType(EnumDefine): """Hash algorithm used in ECDH and ECDSA.""" + NONE = "HASH_NONE" SHA1 = "HASH_SHA1" SHA224 = "HASH_SHA224" @@ -78,6 +83,7 @@ class HashType(EnumDefine): @public class RandomMod(EnumDefine): """Method of sampling a uniform integer modulo order.""" + SAMPLE = "MOD_RAND_SAMPLE" REDUCE = "MOD_RAND_REDUCE" @@ -86,6 +92,7 @@ class RandomMod(EnumDefine): @dataclass(frozen=True) class Configuration(object): """An ECC implementation configuration.""" + model: CurveModel coords: CoordinateModel formulas: FrozenSet[Formula] @@ -120,8 +127,11 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None """ def is_optional(arg_type): - return get_origin(arg_type) == Union and len(get_args(arg_type)) == 2 and \ - get_args(arg_type)[1] == type(None) # noqa + return ( + get_origin(arg_type) == Union + and len(get_args(arg_type)) == 2 + and get_args(arg_type)[1] == type(None) # noqa + ) def leaf_subclasses(cls): subs = cls.__subclasses__() @@ -140,7 +150,7 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None "mult": Multiplication, "sqr": Squaring, "red": Reduction, - "inv": Inversion + "inv": Inversion, } keys = list(filter(lambda key: key not in kwargs, options.keys())) values = [options[key] for key in keys] @@ -150,7 +160,11 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None def multipliers(mult_classes, coords_formulas, fixed_args=None): for mult_cls in mult_classes: - if fixed_args is not None and "cls" in fixed_args and mult_cls != fixed_args["cls"]: + if ( + fixed_args is not None + and "cls" in fixed_args + and mult_cls != fixed_args["cls"] + ): continue arg_options = {} for name, required_type in get_type_hints(mult_cls.__init__).items(): @@ -160,17 +174,30 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None if is_optional(required_type): opt_type = get_args(required_type)[0] if issubclass(opt_type, Formula): - options = [formula for formula in coords_formulas if - isinstance(formula, opt_type)] + [None] + options = [ + formula + for formula in coords_formulas + if isinstance(formula, opt_type) + ] + [None] else: options = [None] # TODO: anything here? - elif get_origin(required_type) is None and issubclass(required_type, Formula): - options = [formula for formula in coords_formulas if - isinstance(formula, required_type)] - elif get_origin(required_type) is None and issubclass(required_type, bool): + elif get_origin(required_type) is None and issubclass( + required_type, Formula + ): + options = [ + formula + for formula in coords_formulas + if isinstance(formula, required_type) + ] + elif get_origin(required_type) is None and issubclass( + required_type, bool + ): options = [True, False] - elif get_origin(required_type) is None and issubclass(required_type, - int) and name == "width": + elif ( + get_origin(required_type) is None + and issubclass(required_type, int) + and name == "width" + ): options = [3, 5] else: options = [] @@ -198,19 +225,29 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None if "scalarmult" in kwargs: if isinstance(kwargs["scalarmult"], ScalarMultiplier): mults = [kwargs["scalarmult"]] - if not set(kwargs["scalarmult"].formulas.values()).issubset(coords_formulas): + if not set(kwargs["scalarmult"].formulas.values()).issubset( + coords_formulas + ): continue - elif isinstance(kwargs["scalarmult"], type) and issubclass(kwargs["scalarmult"], - ScalarMultiplier): + elif isinstance(kwargs["scalarmult"], type) and issubclass( + kwargs["scalarmult"], ScalarMultiplier + ): mult_classes = list( - filter(lambda mult: issubclass(mult, kwargs["scalarmult"]), - mult_classes)) + filter( + lambda mult: issubclass(mult, kwargs["scalarmult"]), + mult_classes, + ) + ) mults = multipliers(mult_classes, coords_formulas) else: - mults = multipliers(mult_classes, coords_formulas, kwargs["scalarmult"]) + mults = multipliers( + mult_classes, coords_formulas, kwargs["scalarmult"] + ) else: mults = multipliers(mult_classes, coords_formulas) for mult in mults: formulas = frozenset(mult.formulas.values()) for independent_args in independents(kwargs): - yield Configuration(model, coords, formulas, mult, **independent_args) + yield Configuration( + model, coords, formulas, mult, **independent_args + ) diff --git a/pyecsca/ec/context.py b/pyecsca/ec/context.py index 3b38584..5bef891 100644 --- a/pyecsca/ec/context.py +++ b/pyecsca/ec/context.py @@ -23,6 +23,7 @@ from public import public @public class Action(object): """An Action.""" + inside: bool def __init__(self): @@ -41,6 +42,7 @@ class Action(object): @public class ResultAction(Action): """An action that has a result.""" + _result: Any = None _has_result: bool = False @@ -60,7 +62,12 @@ class ResultAction(Action): return result def __exit__(self, exc_type, exc_val, exc_tb): - if not self._has_result and exc_type is None and exc_val is None and exc_tb is None: + if ( + not self._has_result + and exc_type is None + and exc_val is None + and exc_tb is None + ): raise RuntimeError("Result unset on action exit") super().__exit__(exc_type, exc_val, exc_tb) @@ -167,6 +174,7 @@ class NullContext(Context): @public class DefaultContext(Context): """A context that traces executions of actions in a tree.""" + actions: Tree current: List[Action] @@ -190,6 +198,7 @@ class DefaultContext(Context): @public class PathContext(Context): """A context that traces targeted actions.""" + path: List[int] current: List[int] current_depth: int @@ -212,7 +221,7 @@ class PathContext(Context): else: self.current[self.current_depth] += 1 self.current_depth += 1 - if self.path == self.current[:self.current_depth]: + if self.path == self.current[: self.current_depth]: self.value = action def exit_action(self, action: Action) -> None: @@ -221,10 +230,14 @@ class PathContext(Context): self.current_depth -= 1 def __repr__(self): - return f"{self.__class__.__name__}({self.current!r}, depth={self.current_depth!r})" + return ( + f"{self.__class__.__name__}({self.current!r}, depth={self.current_depth!r})" + ) -_actual_context: ContextVar[Context] = ContextVar("operational_context", default=NullContext()) +_actual_context: ContextVar[Context] = ContextVar( + "operational_context", default=NullContext() +) class _ContextManager(object): diff --git a/pyecsca/ec/coordinates.py b/pyecsca/ec/coordinates.py index 7c76746..24a0b1a 100644 --- a/pyecsca/ec/coordinates.py +++ b/pyecsca/ec/coordinates.py @@ -8,14 +8,23 @@ from typing import List, Any, MutableMapping from pkg_resources import resource_listdir, resource_isdir, resource_stream from public import public -from .formula import (Formula, EFDFormula, AdditionEFDFormula, DoublingEFDFormula, - TriplingEFDFormula, DifferentialAdditionEFDFormula, LadderEFDFormula, - ScalingEFDFormula, NegationEFDFormula) +from .formula import ( + Formula, + EFDFormula, + AdditionEFDFormula, + DoublingEFDFormula, + TriplingEFDFormula, + DifferentialAdditionEFDFormula, + LadderEFDFormula, + ScalingEFDFormula, + NegationEFDFormula, +) @public class CoordinateModel(object): """A coordinate system for a particular model(form) of an elliptic curve.""" + name: str """Name of the coordinate model""" full_name: str @@ -37,7 +46,7 @@ class CoordinateModel(object): """Formulas available on the coordinate system.""" def __repr__(self): - return f"{self.__class__.__name__}(\"{self.name}\" on {self.curve_model.name})" + return f'{self.__class__.__name__}("{self.name}" on {self.curve_model.name})' @public @@ -61,7 +70,6 @@ class AffineCoordinateModel(CoordinateModel): class EFDCoordinateModel(CoordinateModel): - def __init__(self, dir_path: str, name: str, curve_model: Any): self.name = name self.curve_model = curve_model @@ -89,7 +97,7 @@ class EFDCoordinateModel(CoordinateModel): "diffadd": DifferentialAdditionEFDFormula, "ladder": LadderEFDFormula, "scaling": ScalingEFDFormula, - "negation": NegationEFDFormula + "negation": NegationEFDFormula, } cls = formula_types.get(formula_type, EFDFormula) self.formulas[fname] = cls(join(dir_path, fname), fname, self) @@ -118,7 +126,8 @@ class EFDCoordinateModel(CoordinateModel): self.parameters.append(line[10:]) elif line.startswith("assume"): self.assumptions.append( - parse(line[7:].replace("^", "**"), mode="exec")) + parse(line[7:].replace("^", "**"), mode="exec") + ) line = f.readline().decode("ascii").rstrip() def __eq__(self, other): diff --git a/pyecsca/ec/curve.py b/pyecsca/ec/curve.py index 4cfb978..bfa58c8 100644 --- a/pyecsca/ec/curve.py +++ b/pyecsca/ec/curve.py @@ -16,6 +16,7 @@ from .point import Point, InfinityPoint @public class EllipticCurve(object): """An elliptic curve.""" + model: CurveModel """The model of the curve.""" coordinate_model: CoordinateModel @@ -27,11 +28,23 @@ class EllipticCurve(object): neutral: Point """The neutral point on the curve.""" - def __init__(self, model: CurveModel, coordinate_model: CoordinateModel, - prime: int, neutral: Point, parameters: MutableMapping[str, Union[Mod, int]]): - if coordinate_model not in model.coordinates.values() and not isinstance(coordinate_model, AffineCoordinateModel): + def __init__( + self, + model: CurveModel, + coordinate_model: CoordinateModel, + prime: int, + neutral: Point, + parameters: MutableMapping[str, Union[Mod, int]], + ): + if coordinate_model not in model.coordinates.values() and not isinstance( + coordinate_model, AffineCoordinateModel + ): raise ValueError - if set(model.parameter_names).union(coordinate_model.parameters).symmetric_difference(parameters.keys()): + if ( + set(model.parameter_names) + .union(coordinate_model.parameters) + .symmetric_difference(parameters.keys()) + ): raise ValueError self.model = model self.coordinate_model = coordinate_model @@ -52,8 +65,11 @@ class EllipticCurve(object): raise ValueError("Coordinate model of point is not affine.") if point.coordinate_model.curve_model != self.model: raise ValueError("Curve model of point does not match the curve.") - locls = {var + str(i + 1): point.coords[var] - for i, point in enumerate(points) for var in point.coords} + locls = { + var + str(i + 1): point.coords[var] + for i, point in enumerate(points) + for var in point.coords + } locls.update(self.parameters) for line in formulas: exec(compile(line, "", mode="exec"), None, locls) @@ -228,7 +244,9 @@ class EllipticCurve(object): else: raise NotImplementedError else: - raise ValueError(f"Wrong encoding type: {hex(encoded[0])}, should be one of 0x04, 0x06, 0x02, 0x03 or 0x00") + raise ValueError( + f"Wrong encoding type: {hex(encoded[0])}, should be one of 0x04, 0x06, 0x02, 0x03 or 0x00" + ) def affine_random(self) -> Point: """Generate a random affine point on the curve.""" @@ -246,7 +264,12 @@ class EllipticCurve(object): def __eq__(self, other): if not isinstance(other, EllipticCurve): return False - return self.model == other.model and self.coordinate_model == other.coordinate_model and self.prime == other.prime and self.parameters == other.parameters + return ( + self.model == other.model + and self.coordinate_model == other.coordinate_model + and self.prime == other.prime + and self.parameters == other.parameters + ) def __str__(self): return "EllipticCurve" diff --git a/pyecsca/ec/formula.py b/pyecsca/ec/formula.py index d0435f8..c0505e1 100644 --- a/pyecsca/ec/formula.py +++ b/pyecsca/ec/formula.py @@ -21,6 +21,7 @@ from ..misc.cfg import getconfig @public class OpResult(object): """A result of an operation.""" + parents: Tuple op: OpType name: str @@ -44,6 +45,7 @@ class OpResult(object): @public class FormulaAction(ResultAction): """An execution of a formula, on some input points and parameters, with some outputs.""" + formula: "Formula" """The formula that was executed.""" inputs: MutableMapping[str, Mod] @@ -95,6 +97,7 @@ class FormulaAction(ResultAction): @public class Formula(ABC): """A formula operating on points.""" + name: str """Name of the formula.""" shortname: ClassVar[str] @@ -131,7 +134,9 @@ class Formula(ABC): raise ValueError(f"Wrong coordinate model of point {point}.") for coord, value in point.coords.items(): if not isinstance(value, Mod) or value.n != field: - raise ValueError(f"Wrong coordinate input {coord} = {value} of point {i}.") + raise ValueError( + f"Wrong coordinate input {coord} = {value} of point {i}." + ) params[coord + str(i + 1)] = value def __validate_assumptions(self, field, params): @@ -146,16 +151,22 @@ class Formula(ABC): holds = eval(compiled, None, alocals) if not holds: # The assumption doesn't hold, see what is the current configured action and do it. - raise_unsatisified_assumption(getconfig().ec.unsatisfied_formula_assumption_action, - f"Unsatisfied assumption in the formula ({assumption_string}).") + raise_unsatisified_assumption( + getconfig().ec.unsatisfied_formula_assumption_action, + f"Unsatisfied assumption in the formula ({assumption_string}).", + ) else: k = FF(field) expr = sympify(f"{rhs} - {lhs}", evaluate=False) for curve_param, value in params.items(): expr = expr.subs(curve_param, k(value)) - if len(expr.free_symbols) > 1 or (param := str(expr.free_symbols.pop())) not in self.parameters: + if ( + len(expr.free_symbols) > 1 + or (param := str(expr.free_symbols.pop())) not in self.parameters + ): raise ValueError( - f"This formula couldn't be executed due to an unsupported assumption ({assumption_string}).") + f"This formula couldn't be executed due to an unsupported assumption ({assumption_string})." + ) def resolve(expression): if not expression.args: @@ -178,7 +189,9 @@ class Formula(ABC): params[param] = Mod(int(root), field) break else: - raise UnsatisfiedAssumptionError(f"Unsatisfied assumption in the formula ({assumption_string}).") + raise UnsatisfiedAssumptionError( + f"Unsatisfied assumption in the formula ({assumption_string})." + ) def __call__(self, field: int, *points: Any, **params: Mod) -> Tuple[Any, ...]: """ @@ -190,6 +203,7 @@ class Formula(ABC): :return: The resulting point(s). """ from .point import Point + self.__validate_params(field, params) self.__validate_points(field, points, params) self.__validate_assumptions(field, params) @@ -201,7 +215,9 @@ class Formula(ABC): # TODO: This is not general enough, if for example the op is `t = 1/2`, it will be float. # Temporarily, add an assertion that this does not happen so we do not give bad results. if isinstance(op_result, float): - raise AssertionError(f"Bad stuff happened in op {op}, floats will pollute the results.") + raise AssertionError( + f"Bad stuff happened in op {op}, floats will pollute the results." + ) if not isinstance(op_result, Mod): op_result = Mod(op_result, field) action.add_operation(op, op_result) @@ -285,8 +301,14 @@ class Formula(ABC): @property def num_addsubs(self) -> int: """Number of additions and subtractions.""" - return len(list( - filter(lambda op: op.operator == OpType.Add or op.operator == OpType.Sub, self.code))) + return len( + list( + filter( + lambda op: op.operator == OpType.Add or op.operator == OpType.Sub, + self.code, + ) + ) + ) class EFDFormula(Formula): @@ -313,7 +335,10 @@ class EFDFormula(Formula): self.parameters.append(line[10:]) elif line.startswith("assume"): self.assumptions.append( - parse(line[7:].replace("=", "==").replace("^", "**"), mode="eval")) + parse( + line[7:].replace("=", "==").replace("^", "**"), mode="eval" + ) + ) elif line.startswith("unified"): self.unified = True line = f.readline().decode("ascii").rstrip() @@ -321,7 +346,9 @@ class EFDFormula(Formula): def __read_op3_file(self, path): with resource_stream(__name__, path) as f: for line in f.readlines(): - code_module = parse(line.decode("ascii").replace("^", "**"), path, mode="exec") + code_module = parse( + line.decode("ascii").replace("^", "**"), path, mode="exec" + ) self.code.append(CodeOp(code_module)) @property @@ -334,19 +361,29 @@ class EFDFormula(Formula): @property def inputs(self): - return set(var + str(i) for var, i in product(self.coordinate_model.variables, - range(1, 1 + self.num_inputs))) + return set( + var + str(i) + for var, i in product( + self.coordinate_model.variables, range(1, 1 + self.num_inputs) + ) + ) @property def outputs(self): - return set(var + str(i) for var, i in product(self.coordinate_model.variables, - range(self.output_index, - self.output_index + self.num_outputs))) + return set( + var + str(i) + for var, i in product( + self.coordinate_model.variables, + range(self.output_index, self.output_index + self.num_outputs), + ) + ) def __eq__(self, other): if not isinstance(other, EFDFormula): return False - return self.name == other.name and self.coordinate_model == other.coordinate_model + return ( + self.name == other.name and self.coordinate_model == other.coordinate_model + ) def __hash__(self): return hash(self.name) + hash(self.coordinate_model) @@ -355,6 +392,7 @@ class EFDFormula(Formula): @public class AdditionFormula(Formula, ABC): """A formula that adds two points.""" + shortname = "add" num_inputs = 2 num_outputs = 1 @@ -368,6 +406,7 @@ class AdditionEFDFormula(AdditionFormula, EFDFormula): @public class DoublingFormula(Formula, ABC): """A formula that doubles a point.""" + shortname = "dbl" num_inputs = 1 num_outputs = 1 @@ -381,6 +420,7 @@ class DoublingEFDFormula(DoublingFormula, EFDFormula): @public class TriplingFormula(Formula, ABC): """A formula that triples a point.""" + shortname = "tpl" num_inputs = 1 num_outputs = 1 @@ -394,6 +434,7 @@ class TriplingEFDFormula(TriplingFormula, EFDFormula): @public class NegationFormula(Formula, ABC): """A formula that negates a point.""" + shortname = "neg" num_inputs = 1 num_outputs = 1 @@ -407,6 +448,7 @@ class NegationEFDFormula(NegationFormula, EFDFormula): @public class ScalingFormula(Formula, ABC): """A formula that somehow scales the point (to a given representative of a projective class).""" + shortname = "scl" num_inputs = 1 num_outputs = 1 @@ -423,6 +465,7 @@ class DifferentialAdditionFormula(Formula, ABC): A differential addition formula that adds two points with a known difference. The first input point is the difference of the third input and the second input (`P[0] = P[2] - P[1]`). """ + shortname = "dadd" num_inputs = 3 num_outputs = 1 @@ -441,6 +484,7 @@ class LadderFormula(Formula, ABC): The first output point is the doubling of the second input point (`O[0] = 2 * P[1]`). The second output point is the addition of the second and third input points (`O[1] = P[1] + P[2]`). """ + shortname = "ladd" num_inputs = 3 num_outputs = 2 diff --git a/pyecsca/ec/key_agreement.py b/pyecsca/ec/key_agreement.py index c9e4a01..b58374f 100644 --- a/pyecsca/ec/key_agreement.py +++ b/pyecsca/ec/key_agreement.py @@ -16,12 +16,19 @@ from .point import Point @public class ECDHAction(ResultAction): """An ECDH key exchange.""" + params: DomainParameters hash_algo: Optional[Any] privkey: Mod pubkey: Point - def __init__(self, params: DomainParameters, hash_algo: Optional[Any], privkey: Mod, pubkey: Point): + def __init__( + self, + params: DomainParameters, + hash_algo: Optional[Any], + privkey: Mod, + pubkey: Point, + ): super().__init__() self.params = params self.hash_algo = hash_algo @@ -35,14 +42,21 @@ class ECDHAction(ResultAction): @public class KeyAgreement(object): """An EC based key agreement primitive. (ECDH)""" + mult: ScalarMultiplier params: DomainParameters pubkey: Point privkey: Mod hash_algo: Optional[Any] - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, privkey: Mod, - hash_algo: Optional[Any] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + hash_algo: Optional[Any] = None, + ): self.mult = mult self.params = params self.pubkey = pubkey @@ -65,7 +79,9 @@ class KeyAgreement(object): :return: The shared secret. """ - with ECDHAction(self.params, self.hash_algo, self.privkey, self.pubkey) as action: + with ECDHAction( + self.params, self.hash_algo, self.privkey, self.pubkey + ) as action: affine_point = self.perform_raw() x = int(affine_point.x) p = self.params.curve.prime @@ -80,8 +96,13 @@ class KeyAgreement(object): class ECDH_NONE(KeyAgreement): """Raw x-coordinate ECDH.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey) @@ -89,8 +110,13 @@ class ECDH_NONE(KeyAgreement): class ECDH_SHA1(KeyAgreement): """ECDH with SHA1 of x-coordinate.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey, hashlib.sha1) @@ -98,8 +124,13 @@ class ECDH_SHA1(KeyAgreement): class ECDH_SHA224(KeyAgreement): """ECDH with SHA224 of x-coordinate.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey, hashlib.sha224) @@ -107,8 +138,13 @@ class ECDH_SHA224(KeyAgreement): class ECDH_SHA256(KeyAgreement): """ECDH with SHA256 of x-coordinate.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey, hashlib.sha256) @@ -116,8 +152,13 @@ class ECDH_SHA256(KeyAgreement): class ECDH_SHA384(KeyAgreement): """ECDH with SHA384 of x-coordinate.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey, hashlib.sha384) @@ -125,6 +166,11 @@ class ECDH_SHA384(KeyAgreement): class ECDH_SHA512(KeyAgreement): """ECDH with SHA512 of x-coordinate.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, - privkey: Mod): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + pubkey: Point, + privkey: Mod, + ): super().__init__(mult, params, pubkey, privkey, hashlib.sha512) diff --git a/pyecsca/ec/key_generation.py b/pyecsca/ec/key_generation.py index d506585..a08c767 100644 --- a/pyecsca/ec/key_generation.py +++ b/pyecsca/ec/key_generation.py @@ -15,6 +15,7 @@ from .point import Point @public class KeygenAction(ResultAction): """A key generation.""" + params: DomainParameters def __init__(self, params: DomainParameters): @@ -28,11 +29,14 @@ class KeygenAction(ResultAction): @public class KeyGeneration(object): """Key generator.""" + mult: ScalarMultiplier params: DomainParameters affine: bool - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, affine: bool = False): + def __init__( + self, mult: ScalarMultiplier, params: DomainParameters, affine: bool = False + ): """ :param mult: The scalar multiplier to use during key generation. :param params: The domain parameters over which to generate the keypair. diff --git a/pyecsca/ec/mod.py b/pyecsca/ec/mod.py index 8aca8a9..8485cbe 100644 --- a/pyecsca/ec/mod.py +++ b/pyecsca/ec/mod.py @@ -125,6 +125,7 @@ def _check(func): @public class RandomModAction(ResultAction): """A random sampling from Z_n.""" + order: int def __init__(self, order: int): @@ -141,6 +142,7 @@ _mod_classes: Dict[str, Type] = {} @public class Mod(object): """An element x of ℤₙ.""" + x: Any n: Any @@ -153,7 +155,9 @@ class Mod(object): if selected_class not in _mod_classes: # Fallback to something selected_class = next(iter(_mod_classes.keys())) - return _mod_classes[selected_class].__new__(_mod_classes[selected_class], *args, **kwargs) + return _mod_classes[selected_class].__new__( + _mod_classes[selected_class], *args, **kwargs + ) @_check def __add__(self, other): @@ -254,6 +258,7 @@ class Mod(object): @public class RawMod(Mod): """An element x of ℤₙ (implemented using Python integers).""" + x: int n: int @@ -462,6 +467,7 @@ def _symbolic_check(func): @public class SymbolicMod(Mod): """A symbolic element x of ℤₙ (implemented using sympy).""" + x: Expr n: int @@ -489,10 +495,10 @@ class SymbolicMod(Mod): return -self + other def __neg__(self): - return self.__class__(- self.x, self.n) + return self.__class__(-self.x, self.n) def inverse(self): - return self.__class__(self.x**(-1), self.n) + return self.__class__(self.x ** (-1), self.n) def sqrt(self): raise NotImplementedError @@ -569,6 +575,7 @@ if has_gmp: @public class GMPMod(Mod): """An element x of ℤₙ. Implemented by GMP.""" + x: gmpy2.mpz n: gmpy2.mpz diff --git a/pyecsca/ec/model.py b/pyecsca/ec/model.py index 46b03b7..861dfca 100644 --- a/pyecsca/ec/model.py +++ b/pyecsca/ec/model.py @@ -14,6 +14,7 @@ from .coordinates import EFDCoordinateModel, CoordinateModel @public class CurveModel(object): """A model(form) of an elliptic curve.""" + name: str shortname: str coordinates: MutableMapping[str, CoordinateModel] @@ -134,6 +135,7 @@ class MontgomeryModel(EFDCurveModel): B y^2 = x^3 + A x^2 + x """ + def __init__(self): super().__init__("montgom") diff --git a/pyecsca/ec/mult.py b/pyecsca/ec/mult.py index 6fd6feb..7614dd4 100644 --- a/pyecsca/ec/mult.py +++ b/pyecsca/ec/mult.py @@ -8,8 +8,15 @@ from typing import Mapping, Tuple, Optional, MutableMapping, ClassVar, Set, Type from public import public from .context import ResultAction, Action -from .formula import (Formula, AdditionFormula, DoublingFormula, DifferentialAdditionFormula, - ScalingFormula, LadderFormula, NegationFormula) +from .formula import ( + Formula, + AdditionFormula, + DoublingFormula, + DifferentialAdditionFormula, + ScalingFormula, + LadderFormula, + NegationFormula, +) from .naf import naf, wnaf from .params import DomainParameters from .point import Point @@ -18,6 +25,7 @@ from .point import Point @public class ScalarMultiplicationAction(ResultAction): """A scalar multiplication of a point on a curve by a scalar.""" + point: Point scalar: int @@ -33,6 +41,7 @@ class ScalarMultiplicationAction(ResultAction): @public class PrecomputationAction(Action): """A precomputation of a point in scalar multiplication.""" + params: DomainParameters point: Point @@ -51,6 +60,7 @@ class ScalarMultiplier(ABC): of the point at infinity. :param formulas: Formulas this instance will use. """ + requires: ClassVar[Set[Type]] # Type[Formula] but mypy has a false positive """The set of formulas that the multiplier requires.""" optionals: ClassVar[Set[Type]] # Type[Formula] but mypy has a false positive @@ -64,8 +74,16 @@ class ScalarMultiplier(ABC): _initialized: bool = False def __init__(self, short_circuit: bool = True, **formulas: Optional[Formula]): - if len(set(formula.coordinate_model for formula in formulas.values() if - formula is not None)) != 1: + if ( + len( + set( + formula.coordinate_model + for formula in formulas.values() + if formula is not None + ) + ) + != 1 + ): raise ValueError self.short_circuit = short_circuit self.formulas = {k: v for k, v in formulas.items() if v is not None} @@ -78,7 +96,9 @@ class ScalarMultiplier(ABC): return copy(other) if other == self._params.curve.neutral: return copy(one) - return self.formulas["add"](self._params.curve.prime, one, other, **self._params.curve.parameters)[0] + return self.formulas["add"]( + self._params.curve.prime, one, other, **self._params.curve.parameters + )[0] def _dbl(self, point: Point) -> Point: if "dbl" not in self.formulas: @@ -86,12 +106,16 @@ class ScalarMultiplier(ABC): if self.short_circuit: if point == self._params.curve.neutral: return copy(point) - return self.formulas["dbl"](self._params.curve.prime, point, **self._params.curve.parameters)[0] + return self.formulas["dbl"]( + self._params.curve.prime, point, **self._params.curve.parameters + )[0] def _scl(self, point: Point) -> Point: if "scl" not in self.formulas: raise NotImplementedError - return self.formulas["scl"](self._params.curve.prime, point, **self._params.curve.parameters)[0] + return self.formulas["scl"]( + self._params.curve.prime, point, **self._params.curve.parameters + )[0] def _ladd(self, start: Point, to_dbl: Point, to_add: Point) -> Tuple[Point, ...]: if "ladd" not in self.formulas: @@ -101,7 +125,13 @@ class ScalarMultiplier(ABC): return to_dbl, to_add if to_add == self._params.curve.neutral: return self._dbl(to_dbl), to_dbl - return self.formulas["ladd"](self._params.curve.prime, start, to_dbl, to_add, **self._params.curve.parameters) + return self.formulas["ladd"]( + self._params.curve.prime, + start, + to_dbl, + to_add, + **self._params.curve.parameters, + ) def _dadd(self, start: Point, one: Point, other: Point) -> Point: if "dadd" not in self.formulas: @@ -111,12 +141,16 @@ class ScalarMultiplier(ABC): return copy(other) if other == self._params.curve.neutral: return copy(one) - return self.formulas["dadd"](self._params.curve.prime, start, one, other, **self._params.curve.parameters)[0] + return self.formulas["dadd"]( + self._params.curve.prime, start, one, other, **self._params.curve.parameters + )[0] def _neg(self, point: Point) -> Point: if "neg" not in self.formulas: raise NotImplementedError - return self.formulas["neg"](self._params.curve.prime, point, **self._params.curve.parameters)[0] + return self.formulas["neg"]( + self._params.curve.prime, point, **self._params.curve.parameters + )[0] def init(self, params: DomainParameters, point: Point): """ @@ -129,7 +163,10 @@ class ScalarMultiplier(ABC): :param point: The point to initialize the multiplier with. """ coord_model = set(self.formulas.values()).pop().coordinate_model - if params.curve.coordinate_model != coord_model or point.coordinate_model != coord_model: + if ( + params.curve.coordinate_model != coord_model + or point.coordinate_model != coord_model + ): raise ValueError self._params = params self._point = point @@ -156,14 +193,21 @@ class LTRMultiplier(ScalarMultiplier): The `always` parameter determines whether the double and add always method is used. """ + requires = {AdditionFormula, DoublingFormula} optionals = {ScalingFormula} always: bool complete: bool - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, always: bool = False, complete: bool = True, - short_circuit: bool = True): + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + scl: ScalingFormula = None, + always: bool = False, + complete: bool = True, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl) self.always = always self.complete = complete @@ -201,12 +245,19 @@ class RTLMultiplier(ScalarMultiplier): The `always` parameter determines whether the double and add always method is used. """ + requires = {AdditionFormula, DoublingFormula} optionals = {ScalingFormula} always: bool - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, always: bool = False, short_circuit: bool = True): + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + scl: ScalingFormula = None, + always: bool = False, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl) self.always = always @@ -240,11 +291,17 @@ class CoronMultiplier(ScalarMultiplier): https://link.springer.com/content/pdf/10.1007/3-540-48059-5_25.pdf """ + requires = {AdditionFormula, DoublingFormula} optionals = {ScalingFormula} - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, scl: ScalingFormula = None, - short_circuit: bool = True): + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + scl: ScalingFormula = None, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl) def multiply(self, scalar: int) -> Point: @@ -270,12 +327,19 @@ class LadderMultiplier(ScalarMultiplier): """ Montgomery ladder multiplier, using a three input, two output ladder formula. """ + requires = {LadderFormula} optionals = {DoublingFormula, ScalingFormula} complete: bool - def __init__(self, ladd: LadderFormula, dbl: DoublingFormula = None, scl: ScalingFormula = None, - complete: bool = True, short_circuit: bool = True): + def __init__( + self, + ladd: LadderFormula, + dbl: DoublingFormula = None, + scl: ScalingFormula = None, + complete: bool = True, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, ladd=ladd, dbl=dbl, scl=scl) self.complete = complete if (not complete or short_circuit) and dbl is None: @@ -311,12 +375,19 @@ class SimpleLadderMultiplier(ScalarMultiplier): """ Montgomery ladder multiplier, using addition and doubling formulas. """ + requires = {AdditionFormula, DoublingFormula} optionals = {ScalingFormula} complete: bool - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, scl: ScalingFormula = None, - complete: bool = True, short_circuit: bool = True): + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + scl: ScalingFormula = None, + complete: bool = True, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl) self.complete = complete @@ -349,12 +420,19 @@ class DifferentialLadderMultiplier(ScalarMultiplier): """ Montgomery ladder multiplier, using differential addition and doubling formulas. """ + requires = {DifferentialAdditionFormula, DoublingFormula} optionals = {ScalingFormula} complete: bool - def __init__(self, dadd: DifferentialAdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, complete: bool = True, short_circuit: bool = True): + def __init__( + self, + dadd: DifferentialAdditionFormula, + dbl: DoublingFormula, + scl: ScalingFormula = None, + complete: bool = True, + short_circuit: bool = True, + ): super().__init__(short_circuit=short_circuit, dadd=dadd, dbl=dbl, scl=scl) self.complete = complete @@ -388,13 +466,22 @@ class BinaryNAFMultiplier(ScalarMultiplier): """ Binary NAF (Non Adjacent Form) multiplier, left-to-right. """ + requires = {AdditionFormula, DoublingFormula, NegationFormula} optionals = {ScalingFormula} _point_neg: Point - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, - neg: NegationFormula, scl: ScalingFormula = None, short_circuit: bool = True): - super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl) + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + neg: NegationFormula, + scl: ScalingFormula = None, + short_circuit: bool = True, + ): + super().__init__( + short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl + ) def init(self, params: DomainParameters, point: Point): with PrecomputationAction(params, point): @@ -425,6 +512,7 @@ class WindowNAFMultiplier(ScalarMultiplier): """ Window NAF (Non Adjacent Form) multiplier, left-to-right. """ + requires = {AdditionFormula, DoublingFormula, NegationFormula} optionals = {ScalingFormula} _points: MutableMapping[int, Point] @@ -432,10 +520,19 @@ class WindowNAFMultiplier(ScalarMultiplier): precompute_negation: bool = False width: int - def __init__(self, add: AdditionFormula, dbl: DoublingFormula, - neg: NegationFormula, width: int, scl: ScalingFormula = None, - precompute_negation: bool = False, short_circuit: bool = True): - super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl) + def __init__( + self, + add: AdditionFormula, + dbl: DoublingFormula, + neg: NegationFormula, + width: int, + scl: ScalingFormula = None, + precompute_negation: bool = False, + short_circuit: bool = True, + ): + super().__init__( + short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl + ) self.width = width self.precompute_negation = precompute_negation diff --git a/pyecsca/ec/op.py b/pyecsca/ec/op.py index 2401624..e500914 100644 --- a/pyecsca/ec/op.py +++ b/pyecsca/ec/op.py @@ -1,8 +1,23 @@ """ This module provides a class for a code operation. """ -from ast import (Module, walk, Name, BinOp, UnaryOp, Constant, Mult, Div, Add, Sub, Pow, Assign, - operator as ast_operator, unaryop as ast_unaryop, USub) +from ast import ( + Module, + walk, + Name, + BinOp, + UnaryOp, + Constant, + Mult, + Div, + Add, + Sub, + Pow, + Assign, + operator as ast_operator, + unaryop as ast_unaryop, + USub, +) from enum import Enum from types import CodeType from typing import FrozenSet, cast, Any, Optional, Union @@ -15,6 +30,7 @@ from .mod import Mod @public class OpType(Enum): """A type of binary and unary operators.""" + Add = (2, "+") Sub = (2, "-") Neg = (1, "-") @@ -33,6 +49,7 @@ class OpType(Enum): @public class CodeOp(object): """An operation that can be executed.""" + result: str """The result variable of the operation (e.g. the `r` in `r = 2*a`).""" parameters: FrozenSet[str] @@ -90,7 +107,9 @@ class CodeOp(object): else: return None - def __to_op(self, op: Optional[Union[ast_operator, ast_unaryop]], left: Any, right: Any) -> OpType: + def __to_op( + self, op: Optional[Union[ast_operator, ast_unaryop]], left: Any, right: Any + ) -> OpType: if isinstance(op, Mult): return OpType.Mult elif isinstance(op, Div): diff --git a/pyecsca/ec/params.py b/pyecsca/ec/params.py index 6583bae..370b8eb 100644 --- a/pyecsca/ec/params.py +++ b/pyecsca/ec/params.py @@ -17,8 +17,13 @@ from .coordinates import AffineCoordinateModel, CoordinateModel from .curve import EllipticCurve from .error import UnsatisfiedAssumptionError, raise_unsatisified_assumption from .mod import Mod -from .model import (CurveModel, ShortWeierstrassModel, MontgomeryModel, EdwardsModel, - TwistedEdwardsModel) +from .model import ( + CurveModel, + ShortWeierstrassModel, + MontgomeryModel, + EdwardsModel, + TwistedEdwardsModel, +) from .point import Point, InfinityPoint from ..misc.cfg import getconfig @@ -26,6 +31,7 @@ from ..misc.cfg import getconfig @public class DomainParameters(object): """Domain parameters which specify a subgroup on an elliptic curve.""" + curve: EllipticCurve generator: Point order: int @@ -33,8 +39,15 @@ class DomainParameters(object): name: Optional[str] category: Optional[str] - def __init__(self, curve: EllipticCurve, generator: Point, order: int, - cofactor: int, name: Optional[str] = None, category: Optional[str] = None): + def __init__( + self, + curve: EllipticCurve, + generator: Point, + order: int, + cofactor: int, + name: Optional[str] = None, + category: Optional[str] = None, + ): self.curve = curve self.generator = generator self.order = order @@ -45,7 +58,12 @@ class DomainParameters(object): def __eq__(self, other): if not isinstance(other, DomainParameters): return False - return self.curve == other.curve and self.generator == other.generator and self.order == other.order and self.cofactor == other.cofactor + return ( + self.curve == other.curve + and self.generator == other.generator + and self.order == other.order + and self.cofactor == other.cofactor + ) def __get_name(self): if self.name and self.category: @@ -69,6 +87,7 @@ class DomainParameters(object): @public class DomainParameterCategory(object): """A category of domain parameters.""" + name: str description: str curves: List[DomainParameters] @@ -119,7 +138,9 @@ def _create_params(curve, coords, infty): param_names = ["a", "d"] else: raise ValueError("Unknown curve model.") - params = {name: Mod(int(curve["params"][name]["raw"], 16), field) for name in param_names} + params = { + name: Mod(int(curve["params"][name]["raw"], 16), field) for name in param_names + } # Check coordinate model name and assumptions coord_model: CoordinateModel @@ -139,8 +160,10 @@ def _create_params(curve, coords, infty): exec(compiled, None, alocals) for param, value in alocals.items(): if params[param] != value: - raise_unsatisified_assumption(getconfig().ec.unsatisfied_coordinate_assumption_action, - f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (= {value}).") + raise_unsatisified_assumption( + getconfig().ec.unsatisfied_coordinate_assumption_action, + f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (= {value}).", + ) except NameError: k = FF(field) assumption_string = unparse(assumption) @@ -148,15 +171,23 @@ def _create_params(curve, coords, infty): expr = sympify(f"{rhs} - {lhs}") for curve_param, value in params.items(): expr = expr.subs(curve_param, k(value)) - if len(expr.free_symbols) > 1 or (param := str(expr.free_symbols.pop())) not in coord_model.parameters: - raise ValueError(f"This coordinate model couldn't be loaded due to an unsupported assumption ({assumption_string}).") + if ( + len(expr.free_symbols) > 1 + or (param := str(expr.free_symbols.pop())) + not in coord_model.parameters + ): + raise ValueError( + f"This coordinate model couldn't be loaded due to an unsupported assumption ({assumption_string})." + ) poly = Poly(expr, symbols(param), domain=k) roots = poly.ground_roots() for root in roots.keys(): params[param] = Mod(int(root), field) break else: - raise UnsatisfiedAssumptionError(f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (0 = {expr}).") + raise UnsatisfiedAssumptionError( + f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (0 = {expr})." + ) # Construct the point at infinity infinity: Point @@ -170,26 +201,35 @@ def _create_params(curve, coords, infty): infinity_coords = {} for coordinate in coord_model.variables: if coordinate not in ilocals: - raise ValueError(f"Coordinate model {coord_model} requires infty option.") + raise ValueError( + f"Coordinate model {coord_model} requires infty option." + ) value = ilocals[coordinate] if isinstance(value, int): value = Mod(value, field) infinity_coords[coordinate] = value infinity = Point(coord_model, **infinity_coords) elliptic_curve = EllipticCurve(model, coord_model, field, infinity, params) # type: ignore[arg-type] - affine = Point(AffineCoordinateModel(model), - x=Mod(int(curve["generator"]["x"]["raw"], 16), field), - y=Mod(int(curve["generator"]["y"]["raw"], 16), field)) + affine = Point( + AffineCoordinateModel(model), + x=Mod(int(curve["generator"]["x"]["raw"], 16), field), + y=Mod(int(curve["generator"]["y"]["raw"], 16), field), + ) if not isinstance(coord_model, AffineCoordinateModel): generator = affine.to_model(coord_model, elliptic_curve) else: generator = affine - return DomainParameters(elliptic_curve, generator, order, cofactor, curve["name"], curve["category"]) + return DomainParameters( + elliptic_curve, generator, order, cofactor, curve["name"], curve["category"] + ) @public -def load_category(file: Union[str, Path, BinaryIO], coords: Union[str, Callable[[str], str]], - infty: Union[bool, Callable[[str], bool]] = True) -> DomainParameterCategory: +def load_category( + file: Union[str, Path, BinaryIO], + coords: Union[str, Callable[[str], str]], + infty: Union[bool, Callable[[str], bool]] = True, +) -> DomainParameterCategory: """ Load a category of domain parameters containing several curves from a JSON file. @@ -223,7 +263,9 @@ def load_category(file: Union[str, Path, BinaryIO], coords: Union[str, Callable[ @public -def load_params(file: Union[str, Path, BinaryIO], coords: str, infty: bool = True) -> DomainParameters: +def load_params( + file: Union[str, Path, BinaryIO], coords: str, infty: bool = True +) -> DomainParameters: """ Load a curve from a JSON file. @@ -245,8 +287,11 @@ def load_params(file: Union[str, Path, BinaryIO], coords: str, infty: bool = Tru @public -def get_category(category: str, coords: Union[str, Callable[[str], str]], - infty: Union[bool, Callable[[str], bool]] = True) -> DomainParameterCategory: +def get_category( + category: str, + coords: Union[str, Callable[[str], str]], + infty: Union[bool, Callable[[str], bool]] = True, +) -> DomainParameterCategory: """ Retrieve a category from the std-curves database at https://github.com/J08nY/std-curves. @@ -259,7 +304,9 @@ def get_category(category: str, coords: Union[str, Callable[[str], str]], :return: The category. """ listing = resource_listdir(__name__, "std") - categories = list(entry for entry in listing if resource_isdir(__name__, join("std", entry))) + categories = list( + entry for entry in listing if resource_isdir(__name__, join("std", entry)) + ) if category not in categories: raise ValueError(f"Category {category} not found.") json_path = join("std", category, "curves.json") @@ -268,7 +315,9 @@ def get_category(category: str, coords: Union[str, Callable[[str], str]], @public -def get_params(category: str, name: str, coords: str, infty: bool = True) -> DomainParameters: +def get_params( + category: str, name: str, coords: str, infty: bool = True +) -> DomainParameters: """ Retrieve a curve from a set of stored parameters. Uses the std-curves database at https://github.com/J08nY/std-curves. @@ -281,7 +330,9 @@ def get_params(category: str, name: str, coords: str, infty: bool = True) -> Dom :return: The curve. """ listing = resource_listdir(__name__, "std") - categories = list(entry for entry in listing if resource_isdir(__name__, join("std", entry))) + categories = list( + entry for entry in listing if resource_isdir(__name__, join("std", entry)) + ) if category not in categories: raise ValueError(f"Category {category} not found.") json_path = join("std", category, "curves.json") diff --git a/pyecsca/ec/point.py b/pyecsca/ec/point.py index 6ba6469..1d074fe 100644 --- a/pyecsca/ec/point.py +++ b/pyecsca/ec/point.py @@ -19,11 +19,14 @@ if TYPE_CHECKING: @public class CoordinateMappingAction(ResultAction): """A mapping of a point from one coordinate system to another one, usually one is an affine one.""" + model_from: CoordinateModel model_to: CoordinateModel point: "Point" - def __init__(self, model_from: CoordinateModel, model_to: CoordinateModel, point: "Point"): + def __init__( + self, model_from: CoordinateModel, model_to: CoordinateModel, point: "Point" + ): super().__init__() self.model_from = model_from self.model_to = model_to @@ -36,6 +39,7 @@ class CoordinateMappingAction(ResultAction): @public class Point(object): """A point with coordinates in a coordinate model.""" + coordinate_model: CoordinateModel coords: Mapping[str, Mod] field: int @@ -51,7 +55,9 @@ class Point(object): field = value.n else: if field != value.n: - raise ValueError(f"Mismatched coordinate field of definition, {field} vs {value.n}.") + raise ValueError( + f"Mismatched coordinate field of definition, {field} vs {value.n}." + ) self.field = field if field is not None else 0 def __getattribute__(self, name): @@ -65,7 +71,9 @@ class Point(object): def to_affine(self) -> "Point": """Convert this point into the affine coordinate model, if possible.""" affine_model = AffineCoordinateModel(self.coordinate_model.curve_model) - with CoordinateMappingAction(self.coordinate_model, affine_model, self) as action: + with CoordinateMappingAction( + self.coordinate_model, affine_model, self + ) as action: if isinstance(self.coordinate_model, AffineCoordinateModel): return action.exit(copy(self)) ops = [] @@ -91,11 +99,15 @@ class Point(object): result[op.result] = locls[op.result] return action.exit(Point(affine_model, **result)) - def to_model(self, coordinate_model: CoordinateModel, curve: "EllipticCurve") -> "Point": + def to_model( + self, coordinate_model: CoordinateModel, curve: "EllipticCurve" + ) -> "Point": """Convert an affine point into a given coordinate model, if possible.""" if not isinstance(self.coordinate_model, AffineCoordinateModel): raise ValueError - with CoordinateMappingAction(self.coordinate_model, coordinate_model, self) as action: + with CoordinateMappingAction( + self.coordinate_model, coordinate_model, self + ) as action: ops = [] for s in coordinate_model.satisfying: try: @@ -114,7 +126,10 @@ class Point(object): result[var] = locls[var] elif var == "X": result[var] = self.coords["x"] - if isinstance(coordinate_model, EFDCoordinateModel) and coordinate_model.name == "inverted": + if ( + isinstance(coordinate_model, EFDCoordinateModel) + and coordinate_model.name == "inverted" + ): result[var] = result[var].inverse() elif var == "Y": result[var] = self.coords["y"] @@ -124,11 +139,13 @@ class Point(object): elif coordinate_model.name == "yz": result[var] = result[var] * curve.parameters["r"] elif coordinate_model.name == "yzsquared": - result[var] = result[var]**2 * curve.parameters["r"] + result[var] = result[var] ** 2 * curve.parameters["r"] elif var.startswith("Z"): result[var] = Mod(1, curve.prime) elif var == "T": - result[var] = Mod(int(self.coords["x"] * self.coords["y"]), curve.prime) + result[var] = Mod( + int(self.coords["x"] * self.coords["y"]), curve.prime + ) else: raise NotImplementedError return action.exit(Point(coordinate_model, **result)) @@ -201,7 +218,9 @@ class InfinityPoint(Point): def to_affine(self) -> "InfinityPoint": return InfinityPoint(AffineCoordinateModel(self.coordinate_model.curve_model)) - def to_model(self, coordinate_model: CoordinateModel, curve: "EllipticCurve") -> "InfinityPoint": + def to_model( + self, coordinate_model: CoordinateModel, curve: "EllipticCurve" + ) -> "InfinityPoint": return InfinityPoint(coordinate_model) def equals_affine(self, other: "Point") -> bool: diff --git a/pyecsca/ec/signature.py b/pyecsca/ec/signature.py index 9df6599..0e6f546 100644 --- a/pyecsca/ec/signature.py +++ b/pyecsca/ec/signature.py @@ -18,12 +18,20 @@ from .point import Point @public class SignatureResult(object): """An ECDSA signature result (r, s).""" + r: int s: int - def __init__(self, r: int, s: int, data: Optional[bytes] = None, digest: Optional[bytes] = None, - nonce: Optional[int] = None, privkey: Optional[Mod] = None, - pubkey: Optional[Point] = None): + def __init__( + self, + r: int, + s: int, + data: Optional[bytes] = None, + digest: Optional[bytes] = None, + nonce: Optional[int] = None, + privkey: Optional[Mod] = None, + pubkey: Optional[Point] = None, + ): self.r = r self.s = s @@ -55,12 +63,12 @@ class SignatureResult(object): @public class ECDSAAction(Action): """An ECDSA action base class.""" + params: DomainParameters hash_algo: Optional[Any] msg: bytes - def __init__(self, params: DomainParameters, hash_algo: Optional[Any], - msg: bytes): + def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes): super().__init__() self.params = params self.hash_algo = hash_algo @@ -73,10 +81,16 @@ class ECDSAAction(Action): @public class ECDSASignAction(ECDSAAction): """An ECDSA signing.""" + privkey: Mod - def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes, - privkey: Mod): + def __init__( + self, + params: DomainParameters, + hash_algo: Optional[Any], + msg: bytes, + privkey: Mod, + ): super().__init__(params, hash_algo, msg) self.privkey = privkey @@ -87,11 +101,18 @@ class ECDSASignAction(ECDSAAction): @public class ECDSAVerifyAction(ECDSAAction): """An ECDSA verification.""" + signature: SignatureResult pubkey: Point - def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes, - signature: SignatureResult, pubkey: Point): + def __init__( + self, + params: DomainParameters, + hash_algo: Optional[Any], + msg: bytes, + signature: SignatureResult, + pubkey: Point, + ): super().__init__(params, hash_algo, msg) self.signature = signature self.pubkey = pubkey @@ -103,6 +124,7 @@ class ECDSAVerifyAction(ECDSAAction): @public class Signature(object): """An EC based signature primitive. (ECDSA)""" + mult: ScalarMultiplier params: DomainParameters add: Optional[AdditionFormula] @@ -110,10 +132,15 @@ class Signature(object): privkey: Optional[Mod] hash_algo: Optional[Any] - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None, - hash_algo: Optional[Any] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + hash_algo: Optional[Any] = None, + ): if pubkey is None and privkey is None: raise ValueError if add is None: @@ -153,8 +180,9 @@ class Signature(object): affine_point = point.to_affine() r = Mod(int(affine_point.x), self.params.order) s = nonce.inverse() * (Mod(z, self.params.order) + r * self.privkey) - return SignatureResult(int(r), int(s), digest=digest, nonce=int(nonce), - privkey=self.privkey) + return SignatureResult( + int(r), int(s), digest=digest, nonce=int(nonce), privkey=self.privkey + ) def sign_hash(self, digest: bytes, nonce: Optional[int] = None) -> SignatureResult: """Sign already hashed data.""" @@ -203,7 +231,9 @@ class Signature(object): """Verify data.""" if not self.can_verify or self.pubkey is None: raise RuntimeError("This instance cannot verify.") - with ECDSAVerifyAction(self.params, self.hash_algo, data, signature, self.pubkey): + with ECDSAVerifyAction( + self.params, self.hash_algo, data, signature, self.pubkey + ): if self.hash_algo is None: digest = data else: @@ -215,9 +245,14 @@ class Signature(object): class ECDSA_NONE(Signature): """ECDSA with raw message input.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey) @@ -225,9 +260,14 @@ class ECDSA_NONE(Signature): class ECDSA_SHA1(Signature): """ECDSA with SHA1.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey, hashlib.sha1) @@ -235,9 +275,14 @@ class ECDSA_SHA1(Signature): class ECDSA_SHA224(Signature): """ECDSA with SHA224.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey, hashlib.sha224) @@ -245,9 +290,14 @@ class ECDSA_SHA224(Signature): class ECDSA_SHA256(Signature): """ECDSA with SHA256.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey, hashlib.sha256) @@ -255,9 +305,14 @@ class ECDSA_SHA256(Signature): class ECDSA_SHA384(Signature): """ECDSA with SHA384.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey, hashlib.sha384) @@ -265,7 +320,12 @@ class ECDSA_SHA384(Signature): class ECDSA_SHA512(Signature): """ECDSA with SHA512.""" - def __init__(self, mult: ScalarMultiplier, params: DomainParameters, - add: Optional[AdditionFormula] = None, - pubkey: Optional[Point] = None, privkey: Optional[Mod] = None): + def __init__( + self, + mult: ScalarMultiplier, + params: DomainParameters, + add: Optional[AdditionFormula] = None, + pubkey: Optional[Point] = None, + privkey: Optional[Mod] = None, + ): super().__init__(mult, params, add, pubkey, privkey, hashlib.sha512) diff --git a/pyecsca/ec/transformations.py b/pyecsca/ec/transformations.py index 986064b..0987235 100644 --- a/pyecsca/ec/transformations.py +++ b/pyecsca/ec/transformations.py @@ -22,8 +22,12 @@ def __M_map(params, param_names, map_parameters, map_point, model): else: neutral = map_point(param_one, param_other, params.curve.neutral, aff) curve = EllipticCurve(model, aff, params.curve.prime, neutral, parameters) - return DomainParameters(curve, map_point(param_one, param_other, params.generator, aff), params.order, - params.cofactor) + return DomainParameters( + curve, + map_point(param_one, param_other, params.generator, aff), + params.order, + params.cofactor, + ) @public @@ -35,7 +39,8 @@ def M2SW(params: DomainParameters) -> DomainParameters: :return: The converted domain parameters. """ if not isinstance(params.curve.model, MontgomeryModel) or not isinstance( - params.curve.coordinate_model, AffineCoordinateModel): + params.curve.coordinate_model, AffineCoordinateModel + ): raise ValueError def map_parameters(A, B): @@ -48,7 +53,9 @@ def M2SW(params: DomainParameters) -> DomainParameters: v = pt.y / B return Point(aff, x=u, y=v) - return __M_map(params, ("a", "b"), map_parameters, map_point, ShortWeierstrassModel()) + return __M_map( + params, ("a", "b"), map_parameters, map_point, ShortWeierstrassModel() + ) @public @@ -60,7 +67,8 @@ def M2TE(params: DomainParameters) -> DomainParameters: :return: The converted domain parameters. """ if not isinstance(params.curve.model, MontgomeryModel) or not isinstance( - params.curve.coordinate_model, AffineCoordinateModel): + params.curve.coordinate_model, AffineCoordinateModel + ): raise ValueError def map_parameters(A, B): @@ -85,7 +93,8 @@ def TE2M(params: DomainParameters) -> DomainParameters: :return: The converted domain parameters. """ if not isinstance(params.curve.model, TwistedEdwardsModel) or not isinstance( - params.curve.coordinate_model, AffineCoordinateModel): + params.curve.coordinate_model, AffineCoordinateModel + ): raise ValueError def map_parameters(a, d): @@ -110,16 +119,25 @@ def SW2M(params: DomainParameters) -> DomainParameters: :return: The converted domain parameters. """ if not isinstance(params.curve.model, ShortWeierstrassModel) or not isinstance( - params.curve.coordinate_model, AffineCoordinateModel): + params.curve.coordinate_model, AffineCoordinateModel + ): raise ValueError ax = symbols("α") field = FF(params.curve.prime) - rhs = Poly(ax ** 3 + field(int(params.curve.parameters["a"])) * ax + field(int(params.curve.parameters["b"])), ax, domain=field) + rhs = Poly( + ax ** 3 + + field(int(params.curve.parameters["a"])) * ax + + field(int(params.curve.parameters["b"])), + ax, + domain=field, + ) roots = rhs.ground_roots() if not roots: - raise ValueError("Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root).") + raise ValueError( + "Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root)." + ) alpha = Mod(int(next(iter(roots.keys()))), params.curve.prime) - beta = (3 * alpha**2 + params.curve.parameters["a"]).sqrt() + beta = (3 * alpha ** 2 + params.curve.parameters["a"]).sqrt() def map_parameters(a, b): A = (3 * alpha) / beta @@ -143,16 +161,25 @@ def SW2TE(params: DomainParameters) -> DomainParameters: :return: The converted domain parameters. """ if not isinstance(params.curve.model, ShortWeierstrassModel) or not isinstance( - params.curve.coordinate_model, AffineCoordinateModel): + params.curve.coordinate_model, AffineCoordinateModel + ): raise ValueError ax = symbols("α") field = FF(params.curve.prime) - rhs = Poly(ax ** 3 + field(int(params.curve.parameters["a"])) * ax + field(int(params.curve.parameters["b"])), ax, domain=field) + rhs = Poly( + ax ** 3 + + field(int(params.curve.parameters["a"])) * ax + + field(int(params.curve.parameters["b"])), + ax, + domain=field, + ) roots = rhs.ground_roots() if not roots: - raise ValueError("Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root).") + raise ValueError( + "Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root)." + ) alpha = Mod(int(next(iter(roots.keys()))), params.curve.prime) - beta = (3 * alpha**2 + params.curve.parameters["a"]).sqrt() + beta = (3 * alpha ** 2 + params.curve.parameters["a"]).sqrt() def map_parameters(a, b): a = 3 * alpha + 2 * beta diff --git a/pyecsca/misc/cfg.py b/pyecsca/misc/cfg.py index df3cce1..8a5a9ee 100644 --- a/pyecsca/misc/cfg.py +++ b/pyecsca/misc/cfg.py @@ -11,6 +11,7 @@ from public import public @public class ECConfig(object): """Configuration for the :py:mod:`pyecsca.ec` package.""" + _no_inverse_action: str = "error" _non_residue_action: str = "error" _unsatisfied_formula_assumption_action: str = "error" @@ -111,6 +112,7 @@ class ECConfig(object): @public class Config(object): """A runtime configuration for the library.""" + ec: ECConfig """Configuration for the :py:mod:`pyecsca.ec` package.""" diff --git a/pyecsca/sca/re/rpa.py b/pyecsca/sca/re/rpa.py index 3dc00e2..ab038e8 100644 --- a/pyecsca/sca/re/rpa.py +++ b/pyecsca/sca/re/rpa.py @@ -7,8 +7,15 @@ This module provides functionality inspired by the Refined-Power Analysis attack from public import public from typing import MutableMapping, Optional -from ...ec.formula import FormulaAction, DoublingFormula, AdditionFormula, TriplingFormula, NegationFormula, \ - DifferentialAdditionFormula, LadderFormula +from ...ec.formula import ( + FormulaAction, + DoublingFormula, + AdditionFormula, + TriplingFormula, + NegationFormula, + DifferentialAdditionFormula, + LadderFormula, +) from ...ec.mult import ScalarMultiplicationAction, PrecomputationAction from ...ec.point import Point from ...ec.context import Context, Action @@ -17,6 +24,7 @@ from ...ec.context import Context, Action @public class MultipleContext(Context): """A context that traces the multiples of points computed.""" + base: Optional[Point] points: MutableMapping[Point, int] inside: bool @@ -51,7 +59,7 @@ class MultipleContext(Context): elif isinstance(action.formula, NegationFormula): inp = action.input_points[0] out = action.output_points[0] - self.points[out] = - self.points[inp] + self.points[out] = -self.points[inp] elif isinstance(action.formula, DifferentialAdditionFormula): diff, one, other = action.input_points out = action.output_points[0] diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py index 68d60dd..595fe32 100644 --- a/pyecsca/sca/scope/base.py +++ b/pyecsca/sca/scope/base.py @@ -12,6 +12,7 @@ from ..trace import Trace @public class SampleType(Enum): """The sample unit.""" + Raw = auto() Volt = auto() @@ -29,7 +30,9 @@ class Scope(object): """A list of channels available on this scope.""" raise NotImplementedError - def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + def setup_frequency( + self, frequency: int, pretrig: int, posttrig: int + ) -> Tuple[int, int]: """ Setup the frequency and sample count for the measurement. The scope might not support the requested values and will adjust them to get the next best frequency and the largest @@ -42,8 +45,9 @@ class Scope(object): """ raise NotImplementedError - def setup_channel(self, channel: str, coupling: str, range: float, offset: float, - enable: bool) -> None: + def setup_channel( + self, channel: str, coupling: str, range: float, offset: float, enable: bool + ) -> None: """ Setup a channel to use the coupling method and measure the given voltage range. @@ -55,8 +59,15 @@ class Scope(object): """ raise NotImplementedError - def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, - timeout: int, enable: bool) -> None: + def setup_trigger( + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, + ) -> None: """ Setup a trigger on a particular `channel`, the channel has to be set up and enabled. The trigger will fire based on the `threshold` and `direction`, if enabled, the trigger diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py index c5c21de..05c8e69 100644 --- a/pyecsca/sca/scope/chipwhisperer.py +++ b/pyecsca/sca/scope/chipwhisperer.py @@ -27,18 +27,29 @@ class ChipWhispererScope(Scope): # pragma: no cover def channels(self) -> Sequence[str]: return [] - def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + def setup_frequency( + self, frequency: int, pretrig: int, posttrig: int + ) -> Tuple[int, int]: if pretrig != 0: raise ValueError("ChipWhisperer does not support pretrig samples.") self.scope.clock.clkgen_freq = frequency self.scope.samples = posttrig return self.scope.clock.freq_ctr, self.scope.samples - def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool) -> None: + def setup_channel( + self, channel: str, coupling: str, range: float, offset: float, enable: bool + ) -> None: pass - def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, - timeout: int, enable: bool) -> None: + def setup_trigger( + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, + ) -> None: if enable: self.triggers.add(channel) elif channel in self.triggers: @@ -55,11 +66,16 @@ class ChipWhispererScope(Scope): # pragma: no cover def capture(self, timeout: Optional[int] = None) -> bool: return not self.scope.capture() - def retrieve(self, channel: str, type: SampleType, dtype=np.float16) -> Optional[Trace]: + def retrieve( + self, channel: str, type: SampleType, dtype=np.float16 + ) -> Optional[Trace]: data = self.scope.get_last_trace() if data is None: return None - return Trace(np.array(data, dtype=dtype), {"sampling_frequency": self.scope.clock.clkgen_freq, "channel": channel}) + return Trace( + np.array(data, dtype=dtype), + {"sampling_frequency": self.scope.clock.clkgen_freq, "channel": channel}, + ) def stop(self) -> None: pass diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py index 544ef35..577c7dc 100644 --- a/pyecsca/sca/scope/picoscope_alt.py +++ b/pyecsca/sca/scope/picoscope_alt.py @@ -39,20 +39,31 @@ class PicoScopeAlt(Scope): # pragma: no cover def channels(self) -> Sequence[str]: return list(self.ps.CHANNELS.keys()) - def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + def setup_frequency( + self, frequency: int, pretrig: int, posttrig: int + ) -> Tuple[int, int]: samples = pretrig + posttrig actual_frequency, max_samples = self.ps.setSamplingFrequency(frequency, samples) if max_samples < samples: - self.trig_ratio = (pretrig / samples) + self.trig_ratio = pretrig / samples samples = max_samples self.frequency = actual_frequency return actual_frequency, samples - def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool) -> None: + def setup_channel( + self, channel: str, coupling: str, range: float, offset: float, enable: bool + ) -> None: self.ps.setChannel(channel, coupling, range, offset, enable) - def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, - timeout: int, enable: bool) -> None: + def setup_trigger( + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, + ) -> None: self.ps.setSimpleTrigger(channel, threshold, direction, delay, timeout, enable) def setup_capture(self, channel: str, enable: bool) -> None: @@ -69,14 +80,23 @@ class PicoScopeAlt(Scope): # pragma: no cover return False return True - def retrieve(self, channel: str, type: SampleType, dtype=np.float32) -> Optional[Trace]: + def retrieve( + self, channel: str, type: SampleType, dtype=np.float32 + ) -> Optional[Trace]: if type == SampleType.Raw: data = self.ps.getDataRaw(channel).astype(dtype=dtype, copy=False) else: data = self.ps.getDataV(channel, dtype=dtype) if data is None: return None - return Trace(data, {"sampling_frequency": self.frequency, "channel": channel, "sample_type": type}) + return Trace( + data, + { + "sampling_frequency": self.frequency, + "channel": channel, + "sample_type": type, + }, + ) def stop(self) -> None: self.ps.stop() diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py index 8101f94..6237bb8 100644 --- a/pyecsca/sca/scope/picoscope_sdk.py +++ b/pyecsca/sca/scope/picoscope_sdk.py @@ -11,6 +11,7 @@ import numpy as np from picosdk.errors import CannotFindPicoSDKError from picosdk.functions import assert_pico_ok from picosdk.library import Library + try: from picosdk.ps3000 import ps3000 except CannotFindPicoSDKError as exc: @@ -33,8 +34,12 @@ from .base import Scope, SampleType from ..trace import Trace -def adc2volt(adc: Union[np.ndarray, ctypes.c_int16], - volt_range: float, adc_minmax: int, dtype=np.float32) -> Union[np.ndarray, float]: # pragma: no cover +def adc2volt( + adc: Union[np.ndarray, ctypes.c_int16], + volt_range: float, + adc_minmax: int, + dtype=np.float32, +) -> Union[np.ndarray, float]: # pragma: no cover """ Convert raw adc values to volts. @@ -51,8 +56,9 @@ def adc2volt(adc: Union[np.ndarray, ctypes.c_int16], raise ValueError -def volt2adc(volt: Union[np.ndarray, float], - volt_range: float, adc_minmax: int, dtype=np.float32) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover +def volt2adc( + volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32 +) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover """ Convert volt values to raw adc values. @@ -72,6 +78,7 @@ def volt2adc(volt: Union[np.ndarray, float], @public class PicoScopeSdk(Scope): # pragma: no cover """A PicoScope based scope.""" + MODULE: Library PREFIX: str CHANNELS: Mapping @@ -80,12 +87,7 @@ class PicoScopeSdk(Scope): # pragma: no cover MIN_ADC_VALUE: int COUPLING: Mapping TIME_UNITS: Mapping - TRIGGERS: Mapping = { - "above": 0, - "below": 1, - "rising": 2, - "falling": 3 - } + TRIGGERS: Mapping = {"above": 0, "below": 1, "rising": 2, "falling": 3} _variant: Optional[str] def __init__(self, variant: Optional[str] = None): @@ -112,28 +114,52 @@ class PicoScopeSdk(Scope): # pragma: no cover return self._variant info = (ctypes.c_int8 * 6)() size = ctypes.c_int16() - assert_pico_ok(self.__dispatch_call("GetUnitInfo", self.handle, ctypes.byref(info), 6, - ctypes.byref(size), 3)) - self._variant = "".join(chr(i) for i in info[:size.value]) + assert_pico_ok( + self.__dispatch_call( + "GetUnitInfo", self.handle, ctypes.byref(info), 6, ctypes.byref(size), 3 + ) + ) + self._variant = "".join(chr(i) for i in info[: size.value]) return self._variant - def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + def setup_frequency( + self, frequency: int, pretrig: int, posttrig: int + ) -> Tuple[int, int]: return self.set_frequency(frequency, pretrig, posttrig) - def set_channel(self, channel: str, enabled: bool, coupling: str, range: float, offset: float): + def set_channel( + self, channel: str, enabled: bool, coupling: str, range: float, offset: float + ): if offset != 0.0: raise ValueError("Nonzero offset not supported.") assert_pico_ok( - self.__dispatch_call("SetChannel", self.handle, self.CHANNELS[channel], enabled, - self.COUPLING[coupling], self.RANGES[range])) + self.__dispatch_call( + "SetChannel", + self.handle, + self.CHANNELS[channel], + enabled, + self.COUPLING[coupling], + self.RANGES[range], + ) + ) self.ranges[channel] = range - def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool): + def setup_channel( + self, channel: str, coupling: str, range: float, offset: float, enable: bool + ): self.set_channel(channel, enable, coupling, range, offset) - def _set_freq(self, frequency: int, pretrig: int, posttrig: int, period_bound: float, - timebase_bound: int, - low_freq: int, high_freq: int, high_subtract: int) -> Tuple[int, int]: + def _set_freq( + self, + frequency: int, + pretrig: int, + posttrig: int, + period_bound: float, + timebase_bound: int, + low_freq: int, + high_freq: int, + high_subtract: int, + ) -> Tuple[int, int]: samples = pretrig + posttrig period = 1 / frequency if low_freq == 0 or period > period_bound: @@ -145,8 +171,18 @@ class PicoScopeSdk(Scope): # pragma: no cover tb = timebase_bound actual_frequency = low_freq // 2 ** tb max_samples = ctypes.c_int32() - assert_pico_ok(self.__dispatch_call("GetTimebase", self.handle, tb, samples, None, 0, - ctypes.byref(max_samples), 0)) + assert_pico_ok( + self.__dispatch_call( + "GetTimebase", + self.handle, + tb, + samples, + None, + 0, + ctypes.byref(max_samples), + 0, + ) + ) if max_samples.value < samples: pretrig = max_samples.value * (pretrig // samples) posttrig = max_samples.value - pretrig @@ -158,20 +194,43 @@ class PicoScopeSdk(Scope): # pragma: no cover self.timebase = tb return actual_frequency, samples - def set_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]: + def set_frequency( + self, frequency: int, pretrig: int, posttrig: int + ) -> Tuple[int, int]: raise NotImplementedError - def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int, - timeout: int, enable: bool): + def setup_trigger( + self, + channel: str, + threshold: float, + direction: str, + delay: int, + timeout: int, + enable: bool, + ): self.set_trigger(direction, enable, threshold, channel, delay, timeout) - def set_trigger(self, type: str, enabled: bool, value: float, channel: str, - delay: int, timeout: int): + def set_trigger( + self, + type: str, + enabled: bool, + value: float, + channel: str, + delay: int, + timeout: int, + ): assert_pico_ok( - self.__dispatch_call("SetSimpleTrigger", self.handle, enabled, - self.CHANNELS[channel], - volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE), - self.TRIGGERS[type], delay, timeout)) + self.__dispatch_call( + "SetSimpleTrigger", + self.handle, + enabled, + self.CHANNELS[channel], + volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE), + self.TRIGGERS[type], + delay, + timeout, + ) + ) def setup_capture(self, channel: str, enable: bool): self.set_buffer(channel, enable) @@ -184,22 +243,44 @@ class PicoScopeSdk(Scope): # pragma: no cover del self.buffers[channel] buffer = (ctypes.c_int16 * self.samples)() assert_pico_ok( - self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), self.samples)) + self.__dispatch_call( + "SetDataBuffer", + self.handle, + self.CHANNELS[channel], + ctypes.byref(buffer), + self.samples, + ) + ) self.buffers[channel] = buffer else: assert_pico_ok( - self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel], - None, self.samples)) + self.__dispatch_call( + "SetDataBuffer", + self.handle, + self.CHANNELS[channel], + None, + self.samples, + ) + ) del self.buffers[channel] def arm(self): if self.samples is None or self.timebase is None: raise ValueError assert_pico_ok( - self.__dispatch_call("RunBlock", self.handle, self.pretrig, self.posttrig, - self.timebase, 0, - None, 0, None, None)) + self.__dispatch_call( + "RunBlock", + self.handle, + self.pretrig, + self.posttrig, + self.timebase, + 0, + None, + 0, + None, + None, + ) + ) def capture(self, timeout: Optional[int] = None) -> bool: start = time_ns() @@ -209,25 +290,48 @@ class PicoScopeSdk(Scope): # pragma: no cover check = ctypes.c_int16(0) while ready.value == check.value: sleep(0.001) - assert_pico_ok(self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready))) + assert_pico_ok( + self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready)) + ) if timeout is not None and (time_ns() - start) / 1e6 >= timeout: return False return True - def retrieve(self, channel: str, type: SampleType, dtype=np.float32) -> Optional[Trace]: + def retrieve( + self, channel: str, type: SampleType, dtype=np.float32 + ) -> Optional[Trace]: if self.samples is None: raise ValueError actual_samples = ctypes.c_int32(self.samples) overflow = ctypes.c_int16() assert_pico_ok( - self.__dispatch_call("GetValues", self.handle, 0, ctypes.byref(actual_samples), 1, - 0, 0, ctypes.byref(overflow))) + self.__dispatch_call( + "GetValues", + self.handle, + 0, + ctypes.byref(actual_samples), + 1, + 0, + 0, + ctypes.byref(overflow), + ) + ) arr = np.array(self.buffers[channel], dtype=dtype) if type == SampleType.Raw: data = arr else: - data = cast(np.ndarray, adc2volt(arr, self.ranges[channel], self.MAX_ADC_VALUE, dtype=dtype)) - return Trace(data, {"sampling_frequency": self.frequency, "channel": channel, "sample_type": type}) + data = cast( + np.ndarray, + adc2volt(arr, self.ranges[channel], self.MAX_ADC_VALUE, dtype=dtype), + ) + return Trace( + data, + { + "sampling_frequency": self.frequency, + "channel": channel, + "sample_type": type, + }, + ) def stop(self): assert_pico_ok(self.__dispatch_call("Stop")) @@ -243,23 +347,29 @@ class PicoScopeSdk(Scope): # pragma: no cover if isinstance(ps3000, CannotFindPicoSDKError): + @public class PS3000Scope(PicoScopeSdk): # pragma: no cover """A PicoScope 3000 series oscilloscope is not available. (Install `libps3000`).""" + def __init__(self, variant: Optional[str] = None): super().__init__(variant) raise ps3000 + + else: # pragma: no cover + @public class PS3000Scope(PicoScopeSdk): # type: ignore """A PicoScope 3000 series oscilloscope.""" + MODULE = ps3000 PREFIX = "ps3000" CHANNELS = { "A": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_A"], "B": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_B"], "C": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_C"], - "D": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_D"] + "D": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_D"], } RANGES = { @@ -276,48 +386,57 @@ else: # pragma: no cover 50.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_50V"], 100.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_100V"], 200.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_200V"], - 400.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_400V"] + 400.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_400V"], } MAX_ADC_VALUE = 32767 MIN_ADC_VALUE = -32767 - COUPLING = { - "AC": ps3000.PICO_COUPLING["AC"], - "DC": ps3000.PICO_COUPLING["DC"] - } + COUPLING = {"AC": ps3000.PICO_COUPLING["AC"], "DC": ps3000.PICO_COUPLING["DC"]} def get_variant(self): if self._variant is not None: return self._variant info = (ctypes.c_int8 * 6)() size = ctypes.c_int16(6) - assert_pico_ok(self.__dispatch_call("GetUnitInfo", self.handle, ctypes.byref(info), size, 3)) - self._variant = "".join(chr(i) for i in info[:size.value]) + assert_pico_ok( + self.__dispatch_call( + "GetUnitInfo", self.handle, ctypes.byref(info), size, 3 + ) + ) + self._variant = "".join(chr(i) for i in info[: size.value]) return self._variant - def set_frequency(self, frequency: int, pretrig: int, posttrig: int): # TODO: fix + def set_frequency( + self, frequency: int, pretrig: int, posttrig: int + ): # TODO: fix raise NotImplementedError if isinstance(ps4000, CannotFindPicoSDKError): + @public class PS4000Scope(PicoScopeSdk): # pragma: no cover """A PicoScope 4000 series oscilloscope is not available. (Install `libps4000`).""" + def __init__(self, variant: Optional[str] = None): super().__init__(variant) raise ps4000 + + else: # pragma: no cover + @public class PS4000Scope(PicoScopeSdk): # type: ignore """A PicoScope 4000 series oscilloscope.""" + MODULE = ps4000 PREFIX = "ps4000" CHANNELS = { "A": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_A"], "B": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_B"], "C": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_C"], - "D": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_D"] + "D": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_D"], } RANGES = { @@ -333,46 +452,54 @@ else: # pragma: no cover 10.0: ps4000.PS4000_RANGE["PS4000_10V"], 20.0: ps4000.PS4000_RANGE["PS4000_20V"], 50.0: ps4000.PS4000_RANGE["PS4000_50V"], - 100.0: ps4000.PS4000_RANGE["PS4000_100V"] + 100.0: ps4000.PS4000_RANGE["PS4000_100V"], } MAX_ADC_VALUE = 32764 MIN_ADC_VALUE = -32764 - COUPLING = { - "AC": ps4000.PICO_COUPLING["AC"], - "DC": ps4000.PICO_COUPLING["DC"] - } + COUPLING = {"AC": ps4000.PICO_COUPLING["AC"], "DC": ps4000.PICO_COUPLING["DC"]} def set_frequency(self, frequency: int, pretrig: int, posttrig: int): variant = self.get_variant() if variant in ("4223", "4224", "4423", "4424"): - return self._set_freq(frequency, pretrig, posttrig, 50e-9, 2, 80_000_000, 20_000_000, 1) + return self._set_freq( + frequency, pretrig, posttrig, 50e-9, 2, 80_000_000, 20_000_000, 1 + ) elif variant in ("4226", "4227"): - return self._set_freq(frequency, pretrig, posttrig, 32e-9, 3, 250_000_000, 31_250_000, - 2) + return self._set_freq( + frequency, pretrig, posttrig, 32e-9, 3, 250_000_000, 31_250_000, 2 + ) elif variant == "4262": - return self._set_freq(frequency, pretrig, posttrig, 0, 0, 0, 10_000_000, -1) + return self._set_freq( + frequency, pretrig, posttrig, 0, 0, 0, 10_000_000, -1 + ) if isinstance(ps5000, CannotFindPicoSDKError): + @public class PS5000Scope(PicoScopeSdk): # pragma: no cover """A PicoScope 5000 series oscilloscope is not available. (Install `libps5000`).""" + def __init__(self, variant: Optional[str] = None): super().__init__(variant) raise ps5000 + + else: # pragma: no cover + @public class PS5000Scope(PicoScopeSdk): # type: ignore """A PicoScope 5000 series oscilloscope.""" + MODULE = ps5000 PREFIX = "ps5000" CHANNELS = { "A": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_A"], "B": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_B"], "C": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_C"], - "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"] + "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"], } RANGES = { @@ -387,39 +514,44 @@ else: # pragma: no cover 5.00: 8, 10.0: 9, 20.0: 10, - 50.0: 11 + 50.0: 11, } MAX_ADC_VALUE = 32512 MIN_ADC_VALUE = -32512 - COUPLING = { - "AC": 0, - "DC": 1 - } + COUPLING = {"AC": 0, "DC": 1} def set_frequency(self, frequency: int, pretrig: int, posttrig: int): - return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2) + return self._set_freq( + frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2 + ) if isinstance(ps6000, CannotFindPicoSDKError): + @public class PS6000Scope(PicoScopeSdk): # pragma: no cover """A PicoScope 6000 series oscilloscope is not available. (Install `libps6000`).""" + def __init__(self, variant: Optional[str] = None): super().__init__(variant) raise ps6000 + + else: # pragma: no cover + @public class PS6000Scope(PicoScopeSdk): # type: ignore """A PicoScope 6000 series oscilloscope.""" + MODULE = ps6000 PREFIX = "ps6000" CHANNELS = { "A": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_A"], "B": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_B"], "C": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_C"], - "D": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_D"] + "D": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_D"], } RANGES = { @@ -434,7 +566,7 @@ else: # pragma: no cover 5.00: ps6000.PS6000_RANGE["PS6000_5V"], 10.0: ps6000.PS6000_RANGE["PS6000_10V"], 20.0: ps6000.PS6000_RANGE["PS6000_20V"], - 50.0: ps6000.PS6000_RANGE["PS6000_50V"] + 50.0: ps6000.PS6000_RANGE["PS6000_50V"], } MAX_ADC_VALUE = 32512 @@ -443,16 +575,31 @@ else: # pragma: no cover COUPLING = { "AC": ps6000.PS6000_COUPLING["PS6000_AC"], "DC": ps6000.PS6000_COUPLING["PS6000_DC_1M"], - "DC_50": ps6000.PS6000_COUPLING["PS6000_DC_50R"] + "DC_50": ps6000.PS6000_COUPLING["PS6000_DC_50R"], } def open(self): assert_pico_ok(ps6000.ps6000OpenUnit(ctypes.byref(self.handle), None)) - def set_channel(self, channel: str, enabled: bool, coupling: str, range: float, offset: float): - assert_pico_ok(ps6000.ps6000SetChannel(self.handle, self.CHANNELS[channel], enabled, - self.COUPLING[coupling], self.RANGES[range], offset, - ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"])) + def set_channel( + self, + channel: str, + enabled: bool, + coupling: str, + range: float, + offset: float, + ): + assert_pico_ok( + ps6000.ps6000SetChannel( + self.handle, + self.CHANNELS[channel], + enabled, + self.COUPLING[coupling], + self.RANGES[range], + offset, + ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"], + ) + ) def set_buffer(self, channel: str, enable: bool): if self.samples is None: @@ -462,15 +609,24 @@ else: # pragma: no cover del self.buffers[channel] buffer = (ctypes.c_int16 * self.samples)() assert_pico_ok( - ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], - ctypes.byref(buffer), self.samples, 0)) + ps6000.ps6000SetDataBuffer( + self.handle, + self.CHANNELS[channel], + ctypes.byref(buffer), + self.samples, + 0, + ) + ) self.buffers[channel] = buffer else: assert_pico_ok( - ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel], - None, self.samples, 0)) + ps6000.ps6000SetDataBuffer( + self.handle, self.CHANNELS[channel], None, self.samples, 0 + ) + ) del self.buffers[channel] def set_frequency(self, frequency: int, pretrig: int, posttrig: int): - return self._set_freq(frequency, pretrig, posttrig, 3.2e-9, 4, 5_000_000_000, 156_250_000, - 4) + return self._set_freq( + frequency, pretrig, posttrig, 3.2e-9, 4, 5_000_000_000, 156_250_000, 4 + ) diff --git a/pyecsca/sca/target/ISO7816.py b/pyecsca/sca/target/ISO7816.py index 233685e..1a9c8b8 100644 --- a/pyecsca/sca/target/ISO7816.py +++ b/pyecsca/sca/target/ISO7816.py @@ -14,6 +14,7 @@ from .base import Target @dataclass class CommandAPDU(object): # pragma: no cover """A command APDU that can be sent to an ISO7816-4 target.""" + cls: int ins: int p1: int @@ -28,30 +29,61 @@ class CommandAPDU(object): # pragma: no cover return bytes([self.cls, self.ins, self.p1, self.p2]) elif self.ne <= 256: # Case 2s - return bytes([self.cls, self.ins, self.p1, self.p2, self.ne if self.ne != 256 else 0]) + return bytes( + [ + self.cls, + self.ins, + self.p1, + self.p2, + self.ne if self.ne != 256 else 0, + ] + ) else: # Case 2e - return bytes([self.cls, self.ins, self.p1, self.p2]) + (self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0])) + return bytes([self.cls, self.ins, self.p1, self.p2]) + ( + self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0]) + ) elif self.ne is None or self.ne == 0: if len(self.data) <= 255: # Case 3s - return bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + self.data + return ( + bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + + self.data + ) else: # Case 3e - return bytes([self.cls, self.ins, self.p1, self.p2, 0]) + len(self.data).to_bytes(2, "big") + self.data + return ( + bytes([self.cls, self.ins, self.p1, self.p2, 0]) + + len(self.data).to_bytes(2, "big") + + self.data + ) else: if len(self.data) <= 255 and self.ne <= 256: # Case 4s - return bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + self.data + bytes([self.ne if self.ne != 256 else 0]) + return ( + bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + + self.data + + bytes([self.ne if self.ne != 256 else 0]) + ) else: # Case 4e - return bytes([self.cls, self.ins, self.p1, self.p2, 0]) + len(self.data).to_bytes(2, "big") + self.data + (self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0])) + return ( + bytes([self.cls, self.ins, self.p1, self.p2, 0]) + + len(self.data).to_bytes(2, "big") + + self.data + + ( + self.ne.to_bytes(2, "big") + if self.ne != 65536 + else bytes([0, 0]) + ) + ) @public @dataclass class ResponseAPDU(object): """A response APDU that can be received from an ISO7816-4 target.""" + data: bytes sw: int @@ -90,6 +122,7 @@ class ISO7816Target(Target, ABC): @public class ISO7816: """A bunch of ISO7816-4 constants (status words).""" + SW_FILE_FULL = 0x6A84 SW_UNKNOWN = 0x6F00 SW_CLA_NOT_SUPPORTED = 0x6E00 diff --git a/pyecsca/sca/target/PCSC.py b/pyecsca/sca/target/PCSC.py index a377751..4b2620c 100644 --- a/pyecsca/sca/target/PCSC.py +++ b/pyecsca/sca/target/PCSC.py @@ -37,7 +37,7 @@ class PCSCTarget(ISO7816Target): # pragma: no cover return bytes(self.connection.getATR()) def select(self, aid: bytes) -> bool: - apdu = CommandAPDU(0x00, 0xa4, 0x04, 0x00, aid) + apdu = CommandAPDU(0x00, 0xA4, 0x04, 0x00, aid) resp = self.send_apdu(apdu) return resp.sw == ISO7816.SW_NO_ERROR diff --git a/pyecsca/sca/target/binary.py b/pyecsca/sca/target/binary.py index 3e2877b..d4c402e 100644 --- a/pyecsca/sca/target/binary.py +++ b/pyecsca/sca/target/binary.py @@ -13,11 +13,14 @@ from .serial import SerialTarget @public class BinaryTarget(SerialTarget): """A binary target that is runnable on the host and communicates using the stdin/stdout streams.""" + binary: List[str] process: Optional[Popen] = None debug_output: bool - def __init__(self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs): + def __init__( + self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs + ): super().__init__() if not isinstance(binary, (str, list)): raise TypeError @@ -27,8 +30,13 @@ class BinaryTarget(SerialTarget): self.debug_output = debug_output def connect(self): - self.process = Popen(self.binary, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - text=True, bufsize=1) + self.process = Popen( + self.binary, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + text=True, + bufsize=1, + ) def write(self, data: bytes) -> None: if self.process is None: diff --git a/pyecsca/sca/target/chipwhisperer.py b/pyecsca/sca/target/chipwhisperer.py index ac3fa42..4037f12 100644 --- a/pyecsca/sca/target/chipwhisperer.py +++ b/pyecsca/sca/target/chipwhisperer.py @@ -22,7 +22,9 @@ class ChipWhispererTarget(Flashable, SimpleSerialTarget): # pragma: no cover using ChipWhisperer-Lite/Pro. """ - def __init__(self, target: SimpleSerial, scope: ScopeTemplate, programmer, **kwargs): + def __init__( + self, target: SimpleSerial, scope: ScopeTemplate, programmer, **kwargs + ): super().__init__() self.target = target self.scope = scope diff --git a/pyecsca/sca/target/ectester.py b/pyecsca/sca/target/ectester.py index 6ab10be..f6635c3 100644 --- a/pyecsca/sca/target/ectester.py +++ b/pyecsca/sca/target/ectester.py @@ -49,6 +49,7 @@ class ShiftableFlag(IntFlag): # pragma: no cover @public class KeypairEnum(ShiftableFlag): # pragma: no cover """ECTester's KeyPair type.""" + KEYPAIR_LOCAL = 0x01 KEYPAIR_REMOTE = 0x02 KEYPAIR_BOTH = KEYPAIR_LOCAL | KEYPAIR_REMOTE @@ -57,12 +58,13 @@ class KeypairEnum(ShiftableFlag): # pragma: no cover @public class InstructionEnum(IntEnum): # pragma: no cover """ECTester's instruction (INS).""" - INS_ALLOCATE = 0x5a - INS_CLEAR = 0x5b - INS_SET = 0x5c - INS_TRANSFORM = 0x5d - INS_GENERATE = 0x5e - INS_EXPORT = 0x5f + + INS_ALLOCATE = 0x5A + INS_CLEAR = 0x5B + INS_SET = 0x5C + INS_TRANSFORM = 0x5D + INS_GENERATE = 0x5E + INS_EXPORT = 0x5F INS_ECDH = 0x70 INS_ECDH_DIRECT = 0x71 INS_ECDSA = 0x72 @@ -73,13 +75,14 @@ class InstructionEnum(IntEnum): # pragma: no cover INS_ALLOCATE_SIG = 0x77 INS_GET_INFO = 0x78 INS_SET_DRY_RUN_MODE = 0x79 - INS_BUFFER = 0x7a - INS_PERFORM = 0x7b + INS_BUFFER = 0x7A + INS_PERFORM = 0x7B @public class KeyBuildEnum(IntEnum): # pragma: no cover """ECTester's key builder type.""" + BUILD_KEYPAIR = 0x01 BUILD_KEYBUILDER = 0x02 @@ -87,7 +90,8 @@ class KeyBuildEnum(IntEnum): # pragma: no cover @public class ExportEnum(IntEnum): # pragma: no cover """ECTester's export boolean.""" - EXPORT_TRUE = 0xff + + EXPORT_TRUE = 0xFF EXPORT_FALSE = 0x00 @classmethod @@ -98,13 +102,15 @@ class ExportEnum(IntEnum): # pragma: no cover @public class RunModeEnum(IntEnum): # pragma: no cover """ECTester's run mode.""" - MODE_NORMAL = 0xaa - MODE_DRY_RUN = 0xbb + + MODE_NORMAL = 0xAA + MODE_DRY_RUN = 0xBB @public class KeyEnum(ShiftableFlag): # pragma: no cover """ECTester's key enum.""" + PUBLIC = 0x01 PRIVATE = 0x02 BOTH = PRIVATE | PUBLIC @@ -113,6 +119,7 @@ class KeyEnum(ShiftableFlag): # pragma: no cover @public class AppletBaseEnum(IntEnum): # pragma: no cover """ECTester's JavaCard applet base version.""" + BASE_221 = 0x0221 BASE_222 = 0x0222 @@ -120,6 +127,7 @@ class AppletBaseEnum(IntEnum): # pragma: no cover @public class KeyClassEnum(IntEnum): # pragma: no cover """JavaCard EC-based key class.""" + ALG_EC_F2M = 4 ALG_EC_FP = 5 @@ -127,6 +135,7 @@ class KeyClassEnum(IntEnum): # pragma: no cover @public class KeyAgreementEnum(IntEnum): # pragma: no cover """JavaCard `KeyAgreement` type values.""" + ALG_EC_SVDP_DH = 1 ALG_EC_SVDP_DH_KDF = 1 ALG_EC_SVDP_DHC = 2 @@ -140,6 +149,7 @@ class KeyAgreementEnum(IntEnum): # pragma: no cover @public class SignatureEnum(IntEnum): # pragma: no cover """JavaCard `Signature` type values.""" + ALG_ECDSA_SHA = 17 ALG_ECDSA_SHA_224 = 37 ALG_ECDSA_SHA_256 = 33 @@ -150,6 +160,7 @@ class SignatureEnum(IntEnum): # pragma: no cover @public class TransformationEnum(ShiftableFlag): # pragma: no cover """ECTester's point/value transformation types.""" + NONE = 0x00 FIXED = 0x01 FULLRANDOM = 0x02 @@ -167,6 +178,7 @@ class TransformationEnum(ShiftableFlag): # pragma: no cover @public class FormatEnum(IntEnum): # pragma: no cover """ECTester's point format types.""" + UNCOMPRESSED = 0 COMPRESSED = 1 HYBRID = 2 @@ -175,8 +187,9 @@ class FormatEnum(IntEnum): # pragma: no cover @public class CurveEnum(IntEnum): # pragma: no cover """ECTester's curve constants.""" + default = 0x00 - external = 0xff + external = 0xFF secp112r1 = 0x01 secp128r1 = 0x02 secp160r1 = 0x03 @@ -186,15 +199,16 @@ class CurveEnum(IntEnum): # pragma: no cover secp384r1 = 0x07 secp521r1 = 0x08 sect163r1 = 0x09 - sect233r1 = 0x0a - sect283r1 = 0x0b - sect409r1 = 0x0c - sect571r1 = 0x0d + sect233r1 = 0x0A + sect283r1 = 0x0B + sect409r1 = 0x0C + sect571r1 = 0x0D @public class ParameterEnum(ShiftableFlag): # pragma: no cover """ECTester's parameter ids.""" + NONE = 0x00 FP = 0x01 F2M = 0x02 @@ -214,11 +228,13 @@ class ParameterEnum(ShiftableFlag): # pragma: no cover @public class ChunkingException(Exception): # pragma: no cover """An exception that is raised if an error happened during the chunking process of a large APDU.""" + pass class Response(ABC): # pragma: no cover """An abstract base class of a response APDU.""" + resp: ResponseAPDU sws: List[int] params: List[bytes] @@ -233,7 +249,7 @@ class Response(ABC): # pragma: no cover offset = 0 for i in range(num_sw): if len(resp.data) >= offset + 2: - self.sws[i] = int.from_bytes(resp.data[offset:offset + 2], "big") + self.sws[i] = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 if self.sws[i] != ISO7816.SW_NO_ERROR: self.success = False @@ -250,13 +266,13 @@ class Response(ABC): # pragma: no cover self.success = False self.error = True break - param_len = int.from_bytes(resp.data[offset:offset + 2], "big") + param_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 if len(resp.data) < offset + param_len: self.success = False self.error = True break - self.params[i] = resp.data[offset:offset + param_len] + self.params[i] = resp.data[offset : offset + param_len] offset += param_len def __repr__(self): @@ -322,12 +338,18 @@ class GenerateResponse(Response): # pragma: no cover @public class ExportResponse(Response): # pragma: no cover """A response to the Export command, contains the exported parameters/values.""" + keypair: KeypairEnum key: KeyEnum parameters: ParameterEnum - def __init__(self, resp: ResponseAPDU, keypair: KeypairEnum, key: KeyEnum, - params: ParameterEnum): + def __init__( + self, + resp: ResponseAPDU, + keypair: KeypairEnum, + key: KeyEnum, + params: ParameterEnum, + ): self.keypair = keypair self.key = key self.parameters = params @@ -344,7 +366,9 @@ class ExportResponse(Response): # pragma: no cover other = 0 other += 1 if key & KeyEnum.PUBLIC and params & ParameterEnum.W else 0 other += 1 if key & KeyEnum.PRIVATE and params & ParameterEnum.S else 0 - super().__init__(resp, exported, exported * keys * param_count + exported * other) + super().__init__( + resp, exported, exported * keys * param_count + exported * other + ) def get_index(self, keypair: KeypairEnum, param: ParameterEnum) -> Optional[int]: pair = KeypairEnum.KEYPAIR_LOCAL @@ -378,8 +402,10 @@ class ExportResponse(Response): # pragma: no cover return None def __repr__(self): - return f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, success={self.success}, error={self.error}, " \ - f"keypair={self.keypair.name}, key={self.key.name}, params={self.parameters.name})" + return ( + f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, success={self.success}, error={self.error}, " + f"keypair={self.keypair.name}, key={self.key.name}, params={self.parameters.name})" + ) @public @@ -434,6 +460,7 @@ class RunModeResponse(Response): # pragma: no cover class InfoResponse(Response): # pragma: no cover """A response to the Info command, contains all information about the applet version/environment.""" + version: str base: AppletBaseEnum system_version: float @@ -447,33 +474,39 @@ class InfoResponse(Response): # pragma: no cover super().__init__(resp, 1, 0) offset = 2 - version_len = int.from_bytes(resp.data[offset:offset + 2], "big") + version_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 - self.version = resp.data[offset:offset + version_len].decode() + self.version = resp.data[offset : offset + version_len].decode() offset += version_len - self.base = AppletBaseEnum(int.from_bytes(resp.data[offset:offset + 2], "big")) + self.base = AppletBaseEnum( + int.from_bytes(resp.data[offset : offset + 2], "big") + ) offset += 2 - system_version = int.from_bytes(resp.data[offset:offset + 2], "big") + system_version = int.from_bytes(resp.data[offset : offset + 2], "big") system_major = system_version >> 8 - system_minor = system_version & 0xff + system_minor = system_version & 0xFF minor_size = 1 if system_minor == 0 else ceil(log(system_minor, 10)) self.system_version = system_major + system_minor / (minor_size * 10) offset += 2 - self.object_deletion_supported = int.from_bytes(resp.data[offset:offset + 2], "big") == 1 + self.object_deletion_supported = ( + int.from_bytes(resp.data[offset : offset + 2], "big") == 1 + ) offset += 2 - self.buf_len = int.from_bytes(resp.data[offset:offset + 2], "big") + self.buf_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 - self.ram1_len = int.from_bytes(resp.data[offset:offset + 2], "big") + self.ram1_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 - self.ram2_len = int.from_bytes(resp.data[offset:offset + 2], "big") + self.ram2_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 - self.apdu_len = int.from_bytes(resp.data[offset:offset + 2], "big") + self.apdu_len = int.from_bytes(resp.data[offset : offset + 2], "big") offset += 2 def __repr__(self): - return f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, " \ - f"success={self.success}, error={self.error}, version={self.version}, base={self.base.name}, system_version={self.system_version}, " \ - f"object_deletion_supported={self.object_deletion_supported}, buf_len={self.buf_len}, ram1_len={self.ram1_len}, ram2_len={self.ram2_len}, apdu_len={self.apdu_len})" + return ( + f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, " + f"success={self.success}, error={self.error}, version={self.version}, base={self.base.name}, system_version={self.system_version}, " + f"object_deletion_supported={self.object_deletion_supported}, buf_len={self.buf_len}, ram1_len={self.ram1_len}, ram2_len={self.ram2_len}, apdu_len={self.apdu_len})" + ) @public @@ -482,7 +515,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover A smartcard target which communicates with the `ECTester <https://github.com/crocs-muni/ECTester>`_ applet on smartcards of the JavaCard platform using PCSC. """ - CLA_ECTESTER = 0xb0 + + CLA_ECTESTER = 0xB0 AID_PREFIX = bytes([0x45, 0x43, 0x54, 0x65, 0x73, 0x74, 0x65, 0x72]) AID_CURRENT_VERSION = bytes([0x30, 0x33, 0x33]) # Version v0.3.3 AID_SUFFIX_221 = bytes([0x62]) @@ -507,15 +541,19 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover chunk_length = 255 if chunk_start + chunk_length > len(data): chunk_length = len(data) - chunk_start - chunk = data[chunk_start: chunk_start + chunk_length] - chunk_apdu = CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_BUFFER, 0, 0, chunk) + chunk = data[chunk_start : chunk_start + chunk_length] + chunk_apdu = CommandAPDU( + self.CLA_ECTESTER, InstructionEnum.INS_BUFFER, 0, 0, chunk + ) resp = super().send_apdu(chunk_apdu) if resp.sw != 0x9000: raise ChunkingException() apdu = CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_PERFORM, 0, 0) resp = super().send_apdu(apdu) - if resp.sw & 0xff00 == ISO7816.SW_BYTES_REMAINING_00: - resp = super().send_apdu(CommandAPDU(0x00, 0xc0, 0x00, 0x00, None, resp.sw & 0xff)) + if resp.sw & 0xFF00 == ISO7816.SW_BYTES_REMAINING_00: + resp = super().send_apdu( + CommandAPDU(0x00, 0xC0, 0x00, 0x00, None, resp.sw & 0xFF) + ) return resp def select_applet(self, latest_version: bytes = AID_CURRENT_VERSION): @@ -548,7 +586,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover return True @staticmethod - def encode_parameters(params: ParameterEnum, obj: Union[DomainParameters, Point, int]) -> Mapping[ParameterEnum, bytes]: + def encode_parameters( + params: ParameterEnum, obj: Union[DomainParameters, Point, int] + ) -> Mapping[ParameterEnum, bytes]: """Encode values from `obj` into the byte parameters that the **ECTester** applet expects.""" def convert_int(obj: int) -> bytes: @@ -559,7 +599,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover return bytes(obj) result = {} - if isinstance(obj, DomainParameters) and isinstance(obj.curve.model, ShortWeierstrassModel): + if isinstance(obj, DomainParameters) and isinstance( + obj.curve.model, ShortWeierstrassModel + ): for param in params & ParameterEnum.DOMAIN_FP: if param == ParameterEnum.G: result[param] = convert_point(obj.generator.to_affine()) @@ -577,7 +619,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover for param in params & (ParameterEnum.G | ParameterEnum.W): result[param] = convert_point(obj) elif isinstance(obj, int): - for param in params & ((ParameterEnum.DOMAIN_FP ^ ParameterEnum.G) | ParameterEnum.S): + for param in params & ( + (ParameterEnum.DOMAIN_FP ^ ParameterEnum.G) | ParameterEnum.S + ): result[param] = convert_int(obj) else: raise TypeError @@ -591,8 +635,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE_KA, 0, 0, - bytes([ka_type]))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ALLOCATE_KA, + 0, + 0, + bytes([ka_type]), + ) + ) return AllocateKaResponse(resp) def allocate_sig(self, sig_type: SignatureEnum) -> AllocateSigResponse: @@ -602,12 +652,24 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :param sig_type: Which Signature type to allocate. :return: The response. """ - resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE_SIG, 0, 0, - bytes([sig_type]))) + resp = self.send_apdu( + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ALLOCATE_SIG, + 0, + 0, + bytes([sig_type]), + ) + ) return AllocateSigResponse(resp) - def allocate(self, keypair: KeypairEnum, builder: KeyBuildEnum, key_length: int, - key_class: KeyClassEnum) -> AllocateResponse: + def allocate( + self, + keypair: KeypairEnum, + builder: KeyBuildEnum, + key_length: int, + key_class: KeyClassEnum, + ) -> AllocateResponse: """ Send the Allocate KeyPair command. @@ -618,8 +680,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE, keypair, builder, - key_length.to_bytes(2, "big") + bytes([key_class]))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ALLOCATE, + keypair, + builder, + key_length.to_bytes(2, "big") + bytes([key_class]), + ) + ) return AllocateResponse(resp, keypair) def clear(self, keypair: KeypairEnum) -> ClearResponse: @@ -630,11 +698,17 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEAR, keypair, 0, None)) + CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEAR, keypair, 0, None) + ) return ClearResponse(resp, keypair) - def set(self, keypair: KeypairEnum, curve: CurveEnum, params: ParameterEnum, - values: Optional[Mapping[ParameterEnum, bytes]] = None) -> SetResponse: + def set( + self, + keypair: KeypairEnum, + curve: CurveEnum, + params: ParameterEnum, + values: Optional[Mapping[ParameterEnum, bytes]] = None, + ) -> SetResponse: """ Send the Set command. @@ -656,18 +730,31 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover break e <<= 1 resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve, - payload)) + CommandAPDU( + self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve, payload + ) + ) elif values is not None: raise ValueError("Values should be specified only if curve is external.") else: resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve, - params.to_bytes(2, "big"))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_SET, + keypair, + curve, + params.to_bytes(2, "big"), + ) + ) return SetResponse(resp, keypair) - def transform(self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum, - transformation: TransformationEnum) -> TransformResponse: + def transform( + self, + keypair: KeypairEnum, + key: KeyEnum, + params: ParameterEnum, + transformation: TransformationEnum, + ) -> TransformResponse: """ Send the Transform command. @@ -678,8 +765,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_TRANSFORM, keypair, key, - params.to_bytes(2, "big") + transformation.to_bytes(2, "big"))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_TRANSFORM, + keypair, + key, + params.to_bytes(2, "big") + transformation.to_bytes(2, "big"), + ) + ) return TransformResponse(resp, keypair) def generate(self, keypair: KeypairEnum) -> GenerateResponse: @@ -690,10 +783,15 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GENERATE, keypair, 0, None)) + CommandAPDU( + self.CLA_ECTESTER, InstructionEnum.INS_GENERATE, keypair, 0, None + ) + ) return GenerateResponse(resp, keypair) - def export(self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum) -> ExportResponse: + def export( + self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum + ) -> ExportResponse: """ Send the Export command. @@ -703,12 +801,24 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response, containing the exported parameters. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_EXPORT, keypair, key, - params.to_bytes(2, "big"))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_EXPORT, + keypair, + key, + params.to_bytes(2, "big"), + ) + ) return ExportResponse(resp, keypair, key, params) - def ecdh(self, pubkey: KeypairEnum, privkey: KeypairEnum, export: bool, - transformation: TransformationEnum, ka_type: KeyAgreementEnum) -> ECDHResponse: + def ecdh( + self, + pubkey: KeypairEnum, + privkey: KeypairEnum, + export: bool, + transformation: TransformationEnum, + ka_type: KeyAgreementEnum, + ) -> ECDHResponse: """ Send the ECDH command. @@ -720,13 +830,26 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDH, pubkey, privkey, - bytes([ExportEnum.from_bool(export)]) + transformation.to_bytes( - 2, "big") + bytes([ka_type]))) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ECDH, + pubkey, + privkey, + bytes([ExportEnum.from_bool(export)]) + + transformation.to_bytes(2, "big") + + bytes([ka_type]), + ) + ) return ECDHResponse(resp, export) - def ecdh_direct(self, privkey: KeypairEnum, export: bool, transformation: TransformationEnum, - ka_type: KeyAgreementEnum, pubkey: bytes) -> ECDHResponse: + def ecdh_direct( + self, + privkey: KeypairEnum, + export: bool, + transformation: TransformationEnum, + ka_type: KeyAgreementEnum, + pubkey: bytes, + ) -> ECDHResponse: """ Send the ECDH direct command. @@ -738,14 +861,22 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDH_DIRECT, privkey, - ExportEnum.from_bool(export), - transformation.to_bytes(2, "big") + bytes([ka_type]) + len( - pubkey).to_bytes(2, "big") + pubkey)) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ECDH_DIRECT, + privkey, + ExportEnum.from_bool(export), + transformation.to_bytes(2, "big") + + bytes([ka_type]) + + len(pubkey).to_bytes(2, "big") + + pubkey, + ) + ) return ECDHResponse(resp, export) - def ecdsa(self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, - data: bytes) -> ECDSAResponse: + def ecdsa( + self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, data: bytes + ) -> ECDSAResponse: """ Send the ECDSA command. @@ -755,13 +886,20 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :param data: The data to sign and verify. :return: The response. """ - resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA, keypair, - ExportEnum.from_bool(export), - bytes([sig_type]) + len(data).to_bytes(2, "big") + data)) + resp = self.send_apdu( + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ECDSA, + keypair, + ExportEnum.from_bool(export), + bytes([sig_type]) + len(data).to_bytes(2, "big") + data, + ) + ) return ECDSAResponse(resp, export) - def ecdsa_sign(self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, - data: bytes) -> ECDSAResponse: + def ecdsa_sign( + self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, data: bytes + ) -> ECDSAResponse: """ Send the ECDSA sign command. @@ -772,13 +910,19 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA_SIGN, keypair, - ExportEnum.from_bool(export), - bytes([sig_type]) + len(data).to_bytes(2, "big") + data)) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ECDSA_SIGN, + keypair, + ExportEnum.from_bool(export), + bytes([sig_type]) + len(data).to_bytes(2, "big") + data, + ) + ) return ECDSAResponse(resp, export) - def ecdsa_verify(self, keypair: KeypairEnum, sig_type: SignatureEnum, sig: bytes, - data: bytes) -> ECDSAResponse: + def ecdsa_verify( + self, keypair: KeypairEnum, sig_type: SignatureEnum, sig: bytes, data: bytes + ) -> ECDSAResponse: """ Send the ECDSA verify command. @@ -788,10 +932,15 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :param data: The data. :return: The response. """ - resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA_VERIFY, - keypair, sig_type, - len(data).to_bytes(2, "big") + data + len(sig).to_bytes(2, - "big") + sig)) + resp = self.send_apdu( + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_ECDSA_VERIFY, + keypair, + sig_type, + len(data).to_bytes(2, "big") + data + len(sig).to_bytes(2, "big") + sig, + ) + ) return ECDSAResponse(resp, False) def cleanup(self) -> CleanupResponse: @@ -801,7 +950,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEANUP, 0, 0, None)) + CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEANUP, 0, 0, None) + ) return CleanupResponse(resp) def info(self) -> InfoResponse: @@ -811,7 +961,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GET_INFO, 0, 0, None)) + CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GET_INFO, 0, 0, None) + ) return InfoResponse(resp) def run_mode(self, run_mode: RunModeEnum) -> RunModeResponse: @@ -821,6 +972,12 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover :return: The response. """ resp = self.send_apdu( - CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET_DRY_RUN_MODE, run_mode, 0, - None)) + CommandAPDU( + self.CLA_ECTESTER, + InstructionEnum.INS_SET_DRY_RUN_MODE, + run_mode, + 0, + None, + ) + ) return RunModeResponse(resp) diff --git a/pyecsca/sca/target/simpleserial.py b/pyecsca/sca/target/simpleserial.py index 03e5768..03cb9cf 100644 --- a/pyecsca/sca/target/simpleserial.py +++ b/pyecsca/sca/target/simpleserial.py @@ -13,6 +13,7 @@ from .serial import SerialTarget @public class SimpleSerialMessage(object): """A SimpleSerial message consisting of a starting character and a hexadecimal string.""" + char: str data: str @@ -60,7 +61,9 @@ class SimpleSerialTarget(SerialTarget): result[msg.char] = msg return result - def send_cmd(self, cmd: SimpleSerialMessage, timeout: int) -> Mapping[str, SimpleSerialMessage]: + def send_cmd( + self, cmd: SimpleSerialMessage, timeout: int + ) -> Mapping[str, SimpleSerialMessage]: """ Send a :py:class:`SimpleSerialMessage` and receive the response messages that the command produces, within a `timeout`. @@ -71,7 +74,7 @@ class SimpleSerialTarget(SerialTarget): """ data = bytes(cmd) for i in range(0, len(data), 64): - chunk = data[i:i + 64] + chunk = data[i : i + 64] sleep(0.010) self.write(chunk) self.write(b"\n") diff --git a/pyecsca/sca/trace/align.py b/pyecsca/sca/trace/align.py index 56be5b3..d97e358 100644 --- a/pyecsca/sca/trace/align.py +++ b/pyecsca/sca/trace/align.py @@ -11,8 +11,9 @@ from .process import normalize from .trace import Trace -def _align_reference(reference: Trace, *traces: Trace, - align_func: Callable[[Trace], Tuple[bool, int]]) -> Tuple[List[Trace], List[int]]: +def _align_reference( + reference: Trace, *traces: Trace, align_func: Callable[[Trace], Tuple[bool, int]] +) -> Tuple[List[Trace], List[int]]: result = [deepcopy(reference)] offsets = [0] for trace in traces: @@ -25,18 +26,23 @@ def _align_reference(reference: Trace, *traces: Trace, else: result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype) if offset > 0: - result_samples[:length - offset] = trace.samples[offset:] + result_samples[: length - offset] = trace.samples[offset:] else: - result_samples[-offset:] = trace.samples[:length + offset] + result_samples[-offset:] = trace.samples[: length + offset] result.append(trace.with_samples(result_samples)) offsets.append(offset) return result, offsets @public -def align_correlation(reference: Trace, *traces: Trace, - reference_offset: int, reference_length: int, - max_offset: int, min_correlation: float = 0.5) -> Tuple[List[Trace], List[int]]: +def align_correlation( + reference: Trace, + *traces: Trace, + reference_offset: int, + reference_length: int, + max_offset: int, + min_correlation: float = 0.5 +) -> Tuple[List[Trace], List[int]]: """ Align `traces` to the reference `trace`. Using the cross-correlation of a part of the reference trace starting at `reference_offset` with `reference_length` and try to match it to a part of @@ -54,14 +60,19 @@ def align_correlation(reference: Trace, *traces: Trace, """ reference_centered = normalize(reference) reference_part = reference_centered.samples[ - reference_offset:reference_offset + reference_length] + reference_offset : reference_offset + reference_length + ] def align_func(trace): length = len(trace.samples) correlation_start = max(reference_offset - max_offset, 0) - correlation_end = min(reference_offset + reference_length + max_offset, length - 1) + correlation_end = min( + reference_offset + reference_length + max_offset, length - 1 + ) trace_part = trace.samples[correlation_start:correlation_end] - trace_part = (trace_part - np.mean(trace_part)) / (np.std(trace_part) * len(trace_part)) + trace_part = (trace_part - np.mean(trace_part)) / ( + np.std(trace_part) * len(trace_part) + ) correlation = np.correlate(trace_part, reference_part, "same") max_correlation_offset = correlation.argmax(axis=0) max_correlation = correlation[max_correlation_offset] @@ -76,8 +87,13 @@ def align_correlation(reference: Trace, *traces: Trace, @public -def align_peaks(reference: Trace, *traces: Trace, - reference_offset: int, reference_length: int, max_offset: int) -> Tuple[List[Trace], List[int]]: +def align_peaks( + reference: Trace, + *traces: Trace, + reference_offset: int, + reference_length: int, + max_offset: int +) -> Tuple[List[Trace], List[int]]: """ Align `traces` to the reference `trace` so that the maximum value within the reference trace window from `reference_offset` of `reference_length` aligns with the maximum @@ -90,14 +106,16 @@ def align_peaks(reference: Trace, *traces: Trace, :param max_offset: :return: """ - reference_part = reference.samples[reference_offset: reference_offset + reference_length] + reference_part = reference.samples[ + reference_offset : reference_offset + reference_length + ] reference_peak = np.argmax(reference_part) def align_func(trace): length = len(trace.samples) window_start = max(reference_offset - max_offset, 0) window_end = min(reference_offset + reference_length + max_offset, length - 1) - window = trace.samples[window_start: window_end] + window = trace.samples[window_start:window_end] window_peak = np.argmax(window) left_space = min(max_offset, reference_offset) return True, int(window_peak - reference_peak - left_space) @@ -106,9 +124,15 @@ def align_peaks(reference: Trace, *traces: Trace, @public -def align_offset(reference: Trace, *traces: Trace, - reference_offset: int, reference_length: int, max_offset: int, - dist_func: Callable[[np.ndarray, np.ndarray], float], max_dist: float = float("inf")) -> Tuple[List[Trace], List[int]]: +def align_offset( + reference: Trace, + *traces: Trace, + reference_offset: int, + reference_length: int, + max_offset: int, + dist_func: Callable[[np.ndarray, np.ndarray], float], + max_dist: float = float("inf") +) -> Tuple[List[Trace], List[int]]: """ Align `traces` to the reference `trace` so that the value of the `dist_func` is minimized between the reference trace window from `reference_offset` of `reference_length` and the trace @@ -122,7 +146,9 @@ def align_offset(reference: Trace, *traces: Trace, :param dist_func: :return: """ - reference_part = reference.samples[reference_offset: reference_offset + reference_length] + reference_part = reference.samples[ + reference_offset : reference_offset + reference_length + ] def align_func(trace): length = len(trace.samples) @@ -142,12 +168,18 @@ def align_offset(reference: Trace, *traces: Trace, return True, best_offset else: return False, 0 + return _align_reference(reference, *traces, align_func=align_func) @public -def align_sad(reference: Trace, *traces: Trace, - reference_offset: int, reference_length: int, max_offset: int) -> Tuple[List[Trace], List[int]]: +def align_sad( + reference: Trace, + *traces: Trace, + reference_offset: int, + reference_length: int, + max_offset: int +) -> Tuple[List[Trace], List[int]]: """ Align `traces` to the reference `trace` so that the Sum Of Absolute Differences between the reference trace window from `reference_offset` of `reference_length` and the trace being aligned @@ -160,17 +192,24 @@ def align_sad(reference: Trace, *traces: Trace, :param max_offset: :return: """ + def sad(reference_part, trace_part): return float(np.sum(np.abs(reference_part - trace_part))) - return align_offset(reference, *traces, - reference_offset=reference_offset, reference_length=reference_length, - max_offset=max_offset, dist_func=sad) + return align_offset( + reference, + *traces, + reference_offset=reference_offset, + reference_length=reference_length, + max_offset=max_offset, + dist_func=sad + ) @public -def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1, - fast: bool = True) -> List[Trace]: +def align_dtw_scale( + reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True +) -> List[Trace]: """ Align `traces` to the `reference` trace. Using fastdtw (Dynamic Time Warping) with scaling as per: @@ -206,7 +245,9 @@ def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1, @public -def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True) -> List[Trace]: +def align_dtw( + reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True +) -> List[Trace]: """ Align `traces` to the `reference` trace. Using fastdtw (Dynamic Time Warping) as per: @@ -228,9 +269,13 @@ def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = Tr dist, path = fastdtw(reference_samples, trace.samples, radius=radius) else: dist, path = dtw(reference_samples, trace.samples) - result_samples = np.zeros(max((len(trace.samples), len(reference_samples))), dtype=trace.samples.dtype) - pairs = np.array(np.array(path, dtype=np.dtype("int,int")), - dtype=np.dtype([("x", "int"), ("y", "int")])) + result_samples = np.zeros( + max((len(trace.samples), len(reference_samples))), dtype=trace.samples.dtype + ) + pairs = np.array( + np.array(path, dtype=np.dtype("int,int")), + dtype=np.dtype([("x", "int"), ("y", "int")]), + ) result_samples[pairs["x"]] = trace.samples[pairs["y"]] del pairs del path diff --git a/pyecsca/sca/trace/combine.py b/pyecsca/sca/trace/combine.py index 8b80869..853b127 100644 --- a/pyecsca/sca/trace/combine.py +++ b/pyecsca/sca/trace/combine.py @@ -25,13 +25,15 @@ def average(*traces: Trace) -> Optional[CombinedTrace]: s = np.zeros(min_samples, dtype=np.float64) for t in traces: s = np.add(s, t.samples[:min_samples]) - avg = ((1 / len(traces)) * s) + avg = (1 / len(traces)) * s del s return CombinedTrace(avg) @public -def conditional_average(*traces: Trace, condition: Callable[[Trace], bool]) -> Optional[CombinedTrace]: +def conditional_average( + *traces: Trace, condition: Callable[[Trace], bool] +) -> Optional[CombinedTrace]: """ Average `traces` for which the `condition` is True, sample-wise. @@ -107,8 +109,10 @@ def average_and_variance(*traces) -> Optional[Tuple[CombinedTrace, CombinedTrace if not traces: return None if len(traces) == 1: - return (CombinedTrace(traces[0].samples.copy()), - CombinedTrace(np.zeros(len(traces[0]), dtype=np.float64))) + return ( + CombinedTrace(traces[0].samples.copy()), + CombinedTrace(np.zeros(len(traces[0]), dtype=np.float64)), + ) min_samples = min(map(len, traces)) s = np.zeros(min_samples, dtype=np.float64) for t in traces: diff --git a/pyecsca/sca/trace/edit.py b/pyecsca/sca/trace/edit.py index c0af078..5aec15f 100644 --- a/pyecsca/sca/trace/edit.py +++ b/pyecsca/sca/trace/edit.py @@ -39,8 +39,11 @@ def reverse(trace: Trace) -> Trace: @public -def pad(trace: Trace, lengths: Union[Tuple[int, int], int], - values: Union[Tuple[Any, Any], Any] = (0, 0)) -> Trace: +def pad( + trace: Trace, + lengths: Union[Tuple[int, int], int], + values: Union[Tuple[Any, Any], Any] = (0, 0), +) -> Trace: """ Pad the samples of the `trace` by `values` at the beginning and end. @@ -53,4 +56,6 @@ def pad(trace: Trace, lengths: Union[Tuple[int, int], int], lengths = (lengths, lengths) if not isinstance(values, tuple): values = (values, values) - return trace.with_samples(np.pad(trace.samples, lengths, "constant", constant_values=values)) + return trace.with_samples( + np.pad(trace.samples, lengths, "constant", constant_values=values) + ) diff --git a/pyecsca/sca/trace/filter.py b/pyecsca/sca/trace/filter.py index 720d311..2fbc284 100644 --- a/pyecsca/sca/trace/filter.py +++ b/pyecsca/sca/trace/filter.py @@ -8,13 +8,23 @@ from typing import Union, Tuple from .trace import Trace -def _filter_any(trace: Trace, sampling_frequency: int, - cutoff: Union[int, Tuple[int, int]], band_type: str) -> Trace: +def _filter_any( + trace: Trace, + sampling_frequency: int, + cutoff: Union[int, Tuple[int, int]], + band_type: str, +) -> Trace: nyq = 0.5 * sampling_frequency if not isinstance(cutoff, int): - b, a = butter(6, tuple(map(lambda x: x / nyq, cutoff)), btype=band_type, analog=False, output='ba') + b, a = butter( + 6, + tuple(map(lambda x: x / nyq, cutoff)), + btype=band_type, + analog=False, + output="ba", + ) else: - b, a = butter(6, cutoff / nyq, btype=band_type, analog=False, output='ba') + b, a = butter(6, cutoff / nyq, btype=band_type, analog=False, output="ba") return trace.with_samples(lfilter(b, a, trace.samples)) @@ -47,7 +57,9 @@ def filter_highpass(trace: Trace, sampling_frequency: int, cutoff: int) -> Trace @public -def filter_bandpass(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace: +def filter_bandpass( + trace: Trace, sampling_frequency: int, low: int, high: int +) -> Trace: """ Apply a bandpass digital filter (Butterworth) to `trace`, given `sampling_frequency`, with the passband from `low` to `high`. @@ -62,7 +74,9 @@ def filter_bandpass(trace: Trace, sampling_frequency: int, low: int, high: int) @public -def filter_bandstop(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace: +def filter_bandstop( + trace: Trace, sampling_frequency: int, low: int, high: int +) -> Trace: """ Apply a bandstop digital filter (Butterworth) to `trace`, given `sampling_frequency`, with the stopband from `low` to `high`. diff --git a/pyecsca/sca/trace/match.py b/pyecsca/sca/trace/match.py index 3728d13..50c86bc 100644 --- a/pyecsca/sca/trace/match.py +++ b/pyecsca/sca/trace/match.py @@ -43,7 +43,9 @@ def match_pattern(trace: Trace, pattern: Trace, threshold: float = 0.8) -> List[ @public -def match_part(trace: Trace, offset: int, length: int, threshold: float = 0.8) -> List[int]: +def match_part( + trace: Trace, offset: int, length: int, threshold: float = 0.8 +) -> List[int]: """ Match a part of a `trace` starting at `offset` of `length` to the `trace`. Returns indices where the pattern matches , e.g. those where correlation of the two traces has peaks larger than `threshold`. Uses the diff --git a/pyecsca/sca/trace/plot.py b/pyecsca/sca/trace/plot.py index b9843cc..6a8cec1 100644 --- a/pyecsca/sca/trace/plot.py +++ b/pyecsca/sca/trace/plot.py @@ -40,10 +40,12 @@ def plot_traces(*traces: Trace, **kwargs): # pragma: no cover ["orange", "darkorange"], ["plum", "deeppink"], ["peru", "chocolate"], - ["cyan", "darkcyan"] + ["cyan", "darkcyan"], ] dss = [] for i, trace in enumerate(traces): - line = hv.Curve((range(len(trace)), trace.samples), kdims="x", vdims="y", **kwargs) + line = hv.Curve( + (range(len(trace)), trace.samples), kdims="x", vdims="y", **kwargs + ) dss.append(datashade(line, normalization="log", cmap=_cmaps[i % len(_cmaps)])) return reduce(lambda x, y: x * y, dss) diff --git a/pyecsca/sca/trace/process.py b/pyecsca/sca/trace/process.py index 87b00ee..2499df4 100644 --- a/pyecsca/sca/trace/process.py +++ b/pyecsca/sca/trace/process.py @@ -61,8 +61,14 @@ def rolling_mean(trace: Trace, window: int) -> Trace: :param window: :return: """ - return trace.with_samples(cast(np.ndarray, np.mean(_rolling_window(trace.samples, window), -1).astype( - dtype=trace.samples.dtype, copy=False))) + return trace.with_samples( + cast( + np.ndarray, + np.mean(_rolling_window(trace.samples, window), -1).astype( + dtype=trace.samples.dtype, copy=False + ), + ) + ) @public @@ -101,7 +107,9 @@ def normalize(trace: Trace) -> Trace: :param trace: :return: """ - return trace.with_samples((trace.samples - np.mean(trace.samples)) / np.std(trace.samples)) + return trace.with_samples( + (trace.samples - np.mean(trace.samples)) / np.std(trace.samples) + ) @public @@ -112,5 +120,7 @@ def normalize_wl(trace: Trace) -> Trace: :param trace: :return: """ - return trace.with_samples((trace.samples - np.mean(trace.samples)) / ( - np.std(trace.samples) * len(trace.samples))) + return trace.with_samples( + (trace.samples - np.mean(trace.samples)) + / (np.std(trace.samples) * len(trace.samples)) + ) diff --git a/pyecsca/sca/trace/sampling.py b/pyecsca/sca/trace/sampling.py index b263adc..71dbdac 100644 --- a/pyecsca/sca/trace/sampling.py +++ b/pyecsca/sca/trace/sampling.py @@ -20,8 +20,15 @@ def downsample_average(trace: Trace, factor: int = 2) -> Trace: :param factor: :return: """ - resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor)) - result_samples = cast(np.ndarray, resized.reshape(-1, factor).mean(axis=1).astype(trace.samples.dtype, copy=False)) + resized = np.resize( + trace.samples, len(trace.samples) - (len(trace.samples) % factor) + ) + result_samples = cast( + np.ndarray, + resized.reshape(-1, factor) + .mean(axis=1) + .astype(trace.samples.dtype, copy=False), + ) return trace.with_samples(result_samples) @@ -49,8 +56,13 @@ def downsample_max(trace: Trace, factor: int = 2) -> Trace: :param factor: :return: """ - resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor)) - result_samples = cast(np.ndarray, resized.reshape(-1, factor).max(axis=1).astype(trace.samples.dtype, copy=False)) + resized = np.resize( + trace.samples, len(trace.samples) - (len(trace.samples) % factor) + ) + result_samples = cast( + np.ndarray, + resized.reshape(-1, factor).max(axis=1).astype(trace.samples.dtype, copy=False), + ) return trace.with_samples(result_samples) @@ -64,8 +76,13 @@ def downsample_min(trace: Trace, factor: int = 2) -> Trace: :param factor: :return: """ - resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor)) - result_samples = cast(np.ndarray, resized.reshape(-1, factor).min(axis=1).astype(trace.samples.dtype, copy=False)) + resized = np.resize( + trace.samples, len(trace.samples) - (len(trace.samples) % factor) + ) + result_samples = cast( + np.ndarray, + resized.reshape(-1, factor).min(axis=1).astype(trace.samples.dtype, copy=False), + ) return trace.with_samples(result_samples) diff --git a/pyecsca/sca/trace/test.py b/pyecsca/sca/trace/test.py index 8512a70..f90498a 100644 --- a/pyecsca/sca/trace/test.py +++ b/pyecsca/sca/trace/test.py @@ -12,8 +12,9 @@ from .combine import average_and_variance from .edit import trim -def _ttest_func(first_set: Sequence[Trace], second_set: Sequence[Trace], - equal_var: bool) -> Optional[CombinedTrace]: +def _ttest_func( + first_set: Sequence[Trace], second_set: Sequence[Trace], equal_var: bool +) -> Optional[CombinedTrace]: if not first_set or not second_set or len(first_set) == 0 or len(second_set) == 0: return None first_stack = np.stack([first.samples for first in first_set]) @@ -23,7 +24,12 @@ def _ttest_func(first_set: Sequence[Trace], second_set: Sequence[Trace], @public -def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bool = False, p_value: bool = False) -> Optional[Tuple[CombinedTrace, ...]]: +def welch_ttest( + first_set: Sequence[Trace], + second_set: Sequence[Trace], + dof: bool = False, + p_value: bool = False, +) -> Optional[Tuple[CombinedTrace, ...]]: """ Perform the Welch's t-test sample wise on two sets of traces `first_set` and `second_set`. Useful for Test Vector Leakage Analysis (TVLA). @@ -51,8 +57,8 @@ def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bo tval = (mean_0.samples - mean_1.samples) / np.sqrt(varn_0 + varn_1) result = [CombinedTrace(tval)] if dof or p_value: - top = (varn_0 + varn_1)**2 - bot = (varn_0**2 / (n0 - 1)) + (varn_1**2 / (n1 - 1)) + top = (varn_0 + varn_1) ** 2 + bot = (varn_0 ** 2 / (n0 - 1)) + (varn_1 ** 2 / (n1 - 1)) df = top / bot del top del bot @@ -66,8 +72,9 @@ def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bo @public -def student_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional[ - CombinedTrace]: +def student_ttest( + first_set: Sequence[Trace], second_set: Sequence[Trace] +) -> Optional[CombinedTrace]: """ Perform the Students's t-test sample wise on two sets of traces `first_set` and `second_set`. Useful for Test Vector Leakage Analysis (TVLA). @@ -80,7 +87,9 @@ def student_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Op @public -def ks_test(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional[CombinedTrace]: +def ks_test( + first_set: Sequence[Trace], second_set: Sequence[Trace] +) -> Optional[CombinedTrace]: """ Perform the Kolmogorov-Smirnov two sample test on equality of distributions sample wise on two sets of traces `first_set` and `second_set`. diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py index aef0837..96f1ec8 100644 --- a/pyecsca/sca/trace/trace.py +++ b/pyecsca/sca/trace/trace.py @@ -13,10 +13,13 @@ from public import public @public class Trace(object): """A trace, which has some samples and metadata.""" + meta: Mapping[str, Any] samples: ndarray - def __init__(self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None): + def __init__( + self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None + ): """ Construct a new trace. @@ -92,8 +95,11 @@ class Trace(object): return Trace(copy(self.samples), copy(self.meta), copy(self.trace_set)) def __deepcopy__(self, memodict={}): - return Trace(deepcopy(self.samples, memo=memodict), deepcopy(self.meta, memo=memodict), - deepcopy(self.trace_set, memo=memodict)) + return Trace( + deepcopy(self.samples, memo=memodict), + deepcopy(self.meta, memo=memodict), + deepcopy(self.trace_set, memo=memodict), + ) def __repr__(self): return f"Trace(samples={self.samples!r}, trace_set={self.trace_set!r})" @@ -103,8 +109,13 @@ class Trace(object): class CombinedTrace(Trace): """A trace that was combined from other traces, `parents`.""" - def __init__(self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None, - parents: Sequence[Trace] = None): + def __init__( + self, + samples: ndarray, + meta: Mapping[str, Any] = None, + trace_set: Any = None, + parents: Sequence[Trace] = None, + ): super().__init__(samples, meta, trace_set=trace_set) self.parents = None if parents is not None: diff --git a/pyecsca/sca/trace_set/base.py b/pyecsca/sca/trace_set/base.py index 6f4e4b0..097d666 100644 --- a/pyecsca/sca/trace_set/base.py +++ b/pyecsca/sca/trace_set/base.py @@ -12,6 +12,7 @@ from ..trace import Trace @public class TraceSet(object): """A set of traces with some metadata.""" + _traces: List[Trace] _keys: List @@ -46,6 +47,7 @@ class TraceSet(object): raise NotImplementedError def __repr__(self): - args = ", ".join(["{}={!r}".format(key, getattr(self, key)) for key in - self._keys]) + args = ", ".join( + ["{}={!r}".format(key, getattr(self, key)) for key in self._keys] + ) return "TraceSet({})".format(args) diff --git a/pyecsca/sca/trace_set/chipwhisperer.py b/pyecsca/sca/trace_set/chipwhisperer.py index ba78a85..a56e676 100644 --- a/pyecsca/sca/trace_set/chipwhisperer.py +++ b/pyecsca/sca/trace_set/chipwhisperer.py @@ -24,7 +24,9 @@ class ChipWhispererTraceSet(TraceSet): raise ValueError @classmethod - def inplace(cls, input: Union[str, Path, bytes, BinaryIO]) -> "ChipWhispererTraceSet": + def inplace( + cls, input: Union[str, Path, bytes, BinaryIO] + ) -> "ChipWhispererTraceSet": raise NotImplementedError def write(self, output: Union[str, Path, BinaryIO]): @@ -39,10 +41,12 @@ class ChipWhispererTraceSet(TraceSet): name = file_name[7:-4] data = ChipWhispererTraceSet.__read_data(path, name) traces = [] - for samples, key, textin, textout in zip_longest(data["traces"], data["keylist"], - data["textin"], data["textout"]): + for samples, key, textin, textout in zip_longest( + data["traces"], data["keylist"], data["textin"], data["textout"] + ): traces.append( - Trace(samples, {"key": key, "textin": textin, "textout": textout})) + Trace(samples, {"key": key, "textin": textin, "textout": textout}) + ) del data["traces"] del data["keylist"] del data["textin"] @@ -52,7 +56,13 @@ class ChipWhispererTraceSet(TraceSet): @classmethod def __read_data(cls, path, name): - types = {"keylist": None, "knownkey": None, "textin": None, "textout": None, "traces": None} + types = { + "keylist": None, + "knownkey": None, + "textin": None, + "textout": None, + "traces": None, + } for type in types.keys(): type_path = join(path, name + type + ".npy") if exists(type_path) and isfile(type_path): diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py index 4b3b7b0..f1d34ef 100644 --- a/pyecsca/sca/trace_set/hdf5.py +++ b/pyecsca/sca/trace_set/hdf5.py @@ -22,6 +22,7 @@ from .. import Trace @public class HDF5Meta(MutableMapping): """Metadata mapping that is HDF5-compatible (items are picklable).""" + _dataset: h5py.AttributeManager def __init__(self, attrs: h5py.AttributeManager): @@ -55,12 +56,18 @@ class HDF5Meta(MutableMapping): @public class HDF5TraceSet(TraceSet): """A traceset based on the HDF5 (Hierarchical Data Format).""" + _file: Optional[h5py.File] _ordering: List[str] # _meta: Optional[HDF5Meta] - def __init__(self, *traces: Trace, _file: Optional[h5py.File] = None, - _ordering: Optional[List[str]] = None, **kwargs): + def __init__( + self, + *traces: Trace, + _file: Optional[h5py.File] = None, + _ordering: Optional[List[str]] = None, + **kwargs, + ): # self._meta = HDF5Meta(_file.attrs) if _file is not None else None self._file = _file if _ordering is None: @@ -76,7 +83,9 @@ class HDF5TraceSet(TraceSet): else: raise TypeError kwargs = dict(hdf5.attrs) - kwargs["_ordering"] = list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys()) + kwargs["_ordering"] = ( + list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys()) + ) traces = [] for k in kwargs["_ordering"]: meta = dict(HDF5Meta(hdf5[k].attrs)) @@ -94,7 +103,9 @@ class HDF5TraceSet(TraceSet): else: raise TypeError kwargs = dict(hdf5.attrs) - kwargs["_ordering"] = list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys()) + kwargs["_ordering"] = ( + list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys()) + ) traces = [] for k in kwargs["_ordering"]: meta = HDF5Meta(hdf5[k].attrs) @@ -182,5 +193,11 @@ class HDF5TraceSet(TraceSet): fname = self._file.filename else: status = "(closed)" - args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys if not key.startswith("_")]) + args = ", ".join( + [ + f"{key}={getattr(self, key)!r}" + for key in self._keys + if not key.startswith("_") + ] + ) return f"HDF5TraceSet('{fname}'{status}, {args})" diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py index cc3f81f..83bbade 100644 --- a/pyecsca/sca/trace_set/inspector.py +++ b/pyecsca/sca/trace_set/inspector.py @@ -25,7 +25,7 @@ class SampleCoding(IntEnum): def dtype(self): char = "f" if self.value & 0x10 else "i" - return np.dtype("<{}{}".format(char, self.value & 0x0f)) + return np.dtype("<{}{}".format(char, self.value & 0x0F)) @public @@ -107,37 +107,44 @@ class InspectorTraceSet(TraceSet): _tag_parsers: dict = { 0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int), 0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int), - 0x43: ("sample_coding", 1, - lambda bytes: SampleCoding(Parsers.read_int(bytes)), - lambda coding, length: Parsers.write_int(coding.value, - length=length)), + 0x43: ( + "sample_coding", + 1, + lambda bytes: SampleCoding(Parsers.read_int(bytes)), + lambda coding, length: Parsers.write_int(coding.value, length=length), + ), 0x44: ("data_space", 2, Parsers.read_int, Parsers.write_int), 0x45: ("title_space", 1, Parsers.read_int, Parsers.write_int), 0x46: ("global_title", None, Parsers.read_str, Parsers.write_str), 0x47: ("description", None, Parsers.read_str, Parsers.write_str), 0x48: ("x_offset", None, Parsers.read_int, Parsers.write_int), 0x49: ("x_label", None, Parsers.read_str, Parsers.write_str), - 0x4a: ("y_label", None, Parsers.read_str, Parsers.write_str), - 0x4b: ("x_scale", 4, Parsers.read_float, Parsers.write_float), - 0x4c: ("y_scale", 4, Parsers.read_float, Parsers.write_float), - 0x4d: ("trace_offset", 4, Parsers.read_int, Parsers.write_int), - 0x4e: ("log_scale", 1, Parsers.read_int, Parsers.write_int), + 0x4A: ("y_label", None, Parsers.read_str, Parsers.write_str), + 0x4B: ("x_scale", 4, Parsers.read_float, Parsers.write_float), + 0x4C: ("y_scale", 4, Parsers.read_float, Parsers.write_float), + 0x4D: ("trace_offset", 4, Parsers.read_int, Parsers.write_int), + 0x4E: ("log_scale", 1, Parsers.read_int, Parsers.write_int), 0x55: ("scope_range", 4, Parsers.read_float, Parsers.write_float), 0x56: ("scope_coupling", 4, Parsers.read_int, Parsers.write_int), 0x57: ("scope_offset", 4, Parsers.read_float, Parsers.write_float), 0x58: ("scope_impedance", 4, Parsers.read_float, Parsers.write_float), 0x59: ("scope_id", None, Parsers.read_str, Parsers.write_str), - 0x5a: ("filter_type", 4, Parsers.read_int, Parsers.write_int), - 0x5b: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float), - 0x5c: ("filter_range", 4, Parsers.read_float, Parsers.read_float), + 0x5A: ("filter_type", 4, Parsers.read_int, Parsers.write_int), + 0x5B: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float), + 0x5C: ("filter_range", 4, Parsers.read_float, Parsers.read_float), 0x60: ("external_clock", 1, Parsers.read_bool, Parsers.write_bool), 0x61: ("external_clock_threshold", 4, Parsers.read_float, Parsers.write_float), 0x62: ("external_clock_multiplier", 4, Parsers.read_int, Parsers.write_int), 0x63: ("external_clock_phase_shift", 4, Parsers.read_int, Parsers.write_int), 0x64: ("external_clock_resampler_mask", 4, Parsers.read_int, Parsers.write_int), - 0x65: ("external_clock_resampler_enabled", 1, Parsers.read_bool, Parsers.write_bool), + 0x65: ( + "external_clock_resampler_enabled", + 1, + Parsers.read_bool, + Parsers.write_bool, + ), 0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float), - 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int) + 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int), } @classmethod @@ -171,28 +178,33 @@ class InspectorTraceSet(TraceSet): tag = ord(file.read(1)) length = ord(file.read(1)) if length & 0x80: - length = Parsers.read_int(file.read(length & 0x7f)) + length = Parsers.read_int(file.read(length & 0x7F)) value = file.read(length) if tag in InspectorTraceSet._tag_parsers: tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag] if tag_len is None or length == tag_len: tags[tag_name] = tag_reader(value) - elif tag == 0x5f and length == 0: + elif tag == 0x5F and length == 0: break else: continue result = [] for _ in range(tags["num_traces"]): - title = None if "title_space" not in tags else Parsers.read_str( - file.read(tags["title_space"])) + title = ( + None + if "title_space" not in tags + else Parsers.read_str(file.read(tags["title_space"])) + ) data = None if "data_space" not in tags else file.read(tags["data_space"]) dtype = tags["sample_coding"].dtype() try: samples = np.fromfile(file, dtype, tags["num_samples"]) except UnsupportedOperation: samples = np.frombuffer( - file.read(dtype.itemsize * tags["num_samples"]), dtype, - tags["num_samples"]) + file.read(dtype.itemsize * tags["num_samples"]), + dtype, + tags["num_samples"], + ) result.append(Trace(samples, {"title": title, "data": data})) return result, tags @@ -222,12 +234,13 @@ class InspectorTraceSet(TraceSet): tag_byte = Parsers.write_int(tag, length=1) value_bytes = tag_writer(getattr(self, tag_name), tag_len) length = len(value_bytes) - if length <= 0x7f: + if length <= 0x7F: length_bytes = Parsers.write_int(length, length=1) else: - length_data = Parsers.write_int(length, length=(length.bit_length() + 7) // 8) - length_bytes = Parsers.write_int( - 0x80 | len(length_data)) + length_data + length_data = Parsers.write_int( + length, length=(length.bit_length() + 7) // 8 + ) + length_bytes = Parsers.write_int(0x80 | len(length_data)) + length_data file.write(tag_byte) file.write(length_bytes) file.write(value_bytes) @@ -238,7 +251,9 @@ class InspectorTraceSet(TraceSet): file.write(Parsers.write_str(trace.meta["title"])) if self.data_space != 0 and trace.meta["data"] is not None: file.write(trace.meta["data"]) - unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding) + unscaled = InspectorTraceSet.__unscale( + trace.samples, self.y_scale, self.sample_coding + ) try: unscaled.tofile(file) except UnsupportedOperation: |
