diff options
| author | J08nY | 2020-02-26 17:14:22 +0100 |
|---|---|---|
| committer | J08nY | 2020-02-26 17:14:22 +0100 |
| commit | 19c3f2ef05e152486dcc1f47359fb4456f6f49c8 (patch) | |
| tree | e1522655afdc39c30574bf0306c277747edbf704 | |
| parent | 18a4f5b2837513aebf6a5b2d86911a004b9e7650 (diff) | |
| download | pyecsca-19c3f2ef05e152486dcc1f47359fb4456f6f49c8.tar.gz pyecsca-19c3f2ef05e152486dcc1f47359fb4456f6f49c8.tar.zst pyecsca-19c3f2ef05e152486dcc1f47359fb4456f6f49c8.zip | |
| -rw-r--r-- | pyecsca/sca/target/__init__.py | 3 | ||||
| -rw-r--r-- | pyecsca/sca/target/binary.py | 50 | ||||
| -rw-r--r-- | pyecsca/sca/target/chipwhisperer.py | 45 | ||||
| -rw-r--r-- | pyecsca/sca/target/flash.py | 11 | ||||
| -rw-r--r-- | pyecsca/sca/target/serial.py | 11 | ||||
| -rw-r--r-- | pyecsca/sca/target/simpleserial.py | 71 |
6 files changed, 171 insertions, 20 deletions
diff --git a/pyecsca/sca/target/__init__.py b/pyecsca/sca/target/__init__.py index 86317df..8ea8822 100644 --- a/pyecsca/sca/target/__init__.py +++ b/pyecsca/sca/target/__init__.py @@ -3,6 +3,9 @@ from .ISO7816 import * from .base import * from .serial import * +from .simpleserial import * +from .binary import * +from .flash import * has_chipwhisperer = False has_pyscard = False diff --git a/pyecsca/sca/target/binary.py b/pyecsca/sca/target/binary.py new file mode 100644 index 0000000..62d1102 --- /dev/null +++ b/pyecsca/sca/target/binary.py @@ -0,0 +1,50 @@ +import subprocess +from subprocess import Popen +from typing import Optional + +from public import public + +from .serial import SerialTarget + + +@public +class BinaryTarget(SerialTarget): + binary: str + process: Optional[Popen] + debug_output: bool + + def __init__(self, binary: str, debug_output: bool = False, **kwargs): + super().__init__(**kwargs) + self.binary = binary + self.debug_output = debug_output + + def connect(self): + self.process = Popen([self.binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, + text=True, bufsize=1) + + def write(self, data: bytes): + if self.process is None: + raise ValueError + if self.debug_output: + print(">>", data.decode()) + self.process.stdin.write(data.decode()) + self.process.stdin.flush() + + def read(self, num: Optional[int] = 0, timeout: Optional[int] = 0) -> bytes: + if self.process is None: + raise ValueError + if num != 0: + read = self.process.stdout.readline(num) + else: + read = self.process.stdout.readline() + if self.debug_output: + print("<<", read, end="") + return read.encode() + + def disconnect(self): + if self.process is None: + return + self.process.stdin.close() + self.process.stdout.close() + self.process.terminate() + self.process.wait() diff --git a/pyecsca/sca/target/chipwhisperer.py b/pyecsca/sca/target/chipwhisperer.py index 1b03713..0528618 100644 --- a/pyecsca/sca/target/chipwhisperer.py +++ b/pyecsca/sca/target/chipwhisperer.py @@ -1,33 +1,44 @@ -from typing import Union +from binascii import unhexlify +from typing import Optional +from time import sleep -from chipwhisperer.capture.scopes import OpenADC -from chipwhisperer.capture.targets.simpleserial_readers.cw import SimpleSerial_ChipWhisperer -from chipwhisperer.capture.targets.simpleserial_readers.cwlite import SimpleSerial_ChipWhispererLite -from chipwhisperer.capture.targets.simpleserial_readers.sys_serial import SimpleSerial_serial +import chipwhisperer as cw +from chipwhisperer.capture.scopes.base import ScopeTemplate +from chipwhisperer.capture.targets import SimpleSerial from public import public -from .serial import SerialTarget +from .flash import Flashable +from .simpleserial import SimpleSerialTarget @public -class SimpleSerialTarget(SerialTarget): # pragma: no cover +class ChipWhispererTarget(Flashable, SimpleSerialTarget): # pragma: no cover - def __init__(self, ser: Union[ - SimpleSerial_ChipWhisperer, SimpleSerial_ChipWhispererLite, SimpleSerial_serial], - scope: OpenADC): + def __init__(self, target: SimpleSerial, scope: ScopeTemplate, programmer, **kwargs): super().__init__() - self.ser = ser + self.target = target self.scope = scope + self.programmer = programmer def connect(self): - self.ser.con(self.scope) + self.target.con(self.scope) + self.target.baud = 115200 + sleep(0.5) + + def flash(self, fw_path: str) -> bool: + try: + cw.program_target(self.scope, self.programmer, fw_path) + except Exception as e: + print(e) + return False + return True def write(self, data: bytes): - self.ser.write(data) - self.ser.flush() + self.target.flush() + self.target.write(data.decode()) - def read(self, timeout: int) -> bytes: - return self.ser.read(0, timeout) + def read(self, num: Optional[int] = 0, timeout: Optional[int] = 0) -> bytes: + return self.target.read(num, timeout).encode() def disconnect(self): - self.ser.dis() + self.target.dis() diff --git a/pyecsca/sca/target/flash.py b/pyecsca/sca/target/flash.py new file mode 100644 index 0000000..be3cfb0 --- /dev/null +++ b/pyecsca/sca/target/flash.py @@ -0,0 +1,11 @@ +from public import public +from abc import ABC, abstractmethod + + +@public +class Flashable(ABC): + """A flashable target.""" + + @abstractmethod + def flash(self, fw_path: str) -> bool: + ...
\ No newline at end of file diff --git a/pyecsca/sca/target/serial.py b/pyecsca/sca/target/serial.py index ab61d9e..3c0b5c4 100644 --- a/pyecsca/sca/target/serial.py +++ b/pyecsca/sca/target/serial.py @@ -1,3 +1,6 @@ +from abc import abstractmethod +from typing import Optional + from public import public from .base import Target @@ -6,8 +9,10 @@ from .base import Target @public class SerialTarget(Target): + @abstractmethod def write(self, data: bytes): - raise NotImplementedError + ... - def read(self, timeout: int) -> bytes: - raise NotImplementedError + @abstractmethod + def read(self, num: Optional[int] = 0, timeout: Optional[int] = 0) -> bytes: + ... diff --git a/pyecsca/sca/target/simpleserial.py b/pyecsca/sca/target/simpleserial.py new file mode 100644 index 0000000..3d4b7e0 --- /dev/null +++ b/pyecsca/sca/target/simpleserial.py @@ -0,0 +1,71 @@ +from time import time_ns, sleep +from typing import Mapping, Union + +from public import public + +from .serial import SerialTarget + + +@public +class SimpleSerialMessage(object): + char: str + data: str + + def __init__(self, char: str, data: str): + self.char = char + self.data = data + + @staticmethod + def from_raw(raw: Union[str, bytes]) -> "SimpleSerialMessage": + if isinstance(raw, bytes): + raw = raw.decode() + return SimpleSerialMessage(raw[0], raw[1:]) + + def __bytes__(self): + return str(self).encode() + + def __str__(self): + return self.char + self.data + + def __repr__(self): + return str(self) + + +@public +class SimpleSerialTarget(SerialTarget): + + def recv_msgs(self, timeout: int) -> Mapping[str, SimpleSerialMessage]: + start = time_ns() // 1000000 + buffer = bytes() + while not buffer.endswith(b"z00\n"): + wait = timeout - ((time_ns() // 1000000) - start) + if wait <= 0: + break + buffer += self.read(1 if not buffer else 0, wait) + if not buffer: + return {} + msgs = buffer.split(b"\n") + if buffer.endswith(b"\n"): + msgs.pop() + + result = {} + for raw in msgs: + msg = SimpleSerialMessage.from_raw(raw) + result[msg.char] = msg + return result + + def send_cmd(self, cmd: SimpleSerialMessage, timeout: int) -> Mapping[str, SimpleSerialMessage]: + """ + + :param cmd: + :param timeout: + :return: + """ + data = bytes(cmd) + for i in range(0, len(data), 64): + chunk = data[i:i+64] + sleep(0.010) + self.write(chunk) + self.write(b"\n") + return self.recv_msgs(timeout) + |
