aboutsummaryrefslogtreecommitdiff
path: root/pyecsca/misc/utils.py
blob: ba7b75a99e911fc3f1d00659d4bb01d0ec47a6c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Just some utilities I promise."""

import sys
from ast import parse
from contextlib import contextmanager
from typing import List, Any, Generator

from pyecsca.misc.cfg import getconfig, TemporaryConfig

from concurrent.futures import TimeoutError
from loky import ProcessPoolExecutor, as_completed, Future


def pexec(s):
    """Parse with exec."""
    return parse(s, mode="exec")


def peval(s):
    """Parse with eval."""
    return parse(s, mode="eval")


def in_notebook() -> bool:
    """Test whether we are executing in Jupyter notebook."""
    try:
        from IPython import get_ipython

        if "IPKernelApp" not in get_ipython().config:  # pragma: no cover
            return False
    except ImportError:
        return False
    except AttributeError:
        return False
    return True


def log(*args, **kwargs):
    """Log a message."""
    if in_notebook() and getconfig().log.enabled:
        print(*args, **kwargs)


def warn(*args, **kwargs):
    """Log a message."""
    if in_notebook() and getconfig().log.enabled:
        print(*args, **kwargs, file=sys.stderr)


@contextmanager
def silent():
    """Temporarily disable output."""
    with TemporaryConfig() as cfg:
        cfg.log.enabled = False
        yield


class TaskExecutor(ProcessPoolExecutor):
    """A simple ProcessPoolExecutor that keeps tracks of tasks that were submitted to it."""

    keys: List[Any]
    """A list of keys that identify the futures."""
    futures: List[Future]
    """A list of futures submitted to the executor."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.keys = []
        self.futures = []

    def submit_task(self, key: Any, fn, /, *args, **kwargs):
        """Submit a task (function `fn`), identified by `key` and with `args` and `kwargs`."""
        future = self.submit(fn, *args, **kwargs)
        self.futures.append(future)
        self.keys.append(key)
        return future

    @property
    def tasks(self):
        """A list of tasks that were submitted to this executor."""
        return list(zip(self.keys, self.futures))

    def as_completed(self, wait: bool = True) -> Generator[tuple[Any, Future], Any, None]:
        """
        Like `concurrent.futures.as_completed`, but yields a pair of key and future.

        If `wait` is True, it will block until all futures are done.
        If `wait` is False, it will return immediately with futures that are already done.
        """
        try:
            for future in as_completed(self.futures, timeout=None if wait else 0):
                i = self.futures.index(future)
                yield self.keys[i], future
                del self.keys[i]
                del self.futures[i]
        except TimeoutError:
            pass