1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
import subprocess
from subprocess import Popen
from typing import Optional, Union, List
from public import public
from .serial import SerialTarget
@public
class BinaryTarget(SerialTarget):
    """Target backed by a locally spawned process, driven over its stdin/stdout pipes."""

    # Command line (program followed by its arguments) handed to ``Popen``.
    binary: List[str]
    # The running child process; ``None`` until ``connect`` is called.
    process: Optional[Popen] = None
    # When true, everything written to / read from the process is echoed via ``print``.
    debug_output: bool

    def __init__(self, binary: Union[str, List[str]], debug_output: bool = False, **kwargs):
        """
        Create the target without starting the process.

        :param binary: Path to the executable, or a full argument list.
        :param debug_output: Whether to print all traffic exchanged with the process.
        :param kwargs: Accepted for signature compatibility; not used here.
        :raises TypeError: If ``binary`` is neither a ``str`` nor a ``list``.
        """
        super().__init__()
        if not isinstance(binary, (str, list)):
            raise TypeError("binary must be a str or a list of str")
        if isinstance(binary, str):
            # Normalize a bare path to a one-element argument list.
            binary = [binary]
        self.binary = binary
        self.debug_output = debug_output
        # Explicitly mark the target as not connected, so that the
        # ``self.process is None`` guards below work before ``connect``.
        self.process = None

    def connect(self):
        """Spawn the binary with piped, line-buffered, text-mode stdin and stdout."""
        self.process = Popen(
            self.binary,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
            bufsize=1,
        )

    def write(self, data: bytes):
        """
        Send data to the process' stdin and flush it.

        :param data: The bytes to send; decoded to text before writing,
            because the pipes are opened in text mode.
        :raises ValueError: If the target is not connected.
        """
        if self.process is None:
            raise ValueError("Target is not connected.")
        text = data.decode()
        if self.debug_output:
            print(">>", text)
        self.process.stdin.write(text)
        self.process.stdin.flush()

    def read(self, num: int = 0, timeout: int = 0) -> bytes:
        """
        Read one line from the process' stdout.

        :param num: If non-zero, passed to ``readline`` as the size limit.
        :param timeout: Not used by this implementation; the read blocks.
        :return: The line read, encoded to bytes.
        :raises ValueError: If the target is not connected.
        """
        if self.process is None:
            raise ValueError("Target is not connected.")
        if num != 0:
            read = self.process.stdout.readline(num)
        else:
            read = self.process.stdout.readline()
        if self.debug_output:
            print("<<", read, end="")
        return read.encode()

    def disconnect(self):
        """Close the pipes, terminate the process and wait for it to exit.

        Does nothing if the target was never connected.
        """
        if self.process is None:
            return
        try:
            self.process.stdin.close()
            self.process.stdout.close()
        finally:
            # Always stop and reap the child, even if closing a pipe raised,
            # so that no orphaned/zombie process is left behind.
            self.process.terminate()
            self.process.wait()
|