aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/sample/fips_iut.py
blob: 963758c7b2a7fce87e4fe8f6fb3a6b6698cad959 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from __future__ import annotations

from collections.abc import Iterator, Mapping
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from tempfile import NamedTemporaryFile

import requests
from bs4 import BeautifulSoup, Tag

from sec_certs import constants
from sec_certs.configuration import config
from sec_certs.serialization.json import ComplexSerializableType
from sec_certs.utils.helpers import to_utc


@dataclass(frozen=True)
class IUTEntry(ComplexSerializableType):
    """
    One immutable record of the IUT list: module, vendor, standard and date.
    """

    module_name: str
    vendor_name: str
    standard: str
    iut_date: date

    def to_dict(self) -> dict[str, str]:
        """
        Serialize the entry into a plain dict, with `iut_date` as an ISO 8601 string.
        """
        # Copy the instance attributes, then swap the date object for its string form.
        serialized = dict(self.__dict__)
        serialized["iut_date"] = self.iut_date.isoformat()
        return serialized

    @classmethod
    def from_dict(cls, dct: Mapping) -> IUTEntry:
        """
        Build an entry from a mapping shaped like the output of `to_dict`.
        """
        return cls(
            module_name=dct["module_name"],
            vendor_name=dct["vendor_name"],
            standard=dct["standard"],
            iut_date=date.fromisoformat(dct["iut_date"]),
        )


