aboutsummaryrefslogtreecommitdiffhomepage
path: root/pyecsca/sca/trace_set/hdf5.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyecsca/sca/trace_set/hdf5.py')
-rw-r--r--pyecsca/sca/trace_set/hdf5.py84
1 files changed, 84 insertions, 0 deletions
diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py
new file mode 100644
index 0000000..0a24393
--- /dev/null
+++ b/pyecsca/sca/trace_set/hdf5.py
@@ -0,0 +1,84 @@
+from copy import copy
+from io import RawIOBase, BufferedIOBase, IOBase
+from pathlib import Path
+from typing import Union, Optional
+import numpy as np
+import h5py
+from public import public
+
+from .base import TraceSet
+from .. import Trace
+
+
+@public
+class HDF5TraceSet(TraceSet):
+ _file: Optional[h5py.File]
+
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
+ if isinstance(input, (str, Path)):
+ hdf5 = h5py.File(str(input), mode="r")
+ elif isinstance(input, IOBase):
+ hdf5 = h5py.File(input, mode="r")
+ else:
+ raise ValueError
+ kwargs = dict(hdf5.attrs)
+ traces = []
+ for k, v in hdf5.items():
+ meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
+ samples = hdf5[k]
+ traces.append(Trace(np.array(samples, dtype=samples.dtype), None, None, meta))
+ hdf5.close()
+ return HDF5TraceSet(*traces, **kwargs)
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
+ if isinstance(input, (str, Path)):
+ hdf5 = h5py.File(str(input), mode="a")
+ elif isinstance(input, IOBase):
+ hdf5 = h5py.File(input, mode="a")
+ else:
+ raise ValueError
+ kwargs = dict(hdf5.attrs)
+ traces = []
+ for k, v in hdf5.items():
+ meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
+ samples = hdf5[k]
+ traces.append(Trace(samples, k, None, meta))
+ return HDF5TraceSet(*traces, **kwargs, _file=hdf5)
+
+ def __setitem__(self, key, value):
+ if not isinstance(value, Trace):
+ raise TypeError
+ if self._file is not None:
+ if str(key) in self._file:
+ del self._file[str(key)]
+ self._file[str(key)] = value.samples
+ if value.meta:
+ for k, v in value.meta.items():
+ self._file[str(key)].attrs[k] = v
+ super().__setitem__(key, value)
+
+ def save(self):
+ if self._file is not None:
+ self._file.flush()
+
+ def close(self):
+ if self._file is not None:
+ self._file.close()
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ if isinstance(output, (str, Path)):
+ hdf5 = h5py.File(str(output), "w")
+ elif isinstance(output, IOBase):
+ hdf5 = h5py.File(output, "w")
+ else:
+ raise ValueError
+ for k in self._keys:
+ hdf5[k] = getattr(self, k)
+ for i, trace in enumerate(self._traces):
+ dset = hdf5.create_dataset(str(i), trace.samples)
+ if trace.meta:
+ for k, v in trace.meta.items():
+ dset.attrs[k] = v
+ hdf5.close()