{ "cells": [ { "cell_type": "markdown", "id": "805d746e-610b-4d40-80d2-a8080a993f96", "metadata": {}, "source": [ "## Simulating EPA-RE using points of low-order\n", "\n", "As visible in the [`formulas`](formulas.ipynb) notebook, most addition formulas have exceptional cases.\n", "We can use trigger these exceptions by supplying points of low order to the scalar multiplier, which\n", "can result in the point at infinity appearing in an unexpected place in the scalar multiplication computation.\n", "The behavior of an implementation with regards to these exceptional cases depends on what formulas it implements\n", "and what checks it performs on the inputs/outputs/intermediates of the formulas and the whole scalar multiplication.\n", "Furthermore, what multiples appear in the scalar multiplication depends on the scalar multiplier.\n", "\n", "This notebook explores the statistical differences in error rates of scalar multipliers computing on low-order base points\n", "to reverse-engineer the scalar multipliers and countermeasures, even in the presence of scalar randomization. This\n", "is possible because we do not care about the actual multiples that happen for any given scalar, but rather the statistical\n", "properties, i.e.: For random scalars, how probable is an error for a point of low order, for example, 5?\n", "\n", "Examine the figure below that shows the big picture.\n", "![](unravelling_simulate.svg)" ] }, { "cell_type": "code", "execution_count": null, "id": "b4386513-cc14-434b-a748-2863f8657452", "metadata": {}, "outputs": [], "source": [ "import gc\n", "import glob\n", "import hashlib\n", "import pickle\n", "import random\n", "import re\n", "import tempfile\n", "\n", "import warnings\n", "warnings.filterwarnings(\n", " \"ignore\",\n", " message=\"pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html.\",\n", " category=UserWarning\n", ")\n", "\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "\n", "from collections import Counter\n", "from pathlib import Path\n", "from random import randint, randbytes, randrange\n", "from functools import partial\n", "from typing import Type, Any\n", "\n", "from tqdm.auto import tqdm, trange\n", "\n", "from pyecsca.ec.params import DomainParameters, get_params\n", "from pyecsca.ec.mult import *\n", "from pyecsca.ec.mod import mod\n", "from pyecsca.sca.re.rpa import multiple_graph\n", "from pyecsca.sca.re.epa import graph_to_check_inputs, evaluate_checks\n", "from pyecsca.misc.utils import TaskExecutor\n", "\n", "from epare import *" ] }, { "cell_type": "markdown", "id": "5b156d2a-7345-47f8-a76e-71a7d2be9d22", "metadata": {}, "source": [ "## Initialize\n", "Let's first initialize some sets of multipliers and countermeasures we will be reverse-engineering. They come from the\n", "[common.py](common.py) file, which you can examine for more information. We need to silence [some warnings](https://github.com/newaetech/chipwhisperer/pull/549), due to \n", "ChipWhisperer, which is used by pyecsca." ] }, { "cell_type": "code", "execution_count": null, "id": "3463a7bd-34d8-458b-8ceb-dddf99de21dc", "metadata": {}, "outputs": [], "source": [ "def silence():\n", " import warnings\n", " warnings.filterwarnings(\n", " \"ignore\",\n", " message=\"pkg_resources is deprecated as an API. 
{ "cell_type": "code", "execution_count": null, "id": "b4386513-cc14-434b-a748-2863f8657452", "metadata": {}, "outputs": [], "source": [ "import gc\n", "import glob\n", "import hashlib\n", "import pickle\n", "import random\n", "import re\n", "import tempfile\n", "\n", "import warnings\n", "warnings.filterwarnings(\n", "    \"ignore\",\n", "    message=\"pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html.\",\n", "    category=UserWarning\n", ")\n", "\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "\n", "from collections import Counter\n", "from pathlib import Path\n", "from random import randint, randbytes, randrange\n", "from functools import partial\n", "from typing import Type, Any\n", "\n", "from tqdm.auto import tqdm, trange\n", "\n", "from pyecsca.ec.params import DomainParameters, get_params\n", "from pyecsca.ec.mult import *\n", "from pyecsca.ec.mod import mod\n", "from pyecsca.sca.re.rpa import multiple_graph\n", "from pyecsca.sca.re.epa import graph_to_check_inputs, evaluate_checks\n", "from pyecsca.misc.utils import TaskExecutor\n", "\n", "from epare import *" ] },
{ "cell_type": "markdown", "id": "5b156d2a-7345-47f8-a76e-71a7d2be9d22", "metadata": {}, "source": [ "## Initialize\n", "Let's first initialize some sets of multipliers and countermeasures we will be reverse-engineering. They come from the\n", "[common.py](common.py) file, which you can examine for more information. We need to silence [some warnings](https://github.com/newaetech/chipwhisperer/pull/549) caused by\n", "ChipWhisperer, which is used by pyecsca." ] },
{ "cell_type": "code", "execution_count": null, "id": "3463a7bd-34d8-458b-8ceb-dddf99de21dc", "metadata": {}, "outputs": [], "source": [ "def silence():\n", "    import warnings\n", "    warnings.filterwarnings(\n", "        \"ignore\",\n", "        message=\"pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html.\",\n", "        category=UserWarning\n", "    )\n", "silence()" ] },
{ "cell_type": "code", "execution_count": null, "id": "170c11fc-86cf-4eb1-bf4e-b2e44b2d7ac5", "metadata": {}, "outputs": [], "source": [ "nmults = len(all_mults)\n", "nmults_ctr = len(all_mults_with_ctr)\n", "nerror_models = len(all_error_models)\n", "ncfgs = nmults_ctr * nerror_models\n", "\n", "print(f\"Scalar multipliers considered: {nmults}\")\n", "print(f\"Scalar multipliers (with a combination of up to two countermeasures) considered: {nmults_ctr}\")\n", "print(f\"Error models considered: {nerror_models}\")\n", "print(f\"Total configurations considered: {ncfgs}\")" ] },
{ "cell_type": "markdown", "id": "8c5e9543-8447-4362-b9e2-c896d71f69a9", "metadata": {}, "source": [ "## Prepare\n", "Next, let's set up some parameters. The curve specified below is not used for any computation, except that its order is given\n", "to the multipliers. The `use_init` and `use_multiply` booleans specify whether the precomputation part and the multiplication part, respectively, are used for the simulation. The `num_workers` option specifies the level of parallelization (via processes) that is employed. Make sure to set this to something reasonable. The `samples` option specifies the number of samples (i.e. scalar multiplications per scalar multiplier) that get computed in one *chunk*, which corresponds to one run\n", "of the cell. We use chunks to make it easy to compute and aggregate more samples without needing too much memory." ] },
{ "cell_type": "code", "execution_count": null, "id": "4d5c7f10-618f-4612-b594-81d1607b0d1d", "metadata": {}, "outputs": [], "source": [ "category = \"secg\"\n", "curve = \"secp256r1\"\n", "params = get_params(category, curve, \"projective\")\n", "bits = params.order.bit_length()\n", "\n", "use_init = True\n", "use_multiply = True\n", "\n", "num_workers = 20\n", "samples = 1000\n", "\n", "selected_mults = all_mults" ] },
{ "cell_type": "markdown", "id": "3aaf712e-5b97-4390-8dd4-e1db1dfe36a2", "metadata": {}, "source": [ "## Run\n", "Run this cell as many times as you want. It will simulate `samples` scalar multiplications for each `Config` (a scalar multiplier implementation with an optional countermeasure) and store them in a new chunk file." ] },
{ "cell_type": "code", "execution_count": null, "id": "84359084-4116-436c-92cd-d43fdfeca842", "metadata": {}, "outputs": [], "source": [ "chunk_id = randbytes(4).hex()\n", "with TaskExecutor(max_workers=num_workers, initializer=silence) as pool, tempfile.TemporaryDirectory() as tmp_dirname:\n", "    tmp_path = Path(tmp_dirname)\n", "    for i, mult in enumerate(all_configs):\n", "        pool.submit_task(mult,\n", "                         simulate_multiples_direct,\n", "                         mult, params, bits, tmp_path / f\"{i}.pickle\", samples, seed=chunk_id)\n", "    with open(f\"multiples_{bits}_chunk{chunk_id}.pickle\", \"wb\") as h:\n", "        for mult, future in tqdm(pool.as_completed(), desc=\"Computing multiple graphs.\", total=len(pool.tasks)):\n", "            # print(f\"Got {mult}.\")\n", "            if error := future.exception():\n", "                print(\"Error!\", error)\n", "                continue\n", "            fpath = future.result()\n", "            with fpath.open(\"rb\") as f:\n", "                h.write(f.read())\n", "            fpath.unlink()" ] },
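{ "cell_type": "markdown", "id": "9b8a7c6d-5e4f-4d3c-8b2a-1f0e9d8c7b6a", "metadata": {}, "source": [ "The chunk file written above is a plain concatenation of pickle records: each completed task appends one record to the stream. For reference, the cell below sketches this read-until-`EOFError` pattern on stand-in records; the processing cell afterwards uses the same pattern on the real chunk files." ] },
{ "cell_type": "code", "execution_count": null, "id": "2c3d4e5f-6a7b-4c8d-9e0f-1a2b3c4d5e6f", "metadata": {}, "outputs": [], "source": [ "# Sketch of the back-to-back pickle stream format used for the chunk files:\n", "# records are appended with pickle.dump and read back by calling pickle.load\n", "# repeatedly until EOFError. The records here are stand-ins.\n", "import io\n", "import pickle\n", "\n", "stream = io.BytesIO()\n", "for record in [(\"mult-a\", [1, 2, 3]), (\"mult-b\", [4, 5])]:\n", "    pickle.dump(record, stream)\n", "\n", "stream.seek(0)\n", "while True:\n", "    try:\n", "        name, values = pickle.load(stream)\n", "        print(name, values)\n", "    except EOFError:\n", "        break" ] },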
{ "cell_type": "markdown", "id": "44120f28-ae4a-42e3-befb-ebc487d51f9e", "metadata": {}, "source": [ "## Process\n", "The chunks of multiples generated in the previous cell are not fully processed yet. They only contain a trace of the scalar multiplication computation as a graph of formula applications on symbolic multiples of the input point. In the next step,\n", "we iterate over all possible error models and evaluate these graphs using the error models. Thereby, we go from e.g. 1000\n", "scalar multiplication traces + error model + base point order `q` to a number from 0 to 1000 specifying the number of errors in those multiplications given the error model and base point order `q`. The orders that are used are described in the\n", "[common.py](common.py) file.\n", "\n", "> The cell below computes the error probabilities for all chunks of multiples that are missing them. Hence you only need\n", "> to run it once." ] },
{ "cell_type": "code", "execution_count": null, "id": "fbab8333-b8f1-4890-b38a-7bb34f5ffb02", "metadata": {}, "outputs": [], "source": [ "with TaskExecutor(max_workers=num_workers, initializer=silence) as pool:\n", "    for in_fname in tqdm(glob.glob(f\"multiples_{bits}_chunk*.pickle\"), desc=\"Processing chunks\", smoothing=0):\n", "\n", "        match = re.match(\"multiples_(?P<bits>[0-9]+)_chunk(?P<id>[0-9a-f]+).pickle\", in_fname)\n", "        chunk_id = match.group(\"id\")\n", "        out_fname = f\"probs_{bits}_{'i' if use_init else 'ni'}_{'m' if use_multiply else 'nm'}_chunk{chunk_id}.pickle\"\n", "\n", "        in_file = Path(in_fname)\n", "        out_file = Path(out_fname)\n", "\n", "        cfgs_todo = set()\n", "        for mult in all_configs:\n", "            for error_model in all_error_models:\n", "                cfgs_todo.add(mult.with_error_model(error_model))\n", "\n", "        if out_file.exists():\n", "            print(f\"Processing chunk {chunk_id}, some (or all) probmaps found.\")\n", "            last_end = 0  # guard against an UnpicklingError on the very first record\n", "            with out_file.open(\"r+b\") as f:\n", "                while True:\n", "                    try:\n", "                        full, _ = pickle.load(f)\n", "                        cfgs_todo.remove(full)\n", "                        last_end = f.tell()\n", "                    except EOFError:\n", "                        break\n", "                    except pickle.UnpicklingError:\n", "                        f.truncate(last_end)\n", "            if not cfgs_todo:\n", "                print(\"Chunk complete. Continuing...\")\n", "                continue\n", "            else:\n", "                print(f\"Chunk missing {len(cfgs_todo)} probmaps, computing...\")\n", "        else:\n", "            print(f\"Processing chunk {chunk_id}, no probmaps found.\")\n", "\n", "        with in_file.open(\"rb\") as f, out_file.open(\"ab\") as h:\n", "            loading_bar = tqdm(total=nmults_ctr, desc=f\"Loading chunk {chunk_id}.\", smoothing=0)\n", "            processing_bar = tqdm(total=len(cfgs_todo), desc=f\"Processing {chunk_id}.\", smoothing=0)\n", "            while True:\n", "                try:\n", "                    start = f.tell()\n", "                    mult, vals = pickle.load(f)\n", "                    loading_bar.update(1)\n", "                    for error_model in all_error_models:\n", "                        full = mult.with_error_model(error_model)\n", "                        if full in cfgs_todo:\n", "                            # Pass the file name and offset to speed up computation start.\n", "                            pool.submit_task(full,\n", "                                             evaluate_multiples_direct,\n", "                                             full, in_fname, start, divisor_map[\"all\"], use_init, use_multiply)\n", "                    gc.collect()\n", "                    for full, future in pool.as_completed(wait=False):\n", "                        processing_bar.update(1)\n", "                        if error := future.exception():\n", "                            print(\"Error!\", full, error)\n", "                            continue\n", "                        res = future.result()\n", "                        pickle.dump((full, res), h)\n", "                except EOFError:\n", "                    break\n", "                except pickle.UnpicklingError:\n", "                    print(\"Bad unpickling, the multiples file is likely truncated.\")\n", "                    break\n", "            for full, future in pool.as_completed():\n", "                processing_bar.update(1)\n", "                if error := future.exception():\n", "                    print(\"Error!\", full, error)\n", "                    continue\n", "                res = future.result()\n", "                pickle.dump((full, res), h)\n", "            print(\"Chunk completed.\")" ] },
{ "cell_type": "markdown", "id": "a1bb9459-aa2b-4a07-970e-6e981ab3f97e", "metadata": {}, "source": [ "## Merge\n", "\n", "Finally, we have multiple chunks of error probabilities (i.e. `ProbMap`s) that we merge into a single file `merged.pickle`\n", "that can be used by the [visualize](visualize.ipynb) and [distinguish](distinguish.ipynb) notebooks." ] },
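{ "cell_type": "markdown", "id": "7d6c5b4a-3f2e-4d1c-8b0a-9f8e7d6c5b4a", "metadata": {}, "source": [ "As a hypothetical illustration of what merging amounts to (the real logic lives in pyecsca's `ProbMap.merge`): per-order error counts and sample counts from the chunks add up, so the merged error probability is a sample-weighted average. The orders and counts below are made up." ] },
{ "cell_type": "code", "execution_count": null, "id": "4e5f6a7b-8c9d-4e0f-9a1b-2c3d4e5f6a7b", "metadata": {}, "outputs": [], "source": [ "# Hypothetical illustration of merging per-order error statistics across\n", "# chunks; the orders and counts are made up, ProbMap.merge does the real work.\n", "chunk_a = {5: (130, 1000), 7: (95, 1000)}  # order -> (errors, samples)\n", "chunk_b = {5: (122, 1000), 7: (101, 1000)}\n", "\n", "merged = {}\n", "for order in chunk_a:\n", "    errors = chunk_a[order][0] + chunk_b[order][0]\n", "    samples = chunk_a[order][1] + chunk_b[order][1]\n", "    merged[order] = errors / samples\n", "print(merged)  # {5: 0.126, 7: 0.098}" ] },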
{ "cell_type": "code", "execution_count": null, "id": "0f8daeda-f289-42c0-9157-19c0f688c6aa", "metadata": {}, "outputs": [], "source": [ "probmaps = {}\n", "for in_fname in tqdm(glob.glob(f\"probs_{bits}_{'i' if use_init else 'ni'}_{'m' if use_multiply else 'nm'}_chunk*.pickle\"), desc=\"Processing chunks\", smoothing=0):\n", "\n", "    match = re.match(\"probs_(?P<bits>[0-9]+)_(?P<init>(?:n)?i)_(?P<mult>(?:n)?m)_chunk(?P<id>[0-9a-f]+).pickle\", in_fname)\n", "    chunk_id = match.group(\"id\")\n", "\n", "    with open(in_fname, \"rb\") as f:\n", "        loading_bar = tqdm(total=ncfgs, desc=f\"Loading chunk {chunk_id}.\", smoothing=0)\n", "        while True:\n", "            try:\n", "                mult, probmap = pickle.load(f)\n", "                if mult in probmaps:\n", "                    current = probmaps[mult]\n", "                    current.merge(probmap)\n", "                else:\n", "                    probmaps[mult] = probmap\n", "                loading_bar.update(1)\n", "            except EOFError:\n", "                break\n", "            except pickle.UnpicklingError:\n", "                print(\"Bad unpickling, the probs file is likely truncated.\")\n", "                break\n", "        loading_bar.close()" ] },
{ "cell_type": "code", "execution_count": null, "id": "6bc15256-8228-4498-9bbf-73e76eea3c70", "metadata": {}, "outputs": [], "source": [ "with open(\"merged.pickle\", \"wb\") as f:\n", "    pickle.dump(probmaps, f)" ] },
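{ "cell_type": "markdown", "id": "5a4b3c2d-1e0f-4a9b-8c7d-6e5f4a3b2c1d", "metadata": {}, "source": [ "As a quick sanity check, the merged file can be loaded back directly:" ] },
{ "cell_type": "code", "execution_count": null, "id": "6b7c8d9e-0f1a-4b2c-9d3e-4f5a6b7c8d9e", "metadata": {}, "outputs": [], "source": [ "# Sanity check: load the merged probability maps back and report their count.\n", "with open(\"merged.pickle\", \"rb\") as f:\n", "    loaded = pickle.load(f)\n", "print(f\"Loaded {len(loaded)} probability maps.\")" ] },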
] }, { "cell_type": "code", "execution_count": null, "id": "8cdaad25-80d2-4574-9cfb-9d93e55e90d6", "metadata": {}, "outputs": [], "source": [ "from pyinstrument import Profiler as PyProfiler\n", "mult = next(iter(multiples_mults))\n", "res = multiples_mults[mult]\n", "\n", "for checks in powerset(checks_add):\n", " for precomp_to_affine in (True, False):\n", " for check_condition in (\"all\", \"necessary\"):\n", " error_model = ErrorModel(checks, check_condition=check_condition, precomp_to_affine=precomp_to_affine)\n", " full = mult.with_error_model(error_model)\n", " print(full)\n", " with PyProfiler() as prof:\n", " probmap = evaluate_multiples(full, res, divisor_map[\"all\"])\n", " print(prof.output_text(unicode=True, color=True))\n", " break\n", " break\n", " break" ] }, { "cell_type": "code", "execution_count": null, "id": "a07cf1d0-14d8-4c9b-a86c-6a7437d4e9dc", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }