about | summary | refs | log | tree | commit | diff | homepage
path: root/src
diff options
context:
space:
mode:
author: Ján Jančár, 2024-10-18 20:53:19 +0200
committer: GitHub, 2024-10-18 20:53:19 +0200
commit: 2a3d45cd8068d79ebbe281ee7b46bd150b4058ca (patch)
tree: 486e9946c32d2e3205006e46994d506edc1fdfd1 /src
parent: 17af7f668386985e5f761b2a48b6e6aca2995b3f (diff)
parent: e7ba5ef02170ad9b96e795fd714f68b3d666c866 (diff)
download: sec-certs-2a3d45cd8068d79ebbe281ee7b46bd150b4058ca.tar.gz
sec-certs-2a3d45cd8068d79ebbe281ee7b46bd150b4058ca.tar.zst
sec-certs-2a3d45cd8068d79ebbe281ee7b46bd150b4058ca.zip
Merge pull request #446 from crocs-muni/feat/full-dset-archive-download
Add a way to download full dataset archive (including PDFs) from the web.
Diffstat (limited to 'src')
-rw-r--r-- src/sec_certs/configuration.py 48
-rw-r--r-- src/sec_certs/constants.py 1
-rw-r--r-- src/sec_certs/dataset/cc.py 147
-rw-r--r-- src/sec_certs/dataset/dataset.py 106
-rw-r--r-- src/sec_certs/dataset/fips.py 57
5 files changed, 301 insertions, 58 deletions
diff --git a/src/sec_certs/configuration.py b/src/sec_certs/configuration.py
index 47ac5821..0305bdea 100644
--- a/src/sec_certs/configuration.py
+++ b/src/sec_certs/configuration.py
@@ -31,7 +31,9 @@ class Configuration(BaseSettings):
description=" During validation we don't connect certificates with validation dates difference higher than _this_.",
)
n_threads: int = Field(
- -1, description="How many threads to use for parallel computations. Set to -1 to use all logical cores.", ge=-1
+ -1,
+ description="How many threads to use for parallel computations. Set to -1 to use all logical cores.",
+ ge=-1,
)
cpe_matching_threshold: int = Field(
92,
@@ -40,12 +42,18 @@ class Configuration(BaseSettings):
le=100,
)
cpe_n_max_matches: int = Field(
- 99, description="Maximum number of candidate CPE items that may be related to given certificate, >0", gt=0
+ 99,
+ description="Maximum number of candidate CPE items that may be related to given certificate, >0",
+ gt=0,
)
cc_latest_snapshot: AnyHttpUrl = Field(
"https://sec-certs.org/cc/dataset.json",
description="URL from where to fetch the latest snapshot of fully processed CC dataset.",
)
+ cc_latest_full_archive: AnyHttpUrl = Field(
+ "https://sec-certs.org/cc/cc.tar.gz",
+ description="URL from where to fetch the latest full archive of fully processed CC dataset.",
+ )
cc_maintenances_latest_snapshot: AnyHttpUrl = Field(
"https://sec-certs.org/cc/maintenance_updates.json",
description="URL from where to fetch the latest snapshot of CC maintenance updates",
@@ -55,25 +63,36 @@ class Configuration(BaseSettings):
description="URL from where to fetch the latest snapshot of the PP dataset.",
)
fips_latest_snapshot: AnyHttpUrl = Field(
- "https://sec-certs.org/fips/dataset.json", description="URL for the latest snapshot of FIPS dataset."
+ "https://sec-certs.org/fips/dataset.json",
+ description="URL for the latest snapshot of FIPS dataset.",
+ )
+ fips_latest_full_archive: AnyHttpUrl = Field(
+ "https://sec-certs.org/fips/fips.tar.gz",
+ description="URL from where to fetch the latest full archive of fully processed FIPS dataset.",
)
fips_iut_dataset: AnyHttpUrl = Field(
- "https://sec-certs.org/fips/iut/dataset.json", description="URL for the dataset of FIPS IUT data."
+ "https://sec-certs.org/fips/iut/dataset.json",
+ description="URL for the dataset of FIPS IUT data.",
)
fips_iut_latest_snapshot: AnyHttpUrl = Field(
- "https://sec-certs.org/fips/iut/latest.json", description="URL for the latest snapshot of FIPS IUT data."
+ "https://sec-certs.org/fips/iut/latest.json",
+ description="URL for the latest snapshot of FIPS IUT data.",
)
fips_mip_dataset: AnyHttpUrl = Field(
- "https://sec-certs.org/fips/mip/dataset.json", description="URL for the dataset of FIPS MIP data"
+ "https://sec-certs.org/fips/mip/dataset.json",
+ description="URL for the dataset of FIPS MIP data",
)
fips_mip_latest_snapshot: AnyHttpUrl = Field(
- "https://sec-certs.org/fips/mip/latest.json", description="URL for the latest snapshot of FIPS MIP data"
+ "https://sec-certs.org/fips/mip/latest.json",
+ description="URL for the latest snapshot of FIPS MIP data",
)
cpe_latest_snapshot: AnyHttpUrl = Field(
- "https://sec-certs.org/vuln/cpe/cpe.json.gz", description="URL for the latest snapshot of CPEDataset."
+ "https://sec-certs.org/vuln/cpe/cpe.json.gz",
+ description="URL for the latest snapshot of CPEDataset.",
)
cve_latest_snapshot: AnyHttpUrl = Field(
- "https://sec-certs.org/vuln/cve/cve.json.gz", description="URL for the latest snapshot of CVEDataset."
+ "https://sec-certs.org/vuln/cve/cve.json.gz",
+ description="URL for the latest snapshot of CVEDataset.",
)
cpe_match_latest_snapshot: AnyHttpUrl = Field(
"https://sec-certs.org/vuln/cpe/cpe_match.json.gz",
@@ -91,14 +110,16 @@ class Configuration(BaseSettings):
ge=0,
)
ignore_first_page: bool = Field(
- True, description="During keyword search, first page usually contains addresses - ignore it."
+ True,
+ description="During keyword search, first page usually contains addresses - ignore it.",
)
cc_reference_annotator_dir: Optional[Path] = Field( # noqa: UP007
None,
description="Path to directory with serialized reference annotator model. If set to `null`, tool will search default directory for the given dataset.",
)
cc_reference_annotator_should_train: bool = Field(
- True, description="True if new reference annotator model shall be build, False otherwise."
+ True,
+ description="True if new reference annotator model shall be build, False otherwise.",
)
cc_matching_threshold: int = Field(
90,
@@ -109,14 +130,15 @@ class Configuration(BaseSettings):
cc_use_proxy: bool = Field(False, description="Download CC artifacts through the sec-certs.org proxy.")
fips_use_proxy: bool = Field(False, description="Download FIPS artifacts through the sec-certs.org proxy.")
enable_progress_bars: bool = Field(
- True, description="If true, progress bars will be printed to stdout during computation."
+ True,
+ description="If true, progress bars will be printed to stdout during computation.",
)
nvd_api_key: Optional[str] = Field(None, description="NVD API key for access to CVEs and CPEs.") # noqa: UP007
preferred_source_nvd_datasets: Literal["sec-certs", "api"] = Field(
"sec-certs",
description="If set to `sec-certs`, will fetch CPE and CVE datasets from sec-certs.org."
+ " If set to `api`, will fetch these resources from NVD API. It is advised to set an"
- + " `nvd_api_key` when setting this to `nvd`.",
+ + " `nvd_api_key` when setting this to `api`.",
)
def _get_nondefault_keys(self) -> set[str]:
diff --git a/src/sec_certs/constants.py b/src/sec_certs/constants.py
index 956f8fb7..b1c25fe5 100644
--- a/src/sec_certs/constants.py
+++ b/src/sec_certs/constants.py
@@ -7,6 +7,7 @@ REF_ANNOTATION_MODES = Literal["training", "evaluation", "production", "cross-va
REF_EMBEDDING_METHOD = Literal["tf_idf", "transformer"]
+# This stupid thing should die in a fire...
DUMMY_NONEXISTING_PATH = Path("/this/is/dummy/nonexisting/path")
RESPONSE_OK = 200
diff --git a/src/sec_certs/dataset/cc.py b/src/sec_certs/dataset/cc.py
index 79478d3d..4c1114f3 100644
--- a/src/sec_certs/dataset/cc.py
+++ b/src/sec_certs/dataset/cc.py
@@ -80,13 +80,21 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
"""
Return self serialized into pandas DataFrame
"""
- df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=CCCertificate.pandas_columns)
+ df = pd.DataFrame(
+ [x.pandas_tuple for x in self.certs.values()],
+ columns=CCCertificate.pandas_columns,
+ )
df = df.set_index("dgst")
df.not_valid_before = pd.to_datetime(df.not_valid_before, errors="coerce")
df.not_valid_after = pd.to_datetime(df.not_valid_after, errors="coerce")
df = df.astype(
- {"category": "category", "status": "category", "scheme": "category", "cert_lab": "category"}
+ {
+ "category": "category",
+ "status": "category",
+ "scheme": "category",
+ "cert_lab": "category",
+ }
).fillna(value=np.nan)
df = df.loc[
~df.manufacturer.isnull()
@@ -212,7 +220,10 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
"cc_pp_collaborative.html": BASE_URL + "/pps/collaborativePP.cfm?cpp=1",
"cc_pp_archived.html": BASE_URL + "/pps/index.cfm?archived=1",
}
- PP_CSV = {"cc_pp_active.csv": BASE_URL + "/pps/pps.csv", "cc_pp_archived.csv": BASE_URL + "/pps/pps-archived.csv"}
+ PP_CSV = {
+ "cc_pp_active.csv": BASE_URL + "/pps/pps.csv",
+ "cc_pp_archived.csv": BASE_URL + "/pps/pps-archived.csv",
+ }
@property
def active_html_tuples(self) -> list[tuple[str, Path]]:
@@ -247,11 +258,33 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
return [(x, self.web_dir / y) for y, x in self.CSV_PRODUCTS_URL.items() if "archived" in y]
@classmethod
- def from_web_latest(cls) -> CCDataset:
+ def from_web_latest(
+ cls,
+ path: str | Path | None = None,
+ auxiliary_datasets: bool = False,
+ artifacts: bool = False,
+ ) -> CCDataset:
"""
- Fetches the fresh snapshot of CCDataset from sec-certs.org
+ Fetches the fresh snapshot of CCDataset from sec-certs.org.
+
+ Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs).
+
+ :::{note}
+ Note that including the auxiliary datasets adds several gigabytes and including artifacts adds tens of gigabytes.
+ :::
+
+ :param path: Path to a directory where to store the dataset, or `None` if it should not be stored.
+ :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets).
+ :param artifacts: Whether to also download artifacts (i.e. PDFs).
"""
- return cls.from_web(config.cc_latest_snapshot, "Downloading CC Dataset", "cc_latest_dataset.json")
+ return cls.from_web(
+ config.cc_latest_full_archive,
+ config.cc_latest_snapshot,
+ "Downloading CC",
+ path,
+ auxiliary_datasets,
+ artifacts,
+ )
def _set_local_paths(self):
super()._set_local_paths()
@@ -262,6 +295,9 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
if self.auxiliary_datasets.mu_dset:
self.auxiliary_datasets.mu_dset.root_dir = self.mu_dataset_dir
+ if self.auxiliary_datasets.scheme_dset:
+ self.auxiliary_datasets.scheme_dset.json_path = self.scheme_dataset_path
+
for cert in self:
cert.set_local_paths(
self.reports_pdf_dir,
@@ -271,7 +307,6 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
self.targets_txt_dir,
self.certificates_txt_dir,
)
- # TODO: This forgets to set local paths for other auxiliary datasets
def _merge_certs(self, certs: dict[str, CCCertificate], cert_source: str | None = None) -> None:
"""
@@ -308,7 +343,11 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
@serialize
@staged(logger, "Downloading and processing CSV and HTML files of certificates.")
def get_certs_from_web(
- self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True
+ self,
+ to_download: bool = True,
+ keep_metadata: bool = True,
+ get_active: bool = True,
+ get_archived: bool = True,
) -> None:
"""
Downloads CSV and HTML files that hold lists of certificates from common criteria website. Parses these files
@@ -410,7 +449,10 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
["not_valid_before", "not_valid_after", "maintenance_date"]
].apply(pd.to_datetime, errors="coerce")
- df["dgst"] = df.apply(lambda row: helpers.get_first_16_bytes_sha256(_get_primary_key_str(row)), axis=1)
+ df["dgst"] = df.apply(
+ lambda row: helpers.get_first_16_bytes_sha256(_get_primary_key_str(row)),
+ axis=1,
+ )
df_base = df.loc[~df.is_maintenance].copy()
df_main = df.loc[df.is_maintenance].copy()
@@ -444,7 +486,10 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
for x in df_main.itertuples():
updates[x.dgst].add(
CCCertificate.MaintenanceReport(
- x.maintenance_date.date(), x.maintenance_title, x.maintenance_report_link, x.maintenance_st_link
+ x.maintenance_date.date(),
+ x.maintenance_title,
+ x.maintenance_report_link,
+ x.maintenance_st_link,
)
)
@@ -538,7 +583,22 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
cert_status = "active" if "active" in str(file) else "archived"
- cc_cat_abbreviations = ["AC", "BP", "DP", "DB", "DD", "IC", "KM", "MD", "MF", "NS", "OS", "OD", "DG", "TC"]
+ cc_cat_abbreviations = [
+ "AC",
+ "BP",
+ "DP",
+ "DB",
+ "DD",
+ "IC",
+ "KM",
+ "MD",
+ "MF",
+ "NS",
+ "OS",
+ "OD",
+ "DG",
+ "TC",
+ ]
cc_table_ids = ["tbl" + x for x in cc_cat_abbreviations]
cc_categories = [
"Access Control Devices and Systems",
@@ -774,18 +834,27 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
self._extract_pdf_frontpage()
self._extract_pdf_keywords()
- @staged(logger, "Computing heuristics: Deriving information about laboratories involved in certification.")
+ @staged(
+ logger,
+ "Computing heuristics: Deriving information about laboratories involved in certification.",
+ )
def _compute_cert_labs(self) -> None:
certs_to_process = [x for x in self if x.state.report.is_ok_to_analyze()]
for cert in certs_to_process:
cert.compute_heuristics_cert_lab()
- @staged(logger, "Computing heuristics: Deriving information about certificate ids from artifacts.")
+ @staged(
+ logger,
+ "Computing heuristics: Deriving information about certificate ids from artifacts.",
+ )
def _compute_normalized_cert_ids(self) -> None:
for cert in self:
cert.compute_heuristics_cert_id()
- @staged(logger, "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.")
+ @staged(
+ logger,
+ "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.",
+ )
def _compute_transitive_vulnerabilities(self):
transitive_cve_finder = TransitiveVulnerabilityFinder(lambda cert: cert.heuristics.cert_id)
transitive_cve_finder.fit(self.certs, lambda cert: cert.heuristics.report_references)
@@ -851,7 +920,11 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
finder.fit(self.certs, lambda cert: cert.heuristics.cert_id, ref_lookup(kw_source)) # type: ignore
for dgst in self.certs:
- setattr(self.certs[dgst].heuristics, dep_attr, finder.predict_single_cert(dgst, keep_unknowns=False))
+ setattr(
+ self.certs[dgst].heuristics,
+ dep_attr,
+ finder.predict_single_cert(dgst, keep_unknowns=False),
+ )
@serialize
def process_auxiliary_datasets(self, download_fresh: bool = False) -> None:
@@ -915,7 +988,9 @@ class CCDataset(Dataset[CCCertificate, CCAuxiliaryDatasets], ComplexSerializable
itertools.chain.from_iterable(CCMaintenanceUpdate.get_updates_from_cc_cert(x) for x in maintained_certs)
)
update_dset = CCDatasetMaintenanceUpdates(
- {x.dgst: x for x in updates}, root_dir=self.mu_dataset_dir, name="maintenance_updates"
+ {x.dgst: x for x in updates},
+ root_dir=self.mu_dataset_dir,
+ name="maintenance_updates",
)
else:
update_dset = CCDatasetMaintenanceUpdates.from_json(self.mu_dataset_path)
@@ -983,18 +1058,28 @@ class CCDatasetMaintenanceUpdates(CCDataset, ComplexSerializableType):
raise NotImplementedError
def get_certs_from_web(
- self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True
+ self,
+ to_download: bool = True,
+ keep_metadata: bool = True,
+ get_active: bool = True,
+ get_archived: bool = True,
) -> None:
raise NotImplementedError
@classmethod
def from_json(cls, input_path: str | Path, is_compressed: bool = False) -> CCDatasetMaintenanceUpdates:
- dset = cast(CCDatasetMaintenanceUpdates, ComplexSerializableType.from_json(input_path, is_compressed))
+ dset = cast(
+ CCDatasetMaintenanceUpdates,
+ ComplexSerializableType.from_json(input_path, is_compressed),
+ )
dset._root_dir = Path(input_path).parent.absolute()
return dset
def to_pandas(self) -> pd.DataFrame:
- df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=CCMaintenanceUpdate.pandas_columns)
+ df = pd.DataFrame(
+ [x.pandas_tuple for x in self.certs.values()],
+ columns=CCMaintenanceUpdate.pandas_columns,
+ )
df = df.set_index("dgst")
df.index.name = "dgst"
@@ -1002,11 +1087,29 @@ class CCDatasetMaintenanceUpdates(CCDataset, ComplexSerializableType):
return df.fillna(value=np.nan)
@classmethod
- def from_web_latest(cls) -> CCDatasetMaintenanceUpdates:
+ def from_web_latest(
+ cls,
+ path: str | Path | None = None,
+ auxiliary_datasets: bool = False,
+ artifacts: bool = False,
+ ) -> CCDatasetMaintenanceUpdates:
+ if auxiliary_datasets or artifacts:
+ raise ValueError(
+ "Maintenance update dataset does not support downloading artifacts or other auxiliary datasets."
+ )
+ if path:
+ path = Path(path)
+ if not path.exists():
+ path.mkdir(parents=True)
+ if not path.is_dir():
+ raise ValueError("Path needs to be a directory.")
with tempfile.TemporaryDirectory() as tmp_dir:
- dset_path = Path(tmp_dir) / "cc_maintenances_latest_dataset.json"
+ dset_path = Path(tmp_dir) / "maintenance_updates.json"
helpers.download_file(config.cc_maintenances_latest_snapshot, dset_path)
- return cls.from_json(dset_path)
+ dset = cls.from_json(dset_path)
+ if path:
+ dset.move_dataset(path)
+ return dset
def get_n_maintenances_df(self) -> pd.DataFrame:
"""
diff --git a/src/sec_certs/dataset/dataset.py b/src/sec_certs/dataset/dataset.py
index 218936c4..2da2bdc4 100644
--- a/src/sec_certs/dataset/dataset.py
+++ b/src/sec_certs/dataset/dataset.py
@@ -6,6 +6,7 @@ import json
import logging
import re
import shutil
+import tarfile
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Iterator
@@ -23,9 +24,17 @@ from sec_certs.dataset.cve import CVEDataset
from sec_certs.model.cpe_matching import CPEClassifier
from sec_certs.sample.certificate import Certificate
from sec_certs.sample.cpe import CPE
-from sec_certs.serialization.json import ComplexSerializableType, get_class_fullname, serialize
+from sec_certs.serialization.json import (
+ ComplexSerializableType,
+ get_class_fullname,
+ serialize,
+)
from sec_certs.utils import helpers
-from sec_certs.utils.nvd_dataset_builder import CpeMatchNvdDatasetBuilder, CpeNvdDatasetBuilder, CveNvdDatasetBuilder
+from sec_certs.utils.nvd_dataset_builder import (
+ CpeMatchNvdDatasetBuilder,
+ CpeNvdDatasetBuilder,
+ CveNvdDatasetBuilder,
+)
from sec_certs.utils.profiling import staged
from sec_certs.utils.tqdm import tqdm
@@ -170,16 +179,74 @@ class Dataset(Generic[CertSubType, AuxiliaryDatasetsSubType], ComplexSerializabl
return str(type(self).__name__) + ":" + self.name + ", " + str(len(self)) + " certificates"
@classmethod
- def from_web(cls: type[DatasetSubType], url: str, progress_bar_desc: str, filename: str) -> DatasetSubType:
+ def from_web( # noqa
+ cls: type[DatasetSubType],
+ archive_url: str,
+ snapshot_url: str,
+ progress_bar_desc: str,
+ path: None | str | Path = None,
+ auxiliary_datasets: bool = False,
+ artifacts: bool = False,
+ ) -> DatasetSubType:
"""
- Fetches a fully processed dataset instance from static site that hosts it.
+ Fetches the fresh dataset snapshot from sec-certs.org.
+
+ Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs).
+
+ :::{note}
+ Note that including the auxiliary datasets adds several gigabytes and including artifacts adds tens of gigabytes.
+ :::
+
+ :param archive_url: The URL of the full dataset archive.
+ :param snapshot_url: The URL of the full dataset snapshot.
+ :param progress_bar_desc: Description of the download progress bar.
+ :param path: Path to a directory where to store the dataset, or `None` if it should not be stored.
+ :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets).
+ :param artifacts: Whether to also download artifacts (i.e. PDFs).
"""
- with tempfile.TemporaryDirectory() as tmp_dir:
- dset_path = Path(tmp_dir) / filename
- helpers.download_file(url, dset_path, show_progress_bar=True, progress_bar_desc=progress_bar_desc)
- dset = cls.from_json(dset_path)
- dset.root_dir = constants.DUMMY_NONEXISTING_PATH
- return dset
+ if (artifacts or auxiliary_datasets) and path is None:
+ raise ValueError("Path needs to be defined if artifacts or auxiliary datasets are to be downloaded.")
+ if artifacts and not auxiliary_datasets:
+ raise ValueError("Auxiliary datasets need to be downloaded if artifacts are to be downloaded.")
+ if path is not None:
+ path = Path(path)
+ if not path.exists():
+ path.mkdir(parents=True)
+ if not path.is_dir():
+ raise ValueError("Path needs to be a directory.")
+ if artifacts:
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ dset_path = Path(tmp_dir) / "dataset.tar.gz"
+ res = helpers.download_file(
+ archive_url,
+ dset_path,
+ show_progress_bar=True,
+ progress_bar_desc=progress_bar_desc,
+ )
+ if res != constants.RESPONSE_OK:
+ raise ValueError(f"Download failed: {res}")
+ with tarfile.open(dset_path, "r:gz") as tar:
+ tar.extractall(str(path))
+ dset = cls.from_json(path / "dataset.json") # type: ignore
+ if auxiliary_datasets:
+ dset.process_auxiliary_datasets(download_fresh=False)
+ else:
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ dset_path = Path(tmp_dir) / "dataset.json"
+ helpers.download_file(
+ snapshot_url,
+ dset_path,
+ show_progress_bar=True,
+ progress_bar_desc=progress_bar_desc,
+ )
+ dset = cls.from_json(dset_path)
+ if path:
+ dset.move_dataset(path)
+ else:
+ dset.root_dir = constants.DUMMY_NONEXISTING_PATH
+ if auxiliary_datasets:
+ dset.process_auxiliary_datasets(download_fresh=True)
+ return dset
def to_dict(self) -> dict[str, Any]:
return {
@@ -204,7 +271,10 @@ class Dataset(Generic[CertSubType, AuxiliaryDatasetsSubType], ComplexSerializabl
@classmethod
def from_json(cls: type[DatasetSubType], input_path: str | Path, is_compressed: bool = False) -> DatasetSubType:
- dset = cast("DatasetSubType", ComplexSerializableType.from_json(input_path, is_compressed))
+ dset = cast(
+ "DatasetSubType",
+ ComplexSerializableType.from_json(input_path, is_compressed),
+ )
dset._root_dir = Path(input_path).parent.absolute()
dset._set_local_paths()
return dset
@@ -411,7 +481,7 @@ class Dataset(Generic[CertSubType, AuxiliaryDatasetsSubType], ComplexSerializabl
if download_fresh:
if config.preferred_source_nvd_datasets == "api":
- logger.info("Fetchnig CPE Match feed from NVD APi.")
+ logger.info("Fetching CPE Match feed from NVD APi.")
with CpeMatchNvdDatasetBuilder(api_key=config.nvd_api_key) as builder:
cpe_match_dict = builder.build_dataset(cpe_match_dict)
else:
@@ -444,8 +514,16 @@ class Dataset(Generic[CertSubType, AuxiliaryDatasetsSubType], ComplexSerializabl
Computes matching CPEs for the certificates.
"""
WINDOWS_WEAK_CPES: set[CPE] = {
- CPE("", "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x64:*", "Microsoft Windows on X64"),
- CPE("", "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*", "Microsoft Windows on X86"),
+ CPE(
+ "",
+ "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x64:*",
+ "Microsoft Windows on X64",
+ ),
+ CPE(
+ "",
+ "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*",
+ "Microsoft Windows on X86",
+ ),
}
def filter_condition(cpe: CPE) -> bool:
diff --git a/src/sec_certs/dataset/fips.py b/src/sec_certs/dataset/fips.py
index 77f38754..eeaec0a0 100644
--- a/src/sec_certs/dataset/fips.py
+++ b/src/sec_certs/dataset/fips.py
@@ -18,7 +18,9 @@ from sec_certs.dataset.cve import CVEDataset
from sec_certs.dataset.dataset import AuxiliaryDatasets, Dataset
from sec_certs.dataset.fips_algorithm import FIPSAlgorithmDataset
from sec_certs.model.reference_finder import ReferenceFinder
-from sec_certs.model.transitive_vulnerability_finder import TransitiveVulnerabilityFinder
+from sec_certs.model.transitive_vulnerability_finder import (
+ TransitiveVulnerabilityFinder,
+)
from sec_certs.sample.fips import FIPSCertificate
from sec_certs.serialization.json import ComplexSerializableType, serialize
from sec_certs.utils import helpers
@@ -215,11 +217,33 @@ class FIPSDataset(Dataset[FIPSCertificate, FIPSAuxiliaryDatasets], ComplexSerial
return [FIPSCertificate(int(cert_id)) for cert_id in cert_ids]
@classmethod
- def from_web_latest(cls) -> FIPSDataset:
+ def from_web_latest(
+ cls,
+ path: str | Path | None = None,
+ auxiliary_datasets: bool = False,
+ artifacts: bool = False,
+ ) -> FIPSDataset:
"""
- Fetches the fresh snapshot of FIPSDataset from mirror.
+ Fetches the fresh snapshot of FIPSDataset from sec-certs.org.
+
+ Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs).
+
+ :::{note}
+ Note that including the auxiliary datasets adds several gigabytes and including artifacts adds tens of gigabytes.
+ :::
+
+ :param path: Path to a directory where to store the dataset, or `None` if it should not be stored.
+ :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets).
+ :param artifacts: Whether to also download artifacts (i.e. PDFs).
"""
- return cls.from_web(config.fips_latest_snapshot, "Downloading FIPS Dataset", "fips_latest_dataset.json")
+ return cls.from_web(
+ config.fips_latest_full_archive,
+ config.fips_latest_snapshot,
+ "Downloading FIPS",
+ path,
+ auxiliary_datasets,
+ artifacts,
+ )
def _set_local_paths(self) -> None:
super()._set_local_paths()
@@ -283,7 +307,10 @@ class FIPSDataset(Dataset[FIPSCertificate, FIPSAuxiliaryDatasets], ComplexSerial
)
self.update_with_certs(processed_certs)
- @staged(logger, "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.")
+ @staged(
+ logger,
+ "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.",
+ )
def _compute_transitive_vulnerabilities(self) -> None:
transitive_cve_finder = TransitiveVulnerabilityFinder(lambda cert: str(cert.cert_id))
transitive_cve_finder.fit(self.certs, lambda cert: cert.heuristics.policy_processed_references)
@@ -306,12 +333,16 @@ class FIPSDataset(Dataset[FIPSCertificate, FIPSAuxiliaryDatasets], ComplexSerial
policy_reference_finder = ReferenceFinder()
policy_reference_finder.fit(
- self.certs, lambda cert: str(cert.cert_id), lambda cert: cert.heuristics.policy_prunned_references
+ self.certs,
+ lambda cert: str(cert.cert_id),
+ lambda cert: cert.heuristics.policy_prunned_references,
)
module_reference_finder = ReferenceFinder()
module_reference_finder.fit(
- self.certs, lambda cert: str(cert.cert_id), lambda cert: cert.heuristics.module_prunned_references
+ self.certs,
+ lambda cert: str(cert.cert_id),
+ lambda cert: cert.heuristics.module_prunned_references,
)
for cert in self:
@@ -323,7 +354,10 @@ class FIPSDataset(Dataset[FIPSCertificate, FIPSAuxiliaryDatasets], ComplexSerial
)
def to_pandas(self) -> pd.DataFrame:
- df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=FIPSCertificate.pandas_columns)
+ df = pd.DataFrame(
+ [x.pandas_tuple for x in self.certs.values()],
+ columns=FIPSCertificate.pandas_columns,
+ )
df = df.set_index("dgst")
df.date_validation = pd.to_datetime(df.date_validation, errors="coerce")
@@ -333,7 +367,12 @@ class FIPSDataset(Dataset[FIPSCertificate, FIPSAuxiliaryDatasets], ComplexSerial
df = df.loc[~(df.embodiment == "*")]
df = df.astype(
- {"type": "category", "status": "category", "standard": "category", "embodiment": "category"}
+ {
+ "type": "category",
+ "status": "category",
+ "standard": "category",
+ "embodiment": "category",
+ }
).fillna(value=np.nan)
df.level = df.level.fillna(value=np.nan).astype("float")