@dataclass
class IUTSnapshot(ComplexSerializableType):
    """
    The contents of the FIPS IUT list as captured at one point in time.

    Besides the parsed entries it keeps the moment the snapshot was taken, the
    "Last Updated" date printed on the page and the three footer counters
    (which are `None` when the page has no footer).
    """

    # Rows parsed from the single table on the page.
    entries: set[IUTEntry]
    # When the snapshot was taken (passed in by the caller of `from_page`).
    timestamp: datetime
    # Date parsed from the "Last Updated: mm/dd/YYYY" paragraph of the page.
    last_updated: date
    # Footer counters, all None if the page has no element with id "IUTFooter".
    displayed: int | None
    not_displayed: int | None
    total: int | None

    def __len__(self) -> int:
        """Return the number of entries in the snapshot."""
        return len(self.entries)

    def __iter__(self) -> Iterator[IUTEntry]:
        """Iterate over the entries of the snapshot (set order, i.e. unordered)."""
        yield from self.entries

    def to_dict(self) -> dict[str, int | None | list[IUTEntry] | str]:
        """
        Serialize the snapshot into a dict.

        The timestamp and date are turned into ISO 8601 strings; the entries are
        returned as a list of `IUTEntry` objects and are not converted here.
        """
        return {
            "entries": list(self.entries),
            "timestamp": self.timestamp.isoformat(),
            "last_updated": self.last_updated.isoformat(),
            "displayed": self.displayed,
            "not_displayed": self.not_displayed,
            "total": self.total,
        }

    @classmethod
    def from_dict(cls, dct: Mapping) -> IUTSnapshot:
        """
        Build a snapshot from a mapping shaped like the output of `to_dict`.

        NOTE(review): `dct["entries"]` is put into a set as-is, so its items are
        presumably already deserialized (hashable) `IUTEntry` objects - confirm
        against the deserializer.
        """
        return cls(
            set(dct["entries"]),
            datetime.fromisoformat(dct["timestamp"]),
            date.fromisoformat(dct["last_updated"]),
            dct["displayed"],
            dct["not_displayed"],
            dct["total"],
        )

    @classmethod
    def from_page(cls, content: bytes, snapshot_date: datetime) -> IUTSnapshot:
        """
        Get an IUT snapshot from a HTML dump of the FIPS website.

        :param content: Raw HTML of the IUT page.
        :param snapshot_date: The time the page was captured, stored as `timestamp`.
        :return: The parsed snapshot.
        :raises ValueError: If the content is empty, the page does not contain exactly
                            one table, or the "Last Updated" paragraph cannot be located.
        """
        if not content:
            raise ValueError("Empty content in IUT.")
        soup = BeautifulSoup(content, "html5lib")
        tables = soup.find_all("table")
        if len(tables) != 1:
            raise ValueError("Not only a single table in IUT.")

        # The "Last Updated" paragraph is looked up among the siblings that follow
        # the element with id "content".
        content_elem = soup.find(id="content")
        if content_elem is None:
            raise ValueError("Missing content element in IUT.")
        # Use a default so that a missing paragraph surfaces as ValueError, like the
        # other malformed-page cases, rather than as a bare StopIteration.
        last_updated_elem = next(
            (
                e
                for e in content_elem.next_siblings
                if isinstance(e, Tag) and e.name == "p" and "Last Updated" in str(e.string)
            ),
            None,
        )
        if last_updated_elem is None:
            raise ValueError("Missing 'Last Updated' paragraph in IUT.")

        last_updated_text = str(last_updated_elem.string).strip()
        last_updated = datetime.strptime(last_updated_text, "Last Updated: %m/%d/%Y").date()
        table = tables[0].find("tbody")
        lines = table.find_all("tr")
        # Each row contributes its first four cells:
        # module name, vendor name, standard and date (mm/dd/YYYY).
        entries = {
            IUTEntry(
                str(line[0].string),
                str(line[1].string),
                str(line[2].string),
                datetime.strptime(str(line[3].string), "%m/%d/%Y").date(),
            )
            for line in (tr.find_all("td") for tr in lines)
        }

        # Parse footer
        footer = soup.find(id="IUTFooter")
        displayed: int | None
        not_displayed: int | None
        total: int | None

        if footer:
            # The counter value is read from the second cell of each of the first three rows.
            footer_lines = footer.find_all("tr")
            displayed = int(footer_lines[0].find_all("td")[1].text)
            not_displayed = int(footer_lines[1].find_all("td")[1].text)
            total = int(footer_lines[2].find_all("td")[1].text)
        else:
            displayed, not_displayed, total = (None, None, None)

        return cls(
            entries=entries,
            timestamp=snapshot_date,
            last_updated=last_updated,
            displayed=displayed,
            not_displayed=not_displayed,
            total=total,
        )

    @classmethod
    def from_dump(cls, dump_path: str | Path, snapshot_date: datetime | None = None) -> IUTSnapshot:
        """
        Get an IUT snapshot from a HTML file dump of the FIPS website.

        :param dump_path: Path to the HTML dump.
        :param snapshot_date: Time of the snapshot. If not given, it is inferred from a
                              file name of the form `fips_iut_<ISO datetime>.html`.
        :return: The parsed snapshot.
        :raises ValueError: If `snapshot_date` is not given and cannot be inferred from
                            the file name, or if the page cannot be parsed.
        """
        dump_path = Path(dump_path)
        if snapshot_date is None:
            try:
                # Strip the "fips_iut_" prefix and ".html" suffix to get the ISO timestamp.
                snapshot_date = to_utc(datetime.fromisoformat(dump_path.name[len("fips_iut_") : -len(".html")]))
            except Exception as e:
                # Chain the original error so the root cause stays visible in the traceback.
                raise ValueError("snapshot_date not given and could not be inferred from filename.") from e
        return cls.from_page(dump_path.read_bytes(), snapshot_date)

    @classmethod
    def from_nist_web(cls) -> IUTSnapshot:
        """
        Get an IUT snapshot from the FIPS website right now.

        :return: The snapshot parsed from the page at `constants.FIPS_IUT_URL`.
        :raises ValueError: If the HTTP status is not OK or the page cannot be parsed.
        """
        iut_resp = requests.get(constants.FIPS_IUT_URL)
        if iut_resp.status_code != requests.codes.ok:
            raise ValueError(f"Getting IUT snapshot failed: {iut_resp.status_code}")

        snapshot_date = to_utc(datetime.now())
        return cls.from_page(iut_resp.content, snapshot_date)

    @classmethod
    def from_web(cls) -> IUTSnapshot:
        """
        Fetch a fresh IUT snapshot from the source selected in the configuration.

        If the `preferred_source_remote_datasets` config entry is equal to "origin",
        this is the same as `from_nist_web`. Otherwise the JSON snapshot at the
        `fips_iut_latest_snapshot` config URL is downloaded and loaded.

        :raises ValueError: If the HTTP status of the download is not OK.
        """
        if config.preferred_source_remote_datasets == "origin":
            return cls.from_nist_web()
        else:
            iut_resp = requests.get(config.fips_iut_latest_snapshot)
            if iut_resp.status_code != requests.codes.ok:
                raise ValueError(f"Getting IUT snapshot failed: {iut_resp.status_code}")
            with NamedTemporaryFile(suffix=".json") as tmpfile:
                tmpfile.write(iut_resp.content)
                # The file is re-opened by name below; flush first so the buffered
                # bytes are really on disk and the reader does not see a truncated file.
                tmpfile.flush()
                # `from_json` is not defined in this class (presumably inherited
                # from ComplexSerializableType).
                return cls.from_json(tmpfile.name)