from __future__ import annotations import locale import shutil from collections.abc import Iterator from datetime import datetime from pathlib import Path from typing import ClassVar, cast import numpy as np import pandas as pd from bs4 import BeautifulSoup, Tag from pydantic import AnyHttpUrl from sec_certs import constants from sec_certs.configuration import config from sec_certs.dataset.auxiliary_dataset_handling import ( AuxiliaryDatasetHandler, CCMaintenanceUpdateDatasetHandler, CCSchemeDatasetHandler, CPEDatasetHandler, CPEMatchDictHandler, CVEDatasetHandler, ProtectionProfileDatasetHandler, ) from sec_certs.dataset.dataset import Dataset, logger from sec_certs.heuristics.cc import ( compute_cert_labs, compute_eals, compute_normalized_cert_ids, compute_references, compute_sars, compute_scheme_data, link_to_protection_profiles, ) from sec_certs.heuristics.common import compute_cpe_heuristics, compute_related_cves, compute_transitive_vulnerabilities from sec_certs.sample.cc import CCCertificate from sec_certs.sample.cc_maintenance_update import CCMaintenanceUpdate from sec_certs.serialization.json import ComplexSerializableType, only_backed, serialize from sec_certs.utils import helpers, sanitization from sec_certs.utils import parallel_processing as cert_processing from sec_certs.utils.profiling import staged class CCDataset(Dataset[CCCertificate], ComplexSerializableType): """ Class that holds :class:`sec_certs.sample.cc.CCCertificate` samples. Serializable into json, pandas, dictionary. Conveys basic certificate manipulations and dataset transformations. Many private methods that perform internal operations, feel free to exploit them. The dataset directory looks like this: ├── auxiliary_datasets │ ├── cpe_dataset.json │ ├── cve_dataset.json │ ├── cpe_match.json │ ├── cc_scheme.json │ ├── protection_profiles │ │ ├── reports │ │ │ ├── pdf │ │ │ └── txt │ │ ├── pps │ │ │ ├── pdf │ │ │ └── txt │ │ └── dataset.json │ └── maintenances │ ├── certs │ │ ├── reports │ │ │ ├── pdf │ │ │ └── txt │ │ └── targets │ │ ├── pdf │ │ └── txt │ └── maintenance_updates.json ├── certs │ ├── reports │ │ ├── pdf │ │ └── txt │ ├── targets │ │ ├── pdf │ │ └── txt │ └── certificates │ ├── pdf │ └── txt └── dataset.json """ FULL_ARCHIVE_URL: ClassVar[AnyHttpUrl] = config.cc_latest_full_archive SNAPSHOT_URL: ClassVar[AnyHttpUrl] = config.cc_latest_snapshot def __init__( self, certs: dict[str, CCCertificate] | None = None, root_dir: str | Path | None = None, name: str | None = None, description: str = "", state: Dataset.DatasetInternalState | None = None, aux_handlers: dict[type[AuxiliaryDatasetHandler], AuxiliaryDatasetHandler] | None = None, ): super().__init__(certs, root_dir, name, description, state, aux_handlers) if aux_handlers is None: self.aux_handlers = { CPEDatasetHandler: CPEDatasetHandler(self.auxiliary_datasets_dir if self.is_backed else None), CVEDatasetHandler: CVEDatasetHandler(self.auxiliary_datasets_dir if self.is_backed else None), CPEMatchDictHandler: CPEMatchDictHandler(self.auxiliary_datasets_dir if self.is_backed else None), CCSchemeDatasetHandler: CCSchemeDatasetHandler(self.auxiliary_datasets_dir if self.is_backed else None), ProtectionProfileDatasetHandler: ProtectionProfileDatasetHandler( self.auxiliary_datasets_dir if self.is_backed else None ), CCMaintenanceUpdateDatasetHandler: CCMaintenanceUpdateDatasetHandler( self.auxiliary_datasets_dir if self.is_backed else None ), } def to_pandas(self) -> pd.DataFrame: """ Return self serialized into pandas DataFrame """ df = pd.DataFrame( [x.pandas_tuple for x in self.certs.values()], columns=CCCertificate.pandas_columns, ) df = df.set_index("dgst") df.not_valid_before = pd.to_datetime(df.not_valid_before, errors="coerce") df.not_valid_after = pd.to_datetime(df.not_valid_after, errors="coerce") df = df.astype( { "category": "category", "status": "category", "scheme": "category", "cert_lab": "category", } ).fillna(value=np.nan) df = df.loc[ ~df.manufacturer.isnull() ] # Manually delete one certificate with None manufacturer (seems to have many blank fields) # Categorize EAL df.eal = df.eal.fillna(value=np.nan) df.eal = pd.Categorical(df.eal, categories=sorted(df.eal.dropna().unique().tolist()), ordered=True) # Introduce year when cert got valid df["year_from"] = pd.DatetimeIndex(df.not_valid_before).year return df @property @only_backed(throw=False) def reports_dir(self) -> Path: """ Returns directory that holds files associated with certification reports """ return self.certs_dir / "reports" @property @only_backed(throw=False) def reports_pdf_dir(self) -> Path: """ Returns directory that holds PDFs associated with certification reports """ return self.reports_dir / "pdf" @property @only_backed(throw=False) def reports_txt_dir(self) -> Path: """ Returns directory that holds TXTs associated with certification reports """ return self.reports_dir / "txt" @property @only_backed(throw=False) def targets_dir(self) -> Path: """ Returns directory that holds files associated with security targets """ return self.certs_dir / "targets" @property @only_backed(throw=False) def targets_pdf_dir(self) -> Path: """ Returns directory that holds PDFs associated with security targets """ return self.targets_dir / "pdf" @property @only_backed(throw=False) def targets_txt_dir(self) -> Path: """ Returns directory that holds TXTs associated with security targets """ return self.targets_dir / "txt" @property @only_backed(throw=False) def certificates_dir(self) -> Path: """ Returns directory that holds files associated with the certificates """ return self.certs_dir / "certificates" @property @only_backed(throw=False) def certificates_pdf_dir(self) -> Path: """ Returns directory that holds PDFs associated with certificates """ return self.certificates_dir / "pdf" @property @only_backed(throw=False) def certificates_txt_dir(self) -> Path: """ Returns directory that holds TXTs associated with certificates """ return self.certificates_dir / "txt" @property @only_backed(throw=False) def reference_annotator_dir(self) -> Path: return self.root_dir / "reference_annotator" HTML_PRODUCTS_URL = { "cc_products_active.html": constants.CC_PORTAL_BASE_URL + "/products/index.cfm", "cc_products_archived.html": constants.CC_PORTAL_BASE_URL + "/products/index.cfm?archived=1", } HTML_LABS_URL = {"cc_labs.html": constants.CC_PORTAL_BASE_URL + "/labs"} CSV_PRODUCTS_URL = { "cc_products_active.csv": constants.CC_PORTAL_BASE_URL + "/products/certified_products.csv", "cc_products_archived.csv": constants.CC_PORTAL_BASE_URL + "/products/certified_products-archived.csv", } PP_URL = { "cc_pp_active.html": constants.CC_PORTAL_BASE_URL + "/pps/", "cc_pp_collaborative.html": constants.CC_PORTAL_BASE_URL + "/pps/collaborativePP.cfm?cpp=1", "cc_pp_archived.html": constants.CC_PORTAL_BASE_URL + "/pps/index.cfm?archived=1", } PP_CSV = { "cc_pp_active.csv": constants.CC_PORTAL_BASE_URL + "/pps/pps.csv", "cc_pp_archived.csv": constants.CC_PORTAL_BASE_URL + "/pps/pps-archived.csv", } @property @only_backed(throw=False) def active_html_tuples(self) -> list[tuple[str, Path]]: """ Returns List Tuple[str, Path] where first element is name of html file and second element is its Path. The files correspond to html files parsed from CC website that list all *active* certificates. """ return [(x, self.web_dir / y) for y, x in self.HTML_PRODUCTS_URL.items() if "active" in y] @property @only_backed(throw=False) def archived_html_tuples(self) -> list[tuple[str, Path]]: """ Returns List Tuple[str, Path] where first element is name of html file and second element is its Path. The files correspond to html files parsed from CC website that list all *archived* certificates. """ return [(x, self.web_dir / y) for y, x in self.HTML_PRODUCTS_URL.items() if "archived" in y] @property @only_backed(throw=False) def active_csv_tuples(self) -> list[tuple[str, Path]]: """ Returns List Tuple[str, Path] where first element is name of csv file and second element is its Path. The files correspond to csv files downloaded from CC website that list all *active* certificates. """ return [(x, self.web_dir / y) for y, x in self.CSV_PRODUCTS_URL.items() if "active" in y] @property @only_backed(throw=False) def archived_csv_tuples(self) -> list[tuple[str, Path]]: """ Returns List Tuple[str, Path] where first element is name of csv file and second element is its Path. The files correspond to csv files downloaded from CC website that list all *archived* certificates. """ return [(x, self.web_dir / y) for y, x in self.CSV_PRODUCTS_URL.items() if "archived" in y] def _set_local_paths(self): super()._set_local_paths() if self.root_dir is None: return for cert in self: cert.set_local_paths( self.reports_pdf_dir, self.targets_pdf_dir, self.certificates_pdf_dir, self.reports_txt_dir, self.targets_txt_dir, self.certificates_txt_dir, ) @only_backed() def process_auxiliary_datasets( self, download_fresh: bool = False, skip_schemes: bool = False, **kwargs, ) -> None: if CCMaintenanceUpdateDatasetHandler in self.aux_handlers: self.aux_handlers[CCMaintenanceUpdateDatasetHandler].certs_with_updates = [ # type: ignore x for x in self if x.maintenance_updates ] if CCSchemeDatasetHandler in self.aux_handlers: self.aux_handlers[CCSchemeDatasetHandler].only_schemes = {x.scheme for x in self} # type: ignore if skip_schemes: self.aux_handlers[CCSchemeDatasetHandler].only_schemes = {} # type: ignore super().process_auxiliary_datasets(download_fresh, **kwargs) def _merge_certs(self, certs: dict[str, CCCertificate], cert_source: str | None = None) -> None: """ Merges dictionary of certificates into the dataset. Assuming they all are CommonCriteria certificates """ new_certs = {x.dgst: x for x in certs.values() if x not in self} certs_to_merge = [x for x in certs.values() if x in self] self.certs.update(new_certs) for crt in certs_to_merge: self[crt.dgst].merge(crt, cert_source) logger.info(f"Added {len(new_certs)} new and merged further {len(certs_to_merge)} certificates to the dataset.") def _download_csv_html_resources(self, get_active: bool = True, get_archived: bool = True) -> None: self.web_dir.mkdir(parents=True, exist_ok=True) html_items = [] csv_items = [] if get_active is True: html_items.extend(self.active_html_tuples) csv_items.extend(self.active_csv_tuples) if get_archived is True: html_items.extend(self.archived_html_tuples) csv_items.extend(self.archived_csv_tuples) html_urls, html_paths = [x[0] for x in html_items], [x[1] for x in html_items] csv_urls, csv_paths = [x[0] for x in csv_items], [x[1] for x in csv_items] logger.info("Downloading required csv and html files.") helpers.download_parallel(html_urls, html_paths) helpers.download_parallel(csv_urls, csv_paths) @serialize @staged(logger, "Downloading and processing CSV and HTML files of certificates.") @only_backed() def get_certs_from_web( self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True, ) -> None: """ Downloads CSV and HTML files that hold lists of certificates from common criteria website. Parses these files and constructs CCCertificate objects, fills the dataset with those. :param bool to_download: If CSV and HTML files shall be downloaded (or existing files utilized), defaults to True :param bool keep_metadata: If CSV and HTML files shall be kept on disk after download, defaults to True :param bool get_active: If active certificates shall be parsed, defaults to True :param bool get_archived: If archived certificates shall be parsed, defaults to True """ if to_download is True: self._download_csv_html_resources(get_active, get_archived) logger.info("Adding CSV certificates to CommonCriteria dataset.") csv_certs = self._get_all_certs_from_csv(get_active, get_archived) self._merge_certs(csv_certs, cert_source="csv") # Someway along the way, 3 certificates get lost. logger.info("Adding HTML certificates to CommonCriteria dataset.") html_certs = self._get_all_certs_from_html(get_active, get_archived) self._merge_certs(html_certs, cert_source="html") logger.info(f"The resulting dataset has {len(self)} certificates.") if not keep_metadata: shutil.rmtree(self.web_dir) self._set_local_paths() self.state.meta_sources_parsed = True def _get_all_certs_from_csv(self, get_active: bool, get_archived: bool) -> dict[str, CCCertificate]: """ Creates dictionary of new certificates from csv sources. """ csv_sources = list(self.CSV_PRODUCTS_URL.keys()) csv_sources = [x for x in csv_sources if "active" not in x or get_active] csv_sources = [x for x in csv_sources if "archived" not in x or get_archived] new_certs = {} for file in csv_sources: partial_certs = self._parse_single_csv(self.web_dir / file) logger.info(f"Parsed {len(partial_certs)} certificates from: {file}") new_certs.update(partial_certs) return new_certs @staticmethod def _parse_single_csv(file: Path) -> dict[str, CCCertificate]: """ Using pandas, this parses a single CSV file. """ def map_ip_to_hostname(url: str) -> str: if not url: return url tokens = url.split("/") relative_path = "/" + "/".join(tokens[3:]) return constants.CC_PORTAL_BASE_URL + relative_path def _get_primary_key_str(row: Tag): return "|".join( [ row["category"], row["cert_name"], sanitization.sanitize_link_fname(row["report_link"]) or "None", sanitization.sanitize_link_fname(row["st_link"]) or "None", ] ) cert_status = "active" if "active" in str(file) else "archived" csv_header = [ "category", "cert_name", "manufacturer", "scheme", "security_level", "protection_profiles", "not_valid_before", "not_valid_after", "report_link", "st_link", "maintenance_date", "maintenance_title", "maintenance_report_link", "maintenance_st_link", ] # TODO: Now skipping bad lines, smarter heuristics to be built for dumb files try: df = pd.read_csv(file, engine="python", encoding="utf-8", on_bad_lines="skip") except UnicodeDecodeError: df = pd.read_csv(file, engine="python", encoding="windows-1252", on_bad_lines="skip") df = df.rename(columns=dict(zip(list(df.columns), csv_header))) df["is_maintenance"] = ~df.maintenance_title.isnull() df = df.fillna(value="") df[["not_valid_before", "not_valid_after", "maintenance_date"]] = df[ ["not_valid_before", "not_valid_after", "maintenance_date"] ].apply(pd.to_datetime, errors="coerce") df["dgst"] = df.apply( lambda row: helpers.get_first_16_bytes_sha256(_get_primary_key_str(row)), axis=1, ) df_base = df.loc[~df.is_maintenance].copy() df_main = df.loc[df.is_maintenance].copy() df_base.report_link = df_base.report_link.map(map_ip_to_hostname).map(sanitization.sanitize_link) df_base.st_link = df_base.st_link.map(map_ip_to_hostname).map(sanitization.sanitize_link) df_main.maintenance_report_link = df_main.maintenance_report_link.map(map_ip_to_hostname).map( sanitization.sanitize_link ) df_main.maintenance_st_link = df_main.maintenance_st_link.map(map_ip_to_hostname).map( sanitization.sanitize_link ) n_all = len(df_base) n_deduplicated = len(df_base.drop_duplicates(subset=["dgst"])) if (n_dup := n_all - n_deduplicated) > 0: logger.warning(f"The CSV {file} contains {n_dup} duplicates by the primary key.") df_base = df_base.drop_duplicates(subset=["dgst"]) df_main = df_main.drop_duplicates() updates: dict[str, set] = {x.dgst: set() for x in df_base.itertuples()} for x in df_main.itertuples(): updates[x.dgst].add( CCCertificate.MaintenanceReport( x.maintenance_date.date(), x.maintenance_title, x.maintenance_report_link, x.maintenance_st_link, ) ) return { x.dgst: CCCertificate( cert_status, x.category, x.cert_name, x.manufacturer, x.scheme, x.security_level, x.not_valid_before, x.not_valid_after, x.report_link, x.st_link, None, None, None, updates.get(x.dgst, None), None, None, None, ) for x in df_base.itertuples() } def _get_all_certs_from_html(self, get_active: bool, get_archived: bool) -> dict[str, CCCertificate]: """ Prepares dictionary of certificates from all html files. """ html_sources = list(self.HTML_PRODUCTS_URL.keys()) if get_active is False: html_sources = [x for x in html_sources if "active" not in x] if get_archived is False: html_sources = [x for x in html_sources if "archived" not in x] new_certs = {} for file in html_sources: partial_certs = self._parse_single_html(self.web_dir / file) logger.info(f"Parsed {len(partial_certs)} certificates from: {file}") new_certs.update(partial_certs) return new_certs @staticmethod def _parse_single_html(file: Path) -> dict[str, CCCertificate]: """ Prepares a dictionary of certificates from a single html file. """ def _get_timestamp_from_footer(footer): locale.setlocale(locale.LC_ALL, "en_US") footer_text = list(footer.stripped_strings)[0] date_string = footer_text.split(",")[1:3] time_string = footer_text.split(",")[3].split(" at ")[1] formatted_datetime = date_string[0] + date_string[1] + " " + time_string return datetime.strptime(formatted_datetime, " %B %d %Y %I:%M %p") def _parse_table( soup: BeautifulSoup, cert_status: str, table_id: str, category_string: str ) -> dict[str, CCCertificate]: tables = soup.find_all("table", id=table_id) if len(tables) > 1: raise ValueError( f'The "{file.name}" was expected to contain 0-1