from __future__ import annotations import shutil from pathlib import Path from typing import ClassVar, Literal from bs4 import BeautifulSoup from pydantic import AnyHttpUrl from sec_certs import constants from sec_certs.configuration import config from sec_certs.dataset.auxiliary_dataset_handling import AuxiliaryDatasetHandler from sec_certs.dataset.dataset import Dataset, logger from sec_certs.sample.protection_profile import ProtectionProfile from sec_certs.serialization.json import ComplexSerializableType, only_backed, serialize from sec_certs.utils import helpers from sec_certs.utils import parallel_processing as cert_processing from sec_certs.utils.profiling import staged class ProtectionProfileDataset(Dataset[ProtectionProfile], ComplexSerializableType): """ Class for processing :class:`sec_certs.sample.protection_profile.ProtectionProfile` samples. Inherits from `ComplexSerializableType` and base abstract `Dataset` class. The dataset directory looks like this: ├── reports │ ├── pdf │ └── txt ├── pps │ ├── pdf │ └── txt └── dataset.json """ FULL_ARCHIVE_URL: ClassVar[AnyHttpUrl] = config.pp_latest_full_archive SNAPSHOT_URL: ClassVar[AnyHttpUrl] = config.pp_latest_snapshot def __init__( self, certs: dict[str, ProtectionProfile] | None = None, root_dir: str | Path | None = None, name: str | None = None, description: str = "", state: Dataset.DatasetInternalState | None = None, aux_handlers: dict[type[AuxiliaryDatasetHandler], AuxiliaryDatasetHandler] | None = None, ): super().__init__(certs, root_dir, name, description, state, aux_handlers) @property @only_backed(throw=False) def json_path(self) -> Path: return self.root_dir / "dataset.json" @property @only_backed(throw=False) def reports_dir(self) -> Path: """ Path to protection profile reports. """ return self.root_dir / "reports" @property @only_backed(throw=False) def pps_dir(self) -> Path: """ Path to actual protection profiles. """ return self.root_dir / "pps" @property @only_backed(throw=False) def reports_pdf_dir(self) -> Path: """ Path to pdfs of protection profile reports. """ return self.reports_dir / "pdf" @property @only_backed(throw=False) def reports_txt_dir(self) -> Path: """ Path to txts of protection profile reports. """ return self.reports_dir / "txt" @property @only_backed(throw=False) def pps_pdf_dir(self) -> Path: """ Path to pdfs of protection profiles """ return self.pps_dir / "pdf" @property @only_backed(throw=False) def pps_txt_dir(self) -> Path: """ Path to txts of protection profiles. """ return self.pps_dir / "txt" @property @only_backed(throw=False) def web_dir(self) -> Path: """ Path to directory with html sources downloaded from commoncriteriaportal.org """ return self.root_dir / "web" def _set_local_paths(self): super()._set_local_paths() if self.root_dir is None: return for cert in self: cert.set_local_paths(self.reports_pdf_dir, self.pps_pdf_dir, self.reports_txt_dir, self.pps_txt_dir) HTML_URL = { "pp_active.html": constants.CC_PORTAL_BASE_URL + "/pps/index.cfm", "pp_archived.html": constants.CC_PORTAL_BASE_URL + "/pps/index.cfm?archived=1", "pp_collaborative.html": constants.CC_PORTAL_BASE_URL + "/pps/collaborativePP.cfm?cpp=1", } @property @only_backed(throw=False) def active_html_tuples(self) -> list[tuple[str, Path]]: return [(x, self.web_dir / y) for y, x in self.HTML_URL.items() if "active" in y] @property @only_backed(throw=False) def archived_html_tuples(self) -> list[tuple[str, Path]]: return [(x, self.web_dir / y) for y, x in self.HTML_URL.items() if "archived" in y] @property @only_backed(throw=False) def collaborative_html_tuples(self) -> list[tuple[str, Path]]: return [(x, self.web_dir / y) for y, x in self.HTML_URL.items() if "collaborative" in y] @serialize @staged(logger, "Downloading and processing CSV and HTML files of certificates.") @only_backed() def get_certs_from_web( self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True, get_collaborative: bool = True, ) -> None: """ Fetches list of protection profiles together with metadata from commoncriteriaportal.org """ if to_download: self._download_html_resources(get_active, get_archived, get_collaborative) logger.info("Adding HTML certificates to ProtectionProfile dataset.") self.certs = self._get_all_certs_from_html(get_active, get_archived, get_collaborative) logger.info(f"The resulting dataset has {len(self)} certificates.") if not keep_metadata: shutil.rmtree(self.web_dir) self._set_local_paths() self.state.meta_sources_parsed = True def _get_all_certs_from_html( self, get_active: bool = True, get_archived: bool = True, get_collaborative: bool = True ) -> dict[str, ProtectionProfile]: html_sources = [] if get_active: html_sources.extend([x for x in self.HTML_URL if "active" in x]) if get_archived: html_sources.extend([x for x in self.HTML_URL if "archived" in x]) if get_collaborative: html_sources.extend([x for x in self.HTML_URL if "collaborative" in x]) new_certs = {} for file in html_sources: partial_certs = self._parse_single_html(self.web_dir / file) logger.info(f"Parsed {len(partial_certs)} protection profiles from: {file}.") new_certs.update(partial_certs) return new_certs def _download_html_resources( self, get_active: bool = True, get_archived: bool = True, get_collaborative: bool = True ) -> None: self.web_dir.mkdir(parents=True, exist_ok=True) html_items = [] if get_active: html_items.extend(self.active_html_tuples) if get_archived: html_items.extend(self.archived_html_tuples) if get_collaborative: html_items.extend(self.collaborative_html_tuples) html_urls, html_paths = [x[0] for x in html_items], [x[1] for x in html_items] logger.info("Downloading required csv and html files.") helpers.download_parallel(html_urls, html_paths) @staticmethod def _parse_single_html(file: Path) -> dict[str, ProtectionProfile]: def _parse_table( soup: BeautifulSoup, cert_status: Literal["active", "archived"], table_id: str, category_string: str, is_collaborative: bool, ) -> dict[str, ProtectionProfile]: tables = soup.find_all("table", id=table_id) if len(tables) > 1: raise ValueError( f'The "{file.name}" was expected to contain 0-1 element. Instead, it contains: {len(tables)}
elements.' ) if not tables: return {} body = list(tables[0].find_all("tr"))[1:] table_certs = {} for row in body: try: pp = ProtectionProfile.from_html_row(row, cert_status, category_string, is_collaborative) table_certs[pp.dgst] = pp except ValueError as e: logger.error(f"Error when creating ProtectionProfile object: {e}") return table_certs cert_status: Literal["active", "archived"] = "active" if "active" in file.name else "archived" is_collaborative = "collaborative" in file.name cc_table_ids = ["tbl" + x for x in constants.CC_CAT_ABBREVIATIONS] if is_collaborative: cc_table_ids = [x + "1" for x in cc_table_ids] cat_dict = dict(zip(cc_table_ids, constants.CC_CATEGORIES)) with file.open("r") as handle: soup = BeautifulSoup(handle, "html5lib") certs = {} for key, val in cat_dict.items(): certs.update(_parse_table(soup, cert_status, key, val, is_collaborative)) return certs def _convert_all_pdfs_body(self, fresh=True): self._convert_reports_to_txt(fresh) self._convert_pps_to_txt(fresh) @staged(logger, "Converting PDFs of PP certification reports to text.") def _convert_reports_to_txt(self, fresh: bool = True): self.reports_txt_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.report.is_ok_to_convert(fresh)] if not fresh and certs_to_process: logger.info( f"Converting {len(certs_to_process)} PDFs of PP certification reports to text for which previous conversion failed." ) cert_processing.process_parallel( ProtectionProfile.convert_report_pdf, certs_to_process, progress_bar_desc="Converting PDFs of PP certification reports to text.", ) @staged(logger, "Converting PDFs of actual Protection Profiles to text.") def _convert_pps_to_txt(self, fresh: bool = True): self.pps_txt_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.pp.is_ok_to_convert(fresh)] if not fresh and certs_to_process: logger.info( f"Converting {len(certs_to_process)} PDFs of actual Protection Profiles to text for which previous conversion failed." ) cert_processing.process_parallel( ProtectionProfile.convert_pp_pdf, certs_to_process, progress_bar_desc="Converting PDFs of actual Protection Profiles to text.", ) def _download_all_artifacts_body(self, fresh=True): self._download_reports(fresh) self._download_pps(fresh) @staged(logger, "Downloading PDFs of PP certification reports.") def _download_reports(self, fresh: bool = True): self.reports_pdf_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.report.is_ok_to_download(fresh) and x.web_data.report_link] if not fresh and certs_to_process: logger.info( f"Downloading {len(certs_to_process)} PDFs of PP certification reports for which previous download failed." ) cert_processing.process_parallel( ProtectionProfile.download_pdf_report, certs_to_process, progress_bar_desc="Downloading PDFs of PP certification reports.", ) @staged(logger, "Downloading PDFs of actual Protection Profiles.") def _download_pps(self, fresh: bool = True): self.pps_pdf_dir.mkdir(parents=True, exist_ok=True) certs_to_process = [x for x in self if x.state.pp.is_ok_to_download(fresh) and x.web_data.pp_link] if not fresh and certs_to_process: logger.info( f"Downloading {len(certs_to_process)} PDFs of actual Protection Profiles for which previous download failed." ) cert_processing.process_parallel( ProtectionProfile.download_pdf_pp, certs_to_process, progress_bar_desc="Downloading PDFs of actual Protection Profiles.", ) @only_backed() def extract_data(self): """ Extracts pdf metadata and keywords from converted text documents. """ logger.info("Extracting various data from certification artifacts.") self._extract_pdf_metadata() self._extract_pdf_keywords() @staged(logger, "Extracting metadata from certification artifacts.") def _extract_pdf_metadata(self): self._extract_report_metadata() self._extract_pp_metadata() @staged(logger, "Extracting keywords from certification artifacts.") def _extract_pdf_keywords(self): self._extract_report_keywords() self._extract_pp_keywords() def _extract_report_metadata(self): certs_to_process = [x for x in self if x.state.report.is_ok_to_analyze()] processed_certs = cert_processing.process_parallel( ProtectionProfile.extract_report_pdf_metadata, certs_to_process, use_threading=False, progress_bar_desc="Extracting metadata from PP certification reports.", ) self.update_with_certs(processed_certs) def _extract_pp_metadata(self): certs_to_process = [x for x in self if x.state.pp.is_ok_to_analyze()] processed_certs = cert_processing.process_parallel( ProtectionProfile.extract_pp_pdf_metadata, certs_to_process, use_threading=False, progress_bar_desc="Extracting metadata from actual Protection Profiles.", ) self.update_with_certs(processed_certs) def _extract_report_keywords(self): certs_to_process = [x for x in self if x.state.report.is_ok_to_analyze()] processed_certs = cert_processing.process_parallel( ProtectionProfile.extract_report_pdf_keywords, certs_to_process, use_threading=False, progress_bar_desc="Extracting keywords from PP certification reports.", ) self.update_with_certs(processed_certs) def _extract_pp_keywords(self): certs_to_process = [x for x in self if x.state.pp.is_ok_to_analyze()] processed_certs = cert_processing.process_parallel( ProtectionProfile.extract_pp_pdf_keywords, certs_to_process, use_threading=False, progress_bar_desc="Extracting keywords from actual Protection Profiles.", ) self.update_with_certs(processed_certs) def _compute_heuristics_body(self): logger.info("Protection profile dataset has no heuristics to compute, skipping.") @only_backed() def process_auxiliary_datasets(self, **kwargs) -> None: """ Dummy method to adhere to `Dataset` interface. `ProtectionProfile` dataset has currently no auxiliary datasets. This will just set the state `auxiliary_datasets_processed = True` """ logger.info("Protection Profile dataset has no auxiliary datasets to process, skipping.") self.state.auxiliary_datasets_processed = True def get_pp_by_pp_link(self, pp_link: str) -> ProtectionProfile | None: """ Given URL to PP pdf, will retrieve `ProtectionProfile` object in the dataset with the link, if such exists. """ for pp in self: if pp.web_data.pp_link == pp_link: return pp return None