1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
"""Just some utilities I promise."""
import sys
from ast import parse
from contextlib import contextmanager
from typing import List, Any, Generator
from pyecsca.misc.cfg import getconfig, TemporaryConfig
from concurrent.futures import TimeoutError
from loky import ProcessPoolExecutor, as_completed, Future
def pexec(s):
"""Parse with exec."""
return parse(s, mode="exec")
def peval(s):
"""Parse with eval."""
return parse(s, mode="eval")
def in_notebook() -> bool:
"""Test whether we are executing in Jupyter notebook."""
try:
from IPython import get_ipython
if "IPKernelApp" not in get_ipython().config: # pragma: no cover
return False
except ImportError:
return False
except AttributeError:
return False
return True
def log(*args, **kwargs):
"""Log a message."""
if in_notebook() and getconfig().log.enabled:
print(*args, **kwargs)
def warn(*args, **kwargs):
"""Log a message."""
if in_notebook() and getconfig().log.enabled:
print(*args, **kwargs, file=sys.stderr)
@contextmanager
def silent():
"""Temporarily disable output."""
with TemporaryConfig() as cfg:
cfg.log.enabled = False
yield
class TaskExecutor(ProcessPoolExecutor):
"""A simple ProcessPoolExecutor that keeps tracks of tasks that were submitted to it."""
keys: List[Any]
"""A list of keys that identify the futures."""
futures: List[Future]
"""A list of futures submitted to the executor."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.keys = []
self.futures = []
def submit_task(self, key: Any, fn, /, *args, **kwargs):
"""Submit a task (function `fn`), identified by `key` and with `args` and `kwargs`."""
future = self.submit(fn, *args, **kwargs)
self.futures.append(future)
self.keys.append(key)
return future
@property
def tasks(self):
"""A list of tasks that were submitted to this executor."""
return list(zip(self.keys, self.futures))
def as_completed(self, wait: bool = True) -> Generator[tuple[Any, Future], Any, None]:
"""
Like `concurrent.futures.as_completed`, but yields a pair of key and future.
If `wait` is True, it will block until all futures are done.
If `wait` is False, it will return immediately with futures that are already done.
"""
try:
for future in as_completed(self.futures, timeout=None if wait else 0):
i = self.futures.index(future)
yield self.keys[i], future
del self.keys[i]
del self.futures[i]
except TimeoutError:
pass
|