diff options
| author | J08nY | 2021-12-23 19:16:36 +0100 |
|---|---|---|
| committer | J08nY | 2021-12-23 19:16:36 +0100 |
| commit | cba77ff4688026d8e6ef8274ff14e857ecee956f (patch) | |
| tree | a602360df35d63e05b00c5f308a44556332e4ce8 | |
| parent | cf60066621b92ec9580de10e0c0835a8a0aa4a05 (diff) | |
| download | sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.tar.gz sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.tar.zst sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.zip | |
Refactor IUT and MIP stuff into modern interface.
| -rw-r--r-- | sec_certs/dataset/fips_iut.py | 64 | ||||
| -rw-r--r-- | sec_certs/dataset/fips_mip.py | 87 | ||||
| -rw-r--r-- | sec_certs/helpers.py | 2 | ||||
| -rw-r--r-- | sec_certs/sample/fips_iut.py | 89 | ||||
| -rw-r--r-- | sec_certs/sample/fips_mip.py | 135 |
5 files changed, 233 insertions, 144 deletions
diff --git a/sec_certs/dataset/fips_iut.py b/sec_certs/dataset/fips_iut.py index 183c8848..51bf7573 100644 --- a/sec_certs/dataset/fips_iut.py +++ b/sec_certs/dataset/fips_iut.py @@ -1,16 +1,11 @@ -import json from dataclasses import dataclass -from datetime import datetime from pathlib import Path -from typing import List, Union, Mapping +from typing import List, Mapping, Union -from bs4 import BeautifulSoup, Tag from tqdm import tqdm -from sec_certs.helpers import to_utc -from sec_certs.dataset.dataset import logger -from sec_certs.sample.fips_iut import IUTEntry, IUTSnapshot -from sec_certs.serialization.json import ComplexSerializableType, CustomJSONEncoder, CustomJSONDecoder +from sec_certs.sample.fips_iut import IUTSnapshot +from sec_certs.serialization.json import ComplexSerializableType @dataclass @@ -27,60 +22,15 @@ class IUTDataset(ComplexSerializableType): return len(self.snapshots) @classmethod - def from_dump(cls, dump_path: Union[str, Path]) -> "IUTDataset": + def from_dumps(cls, dump_path: Union[str, Path]) -> "IUTDataset": directory = Path(dump_path) - snapshots = [] fnames = list(directory.glob("*")) - for fname in tqdm(sorted(fnames), total=len(fnames)): - snapshot_date = to_utc( - datetime.fromisoformat(fname.name[len("fips_iut_") : -len(".html")]) - ) - with open(fname) as f: - soup = BeautifulSoup(f, "html.parser") - tables = soup.find_all("table") - if len(tables) != 1: - logger.error(f"*** Not only a single table in {fname}.") - continue - last_updated_elem = next( - filter( - lambda e: isinstance(e, Tag) and e.name == "p", - soup.find(id="content").next_siblings, - ) - ) - last_updated_text = str(last_updated_elem.string).strip() - last_updated = datetime.strptime( - last_updated_text, "Last Updated: %m/%d/%Y" - ).date() - table = tables[0].find("tbody") - lines = table.find_all("tr") - entries = { - IUTEntry( - str(line[0].string), - str(line[1].string), - str(line[2].string), - datetime.strptime(str(line[3].string), "%m/%d/%Y").date(), - ) - for line in map(lambda tr: tr.find_all("td"), lines) - } - snapshots.append(IUTSnapshot(entries, snapshot_date, last_updated)) + snapshots = [IUTSnapshot.from_dump(dump_path) for dump_path in tqdm(sorted(fnames), total=len(fnames))] return cls(snapshots) def to_dict(self): - return { - "snapshots": list(self.snapshots) - } + return {"snapshots": list(self.snapshots)} @classmethod def from_dict(cls, dct: Mapping) -> "IUTDataset": - return cls( - dct["snapshots"] - ) - - def to_json(self, json_path: Union[str, Path]): - with open(json_path, 'w') as handle: - json.dump(self, handle, indent=4, cls=CustomJSONEncoder) - - @classmethod - def from_json(cls, json_path: Union[str, Path]) -> "IUTDataset": - with Path(json_path).open('r') as handle: - return json.load(handle, cls=CustomJSONDecoder) + return cls(dct["snapshots"]) diff --git a/sec_certs/dataset/fips_mip.py b/sec_certs/dataset/fips_mip.py index e0852bc2..88fc837d 100644 --- a/sec_certs/dataset/fips_mip.py +++ b/sec_certs/dataset/fips_mip.py @@ -1,16 +1,11 @@ -import json from dataclasses import dataclass -from datetime import datetime from pathlib import Path -from typing import List, Union, Mapping +from typing import List, Mapping, Union -from bs4 import BeautifulSoup, Tag from tqdm import tqdm -from sec_certs.dataset.dataset import logger -from sec_certs.helpers import to_utc -from sec_certs.sample.fips_mip import MIPEntry, MIPSnapshot, MIPStatus -from sec_certs.serialization.json import ComplexSerializableType, CustomJSONEncoder, CustomJSONDecoder +from sec_certs.sample.fips_mip import MIPSnapshot +from sec_certs.serialization.json import ComplexSerializableType @dataclass @@ -27,83 +22,15 @@ class MIPDataset(ComplexSerializableType): return len(self.snapshots) @classmethod - def from_dump(cls, dump_path: Union[str, Path]) -> "MIPDataset": + def from_dumps(cls, dump_path: Union[str, Path]) -> "MIPDataset": directory = Path(dump_path) - snapshots = [] fnames = list(directory.glob("*")) - for fname in tqdm(sorted(fnames), total=len(fnames)): - snapshot_date = to_utc( - datetime.fromisoformat(fname.name[len("fips_mip_") : -len(".html")]) - ) - with open(fname) as f: - soup = BeautifulSoup(f, "html.parser") - tables = soup.find_all("table") - if len(tables) != 1: - logger.error(f"*** Not only a single table in {fname}.") - continue - last_updated_elem = next( - filter( - lambda e: isinstance(e, Tag) and e.name == "p", - soup.find(id="content").next_siblings, - ) - ) - last_updated_text = str(last_updated_elem.string).strip() - last_updated = datetime.strptime( - last_updated_text, "Last Updated: %m/%d/%Y" - ).date() - table = tables[0].find("tbody") - lines = table.find_all("tr") - if snapshot_date <= datetime(2020, 10, 28): - # NIST had a different format of the MIP table before this date, handle it. - entries = set() - for tr in lines: - tds = tr.find_all("td") - status = None - if "mip-highlight" in tds[-1]["class"]: - status = MIPStatus.FINALIZATION - elif "mip-highlight" in tds[-2]["class"]: - status = MIPStatus.COORDINATION - elif "mip-highlight" in tds[-3]["class"]: - status = MIPStatus.REVIEW_PENDING - elif "mip-highlight" in tds[-4]["class"]: - status = MIPStatus.IN_REVIEW - entries.add( - MIPEntry( - str(tds[0].string), - str(tds[1].string), - str(tds[2].string), - status, - ) - ) - else: - entries = { - MIPEntry( - str(line[0].string), - str(line[1].string), - str(line[2].string), - MIPStatus(str(line[3].string)), - ) - for line in map(lambda tr: tr.find_all("td"), lines) - } - snapshots.append(MIPSnapshot(entries, snapshot_date, last_updated)) + snapshots = [MIPSnapshot.from_dump(dump_path) for dump_path in tqdm(sorted(fnames), total=len(fnames))] return cls(snapshots) def to_dict(self): - return { - "snapshots": list(self.snapshots) - } + return {"snapshots": list(self.snapshots)} @classmethod def from_dict(cls, dct: Mapping) -> "MIPDataset": - return cls( - dct["snapshots"] - ) - - def to_json(self, json_path: Union[str, Path]): - with open(json_path, 'w') as handle: - json.dump(self, handle, indent=4, cls=CustomJSONEncoder) - - @classmethod - def from_json(cls, json_path: Union[str, Path]): - with Path(json_path).open('r') as handle: - return json.load(handle, cls=CustomJSONDecoder)
\ No newline at end of file + return cls(dct["snapshots"]) diff --git a/sec_certs/helpers.py b/sec_certs/helpers.py index 8c28b729..231cd552 100644 --- a/sec_certs/helpers.py +++ b/sec_certs/helpers.py @@ -225,6 +225,8 @@ def extract_pdf_metadata(filepath: Path): def to_utc(dt): + if dt.utcoffset() is None: + return dt dt -= dt.utcoffset() dt = dt.replace(tzinfo=None) return dt diff --git a/sec_certs/sample/fips_iut.py b/sec_certs/sample/fips_iut.py index 69bfa02c..8dfacd43 100644 --- a/sec_certs/sample/fips_iut.py +++ b/sec_certs/sample/fips_iut.py @@ -1,7 +1,12 @@ from dataclasses import dataclass from datetime import date, datetime -from typing import Mapping, Set +from pathlib import Path +from typing import Iterator, Mapping, Optional, Set, Union +import requests +from bs4 import BeautifulSoup, Tag + +from sec_certs.helpers import to_utc from sec_certs.serialization.json import ComplexSerializableType @@ -30,12 +35,24 @@ class IUTSnapshot(ComplexSerializableType): entries: Set[IUTEntry] timestamp: datetime last_updated: date + displayed: int + not_displayed: int + total: int + + def __len__(self): + return len(self.entries) + + def __iter__(self) -> Iterator[IUTEntry]: + yield from self.entries def to_dict(self): return { "entries": list(self.entries), "timestamp": self.timestamp.isoformat(), "last_updated": self.last_updated.isoformat(), + "displayed": self.displayed, + "not_displayed": self.not_displayed, + "total": self.total, } @classmethod @@ -44,4 +61,72 @@ class IUTSnapshot(ComplexSerializableType): set(dct["entries"]), datetime.fromisoformat(dct["timestamp"]), date.fromisoformat(dct["last_updated"]), - )
\ No newline at end of file + dct["displayed"], + dct["not_displayed"], + dct["total"], + ) + + @classmethod + def from_page(cls, content: bytes, snapshot_date: datetime) -> "IUTSnapshot": + soup = BeautifulSoup(content, "html.parser") + tables = soup.find_all("table") + if len(tables) != 1: + raise ValueError("Not only a single table in IUT.") + + last_updated_elem = next( + filter( + lambda e: isinstance(e, Tag) and e.name == "p", + soup.find(id="content").next_siblings, + ) + ) + last_updated_text = str(last_updated_elem.string).strip() + last_updated = datetime.strptime(last_updated_text, "Last Updated: %m/%d/%Y").date() + table = tables[0].find("tbody") + lines = table.find_all("tr") + entries = { + IUTEntry( + str(line[0].string), + str(line[1].string), + str(line[2].string), + datetime.strptime(str(line[3].string), "%m/%d/%Y").date(), + ) + for line in map(lambda tr: tr.find_all("td"), lines) + } + + # Parse footer + footer = soup.find(id="IUTFooter") + footer_lines = footer.find_all("tr") + displayed = int(footer_lines[0].find_all("td")[1].text) + not_displayed = int(footer_lines[1].find_all("td")[1].text) + total = int(footer_lines[2].find_all("td")[1].text) + + return cls( + entries=entries, + timestamp=snapshot_date, + last_updated=last_updated, + displayed=displayed, + not_displayed=not_displayed, + total=total, + ) + + @classmethod + def from_dump(cls, dump_path: Union[str, Path], snapshot_date: Optional[datetime] = None) -> "IUTSnapshot": + dump_path = Path(dump_path) + if snapshot_date is None: + try: + snapshot_date = to_utc(datetime.fromisoformat(dump_path.name[len("fips_iut_") : -len(".html")])) + except Exception: + raise ValueError("snapshot_date not given and could not be inferred from filename.") + with dump_path.open("rb") as f: + content = f.read() + return cls.from_page(content, snapshot_date) + + @classmethod + def from_web(cls) -> "IUTSnapshot": + iut_url = "https://csrc.nist.gov/Projects/cryptographic-module-validation-program/modules-in-process/IUT-List" + iut_resp = requests.get(iut_url) + if iut_resp.status_code != 200: + raise ValueError("Getting MIP snapshot failed") + + snapshot_date = to_utc(datetime.now()) + return cls.from_page(iut_resp.content, snapshot_date) diff --git a/sec_certs/sample/fips_mip.py b/sec_certs/sample/fips_mip.py index a1441ddd..7e5c2bd4 100644 --- a/sec_certs/sample/fips_mip.py +++ b/sec_certs/sample/fips_mip.py @@ -1,10 +1,18 @@ +import logging from dataclasses import dataclass -from datetime import datetime, date +from datetime import date, datetime from enum import Enum -from typing import Mapping, Set +from pathlib import Path +from typing import Iterator, Mapping, Optional, Set, Union +import requests +from bs4 import BeautifulSoup, Tag + +from sec_certs.helpers import to_utc from sec_certs.serialization.json import ComplexSerializableType +logger = logging.getLogger(__name__) + class MIPStatus(Enum): IN_REVIEW = "In Review" @@ -18,10 +26,10 @@ class MIPEntry(ComplexSerializableType): module_name: str vendor_name: str standard: str - status: MIPStatus + status: Optional[MIPStatus] def to_dict(self): - return {**self.__dict__, "status": self.status.value} + return {**self.__dict__, "status": self.status.value if self.status else None} @classmethod def from_dict(cls, dct: Mapping) -> "MIPEntry": @@ -29,7 +37,7 @@ class MIPEntry(ComplexSerializableType): dct["module_name"], dct["vendor_name"], dct["standard"], - MIPStatus(dct["status"]), + MIPStatus(dct["status"]) if dct["status"] else None, ) @@ -38,12 +46,24 @@ class MIPSnapshot(ComplexSerializableType): entries: Set[MIPEntry] timestamp: datetime last_updated: date + displayed: int + not_displayed: int + total: int + + def __len__(self): + return len(self.entries) + + def __iter__(self) -> Iterator[MIPEntry]: + yield from self.entries def to_dict(self): return { "entries": list(self.entries), "timestamp": self.timestamp.isoformat(), "last_updated": self.last_updated.isoformat(), + "displayed": self.displayed, + "not_displayed": self.not_displayed, + "total": self.total, } @classmethod @@ -52,4 +72,109 @@ class MIPSnapshot(ComplexSerializableType): set(dct["entries"]), datetime.fromisoformat(dct["timestamp"]), date.fromisoformat(dct["last_updated"]), + dct["displayed"], + dct["not_displayed"], + dct["total"], + ) + + @classmethod + def from_page(cls, content: bytes, snapshot_date: datetime) -> "MIPSnapshot": + soup = BeautifulSoup(content, "html.parser") + tables = soup.find_all("table") + if len(tables) != 1: + raise ValueError("Not only a single table in MIP data.") + + # Parse Last Updated + last_updated_elem = next( + filter( + lambda e: isinstance(e, Tag) and e.name == "p", + soup.find(id="content").next_siblings, + ) + ) + last_updated_text = str(last_updated_elem.string).strip() + last_updated = datetime.strptime(last_updated_text, "Last Updated: %m/%d/%Y").date() + + # Parse entries + table = tables[0].find("tbody") + lines = table.find_all("tr") + if snapshot_date <= datetime(2020, 10, 28): + # NIST had a different format of the MIP table before this date, handle it. + entries = set() + for tr in lines: + tds = tr.find_all("td") + status = None + if "mip-highlight" in tds[-1]["class"]: + status = MIPStatus.FINALIZATION + elif "mip-highlight" in tds[-2]["class"]: + status = MIPStatus.COORDINATION + elif "mip-highlight" in tds[-3]["class"]: + status = MIPStatus.REVIEW_PENDING + elif "mip-highlight" in tds[-4]["class"]: + status = MIPStatus.IN_REVIEW + entries.add( + MIPEntry( + str(tds[0].string), + str(tds[1].string), + str(tds[2].string), + status, + ) + ) + elif snapshot_date <= datetime(2021, 4, 20): + # Yet another format change + entries = { + MIPEntry( + str(line[0].string), + str(line[1].string), + str(line[2].string), + MIPStatus(str(line[3].string)), + ) + for line in map(lambda tr: tr.find_all("td"), lines) + } + else: + entries = { + MIPEntry( + str(line[0].string), + str(" ".join(line[1].find_all(text=True, recursive=False)).strip()), + str(line[2].string), + MIPStatus(str(line[3].string)), + ) + for line in map(lambda tr: tr.find_all("td"), lines) + } + + # Parse footer + footer = soup.find(id="MIPFooter") + footer_lines = footer.find_all("tr") + displayed = int(footer_lines[0].find_all("td")[1].text) + not_displayed = int(footer_lines[1].find_all("td")[1].text) + total = int(footer_lines[2].find_all("td")[1].text) + + return cls( + entries=entries, + timestamp=snapshot_date, + last_updated=last_updated, + displayed=displayed, + not_displayed=not_displayed, + total=total, ) + + @classmethod + def from_dump(cls, dump_path: Union[str, Path], snapshot_date: Optional[datetime] = None) -> "MIPSnapshot": + dump_path = Path(dump_path) + if snapshot_date is None: + try: + snapshot_date = to_utc(datetime.fromisoformat(dump_path.name[len("fips_mip_") : -len(".html")])) + except Exception: + raise ValueError("snapshot_date not given and could not be inferred from filename.") + with dump_path.open("rb") as f: + content = f.read() + return cls.from_page(content, snapshot_date) + + @classmethod + def from_web(cls) -> "MIPSnapshot": + mip_url = "https://csrc.nist.gov/Projects/cryptographic-module-validation-program/modules-in-process/Modules-In-Process-List" + mip_resp = requests.get(mip_url) + if mip_resp.status_code != 200: + raise ValueError("Getting MIP snapshot failed") + + snapshot_date = to_utc(datetime.now()) + return cls.from_page(mip_resp.content, snapshot_date) |
