aboutsummaryrefslogtreecommitdiff
path: root/pyecsca/sca/target/simpleserial.py
blob: 3d4b7e0c8ec8c16147b3b4c36472029da045e178 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from time import time_ns, sleep
from typing import Mapping, Union

from public import public

from .serial import SerialTarget


@public
class SimpleSerialMessage(object):
    char: str
    data: str

    def __init__(self, char: str, data: str):
        self.char = char
        self.data = data

    @staticmethod
    def from_raw(raw: Union[str, bytes]) -> "SimpleSerialMessage":
        if isinstance(raw, bytes):
            raw = raw.decode()
        return SimpleSerialMessage(raw[0], raw[1:])

    def __bytes__(self):
        return str(self).encode()

    def __str__(self):
        return self.char + self.data

    def __repr__(self):
        return str(self)


@public
class SimpleSerialTarget(SerialTarget):

    def recv_msgs(self, timeout: int) -> Mapping[str, SimpleSerialMessage]:
        start = time_ns() // 1000000
        buffer = bytes()
        while not buffer.endswith(b"z00\n"):
            wait = timeout - ((time_ns() // 1000000) - start)
            if wait <= 0:
                break
            buffer += self.read(1 if not buffer else 0, wait)
        if not buffer:
            return {}
        msgs = buffer.split(b"\n")
        if buffer.endswith(b"\n"):
            msgs.pop()

        result = {}
        for raw in msgs:
            msg = SimpleSerialMessage.from_raw(raw)
            result[msg.char] = msg
        return result

    def send_cmd(self, cmd: SimpleSerialMessage, timeout: int) -> Mapping[str, SimpleSerialMessage]:
        """

        :param cmd:
        :param timeout:
        :return:
        """
        data = bytes(cmd)
        for i in range(0, len(data), 64):
            chunk = data[i:i+64]
            sleep(0.010)
            self.write(chunk)
        self.write(b"\n")
        return self.recv_msgs(timeout)