from __future__ import annotations from dataclasses import dataclass, field from datetime import date, datetime from pathlib import Path from typing import Any, Literal from urllib.parse import unquote_plus, urlparse import requests from bs4 import Tag from sec_certs import constants from sec_certs.cert_rules import cc_rules from sec_certs.configuration import config from sec_certs.sample.certificate import Certificate, logger from sec_certs.sample.certificate import Heuristics as BaseHeuristics from sec_certs.sample.certificate import PdfData as BasePdfData from sec_certs.sample.document_state import DocumentState from sec_certs.serialization.json import ComplexSerializableType from sec_certs.utils import cc_html_parsing, helpers, sanitization from sec_certs.utils.extract import extract_keywords from sec_certs.utils.pdf import convert_pdf_file, extract_pdf_metadata class ProtectionProfile( Certificate["ProtectionProfile", "ProtectionProfile.Heuristics", "ProtectionProfile.PdfData"], ComplexSerializableType, ): @dataclass class Heuristics(BaseHeuristics, ComplexSerializableType): pass @dataclass class PdfData(BasePdfData, ComplexSerializableType): """ Class to hold data related to PDF and txt files related to protection profiles. """ report_metadata: dict[str, Any] | None = field(default=None) pp_metadata: dict[str, Any] | None = field(default=None) report_keywords: dict[str, Any] | None = field(default=None) pp_keywords: dict[str, Any] | None = field(default=None) report_filename: str | None = field(default=None) pp_filename: str | None = field(default=None) def __bool__(self) -> bool: return any(x is not None for x in vars(self)) @dataclass(eq=True) class WebData(ComplexSerializableType): """ Class to hold metadata about protection profiles found on commoncriteriaportal.org """ category: str status: Literal["active", "archived"] is_collaborative: bool name: str version: str security_level: set[str] not_valid_before: date | None not_valid_after: date | None report_link: str | None pp_link: str | None scheme: str | None maintenances: list[tuple[Any]] @property def eal(self) -> str | None: return helpers.choose_lowest_eal(self.security_level) @classmethod def from_html_row( cls, row: Tag, status: Literal["active", "archived"], category: str, is_collaborative: bool ) -> ProtectionProfile.WebData: """ Given bs4 tag of html row (fetched from cc portal), will build the object. """ if is_collaborative: return cls._from_html_row_collaborative(row, category) return cls._from_html_row_classic_pp(row, status, category) @classmethod def _from_html_row_classic_pp( cls, row: Tag, status: Literal["active", "archived"], category: str ) -> ProtectionProfile.WebData: cells = list(row.find_all("td")) if status == "active" and len(cells) != 6: raise ValueError( f"Unexpected number of elements in PP html row. Expected: 6, actual: {len(cells)}" ) if status == "archived" and len(cells) != 7: raise ValueError( f"Unexpected number of elements in PP html row. Expected: 6, actual: {len(cells)}" ) pp_link = cls._html_row_get_link(cells[0]) pp_name = cls._html_row_get_name(cells[0]) if not sanitization.sanitize_cc_link(pp_link): raise ValueError(f"pp_link for PP {pp_name} is empty, cannot create PP record") mu_div = cc_html_parsing.html_row_get_maintenance_div(row) maintenance_updates = cc_html_parsing.parse_maintenance_div(mu_div) if mu_div else [] if maintenance_updates: # Drop ST links, not filled in for PPs maintenance_updates = [x[:3] for x in maintenance_updates] return cls( category, status, False, pp_name, cls._html_row_get_version(cells[1]), cls._html_row_get_security_level(cells[2]), cls._html_row_get_date(cells[3]), None if status == "active" else cls._html_row_get_date(cells[4]), cls._html_row_get_link(cells[-1]), pp_link, cls._html_row_get_scheme(cells[-2]), maintenance_updates, ) @classmethod def _from_html_row_collaborative(cls, row: Tag, category: str) -> ProtectionProfile.WebData: cells = list(row.find_all("td")) if len(cells) != 5: raise ValueError( f"Unexpected number of elements in collaborative PP html row. Expected: 5, actual: {len(cells)}" ) pp_link = cls._html_row_get_collaborative_pp_link(cells[0]) pp_name = cls._html_row_get_collaborative_name(cells[0]) if not sanitization.sanitize_cc_link(pp_link): raise ValueError(f"pp_link for PP {pp_name} is empty, cannot create PP record") return cls( category, "active", True, pp_name, cls._html_row_get_version(cells[1]), cls._html_row_get_security_level(cells[2]), cls._html_row_get_date(cells[3]), None, cls._html_row_get_link(cells[-1]), pp_link, None, [], ) @staticmethod def _html_row_get_date(cell: Tag) -> date | None: text = cell.get_text() extracted_date = datetime.strptime(text, "%Y-%m-%d").date() if text else None return extracted_date @staticmethod def _html_row_get_name(cell: Tag) -> str: return str(cell.find_all("a")[0].string) @staticmethod def _html_row_get_link(cell: Tag) -> str: return constants.CC_PORTAL_BASE_URL + str(cell.find_all("a")[0].get("href")) @staticmethod def _html_row_get_version(cell: Tag) -> str: return str(cell.text) @staticmethod def _html_row_get_security_level(cell: Tag) -> set[str]: return set(map(str, cell.stripped_strings)) @staticmethod def _html_row_get_scheme(cell: Tag) -> str | None: schemes = list(map(str, cell.stripped_strings)) return schemes[0] if schemes else None @staticmethod def _html_row_get_collaborative_name(cell: Tag) -> str: return list(map(str, cell.stripped_strings))[0] @staticmethod def _html_row_get_collaborative_pp_link(cell: Tag) -> str: return constants.CC_PORTAL_BASE_URL + str( [x for x in cell.find_all("a") if x.string == "Protection Profile"][0].get("href") ) @dataclass class InternalState(ComplexSerializableType): """ Class to hold internal state for each of the documents. """ pp: DocumentState = field(default_factory=DocumentState) report: DocumentState = field(default_factory=DocumentState) def __init__( self, web_data: WebData, pdf_data: PdfData | None = None, heuristics: Heuristics | None = None, state: InternalState | None = None, ): super().__init__() self.web_data: ProtectionProfile.WebData = web_data self.pdf_data: ProtectionProfile.PdfData = pdf_data if pdf_data else ProtectionProfile.PdfData() self.heuristics: ProtectionProfile.Heuristics = heuristics if heuristics else ProtectionProfile.Heuristics() self.state: ProtectionProfile.InternalState = state if state else ProtectionProfile.InternalState() @property def dgst(self) -> str: """ digest of thwe protection profile, formed as first 16 bytes of `category|name|version` fields from `WebData` object. """ return helpers.get_first_16_bytes_sha256( "|".join([self.web_data.category, self.web_data.name, self.web_data.version]) ) def __str__(self) -> str: return f"PP: {self.web_data.name}, dgst: {self.dgst}" @property def label_studio_title(self) -> str: return self.web_data.name def merge(self, other: ProtectionProfile, other_source: str | None = None) -> None: raise ValueError("Merging of PPs not implemented.") def set_local_paths( self, report_pdf_dir: str | Path | None, pp_pdf_dir: str | Path | None, report_txt_dir: str | Path | None, pp_txt_dir: str | Path | None, ) -> None: """ Adjusts local paths for various files. """ if report_pdf_dir: self.state.report.pdf_path = Path(report_pdf_dir) / f"{self.dgst}.pdf" if pp_pdf_dir: self.state.pp.pdf_path = Path(pp_pdf_dir) / f"{self.dgst}.pdf" if report_txt_dir: self.state.report.txt_path = Path(report_txt_dir) / f"{self.dgst}.txt" if pp_txt_dir: self.state.pp.txt_path = Path(pp_txt_dir) / f"{self.dgst}.txt" @classmethod def from_html_row( cls, row: Tag, status: Literal["active", "archived"], category: str, is_collaborative: bool ) -> ProtectionProfile: """ Builds a `ProtectionProfile` object from html row obtained from cc portal html source. """ return cls(ProtectionProfile.WebData.from_html_row(row, status, category, is_collaborative)) @staticmethod def download_pdf_report(cert: ProtectionProfile) -> ProtectionProfile: """ Downloads pdf of certification report for the given protection profile. """ exit_code: str | int | None if not cert.web_data.report_link: exit_code = "No link" else: exit_code = helpers.download_file( cert.web_data.report_link, cert.state.report.pdf_path, proxy=config.cc_use_proxy ) if exit_code != requests.codes.ok: error_msg = f"failed to download report from {cert.web_data.report_link}, code: {exit_code}" logger.error(f"Cert dgst: {cert.dgst} " + error_msg) cert.state.report.download_ok = False else: cert.state.report.download_ok = True cert.state.report.pdf_hash = helpers.get_sha256_filepath(cert.state.report.pdf_path) cert.pdf_data.report_filename = unquote_plus(str(urlparse(cert.web_data.report_link).path).split("/")[-1]) return cert @staticmethod def download_pdf_pp(cert: ProtectionProfile) -> ProtectionProfile: """ Downloads actual pdf of the given protection profile. """ exit_code: str | int | None if not cert.web_data.pp_link: exit_code = "No link" else: exit_code = helpers.download_file(cert.web_data.pp_link, cert.state.pp.pdf_path, proxy=config.cc_use_proxy) if exit_code != requests.codes.ok: error_msg = f"failed to download PP from {cert.web_data.pp_link}, code: {exit_code}" logger.error(f"Cert dgst: {cert.dgst} " + error_msg) cert.state.pp.download_ok = False else: cert.state.pp.download_ok = True cert.state.pp.pdf_hash = helpers.get_sha256_filepath(cert.state.pp.pdf_path) cert.pdf_data.pp_filename = unquote_plus(str(urlparse(cert.web_data.pp_link).path).split("/")[-1]) return cert @staticmethod def convert_report_pdf(cert: ProtectionProfile) -> ProtectionProfile: """ Converts certification reports from pdf to txt. """ ocr_done, ok_result = convert_pdf_file(cert.state.report.pdf_path, cert.state.report.txt_path) cert.state.report.convert_garbage = ocr_done cert.state.report.convert_ok = ok_result if not ok_result: logger.error(f"Cert dgst: {cert.dgst} failed to convert report pdf to txt") else: cert.state.report.txt_hash = helpers.get_sha256_filepath(cert.state.report.txt_path) return cert @staticmethod def convert_pp_pdf(cert: ProtectionProfile) -> ProtectionProfile: """ Converts the actual protection profile from pdf to txt. """ ocr_done, ok_result = convert_pdf_file(cert.state.pp.pdf_path, cert.state.pp.txt_path) cert.state.pp.convert_garbage = ocr_done cert.state.pp.convert_ok = ok_result if not ok_result: logger.error(f"Cert dgst: {cert.dgst} failed to convert PP pdf to txt") else: cert.state.pp.txt_hash = helpers.get_sha256_filepath(cert.state.pp.txt_path) return cert @staticmethod def extract_report_pdf_metadata(cert: ProtectionProfile) -> ProtectionProfile: """ Extracts various pdf metadata from the certification report. """ try: cert.pdf_data.report_metadata = extract_pdf_metadata(cert.state.report.pdf_path) cert.state.report.extract_ok = True except ValueError: cert.state.report.extract_ok = False return cert @staticmethod def extract_pp_pdf_metadata(cert: ProtectionProfile) -> ProtectionProfile: """ Extracts various pdf metadata from the actual protection profile. """ try: cert.pdf_data.pp_metadata = extract_pdf_metadata(cert.state.pp.pdf_path) cert.state.pp.extract_ok = True except ValueError: cert.state.pp.extract_ok = False return cert @staticmethod def extract_report_pdf_keywords(cert: ProtectionProfile) -> ProtectionProfile: """ Extracts keywords using regexes from the certification report. """ report_keywords = extract_keywords(cert.state.report.txt_path, cc_rules) if report_keywords is None: cert.state.report.extract_ok = False else: cert.pdf_data.report_keywords = report_keywords return cert @staticmethod def extract_pp_pdf_keywords(cert: ProtectionProfile) -> ProtectionProfile: """ Extracts keywords using regexes from the actual protection profile. """ pp_keywords = extract_keywords(cert.state.pp.txt_path, cc_rules) if pp_keywords is None: cert.state.pp.extract_ok = False else: cert.pdf_data.pp_keywords = pp_keywords return cert