aboutsummaryrefslogtreecommitdiff
path: root/pyecsca/sca/trace_set/hdf5.py
blob: b8a1ad1eab0c3880ee9f1760ce0063221daca265 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from copy import copy
from io import RawIOBase, BufferedIOBase, IOBase
from pathlib import Path
from typing import Union, Optional
import numpy as np
import h5py
from public import public

from .base import TraceSet
from .. import Trace


@public
class HDF5TraceSet(TraceSet):
    _file: Optional[h5py.File]

    @classmethod
    def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
        if isinstance(input, (str, Path)):
            hdf5 = h5py.File(str(input), mode="r")
        elif isinstance(input, IOBase):
            hdf5 = h5py.File(input, mode="r")
        else:
            raise ValueError
        kwargs = dict(hdf5.attrs)
        traces = []
        for k, v in hdf5.items():
            meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
            samples = hdf5[k]
            traces.append(Trace(np.array(samples, dtype=samples.dtype), None, None, meta))
        hdf5.close()
        return HDF5TraceSet(*traces, **kwargs)

    @classmethod
    def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
        if isinstance(input, (str, Path)):
            hdf5 = h5py.File(str(input), mode="a")
        elif isinstance(input, IOBase):
            hdf5 = h5py.File(input, mode="a")
        else:
            raise ValueError
        kwargs = dict(hdf5.attrs)
        traces = []
        for k, v in hdf5.items():
            meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
            samples = hdf5[k]
            traces.append(Trace(samples, k, None, meta))
        return HDF5TraceSet(*traces, **kwargs, _file=hdf5)

    def __setitem__(self, key, value):
        if not isinstance(value, Trace):
            raise TypeError
        if self._file is not None:
            if str(key) in self._file:
                del self._file[str(key)]
            self._file[str(key)] = value.samples
            value.samples = self._file[str(key)]
            if value.meta:
                for k, v in value.meta.items():
                    self._file[str(key)].attrs[k] = v
        super().__setitem__(key, value)

    def append(self, value: Trace):
        if self._file is not None:
            key = sorted(list(map(int, self._file.keys())))[-1] + 1 if self._file.keys() else 0
            self._file[str(key)] = value.samples
            value.samples = self._file[str(key)]
            if value.meta:
                for k, v in value.meta.items():
                    self._file[str(key)].attrs[k] = v
        self._traces.append(value)

    def save(self):
        if self._file is not None:
            self._file.flush()

    def close(self):
        if self._file is not None:
            self._file.close()

    def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
        if isinstance(output, (str, Path)):
            hdf5 = h5py.File(str(output), "w")
        elif isinstance(output, IOBase):
            hdf5 = h5py.File(output, "w")
        else:
            raise ValueError
        for k in self._keys:
            hdf5[k] = getattr(self, k)
        for i, trace in enumerate(self._traces):
            dset = hdf5.create_dataset(str(i), trace.samples)
            if trace.meta:
                for k, v in trace.meta.items():
                    dset.attrs[k] = v
        hdf5.close()