aboutsummaryrefslogtreecommitdiff
path: root/pyecsca/sca/target/binary.py
blob: 3e2877b4c67f6b0f53b969d8178e2a5f4e0015be (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""
This module provides a binary target class which represents a target that is a runnable binary on the host.
"""
import subprocess
from subprocess import Popen
from typing import Optional, Union, List

from public import public

from .serial import SerialTarget


@public
class BinaryTarget(SerialTarget):
    """A binary target that is runnable on the host and communicates using the stdin/stdout streams."""
    binary: List[str]
    process: Optional[Popen] = None
    debug_output: bool

    def __init__(self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs):
        super().__init__()
        if not isinstance(binary, (str, list)):
            raise TypeError
        if isinstance(binary, str):
            binary = [binary]
        self.binary = binary
        self.debug_output = debug_output

    def connect(self):
        self.process = Popen(self.binary, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             text=True, bufsize=1)

    def write(self, data: bytes) -> None:
        if self.process is None:
            raise ValueError
        if self.debug_output:
            print(">>", data.decode())
        if self.process.stdin:
            self.process.stdin.write(data.decode())
            self.process.stdin.flush()

    def read(self, num: int = 0, timeout: int = 0) -> bytes:
        if self.process is None:
            raise ValueError
        if self.process.stdout:
            if num != 0:
                read = self.process.stdout.readline(num)
            else:
                read = self.process.stdout.readline()
        else:
            read = bytes()  # pragma: no cover
        if self.debug_output:
            print("<<", read, end="")
        return read.encode()

    def disconnect(self):
        if self.process is None:
            return
        self.process.stdin.close()
        self.process.stdout.close()
        self.process.terminate()
        self.process.wait()