about | summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
author J08nY 2021-12-23 19:16:36 +0100
committer J08nY 2021-12-23 19:16:36 +0100
commit cba77ff4688026d8e6ef8274ff14e857ecee956f (patch)
tree a602360df35d63e05b00c5f308a44556332e4ce8
parent cf60066621b92ec9580de10e0c0835a8a0aa4a05 (diff)
download sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.tar.gz
sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.tar.zst
sec-certs-cba77ff4688026d8e6ef8274ff14e857ecee956f.zip
Refactor IUT and MIP stuff into modern interface.
-rw-r--r-- sec_certs/dataset/fips_iut.py 64
-rw-r--r-- sec_certs/dataset/fips_mip.py 87
-rw-r--r-- sec_certs/helpers.py 2
-rw-r--r-- sec_certs/sample/fips_iut.py 89
-rw-r--r-- sec_certs/sample/fips_mip.py 135
5 files changed, 233 insertions, 144 deletions
diff --git a/sec_certs/dataset/fips_iut.py b/sec_certs/dataset/fips_iut.py
index 183c8848..51bf7573 100644
--- a/sec_certs/dataset/fips_iut.py
+++ b/sec_certs/dataset/fips_iut.py
@@ -1,16 +1,11 @@
-import json
from dataclasses import dataclass
-from datetime import datetime
from pathlib import Path
-from typing import List, Union, Mapping
+from typing import List, Mapping, Union
-from bs4 import BeautifulSoup, Tag
from tqdm import tqdm
-from sec_certs.helpers import to_utc
-from sec_certs.dataset.dataset import logger
-from sec_certs.sample.fips_iut import IUTEntry, IUTSnapshot
-from sec_certs.serialization.json import ComplexSerializableType, CustomJSONEncoder, CustomJSONDecoder
+from sec_certs.sample.fips_iut import IUTSnapshot
+from sec_certs.serialization.json import ComplexSerializableType
@dataclass
@@ -27,60 +22,15 @@ class IUTDataset(ComplexSerializableType):
return len(self.snapshots)
@classmethod
- def from_dump(cls, dump_path: Union[str, Path]) -> "IUTDataset":
+ def from_dumps(cls, dump_path: Union[str, Path]) -> "IUTDataset":
directory = Path(dump_path)
- snapshots = []
fnames = list(directory.glob("*"))
- for fname in tqdm(sorted(fnames), total=len(fnames)):
- snapshot_date = to_utc(
- datetime.fromisoformat(fname.name[len("fips_iut_") : -len(".html")])
- )
- with open(fname) as f:
- soup = BeautifulSoup(f, "html.parser")
- tables = soup.find_all("table")
- if len(tables) != 1:
- logger.error(f"*** Not only a single table in {fname}.")
- continue
- last_updated_elem = next(
- filter(
- lambda e: isinstance(e, Tag) and e.name == "p",
- soup.find(id="content").next_siblings,
- )
- )
- last_updated_text = str(last_updated_elem.string).strip()
- last_updated = datetime.strptime(
- last_updated_text, "Last Updated: %m/%d/%Y"
- ).date()
- table = tables[0].find("tbody")
- lines = table.find_all("tr")
- entries = {
- IUTEntry(
- str(line[0].string),
- str(line[1].string),
- str(line[2].string),
- datetime.strptime(str(line[3].string), "%m/%d/%Y").date(),
- )
- for line in map(lambda tr: tr.find_all("td"), lines)
- }
- snapshots.append(IUTSnapshot(entries, snapshot_date, last_updated))
+ snapshots = [IUTSnapshot.from_dump(dump_path) for dump_path in tqdm(sorted(fnames), total=len(fnames))]
return cls(snapshots)
def to_dict(self):
- return {
- "snapshots": list(self.snapshots)
- }
+ return {"snapshots": list(self.snapshots)}
@classmethod
def from_dict(cls, dct: Mapping) -> "IUTDataset":
- return cls(
- dct["snapshots"]
- )
-
- def to_json(self, json_path: Union[str, Path]):
- with open(json_path, 'w') as handle:
- json.dump(self, handle, indent=4, cls=CustomJSONEncoder)
-
- @classmethod
- def from_json(cls, json_path: Union[str, Path]) -> "IUTDataset":
- with Path(json_path).open('r') as handle:
- return json.load(handle, cls=CustomJSONDecoder)
+ return cls(dct["snapshots"])
diff --git a/sec_certs/dataset/fips_mip.py b/sec_certs/dataset/fips_mip.py
index e0852bc2..88fc837d 100644
--- a/sec_certs/dataset/fips_mip.py
+++ b/sec_certs/dataset/fips_mip.py
@@ -1,16 +1,11 @@
-import json
from dataclasses import dataclass
-from datetime import datetime
from pathlib import Path
-from typing import List, Union, Mapping
+from typing import List, Mapping, Union
-from bs4 import BeautifulSoup, Tag
from tqdm import tqdm
-from sec_certs.dataset.dataset import logger
-from sec_certs.helpers import to_utc
-from sec_certs.sample.fips_mip import MIPEntry, MIPSnapshot, MIPStatus
-from sec_certs.serialization.json import ComplexSerializableType, CustomJSONEncoder, CustomJSONDecoder
+from sec_certs.sample.fips_mip import MIPSnapshot
+from sec_certs.serialization.json import ComplexSerializableType
@dataclass
@@ -27,83 +22,15 @@ class MIPDataset(ComplexSerializableType):
return len(self.snapshots)
@classmethod
- def from_dump(cls, dump_path: Union[str, Path]) -> "MIPDataset":
+ def from_dumps(cls, dump_path: Union[str, Path]) -> "MIPDataset":
directory = Path(dump_path)
- snapshots = []
fnames = list(directory.glob("*"))
- for fname in tqdm(sorted(fnames), total=len(fnames)):
- snapshot_date = to_utc(
- datetime.fromisoformat(fname.name[len("fips_mip_") : -len(".html")])
- )
- with open(fname) as f:
- soup = BeautifulSoup(f, "html.parser")
- tables = soup.find_all("table")
- if len(tables) != 1:
- logger.error(f"*** Not only a single table in {fname}.")
- continue
- last_updated_elem = next(
- filter(
- lambda e: isinstance(e, Tag) and e.name == "p",
- soup.find(id="content").next_siblings,
- )
- )
- last_updated_text = str(last_updated_elem.string).strip()
- last_updated = datetime.strptime(
- last_updated_text, "Last Updated: %m/%d/%Y"
- ).date()
- table = tables[0].find("tbody")
- lines = table.find_all("tr")
- if snapshot_date <= datetime(2020, 10, 28):
- # NIST had a different format of the MIP table before this date, handle it.
- entries = set()
- for tr in lines:
- tds = tr.find_all("td")
- status = None
- if "mip-highlight" in tds[-1]["class"]:
- status = MIPStatus.FINALIZATION
- elif "mip-highlight" in tds[-2]["class"]:
- status = MIPStatus.COORDINATION
- elif "mip-highlight" in tds[-3]["class"]:
- status = MIPStatus.REVIEW_PENDING
- elif "mip-highlight" in tds[-4]["class"]:
- status = MIPStatus.IN_REVIEW
- entries.add(
- MIPEntry(
- str(tds[0].string),
- str(tds[1].string),
- str(tds[2].string),
- status,
- )
- )
- else:
- entries = {
- MIPEntry(
- str(line[0].string),
- str(line[1].string),
- str(line[2].string),
- MIPStatus(str(line[3].string)),
- )
- for line in map(lambda tr: tr.find_all("td"), lines)
- }
- snapshots.append(MIPSnapshot(entries, snapshot_date, last_updated))
+ snapshots = [MIPSnapshot.from_dump(dump_path) for dump_path in tqdm(sorted(fnames), total=len(fnames))]
return cls(snapshots)
def to_dict(self):
- return {
- "snapshots": list(self.snapshots)
- }
+ return {"snapshots": list(self.snapshots)}
@classmethod
def from_dict(cls, dct: Mapping) -> "MIPDataset":
- return cls(
- dct["snapshots"]
- )
-
- def to_json(self, json_path: Union[str, Path]):
- with open(json_path, 'w') as handle:
- json.dump(self, handle, indent=4, cls=CustomJSONEncoder)
-
- @classmethod
- def from_json(cls, json_path: Union[str, Path]):
- with Path(json_path).open('r') as handle:
- return json.load(handle, cls=CustomJSONDecoder) \ No newline at end of file
+ return cls(dct["snapshots"])
diff --git a/sec_certs/helpers.py b/sec_certs/helpers.py
index 8c28b729..231cd552 100644
--- a/sec_certs/helpers.py
+++ b/sec_certs/helpers.py
@@ -225,6 +225,8 @@ def extract_pdf_metadata(filepath: Path):
def to_utc(dt):
+ if dt.utcoffset() is None:
+ return dt
dt -= dt.utcoffset()
dt = dt.replace(tzinfo=None)
return dt
diff --git a/sec_certs/sample/fips_iut.py b/sec_certs/sample/fips_iut.py
index 69bfa02c..8dfacd43 100644
--- a/sec_certs/sample/fips_iut.py
+++ b/sec_certs/sample/fips_iut.py
@@ -1,7 +1,12 @@
from dataclasses import dataclass
from datetime import date, datetime
-from typing import Mapping, Set
+from pathlib import Path
+from typing import Iterator, Mapping, Optional, Set, Union
+import requests
+from bs4 import BeautifulSoup, Tag
+
+from sec_certs.helpers import to_utc
from sec_certs.serialization.json import ComplexSerializableType
@@ -30,12 +35,24 @@ class IUTSnapshot(ComplexSerializableType):
entries: Set[IUTEntry]
timestamp: datetime
last_updated: date
+ displayed: int
+ not_displayed: int
+ total: int
+
+ def __len__(self):
+ return len(self.entries)
+
+ def __iter__(self) -> Iterator[IUTEntry]:
+ yield from self.entries
def to_dict(self):
return {
"entries": list(self.entries),
"timestamp": self.timestamp.isoformat(),
"last_updated": self.last_updated.isoformat(),
+ "displayed": self.displayed,
+ "not_displayed": self.not_displayed,
+ "total": self.total,
}
@classmethod
@@ -44,4 +61,72 @@ class IUTSnapshot(ComplexSerializableType):
set(dct["entries"]),
datetime.fromisoformat(dct["timestamp"]),
date.fromisoformat(dct["last_updated"]),
- ) \ No newline at end of file
+ dct["displayed"],
+ dct["not_displayed"],
+ dct["total"],
+ )
+
+ @classmethod
+ def from_page(cls, content: bytes, snapshot_date: datetime) -> "IUTSnapshot":
+ soup = BeautifulSoup(content, "html.parser")
+ tables = soup.find_all("table")
+ if len(tables) != 1:
+ raise ValueError("Not only a single table in IUT.")
+
+ last_updated_elem = next(
+ filter(
+ lambda e: isinstance(e, Tag) and e.name == "p",
+ soup.find(id="content").next_siblings,
+ )
+ )
+ last_updated_text = str(last_updated_elem.string).strip()
+ last_updated = datetime.strptime(last_updated_text, "Last Updated: %m/%d/%Y").date()
+ table = tables[0].find("tbody")
+ lines = table.find_all("tr")
+ entries = {
+ IUTEntry(
+ str(line[0].string),
+ str(line[1].string),
+ str(line[2].string),
+ datetime.strptime(str(line[3].string), "%m/%d/%Y").date(),
+ )
+ for line in map(lambda tr: tr.find_all("td"), lines)
+ }
+
+ # Parse footer
+ footer = soup.find(id="IUTFooter")
+ footer_lines = footer.find_all("tr")
+ displayed = int(footer_lines[0].find_all("td")[1].text)
+ not_displayed = int(footer_lines[1].find_all("td")[1].text)
+ total = int(footer_lines[2].find_all("td")[1].text)
+
+ return cls(
+ entries=entries,
+ timestamp=snapshot_date,
+ last_updated=last_updated,
+ displayed=displayed,
+ not_displayed=not_displayed,
+ total=total,
+ )
+
+ @classmethod
+ def from_dump(cls, dump_path: Union[str, Path], snapshot_date: Optional[datetime] = None) -> "IUTSnapshot":
+ dump_path = Path(dump_path)
+ if snapshot_date is None:
+ try:
+ snapshot_date = to_utc(datetime.fromisoformat(dump_path.name[len("fips_iut_") : -len(".html")]))
+ except Exception:
+ raise ValueError("snapshot_date not given and could not be inferred from filename.")
+ with dump_path.open("rb") as f:
+ content = f.read()
+ return cls.from_page(content, snapshot_date)
+
+ @classmethod
+ def from_web(cls) -> "IUTSnapshot":
+ iut_url = "https://csrc.nist.gov/Projects/cryptographic-module-validation-program/modules-in-process/IUT-List"
+ iut_resp = requests.get(iut_url)
+ if iut_resp.status_code != 200:
+ raise ValueError("Getting MIP snapshot failed")
+
+ snapshot_date = to_utc(datetime.now())
+ return cls.from_page(iut_resp.content, snapshot_date)
diff --git a/sec_certs/sample/fips_mip.py b/sec_certs/sample/fips_mip.py
index a1441ddd..7e5c2bd4 100644
--- a/sec_certs/sample/fips_mip.py
+++ b/sec_certs/sample/fips_mip.py
@@ -1,10 +1,18 @@
+import logging
from dataclasses import dataclass
-from datetime import datetime, date
+from datetime import date, datetime
from enum import Enum
-from typing import Mapping, Set
+from pathlib import Path
+from typing import Iterator, Mapping, Optional, Set, Union
+import requests
+from bs4 import BeautifulSoup, Tag
+
+from sec_certs.helpers import to_utc
from sec_certs.serialization.json import ComplexSerializableType
+logger = logging.getLogger(__name__)
+
class MIPStatus(Enum):
IN_REVIEW = "In Review"
@@ -18,10 +26,10 @@ class MIPEntry(ComplexSerializableType):
module_name: str
vendor_name: str
standard: str
- status: MIPStatus
+ status: Optional[MIPStatus]
def to_dict(self):
- return {**self.__dict__, "status": self.status.value}
+ return {**self.__dict__, "status": self.status.value if self.status else None}
@classmethod
def from_dict(cls, dct: Mapping) -> "MIPEntry":
@@ -29,7 +37,7 @@ class MIPEntry(ComplexSerializableType):
dct["module_name"],
dct["vendor_name"],
dct["standard"],
- MIPStatus(dct["status"]),
+ MIPStatus(dct["status"]) if dct["status"] else None,
)
@@ -38,12 +46,24 @@ class MIPSnapshot(ComplexSerializableType):
entries: Set[MIPEntry]
timestamp: datetime
last_updated: date
+ displayed: int
+ not_displayed: int
+ total: int
+
+ def __len__(self):
+ return len(self.entries)
+
+ def __iter__(self) -> Iterator[MIPEntry]:
+ yield from self.entries
def to_dict(self):
return {
"entries": list(self.entries),
"timestamp": self.timestamp.isoformat(),
"last_updated": self.last_updated.isoformat(),
+ "displayed": self.displayed,
+ "not_displayed": self.not_displayed,
+ "total": self.total,
}
@classmethod
@@ -52,4 +72,109 @@ class MIPSnapshot(ComplexSerializableType):
set(dct["entries"]),
datetime.fromisoformat(dct["timestamp"]),
date.fromisoformat(dct["last_updated"]),
+ dct["displayed"],
+ dct["not_displayed"],
+ dct["total"],
+ )
+
+ @classmethod
+ def from_page(cls, content: bytes, snapshot_date: datetime) -> "MIPSnapshot":
+ soup = BeautifulSoup(content, "html.parser")
+ tables = soup.find_all("table")
+ if len(tables) != 1:
+ raise ValueError("Not only a single table in MIP data.")
+
+ # Parse Last Updated
+ last_updated_elem = next(
+ filter(
+ lambda e: isinstance(e, Tag) and e.name == "p",
+ soup.find(id="content").next_siblings,
+ )
+ )
+ last_updated_text = str(last_updated_elem.string).strip()
+ last_updated = datetime.strptime(last_updated_text, "Last Updated: %m/%d/%Y").date()
+
+ # Parse entries
+ table = tables[0].find("tbody")
+ lines = table.find_all("tr")
+ if snapshot_date <= datetime(2020, 10, 28):
+ # NIST had a different format of the MIP table before this date, handle it.
+ entries = set()
+ for tr in lines:
+ tds = tr.find_all("td")
+ status = None
+ if "mip-highlight" in tds[-1]["class"]:
+ status = MIPStatus.FINALIZATION
+ elif "mip-highlight" in tds[-2]["class"]:
+ status = MIPStatus.COORDINATION
+ elif "mip-highlight" in tds[-3]["class"]:
+ status = MIPStatus.REVIEW_PENDING
+ elif "mip-highlight" in tds[-4]["class"]:
+ status = MIPStatus.IN_REVIEW
+ entries.add(
+ MIPEntry(
+ str(tds[0].string),
+ str(tds[1].string),
+ str(tds[2].string),
+ status,
+ )
+ )
+ elif snapshot_date <= datetime(2021, 4, 20):
+ # Yet another format change
+ entries = {
+ MIPEntry(
+ str(line[0].string),
+ str(line[1].string),
+ str(line[2].string),
+ MIPStatus(str(line[3].string)),
+ )
+ for line in map(lambda tr: tr.find_all("td"), lines)
+ }
+ else:
+ entries = {
+ MIPEntry(
+ str(line[0].string),
+ str(" ".join(line[1].find_all(text=True, recursive=False)).strip()),
+ str(line[2].string),
+ MIPStatus(str(line[3].string)),
+ )
+ for line in map(lambda tr: tr.find_all("td"), lines)
+ }
+
+ # Parse footer
+ footer = soup.find(id="MIPFooter")
+ footer_lines = footer.find_all("tr")
+ displayed = int(footer_lines[0].find_all("td")[1].text)
+ not_displayed = int(footer_lines[1].find_all("td")[1].text)
+ total = int(footer_lines[2].find_all("td")[1].text)
+
+ return cls(
+ entries=entries,
+ timestamp=snapshot_date,
+ last_updated=last_updated,
+ displayed=displayed,
+ not_displayed=not_displayed,
+ total=total,
)
+
+ @classmethod
+ def from_dump(cls, dump_path: Union[str, Path], snapshot_date: Optional[datetime] = None) -> "MIPSnapshot":
+ dump_path = Path(dump_path)
+ if snapshot_date is None:
+ try:
+ snapshot_date = to_utc(datetime.fromisoformat(dump_path.name[len("fips_mip_") : -len(".html")]))
+ except Exception:
+ raise ValueError("snapshot_date not given and could not be inferred from filename.")
+ with dump_path.open("rb") as f:
+ content = f.read()
+ return cls.from_page(content, snapshot_date)
+
+ @classmethod
+ def from_web(cls) -> "MIPSnapshot":
+ mip_url = "https://csrc.nist.gov/Projects/cryptographic-module-validation-program/modules-in-process/Modules-In-Process-List"
+ mip_resp = requests.get(mip_url)
+ if mip_resp.status_code != 200:
+ raise ValueError("Getting MIP snapshot failed")
+
+ snapshot_date = to_utc(datetime.now())
+ return cls.from_page(mip_resp.content, snapshot_date)