aboutsummaryrefslogtreecommitdiffhomepage
path: root/pyecsca
diff options
context:
space:
mode:
authorJ08nY2021-04-10 17:50:05 +0200
committerJ08nY2021-04-10 17:50:05 +0200
commitb76ec0890e4cf997ce5a0b4494722931094683f7 (patch)
tree2dc049de8a79adc7b3b23d746ef4f28d58e33fbc /pyecsca
parenta7ad11f7cd917be55dbd036a516fefda4d19dd4a (diff)
downloadpyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.tar.gz
pyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.tar.zst
pyecsca-b76ec0890e4cf997ce5a0b4494722931094683f7.zip
Use black.
Diffstat (limited to 'pyecsca')
-rw-r--r--pyecsca/ec/configuration.py75
-rw-r--r--pyecsca/ec/context.py21
-rw-r--r--pyecsca/ec/coordinates.py23
-rw-r--r--pyecsca/ec/curve.py39
-rw-r--r--pyecsca/ec/formula.py78
-rw-r--r--pyecsca/ec/key_agreement.py78
-rw-r--r--pyecsca/ec/key_generation.py6
-rw-r--r--pyecsca/ec/mod.py13
-rw-r--r--pyecsca/ec/model.py2
-rw-r--r--pyecsca/ec/mult.py159
-rw-r--r--pyecsca/ec/op.py25
-rw-r--r--pyecsca/ec/params.py99
-rw-r--r--pyecsca/ec/point.py37
-rw-r--r--pyecsca/ec/signature.py128
-rw-r--r--pyecsca/ec/transformations.py55
-rw-r--r--pyecsca/misc/cfg.py2
-rw-r--r--pyecsca/sca/re/rpa.py14
-rw-r--r--pyecsca/sca/scope/base.py21
-rw-r--r--pyecsca/sca/scope/chipwhisperer.py28
-rw-r--r--pyecsca/sca/scope/picoscope_alt.py34
-rw-r--r--pyecsca/sca/scope/picoscope_sdk.py324
-rw-r--r--pyecsca/sca/target/ISO7816.py45
-rw-r--r--pyecsca/sca/target/PCSC.py2
-rw-r--r--pyecsca/sca/target/binary.py14
-rw-r--r--pyecsca/sca/target/chipwhisperer.py4
-rw-r--r--pyecsca/sca/target/ectester.py355
-rw-r--r--pyecsca/sca/target/simpleserial.py7
-rw-r--r--pyecsca/sca/trace/align.py103
-rw-r--r--pyecsca/sca/trace/combine.py12
-rw-r--r--pyecsca/sca/trace/edit.py11
-rw-r--r--pyecsca/sca/trace/filter.py26
-rw-r--r--pyecsca/sca/trace/match.py4
-rw-r--r--pyecsca/sca/trace/plot.py6
-rw-r--r--pyecsca/sca/trace/process.py20
-rw-r--r--pyecsca/sca/trace/sampling.py29
-rw-r--r--pyecsca/sca/trace/test.py25
-rw-r--r--pyecsca/sca/trace/trace.py21
-rw-r--r--pyecsca/sca/trace_set/base.py6
-rw-r--r--pyecsca/sca/trace_set/chipwhisperer.py20
-rw-r--r--pyecsca/sca/trace_set/hdf5.py27
-rw-r--r--pyecsca/sca/trace_set/inspector.py67
41 files changed, 1551 insertions, 514 deletions
diff --git a/pyecsca/ec/configuration.py b/pyecsca/ec/configuration.py
index 04f8fc7..bf20ec7 100644
--- a/pyecsca/ec/configuration.py
+++ b/pyecsca/ec/configuration.py
@@ -34,6 +34,7 @@ class EnumDefine(Enum):
@public
class Multiplication(EnumDefine):
"""Base multiplication algorithm to use."""
+
TOOM_COOK = "MUL_TOOM_COOK"
KARATSUBA = "MUL_KARATSUBA"
COMBA = "MUL_COMBA"
@@ -43,6 +44,7 @@ class Multiplication(EnumDefine):
@public
class Squaring(EnumDefine):
"""Base squaring algorithm to use."""
+
TOOM_COOK = "SQR_TOOM_COOK"
KARATSUBA = "SQR_KARATSUBA"
COMBA = "SQR_COMBA"
@@ -52,6 +54,7 @@ class Squaring(EnumDefine):
@public
class Reduction(EnumDefine):
"""Modular reduction method used."""
+
BARRETT = "RED_BARRETT"
MONTGOMERY = "RED_MONTGOMERY"
BASE = "RED_BASE"
@@ -60,6 +63,7 @@ class Reduction(EnumDefine):
@public
class Inversion(EnumDefine):
"""Inversion algorithm used."""
+
GCD = "INV_GCD"
EULER = "INV_EULER"
@@ -67,6 +71,7 @@ class Inversion(EnumDefine):
@public
class HashType(EnumDefine):
"""Hash algorithm used in ECDH and ECDSA."""
+
NONE = "HASH_NONE"
SHA1 = "HASH_SHA1"
SHA224 = "HASH_SHA224"
@@ -78,6 +83,7 @@ class HashType(EnumDefine):
@public
class RandomMod(EnumDefine):
"""Method of sampling a uniform integer modulo order."""
+
SAMPLE = "MOD_RAND_SAMPLE"
REDUCE = "MOD_RAND_REDUCE"
@@ -86,6 +92,7 @@ class RandomMod(EnumDefine):
@dataclass(frozen=True)
class Configuration(object):
"""An ECC implementation configuration."""
+
model: CurveModel
coords: CoordinateModel
formulas: FrozenSet[Formula]
@@ -120,8 +127,11 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None
"""
def is_optional(arg_type):
- return get_origin(arg_type) == Union and len(get_args(arg_type)) == 2 and \
- get_args(arg_type)[1] == type(None) # noqa
+ return (
+ get_origin(arg_type) == Union
+ and len(get_args(arg_type)) == 2
+ and get_args(arg_type)[1] == type(None) # noqa
+ )
def leaf_subclasses(cls):
subs = cls.__subclasses__()
@@ -140,7 +150,7 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None
"mult": Multiplication,
"sqr": Squaring,
"red": Reduction,
- "inv": Inversion
+ "inv": Inversion,
}
keys = list(filter(lambda key: key not in kwargs, options.keys()))
values = [options[key] for key in keys]
@@ -150,7 +160,11 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None
def multipliers(mult_classes, coords_formulas, fixed_args=None):
for mult_cls in mult_classes:
- if fixed_args is not None and "cls" in fixed_args and mult_cls != fixed_args["cls"]:
+ if (
+ fixed_args is not None
+ and "cls" in fixed_args
+ and mult_cls != fixed_args["cls"]
+ ):
continue
arg_options = {}
for name, required_type in get_type_hints(mult_cls.__init__).items():
@@ -160,17 +174,30 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None
if is_optional(required_type):
opt_type = get_args(required_type)[0]
if issubclass(opt_type, Formula):
- options = [formula for formula in coords_formulas if
- isinstance(formula, opt_type)] + [None]
+ options = [
+ formula
+ for formula in coords_formulas
+ if isinstance(formula, opt_type)
+ ] + [None]
else:
options = [None] # TODO: anything here?
- elif get_origin(required_type) is None and issubclass(required_type, Formula):
- options = [formula for formula in coords_formulas if
- isinstance(formula, required_type)]
- elif get_origin(required_type) is None and issubclass(required_type, bool):
+ elif get_origin(required_type) is None and issubclass(
+ required_type, Formula
+ ):
+ options = [
+ formula
+ for formula in coords_formulas
+ if isinstance(formula, required_type)
+ ]
+ elif get_origin(required_type) is None and issubclass(
+ required_type, bool
+ ):
options = [True, False]
- elif get_origin(required_type) is None and issubclass(required_type,
- int) and name == "width":
+ elif (
+ get_origin(required_type) is None
+ and issubclass(required_type, int)
+ and name == "width"
+ ):
options = [3, 5]
else:
options = []
@@ -198,19 +225,29 @@ def all_configurations(**kwargs) -> Generator[Configuration, Configuration, None
if "scalarmult" in kwargs:
if isinstance(kwargs["scalarmult"], ScalarMultiplier):
mults = [kwargs["scalarmult"]]
- if not set(kwargs["scalarmult"].formulas.values()).issubset(coords_formulas):
+ if not set(kwargs["scalarmult"].formulas.values()).issubset(
+ coords_formulas
+ ):
continue
- elif isinstance(kwargs["scalarmult"], type) and issubclass(kwargs["scalarmult"],
- ScalarMultiplier):
+ elif isinstance(kwargs["scalarmult"], type) and issubclass(
+ kwargs["scalarmult"], ScalarMultiplier
+ ):
mult_classes = list(
- filter(lambda mult: issubclass(mult, kwargs["scalarmult"]),
- mult_classes))
+ filter(
+ lambda mult: issubclass(mult, kwargs["scalarmult"]),
+ mult_classes,
+ )
+ )
mults = multipliers(mult_classes, coords_formulas)
else:
- mults = multipliers(mult_classes, coords_formulas, kwargs["scalarmult"])
+ mults = multipliers(
+ mult_classes, coords_formulas, kwargs["scalarmult"]
+ )
else:
mults = multipliers(mult_classes, coords_formulas)
for mult in mults:
formulas = frozenset(mult.formulas.values())
for independent_args in independents(kwargs):
- yield Configuration(model, coords, formulas, mult, **independent_args)
+ yield Configuration(
+ model, coords, formulas, mult, **independent_args
+ )
diff --git a/pyecsca/ec/context.py b/pyecsca/ec/context.py
index 3b38584..5bef891 100644
--- a/pyecsca/ec/context.py
+++ b/pyecsca/ec/context.py
@@ -23,6 +23,7 @@ from public import public
@public
class Action(object):
"""An Action."""
+
inside: bool
def __init__(self):
@@ -41,6 +42,7 @@ class Action(object):
@public
class ResultAction(Action):
"""An action that has a result."""
+
_result: Any = None
_has_result: bool = False
@@ -60,7 +62,12 @@ class ResultAction(Action):
return result
def __exit__(self, exc_type, exc_val, exc_tb):
- if not self._has_result and exc_type is None and exc_val is None and exc_tb is None:
+ if (
+ not self._has_result
+ and exc_type is None
+ and exc_val is None
+ and exc_tb is None
+ ):
raise RuntimeError("Result unset on action exit")
super().__exit__(exc_type, exc_val, exc_tb)
@@ -167,6 +174,7 @@ class NullContext(Context):
@public
class DefaultContext(Context):
"""A context that traces executions of actions in a tree."""
+
actions: Tree
current: List[Action]
@@ -190,6 +198,7 @@ class DefaultContext(Context):
@public
class PathContext(Context):
"""A context that traces targeted actions."""
+
path: List[int]
current: List[int]
current_depth: int
@@ -212,7 +221,7 @@ class PathContext(Context):
else:
self.current[self.current_depth] += 1
self.current_depth += 1
- if self.path == self.current[:self.current_depth]:
+ if self.path == self.current[: self.current_depth]:
self.value = action
def exit_action(self, action: Action) -> None:
@@ -221,10 +230,14 @@ class PathContext(Context):
self.current_depth -= 1
def __repr__(self):
- return f"{self.__class__.__name__}({self.current!r}, depth={self.current_depth!r})"
+ return (
+ f"{self.__class__.__name__}({self.current!r}, depth={self.current_depth!r})"
+ )
-_actual_context: ContextVar[Context] = ContextVar("operational_context", default=NullContext())
+_actual_context: ContextVar[Context] = ContextVar(
+ "operational_context", default=NullContext()
+)
class _ContextManager(object):
diff --git a/pyecsca/ec/coordinates.py b/pyecsca/ec/coordinates.py
index 7c76746..24a0b1a 100644
--- a/pyecsca/ec/coordinates.py
+++ b/pyecsca/ec/coordinates.py
@@ -8,14 +8,23 @@ from typing import List, Any, MutableMapping
from pkg_resources import resource_listdir, resource_isdir, resource_stream
from public import public
-from .formula import (Formula, EFDFormula, AdditionEFDFormula, DoublingEFDFormula,
- TriplingEFDFormula, DifferentialAdditionEFDFormula, LadderEFDFormula,
- ScalingEFDFormula, NegationEFDFormula)
+from .formula import (
+ Formula,
+ EFDFormula,
+ AdditionEFDFormula,
+ DoublingEFDFormula,
+ TriplingEFDFormula,
+ DifferentialAdditionEFDFormula,
+ LadderEFDFormula,
+ ScalingEFDFormula,
+ NegationEFDFormula,
+)
@public
class CoordinateModel(object):
"""A coordinate system for a particular model(form) of an elliptic curve."""
+
name: str
"""Name of the coordinate model"""
full_name: str
@@ -37,7 +46,7 @@ class CoordinateModel(object):
"""Formulas available on the coordinate system."""
def __repr__(self):
- return f"{self.__class__.__name__}(\"{self.name}\" on {self.curve_model.name})"
+ return f'{self.__class__.__name__}("{self.name}" on {self.curve_model.name})'
@public
@@ -61,7 +70,6 @@ class AffineCoordinateModel(CoordinateModel):
class EFDCoordinateModel(CoordinateModel):
-
def __init__(self, dir_path: str, name: str, curve_model: Any):
self.name = name
self.curve_model = curve_model
@@ -89,7 +97,7 @@ class EFDCoordinateModel(CoordinateModel):
"diffadd": DifferentialAdditionEFDFormula,
"ladder": LadderEFDFormula,
"scaling": ScalingEFDFormula,
- "negation": NegationEFDFormula
+ "negation": NegationEFDFormula,
}
cls = formula_types.get(formula_type, EFDFormula)
self.formulas[fname] = cls(join(dir_path, fname), fname, self)
@@ -118,7 +126,8 @@ class EFDCoordinateModel(CoordinateModel):
self.parameters.append(line[10:])
elif line.startswith("assume"):
self.assumptions.append(
- parse(line[7:].replace("^", "**"), mode="exec"))
+ parse(line[7:].replace("^", "**"), mode="exec")
+ )
line = f.readline().decode("ascii").rstrip()
def __eq__(self, other):
diff --git a/pyecsca/ec/curve.py b/pyecsca/ec/curve.py
index 4cfb978..bfa58c8 100644
--- a/pyecsca/ec/curve.py
+++ b/pyecsca/ec/curve.py
@@ -16,6 +16,7 @@ from .point import Point, InfinityPoint
@public
class EllipticCurve(object):
"""An elliptic curve."""
+
model: CurveModel
"""The model of the curve."""
coordinate_model: CoordinateModel
@@ -27,11 +28,23 @@ class EllipticCurve(object):
neutral: Point
"""The neutral point on the curve."""
- def __init__(self, model: CurveModel, coordinate_model: CoordinateModel,
- prime: int, neutral: Point, parameters: MutableMapping[str, Union[Mod, int]]):
- if coordinate_model not in model.coordinates.values() and not isinstance(coordinate_model, AffineCoordinateModel):
+ def __init__(
+ self,
+ model: CurveModel,
+ coordinate_model: CoordinateModel,
+ prime: int,
+ neutral: Point,
+ parameters: MutableMapping[str, Union[Mod, int]],
+ ):
+ if coordinate_model not in model.coordinates.values() and not isinstance(
+ coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
- if set(model.parameter_names).union(coordinate_model.parameters).symmetric_difference(parameters.keys()):
+ if (
+ set(model.parameter_names)
+ .union(coordinate_model.parameters)
+ .symmetric_difference(parameters.keys())
+ ):
raise ValueError
self.model = model
self.coordinate_model = coordinate_model
@@ -52,8 +65,11 @@ class EllipticCurve(object):
raise ValueError("Coordinate model of point is not affine.")
if point.coordinate_model.curve_model != self.model:
raise ValueError("Curve model of point does not match the curve.")
- locls = {var + str(i + 1): point.coords[var]
- for i, point in enumerate(points) for var in point.coords}
+ locls = {
+ var + str(i + 1): point.coords[var]
+ for i, point in enumerate(points)
+ for var in point.coords
+ }
locls.update(self.parameters)
for line in formulas:
exec(compile(line, "", mode="exec"), None, locls)
@@ -228,7 +244,9 @@ class EllipticCurve(object):
else:
raise NotImplementedError
else:
- raise ValueError(f"Wrong encoding type: {hex(encoded[0])}, should be one of 0x04, 0x06, 0x02, 0x03 or 0x00")
+ raise ValueError(
+ f"Wrong encoding type: {hex(encoded[0])}, should be one of 0x04, 0x06, 0x02, 0x03 or 0x00"
+ )
def affine_random(self) -> Point:
"""Generate a random affine point on the curve."""
@@ -246,7 +264,12 @@ class EllipticCurve(object):
def __eq__(self, other):
if not isinstance(other, EllipticCurve):
return False
- return self.model == other.model and self.coordinate_model == other.coordinate_model and self.prime == other.prime and self.parameters == other.parameters
+ return (
+ self.model == other.model
+ and self.coordinate_model == other.coordinate_model
+ and self.prime == other.prime
+ and self.parameters == other.parameters
+ )
def __str__(self):
return "EllipticCurve"
diff --git a/pyecsca/ec/formula.py b/pyecsca/ec/formula.py
index d0435f8..c0505e1 100644
--- a/pyecsca/ec/formula.py
+++ b/pyecsca/ec/formula.py
@@ -21,6 +21,7 @@ from ..misc.cfg import getconfig
@public
class OpResult(object):
"""A result of an operation."""
+
parents: Tuple
op: OpType
name: str
@@ -44,6 +45,7 @@ class OpResult(object):
@public
class FormulaAction(ResultAction):
"""An execution of a formula, on some input points and parameters, with some outputs."""
+
formula: "Formula"
"""The formula that was executed."""
inputs: MutableMapping[str, Mod]
@@ -95,6 +97,7 @@ class FormulaAction(ResultAction):
@public
class Formula(ABC):
"""A formula operating on points."""
+
name: str
"""Name of the formula."""
shortname: ClassVar[str]
@@ -131,7 +134,9 @@ class Formula(ABC):
raise ValueError(f"Wrong coordinate model of point {point}.")
for coord, value in point.coords.items():
if not isinstance(value, Mod) or value.n != field:
- raise ValueError(f"Wrong coordinate input {coord} = {value} of point {i}.")
+ raise ValueError(
+ f"Wrong coordinate input {coord} = {value} of point {i}."
+ )
params[coord + str(i + 1)] = value
def __validate_assumptions(self, field, params):
@@ -146,16 +151,22 @@ class Formula(ABC):
holds = eval(compiled, None, alocals)
if not holds:
# The assumption doesn't hold, see what is the current configured action and do it.
- raise_unsatisified_assumption(getconfig().ec.unsatisfied_formula_assumption_action,
- f"Unsatisfied assumption in the formula ({assumption_string}).")
+ raise_unsatisified_assumption(
+ getconfig().ec.unsatisfied_formula_assumption_action,
+ f"Unsatisfied assumption in the formula ({assumption_string}).",
+ )
else:
k = FF(field)
expr = sympify(f"{rhs} - {lhs}", evaluate=False)
for curve_param, value in params.items():
expr = expr.subs(curve_param, k(value))
- if len(expr.free_symbols) > 1 or (param := str(expr.free_symbols.pop())) not in self.parameters:
+ if (
+ len(expr.free_symbols) > 1
+ or (param := str(expr.free_symbols.pop())) not in self.parameters
+ ):
raise ValueError(
- f"This formula couldn't be executed due to an unsupported assumption ({assumption_string}).")
+ f"This formula couldn't be executed due to an unsupported assumption ({assumption_string})."
+ )
def resolve(expression):
if not expression.args:
@@ -178,7 +189,9 @@ class Formula(ABC):
params[param] = Mod(int(root), field)
break
else:
- raise UnsatisfiedAssumptionError(f"Unsatisfied assumption in the formula ({assumption_string}).")
+ raise UnsatisfiedAssumptionError(
+ f"Unsatisfied assumption in the formula ({assumption_string})."
+ )
def __call__(self, field: int, *points: Any, **params: Mod) -> Tuple[Any, ...]:
"""
@@ -190,6 +203,7 @@ class Formula(ABC):
:return: The resulting point(s).
"""
from .point import Point
+
self.__validate_params(field, params)
self.__validate_points(field, points, params)
self.__validate_assumptions(field, params)
@@ -201,7 +215,9 @@ class Formula(ABC):
# TODO: This is not general enough, if for example the op is `t = 1/2`, it will be float.
# Temporarily, add an assertion that this does not happen so we do not give bad results.
if isinstance(op_result, float):
- raise AssertionError(f"Bad stuff happened in op {op}, floats will pollute the results.")
+ raise AssertionError(
+ f"Bad stuff happened in op {op}, floats will pollute the results."
+ )
if not isinstance(op_result, Mod):
op_result = Mod(op_result, field)
action.add_operation(op, op_result)
@@ -285,8 +301,14 @@ class Formula(ABC):
@property
def num_addsubs(self) -> int:
"""Number of additions and subtractions."""
- return len(list(
- filter(lambda op: op.operator == OpType.Add or op.operator == OpType.Sub, self.code)))
+ return len(
+ list(
+ filter(
+ lambda op: op.operator == OpType.Add or op.operator == OpType.Sub,
+ self.code,
+ )
+ )
+ )
class EFDFormula(Formula):
@@ -313,7 +335,10 @@ class EFDFormula(Formula):
self.parameters.append(line[10:])
elif line.startswith("assume"):
self.assumptions.append(
- parse(line[7:].replace("=", "==").replace("^", "**"), mode="eval"))
+ parse(
+ line[7:].replace("=", "==").replace("^", "**"), mode="eval"
+ )
+ )
elif line.startswith("unified"):
self.unified = True
line = f.readline().decode("ascii").rstrip()
@@ -321,7 +346,9 @@ class EFDFormula(Formula):
def __read_op3_file(self, path):
with resource_stream(__name__, path) as f:
for line in f.readlines():
- code_module = parse(line.decode("ascii").replace("^", "**"), path, mode="exec")
+ code_module = parse(
+ line.decode("ascii").replace("^", "**"), path, mode="exec"
+ )
self.code.append(CodeOp(code_module))
@property
@@ -334,19 +361,29 @@ class EFDFormula(Formula):
@property
def inputs(self):
- return set(var + str(i) for var, i in product(self.coordinate_model.variables,
- range(1, 1 + self.num_inputs)))
+ return set(
+ var + str(i)
+ for var, i in product(
+ self.coordinate_model.variables, range(1, 1 + self.num_inputs)
+ )
+ )
@property
def outputs(self):
- return set(var + str(i) for var, i in product(self.coordinate_model.variables,
- range(self.output_index,
- self.output_index + self.num_outputs)))
+ return set(
+ var + str(i)
+ for var, i in product(
+ self.coordinate_model.variables,
+ range(self.output_index, self.output_index + self.num_outputs),
+ )
+ )
def __eq__(self, other):
if not isinstance(other, EFDFormula):
return False
- return self.name == other.name and self.coordinate_model == other.coordinate_model
+ return (
+ self.name == other.name and self.coordinate_model == other.coordinate_model
+ )
def __hash__(self):
return hash(self.name) + hash(self.coordinate_model)
@@ -355,6 +392,7 @@ class EFDFormula(Formula):
@public
class AdditionFormula(Formula, ABC):
"""A formula that adds two points."""
+
shortname = "add"
num_inputs = 2
num_outputs = 1
@@ -368,6 +406,7 @@ class AdditionEFDFormula(AdditionFormula, EFDFormula):
@public
class DoublingFormula(Formula, ABC):
"""A formula that doubles a point."""
+
shortname = "dbl"
num_inputs = 1
num_outputs = 1
@@ -381,6 +420,7 @@ class DoublingEFDFormula(DoublingFormula, EFDFormula):
@public
class TriplingFormula(Formula, ABC):
"""A formula that triples a point."""
+
shortname = "tpl"
num_inputs = 1
num_outputs = 1
@@ -394,6 +434,7 @@ class TriplingEFDFormula(TriplingFormula, EFDFormula):
@public
class NegationFormula(Formula, ABC):
"""A formula that negates a point."""
+
shortname = "neg"
num_inputs = 1
num_outputs = 1
@@ -407,6 +448,7 @@ class NegationEFDFormula(NegationFormula, EFDFormula):
@public
class ScalingFormula(Formula, ABC):
"""A formula that somehow scales the point (to a given representative of a projective class)."""
+
shortname = "scl"
num_inputs = 1
num_outputs = 1
@@ -423,6 +465,7 @@ class DifferentialAdditionFormula(Formula, ABC):
A differential addition formula that adds two points with a known difference.
The first input point is the difference of the third input and the second input (`P[0] = P[2] - P[1]`).
"""
+
shortname = "dadd"
num_inputs = 3
num_outputs = 1
@@ -441,6 +484,7 @@ class LadderFormula(Formula, ABC):
The first output point is the doubling of the second input point (`O[0] = 2 * P[1]`).
The second output point is the addition of the second and third input points (`O[1] = P[1] + P[2]`).
"""
+
shortname = "ladd"
num_inputs = 3
num_outputs = 2
diff --git a/pyecsca/ec/key_agreement.py b/pyecsca/ec/key_agreement.py
index c9e4a01..b58374f 100644
--- a/pyecsca/ec/key_agreement.py
+++ b/pyecsca/ec/key_agreement.py
@@ -16,12 +16,19 @@ from .point import Point
@public
class ECDHAction(ResultAction):
"""An ECDH key exchange."""
+
params: DomainParameters
hash_algo: Optional[Any]
privkey: Mod
pubkey: Point
- def __init__(self, params: DomainParameters, hash_algo: Optional[Any], privkey: Mod, pubkey: Point):
+ def __init__(
+ self,
+ params: DomainParameters,
+ hash_algo: Optional[Any],
+ privkey: Mod,
+ pubkey: Point,
+ ):
super().__init__()
self.params = params
self.hash_algo = hash_algo
@@ -35,14 +42,21 @@ class ECDHAction(ResultAction):
@public
class KeyAgreement(object):
"""An EC based key agreement primitive. (ECDH)"""
+
mult: ScalarMultiplier
params: DomainParameters
pubkey: Point
privkey: Mod
hash_algo: Optional[Any]
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point, privkey: Mod,
- hash_algo: Optional[Any] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ hash_algo: Optional[Any] = None,
+ ):
self.mult = mult
self.params = params
self.pubkey = pubkey
@@ -65,7 +79,9 @@ class KeyAgreement(object):
:return: The shared secret.
"""
- with ECDHAction(self.params, self.hash_algo, self.privkey, self.pubkey) as action:
+ with ECDHAction(
+ self.params, self.hash_algo, self.privkey, self.pubkey
+ ) as action:
affine_point = self.perform_raw()
x = int(affine_point.x)
p = self.params.curve.prime
@@ -80,8 +96,13 @@ class KeyAgreement(object):
class ECDH_NONE(KeyAgreement):
"""Raw x-coordinate ECDH."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey)
@@ -89,8 +110,13 @@ class ECDH_NONE(KeyAgreement):
class ECDH_SHA1(KeyAgreement):
"""ECDH with SHA1 of x-coordinate."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey, hashlib.sha1)
@@ -98,8 +124,13 @@ class ECDH_SHA1(KeyAgreement):
class ECDH_SHA224(KeyAgreement):
"""ECDH with SHA224 of x-coordinate."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey, hashlib.sha224)
@@ -107,8 +138,13 @@ class ECDH_SHA224(KeyAgreement):
class ECDH_SHA256(KeyAgreement):
"""ECDH with SHA256 of x-coordinate."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey, hashlib.sha256)
@@ -116,8 +152,13 @@ class ECDH_SHA256(KeyAgreement):
class ECDH_SHA384(KeyAgreement):
"""ECDH with SHA384 of x-coordinate."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey, hashlib.sha384)
@@ -125,6 +166,11 @@ class ECDH_SHA384(KeyAgreement):
class ECDH_SHA512(KeyAgreement):
"""ECDH with SHA512 of x-coordinate."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, pubkey: Point,
- privkey: Mod):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ pubkey: Point,
+ privkey: Mod,
+ ):
super().__init__(mult, params, pubkey, privkey, hashlib.sha512)
diff --git a/pyecsca/ec/key_generation.py b/pyecsca/ec/key_generation.py
index d506585..a08c767 100644
--- a/pyecsca/ec/key_generation.py
+++ b/pyecsca/ec/key_generation.py
@@ -15,6 +15,7 @@ from .point import Point
@public
class KeygenAction(ResultAction):
"""A key generation."""
+
params: DomainParameters
def __init__(self, params: DomainParameters):
@@ -28,11 +29,14 @@ class KeygenAction(ResultAction):
@public
class KeyGeneration(object):
"""Key generator."""
+
mult: ScalarMultiplier
params: DomainParameters
affine: bool
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters, affine: bool = False):
+ def __init__(
+ self, mult: ScalarMultiplier, params: DomainParameters, affine: bool = False
+ ):
"""
:param mult: The scalar multiplier to use during key generation.
:param params: The domain parameters over which to generate the keypair.
diff --git a/pyecsca/ec/mod.py b/pyecsca/ec/mod.py
index 8aca8a9..8485cbe 100644
--- a/pyecsca/ec/mod.py
+++ b/pyecsca/ec/mod.py
@@ -125,6 +125,7 @@ def _check(func):
@public
class RandomModAction(ResultAction):
"""A random sampling from Z_n."""
+
order: int
def __init__(self, order: int):
@@ -141,6 +142,7 @@ _mod_classes: Dict[str, Type] = {}
@public
class Mod(object):
"""An element x of ℤₙ."""
+
x: Any
n: Any
@@ -153,7 +155,9 @@ class Mod(object):
if selected_class not in _mod_classes:
# Fallback to something
selected_class = next(iter(_mod_classes.keys()))
- return _mod_classes[selected_class].__new__(_mod_classes[selected_class], *args, **kwargs)
+ return _mod_classes[selected_class].__new__(
+ _mod_classes[selected_class], *args, **kwargs
+ )
@_check
def __add__(self, other):
@@ -254,6 +258,7 @@ class Mod(object):
@public
class RawMod(Mod):
"""An element x of ℤₙ (implemented using Python integers)."""
+
x: int
n: int
@@ -462,6 +467,7 @@ def _symbolic_check(func):
@public
class SymbolicMod(Mod):
"""A symbolic element x of ℤₙ (implemented using sympy)."""
+
x: Expr
n: int
@@ -489,10 +495,10 @@ class SymbolicMod(Mod):
return -self + other
def __neg__(self):
- return self.__class__(- self.x, self.n)
+ return self.__class__(-self.x, self.n)
def inverse(self):
- return self.__class__(self.x**(-1), self.n)
+ return self.__class__(self.x ** (-1), self.n)
def sqrt(self):
raise NotImplementedError
@@ -569,6 +575,7 @@ if has_gmp:
@public
class GMPMod(Mod):
"""An element x of ℤₙ. Implemented by GMP."""
+
x: gmpy2.mpz
n: gmpy2.mpz
diff --git a/pyecsca/ec/model.py b/pyecsca/ec/model.py
index 46b03b7..861dfca 100644
--- a/pyecsca/ec/model.py
+++ b/pyecsca/ec/model.py
@@ -14,6 +14,7 @@ from .coordinates import EFDCoordinateModel, CoordinateModel
@public
class CurveModel(object):
"""A model(form) of an elliptic curve."""
+
name: str
shortname: str
coordinates: MutableMapping[str, CoordinateModel]
@@ -134,6 +135,7 @@ class MontgomeryModel(EFDCurveModel):
B y^2 = x^3 + A x^2 + x
"""
+
def __init__(self):
super().__init__("montgom")
diff --git a/pyecsca/ec/mult.py b/pyecsca/ec/mult.py
index 6fd6feb..7614dd4 100644
--- a/pyecsca/ec/mult.py
+++ b/pyecsca/ec/mult.py
@@ -8,8 +8,15 @@ from typing import Mapping, Tuple, Optional, MutableMapping, ClassVar, Set, Type
from public import public
from .context import ResultAction, Action
-from .formula import (Formula, AdditionFormula, DoublingFormula, DifferentialAdditionFormula,
- ScalingFormula, LadderFormula, NegationFormula)
+from .formula import (
+ Formula,
+ AdditionFormula,
+ DoublingFormula,
+ DifferentialAdditionFormula,
+ ScalingFormula,
+ LadderFormula,
+ NegationFormula,
+)
from .naf import naf, wnaf
from .params import DomainParameters
from .point import Point
@@ -18,6 +25,7 @@ from .point import Point
@public
class ScalarMultiplicationAction(ResultAction):
"""A scalar multiplication of a point on a curve by a scalar."""
+
point: Point
scalar: int
@@ -33,6 +41,7 @@ class ScalarMultiplicationAction(ResultAction):
@public
class PrecomputationAction(Action):
"""A precomputation of a point in scalar multiplication."""
+
params: DomainParameters
point: Point
@@ -51,6 +60,7 @@ class ScalarMultiplier(ABC):
of the point at infinity.
:param formulas: Formulas this instance will use.
"""
+
requires: ClassVar[Set[Type]] # Type[Formula] but mypy has a false positive
"""The set of formulas that the multiplier requires."""
optionals: ClassVar[Set[Type]] # Type[Formula] but mypy has a false positive
@@ -64,8 +74,16 @@ class ScalarMultiplier(ABC):
_initialized: bool = False
def __init__(self, short_circuit: bool = True, **formulas: Optional[Formula]):
- if len(set(formula.coordinate_model for formula in formulas.values() if
- formula is not None)) != 1:
+ if (
+ len(
+ set(
+ formula.coordinate_model
+ for formula in formulas.values()
+ if formula is not None
+ )
+ )
+ != 1
+ ):
raise ValueError
self.short_circuit = short_circuit
self.formulas = {k: v for k, v in formulas.items() if v is not None}
@@ -78,7 +96,9 @@ class ScalarMultiplier(ABC):
return copy(other)
if other == self._params.curve.neutral:
return copy(one)
- return self.formulas["add"](self._params.curve.prime, one, other, **self._params.curve.parameters)[0]
+ return self.formulas["add"](
+ self._params.curve.prime, one, other, **self._params.curve.parameters
+ )[0]
def _dbl(self, point: Point) -> Point:
if "dbl" not in self.formulas:
@@ -86,12 +106,16 @@ class ScalarMultiplier(ABC):
if self.short_circuit:
if point == self._params.curve.neutral:
return copy(point)
- return self.formulas["dbl"](self._params.curve.prime, point, **self._params.curve.parameters)[0]
+ return self.formulas["dbl"](
+ self._params.curve.prime, point, **self._params.curve.parameters
+ )[0]
def _scl(self, point: Point) -> Point:
if "scl" not in self.formulas:
raise NotImplementedError
- return self.formulas["scl"](self._params.curve.prime, point, **self._params.curve.parameters)[0]
+ return self.formulas["scl"](
+ self._params.curve.prime, point, **self._params.curve.parameters
+ )[0]
def _ladd(self, start: Point, to_dbl: Point, to_add: Point) -> Tuple[Point, ...]:
if "ladd" not in self.formulas:
@@ -101,7 +125,13 @@ class ScalarMultiplier(ABC):
return to_dbl, to_add
if to_add == self._params.curve.neutral:
return self._dbl(to_dbl), to_dbl
- return self.formulas["ladd"](self._params.curve.prime, start, to_dbl, to_add, **self._params.curve.parameters)
+ return self.formulas["ladd"](
+ self._params.curve.prime,
+ start,
+ to_dbl,
+ to_add,
+ **self._params.curve.parameters,
+ )
def _dadd(self, start: Point, one: Point, other: Point) -> Point:
if "dadd" not in self.formulas:
@@ -111,12 +141,16 @@ class ScalarMultiplier(ABC):
return copy(other)
if other == self._params.curve.neutral:
return copy(one)
- return self.formulas["dadd"](self._params.curve.prime, start, one, other, **self._params.curve.parameters)[0]
+ return self.formulas["dadd"](
+ self._params.curve.prime, start, one, other, **self._params.curve.parameters
+ )[0]
def _neg(self, point: Point) -> Point:
if "neg" not in self.formulas:
raise NotImplementedError
- return self.formulas["neg"](self._params.curve.prime, point, **self._params.curve.parameters)[0]
+ return self.formulas["neg"](
+ self._params.curve.prime, point, **self._params.curve.parameters
+ )[0]
def init(self, params: DomainParameters, point: Point):
"""
@@ -129,7 +163,10 @@ class ScalarMultiplier(ABC):
:param point: The point to initialize the multiplier with.
"""
coord_model = set(self.formulas.values()).pop().coordinate_model
- if params.curve.coordinate_model != coord_model or point.coordinate_model != coord_model:
+ if (
+ params.curve.coordinate_model != coord_model
+ or point.coordinate_model != coord_model
+ ):
raise ValueError
self._params = params
self._point = point
@@ -156,14 +193,21 @@ class LTRMultiplier(ScalarMultiplier):
The `always` parameter determines whether the double and add always method is used.
"""
+
requires = {AdditionFormula, DoublingFormula}
optionals = {ScalingFormula}
always: bool
complete: bool
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula,
- scl: ScalingFormula = None, always: bool = False, complete: bool = True,
- short_circuit: bool = True):
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ scl: ScalingFormula = None,
+ always: bool = False,
+ complete: bool = True,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl)
self.always = always
self.complete = complete
@@ -201,12 +245,19 @@ class RTLMultiplier(ScalarMultiplier):
The `always` parameter determines whether the double and add always method is used.
"""
+
requires = {AdditionFormula, DoublingFormula}
optionals = {ScalingFormula}
always: bool
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula,
- scl: ScalingFormula = None, always: bool = False, short_circuit: bool = True):
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ scl: ScalingFormula = None,
+ always: bool = False,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl)
self.always = always
@@ -240,11 +291,17 @@ class CoronMultiplier(ScalarMultiplier):
https://link.springer.com/content/pdf/10.1007/3-540-48059-5_25.pdf
"""
+
requires = {AdditionFormula, DoublingFormula}
optionals = {ScalingFormula}
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula, scl: ScalingFormula = None,
- short_circuit: bool = True):
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ scl: ScalingFormula = None,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl)
def multiply(self, scalar: int) -> Point:
@@ -270,12 +327,19 @@ class LadderMultiplier(ScalarMultiplier):
"""
Montgomery ladder multiplier, using a three input, two output ladder formula.
"""
+
requires = {LadderFormula}
optionals = {DoublingFormula, ScalingFormula}
complete: bool
- def __init__(self, ladd: LadderFormula, dbl: DoublingFormula = None, scl: ScalingFormula = None,
- complete: bool = True, short_circuit: bool = True):
+ def __init__(
+ self,
+ ladd: LadderFormula,
+ dbl: DoublingFormula = None,
+ scl: ScalingFormula = None,
+ complete: bool = True,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, ladd=ladd, dbl=dbl, scl=scl)
self.complete = complete
if (not complete or short_circuit) and dbl is None:
@@ -311,12 +375,19 @@ class SimpleLadderMultiplier(ScalarMultiplier):
"""
Montgomery ladder multiplier, using addition and doubling formulas.
"""
+
requires = {AdditionFormula, DoublingFormula}
optionals = {ScalingFormula}
complete: bool
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula, scl: ScalingFormula = None,
- complete: bool = True, short_circuit: bool = True):
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ scl: ScalingFormula = None,
+ complete: bool = True,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, scl=scl)
self.complete = complete
@@ -349,12 +420,19 @@ class DifferentialLadderMultiplier(ScalarMultiplier):
"""
Montgomery ladder multiplier, using differential addition and doubling formulas.
"""
+
requires = {DifferentialAdditionFormula, DoublingFormula}
optionals = {ScalingFormula}
complete: bool
- def __init__(self, dadd: DifferentialAdditionFormula, dbl: DoublingFormula,
- scl: ScalingFormula = None, complete: bool = True, short_circuit: bool = True):
+ def __init__(
+ self,
+ dadd: DifferentialAdditionFormula,
+ dbl: DoublingFormula,
+ scl: ScalingFormula = None,
+ complete: bool = True,
+ short_circuit: bool = True,
+ ):
super().__init__(short_circuit=short_circuit, dadd=dadd, dbl=dbl, scl=scl)
self.complete = complete
@@ -388,13 +466,22 @@ class BinaryNAFMultiplier(ScalarMultiplier):
"""
Binary NAF (Non Adjacent Form) multiplier, left-to-right.
"""
+
requires = {AdditionFormula, DoublingFormula, NegationFormula}
optionals = {ScalingFormula}
_point_neg: Point
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula,
- neg: NegationFormula, scl: ScalingFormula = None, short_circuit: bool = True):
- super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl)
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ neg: NegationFormula,
+ scl: ScalingFormula = None,
+ short_circuit: bool = True,
+ ):
+ super().__init__(
+ short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl
+ )
def init(self, params: DomainParameters, point: Point):
with PrecomputationAction(params, point):
@@ -425,6 +512,7 @@ class WindowNAFMultiplier(ScalarMultiplier):
"""
Window NAF (Non Adjacent Form) multiplier, left-to-right.
"""
+
requires = {AdditionFormula, DoublingFormula, NegationFormula}
optionals = {ScalingFormula}
_points: MutableMapping[int, Point]
@@ -432,10 +520,19 @@ class WindowNAFMultiplier(ScalarMultiplier):
precompute_negation: bool = False
width: int
- def __init__(self, add: AdditionFormula, dbl: DoublingFormula,
- neg: NegationFormula, width: int, scl: ScalingFormula = None,
- precompute_negation: bool = False, short_circuit: bool = True):
- super().__init__(short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl)
+ def __init__(
+ self,
+ add: AdditionFormula,
+ dbl: DoublingFormula,
+ neg: NegationFormula,
+ width: int,
+ scl: ScalingFormula = None,
+ precompute_negation: bool = False,
+ short_circuit: bool = True,
+ ):
+ super().__init__(
+ short_circuit=short_circuit, add=add, dbl=dbl, neg=neg, scl=scl
+ )
self.width = width
self.precompute_negation = precompute_negation
diff --git a/pyecsca/ec/op.py b/pyecsca/ec/op.py
index 2401624..e500914 100644
--- a/pyecsca/ec/op.py
+++ b/pyecsca/ec/op.py
@@ -1,8 +1,23 @@
"""
This module provides a class for a code operation.
"""
-from ast import (Module, walk, Name, BinOp, UnaryOp, Constant, Mult, Div, Add, Sub, Pow, Assign,
- operator as ast_operator, unaryop as ast_unaryop, USub)
+from ast import (
+ Module,
+ walk,
+ Name,
+ BinOp,
+ UnaryOp,
+ Constant,
+ Mult,
+ Div,
+ Add,
+ Sub,
+ Pow,
+ Assign,
+ operator as ast_operator,
+ unaryop as ast_unaryop,
+ USub,
+)
from enum import Enum
from types import CodeType
from typing import FrozenSet, cast, Any, Optional, Union
@@ -15,6 +30,7 @@ from .mod import Mod
@public
class OpType(Enum):
"""A type of binary and unary operators."""
+
Add = (2, "+")
Sub = (2, "-")
Neg = (1, "-")
@@ -33,6 +49,7 @@ class OpType(Enum):
@public
class CodeOp(object):
"""An operation that can be executed."""
+
result: str
"""The result variable of the operation (e.g. the `r` in `r = 2*a`)."""
parameters: FrozenSet[str]
@@ -90,7 +107,9 @@ class CodeOp(object):
else:
return None
- def __to_op(self, op: Optional[Union[ast_operator, ast_unaryop]], left: Any, right: Any) -> OpType:
+ def __to_op(
+ self, op: Optional[Union[ast_operator, ast_unaryop]], left: Any, right: Any
+ ) -> OpType:
if isinstance(op, Mult):
return OpType.Mult
elif isinstance(op, Div):
diff --git a/pyecsca/ec/params.py b/pyecsca/ec/params.py
index 6583bae..370b8eb 100644
--- a/pyecsca/ec/params.py
+++ b/pyecsca/ec/params.py
@@ -17,8 +17,13 @@ from .coordinates import AffineCoordinateModel, CoordinateModel
from .curve import EllipticCurve
from .error import UnsatisfiedAssumptionError, raise_unsatisified_assumption
from .mod import Mod
-from .model import (CurveModel, ShortWeierstrassModel, MontgomeryModel, EdwardsModel,
- TwistedEdwardsModel)
+from .model import (
+ CurveModel,
+ ShortWeierstrassModel,
+ MontgomeryModel,
+ EdwardsModel,
+ TwistedEdwardsModel,
+)
from .point import Point, InfinityPoint
from ..misc.cfg import getconfig
@@ -26,6 +31,7 @@ from ..misc.cfg import getconfig
@public
class DomainParameters(object):
"""Domain parameters which specify a subgroup on an elliptic curve."""
+
curve: EllipticCurve
generator: Point
order: int
@@ -33,8 +39,15 @@ class DomainParameters(object):
name: Optional[str]
category: Optional[str]
- def __init__(self, curve: EllipticCurve, generator: Point, order: int,
- cofactor: int, name: Optional[str] = None, category: Optional[str] = None):
+ def __init__(
+ self,
+ curve: EllipticCurve,
+ generator: Point,
+ order: int,
+ cofactor: int,
+ name: Optional[str] = None,
+ category: Optional[str] = None,
+ ):
self.curve = curve
self.generator = generator
self.order = order
@@ -45,7 +58,12 @@ class DomainParameters(object):
def __eq__(self, other):
if not isinstance(other, DomainParameters):
return False
- return self.curve == other.curve and self.generator == other.generator and self.order == other.order and self.cofactor == other.cofactor
+ return (
+ self.curve == other.curve
+ and self.generator == other.generator
+ and self.order == other.order
+ and self.cofactor == other.cofactor
+ )
def __get_name(self):
if self.name and self.category:
@@ -69,6 +87,7 @@ class DomainParameters(object):
@public
class DomainParameterCategory(object):
"""A category of domain parameters."""
+
name: str
description: str
curves: List[DomainParameters]
@@ -119,7 +138,9 @@ def _create_params(curve, coords, infty):
param_names = ["a", "d"]
else:
raise ValueError("Unknown curve model.")
- params = {name: Mod(int(curve["params"][name]["raw"], 16), field) for name in param_names}
+ params = {
+ name: Mod(int(curve["params"][name]["raw"], 16), field) for name in param_names
+ }
# Check coordinate model name and assumptions
coord_model: CoordinateModel
@@ -139,8 +160,10 @@ def _create_params(curve, coords, infty):
exec(compiled, None, alocals)
for param, value in alocals.items():
if params[param] != value:
- raise_unsatisified_assumption(getconfig().ec.unsatisfied_coordinate_assumption_action,
- f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (= {value}).")
+ raise_unsatisified_assumption(
+ getconfig().ec.unsatisfied_coordinate_assumption_action,
+ f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (= {value}).",
+ )
except NameError:
k = FF(field)
assumption_string = unparse(assumption)
@@ -148,15 +171,23 @@ def _create_params(curve, coords, infty):
expr = sympify(f"{rhs} - {lhs}")
for curve_param, value in params.items():
expr = expr.subs(curve_param, k(value))
- if len(expr.free_symbols) > 1 or (param := str(expr.free_symbols.pop())) not in coord_model.parameters:
- raise ValueError(f"This coordinate model couldn't be loaded due to an unsupported assumption ({assumption_string}).")
+ if (
+ len(expr.free_symbols) > 1
+ or (param := str(expr.free_symbols.pop()))
+ not in coord_model.parameters
+ ):
+ raise ValueError(
+ f"This coordinate model couldn't be loaded due to an unsupported assumption ({assumption_string})."
+ )
poly = Poly(expr, symbols(param), domain=k)
roots = poly.ground_roots()
for root in roots.keys():
params[param] = Mod(int(root), field)
break
else:
- raise UnsatisfiedAssumptionError(f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (0 = {expr}).")
+ raise UnsatisfiedAssumptionError(
+ f"Coordinate model {coord_model} has an unsatisifed assumption on the {param} parameter (0 = {expr})."
+ )
# Construct the point at infinity
infinity: Point
@@ -170,26 +201,35 @@ def _create_params(curve, coords, infty):
infinity_coords = {}
for coordinate in coord_model.variables:
if coordinate not in ilocals:
- raise ValueError(f"Coordinate model {coord_model} requires infty option.")
+ raise ValueError(
+ f"Coordinate model {coord_model} requires infty option."
+ )
value = ilocals[coordinate]
if isinstance(value, int):
value = Mod(value, field)
infinity_coords[coordinate] = value
infinity = Point(coord_model, **infinity_coords)
elliptic_curve = EllipticCurve(model, coord_model, field, infinity, params) # type: ignore[arg-type]
- affine = Point(AffineCoordinateModel(model),
- x=Mod(int(curve["generator"]["x"]["raw"], 16), field),
- y=Mod(int(curve["generator"]["y"]["raw"], 16), field))
+ affine = Point(
+ AffineCoordinateModel(model),
+ x=Mod(int(curve["generator"]["x"]["raw"], 16), field),
+ y=Mod(int(curve["generator"]["y"]["raw"], 16), field),
+ )
if not isinstance(coord_model, AffineCoordinateModel):
generator = affine.to_model(coord_model, elliptic_curve)
else:
generator = affine
- return DomainParameters(elliptic_curve, generator, order, cofactor, curve["name"], curve["category"])
+ return DomainParameters(
+ elliptic_curve, generator, order, cofactor, curve["name"], curve["category"]
+ )
@public
-def load_category(file: Union[str, Path, BinaryIO], coords: Union[str, Callable[[str], str]],
- infty: Union[bool, Callable[[str], bool]] = True) -> DomainParameterCategory:
+def load_category(
+ file: Union[str, Path, BinaryIO],
+ coords: Union[str, Callable[[str], str]],
+ infty: Union[bool, Callable[[str], bool]] = True,
+) -> DomainParameterCategory:
"""
Load a category of domain parameters containing several curves from a JSON file.
@@ -223,7 +263,9 @@ def load_category(file: Union[str, Path, BinaryIO], coords: Union[str, Callable[
@public
-def load_params(file: Union[str, Path, BinaryIO], coords: str, infty: bool = True) -> DomainParameters:
+def load_params(
+ file: Union[str, Path, BinaryIO], coords: str, infty: bool = True
+) -> DomainParameters:
"""
Load a curve from a JSON file.
@@ -245,8 +287,11 @@ def load_params(file: Union[str, Path, BinaryIO], coords: str, infty: bool = Tru
@public
-def get_category(category: str, coords: Union[str, Callable[[str], str]],
- infty: Union[bool, Callable[[str], bool]] = True) -> DomainParameterCategory:
+def get_category(
+ category: str,
+ coords: Union[str, Callable[[str], str]],
+ infty: Union[bool, Callable[[str], bool]] = True,
+) -> DomainParameterCategory:
"""
Retrieve a category from the std-curves database at https://github.com/J08nY/std-curves.
@@ -259,7 +304,9 @@ def get_category(category: str, coords: Union[str, Callable[[str], str]],
:return: The category.
"""
listing = resource_listdir(__name__, "std")
- categories = list(entry for entry in listing if resource_isdir(__name__, join("std", entry)))
+ categories = list(
+ entry for entry in listing if resource_isdir(__name__, join("std", entry))
+ )
if category not in categories:
raise ValueError(f"Category {category} not found.")
json_path = join("std", category, "curves.json")
@@ -268,7 +315,9 @@ def get_category(category: str, coords: Union[str, Callable[[str], str]],
@public
-def get_params(category: str, name: str, coords: str, infty: bool = True) -> DomainParameters:
+def get_params(
+ category: str, name: str, coords: str, infty: bool = True
+) -> DomainParameters:
"""
Retrieve a curve from a set of stored parameters. Uses the std-curves database at
https://github.com/J08nY/std-curves.
@@ -281,7 +330,9 @@ def get_params(category: str, name: str, coords: str, infty: bool = True) -> Dom
:return: The curve.
"""
listing = resource_listdir(__name__, "std")
- categories = list(entry for entry in listing if resource_isdir(__name__, join("std", entry)))
+ categories = list(
+ entry for entry in listing if resource_isdir(__name__, join("std", entry))
+ )
if category not in categories:
raise ValueError(f"Category {category} not found.")
json_path = join("std", category, "curves.json")
diff --git a/pyecsca/ec/point.py b/pyecsca/ec/point.py
index 6ba6469..1d074fe 100644
--- a/pyecsca/ec/point.py
+++ b/pyecsca/ec/point.py
@@ -19,11 +19,14 @@ if TYPE_CHECKING:
@public
class CoordinateMappingAction(ResultAction):
"""A mapping of a point from one coordinate system to another one, usually one is an affine one."""
+
model_from: CoordinateModel
model_to: CoordinateModel
point: "Point"
- def __init__(self, model_from: CoordinateModel, model_to: CoordinateModel, point: "Point"):
+ def __init__(
+ self, model_from: CoordinateModel, model_to: CoordinateModel, point: "Point"
+ ):
super().__init__()
self.model_from = model_from
self.model_to = model_to
@@ -36,6 +39,7 @@ class CoordinateMappingAction(ResultAction):
@public
class Point(object):
"""A point with coordinates in a coordinate model."""
+
coordinate_model: CoordinateModel
coords: Mapping[str, Mod]
field: int
@@ -51,7 +55,9 @@ class Point(object):
field = value.n
else:
if field != value.n:
- raise ValueError(f"Mismatched coordinate field of definition, {field} vs {value.n}.")
+ raise ValueError(
+ f"Mismatched coordinate field of definition, {field} vs {value.n}."
+ )
self.field = field if field is not None else 0
def __getattribute__(self, name):
@@ -65,7 +71,9 @@ class Point(object):
def to_affine(self) -> "Point":
"""Convert this point into the affine coordinate model, if possible."""
affine_model = AffineCoordinateModel(self.coordinate_model.curve_model)
- with CoordinateMappingAction(self.coordinate_model, affine_model, self) as action:
+ with CoordinateMappingAction(
+ self.coordinate_model, affine_model, self
+ ) as action:
if isinstance(self.coordinate_model, AffineCoordinateModel):
return action.exit(copy(self))
ops = []
@@ -91,11 +99,15 @@ class Point(object):
result[op.result] = locls[op.result]
return action.exit(Point(affine_model, **result))
- def to_model(self, coordinate_model: CoordinateModel, curve: "EllipticCurve") -> "Point":
+ def to_model(
+ self, coordinate_model: CoordinateModel, curve: "EllipticCurve"
+ ) -> "Point":
"""Convert an affine point into a given coordinate model, if possible."""
if not isinstance(self.coordinate_model, AffineCoordinateModel):
raise ValueError
- with CoordinateMappingAction(self.coordinate_model, coordinate_model, self) as action:
+ with CoordinateMappingAction(
+ self.coordinate_model, coordinate_model, self
+ ) as action:
ops = []
for s in coordinate_model.satisfying:
try:
@@ -114,7 +126,10 @@ class Point(object):
result[var] = locls[var]
elif var == "X":
result[var] = self.coords["x"]
- if isinstance(coordinate_model, EFDCoordinateModel) and coordinate_model.name == "inverted":
+ if (
+ isinstance(coordinate_model, EFDCoordinateModel)
+ and coordinate_model.name == "inverted"
+ ):
result[var] = result[var].inverse()
elif var == "Y":
result[var] = self.coords["y"]
@@ -124,11 +139,13 @@ class Point(object):
elif coordinate_model.name == "yz":
result[var] = result[var] * curve.parameters["r"]
elif coordinate_model.name == "yzsquared":
- result[var] = result[var]**2 * curve.parameters["r"]
+ result[var] = result[var] ** 2 * curve.parameters["r"]
elif var.startswith("Z"):
result[var] = Mod(1, curve.prime)
elif var == "T":
- result[var] = Mod(int(self.coords["x"] * self.coords["y"]), curve.prime)
+ result[var] = Mod(
+ int(self.coords["x"] * self.coords["y"]), curve.prime
+ )
else:
raise NotImplementedError
return action.exit(Point(coordinate_model, **result))
@@ -201,7 +218,9 @@ class InfinityPoint(Point):
def to_affine(self) -> "InfinityPoint":
return InfinityPoint(AffineCoordinateModel(self.coordinate_model.curve_model))
- def to_model(self, coordinate_model: CoordinateModel, curve: "EllipticCurve") -> "InfinityPoint":
+ def to_model(
+ self, coordinate_model: CoordinateModel, curve: "EllipticCurve"
+ ) -> "InfinityPoint":
return InfinityPoint(coordinate_model)
def equals_affine(self, other: "Point") -> bool:
diff --git a/pyecsca/ec/signature.py b/pyecsca/ec/signature.py
index 9df6599..0e6f546 100644
--- a/pyecsca/ec/signature.py
+++ b/pyecsca/ec/signature.py
@@ -18,12 +18,20 @@ from .point import Point
@public
class SignatureResult(object):
"""An ECDSA signature result (r, s)."""
+
r: int
s: int
- def __init__(self, r: int, s: int, data: Optional[bytes] = None, digest: Optional[bytes] = None,
- nonce: Optional[int] = None, privkey: Optional[Mod] = None,
- pubkey: Optional[Point] = None):
+ def __init__(
+ self,
+ r: int,
+ s: int,
+ data: Optional[bytes] = None,
+ digest: Optional[bytes] = None,
+ nonce: Optional[int] = None,
+ privkey: Optional[Mod] = None,
+ pubkey: Optional[Point] = None,
+ ):
self.r = r
self.s = s
@@ -55,12 +63,12 @@ class SignatureResult(object):
@public
class ECDSAAction(Action):
"""An ECDSA action base class."""
+
params: DomainParameters
hash_algo: Optional[Any]
msg: bytes
- def __init__(self, params: DomainParameters, hash_algo: Optional[Any],
- msg: bytes):
+ def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes):
super().__init__()
self.params = params
self.hash_algo = hash_algo
@@ -73,10 +81,16 @@ class ECDSAAction(Action):
@public
class ECDSASignAction(ECDSAAction):
"""An ECDSA signing."""
+
privkey: Mod
- def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes,
- privkey: Mod):
+ def __init__(
+ self,
+ params: DomainParameters,
+ hash_algo: Optional[Any],
+ msg: bytes,
+ privkey: Mod,
+ ):
super().__init__(params, hash_algo, msg)
self.privkey = privkey
@@ -87,11 +101,18 @@ class ECDSASignAction(ECDSAAction):
@public
class ECDSAVerifyAction(ECDSAAction):
"""An ECDSA verification."""
+
signature: SignatureResult
pubkey: Point
- def __init__(self, params: DomainParameters, hash_algo: Optional[Any], msg: bytes,
- signature: SignatureResult, pubkey: Point):
+ def __init__(
+ self,
+ params: DomainParameters,
+ hash_algo: Optional[Any],
+ msg: bytes,
+ signature: SignatureResult,
+ pubkey: Point,
+ ):
super().__init__(params, hash_algo, msg)
self.signature = signature
self.pubkey = pubkey
@@ -103,6 +124,7 @@ class ECDSAVerifyAction(ECDSAAction):
@public
class Signature(object):
"""An EC based signature primitive. (ECDSA)"""
+
mult: ScalarMultiplier
params: DomainParameters
add: Optional[AdditionFormula]
@@ -110,10 +132,15 @@ class Signature(object):
privkey: Optional[Mod]
hash_algo: Optional[Any]
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None,
- hash_algo: Optional[Any] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ hash_algo: Optional[Any] = None,
+ ):
if pubkey is None and privkey is None:
raise ValueError
if add is None:
@@ -153,8 +180,9 @@ class Signature(object):
affine_point = point.to_affine()
r = Mod(int(affine_point.x), self.params.order)
s = nonce.inverse() * (Mod(z, self.params.order) + r * self.privkey)
- return SignatureResult(int(r), int(s), digest=digest, nonce=int(nonce),
- privkey=self.privkey)
+ return SignatureResult(
+ int(r), int(s), digest=digest, nonce=int(nonce), privkey=self.privkey
+ )
def sign_hash(self, digest: bytes, nonce: Optional[int] = None) -> SignatureResult:
"""Sign already hashed data."""
@@ -203,7 +231,9 @@ class Signature(object):
"""Verify data."""
if not self.can_verify or self.pubkey is None:
raise RuntimeError("This instance cannot verify.")
- with ECDSAVerifyAction(self.params, self.hash_algo, data, signature, self.pubkey):
+ with ECDSAVerifyAction(
+ self.params, self.hash_algo, data, signature, self.pubkey
+ ):
if self.hash_algo is None:
digest = data
else:
@@ -215,9 +245,14 @@ class Signature(object):
class ECDSA_NONE(Signature):
"""ECDSA with raw message input."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey)
@@ -225,9 +260,14 @@ class ECDSA_NONE(Signature):
class ECDSA_SHA1(Signature):
"""ECDSA with SHA1."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey, hashlib.sha1)
@@ -235,9 +275,14 @@ class ECDSA_SHA1(Signature):
class ECDSA_SHA224(Signature):
"""ECDSA with SHA224."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey, hashlib.sha224)
@@ -245,9 +290,14 @@ class ECDSA_SHA224(Signature):
class ECDSA_SHA256(Signature):
"""ECDSA with SHA256."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey, hashlib.sha256)
@@ -255,9 +305,14 @@ class ECDSA_SHA256(Signature):
class ECDSA_SHA384(Signature):
"""ECDSA with SHA384."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey, hashlib.sha384)
@@ -265,7 +320,12 @@ class ECDSA_SHA384(Signature):
class ECDSA_SHA512(Signature):
"""ECDSA with SHA512."""
- def __init__(self, mult: ScalarMultiplier, params: DomainParameters,
- add: Optional[AdditionFormula] = None,
- pubkey: Optional[Point] = None, privkey: Optional[Mod] = None):
+ def __init__(
+ self,
+ mult: ScalarMultiplier,
+ params: DomainParameters,
+ add: Optional[AdditionFormula] = None,
+ pubkey: Optional[Point] = None,
+ privkey: Optional[Mod] = None,
+ ):
super().__init__(mult, params, add, pubkey, privkey, hashlib.sha512)
diff --git a/pyecsca/ec/transformations.py b/pyecsca/ec/transformations.py
index 986064b..0987235 100644
--- a/pyecsca/ec/transformations.py
+++ b/pyecsca/ec/transformations.py
@@ -22,8 +22,12 @@ def __M_map(params, param_names, map_parameters, map_point, model):
else:
neutral = map_point(param_one, param_other, params.curve.neutral, aff)
curve = EllipticCurve(model, aff, params.curve.prime, neutral, parameters)
- return DomainParameters(curve, map_point(param_one, param_other, params.generator, aff), params.order,
- params.cofactor)
+ return DomainParameters(
+ curve,
+ map_point(param_one, param_other, params.generator, aff),
+ params.order,
+ params.cofactor,
+ )
@public
@@ -35,7 +39,8 @@ def M2SW(params: DomainParameters) -> DomainParameters:
:return: The converted domain parameters.
"""
if not isinstance(params.curve.model, MontgomeryModel) or not isinstance(
- params.curve.coordinate_model, AffineCoordinateModel):
+ params.curve.coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
def map_parameters(A, B):
@@ -48,7 +53,9 @@ def M2SW(params: DomainParameters) -> DomainParameters:
v = pt.y / B
return Point(aff, x=u, y=v)
- return __M_map(params, ("a", "b"), map_parameters, map_point, ShortWeierstrassModel())
+ return __M_map(
+ params, ("a", "b"), map_parameters, map_point, ShortWeierstrassModel()
+ )
@public
@@ -60,7 +67,8 @@ def M2TE(params: DomainParameters) -> DomainParameters:
:return: The converted domain parameters.
"""
if not isinstance(params.curve.model, MontgomeryModel) or not isinstance(
- params.curve.coordinate_model, AffineCoordinateModel):
+ params.curve.coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
def map_parameters(A, B):
@@ -85,7 +93,8 @@ def TE2M(params: DomainParameters) -> DomainParameters:
:return: The converted domain parameters.
"""
if not isinstance(params.curve.model, TwistedEdwardsModel) or not isinstance(
- params.curve.coordinate_model, AffineCoordinateModel):
+ params.curve.coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
def map_parameters(a, d):
@@ -110,16 +119,25 @@ def SW2M(params: DomainParameters) -> DomainParameters:
:return: The converted domain parameters.
"""
if not isinstance(params.curve.model, ShortWeierstrassModel) or not isinstance(
- params.curve.coordinate_model, AffineCoordinateModel):
+ params.curve.coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
ax = symbols("α")
field = FF(params.curve.prime)
- rhs = Poly(ax ** 3 + field(int(params.curve.parameters["a"])) * ax + field(int(params.curve.parameters["b"])), ax, domain=field)
+ rhs = Poly(
+ ax ** 3
+ + field(int(params.curve.parameters["a"])) * ax
+ + field(int(params.curve.parameters["b"])),
+ ax,
+ domain=field,
+ )
roots = rhs.ground_roots()
if not roots:
- raise ValueError("Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root).")
+ raise ValueError(
+ "Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root)."
+ )
alpha = Mod(int(next(iter(roots.keys()))), params.curve.prime)
- beta = (3 * alpha**2 + params.curve.parameters["a"]).sqrt()
+ beta = (3 * alpha ** 2 + params.curve.parameters["a"]).sqrt()
def map_parameters(a, b):
A = (3 * alpha) / beta
@@ -143,16 +161,25 @@ def SW2TE(params: DomainParameters) -> DomainParameters:
:return: The converted domain parameters.
"""
if not isinstance(params.curve.model, ShortWeierstrassModel) or not isinstance(
- params.curve.coordinate_model, AffineCoordinateModel):
+ params.curve.coordinate_model, AffineCoordinateModel
+ ):
raise ValueError
ax = symbols("α")
field = FF(params.curve.prime)
- rhs = Poly(ax ** 3 + field(int(params.curve.parameters["a"])) * ax + field(int(params.curve.parameters["b"])), ax, domain=field)
+ rhs = Poly(
+ ax ** 3
+ + field(int(params.curve.parameters["a"])) * ax
+ + field(int(params.curve.parameters["b"])),
+ ax,
+ domain=field,
+ )
roots = rhs.ground_roots()
if not roots:
- raise ValueError("Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root).")
+ raise ValueError(
+ "Curve cannot be transformed to Montgomery model (x^3 + ax + b has no root)."
+ )
alpha = Mod(int(next(iter(roots.keys()))), params.curve.prime)
- beta = (3 * alpha**2 + params.curve.parameters["a"]).sqrt()
+ beta = (3 * alpha ** 2 + params.curve.parameters["a"]).sqrt()
def map_parameters(a, b):
a = 3 * alpha + 2 * beta
diff --git a/pyecsca/misc/cfg.py b/pyecsca/misc/cfg.py
index df3cce1..8a5a9ee 100644
--- a/pyecsca/misc/cfg.py
+++ b/pyecsca/misc/cfg.py
@@ -11,6 +11,7 @@ from public import public
@public
class ECConfig(object):
"""Configuration for the :py:mod:`pyecsca.ec` package."""
+
_no_inverse_action: str = "error"
_non_residue_action: str = "error"
_unsatisfied_formula_assumption_action: str = "error"
@@ -111,6 +112,7 @@ class ECConfig(object):
@public
class Config(object):
"""A runtime configuration for the library."""
+
ec: ECConfig
"""Configuration for the :py:mod:`pyecsca.ec` package."""
diff --git a/pyecsca/sca/re/rpa.py b/pyecsca/sca/re/rpa.py
index 3dc00e2..ab038e8 100644
--- a/pyecsca/sca/re/rpa.py
+++ b/pyecsca/sca/re/rpa.py
@@ -7,8 +7,15 @@ This module provides functionality inspired by the Refined-Power Analysis attack
from public import public
from typing import MutableMapping, Optional
-from ...ec.formula import FormulaAction, DoublingFormula, AdditionFormula, TriplingFormula, NegationFormula, \
- DifferentialAdditionFormula, LadderFormula
+from ...ec.formula import (
+ FormulaAction,
+ DoublingFormula,
+ AdditionFormula,
+ TriplingFormula,
+ NegationFormula,
+ DifferentialAdditionFormula,
+ LadderFormula,
+)
from ...ec.mult import ScalarMultiplicationAction, PrecomputationAction
from ...ec.point import Point
from ...ec.context import Context, Action
@@ -17,6 +24,7 @@ from ...ec.context import Context, Action
@public
class MultipleContext(Context):
"""A context that traces the multiples of points computed."""
+
base: Optional[Point]
points: MutableMapping[Point, int]
inside: bool
@@ -51,7 +59,7 @@ class MultipleContext(Context):
elif isinstance(action.formula, NegationFormula):
inp = action.input_points[0]
out = action.output_points[0]
- self.points[out] = - self.points[inp]
+ self.points[out] = -self.points[inp]
elif isinstance(action.formula, DifferentialAdditionFormula):
diff, one, other = action.input_points
out = action.output_points[0]
diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py
index 68d60dd..595fe32 100644
--- a/pyecsca/sca/scope/base.py
+++ b/pyecsca/sca/scope/base.py
@@ -12,6 +12,7 @@ from ..trace import Trace
@public
class SampleType(Enum):
"""The sample unit."""
+
Raw = auto()
Volt = auto()
@@ -29,7 +30,9 @@ class Scope(object):
"""A list of channels available on this scope."""
raise NotImplementedError
- def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]:
+ def setup_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ) -> Tuple[int, int]:
"""
Setup the frequency and sample count for the measurement. The scope might not support
the requested values and will adjust them to get the next best frequency and the largest
@@ -42,8 +45,9 @@ class Scope(object):
"""
raise NotImplementedError
- def setup_channel(self, channel: str, coupling: str, range: float, offset: float,
- enable: bool) -> None:
+ def setup_channel(
+ self, channel: str, coupling: str, range: float, offset: float, enable: bool
+ ) -> None:
"""
Setup a channel to use the coupling method and measure the given voltage range.
@@ -55,8 +59,15 @@ class Scope(object):
"""
raise NotImplementedError
- def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
- timeout: int, enable: bool) -> None:
+ def setup_trigger(
+ self,
+ channel: str,
+ threshold: float,
+ direction: str,
+ delay: int,
+ timeout: int,
+ enable: bool,
+ ) -> None:
"""
Setup a trigger on a particular `channel`, the channel has to be set up and enabled.
The trigger will fire based on the `threshold` and `direction`, if enabled, the trigger
diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py
index c5c21de..05c8e69 100644
--- a/pyecsca/sca/scope/chipwhisperer.py
+++ b/pyecsca/sca/scope/chipwhisperer.py
@@ -27,18 +27,29 @@ class ChipWhispererScope(Scope): # pragma: no cover
def channels(self) -> Sequence[str]:
return []
- def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]:
+ def setup_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ) -> Tuple[int, int]:
if pretrig != 0:
raise ValueError("ChipWhisperer does not support pretrig samples.")
self.scope.clock.clkgen_freq = frequency
self.scope.samples = posttrig
return self.scope.clock.freq_ctr, self.scope.samples
- def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool) -> None:
+ def setup_channel(
+ self, channel: str, coupling: str, range: float, offset: float, enable: bool
+ ) -> None:
pass
- def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
- timeout: int, enable: bool) -> None:
+ def setup_trigger(
+ self,
+ channel: str,
+ threshold: float,
+ direction: str,
+ delay: int,
+ timeout: int,
+ enable: bool,
+ ) -> None:
if enable:
self.triggers.add(channel)
elif channel in self.triggers:
@@ -55,11 +66,16 @@ class ChipWhispererScope(Scope): # pragma: no cover
def capture(self, timeout: Optional[int] = None) -> bool:
return not self.scope.capture()
- def retrieve(self, channel: str, type: SampleType, dtype=np.float16) -> Optional[Trace]:
+ def retrieve(
+ self, channel: str, type: SampleType, dtype=np.float16
+ ) -> Optional[Trace]:
data = self.scope.get_last_trace()
if data is None:
return None
- return Trace(np.array(data, dtype=dtype), {"sampling_frequency": self.scope.clock.clkgen_freq, "channel": channel})
+ return Trace(
+ np.array(data, dtype=dtype),
+ {"sampling_frequency": self.scope.clock.clkgen_freq, "channel": channel},
+ )
def stop(self) -> None:
pass
diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py
index 544ef35..577c7dc 100644
--- a/pyecsca/sca/scope/picoscope_alt.py
+++ b/pyecsca/sca/scope/picoscope_alt.py
@@ -39,20 +39,31 @@ class PicoScopeAlt(Scope): # pragma: no cover
def channels(self) -> Sequence[str]:
return list(self.ps.CHANNELS.keys())
- def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]:
+ def setup_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ) -> Tuple[int, int]:
samples = pretrig + posttrig
actual_frequency, max_samples = self.ps.setSamplingFrequency(frequency, samples)
if max_samples < samples:
- self.trig_ratio = (pretrig / samples)
+ self.trig_ratio = pretrig / samples
samples = max_samples
self.frequency = actual_frequency
return actual_frequency, samples
- def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool) -> None:
+ def setup_channel(
+ self, channel: str, coupling: str, range: float, offset: float, enable: bool
+ ) -> None:
self.ps.setChannel(channel, coupling, range, offset, enable)
- def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
- timeout: int, enable: bool) -> None:
+ def setup_trigger(
+ self,
+ channel: str,
+ threshold: float,
+ direction: str,
+ delay: int,
+ timeout: int,
+ enable: bool,
+ ) -> None:
self.ps.setSimpleTrigger(channel, threshold, direction, delay, timeout, enable)
def setup_capture(self, channel: str, enable: bool) -> None:
@@ -69,14 +80,23 @@ class PicoScopeAlt(Scope): # pragma: no cover
return False
return True
- def retrieve(self, channel: str, type: SampleType, dtype=np.float32) -> Optional[Trace]:
+ def retrieve(
+ self, channel: str, type: SampleType, dtype=np.float32
+ ) -> Optional[Trace]:
if type == SampleType.Raw:
data = self.ps.getDataRaw(channel).astype(dtype=dtype, copy=False)
else:
data = self.ps.getDataV(channel, dtype=dtype)
if data is None:
return None
- return Trace(data, {"sampling_frequency": self.frequency, "channel": channel, "sample_type": type})
+ return Trace(
+ data,
+ {
+ "sampling_frequency": self.frequency,
+ "channel": channel,
+ "sample_type": type,
+ },
+ )
def stop(self) -> None:
self.ps.stop()
diff --git a/pyecsca/sca/scope/picoscope_sdk.py b/pyecsca/sca/scope/picoscope_sdk.py
index 8101f94..6237bb8 100644
--- a/pyecsca/sca/scope/picoscope_sdk.py
+++ b/pyecsca/sca/scope/picoscope_sdk.py
@@ -11,6 +11,7 @@ import numpy as np
from picosdk.errors import CannotFindPicoSDKError
from picosdk.functions import assert_pico_ok
from picosdk.library import Library
+
try:
from picosdk.ps3000 import ps3000
except CannotFindPicoSDKError as exc:
@@ -33,8 +34,12 @@ from .base import Scope, SampleType
from ..trace import Trace
-def adc2volt(adc: Union[np.ndarray, ctypes.c_int16],
- volt_range: float, adc_minmax: int, dtype=np.float32) -> Union[np.ndarray, float]: # pragma: no cover
+def adc2volt(
+ adc: Union[np.ndarray, ctypes.c_int16],
+ volt_range: float,
+ adc_minmax: int,
+ dtype=np.float32,
+) -> Union[np.ndarray, float]: # pragma: no cover
"""
Convert raw adc values to volts.
@@ -51,8 +56,9 @@ def adc2volt(adc: Union[np.ndarray, ctypes.c_int16],
raise ValueError
-def volt2adc(volt: Union[np.ndarray, float],
- volt_range: float, adc_minmax: int, dtype=np.float32) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover
+def volt2adc(
+ volt: Union[np.ndarray, float], volt_range: float, adc_minmax: int, dtype=np.float32
+) -> Union[np.ndarray, ctypes.c_int16]: # pragma: no cover
"""
Convert volt values to raw adc values.
@@ -72,6 +78,7 @@ def volt2adc(volt: Union[np.ndarray, float],
@public
class PicoScopeSdk(Scope): # pragma: no cover
"""A PicoScope based scope."""
+
MODULE: Library
PREFIX: str
CHANNELS: Mapping
@@ -80,12 +87,7 @@ class PicoScopeSdk(Scope): # pragma: no cover
MIN_ADC_VALUE: int
COUPLING: Mapping
TIME_UNITS: Mapping
- TRIGGERS: Mapping = {
- "above": 0,
- "below": 1,
- "rising": 2,
- "falling": 3
- }
+ TRIGGERS: Mapping = {"above": 0, "below": 1, "rising": 2, "falling": 3}
_variant: Optional[str]
def __init__(self, variant: Optional[str] = None):
@@ -112,28 +114,52 @@ class PicoScopeSdk(Scope): # pragma: no cover
return self._variant
info = (ctypes.c_int8 * 6)()
size = ctypes.c_int16()
- assert_pico_ok(self.__dispatch_call("GetUnitInfo", self.handle, ctypes.byref(info), 6,
- ctypes.byref(size), 3))
- self._variant = "".join(chr(i) for i in info[:size.value])
+ assert_pico_ok(
+ self.__dispatch_call(
+ "GetUnitInfo", self.handle, ctypes.byref(info), 6, ctypes.byref(size), 3
+ )
+ )
+ self._variant = "".join(chr(i) for i in info[: size.value])
return self._variant
- def setup_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]:
+ def setup_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ) -> Tuple[int, int]:
return self.set_frequency(frequency, pretrig, posttrig)
- def set_channel(self, channel: str, enabled: bool, coupling: str, range: float, offset: float):
+ def set_channel(
+ self, channel: str, enabled: bool, coupling: str, range: float, offset: float
+ ):
if offset != 0.0:
raise ValueError("Nonzero offset not supported.")
assert_pico_ok(
- self.__dispatch_call("SetChannel", self.handle, self.CHANNELS[channel], enabled,
- self.COUPLING[coupling], self.RANGES[range]))
+ self.__dispatch_call(
+ "SetChannel",
+ self.handle,
+ self.CHANNELS[channel],
+ enabled,
+ self.COUPLING[coupling],
+ self.RANGES[range],
+ )
+ )
self.ranges[channel] = range
- def setup_channel(self, channel: str, coupling: str, range: float, offset: float, enable: bool):
+ def setup_channel(
+ self, channel: str, coupling: str, range: float, offset: float, enable: bool
+ ):
self.set_channel(channel, enable, coupling, range, offset)
- def _set_freq(self, frequency: int, pretrig: int, posttrig: int, period_bound: float,
- timebase_bound: int,
- low_freq: int, high_freq: int, high_subtract: int) -> Tuple[int, int]:
+ def _set_freq(
+ self,
+ frequency: int,
+ pretrig: int,
+ posttrig: int,
+ period_bound: float,
+ timebase_bound: int,
+ low_freq: int,
+ high_freq: int,
+ high_subtract: int,
+ ) -> Tuple[int, int]:
samples = pretrig + posttrig
period = 1 / frequency
if low_freq == 0 or period > period_bound:
@@ -145,8 +171,18 @@ class PicoScopeSdk(Scope): # pragma: no cover
tb = timebase_bound
actual_frequency = low_freq // 2 ** tb
max_samples = ctypes.c_int32()
- assert_pico_ok(self.__dispatch_call("GetTimebase", self.handle, tb, samples, None, 0,
- ctypes.byref(max_samples), 0))
+ assert_pico_ok(
+ self.__dispatch_call(
+ "GetTimebase",
+ self.handle,
+ tb,
+ samples,
+ None,
+ 0,
+ ctypes.byref(max_samples),
+ 0,
+ )
+ )
if max_samples.value < samples:
pretrig = max_samples.value * (pretrig // samples)
posttrig = max_samples.value - pretrig
@@ -158,20 +194,43 @@ class PicoScopeSdk(Scope): # pragma: no cover
self.timebase = tb
return actual_frequency, samples
- def set_frequency(self, frequency: int, pretrig: int, posttrig: int) -> Tuple[int, int]:
+ def set_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ) -> Tuple[int, int]:
raise NotImplementedError
- def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
- timeout: int, enable: bool):
+ def setup_trigger(
+ self,
+ channel: str,
+ threshold: float,
+ direction: str,
+ delay: int,
+ timeout: int,
+ enable: bool,
+ ):
self.set_trigger(direction, enable, threshold, channel, delay, timeout)
- def set_trigger(self, type: str, enabled: bool, value: float, channel: str,
- delay: int, timeout: int):
+ def set_trigger(
+ self,
+ type: str,
+ enabled: bool,
+ value: float,
+ channel: str,
+ delay: int,
+ timeout: int,
+ ):
assert_pico_ok(
- self.__dispatch_call("SetSimpleTrigger", self.handle, enabled,
- self.CHANNELS[channel],
- volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE),
- self.TRIGGERS[type], delay, timeout))
+ self.__dispatch_call(
+ "SetSimpleTrigger",
+ self.handle,
+ enabled,
+ self.CHANNELS[channel],
+ volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE),
+ self.TRIGGERS[type],
+ delay,
+ timeout,
+ )
+ )
def setup_capture(self, channel: str, enable: bool):
self.set_buffer(channel, enable)
@@ -184,22 +243,44 @@ class PicoScopeSdk(Scope): # pragma: no cover
del self.buffers[channel]
buffer = (ctypes.c_int16 * self.samples)()
assert_pico_ok(
- self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel],
- ctypes.byref(buffer), self.samples))
+ self.__dispatch_call(
+ "SetDataBuffer",
+ self.handle,
+ self.CHANNELS[channel],
+ ctypes.byref(buffer),
+ self.samples,
+ )
+ )
self.buffers[channel] = buffer
else:
assert_pico_ok(
- self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel],
- None, self.samples))
+ self.__dispatch_call(
+ "SetDataBuffer",
+ self.handle,
+ self.CHANNELS[channel],
+ None,
+ self.samples,
+ )
+ )
del self.buffers[channel]
def arm(self):
if self.samples is None or self.timebase is None:
raise ValueError
assert_pico_ok(
- self.__dispatch_call("RunBlock", self.handle, self.pretrig, self.posttrig,
- self.timebase, 0,
- None, 0, None, None))
+ self.__dispatch_call(
+ "RunBlock",
+ self.handle,
+ self.pretrig,
+ self.posttrig,
+ self.timebase,
+ 0,
+ None,
+ 0,
+ None,
+ None,
+ )
+ )
def capture(self, timeout: Optional[int] = None) -> bool:
start = time_ns()
@@ -209,25 +290,48 @@ class PicoScopeSdk(Scope): # pragma: no cover
check = ctypes.c_int16(0)
while ready.value == check.value:
sleep(0.001)
- assert_pico_ok(self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready)))
+ assert_pico_ok(
+ self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready))
+ )
if timeout is not None and (time_ns() - start) / 1e6 >= timeout:
return False
return True
- def retrieve(self, channel: str, type: SampleType, dtype=np.float32) -> Optional[Trace]:
+ def retrieve(
+ self, channel: str, type: SampleType, dtype=np.float32
+ ) -> Optional[Trace]:
if self.samples is None:
raise ValueError
actual_samples = ctypes.c_int32(self.samples)
overflow = ctypes.c_int16()
assert_pico_ok(
- self.__dispatch_call("GetValues", self.handle, 0, ctypes.byref(actual_samples), 1,
- 0, 0, ctypes.byref(overflow)))
+ self.__dispatch_call(
+ "GetValues",
+ self.handle,
+ 0,
+ ctypes.byref(actual_samples),
+ 1,
+ 0,
+ 0,
+ ctypes.byref(overflow),
+ )
+ )
arr = np.array(self.buffers[channel], dtype=dtype)
if type == SampleType.Raw:
data = arr
else:
- data = cast(np.ndarray, adc2volt(arr, self.ranges[channel], self.MAX_ADC_VALUE, dtype=dtype))
- return Trace(data, {"sampling_frequency": self.frequency, "channel": channel, "sample_type": type})
+ data = cast(
+ np.ndarray,
+ adc2volt(arr, self.ranges[channel], self.MAX_ADC_VALUE, dtype=dtype),
+ )
+ return Trace(
+ data,
+ {
+ "sampling_frequency": self.frequency,
+ "channel": channel,
+ "sample_type": type,
+ },
+ )
def stop(self):
assert_pico_ok(self.__dispatch_call("Stop"))
@@ -243,23 +347,29 @@ class PicoScopeSdk(Scope): # pragma: no cover
if isinstance(ps3000, CannotFindPicoSDKError):
+
@public
class PS3000Scope(PicoScopeSdk): # pragma: no cover
"""A PicoScope 3000 series oscilloscope is not available. (Install `libps3000`)."""
+
def __init__(self, variant: Optional[str] = None):
super().__init__(variant)
raise ps3000
+
+
else: # pragma: no cover
+
@public
class PS3000Scope(PicoScopeSdk): # type: ignore
"""A PicoScope 3000 series oscilloscope."""
+
MODULE = ps3000
PREFIX = "ps3000"
CHANNELS = {
"A": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_A"],
"B": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_B"],
"C": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_C"],
- "D": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_D"]
+ "D": ps3000.PS3000_CHANNEL["PS3000_CHANNEL_D"],
}
RANGES = {
@@ -276,48 +386,57 @@ else: # pragma: no cover
50.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_50V"],
100.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_100V"],
200.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_200V"],
- 400.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_400V"]
+ 400.0: ps3000.PS3000_VOLTAGE_RANGE["PS3000_400V"],
}
MAX_ADC_VALUE = 32767
MIN_ADC_VALUE = -32767
- COUPLING = {
- "AC": ps3000.PICO_COUPLING["AC"],
- "DC": ps3000.PICO_COUPLING["DC"]
- }
+ COUPLING = {"AC": ps3000.PICO_COUPLING["AC"], "DC": ps3000.PICO_COUPLING["DC"]}
def get_variant(self):
if self._variant is not None:
return self._variant
info = (ctypes.c_int8 * 6)()
size = ctypes.c_int16(6)
- assert_pico_ok(self.__dispatch_call("GetUnitInfo", self.handle, ctypes.byref(info), size, 3))
- self._variant = "".join(chr(i) for i in info[:size.value])
+ assert_pico_ok(
+ self.__dispatch_call(
+ "GetUnitInfo", self.handle, ctypes.byref(info), size, 3
+ )
+ )
+ self._variant = "".join(chr(i) for i in info[: size.value])
return self._variant
- def set_frequency(self, frequency: int, pretrig: int, posttrig: int): # TODO: fix
+ def set_frequency(
+ self, frequency: int, pretrig: int, posttrig: int
+ ): # TODO: fix
raise NotImplementedError
if isinstance(ps4000, CannotFindPicoSDKError):
+
@public
class PS4000Scope(PicoScopeSdk): # pragma: no cover
"""A PicoScope 4000 series oscilloscope is not available. (Install `libps4000`)."""
+
def __init__(self, variant: Optional[str] = None):
super().__init__(variant)
raise ps4000
+
+
else: # pragma: no cover
+
@public
class PS4000Scope(PicoScopeSdk): # type: ignore
"""A PicoScope 4000 series oscilloscope."""
+
MODULE = ps4000
PREFIX = "ps4000"
CHANNELS = {
"A": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_A"],
"B": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_B"],
"C": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_C"],
- "D": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_D"]
+ "D": ps4000.PS4000_CHANNEL["PS4000_CHANNEL_D"],
}
RANGES = {
@@ -333,46 +452,54 @@ else: # pragma: no cover
10.0: ps4000.PS4000_RANGE["PS4000_10V"],
20.0: ps4000.PS4000_RANGE["PS4000_20V"],
50.0: ps4000.PS4000_RANGE["PS4000_50V"],
- 100.0: ps4000.PS4000_RANGE["PS4000_100V"]
+ 100.0: ps4000.PS4000_RANGE["PS4000_100V"],
}
MAX_ADC_VALUE = 32764
MIN_ADC_VALUE = -32764
- COUPLING = {
- "AC": ps4000.PICO_COUPLING["AC"],
- "DC": ps4000.PICO_COUPLING["DC"]
- }
+ COUPLING = {"AC": ps4000.PICO_COUPLING["AC"], "DC": ps4000.PICO_COUPLING["DC"]}
def set_frequency(self, frequency: int, pretrig: int, posttrig: int):
variant = self.get_variant()
if variant in ("4223", "4224", "4423", "4424"):
- return self._set_freq(frequency, pretrig, posttrig, 50e-9, 2, 80_000_000, 20_000_000, 1)
+ return self._set_freq(
+ frequency, pretrig, posttrig, 50e-9, 2, 80_000_000, 20_000_000, 1
+ )
elif variant in ("4226", "4227"):
- return self._set_freq(frequency, pretrig, posttrig, 32e-9, 3, 250_000_000, 31_250_000,
- 2)
+ return self._set_freq(
+ frequency, pretrig, posttrig, 32e-9, 3, 250_000_000, 31_250_000, 2
+ )
elif variant == "4262":
- return self._set_freq(frequency, pretrig, posttrig, 0, 0, 0, 10_000_000, -1)
+ return self._set_freq(
+ frequency, pretrig, posttrig, 0, 0, 0, 10_000_000, -1
+ )
if isinstance(ps5000, CannotFindPicoSDKError):
+
@public
class PS5000Scope(PicoScopeSdk): # pragma: no cover
"""A PicoScope 5000 series oscilloscope is not available. (Install `libps5000`)."""
+
def __init__(self, variant: Optional[str] = None):
super().__init__(variant)
raise ps5000
+
+
else: # pragma: no cover
+
@public
class PS5000Scope(PicoScopeSdk): # type: ignore
"""A PicoScope 5000 series oscilloscope."""
+
MODULE = ps5000
PREFIX = "ps5000"
CHANNELS = {
"A": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_A"],
"B": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_B"],
"C": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_C"],
- "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"]
+ "D": ps5000.PS5000_CHANNEL["PS5000_CHANNEL_D"],
}
RANGES = {
@@ -387,39 +514,44 @@ else: # pragma: no cover
5.00: 8,
10.0: 9,
20.0: 10,
- 50.0: 11
+ 50.0: 11,
}
MAX_ADC_VALUE = 32512
MIN_ADC_VALUE = -32512
- COUPLING = {
- "AC": 0,
- "DC": 1
- }
+ COUPLING = {"AC": 0, "DC": 1}
def set_frequency(self, frequency: int, pretrig: int, posttrig: int):
- return self._set_freq(frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2)
+ return self._set_freq(
+ frequency, pretrig, posttrig, 4e-9, 2, 1_000_000_000, 125_000_000, 2
+ )
if isinstance(ps6000, CannotFindPicoSDKError):
+
@public
class PS6000Scope(PicoScopeSdk): # pragma: no cover
"""A PicoScope 6000 series oscilloscope is not available. (Install `libps6000`)."""
+
def __init__(self, variant: Optional[str] = None):
super().__init__(variant)
raise ps6000
+
+
else: # pragma: no cover
+
@public
class PS6000Scope(PicoScopeSdk): # type: ignore
"""A PicoScope 6000 series oscilloscope."""
+
MODULE = ps6000
PREFIX = "ps6000"
CHANNELS = {
"A": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_A"],
"B": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_B"],
"C": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_C"],
- "D": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_D"]
+ "D": ps6000.PS6000_CHANNEL["PS6000_CHANNEL_D"],
}
RANGES = {
@@ -434,7 +566,7 @@ else: # pragma: no cover
5.00: ps6000.PS6000_RANGE["PS6000_5V"],
10.0: ps6000.PS6000_RANGE["PS6000_10V"],
20.0: ps6000.PS6000_RANGE["PS6000_20V"],
- 50.0: ps6000.PS6000_RANGE["PS6000_50V"]
+ 50.0: ps6000.PS6000_RANGE["PS6000_50V"],
}
MAX_ADC_VALUE = 32512
@@ -443,16 +575,31 @@ else: # pragma: no cover
COUPLING = {
"AC": ps6000.PS6000_COUPLING["PS6000_AC"],
"DC": ps6000.PS6000_COUPLING["PS6000_DC_1M"],
- "DC_50": ps6000.PS6000_COUPLING["PS6000_DC_50R"]
+ "DC_50": ps6000.PS6000_COUPLING["PS6000_DC_50R"],
}
def open(self):
assert_pico_ok(ps6000.ps6000OpenUnit(ctypes.byref(self.handle), None))
- def set_channel(self, channel: str, enabled: bool, coupling: str, range: float, offset: float):
- assert_pico_ok(ps6000.ps6000SetChannel(self.handle, self.CHANNELS[channel], enabled,
- self.COUPLING[coupling], self.RANGES[range], offset,
- ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"]))
+ def set_channel(
+ self,
+ channel: str,
+ enabled: bool,
+ coupling: str,
+ range: float,
+ offset: float,
+ ):
+ assert_pico_ok(
+ ps6000.ps6000SetChannel(
+ self.handle,
+ self.CHANNELS[channel],
+ enabled,
+ self.COUPLING[coupling],
+ self.RANGES[range],
+ offset,
+ ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"],
+ )
+ )
def set_buffer(self, channel: str, enable: bool):
if self.samples is None:
@@ -462,15 +609,24 @@ else: # pragma: no cover
del self.buffers[channel]
buffer = (ctypes.c_int16 * self.samples)()
assert_pico_ok(
- ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel],
- ctypes.byref(buffer), self.samples, 0))
+ ps6000.ps6000SetDataBuffer(
+ self.handle,
+ self.CHANNELS[channel],
+ ctypes.byref(buffer),
+ self.samples,
+ 0,
+ )
+ )
self.buffers[channel] = buffer
else:
assert_pico_ok(
- ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel],
- None, self.samples, 0))
+ ps6000.ps6000SetDataBuffer(
+ self.handle, self.CHANNELS[channel], None, self.samples, 0
+ )
+ )
del self.buffers[channel]
def set_frequency(self, frequency: int, pretrig: int, posttrig: int):
- return self._set_freq(frequency, pretrig, posttrig, 3.2e-9, 4, 5_000_000_000, 156_250_000,
- 4)
+ return self._set_freq(
+ frequency, pretrig, posttrig, 3.2e-9, 4, 5_000_000_000, 156_250_000, 4
+ )
diff --git a/pyecsca/sca/target/ISO7816.py b/pyecsca/sca/target/ISO7816.py
index 233685e..1a9c8b8 100644
--- a/pyecsca/sca/target/ISO7816.py
+++ b/pyecsca/sca/target/ISO7816.py
@@ -14,6 +14,7 @@ from .base import Target
@dataclass
class CommandAPDU(object): # pragma: no cover
"""A command APDU that can be sent to an ISO7816-4 target."""
+
cls: int
ins: int
p1: int
@@ -28,30 +29,61 @@ class CommandAPDU(object): # pragma: no cover
return bytes([self.cls, self.ins, self.p1, self.p2])
elif self.ne <= 256:
# Case 2s
- return bytes([self.cls, self.ins, self.p1, self.p2, self.ne if self.ne != 256 else 0])
+ return bytes(
+ [
+ self.cls,
+ self.ins,
+ self.p1,
+ self.p2,
+ self.ne if self.ne != 256 else 0,
+ ]
+ )
else:
# Case 2e
- return bytes([self.cls, self.ins, self.p1, self.p2]) + (self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0]))
+ return bytes([self.cls, self.ins, self.p1, self.p2]) + (
+ self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0])
+ )
elif self.ne is None or self.ne == 0:
if len(self.data) <= 255:
# Case 3s
- return bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + self.data
+ return (
+ bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)])
+ + self.data
+ )
else:
# Case 3e
- return bytes([self.cls, self.ins, self.p1, self.p2, 0]) + len(self.data).to_bytes(2, "big") + self.data
+ return (
+ bytes([self.cls, self.ins, self.p1, self.p2, 0])
+ + len(self.data).to_bytes(2, "big")
+ + self.data
+ )
else:
if len(self.data) <= 255 and self.ne <= 256:
# Case 4s
- return bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)]) + self.data + bytes([self.ne if self.ne != 256 else 0])
+ return (
+ bytes([self.cls, self.ins, self.p1, self.p2, len(self.data)])
+ + self.data
+ + bytes([self.ne if self.ne != 256 else 0])
+ )
else:
# Case 4e
- return bytes([self.cls, self.ins, self.p1, self.p2, 0]) + len(self.data).to_bytes(2, "big") + self.data + (self.ne.to_bytes(2, "big") if self.ne != 65536 else bytes([0, 0]))
+ return (
+ bytes([self.cls, self.ins, self.p1, self.p2, 0])
+ + len(self.data).to_bytes(2, "big")
+ + self.data
+ + (
+ self.ne.to_bytes(2, "big")
+ if self.ne != 65536
+ else bytes([0, 0])
+ )
+ )
@public
@dataclass
class ResponseAPDU(object):
"""A response APDU that can be received from an ISO7816-4 target."""
+
data: bytes
sw: int
@@ -90,6 +122,7 @@ class ISO7816Target(Target, ABC):
@public
class ISO7816:
"""A bunch of ISO7816-4 constants (status words)."""
+
SW_FILE_FULL = 0x6A84
SW_UNKNOWN = 0x6F00
SW_CLA_NOT_SUPPORTED = 0x6E00
diff --git a/pyecsca/sca/target/PCSC.py b/pyecsca/sca/target/PCSC.py
index a377751..4b2620c 100644
--- a/pyecsca/sca/target/PCSC.py
+++ b/pyecsca/sca/target/PCSC.py
@@ -37,7 +37,7 @@ class PCSCTarget(ISO7816Target): # pragma: no cover
return bytes(self.connection.getATR())
def select(self, aid: bytes) -> bool:
- apdu = CommandAPDU(0x00, 0xa4, 0x04, 0x00, aid)
+ apdu = CommandAPDU(0x00, 0xA4, 0x04, 0x00, aid)
resp = self.send_apdu(apdu)
return resp.sw == ISO7816.SW_NO_ERROR
diff --git a/pyecsca/sca/target/binary.py b/pyecsca/sca/target/binary.py
index 3e2877b..d4c402e 100644
--- a/pyecsca/sca/target/binary.py
+++ b/pyecsca/sca/target/binary.py
@@ -13,11 +13,14 @@ from .serial import SerialTarget
@public
class BinaryTarget(SerialTarget):
"""A binary target that is runnable on the host and communicates using the stdin/stdout streams."""
+
binary: List[str]
process: Optional[Popen] = None
debug_output: bool
- def __init__(self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs):
+ def __init__(
+ self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs
+ ):
super().__init__()
if not isinstance(binary, (str, list)):
raise TypeError
@@ -27,8 +30,13 @@ class BinaryTarget(SerialTarget):
self.debug_output = debug_output
def connect(self):
- self.process = Popen(self.binary, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- text=True, bufsize=1)
+ self.process = Popen(
+ self.binary,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ text=True,
+ bufsize=1,
+ )
def write(self, data: bytes) -> None:
if self.process is None:
diff --git a/pyecsca/sca/target/chipwhisperer.py b/pyecsca/sca/target/chipwhisperer.py
index ac3fa42..4037f12 100644
--- a/pyecsca/sca/target/chipwhisperer.py
+++ b/pyecsca/sca/target/chipwhisperer.py
@@ -22,7 +22,9 @@ class ChipWhispererTarget(Flashable, SimpleSerialTarget): # pragma: no cover
using ChipWhisperer-Lite/Pro.
"""
- def __init__(self, target: SimpleSerial, scope: ScopeTemplate, programmer, **kwargs):
+ def __init__(
+ self, target: SimpleSerial, scope: ScopeTemplate, programmer, **kwargs
+ ):
super().__init__()
self.target = target
self.scope = scope
diff --git a/pyecsca/sca/target/ectester.py b/pyecsca/sca/target/ectester.py
index 6ab10be..f6635c3 100644
--- a/pyecsca/sca/target/ectester.py
+++ b/pyecsca/sca/target/ectester.py
@@ -49,6 +49,7 @@ class ShiftableFlag(IntFlag): # pragma: no cover
@public
class KeypairEnum(ShiftableFlag): # pragma: no cover
"""ECTester's KeyPair type."""
+
KEYPAIR_LOCAL = 0x01
KEYPAIR_REMOTE = 0x02
KEYPAIR_BOTH = KEYPAIR_LOCAL | KEYPAIR_REMOTE
@@ -57,12 +58,13 @@ class KeypairEnum(ShiftableFlag): # pragma: no cover
@public
class InstructionEnum(IntEnum): # pragma: no cover
"""ECTester's instruction (INS)."""
- INS_ALLOCATE = 0x5a
- INS_CLEAR = 0x5b
- INS_SET = 0x5c
- INS_TRANSFORM = 0x5d
- INS_GENERATE = 0x5e
- INS_EXPORT = 0x5f
+
+ INS_ALLOCATE = 0x5A
+ INS_CLEAR = 0x5B
+ INS_SET = 0x5C
+ INS_TRANSFORM = 0x5D
+ INS_GENERATE = 0x5E
+ INS_EXPORT = 0x5F
INS_ECDH = 0x70
INS_ECDH_DIRECT = 0x71
INS_ECDSA = 0x72
@@ -73,13 +75,14 @@ class InstructionEnum(IntEnum): # pragma: no cover
INS_ALLOCATE_SIG = 0x77
INS_GET_INFO = 0x78
INS_SET_DRY_RUN_MODE = 0x79
- INS_BUFFER = 0x7a
- INS_PERFORM = 0x7b
+ INS_BUFFER = 0x7A
+ INS_PERFORM = 0x7B
@public
class KeyBuildEnum(IntEnum): # pragma: no cover
"""ECTester's key builder type."""
+
BUILD_KEYPAIR = 0x01
BUILD_KEYBUILDER = 0x02
@@ -87,7 +90,8 @@ class KeyBuildEnum(IntEnum): # pragma: no cover
@public
class ExportEnum(IntEnum): # pragma: no cover
"""ECTester's export boolean."""
- EXPORT_TRUE = 0xff
+
+ EXPORT_TRUE = 0xFF
EXPORT_FALSE = 0x00
@classmethod
@@ -98,13 +102,15 @@ class ExportEnum(IntEnum): # pragma: no cover
@public
class RunModeEnum(IntEnum): # pragma: no cover
"""ECTester's run mode."""
- MODE_NORMAL = 0xaa
- MODE_DRY_RUN = 0xbb
+
+ MODE_NORMAL = 0xAA
+ MODE_DRY_RUN = 0xBB
@public
class KeyEnum(ShiftableFlag): # pragma: no cover
"""ECTester's key enum."""
+
PUBLIC = 0x01
PRIVATE = 0x02
BOTH = PRIVATE | PUBLIC
@@ -113,6 +119,7 @@ class KeyEnum(ShiftableFlag): # pragma: no cover
@public
class AppletBaseEnum(IntEnum): # pragma: no cover
"""ECTester's JavaCard applet base version."""
+
BASE_221 = 0x0221
BASE_222 = 0x0222
@@ -120,6 +127,7 @@ class AppletBaseEnum(IntEnum): # pragma: no cover
@public
class KeyClassEnum(IntEnum): # pragma: no cover
"""JavaCard EC-based key class."""
+
ALG_EC_F2M = 4
ALG_EC_FP = 5
@@ -127,6 +135,7 @@ class KeyClassEnum(IntEnum): # pragma: no cover
@public
class KeyAgreementEnum(IntEnum): # pragma: no cover
"""JavaCard `KeyAgreement` type values."""
+
ALG_EC_SVDP_DH = 1
ALG_EC_SVDP_DH_KDF = 1
ALG_EC_SVDP_DHC = 2
@@ -140,6 +149,7 @@ class KeyAgreementEnum(IntEnum): # pragma: no cover
@public
class SignatureEnum(IntEnum): # pragma: no cover
"""JavaCard `Signature` type values."""
+
ALG_ECDSA_SHA = 17
ALG_ECDSA_SHA_224 = 37
ALG_ECDSA_SHA_256 = 33
@@ -150,6 +160,7 @@ class SignatureEnum(IntEnum): # pragma: no cover
@public
class TransformationEnum(ShiftableFlag): # pragma: no cover
"""ECTester's point/value transformation types."""
+
NONE = 0x00
FIXED = 0x01
FULLRANDOM = 0x02
@@ -167,6 +178,7 @@ class TransformationEnum(ShiftableFlag): # pragma: no cover
@public
class FormatEnum(IntEnum): # pragma: no cover
"""ECTester's point format types."""
+
UNCOMPRESSED = 0
COMPRESSED = 1
HYBRID = 2
@@ -175,8 +187,9 @@ class FormatEnum(IntEnum): # pragma: no cover
@public
class CurveEnum(IntEnum): # pragma: no cover
"""ECTester's curve constants."""
+
default = 0x00
- external = 0xff
+ external = 0xFF
secp112r1 = 0x01
secp128r1 = 0x02
secp160r1 = 0x03
@@ -186,15 +199,16 @@ class CurveEnum(IntEnum): # pragma: no cover
secp384r1 = 0x07
secp521r1 = 0x08
sect163r1 = 0x09
- sect233r1 = 0x0a
- sect283r1 = 0x0b
- sect409r1 = 0x0c
- sect571r1 = 0x0d
+ sect233r1 = 0x0A
+ sect283r1 = 0x0B
+ sect409r1 = 0x0C
+ sect571r1 = 0x0D
@public
class ParameterEnum(ShiftableFlag): # pragma: no cover
"""ECTester's parameter ids."""
+
NONE = 0x00
FP = 0x01
F2M = 0x02
@@ -214,11 +228,13 @@ class ParameterEnum(ShiftableFlag): # pragma: no cover
@public
class ChunkingException(Exception): # pragma: no cover
"""An exception that is raised if an error happened during the chunking process of a large APDU."""
+
pass
class Response(ABC): # pragma: no cover
"""An abstract base class of a response APDU."""
+
resp: ResponseAPDU
sws: List[int]
params: List[bytes]
@@ -233,7 +249,7 @@ class Response(ABC): # pragma: no cover
offset = 0
for i in range(num_sw):
if len(resp.data) >= offset + 2:
- self.sws[i] = int.from_bytes(resp.data[offset:offset + 2], "big")
+ self.sws[i] = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
if self.sws[i] != ISO7816.SW_NO_ERROR:
self.success = False
@@ -250,13 +266,13 @@ class Response(ABC): # pragma: no cover
self.success = False
self.error = True
break
- param_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ param_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
if len(resp.data) < offset + param_len:
self.success = False
self.error = True
break
- self.params[i] = resp.data[offset:offset + param_len]
+ self.params[i] = resp.data[offset : offset + param_len]
offset += param_len
def __repr__(self):
@@ -322,12 +338,18 @@ class GenerateResponse(Response): # pragma: no cover
@public
class ExportResponse(Response): # pragma: no cover
"""A response to the Export command, contains the exported parameters/values."""
+
keypair: KeypairEnum
key: KeyEnum
parameters: ParameterEnum
- def __init__(self, resp: ResponseAPDU, keypair: KeypairEnum, key: KeyEnum,
- params: ParameterEnum):
+ def __init__(
+ self,
+ resp: ResponseAPDU,
+ keypair: KeypairEnum,
+ key: KeyEnum,
+ params: ParameterEnum,
+ ):
self.keypair = keypair
self.key = key
self.parameters = params
@@ -344,7 +366,9 @@ class ExportResponse(Response): # pragma: no cover
other = 0
other += 1 if key & KeyEnum.PUBLIC and params & ParameterEnum.W else 0
other += 1 if key & KeyEnum.PRIVATE and params & ParameterEnum.S else 0
- super().__init__(resp, exported, exported * keys * param_count + exported * other)
+ super().__init__(
+ resp, exported, exported * keys * param_count + exported * other
+ )
def get_index(self, keypair: KeypairEnum, param: ParameterEnum) -> Optional[int]:
pair = KeypairEnum.KEYPAIR_LOCAL
@@ -378,8 +402,10 @@ class ExportResponse(Response): # pragma: no cover
return None
def __repr__(self):
- return f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, success={self.success}, error={self.error}, " \
- f"keypair={self.keypair.name}, key={self.key.name}, params={self.parameters.name})"
+ return (
+ f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, success={self.success}, error={self.error}, "
+ f"keypair={self.keypair.name}, key={self.key.name}, params={self.parameters.name})"
+ )
@public
@@ -434,6 +460,7 @@ class RunModeResponse(Response): # pragma: no cover
class InfoResponse(Response): # pragma: no cover
"""A response to the Info command, contains all information about the applet version/environment."""
+
version: str
base: AppletBaseEnum
system_version: float
@@ -447,33 +474,39 @@ class InfoResponse(Response): # pragma: no cover
super().__init__(resp, 1, 0)
offset = 2
- version_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ version_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
- self.version = resp.data[offset:offset + version_len].decode()
+ self.version = resp.data[offset : offset + version_len].decode()
offset += version_len
- self.base = AppletBaseEnum(int.from_bytes(resp.data[offset:offset + 2], "big"))
+ self.base = AppletBaseEnum(
+ int.from_bytes(resp.data[offset : offset + 2], "big")
+ )
offset += 2
- system_version = int.from_bytes(resp.data[offset:offset + 2], "big")
+ system_version = int.from_bytes(resp.data[offset : offset + 2], "big")
system_major = system_version >> 8
- system_minor = system_version & 0xff
+ system_minor = system_version & 0xFF
minor_size = 1 if system_minor == 0 else ceil(log(system_minor, 10))
self.system_version = system_major + system_minor / (minor_size * 10)
offset += 2
- self.object_deletion_supported = int.from_bytes(resp.data[offset:offset + 2], "big") == 1
+ self.object_deletion_supported = (
+ int.from_bytes(resp.data[offset : offset + 2], "big") == 1
+ )
offset += 2
- self.buf_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ self.buf_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
- self.ram1_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ self.ram1_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
- self.ram2_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ self.ram2_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
- self.apdu_len = int.from_bytes(resp.data[offset:offset + 2], "big")
+ self.apdu_len = int.from_bytes(resp.data[offset : offset + 2], "big")
offset += 2
def __repr__(self):
- return f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, " \
- f"success={self.success}, error={self.error}, version={self.version}, base={self.base.name}, system_version={self.system_version}, " \
- f"object_deletion_supported={self.object_deletion_supported}, buf_len={self.buf_len}, ram1_len={self.ram1_len}, ram2_len={self.ram2_len}, apdu_len={self.apdu_len})"
+ return (
+ f"{self.__class__.__name__}(sws=[{', '.join(list(map(hex, self.sws)))}], sw={hex(self.resp.sw)}, "
+ f"success={self.success}, error={self.error}, version={self.version}, base={self.base.name}, system_version={self.system_version}, "
+ f"object_deletion_supported={self.object_deletion_supported}, buf_len={self.buf_len}, ram1_len={self.ram1_len}, ram2_len={self.ram2_len}, apdu_len={self.apdu_len})"
+ )
@public
@@ -482,7 +515,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
A smartcard target which communicates with the `ECTester <https://github.com/crocs-muni/ECTester>`_
applet on smartcards of the JavaCard platform using PCSC.
"""
- CLA_ECTESTER = 0xb0
+
+ CLA_ECTESTER = 0xB0
AID_PREFIX = bytes([0x45, 0x43, 0x54, 0x65, 0x73, 0x74, 0x65, 0x72])
AID_CURRENT_VERSION = bytes([0x30, 0x33, 0x33]) # Version v0.3.3
AID_SUFFIX_221 = bytes([0x62])
@@ -507,15 +541,19 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
chunk_length = 255
if chunk_start + chunk_length > len(data):
chunk_length = len(data) - chunk_start
- chunk = data[chunk_start: chunk_start + chunk_length]
- chunk_apdu = CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_BUFFER, 0, 0, chunk)
+ chunk = data[chunk_start : chunk_start + chunk_length]
+ chunk_apdu = CommandAPDU(
+ self.CLA_ECTESTER, InstructionEnum.INS_BUFFER, 0, 0, chunk
+ )
resp = super().send_apdu(chunk_apdu)
if resp.sw != 0x9000:
raise ChunkingException()
apdu = CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_PERFORM, 0, 0)
resp = super().send_apdu(apdu)
- if resp.sw & 0xff00 == ISO7816.SW_BYTES_REMAINING_00:
- resp = super().send_apdu(CommandAPDU(0x00, 0xc0, 0x00, 0x00, None, resp.sw & 0xff))
+ if resp.sw & 0xFF00 == ISO7816.SW_BYTES_REMAINING_00:
+ resp = super().send_apdu(
+ CommandAPDU(0x00, 0xC0, 0x00, 0x00, None, resp.sw & 0xFF)
+ )
return resp
def select_applet(self, latest_version: bytes = AID_CURRENT_VERSION):
@@ -548,7 +586,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
return True
@staticmethod
- def encode_parameters(params: ParameterEnum, obj: Union[DomainParameters, Point, int]) -> Mapping[ParameterEnum, bytes]:
+ def encode_parameters(
+ params: ParameterEnum, obj: Union[DomainParameters, Point, int]
+ ) -> Mapping[ParameterEnum, bytes]:
"""Encode values from `obj` into the byte parameters that the **ECTester** applet expects."""
def convert_int(obj: int) -> bytes:
@@ -559,7 +599,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
return bytes(obj)
result = {}
- if isinstance(obj, DomainParameters) and isinstance(obj.curve.model, ShortWeierstrassModel):
+ if isinstance(obj, DomainParameters) and isinstance(
+ obj.curve.model, ShortWeierstrassModel
+ ):
for param in params & ParameterEnum.DOMAIN_FP:
if param == ParameterEnum.G:
result[param] = convert_point(obj.generator.to_affine())
@@ -577,7 +619,9 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
for param in params & (ParameterEnum.G | ParameterEnum.W):
result[param] = convert_point(obj)
elif isinstance(obj, int):
- for param in params & ((ParameterEnum.DOMAIN_FP ^ ParameterEnum.G) | ParameterEnum.S):
+ for param in params & (
+ (ParameterEnum.DOMAIN_FP ^ ParameterEnum.G) | ParameterEnum.S
+ ):
result[param] = convert_int(obj)
else:
raise TypeError
@@ -591,8 +635,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE_KA, 0, 0,
- bytes([ka_type])))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ALLOCATE_KA,
+ 0,
+ 0,
+ bytes([ka_type]),
+ )
+ )
return AllocateKaResponse(resp)
def allocate_sig(self, sig_type: SignatureEnum) -> AllocateSigResponse:
@@ -602,12 +652,24 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:param sig_type: Which Signature type to allocate.
:return: The response.
"""
- resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE_SIG, 0, 0,
- bytes([sig_type])))
+ resp = self.send_apdu(
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ALLOCATE_SIG,
+ 0,
+ 0,
+ bytes([sig_type]),
+ )
+ )
return AllocateSigResponse(resp)
- def allocate(self, keypair: KeypairEnum, builder: KeyBuildEnum, key_length: int,
- key_class: KeyClassEnum) -> AllocateResponse:
+ def allocate(
+ self,
+ keypair: KeypairEnum,
+ builder: KeyBuildEnum,
+ key_length: int,
+ key_class: KeyClassEnum,
+ ) -> AllocateResponse:
"""
Send the Allocate KeyPair command.
@@ -618,8 +680,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ALLOCATE, keypair, builder,
- key_length.to_bytes(2, "big") + bytes([key_class])))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ALLOCATE,
+ keypair,
+ builder,
+ key_length.to_bytes(2, "big") + bytes([key_class]),
+ )
+ )
return AllocateResponse(resp, keypair)
def clear(self, keypair: KeypairEnum) -> ClearResponse:
@@ -630,11 +698,17 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEAR, keypair, 0, None))
+ CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEAR, keypair, 0, None)
+ )
return ClearResponse(resp, keypair)
- def set(self, keypair: KeypairEnum, curve: CurveEnum, params: ParameterEnum,
- values: Optional[Mapping[ParameterEnum, bytes]] = None) -> SetResponse:
+ def set(
+ self,
+ keypair: KeypairEnum,
+ curve: CurveEnum,
+ params: ParameterEnum,
+ values: Optional[Mapping[ParameterEnum, bytes]] = None,
+ ) -> SetResponse:
"""
Send the Set command.
@@ -656,18 +730,31 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
break
e <<= 1
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve,
- payload))
+ CommandAPDU(
+ self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve, payload
+ )
+ )
elif values is not None:
raise ValueError("Values should be specified only if curve is external.")
else:
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET, keypair, curve,
- params.to_bytes(2, "big")))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_SET,
+ keypair,
+ curve,
+ params.to_bytes(2, "big"),
+ )
+ )
return SetResponse(resp, keypair)
- def transform(self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum,
- transformation: TransformationEnum) -> TransformResponse:
+ def transform(
+ self,
+ keypair: KeypairEnum,
+ key: KeyEnum,
+ params: ParameterEnum,
+ transformation: TransformationEnum,
+ ) -> TransformResponse:
"""
Send the Transform command.
@@ -678,8 +765,14 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_TRANSFORM, keypair, key,
- params.to_bytes(2, "big") + transformation.to_bytes(2, "big")))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_TRANSFORM,
+ keypair,
+ key,
+ params.to_bytes(2, "big") + transformation.to_bytes(2, "big"),
+ )
+ )
return TransformResponse(resp, keypair)
def generate(self, keypair: KeypairEnum) -> GenerateResponse:
@@ -690,10 +783,15 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GENERATE, keypair, 0, None))
+ CommandAPDU(
+ self.CLA_ECTESTER, InstructionEnum.INS_GENERATE, keypair, 0, None
+ )
+ )
return GenerateResponse(resp, keypair)
- def export(self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum) -> ExportResponse:
+ def export(
+ self, keypair: KeypairEnum, key: KeyEnum, params: ParameterEnum
+ ) -> ExportResponse:
"""
Send the Export command.
@@ -703,12 +801,24 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response, containing the exported parameters.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_EXPORT, keypair, key,
- params.to_bytes(2, "big")))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_EXPORT,
+ keypair,
+ key,
+ params.to_bytes(2, "big"),
+ )
+ )
return ExportResponse(resp, keypair, key, params)
- def ecdh(self, pubkey: KeypairEnum, privkey: KeypairEnum, export: bool,
- transformation: TransformationEnum, ka_type: KeyAgreementEnum) -> ECDHResponse:
+ def ecdh(
+ self,
+ pubkey: KeypairEnum,
+ privkey: KeypairEnum,
+ export: bool,
+ transformation: TransformationEnum,
+ ka_type: KeyAgreementEnum,
+ ) -> ECDHResponse:
"""
Send the ECDH command.
@@ -720,13 +830,26 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDH, pubkey, privkey,
- bytes([ExportEnum.from_bool(export)]) + transformation.to_bytes(
- 2, "big") + bytes([ka_type])))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ECDH,
+ pubkey,
+ privkey,
+ bytes([ExportEnum.from_bool(export)])
+ + transformation.to_bytes(2, "big")
+ + bytes([ka_type]),
+ )
+ )
return ECDHResponse(resp, export)
- def ecdh_direct(self, privkey: KeypairEnum, export: bool, transformation: TransformationEnum,
- ka_type: KeyAgreementEnum, pubkey: bytes) -> ECDHResponse:
+ def ecdh_direct(
+ self,
+ privkey: KeypairEnum,
+ export: bool,
+ transformation: TransformationEnum,
+ ka_type: KeyAgreementEnum,
+ pubkey: bytes,
+ ) -> ECDHResponse:
"""
Send the ECDH direct command.
@@ -738,14 +861,22 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDH_DIRECT, privkey,
- ExportEnum.from_bool(export),
- transformation.to_bytes(2, "big") + bytes([ka_type]) + len(
- pubkey).to_bytes(2, "big") + pubkey))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ECDH_DIRECT,
+ privkey,
+ ExportEnum.from_bool(export),
+ transformation.to_bytes(2, "big")
+ + bytes([ka_type])
+ + len(pubkey).to_bytes(2, "big")
+ + pubkey,
+ )
+ )
return ECDHResponse(resp, export)
- def ecdsa(self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum,
- data: bytes) -> ECDSAResponse:
+ def ecdsa(
+ self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, data: bytes
+ ) -> ECDSAResponse:
"""
Send the ECDSA command.
@@ -755,13 +886,20 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:param data: The data to sign and verify.
:return: The response.
"""
- resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA, keypair,
- ExportEnum.from_bool(export),
- bytes([sig_type]) + len(data).to_bytes(2, "big") + data))
+ resp = self.send_apdu(
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ECDSA,
+ keypair,
+ ExportEnum.from_bool(export),
+ bytes([sig_type]) + len(data).to_bytes(2, "big") + data,
+ )
+ )
return ECDSAResponse(resp, export)
- def ecdsa_sign(self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum,
- data: bytes) -> ECDSAResponse:
+ def ecdsa_sign(
+ self, keypair: KeypairEnum, export: bool, sig_type: SignatureEnum, data: bytes
+ ) -> ECDSAResponse:
"""
Send the ECDSA sign command.
@@ -772,13 +910,19 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA_SIGN, keypair,
- ExportEnum.from_bool(export),
- bytes([sig_type]) + len(data).to_bytes(2, "big") + data))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ECDSA_SIGN,
+ keypair,
+ ExportEnum.from_bool(export),
+ bytes([sig_type]) + len(data).to_bytes(2, "big") + data,
+ )
+ )
return ECDSAResponse(resp, export)
- def ecdsa_verify(self, keypair: KeypairEnum, sig_type: SignatureEnum, sig: bytes,
- data: bytes) -> ECDSAResponse:
+ def ecdsa_verify(
+ self, keypair: KeypairEnum, sig_type: SignatureEnum, sig: bytes, data: bytes
+ ) -> ECDSAResponse:
"""
Send the ECDSA verify command.
@@ -788,10 +932,15 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:param data: The data.
:return: The response.
"""
- resp = self.send_apdu(CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_ECDSA_VERIFY,
- keypair, sig_type,
- len(data).to_bytes(2, "big") + data + len(sig).to_bytes(2,
- "big") + sig))
+ resp = self.send_apdu(
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_ECDSA_VERIFY,
+ keypair,
+ sig_type,
+ len(data).to_bytes(2, "big") + data + len(sig).to_bytes(2, "big") + sig,
+ )
+ )
return ECDSAResponse(resp, False)
def cleanup(self) -> CleanupResponse:
@@ -801,7 +950,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEANUP, 0, 0, None))
+ CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_CLEANUP, 0, 0, None)
+ )
return CleanupResponse(resp)
def info(self) -> InfoResponse:
@@ -811,7 +961,8 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GET_INFO, 0, 0, None))
+ CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_GET_INFO, 0, 0, None)
+ )
return InfoResponse(resp)
def run_mode(self, run_mode: RunModeEnum) -> RunModeResponse:
@@ -821,6 +972,12 @@ class ECTesterTarget(PCSCTarget): # pragma: no cover
:return: The response.
"""
resp = self.send_apdu(
- CommandAPDU(self.CLA_ECTESTER, InstructionEnum.INS_SET_DRY_RUN_MODE, run_mode, 0,
- None))
+ CommandAPDU(
+ self.CLA_ECTESTER,
+ InstructionEnum.INS_SET_DRY_RUN_MODE,
+ run_mode,
+ 0,
+ None,
+ )
+ )
return RunModeResponse(resp)
diff --git a/pyecsca/sca/target/simpleserial.py b/pyecsca/sca/target/simpleserial.py
index 03e5768..03cb9cf 100644
--- a/pyecsca/sca/target/simpleserial.py
+++ b/pyecsca/sca/target/simpleserial.py
@@ -13,6 +13,7 @@ from .serial import SerialTarget
@public
class SimpleSerialMessage(object):
"""A SimpleSerial message consisting of a starting character and a hexadecimal string."""
+
char: str
data: str
@@ -60,7 +61,9 @@ class SimpleSerialTarget(SerialTarget):
result[msg.char] = msg
return result
- def send_cmd(self, cmd: SimpleSerialMessage, timeout: int) -> Mapping[str, SimpleSerialMessage]:
+ def send_cmd(
+ self, cmd: SimpleSerialMessage, timeout: int
+ ) -> Mapping[str, SimpleSerialMessage]:
"""
Send a :py:class:`SimpleSerialMessage` and receive the response messages that the command produces,
within a `timeout`.
@@ -71,7 +74,7 @@ class SimpleSerialTarget(SerialTarget):
"""
data = bytes(cmd)
for i in range(0, len(data), 64):
- chunk = data[i:i + 64]
+ chunk = data[i : i + 64]
sleep(0.010)
self.write(chunk)
self.write(b"\n")
diff --git a/pyecsca/sca/trace/align.py b/pyecsca/sca/trace/align.py
index 56be5b3..d97e358 100644
--- a/pyecsca/sca/trace/align.py
+++ b/pyecsca/sca/trace/align.py
@@ -11,8 +11,9 @@ from .process import normalize
from .trace import Trace
-def _align_reference(reference: Trace, *traces: Trace,
- align_func: Callable[[Trace], Tuple[bool, int]]) -> Tuple[List[Trace], List[int]]:
+def _align_reference(
+ reference: Trace, *traces: Trace, align_func: Callable[[Trace], Tuple[bool, int]]
+) -> Tuple[List[Trace], List[int]]:
result = [deepcopy(reference)]
offsets = [0]
for trace in traces:
@@ -25,18 +26,23 @@ def _align_reference(reference: Trace, *traces: Trace,
else:
result_samples = np.zeros(len(trace.samples), dtype=trace.samples.dtype)
if offset > 0:
- result_samples[:length - offset] = trace.samples[offset:]
+ result_samples[: length - offset] = trace.samples[offset:]
else:
- result_samples[-offset:] = trace.samples[:length + offset]
+ result_samples[-offset:] = trace.samples[: length + offset]
result.append(trace.with_samples(result_samples))
offsets.append(offset)
return result, offsets
@public
-def align_correlation(reference: Trace, *traces: Trace,
- reference_offset: int, reference_length: int,
- max_offset: int, min_correlation: float = 0.5) -> Tuple[List[Trace], List[int]]:
+def align_correlation(
+ reference: Trace,
+ *traces: Trace,
+ reference_offset: int,
+ reference_length: int,
+ max_offset: int,
+ min_correlation: float = 0.5
+) -> Tuple[List[Trace], List[int]]:
"""
Align `traces` to the reference `trace`. Using the cross-correlation of a part of the reference
trace starting at `reference_offset` with `reference_length` and try to match it to a part of
@@ -54,14 +60,19 @@ def align_correlation(reference: Trace, *traces: Trace,
"""
reference_centered = normalize(reference)
reference_part = reference_centered.samples[
- reference_offset:reference_offset + reference_length]
+ reference_offset : reference_offset + reference_length
+ ]
def align_func(trace):
length = len(trace.samples)
correlation_start = max(reference_offset - max_offset, 0)
- correlation_end = min(reference_offset + reference_length + max_offset, length - 1)
+ correlation_end = min(
+ reference_offset + reference_length + max_offset, length - 1
+ )
trace_part = trace.samples[correlation_start:correlation_end]
- trace_part = (trace_part - np.mean(trace_part)) / (np.std(trace_part) * len(trace_part))
+ trace_part = (trace_part - np.mean(trace_part)) / (
+ np.std(trace_part) * len(trace_part)
+ )
correlation = np.correlate(trace_part, reference_part, "same")
max_correlation_offset = correlation.argmax(axis=0)
max_correlation = correlation[max_correlation_offset]
@@ -76,8 +87,13 @@ def align_correlation(reference: Trace, *traces: Trace,
@public
-def align_peaks(reference: Trace, *traces: Trace,
- reference_offset: int, reference_length: int, max_offset: int) -> Tuple[List[Trace], List[int]]:
+def align_peaks(
+ reference: Trace,
+ *traces: Trace,
+ reference_offset: int,
+ reference_length: int,
+ max_offset: int
+) -> Tuple[List[Trace], List[int]]:
"""
Align `traces` to the reference `trace` so that the maximum value within the reference trace
window from `reference_offset` of `reference_length` aligns with the maximum
@@ -90,14 +106,16 @@ def align_peaks(reference: Trace, *traces: Trace,
:param max_offset:
:return:
"""
- reference_part = reference.samples[reference_offset: reference_offset + reference_length]
+ reference_part = reference.samples[
+ reference_offset : reference_offset + reference_length
+ ]
reference_peak = np.argmax(reference_part)
def align_func(trace):
length = len(trace.samples)
window_start = max(reference_offset - max_offset, 0)
window_end = min(reference_offset + reference_length + max_offset, length - 1)
- window = trace.samples[window_start: window_end]
+ window = trace.samples[window_start:window_end]
window_peak = np.argmax(window)
left_space = min(max_offset, reference_offset)
return True, int(window_peak - reference_peak - left_space)
@@ -106,9 +124,15 @@ def align_peaks(reference: Trace, *traces: Trace,
@public
-def align_offset(reference: Trace, *traces: Trace,
- reference_offset: int, reference_length: int, max_offset: int,
- dist_func: Callable[[np.ndarray, np.ndarray], float], max_dist: float = float("inf")) -> Tuple[List[Trace], List[int]]:
+def align_offset(
+ reference: Trace,
+ *traces: Trace,
+ reference_offset: int,
+ reference_length: int,
+ max_offset: int,
+ dist_func: Callable[[np.ndarray, np.ndarray], float],
+ max_dist: float = float("inf")
+) -> Tuple[List[Trace], List[int]]:
"""
Align `traces` to the reference `trace` so that the value of the `dist_func` is minimized
between the reference trace window from `reference_offset` of `reference_length` and the trace
@@ -122,7 +146,9 @@ def align_offset(reference: Trace, *traces: Trace,
:param dist_func:
:return:
"""
- reference_part = reference.samples[reference_offset: reference_offset + reference_length]
+ reference_part = reference.samples[
+ reference_offset : reference_offset + reference_length
+ ]
def align_func(trace):
length = len(trace.samples)
@@ -142,12 +168,18 @@ def align_offset(reference: Trace, *traces: Trace,
return True, best_offset
else:
return False, 0
+
return _align_reference(reference, *traces, align_func=align_func)
@public
-def align_sad(reference: Trace, *traces: Trace,
- reference_offset: int, reference_length: int, max_offset: int) -> Tuple[List[Trace], List[int]]:
+def align_sad(
+ reference: Trace,
+ *traces: Trace,
+ reference_offset: int,
+ reference_length: int,
+ max_offset: int
+) -> Tuple[List[Trace], List[int]]:
"""
Align `traces` to the reference `trace` so that the Sum Of Absolute Differences between the
reference trace window from `reference_offset` of `reference_length` and the trace being aligned
@@ -160,17 +192,24 @@ def align_sad(reference: Trace, *traces: Trace,
:param max_offset:
:return:
"""
+
def sad(reference_part, trace_part):
return float(np.sum(np.abs(reference_part - trace_part)))
- return align_offset(reference, *traces,
- reference_offset=reference_offset, reference_length=reference_length,
- max_offset=max_offset, dist_func=sad)
+ return align_offset(
+ reference,
+ *traces,
+ reference_offset=reference_offset,
+ reference_length=reference_length,
+ max_offset=max_offset,
+ dist_func=sad
+ )
@public
-def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1,
- fast: bool = True) -> List[Trace]:
+def align_dtw_scale(
+ reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True
+) -> List[Trace]:
"""
Align `traces` to the `reference` trace.
Using fastdtw (Dynamic Time Warping) with scaling as per:
@@ -206,7 +245,9 @@ def align_dtw_scale(reference: Trace, *traces: Trace, radius: int = 1,
@public
-def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True) -> List[Trace]:
+def align_dtw(
+ reference: Trace, *traces: Trace, radius: int = 1, fast: bool = True
+) -> List[Trace]:
"""
Align `traces` to the `reference` trace. Using fastdtw (Dynamic Time Warping) as per:
@@ -228,9 +269,13 @@ def align_dtw(reference: Trace, *traces: Trace, radius: int = 1, fast: bool = Tr
dist, path = fastdtw(reference_samples, trace.samples, radius=radius)
else:
dist, path = dtw(reference_samples, trace.samples)
- result_samples = np.zeros(max((len(trace.samples), len(reference_samples))), dtype=trace.samples.dtype)
- pairs = np.array(np.array(path, dtype=np.dtype("int,int")),
- dtype=np.dtype([("x", "int"), ("y", "int")]))
+ result_samples = np.zeros(
+ max((len(trace.samples), len(reference_samples))), dtype=trace.samples.dtype
+ )
+ pairs = np.array(
+ np.array(path, dtype=np.dtype("int,int")),
+ dtype=np.dtype([("x", "int"), ("y", "int")]),
+ )
result_samples[pairs["x"]] = trace.samples[pairs["y"]]
del pairs
del path
diff --git a/pyecsca/sca/trace/combine.py b/pyecsca/sca/trace/combine.py
index 8b80869..853b127 100644
--- a/pyecsca/sca/trace/combine.py
+++ b/pyecsca/sca/trace/combine.py
@@ -25,13 +25,15 @@ def average(*traces: Trace) -> Optional[CombinedTrace]:
s = np.zeros(min_samples, dtype=np.float64)
for t in traces:
s = np.add(s, t.samples[:min_samples])
- avg = ((1 / len(traces)) * s)
+ avg = (1 / len(traces)) * s
del s
return CombinedTrace(avg)
@public
-def conditional_average(*traces: Trace, condition: Callable[[Trace], bool]) -> Optional[CombinedTrace]:
+def conditional_average(
+ *traces: Trace, condition: Callable[[Trace], bool]
+) -> Optional[CombinedTrace]:
"""
Average `traces` for which the `condition` is True, sample-wise.
@@ -107,8 +109,10 @@ def average_and_variance(*traces) -> Optional[Tuple[CombinedTrace, CombinedTrace
if not traces:
return None
if len(traces) == 1:
- return (CombinedTrace(traces[0].samples.copy()),
- CombinedTrace(np.zeros(len(traces[0]), dtype=np.float64)))
+ return (
+ CombinedTrace(traces[0].samples.copy()),
+ CombinedTrace(np.zeros(len(traces[0]), dtype=np.float64)),
+ )
min_samples = min(map(len, traces))
s = np.zeros(min_samples, dtype=np.float64)
for t in traces:
diff --git a/pyecsca/sca/trace/edit.py b/pyecsca/sca/trace/edit.py
index c0af078..5aec15f 100644
--- a/pyecsca/sca/trace/edit.py
+++ b/pyecsca/sca/trace/edit.py
@@ -39,8 +39,11 @@ def reverse(trace: Trace) -> Trace:
@public
-def pad(trace: Trace, lengths: Union[Tuple[int, int], int],
- values: Union[Tuple[Any, Any], Any] = (0, 0)) -> Trace:
+def pad(
+ trace: Trace,
+ lengths: Union[Tuple[int, int], int],
+ values: Union[Tuple[Any, Any], Any] = (0, 0),
+) -> Trace:
"""
Pad the samples of the `trace` by `values` at the beginning and end.
@@ -53,4 +56,6 @@ def pad(trace: Trace, lengths: Union[Tuple[int, int], int],
lengths = (lengths, lengths)
if not isinstance(values, tuple):
values = (values, values)
- return trace.with_samples(np.pad(trace.samples, lengths, "constant", constant_values=values))
+ return trace.with_samples(
+ np.pad(trace.samples, lengths, "constant", constant_values=values)
+ )
diff --git a/pyecsca/sca/trace/filter.py b/pyecsca/sca/trace/filter.py
index 720d311..2fbc284 100644
--- a/pyecsca/sca/trace/filter.py
+++ b/pyecsca/sca/trace/filter.py
@@ -8,13 +8,23 @@ from typing import Union, Tuple
from .trace import Trace
-def _filter_any(trace: Trace, sampling_frequency: int,
- cutoff: Union[int, Tuple[int, int]], band_type: str) -> Trace:
+def _filter_any(
+ trace: Trace,
+ sampling_frequency: int,
+ cutoff: Union[int, Tuple[int, int]],
+ band_type: str,
+) -> Trace:
nyq = 0.5 * sampling_frequency
if not isinstance(cutoff, int):
- b, a = butter(6, tuple(map(lambda x: x / nyq, cutoff)), btype=band_type, analog=False, output='ba')
+ b, a = butter(
+ 6,
+ tuple(map(lambda x: x / nyq, cutoff)),
+ btype=band_type,
+ analog=False,
+ output="ba",
+ )
else:
- b, a = butter(6, cutoff / nyq, btype=band_type, analog=False, output='ba')
+ b, a = butter(6, cutoff / nyq, btype=band_type, analog=False, output="ba")
return trace.with_samples(lfilter(b, a, trace.samples))
@@ -47,7 +57,9 @@ def filter_highpass(trace: Trace, sampling_frequency: int, cutoff: int) -> Trace
@public
-def filter_bandpass(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace:
+def filter_bandpass(
+ trace: Trace, sampling_frequency: int, low: int, high: int
+) -> Trace:
"""
Apply a bandpass digital filter (Butterworth) to `trace`, given `sampling_frequency`, with the
passband from `low` to `high`.
@@ -62,7 +74,9 @@ def filter_bandpass(trace: Trace, sampling_frequency: int, low: int, high: int)
@public
-def filter_bandstop(trace: Trace, sampling_frequency: int, low: int, high: int) -> Trace:
+def filter_bandstop(
+ trace: Trace, sampling_frequency: int, low: int, high: int
+) -> Trace:
"""
Apply a bandstop digital filter (Butterworth) to `trace`, given `sampling_frequency`, with the
stopband from `low` to `high`.
diff --git a/pyecsca/sca/trace/match.py b/pyecsca/sca/trace/match.py
index 3728d13..50c86bc 100644
--- a/pyecsca/sca/trace/match.py
+++ b/pyecsca/sca/trace/match.py
@@ -43,7 +43,9 @@ def match_pattern(trace: Trace, pattern: Trace, threshold: float = 0.8) -> List[
@public
-def match_part(trace: Trace, offset: int, length: int, threshold: float = 0.8) -> List[int]:
+def match_part(
+ trace: Trace, offset: int, length: int, threshold: float = 0.8
+) -> List[int]:
"""
Match a part of a `trace` starting at `offset` of `length` to the `trace`. Returns indices where the pattern matches
, e.g. those where correlation of the two traces has peaks larger than `threshold`. Uses the
diff --git a/pyecsca/sca/trace/plot.py b/pyecsca/sca/trace/plot.py
index b9843cc..6a8cec1 100644
--- a/pyecsca/sca/trace/plot.py
+++ b/pyecsca/sca/trace/plot.py
@@ -40,10 +40,12 @@ def plot_traces(*traces: Trace, **kwargs): # pragma: no cover
["orange", "darkorange"],
["plum", "deeppink"],
["peru", "chocolate"],
- ["cyan", "darkcyan"]
+ ["cyan", "darkcyan"],
]
dss = []
for i, trace in enumerate(traces):
- line = hv.Curve((range(len(trace)), trace.samples), kdims="x", vdims="y", **kwargs)
+ line = hv.Curve(
+ (range(len(trace)), trace.samples), kdims="x", vdims="y", **kwargs
+ )
dss.append(datashade(line, normalization="log", cmap=_cmaps[i % len(_cmaps)]))
return reduce(lambda x, y: x * y, dss)
diff --git a/pyecsca/sca/trace/process.py b/pyecsca/sca/trace/process.py
index 87b00ee..2499df4 100644
--- a/pyecsca/sca/trace/process.py
+++ b/pyecsca/sca/trace/process.py
@@ -61,8 +61,14 @@ def rolling_mean(trace: Trace, window: int) -> Trace:
:param window:
:return:
"""
- return trace.with_samples(cast(np.ndarray, np.mean(_rolling_window(trace.samples, window), -1).astype(
- dtype=trace.samples.dtype, copy=False)))
+ return trace.with_samples(
+ cast(
+ np.ndarray,
+ np.mean(_rolling_window(trace.samples, window), -1).astype(
+ dtype=trace.samples.dtype, copy=False
+ ),
+ )
+ )
@public
@@ -101,7 +107,9 @@ def normalize(trace: Trace) -> Trace:
:param trace:
:return:
"""
- return trace.with_samples((trace.samples - np.mean(trace.samples)) / np.std(trace.samples))
+ return trace.with_samples(
+ (trace.samples - np.mean(trace.samples)) / np.std(trace.samples)
+ )
@public
@@ -112,5 +120,7 @@ def normalize_wl(trace: Trace) -> Trace:
:param trace:
:return:
"""
- return trace.with_samples((trace.samples - np.mean(trace.samples)) / (
- np.std(trace.samples) * len(trace.samples)))
+ return trace.with_samples(
+ (trace.samples - np.mean(trace.samples))
+ / (np.std(trace.samples) * len(trace.samples))
+ )
diff --git a/pyecsca/sca/trace/sampling.py b/pyecsca/sca/trace/sampling.py
index b263adc..71dbdac 100644
--- a/pyecsca/sca/trace/sampling.py
+++ b/pyecsca/sca/trace/sampling.py
@@ -20,8 +20,15 @@ def downsample_average(trace: Trace, factor: int = 2) -> Trace:
:param factor:
:return:
"""
- resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor))
- result_samples = cast(np.ndarray, resized.reshape(-1, factor).mean(axis=1).astype(trace.samples.dtype, copy=False))
+ resized = np.resize(
+ trace.samples, len(trace.samples) - (len(trace.samples) % factor)
+ )
+ result_samples = cast(
+ np.ndarray,
+ resized.reshape(-1, factor)
+ .mean(axis=1)
+ .astype(trace.samples.dtype, copy=False),
+ )
return trace.with_samples(result_samples)
@@ -49,8 +56,13 @@ def downsample_max(trace: Trace, factor: int = 2) -> Trace:
:param factor:
:return:
"""
- resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor))
- result_samples = cast(np.ndarray, resized.reshape(-1, factor).max(axis=1).astype(trace.samples.dtype, copy=False))
+ resized = np.resize(
+ trace.samples, len(trace.samples) - (len(trace.samples) % factor)
+ )
+ result_samples = cast(
+ np.ndarray,
+ resized.reshape(-1, factor).max(axis=1).astype(trace.samples.dtype, copy=False),
+ )
return trace.with_samples(result_samples)
@@ -64,8 +76,13 @@ def downsample_min(trace: Trace, factor: int = 2) -> Trace:
:param factor:
:return:
"""
- resized = np.resize(trace.samples, len(trace.samples) - (len(trace.samples) % factor))
- result_samples = cast(np.ndarray, resized.reshape(-1, factor).min(axis=1).astype(trace.samples.dtype, copy=False))
+ resized = np.resize(
+ trace.samples, len(trace.samples) - (len(trace.samples) % factor)
+ )
+ result_samples = cast(
+ np.ndarray,
+ resized.reshape(-1, factor).min(axis=1).astype(trace.samples.dtype, copy=False),
+ )
return trace.with_samples(result_samples)
diff --git a/pyecsca/sca/trace/test.py b/pyecsca/sca/trace/test.py
index 8512a70..f90498a 100644
--- a/pyecsca/sca/trace/test.py
+++ b/pyecsca/sca/trace/test.py
@@ -12,8 +12,9 @@ from .combine import average_and_variance
from .edit import trim
-def _ttest_func(first_set: Sequence[Trace], second_set: Sequence[Trace],
- equal_var: bool) -> Optional[CombinedTrace]:
+def _ttest_func(
+ first_set: Sequence[Trace], second_set: Sequence[Trace], equal_var: bool
+) -> Optional[CombinedTrace]:
if not first_set or not second_set or len(first_set) == 0 or len(second_set) == 0:
return None
first_stack = np.stack([first.samples for first in first_set])
@@ -23,7 +24,12 @@ def _ttest_func(first_set: Sequence[Trace], second_set: Sequence[Trace],
@public
-def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bool = False, p_value: bool = False) -> Optional[Tuple[CombinedTrace, ...]]:
+def welch_ttest(
+ first_set: Sequence[Trace],
+ second_set: Sequence[Trace],
+ dof: bool = False,
+ p_value: bool = False,
+) -> Optional[Tuple[CombinedTrace, ...]]:
"""
Perform the Welch's t-test sample wise on two sets of traces `first_set` and `second_set`.
Useful for Test Vector Leakage Analysis (TVLA).
@@ -51,8 +57,8 @@ def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bo
tval = (mean_0.samples - mean_1.samples) / np.sqrt(varn_0 + varn_1)
result = [CombinedTrace(tval)]
if dof or p_value:
- top = (varn_0 + varn_1)**2
- bot = (varn_0**2 / (n0 - 1)) + (varn_1**2 / (n1 - 1))
+ top = (varn_0 + varn_1) ** 2
+ bot = (varn_0 ** 2 / (n0 - 1)) + (varn_1 ** 2 / (n1 - 1))
df = top / bot
del top
del bot
@@ -66,8 +72,9 @@ def welch_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace], dof: bo
@public
-def student_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional[
- CombinedTrace]:
+def student_ttest(
+ first_set: Sequence[Trace], second_set: Sequence[Trace]
+) -> Optional[CombinedTrace]:
"""
Perform the Students's t-test sample wise on two sets of traces `first_set` and `second_set`.
Useful for Test Vector Leakage Analysis (TVLA).
@@ -80,7 +87,9 @@ def student_ttest(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Op
@public
-def ks_test(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional[CombinedTrace]:
+def ks_test(
+ first_set: Sequence[Trace], second_set: Sequence[Trace]
+) -> Optional[CombinedTrace]:
"""
Perform the Kolmogorov-Smirnov two sample test on equality of distributions sample wise on
two sets of traces `first_set` and `second_set`.
diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py
index aef0837..96f1ec8 100644
--- a/pyecsca/sca/trace/trace.py
+++ b/pyecsca/sca/trace/trace.py
@@ -13,10 +13,13 @@ from public import public
@public
class Trace(object):
"""A trace, which has some samples and metadata."""
+
meta: Mapping[str, Any]
samples: ndarray
- def __init__(self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None):
+ def __init__(
+ self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None
+ ):
"""
Construct a new trace.
@@ -92,8 +95,11 @@ class Trace(object):
return Trace(copy(self.samples), copy(self.meta), copy(self.trace_set))
def __deepcopy__(self, memodict={}):
- return Trace(deepcopy(self.samples, memo=memodict), deepcopy(self.meta, memo=memodict),
- deepcopy(self.trace_set, memo=memodict))
+ return Trace(
+ deepcopy(self.samples, memo=memodict),
+ deepcopy(self.meta, memo=memodict),
+ deepcopy(self.trace_set, memo=memodict),
+ )
def __repr__(self):
return f"Trace(samples={self.samples!r}, trace_set={self.trace_set!r})"
@@ -103,8 +109,13 @@ class Trace(object):
class CombinedTrace(Trace):
"""A trace that was combined from other traces, `parents`."""
- def __init__(self, samples: ndarray, meta: Mapping[str, Any] = None, trace_set: Any = None,
- parents: Sequence[Trace] = None):
+ def __init__(
+ self,
+ samples: ndarray,
+ meta: Mapping[str, Any] = None,
+ trace_set: Any = None,
+ parents: Sequence[Trace] = None,
+ ):
super().__init__(samples, meta, trace_set=trace_set)
self.parents = None
if parents is not None:
diff --git a/pyecsca/sca/trace_set/base.py b/pyecsca/sca/trace_set/base.py
index 6f4e4b0..097d666 100644
--- a/pyecsca/sca/trace_set/base.py
+++ b/pyecsca/sca/trace_set/base.py
@@ -12,6 +12,7 @@ from ..trace import Trace
@public
class TraceSet(object):
"""A set of traces with some metadata."""
+
_traces: List[Trace]
_keys: List
@@ -46,6 +47,7 @@ class TraceSet(object):
raise NotImplementedError
def __repr__(self):
- args = ", ".join(["{}={!r}".format(key, getattr(self, key)) for key in
- self._keys])
+ args = ", ".join(
+ ["{}={!r}".format(key, getattr(self, key)) for key in self._keys]
+ )
return "TraceSet({})".format(args)
diff --git a/pyecsca/sca/trace_set/chipwhisperer.py b/pyecsca/sca/trace_set/chipwhisperer.py
index ba78a85..a56e676 100644
--- a/pyecsca/sca/trace_set/chipwhisperer.py
+++ b/pyecsca/sca/trace_set/chipwhisperer.py
@@ -24,7 +24,9 @@ class ChipWhispererTraceSet(TraceSet):
raise ValueError
@classmethod
- def inplace(cls, input: Union[str, Path, bytes, BinaryIO]) -> "ChipWhispererTraceSet":
+ def inplace(
+ cls, input: Union[str, Path, bytes, BinaryIO]
+ ) -> "ChipWhispererTraceSet":
raise NotImplementedError
def write(self, output: Union[str, Path, BinaryIO]):
@@ -39,10 +41,12 @@ class ChipWhispererTraceSet(TraceSet):
name = file_name[7:-4]
data = ChipWhispererTraceSet.__read_data(path, name)
traces = []
- for samples, key, textin, textout in zip_longest(data["traces"], data["keylist"],
- data["textin"], data["textout"]):
+ for samples, key, textin, textout in zip_longest(
+ data["traces"], data["keylist"], data["textin"], data["textout"]
+ ):
traces.append(
- Trace(samples, {"key": key, "textin": textin, "textout": textout}))
+ Trace(samples, {"key": key, "textin": textin, "textout": textout})
+ )
del data["traces"]
del data["keylist"]
del data["textin"]
@@ -52,7 +56,13 @@ class ChipWhispererTraceSet(TraceSet):
@classmethod
def __read_data(cls, path, name):
- types = {"keylist": None, "knownkey": None, "textin": None, "textout": None, "traces": None}
+ types = {
+ "keylist": None,
+ "knownkey": None,
+ "textin": None,
+ "textout": None,
+ "traces": None,
+ }
for type in types.keys():
type_path = join(path, name + type + ".npy")
if exists(type_path) and isfile(type_path):
diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py
index 4b3b7b0..f1d34ef 100644
--- a/pyecsca/sca/trace_set/hdf5.py
+++ b/pyecsca/sca/trace_set/hdf5.py
@@ -22,6 +22,7 @@ from .. import Trace
@public
class HDF5Meta(MutableMapping):
"""Metadata mapping that is HDF5-compatible (items are picklable)."""
+
_dataset: h5py.AttributeManager
def __init__(self, attrs: h5py.AttributeManager):
@@ -55,12 +56,18 @@ class HDF5Meta(MutableMapping):
@public
class HDF5TraceSet(TraceSet):
"""A traceset based on the HDF5 (Hierarchical Data Format)."""
+
_file: Optional[h5py.File]
_ordering: List[str]
# _meta: Optional[HDF5Meta]
- def __init__(self, *traces: Trace, _file: Optional[h5py.File] = None,
- _ordering: Optional[List[str]] = None, **kwargs):
+ def __init__(
+ self,
+ *traces: Trace,
+ _file: Optional[h5py.File] = None,
+ _ordering: Optional[List[str]] = None,
+ **kwargs,
+ ):
# self._meta = HDF5Meta(_file.attrs) if _file is not None else None
self._file = _file
if _ordering is None:
@@ -76,7 +83,9 @@ class HDF5TraceSet(TraceSet):
else:
raise TypeError
kwargs = dict(hdf5.attrs)
- kwargs["_ordering"] = list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys())
+ kwargs["_ordering"] = (
+ list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys())
+ )
traces = []
for k in kwargs["_ordering"]:
meta = dict(HDF5Meta(hdf5[k].attrs))
@@ -94,7 +103,9 @@ class HDF5TraceSet(TraceSet):
else:
raise TypeError
kwargs = dict(hdf5.attrs)
- kwargs["_ordering"] = list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys())
+ kwargs["_ordering"] = (
+ list(kwargs["_ordering"]) if "_ordering" in kwargs else list(hdf5.keys())
+ )
traces = []
for k in kwargs["_ordering"]:
meta = HDF5Meta(hdf5[k].attrs)
@@ -182,5 +193,11 @@ class HDF5TraceSet(TraceSet):
fname = self._file.filename
else:
status = "(closed)"
- args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys if not key.startswith("_")])
+ args = ", ".join(
+ [
+ f"{key}={getattr(self, key)!r}"
+ for key in self._keys
+ if not key.startswith("_")
+ ]
+ )
return f"HDF5TraceSet('{fname}'{status}, {args})"
diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py
index cc3f81f..83bbade 100644
--- a/pyecsca/sca/trace_set/inspector.py
+++ b/pyecsca/sca/trace_set/inspector.py
@@ -25,7 +25,7 @@ class SampleCoding(IntEnum):
def dtype(self):
char = "f" if self.value & 0x10 else "i"
- return np.dtype("<{}{}".format(char, self.value & 0x0f))
+ return np.dtype("<{}{}".format(char, self.value & 0x0F))
@public
@@ -107,37 +107,44 @@ class InspectorTraceSet(TraceSet):
_tag_parsers: dict = {
0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int),
0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int),
- 0x43: ("sample_coding", 1,
- lambda bytes: SampleCoding(Parsers.read_int(bytes)),
- lambda coding, length: Parsers.write_int(coding.value,
- length=length)),
+ 0x43: (
+ "sample_coding",
+ 1,
+ lambda bytes: SampleCoding(Parsers.read_int(bytes)),
+ lambda coding, length: Parsers.write_int(coding.value, length=length),
+ ),
0x44: ("data_space", 2, Parsers.read_int, Parsers.write_int),
0x45: ("title_space", 1, Parsers.read_int, Parsers.write_int),
0x46: ("global_title", None, Parsers.read_str, Parsers.write_str),
0x47: ("description", None, Parsers.read_str, Parsers.write_str),
0x48: ("x_offset", None, Parsers.read_int, Parsers.write_int),
0x49: ("x_label", None, Parsers.read_str, Parsers.write_str),
- 0x4a: ("y_label", None, Parsers.read_str, Parsers.write_str),
- 0x4b: ("x_scale", 4, Parsers.read_float, Parsers.write_float),
- 0x4c: ("y_scale", 4, Parsers.read_float, Parsers.write_float),
- 0x4d: ("trace_offset", 4, Parsers.read_int, Parsers.write_int),
- 0x4e: ("log_scale", 1, Parsers.read_int, Parsers.write_int),
+ 0x4A: ("y_label", None, Parsers.read_str, Parsers.write_str),
+ 0x4B: ("x_scale", 4, Parsers.read_float, Parsers.write_float),
+ 0x4C: ("y_scale", 4, Parsers.read_float, Parsers.write_float),
+ 0x4D: ("trace_offset", 4, Parsers.read_int, Parsers.write_int),
+ 0x4E: ("log_scale", 1, Parsers.read_int, Parsers.write_int),
0x55: ("scope_range", 4, Parsers.read_float, Parsers.write_float),
0x56: ("scope_coupling", 4, Parsers.read_int, Parsers.write_int),
0x57: ("scope_offset", 4, Parsers.read_float, Parsers.write_float),
0x58: ("scope_impedance", 4, Parsers.read_float, Parsers.write_float),
0x59: ("scope_id", None, Parsers.read_str, Parsers.write_str),
- 0x5a: ("filter_type", 4, Parsers.read_int, Parsers.write_int),
- 0x5b: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float),
- 0x5c: ("filter_range", 4, Parsers.read_float, Parsers.read_float),
+ 0x5A: ("filter_type", 4, Parsers.read_int, Parsers.write_int),
+ 0x5B: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float),
+ 0x5C: ("filter_range", 4, Parsers.read_float, Parsers.read_float),
0x60: ("external_clock", 1, Parsers.read_bool, Parsers.write_bool),
0x61: ("external_clock_threshold", 4, Parsers.read_float, Parsers.write_float),
0x62: ("external_clock_multiplier", 4, Parsers.read_int, Parsers.write_int),
0x63: ("external_clock_phase_shift", 4, Parsers.read_int, Parsers.write_int),
0x64: ("external_clock_resampler_mask", 4, Parsers.read_int, Parsers.write_int),
- 0x65: ("external_clock_resampler_enabled", 1, Parsers.read_bool, Parsers.write_bool),
+ 0x65: (
+ "external_clock_resampler_enabled",
+ 1,
+ Parsers.read_bool,
+ Parsers.write_bool,
+ ),
0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float),
- 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int)
+ 0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int),
}
@classmethod
@@ -171,28 +178,33 @@ class InspectorTraceSet(TraceSet):
tag = ord(file.read(1))
length = ord(file.read(1))
if length & 0x80:
- length = Parsers.read_int(file.read(length & 0x7f))
+ length = Parsers.read_int(file.read(length & 0x7F))
value = file.read(length)
if tag in InspectorTraceSet._tag_parsers:
tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag]
if tag_len is None or length == tag_len:
tags[tag_name] = tag_reader(value)
- elif tag == 0x5f and length == 0:
+ elif tag == 0x5F and length == 0:
break
else:
continue
result = []
for _ in range(tags["num_traces"]):
- title = None if "title_space" not in tags else Parsers.read_str(
- file.read(tags["title_space"]))
+ title = (
+ None
+ if "title_space" not in tags
+ else Parsers.read_str(file.read(tags["title_space"]))
+ )
data = None if "data_space" not in tags else file.read(tags["data_space"])
dtype = tags["sample_coding"].dtype()
try:
samples = np.fromfile(file, dtype, tags["num_samples"])
except UnsupportedOperation:
samples = np.frombuffer(
- file.read(dtype.itemsize * tags["num_samples"]), dtype,
- tags["num_samples"])
+ file.read(dtype.itemsize * tags["num_samples"]),
+ dtype,
+ tags["num_samples"],
+ )
result.append(Trace(samples, {"title": title, "data": data}))
return result, tags
@@ -222,12 +234,13 @@ class InspectorTraceSet(TraceSet):
tag_byte = Parsers.write_int(tag, length=1)
value_bytes = tag_writer(getattr(self, tag_name), tag_len)
length = len(value_bytes)
- if length <= 0x7f:
+ if length <= 0x7F:
length_bytes = Parsers.write_int(length, length=1)
else:
- length_data = Parsers.write_int(length, length=(length.bit_length() + 7) // 8)
- length_bytes = Parsers.write_int(
- 0x80 | len(length_data)) + length_data
+ length_data = Parsers.write_int(
+ length, length=(length.bit_length() + 7) // 8
+ )
+ length_bytes = Parsers.write_int(0x80 | len(length_data)) + length_data
file.write(tag_byte)
file.write(length_bytes)
file.write(value_bytes)
@@ -238,7 +251,9 @@ class InspectorTraceSet(TraceSet):
file.write(Parsers.write_str(trace.meta["title"]))
if self.data_space != 0 and trace.meta["data"] is not None:
file.write(trace.meta["data"])
- unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding)
+ unscaled = InspectorTraceSet.__unscale(
+ trace.samples, self.y_scale, self.sample_coding
+ )
try:
unscaled.tofile(file)
except UnsupportedOperation: