1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
from time import monotonic_ns, sleep, time_ns
from typing import Mapping, Union

from public import public

from .serial import SerialTarget
@public
class SimpleSerialMessage:
    """
    A single SimpleSerial message: one command character followed by its data.

    The textual form of a message is simply ``char + data``; the line
    terminator is not part of the message.
    """

    # The first character of the message.
    char: str
    # Everything after the first character.
    data: str

    def __init__(self, char: str, data: str):
        """
        :param char: The first character of the message.
        :param data: The rest of the message.
        """
        self.char = char
        self.data = data

    @classmethod
    def from_raw(cls, raw: Union[str, bytes]) -> "SimpleSerialMessage":
        """
        Parse a message from its raw form, without the line terminator.

        :param raw: The raw message; `bytes` are decoded with the default
                    codec (UTF-8) first.
        :return: A message whose `char` is the first character of `raw` and
                 whose `data` is the remainder.
        :raises IndexError: If `raw` is empty, as there is no character to take.
        :raises UnicodeDecodeError: If `raw` is `bytes` that are not valid UTF-8.
        """
        if isinstance(raw, bytes):
            raw = raw.decode()
        if not raw:
            # Same exception type that indexing an empty string would raise,
            # but with a message that says what actually went wrong.
            raise IndexError("cannot parse an empty SimpleSerial message")
        # Build via `cls` so that subclasses get instances of themselves.
        return cls(raw[0], raw[1:])

    def __bytes__(self):
        """Return the textual form of the message encoded with the default codec."""
        return str(self).encode()

    def __str__(self):
        """Return the textual form of the message: `char` followed by `data`."""
        return self.char + self.data

    def __repr__(self):
        """Return the same text as `str()`."""
        return str(self)
@public
class SimpleSerialTarget(SerialTarget):
    """
    A serial target that exchanges newline-terminated `SimpleSerialMessage` lines.

    Uses the `read` and `write` methods inherited from `SerialTarget`.
    """

    # Receiving stops as soon as the buffer ends with this line.
    # NOTE(review): presumably the protocol's final acknowledgement -- confirm.
    _TERMINATOR = b"z00\n"
    # Commands are written in pieces of at most this many bytes ...
    _CHUNK_SIZE = 64
    # ... with this pause (in seconds) before each piece.
    _CHUNK_DELAY = 0.010

    def recv_msgs(self, timeout: int) -> Mapping[str, SimpleSerialMessage]:
        """
        Read messages until the terminating line arrives or the timeout expires.

        :param timeout: Total time budget in milliseconds.
        :return: The received messages keyed by their `char`; if several
                 messages share a `char`, the last one received wins. Empty if
                 nothing was received.
        """
        # Use a monotonic clock so wall-clock adjustments cannot stretch or
        # shrink the timeout.
        start = monotonic_ns() // 1000000
        buffer = bytes()
        while not buffer.endswith(self._TERMINATOR):
            wait = timeout - ((monotonic_ns() // 1000000) - start)
            if wait <= 0:
                break
            # First read asks for 1 byte, later reads ask for 0.
            # NOTE(review): presumably 0 means "whatever is available" --
            # confirm against SerialTarget.read.
            buffer += self.read(1 if not buffer else 0, wait)

        result = {}
        for raw in buffer.split(b"\n"):
            # Splitting yields empty pieces for a trailing newline, for blank
            # lines and for an empty buffer; none of them carries a message
            # (and parsing one would raise IndexError), so skip them.
            if not raw:
                continue
            # If the timeout hit mid-line, the last piece is an unterminated
            # fragment; it is still parsed, as before.
            msg = SimpleSerialMessage.from_raw(raw)
            result[msg.char] = msg
        return result

    def send_cmd(self, cmd: SimpleSerialMessage, timeout: int) -> Mapping[str, SimpleSerialMessage]:
        """
        Send a command and collect the messages received in response.

        The encoded command is written in chunks with a short pause before
        each one, followed by a newline.

        :param cmd: The command message to send.
        :param timeout: Time budget in milliseconds for receiving the response.
        :return: The received messages, as returned by `recv_msgs`.
        """
        data = bytes(cmd)
        for offset in range(0, len(data), self._CHUNK_SIZE):
            chunk = data[offset:offset + self._CHUNK_SIZE]
            sleep(self._CHUNK_DELAY)
            self.write(chunk)
        self.write(b"\n")
        return self.recv_msgs(timeout)
|