aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/sample/fips.py
diff options
context:
space:
mode:
authoradamjanovsky2022-12-09 17:10:19 +0100
committerGitHub2022-12-09 17:10:19 +0100
commit73b3b0c361f9545450fa188bec50606d64bb1afd (patch)
tree0a1f9034c309ba88e5f72a31634b014c23a57df5 /src/sec_certs/sample/fips.py
parent19338dc9fd9ab257c36cfa277994abe202e97de2 (diff)
downloadsec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.gz
sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.zst
sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.zip
flat -> src layout (#294)
- Some mypy fixes - Flat layout -> src layout - Ditch `setup.py` and `setup.cfg` in favour of `pyproject.toml` - Non-pinned requirements moved from `requirements/*.in` to `pyproject.toml`
Diffstat (limited to 'src/sec_certs/sample/fips.py')
-rw-r--r--src/sec_certs/sample/fips.py654
1 files changed, 654 insertions, 0 deletions
diff --git a/src/sec_certs/sample/fips.py b/src/sec_certs/sample/fips.py
new file mode 100644
index 00000000..aec791d4
--- /dev/null
+++ b/src/sec_certs/sample/fips.py
@@ -0,0 +1,654 @@
+from __future__ import annotations
+
+import itertools
+import re
+from dataclasses import dataclass, field
+from datetime import date
+from pathlib import Path
+from typing import Any, Callable, ClassVar, Final, Literal
+
+import dateutil
+import numpy as np
+import pandas as pd
+import requests
+from bs4 import BeautifulSoup, Tag
+from tabula import read_pdf
+
+import sec_certs.constants as constants
+import sec_certs.utils.extract
+import sec_certs.utils.helpers as helpers
+import sec_certs.utils.pdf
+import sec_certs.utils.pdf as pdf
+import sec_certs.utils.tables as tables
+from sec_certs.cert_rules import FIPS_ALGS_IN_TABLE, fips_rules
+from sec_certs.config.configuration import config
+from sec_certs.sample.certificate import Certificate
+from sec_certs.sample.certificate import Heuristics as BaseHeuristics
+from sec_certs.sample.certificate import PdfData as BasePdfData
+from sec_certs.sample.certificate import References, logger
+from sec_certs.sample.cpe import CPE
+from sec_certs.serialization.json import ComplexSerializableType
+from sec_certs.serialization.pandas import PandasSerializableType
+from sec_certs.utils.helpers import fips_dgst
+
+
+class FIPSHTMLParser:
+ def __init__(self, soup: BeautifulSoup):
+ self._soup = soup
+
+ def get_web_data_and_algorithms(self) -> tuple[set[str], FIPSCertificate.WebData]:
+ divs = self._soup.find_all("div", class_="panel panel-default")
+ details_div, vendor_div, related_files_div, validation_history_div = divs
+ details_dict = self._build_details_dict(details_div)
+
+ vendor_dict = self._build_vendor_dict(vendor_div)
+ related_files_dict = self._build_related_files_dict(related_files_div)
+ validation_history_dict = self._build_validation_history_dict(validation_history_div)
+
+ algorithms = set()
+ if "algorithms" in details_dict:
+ algorithms_data = details_dict.pop("algorithms")
+ for category, alg_ids in algorithms_data.items():
+ algorithms |= {category + x for x in alg_ids}
+
+ return algorithms, FIPSCertificate.WebData(
+ **{**details_dict, **vendor_dict, **related_files_dict, **validation_history_dict}
+ )
+
+ def _build_details_dict(self, details_div: Tag) -> dict[str, Any]:
+ def parse_single_detail_entry(key, entry):
+ normalized_key = DETAILS_KEY_NORMALIZATION_DICT[key]
+ normalization_func = DETAILS_KEY_TO_NORMALIZATION_FUNCTION.get(normalized_key, None)
+ normalized_entry = (
+ FIPSHTMLParser.normalize_string(entry.text) if not normalization_func else normalization_func(entry)
+ )
+ return normalized_key, normalized_entry
+
+ entries = details_div.find_all("div", class_="row padrow")
+ entries = zip(
+ [x.find("div", class_="col-md-3") for x in entries], [x.find("div", class_="col-md-9") for x in entries]
+ )
+ entries = [(FIPSHTMLParser.normalize_string(key.text), entry) for key, entry in entries]
+ entries = [parse_single_detail_entry(*x) for x in entries if x[0] in DETAILS_KEY_NORMALIZATION_DICT.keys()]
+ entries = {x: y for x, y in entries}
+
+ if "caveat" in entries:
+ entries["mentioned_certs"] = FIPSHTMLParser.get_mentioned_certs_from_caveat(entries["caveat"])
+
+ # Temporarily disabled, as this isn't extracting anything useful. Only UNKNOWN#1-9 algs were extracted over whole dataset.
+ # if "description" in entries:
+ # algs = FIPSHTMLParser.get_algs_from_description(entries["description"])
+ # if "algorithms" in entries:
+ # entries["algorithms"].update({"UNKNOWN": x for x in algs})
+ # else:
+ # entries["algorithms"] = {"UNKNOWN": x for x in algs}
+
+ return entries
+
+ @staticmethod
+ def _build_vendor_dict(vendor_div: Tag) -> dict[str, Any]:
+ if not (link := vendor_div.find("a")):
+ return {"vendor_url": None, "vendor": list(vendor_div.find("div", "panel-body").children)[0].strip()}
+ else:
+ return {"vendor_url": link.get("href"), "vendor": link.text.strip()}
+
+ @staticmethod
+ def _build_related_files_dict(related_files_div: Tag) -> dict[str, Any]:
+ if cert_link := [x for x in related_files_div.find_all("a") if "Certificate" in x.text]:
+ return {"certificate_pdf_url": constants.FIPS_BASE_URL + cert_link[0].get("href")}
+ else:
+ return {"certificate_pdf_url": None}
+
+ @staticmethod
+ def _build_validation_history_dict(validation_history_div: Tag) -> dict[str, Any]:
+ def parse_row(row):
+ validation_date, validation_type, lab = row.find_all("td")
+ return FIPSCertificate.ValidationHistoryEntry(
+ dateutil.parser.parse(validation_date.text).date(), validation_type.text, lab.text
+ )
+
+ rows = validation_history_div.find("tbody").find_all("tr")
+ history: list[FIPSCertificate.ValidationHistoryEntry] | None = [parse_row(x) for x in rows] if rows else None
+ return {"validation_history": history}
+
+ @staticmethod
+ def get_mentioned_certs_from_caveat(caveat: str) -> dict[str, int]:
+ ids_found: dict[str, int] = {}
+ r_key = r"(?P<word>\w+)?\s?(?:#\s?|Cert\.?(?!.\s)\s?|Certificate\s?)+(?P<id>\d+)"
+ for m in re.finditer(r_key, caveat):
+ if m.group("word") and m.group("word").lower() in {"rsa", "shs", "dsa", "pkcs", "aes"}:
+ continue
+ if m.group("id") in ids_found:
+ ids_found[m.group("id")] += 1
+ else:
+ ids_found[m.group("id")] = 1
+ return ids_found
+
+ @staticmethod
+ def get_algs_from_description(description: str) -> set[str]:
+ return {m.group() for m in re.finditer(FIPS_ALGS_IN_TABLE, description)}
+
+ @staticmethod
+ def parse_algorithms(algorithms_div: Tag) -> dict[str, set[str]]:
+ rows = algorithms_div.find("tbody").find_all("tr")
+ dct: dict[str, set[str]] = dict()
+ for row in rows:
+ cells = row.find_all("td")
+ dct[cells[0].text] = {m.group() for m in re.finditer(FIPS_ALGS_IN_TABLE, cells[1].text)}
+ return dct
+
+ @staticmethod
+ def normalize_string(string: str) -> str:
+ return " ".join(string.split())
+
+ @staticmethod
+ def parse_tested_configurations(tested_configurations: Tag) -> list[str] | None:
+ configurations = [y.text for y in tested_configurations.find_all("li")]
+ return configurations if not configurations == ["N/A"] else None
+
+ @staticmethod
+ def normalize_embodiment(embodiment_element: Tag) -> str:
+ text = FIPSHTMLParser.normalize_string(embodiment_element.text)
+ embodiment_normalization_dict = {
+ "Multi-chip embedded": "Multi-Chip Embedded",
+ "Multi-chip Standalone": "Multi-Chip Stand Alone",
+ "Multi-chip standalone": "Multi-Chip Stand Alone",
+ "Single-chip": "Single Chip",
+ }
+ return embodiment_normalization_dict.get(text, text)
+
+
+DETAILS_KEY_NORMALIZATION_DICT: Final[dict[str, str]] = {
+ "Module Name": "module_name",
+ "Standard": "standard",
+ "Status": "status",
+ "Sunset Date": "date_sunset",
+ "Validation Dates": "date_validation",
+ "Overall Level": "level",
+ "Caveat": "caveat",
+ "Security Level Exceptions": "exceptions",
+ "Module Type": "module_type",
+ "Embodiment": "embodiment",
+ "Approved Algorithms": "algorithms",
+ "Tested Configuration(s)": "tested_conf",
+ "Description": "description",
+ "Historical Reason": "historical_reason",
+ "Hardware Versions": "hw_versions",
+ "Firmware Versions": "fw_versions",
+ "Revoked Reason": "revoked_reason",
+ "Revoked Link": "revoked_link",
+ "Software Versions": "sw_versions",
+ "Product URL": "product_url",
+}
+
+DETAILS_KEY_TO_NORMALIZATION_FUNCTION: dict[str, Callable] = {
+ "date_sunset": lambda x: dateutil.parser.parse(x.text).date(),
+ "algorithms": getattr(FIPSHTMLParser, "parse_algorithms"),
+ "tested_conf": getattr(FIPSHTMLParser, "parse_tested_configurations"),
+ "exceptions": lambda x: [y.text for y in x.find_all("li")],
+ "status": lambda x: FIPSHTMLParser.normalize_string(x.text).lower(),
+ "level": lambda x: int(FIPSHTMLParser.normalize_string(x.text)),
+ "embodiment": getattr(FIPSHTMLParser, "normalize_embodiment"),
+}
+
+
+class FIPSCertificate(
+ Certificate["FIPSCertificate", "FIPSCertificate.Heuristics", "FIPSCertificate.PdfData"],
+ PandasSerializableType,
+ ComplexSerializableType,
+):
+ """
+ Data structure for common FIPS 140 certificate. Contains several inner classes that layer the data logic.
+ Can be serialized into/from json (`ComplexSerializableType`).
+ Is basic element of `FIPSDataset`. The functionality is mostly related to holding data and transformations that
+ the certificate can handle itself. `FIPSDataset` class then instrument this functionality.
+ """
+
+ pandas_columns: ClassVar[list[str]] = [
+ "dgst",
+ "cert_id",
+ "name",
+ "status",
+ "standard",
+ "type",
+ "level",
+ "embodiment",
+ "date_validation",
+ "date_sunset",
+ "algorithms",
+ "extracted_versions",
+ "cpe_matches",
+ "verified_cpe_matches",
+ "related_cves",
+ "module_directly_referenced_by",
+ "module_indirectly_referenced_by",
+ "module_directly_referencing",
+ "module_indirectly_referencing",
+ "policy_directly_referenced_by",
+ "policy_indirectly_referenced_by",
+ "policy_directly_referencing",
+ "policy_indirectly_referencing",
+ ]
+
+ @dataclass(eq=True)
+ class InternalState(ComplexSerializableType):
+ """
+ Holds state of the `FIPSCertificate`
+ """
+
+ module_download_ok: bool
+ policy_download_ok: bool
+
+ policy_convert_garbage: bool
+ policy_convert_ok: bool
+
+ module_extract_ok: bool
+ policy_extract_ok: bool
+
+ policy_pdf_hash: str | None
+ policy_txt_hash: str | None
+
+ policy_pdf_path: Path
+ policy_txt_path: Path
+ module_html_path: Path
+
+ def __init__(
+ self,
+ module_download_ok: bool = False,
+ policy_download_ok: bool = False,
+ policy_convert_garbage: bool = False,
+ policy_convert_ok: bool = False,
+ module_extract_ok: bool = False,
+ policy_extract_ok: bool = False,
+ policy_pdf_hash: str | None = None,
+ policy_txt_hash: str | None = None,
+ ):
+ self.module_download_ok = module_download_ok
+ self.policy_download_ok = policy_download_ok
+ self.policy_convert_garbage = policy_convert_garbage
+ self.policy_convert_ok = policy_convert_ok
+ self.module_extract_ok = module_extract_ok
+ self.policy_extract_ok = policy_extract_ok
+ self.policy_pdf_hash = policy_pdf_hash
+ self.policy_txt_hash = policy_txt_hash
+
+ @property
+ def serialized_attributes(self) -> list[str]:
+ return [
+ "module_download_ok",
+ "policy_download_ok",
+ "policy_convert_garbage",
+ "policy_convert_ok",
+ "module_extract_ok",
+ "policy_extract_ok",
+ "policy_pdf_hash",
+ "policy_txt_hash",
+ ]
+
+ def module_is_ok_to_download(self, fresh: bool = True) -> bool:
+ return True if fresh else not self.module_download_ok
+
+ def policy_is_ok_to_download(self, fresh: bool = True) -> bool:
+ return True if fresh else not self.policy_download_ok
+
+ def policy_is_ok_to_convert(self, fresh: bool = True) -> bool:
+ return self.policy_download_ok if fresh else self.policy_download_ok and not self.policy_convert_ok
+
+ def module_is_ok_to_analyze(self, fresh: bool = True) -> bool:
+ return (
+ self.module_download_ok and self.module_extract_ok
+ if fresh
+ else self.module_download_ok and not self.module_extract_ok
+ )
+
+ def policy_is_ok_to_analyze(self, fresh: bool = True) -> bool:
+ return (
+ self.policy_convert_ok and self.policy_extract_ok
+ if fresh
+ else self.policy_convert_ok and not self.policy_extract_ok
+ )
+
+ def set_local_paths(self, policies_pdf_dir: Path, policies_txt_dir: Path, modules_html_dir: Path) -> None:
+ self.state.policy_pdf_path = (policies_pdf_dir / str(self.dgst)).with_suffix(".pdf")
+ self.state.policy_txt_path = (policies_txt_dir / str(self.dgst)).with_suffix(".txt")
+ self.state.module_html_path = (modules_html_dir / str(self.dgst)).with_suffix(".html")
+
+ @dataclass(eq=True)
+ class ValidationHistoryEntry(ComplexSerializableType):
+ date: date
+ validation_type: Literal["initial", "update"]
+ lab: str
+
+ @classmethod
+ def from_dict(cls, dct: dict) -> FIPSCertificate.ValidationHistoryEntry:
+ new_dct = dct.copy()
+ new_dct["date"] = dateutil.parser.parse(dct["date"]).date()
+ return cls(**new_dct)
+
+ @dataclass(eq=True)
+ class WebData(ComplexSerializableType):
+ """
+ Data structure for data obtained from scanning certificate webpage at NIST.gov
+ """
+
+ module_name: str | None = field(default=None)
+ validation_history: list[FIPSCertificate.ValidationHistoryEntry] | None = field(default=None)
+ vendor_url: str | None = field(default=None)
+ vendor: str | None = field(default=None)
+ certificate_pdf_url: str | None = field(default=None)
+ module_type: str | None = field(default=None)
+ standard: str | None = field(default=None)
+ status: Literal["active", "historical", "revoked"] | None = field(default=None)
+ level: Literal[1, 2, 3, 4] | None = field(default=None)
+ caveat: str | None = field(default=None)
+ exceptions: list[str] | None = field(default=None)
+ embodiment: str | None = field(default=None)
+ description: str | None = field(default=None)
+ tested_conf: list[str] | None = field(default=None)
+ hw_versions: str | None = field(default=None)
+ fw_versions: str | None = field(default=None)
+ sw_versions: str | None = field(default=None)
+ mentioned_certs: dict[str, int] | None = field(default=None) # Cert_id: n_occurences
+ historical_reason: str | None = field(default=None)
+ date_sunset: date | None = field(default=None)
+ revoked_reason: str | None = field(default=None)
+ revoked_link: str | None = field(default=None)
+
+ # Those below are left unused at the moment
+ # product_url: Optional[str] = field(default=None)
+
+ def __repr__(self) -> str:
+ return (
+ self.module_name
+ if self.module_name is not None
+ else "" + " created by " + self.vendor
+ if self.vendor is not None
+ else ""
+ )
+
+ def __str__(self) -> str:
+ return repr(self)
+
+ @classmethod
+ def from_dict(cls, dct: dict) -> FIPSCertificate.WebData:
+ new_dct = dct.copy()
+ if new_dct["date_sunset"]:
+ new_dct["date_sunset"] = dateutil.parser.parse(new_dct["date_sunset"]).date()
+ return cls(**dct)
+
+ @dataclass(eq=True)
+ class PdfData(BasePdfData, ComplexSerializableType):
+ """
+ Data structure that holds data obtained from scanning pdf files (or their converted txt documents).
+ """
+
+ keywords: dict = field(default_factory=dict)
+ policy_metadata: dict[str, Any] = field(default_factory=dict)
+
+ @property
+ def certlike_algorithm_numbers(self) -> set[str]:
+ """Returns numbers of certificates from keywords["fips_certlike"]["Certlike"]"""
+ if self.keywords and "fips_certlike" in self.keywords:
+ fips_certlike = self.keywords["fips_certlike"].get("Certlike", dict())
+ matches = {re.search(r"#\s{0,1}\d{1,4}", x) for x in fips_certlike.keys()}
+ return {"".join([x for x in match.group() if x.isdigit()]) for match in matches if match}
+ else:
+ return set()
+
+ @dataclass(eq=True)
+ class Heuristics(BaseHeuristics, ComplexSerializableType):
+ """
+ Data structure that holds data obtained by processing the certificate and applying various heuristics.
+ """
+
+ algorithms: set[str] = field(default_factory=set)
+ extracted_versions: set[str] = field(default_factory=set)
+ cpe_matches: set[str] | None = field(default=None)
+ verified_cpe_matches: set[CPE] | None = field(default=None)
+ related_cves: set[str] | None = field(default=None)
+ policy_prunned_references: set[str] = field(default_factory=set)
+ module_prunned_references: set[str] = field(default_factory=set)
+ policy_processed_references: References = field(default_factory=References)
+ module_processed_references: References = field(default_factory=References)
+ direct_transitive_cves: set[str] | None = field(default=None)
+ indirect_transitive_cves: set[str] | None = field(default=None)
+
+ @property
+ def algorithm_numbers(self) -> set[str]:
+ """Returns numbers of algorithms"""
+
+ def alg_to_number(alg: str) -> str:
+ return "".join([x for x in alg.split("#")[1] if x.isdigit()])
+
+ return {alg_to_number(x) for x in self.algorithms}
+
+ @property
+ def dgst(self) -> str:
+ """
+ Returns primary key of the certificate, its id.
+ """
+ return fips_dgst(self.cert_id)
+
+ @property
+ def manufacturer(self) -> str | None: # type: ignore
+ return self.web_data.vendor
+
+ @property
+ def module_html_url(self) -> str:
+ return constants.FIPS_MODULE_URL.format(self.cert_id)
+
+ @property
+ def policy_pdf_url(self) -> str:
+ return constants.FIPS_SP_URL.format(self.cert_id)
+
+ @property
+ def name(self) -> str | None: # type: ignore
+ return self.web_data.module_name
+
+ @property
+ def label_studio_title(self) -> str:
+ return (
+ "Vendor: "
+ + str(self.web_data.vendor)
+ + "\n"
+ + "Module name: "
+ + str(self.web_data.module_name)
+ + "\n"
+ + "HW version: "
+ + str(self.web_data.hw_versions)
+ + "\n"
+ + "FW version: "
+ + str(self.web_data.fw_versions)
+ )
+
+ def __init__(
+ self,
+ cert_id: str,
+ web_data: FIPSCertificate.WebData | None = None,
+ pdf_data: FIPSCertificate.PdfData | None = None,
+ heuristics: FIPSCertificate.Heuristics | None = None,
+ state: InternalState | None = None,
+ ):
+ super().__init__()
+
+ self.cert_id = cert_id
+ self.web_data: FIPSCertificate.WebData = web_data if web_data else FIPSCertificate.WebData()
+ self.pdf_data: FIPSCertificate.PdfData = pdf_data if pdf_data else FIPSCertificate.PdfData()
+ self.heuristics: FIPSCertificate.Heuristics = heuristics if heuristics else FIPSCertificate.Heuristics()
+ self.state: FIPSCertificate.InternalState = state if state else FIPSCertificate.InternalState()
+
+ @property
+ def pandas_tuple(self) -> tuple:
+ return (
+ self.dgst,
+ self.cert_id,
+ self.web_data.module_name,
+ self.web_data.status,
+ self.web_data.standard,
+ self.web_data.module_type,
+ self.web_data.level,
+ self.web_data.embodiment,
+ self.web_data.validation_history[0].date if self.web_data.validation_history else np.nan,
+ self.web_data.date_sunset,
+ self.heuristics.algorithms,
+ self.heuristics.extracted_versions,
+ self.heuristics.cpe_matches,
+ self.heuristics.verified_cpe_matches,
+ self.heuristics.related_cves,
+ self.heuristics.module_processed_references.directly_referenced_by,
+ self.heuristics.module_processed_references.indirectly_referenced_by,
+ self.heuristics.module_processed_references.directly_referencing,
+ self.heuristics.module_processed_references.indirectly_referencing,
+ self.heuristics.policy_processed_references.directly_referenced_by,
+ self.heuristics.policy_processed_references.indirectly_referenced_by,
+ self.heuristics.policy_processed_references.directly_referencing,
+ self.heuristics.policy_processed_references.indirectly_referencing,
+ )
+
+ @staticmethod
+ def parse_html_module(cert: FIPSCertificate) -> FIPSCertificate:
+ with cert.state.module_html_path.open("r") as handle:
+ soup = BeautifulSoup(handle, "html5lib")
+
+ parser = FIPSHTMLParser(soup)
+ algorithms, cert.web_data = parser.get_web_data_and_algorithms()
+ cert.heuristics.algorithms |= algorithms
+ cert.state.module_extract_ok = True
+
+ return cert
+
+ @staticmethod
+ def download_module(cert: FIPSCertificate) -> FIPSCertificate:
+ if (exit_code := helpers.download_file(cert.module_html_url, cert.state.module_html_path)) != requests.codes.ok:
+ error_msg = f"failed to download html module from {cert.module_html_url}, code {exit_code}"
+ logger.error(f"Cert dgst: {cert.dgst} " + error_msg)
+ cert.state.module_download_ok = False
+ else:
+ cert.state.module_download_ok = True
+ return cert
+
+ @staticmethod
+ def download_policy(cert: FIPSCertificate) -> FIPSCertificate:
+ if (exit_code := helpers.download_file(cert.policy_pdf_url, cert.state.policy_pdf_path)) != requests.codes.ok:
+ error_msg = f"failed to download pdf policy from {cert.policy_pdf_url}, code {exit_code}"
+ logger.error(f"Cert dgst: {cert.dgst} " + error_msg)
+ cert.state.policy_download_ok = False
+ else:
+ cert.state.policy_download_ok = True
+ cert.state.policy_pdf_hash = helpers.get_sha256_filepath(cert.state.policy_pdf_path)
+ return cert
+
+ @staticmethod
+ def convert_policy_pdf(cert: FIPSCertificate) -> FIPSCertificate:
+ """
+ Converts policy pdf -> txt
+ """
+ ocr_done, ok_result = sec_certs.utils.pdf.convert_pdf_file(
+ cert.state.policy_pdf_path, cert.state.policy_txt_path
+ )
+
+ # If OCR was done and the result was garbage
+ cert.state.policy_convert_garbage = ocr_done
+ # And put the whole result into convert_ok
+ cert.state.policy_convert_ok = ok_result
+
+ if not ok_result:
+ error_msg = "Failed to convert policy pdf->txt"
+ logger.error(f"Cert dgst: {cert.dgst}" + error_msg)
+ else:
+ cert.state.policy_txt_hash = helpers.get_sha256_filepath(cert.state.policy_txt_path)
+
+ return cert
+
+ @staticmethod
+ def extract_policy_pdf_metadata(cert: FIPSCertificate) -> FIPSCertificate:
+ """
+ Extract the PDF metadata from the security policy.
+ """
+ _, metadata = sec_certs.utils.pdf.extract_pdf_metadata(cert.state.policy_pdf_path)
+
+ if metadata:
+ cert.pdf_data.policy_metadata = metadata
+ else:
+ cert.pdf_data.policy_metadata = dict()
+ cert.state.policy_extract_ok = False
+ return cert
+
+ @staticmethod
+ def extract_policy_pdf_keywords(cert: FIPSCertificate) -> FIPSCertificate:
+ """
+ Extract keywords from policy document
+ """
+ keywords = sec_certs.utils.extract.extract_keywords(cert.state.policy_txt_path, fips_rules)
+ if not keywords:
+ cert.state.policy_extract_ok = False
+ else:
+ cert.pdf_data.keywords = keywords
+ return cert
+
+ @staticmethod
+ def get_algorithms_from_policy_tables(cert: FIPSCertificate):
+ """
+ Retrieves IDs of algorithms from tables inside security policy pdfs.
+ External library is used to handle this.
+ """
+ if table_rich_page_numbers := tables.find_pages_with_tables(cert.state.policy_txt_path):
+ pdf.repair_pdf(cert.state.policy_pdf_path)
+ try:
+ tabular_data = read_pdf(cert.state.policy_pdf_path, pages=list(table_rich_page_numbers), silent=True)
+ cert.heuristics.algorithms |= set(
+ itertools.chain.from_iterable(
+ tables.get_algs_from_table(df.to_string())
+ for df in tabular_data
+ if isinstance(df, pd.DataFrame)
+ )
+ )
+ except Exception as e:
+ logger.warning(f"Error when parsing tables from {cert.dgst}: {e}")
+ cert.state.policy_extract_ok = False
+
+ def prune_referenced_cert_ids(self) -> None:
+ """
+ This method goes through all IDs (numbers) that correspond to FIPS Certificates and are stored in
+ pdf_data.keywords or web_data.mentioned_certs. It performs prunning of these attributes and fills attributes
+ heuristics.prunned_module_references and heuristics.prunned_policy_references. These variables are further
+ processed and Reference objects are created from them.
+ """
+ html_module_ids = set(self.web_data.mentioned_certs.keys()) if self.web_data.mentioned_certs else set()
+ self.heuristics.module_prunned_references = self._prune_reference_ids_variable(html_module_ids)
+
+ if self.pdf_data.keywords:
+ pdf_policy_ids = set(self.pdf_data.keywords["fips_cert_id"].get("Cert", dict()).keys())
+ pdf_policy_ids = {"".join([y for y in x if y.isdigit()]) for x in pdf_policy_ids}
+ else:
+ pdf_policy_ids = set()
+
+ self.heuristics.policy_prunned_references = self._prune_reference_ids_variable(pdf_policy_ids)
+
+ def compute_heuristics_version(self) -> None:
+ """
+ Heuristically computes the version of the product.
+ """
+ versions_for_extraction = ""
+ if self.web_data.module_name:
+ versions_for_extraction += f" {self.web_data.module_name}"
+ if self.web_data.hw_versions:
+ versions_for_extraction += f" {self.web_data.hw_versions}"
+ if self.web_data.fw_versions:
+ versions_for_extraction += f" {self.web_data.fw_versions}"
+ self.heuristics.extracted_versions = helpers.compute_heuristics_version(versions_for_extraction)
+
+ def _prune_reference_ids_variable(self, attribute_to_prune: set[str]) -> set[str]:
+ """
+ Prunnes cert_ids from variable "attribute_to_prune", return result. Steps:
+ 0. Consider only ids != self.cert_id
+ 1. Consider only ids > config.always_false_positive_fips_cert_id_threshold
+ 2. Consider only ids s.t. they don't appear in self.heuristics.algorithms
+ 3. Consider only ids s.t. they don't appear in self.pdf_data.keywords["fips_certlike"]["Certlike"]
+ """
+ prunned = {x for x in attribute_to_prune if x != self.cert_id}
+ prunned = {x for x in prunned if int(x) > config.always_false_positive_fips_cert_id_threshold}
+ prunned = {x for x in prunned if x not in self.heuristics.algorithm_numbers}
+ prunned = {x for x in prunned if x not in self.pdf_data.certlike_algorithm_numbers}
+
+ return prunned