From 28390ec1575e0af026be2bfea6fd0bca8f55c008 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 23 Apr 2019 19:05:27 +0200 Subject: Add proper context management. --- .travis.yml | 1 - Pipfile | 1 + README.md | 2 +- docs/index.rst | 3 +- pyecsca/ec/context.py | 174 ++++++++++++++++++++++++++++++++++++++++++++---- pyecsca/ec/formula.py | 9 ++- pyecsca/ec/mult.py | 66 +++++++++--------- pyecsca/ec/op.py | 9 ++- pyecsca/ec/signature.py | 3 +- setup.py | 2 +- test/ec/test_context.py | 23 +++++++ 11 files changed, 235 insertions(+), 58 deletions(-) create mode 100644 test/ec/test_context.py diff --git a/.travis.yml b/.travis.yml index d62e2d1..767db8e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: python dist: xenial python: - - "3.6" - "3.7" install: diff --git a/Pipfile b/Pipfile index 43d0a43..1cc3913 100644 --- a/Pipfile +++ b/Pipfile @@ -9,6 +9,7 @@ green = "*" mypy = "*" sphinx = "*" sphinx-autodoc-typehints = "*" +coverage = "*" [packages] numpy = "*" diff --git a/README.md b/README.md index fe7a08d..42b07ae 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ and ECC simulation in the [*pyecsca.ec*](pyecsca/ec) package. - [matplotlib](https://matplotlib.org/) - [atpublic](https://public.readthedocs.io/) - [fastdtw](https://github.com/slaypni/fastdtw) - - asn1crypto + - [asn1crypto](https://github.com/wbond/asn1crypto) *pyecsca* contains data from the [Explicit-Formulas Database](https://www.hyperelliptic.org/EFD/index.html) by Daniel J. Bernstein and Tanja Lange. The data was partially changed, to make working with it easier. diff --git a/docs/index.rst b/docs/index.rst index 525dff6..13d70ac 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -36,7 +36,7 @@ Requirements - matplotlib_ - atpublic_ - fastdtw_ - - asn1crypto + - asn1crypto_ *pyecsca* contains data from the `Explicit-Formulas Database`_ by Daniel J. Bernstein and Tanja Lange. @@ -90,6 +90,7 @@ this support is very appreciated. .. _matplotlib: https://matplotlib.org/ .. _atpublic: https://public.readthedocs.io/ .. _fastdtw: https://github.com/slaypni/fastdtw +.. _asn1crypto: https://github.com/wbond/asn1crypto .. _nose2: https://nose2.readthedocs.io .. _green: https://github.com/CleanCut/green .. _mypy: http://mypy-lang.org/ diff --git a/pyecsca/ec/context.py b/pyecsca/ec/context.py index 0bb86d8..0c21942 100644 --- a/pyecsca/ec/context.py +++ b/pyecsca/ec/context.py @@ -1,22 +1,92 @@ -from typing import List, Tuple +import ast +from contextvars import ContextVar, Token +from copy import deepcopy +from typing import List, Tuple, Optional, Union, MutableMapping, Any, Mapping + +from public import public from .formula import Formula from .mod import Mod +from .op import CodeOp from .point import Point +@public +class OpResult(object): + parents: Tuple + op: ast.operator + name: str + value: Mod + + def __init__(self, name: str, value: Mod, op: ast.operator, *parents: Any): + self.parents = tuple(parents) + self.name = name + self.value = value + self.op = op + + def __str__(self): + return self.name + + def __repr__(self): + char = "" + if isinstance(self.op, ast.Mult): + char = "*" + elif isinstance(self.op, ast.Add): + char = "+" + elif isinstance(self.op, ast.Sub): + char = "-" + elif isinstance(self.op, ast.Div): + char = "/" + parents = char.join(str(parent) for parent in self.parents) + return f"{self.name} = {parents}" + + +@public +class Action(object): + formula: Formula + input_points: List[Point] + inputs: MutableMapping[str, Mod] + intermediates: MutableMapping[str, Union[Mod, OpResult]] + roots: MutableMapping[str, OpResult] + output_points: List[Point] + + def __init__(self, formula: Formula, *points: Point, **inputs: Mod): + self.formula = formula + self.input_points = list(points) + self.inputs = inputs + self.intermediates = {} + self.roots = {} + self.output_points = [] + + def add_operation(self, op: CodeOp, value: Mod): + parents = [] + for parent in {*op.variables, *op.parameters}: + if parent in self.intermediates: + parents.append(self.intermediates[parent]) + elif parent in self.inputs: + parents.append(self.inputs[parent]) + self.intermediates[op.result] = OpResult(op.result, value, op.operator, *parents) + + def add_result(self, point: Point, **outputs: Mod): + for k in outputs: + self.roots[k] = self.intermediates[k] + self.output_points.append(point) + + +@public class Context(object): - intermediates: List[Tuple[str, Mod]] - actions: List[Tuple[Formula, Tuple[Point, ...]]] + def _log_action(self, formula: Formula, *points: Point, **inputs: Mod): + raise NotImplementedError - def __init__(self): - self.intermediates = [] - self.actions = [] + def _log_operation(self, op: CodeOp, value: Mod): + raise NotImplementedError - def execute(self, formula: Formula, *points: Point, **params: Mod) -> Tuple[Point, ...]: + def _log_result(self, point: Point, **outputs: Mod): + raise NotImplementedError + + def _execute(self, formula: Formula, *points: Point, **params: Mod) -> Tuple[Point, ...]: if len(points) != formula.num_inputs: raise ValueError - self.actions.append((formula, tuple(points))) coords = {} for i, point in enumerate(points): if point.coordinate_model != formula.coordinate_model: @@ -24,15 +94,93 @@ class Context(object): for coord, value in point.coords.items(): coords[coord + str(i + 1)] = value locals = {**coords, **params} + self._log_action(formula, *points, **locals) for op in formula.code: op_result = op(**locals) - self.intermediates.append((op.result, op_result)) + self._log_operation(op, op_result) locals[op.result] = op_result result = [] for i in range(formula.num_outputs): ind = str(i + formula.output_index) - resulting = {variable: locals[variable + ind] - for variable in formula.coordinate_model.variables - if variable + ind in locals} - result.append(Point(formula.coordinate_model, **resulting)) + resulting = {} + full_resulting = {} + for variable in formula.coordinate_model.variables: + full_variable = variable + ind + if full_variable not in locals: + continue + resulting[variable] = locals[full_variable] + full_resulting[full_variable] = locals[full_variable] + point = Point(formula.coordinate_model, **resulting) + + self._log_result(point, **full_resulting) + result.append(point) return tuple(result) + + def execute(self, formula: Formula, *points: Point, **params: Mod) -> Tuple[Point, ...]: + return self._execute(formula, *points, **params) + + def __str__(self): + return self.__class__.__name__ + + +@public +class NullContext(Context): + + def _log_action(self, formula: Formula, *points: Point, **inputs: Mod): + pass + + def _log_operation(self, op: CodeOp, value: Mod): + pass + + def _log_result(self, point: Point, **outputs: Mod): + pass + + +@public +class DefaultContext(Context): + actions: List[Action] + + def __init__(self): + self.actions = [] + + def _log_action(self, formula: Formula, *points: Point, **inputs: Mod): + self.actions.append(Action(formula, *points, **inputs)) + + def _log_operation(self, op: CodeOp, value: Mod): + self.actions[-1].add_operation(op, value) + + def _log_result(self, point: Point, **outputs: Mod): + self.actions[-1].add_result(point, **outputs) + + +_actual_context: ContextVar[Context] = ContextVar("operational_context", default=NullContext()) + + +class ContextManager(object): + def __init__(self, new_context): + self.new_context = deepcopy(new_context) + + def __enter__(self) -> Context: + self.saved_context = getcontext() + setcontext(self.new_context) + return self.new_context + + def __exit__(self, t, v, tb): + setcontext(self.saved_context) + + +@public +def getcontext(): + return _actual_context.get() + + +@public +def setcontext(ctx: Context) -> Token: + return _actual_context.set(ctx) + + +@public +def local(ctx: Optional[Context] = None) -> ContextManager: + if ctx is None: + ctx = getcontext() + return ContextManager(ctx) diff --git a/pyecsca/ec/formula.py b/pyecsca/ec/formula.py index e6d8d9f..441953e 100644 --- a/pyecsca/ec/formula.py +++ b/pyecsca/ec/formula.py @@ -1,7 +1,8 @@ from ast import parse, Expression +from typing import List, Any, ClassVar, MutableMapping + from pkg_resources import resource_stream from public import public -from typing import List, Any, ClassVar, MutableMapping from .op import Op, CodeOp @@ -21,7 +22,7 @@ class Formula(object): @property def output_index(cls): - return max(cls.num_inputs + 1, 3) + raise NotImplementedError class EFDFormula(Formula): @@ -56,6 +57,10 @@ class EFDFormula(Formula): code_module = parse(line.decode("ascii").replace("^", "**"), path, mode="exec") self.code.append(CodeOp(code_module)) + @property + def output_index(cls): + return max(cls.num_inputs + 1, 3) + @public class AdditionFormula(Formula): diff --git a/pyecsca/ec/mult.py b/pyecsca/ec/mult.py index 2576d7a..ab6cf7b 100644 --- a/pyecsca/ec/mult.py +++ b/pyecsca/ec/mult.py @@ -1,31 +1,26 @@ from copy import copy -from public import public from typing import Mapping, Tuple, Optional, MutableMapping, Union -from pyecsca.ec.naf import naf, wnaf -from .context import Context -from .curve import EllipticCurve +from public import public + +from .context import getcontext +from .formula import (Formula, AdditionFormula, DoublingFormula, DifferentialAdditionFormula, + ScalingFormula, LadderFormula, NegationFormula) from .group import AbelianGroup -from .formula import (Formula, AdditionFormula, DoublingFormula, ScalingFormula, LadderFormula, - NegationFormula, DifferentialAdditionFormula) +from .naf import naf, wnaf from .point import Point class ScalarMultiplier(object): group: AbelianGroup formulas: Mapping[str, Formula] - context: Context _point: Point = None - def __init__(self, group: AbelianGroup, ctx: Context = None, **formulas: Optional[Formula]): + def __init__(self, group: AbelianGroup, **formulas: Optional[Formula]): for formula in formulas.values(): if formula is not None and formula.coordinate_model is not group.curve.coordinate_model: raise ValueError self.group = group - if ctx: - self.context = ctx - else: - self.context = Context() self.formulas = dict(filter(lambda pair: pair[1] is not None, formulas.items())) def _add(self, one: Point, other: Point) -> Point: @@ -35,24 +30,25 @@ class ScalarMultiplier(object): return copy(other) if other == self.group.neutral: return copy(one) - return self.context.execute(self.formulas["add"], one, other, **self.group.curve.parameters)[0] + return \ + getcontext().execute(self.formulas["add"], one, other, **self.group.curve.parameters)[0] def _dbl(self, point: Point) -> Point: if "dbl" not in self.formulas: raise NotImplementedError if point == self.group.neutral: return copy(point) - return self.context.execute(self.formulas["dbl"], point, **self.group.curve.parameters)[0] + return getcontext().execute(self.formulas["dbl"], point, **self.group.curve.parameters)[0] def _scl(self, point: Point) -> Point: if "scl" not in self.formulas: raise NotImplementedError - return self.context.execute(self.formulas["scl"], point, **self.group.curve.parameters)[0] + return getcontext().execute(self.formulas["scl"], point, **self.group.curve.parameters)[0] def _ladd(self, start: Point, to_dbl: Point, to_add: Point) -> Tuple[Point, ...]: if "ladd" not in self.formulas: raise NotImplementedError - return self.context.execute(self.formulas["ladd"], start, to_dbl, to_add, + return getcontext().execute(self.formulas["ladd"], start, to_dbl, to_add, **self.group.curve.parameters) def _dadd(self, start: Point, one: Point, other: Point) -> Point: @@ -62,13 +58,13 @@ class ScalarMultiplier(object): return copy(other) if other == self.group.neutral: return copy(one) - return self.context.execute(self.formulas["dadd"], start, one, other, + return getcontext().execute(self.formulas["dadd"], start, one, other, **self.group.curve.parameters)[0] def _neg(self, point: Point) -> Point: if "neg" not in self.formulas: raise NotImplementedError - return self.context.execute(self.formulas["neg"], point, **self.group.curve.parameters)[0] + return getcontext().execute(self.formulas["neg"], point, **self.group.curve.parameters)[0] def init(self, point: Point): self._point = point @@ -96,9 +92,8 @@ class LTRMultiplier(ScalarMultiplier): always: bool def __init__(self, group: AbelianGroup, add: AdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, - ctx: Context = None, always: bool = False): - super().__init__(group, ctx, add=add, dbl=dbl, scl=scl) + scl: ScalingFormula = None, always: bool = False): + super().__init__(group, add=add, dbl=dbl, scl=scl) self.always = always def multiply(self, scalar: int, point: Optional[Point] = None) -> Point: @@ -127,9 +122,8 @@ class RTLMultiplier(ScalarMultiplier): always: bool def __init__(self, group: AbelianGroup, add: AdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, - ctx: Context = None, always: bool = False): - super().__init__(group, ctx, add=add, dbl=dbl, scl=scl) + scl: ScalingFormula = None, always: bool = False): + super().__init__(group, add=add, dbl=dbl, scl=scl) self.always = always def multiply(self, scalar: int, point: Optional[Point] = None) -> Point: @@ -159,8 +153,8 @@ class CoronMultiplier(ScalarMultiplier): """ def __init__(self, group: AbelianGroup, add: AdditionFormula, dbl: DoublingFormula, - scl: ScalingFormula = None, ctx: Context = None): - super().__init__(group, ctx, add=add, dbl=dbl, scl=scl) + scl: ScalingFormula = None): + super().__init__(group, add=add, dbl=dbl, scl=scl) def multiply(self, scalar: int, point: Optional[Point] = None): if scalar == 0: @@ -183,9 +177,9 @@ class LadderMultiplier(ScalarMultiplier): Montgomery ladder multiplier, using a three input, two output ladder formula. """ - def __init__(self, group: AbelianGroup, ladd: LadderFormula, dbl: DoublingFormula, scl: ScalingFormula = None, - ctx: Context = None): - super().__init__(group, ctx, ladd=ladd, dbl=dbl, scl=scl) + def __init__(self, group: AbelianGroup, ladd: LadderFormula, dbl: DoublingFormula, + scl: ScalingFormula = None): + super().__init__(group, ladd=ladd, dbl=dbl, scl=scl) def multiply(self, scalar: int, point: Optional[Point] = None) -> Point: if scalar == 0: @@ -212,11 +206,11 @@ class SimpleLadderMultiplier(ScalarMultiplier): def __init__(self, group: AbelianGroup, add: Union[AdditionFormula, DifferentialAdditionFormula], dbl: DoublingFormula, - scl: ScalingFormula = None, ctx: Context = None): + scl: ScalingFormula = None): if isinstance(add, AdditionFormula): - super().__init__(group, ctx, add=add, dbl=dbl, scl=scl) + super().__init__(group, add=add, dbl=dbl, scl=scl) elif isinstance(add, DifferentialAdditionFormula): - super().__init__(group, ctx, dadd=add, dbl=dbl, scl=scl) + super().__init__(group, dadd=add, dbl=dbl, scl=scl) self._differential = True else: raise ValueError @@ -253,8 +247,8 @@ class BinaryNAFMultiplier(ScalarMultiplier): _point_neg: Point def __init__(self, group: AbelianGroup, add: AdditionFormula, dbl: DoublingFormula, - neg: NegationFormula, scl: ScalingFormula = None, ctx: Context = None): - super().__init__(group, ctx, add=add, dbl=dbl, neg=neg, scl=scl) + neg: NegationFormula, scl: ScalingFormula = None): + super().__init__(group, add=add, dbl=dbl, neg=neg, scl=scl) def init(self, point: Point): super().init(point) @@ -288,9 +282,9 @@ class WindowNAFMultiplier(ScalarMultiplier): _width: int def __init__(self, group: AbelianGroup, add: AdditionFormula, dbl: DoublingFormula, - neg: NegationFormula, width: int, scl: ScalingFormula = None, ctx: Context = None, + neg: NegationFormula, width: int, scl: ScalingFormula = None, precompute_negation: bool = False): - super().__init__(group, ctx, add=add, dbl=dbl, neg=neg, scl=scl) + super().__init__(group, add=add, dbl=dbl, neg=neg, scl=scl) self._width = width self._precompute_neg = precompute_negation diff --git a/pyecsca/ec/op.py b/pyecsca/ec/op.py index ea2254f..6a057fe 100644 --- a/pyecsca/ec/op.py +++ b/pyecsca/ec/op.py @@ -1,6 +1,6 @@ -from ast import Module, walk, Name +from ast import Module, walk, Name, BinOp, operator from types import CodeType -from typing import FrozenSet +from typing import FrozenSet, Optional from .mod import Mod @@ -16,6 +16,7 @@ class Op(object): class CodeOp(Op): code: Module + operator: Optional[operator] compiled: CodeType def __init__(self, code: Module): @@ -24,6 +25,7 @@ class CodeOp(Op): self.result = assign.targets[0].id params = set() variables = set() + op = None for node in walk(assign.value): if isinstance(node, Name): name = node.id @@ -31,6 +33,9 @@ class CodeOp(Op): variables.add(name) else: params.add(name) + elif isinstance(node, BinOp): + op = node.op + self.operator = op self.parameters = frozenset(params) self.variables = frozenset(variables) self.compiled = compile(self.code, "", mode="exec") diff --git a/pyecsca/ec/signature.py b/pyecsca/ec/signature.py index 147bf27..857d5a2 100644 --- a/pyecsca/ec/signature.py +++ b/pyecsca/ec/signature.py @@ -5,6 +5,7 @@ from typing import Optional, Any from asn1crypto.core import Sequence, SequenceOf, Integer from public import public +from .context import getcontext from .formula import AdditionFormula from .mod import Mod from .mult import ScalarMultiplier @@ -113,7 +114,7 @@ class Signature(object): u2 = Mod(signature.r, self.mult.group.order) * c p1 = self.mult.multiply(int(u1), self.mult.group.generator) p2 = self.mult.multiply(int(u2), self.pubkey) - p = self.mult.context.execute(self.add, p1, p2, **self.mult.group.curve.parameters)[0] + p = getcontext().execute(self.add, p1, p2, **self.mult.group.curve.parameters)[0] affine = p.to_affine() # TODO: add to context v = Mod(int(affine.x), self.mult.group.order) return signature.r == int(v) diff --git a/setup.py b/setup.py index 5c4f5c8..20268bb 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( "Intended Audience :: Science/Research" ], install_package_data=True, - python_requires='>=3', + python_requires='>=3.7', install_requires=[ "numpy", "scipy", diff --git a/test/ec/test_context.py b/test/ec/test_context.py new file mode 100644 index 0000000..51fefd7 --- /dev/null +++ b/test/ec/test_context.py @@ -0,0 +1,23 @@ +from unittest import TestCase + +from pyecsca.ec.context import local, DefaultContext +from pyecsca.ec.mult import LTRMultiplier +from test.ec.curves import get_secp128r1 + + +class ContextTests(TestCase): + + def setUp(self): + self.secp128r1 = get_secp128r1() + self.base = self.secp128r1.generator + self.coords = self.secp128r1.curve.coordinate_model + self.mult = LTRMultiplier(self.secp128r1, self.coords.formulas["add-1998-cmo"], + self.coords.formulas["dbl-1998-cmo"], self.coords.formulas["z"]) + + def test_null(self): + self.mult.multiply(59, self.base) + + def test_default(self): + with local(DefaultContext()) as ctx: + self.mult.multiply(59, self.base) + self.assertEqual(len(ctx.actions), 10) -- cgit v1.2.3-70-g09d2