diff options
| author | Adam Janovsky | 2023-04-07 14:16:03 +0200 |
|---|---|---|
| committer | Adam Janovsky | 2023-04-07 14:16:03 +0200 |
| commit | b98b491c9ebf087fc73416922bf7572e83c54816 (patch) | |
| tree | 8039f8353aa14bd84cff4a43c3058cd8ab6004e1 /src/sec_certs/sample | |
| parent | 6373128e4ebc33548d014c3a07b99ee024d5f9e2 (diff) | |
| download | sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.tar.gz sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.tar.zst sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.zip | |
WiP new cve and cpe dataset handling
Diffstat (limited to 'src/sec_certs/sample')
| -rw-r--r-- | src/sec_certs/sample/cpe.py | 67 | ||||
| -rw-r--r-- | src/sec_certs/sample/cve.py | 212 |
2 files changed, 169 insertions, 110 deletions
diff --git a/src/sec_certs/sample/cpe.py b/src/sec_certs/sample/cpe.py index d56535c2..055373c1 100644 --- a/src/sec_certs/sample/cpe.py +++ b/src/sec_certs/sample/cpe.py @@ -11,6 +11,59 @@ from sec_certs.utils import helpers @dataclass +class CPEMatchCriteria(ComplexSerializableType): + vulnerable: bool + criteria: str + criteria_id: str + version_start: tuple[str, str] | None + version_end: tuple[str, str] | None + + __slots__ = ["vulnerable", "criteria", "criteria_id", "version_start", "version_end"] + + # We cannot use frozen=True. It does not work with __slots__ prior to Python 3.10 dataclasses + # Hence we manually provide __hash__ and __eq__ despite not guaranteeing immutability + def __hash__(self) -> int: + return hash(self.criteria_id) + + def __eq__(self, other: object) -> bool: + return isinstance(other, CPEMatchCriteria) and self.criteria_id == other.criteria_id + + def __lt__(self, other: CPEMatchCriteria) -> bool: + return self.criteria_id < other.criteria_id + + @classmethod + def from_nist_dict(cls, dct: dict[str, Any]) -> CPEMatchCriteria: + if dct.get("versionStartIncluding", None): + version_start = ("including", dct["versionStartIncluding"]) + elif dct.get("versionStartExcluding"): + version_start = ("excluding", dct["versionStartExcluding"]) + else: + version_start = None + + if dct.get("versionEndIncluding", None): + version_end = ("including", dct["versionEndIncluding"]) + elif dct.get("versionEndExcluding", None): + version_end = ("excluding", dct["versionEndExcluding"]) + else: + version_end = None + + return cls(dct["vulnerable"], dct["criteria"], dct["matchCriteriaId"], version_start, version_end) + + +@dataclass +class CPEMatchCriteriaConfiguration(ComplexSerializableType): + """ + This class represents a set of sets of `CPEMatchCriteria` objects, where there's an OR relation between the + elements of the set. + Our experiments confirm that there are only 3 distinct CVEs in the database that allow AND configuration between + the elements. Simplyfing to ORs enables much more simple implementation. + """ + + components: list[list[CPEMatchCriteria]] + __slots__ = ["components"] + + +@dataclass class CPEConfiguration(ComplexSerializableType): __slots__ = ["platform", "cpes"] @@ -41,6 +94,7 @@ class CPEConfiguration(ComplexSerializableType): @dataclass class CPE(PandasSerializableType, ComplexSerializableType): + cpe_id: str uri: str version: str vendor: str @@ -49,10 +103,10 @@ class CPE(PandasSerializableType, ComplexSerializableType): start_version: tuple[str, str] | None end_version: tuple[str, str] | None - __slots__ = ["uri", "version", "vendor", "item_name", "title", "start_version", "end_version"] + __slots__ = ["cpe_id", "uri", "version", "vendor", "item_name", "title", "start_version", "end_version"] pandas_columns: ClassVar[list[str]] = [ - "uri", + "cpe_id" "uri", "vendor", "item_name", "version", @@ -61,12 +115,14 @@ class CPE(PandasSerializableType, ComplexSerializableType): def __init__( self, + cpe_id: str, uri: str, title: str | None = None, start_version: tuple[str, str] | None = None, end_version: tuple[str, str] | None = None, ): super().__init__() + self.cpe_id = cpe_id self.uri = uri splitted = helpers.split_unescape(self.uri, ":") @@ -97,9 +153,14 @@ class CPE(PandasSerializableType, ComplexSerializableType): dct["end_version"] = tuple(dct["end_version"]) return super().from_dict(dct) + @classmethod + def from_nvd_dict(cls, dct: dict[str, Any]) -> CPE: + title = [x for x in dct["titles"] if x["lang"] == "en"][0]["title"] + return cls(dct["cpeNameId"], dct["cpeName"], title, None, None) + @property def serialized_attributes(self) -> list[str]: - return ["uri", "title", "start_version", "end_version"] + return ["cpe_id", "uri", "title", "start_version", "end_version"] @property def update(self) -> str: diff --git a/src/sec_certs/sample/cve.py b/src/sec_certs/sample/cve.py index 2289b0e1..3e4c3603 100644 --- a/src/sec_certs/sample/cve.py +++ b/src/sec_certs/sample/cve.py @@ -1,13 +1,12 @@ from __future__ import annotations import datetime -import itertools from dataclasses import dataclass -from typing import Any, ClassVar, Iterable +from typing import Any, ClassVar from dateutil.parser import isoparse -from sec_certs.sample.cpe import CPE, CPEConfiguration, cached_cpe +from sec_certs.sample.cpe import CPEMatchCriteria, CPEMatchCriteriaConfiguration from sec_certs.serialization.json import ComplexSerializableType from sec_certs.serialization.pandas import PandasSerializableType @@ -15,7 +14,7 @@ from sec_certs.serialization.pandas import PandasSerializableType @dataclass class CVE(PandasSerializableType, ComplexSerializableType): @dataclass - class Impact(ComplexSerializableType): + class Metrics(ComplexSerializableType): base_score: float severity: str exploitability_score: float @@ -24,36 +23,76 @@ class CVE(PandasSerializableType, ComplexSerializableType): __slots__ = ["base_score", "severity", "exploitability_score", "impact_score"] @classmethod - def from_nist_dict(cls, dct: dict[str, Any]) -> CVE.Impact: + def from_nist_dict(cls, dct: dict[str, Any]) -> CVE.Metrics: """ - Will load Impact from dictionary defined at https://nvd.nist.gov/feeds/json/cve/1.1 + Loads metrics from dictionary """ - if not dct["impact"]: + if not dct["metrics"]: return cls(0, "", 0, 0) - elif "baseMetricV3" in dct["impact"]: + metric_dct = CVE.Metrics.find_metrics_to_use(dct["metrics"]) + if not metric_dct: + raise ValueError(f"Metrics dictionary for cve {dct['id']} present, but no suitable entry found.") + return CVE.Metrics.from_metrics_dct(metric_dct) + + @staticmethod + def find_metrics_to_use(dct: dict) -> dict | None: + """ + any `Primary` entry available > any `nvd@nist.gov` entry available > just return the first entry if exists. + """ + all_metrics = dct.get("cvssMetricV31", []) + dct.get("cvssMetricV30", []) + dct.get("cvssMetricV2", []) + + for element in all_metrics: + if element["type"] == "Primary": + return element + for element in all_metrics: + if element["source"] == "nvd@nist.gov": + return element + + if all_metrics: + return all_metrics[0] + + return None + + @classmethod + def from_metrics_dct(cls, dct: dict) -> CVE.Metrics: + if dct["cvssData"]["version"] == "3.1": return cls( - dct["impact"]["baseMetricV3"]["cvssV3"]["baseScore"], - dct["impact"]["baseMetricV3"]["cvssV3"]["baseSeverity"], - dct["impact"]["baseMetricV3"]["exploitabilityScore"], - dct["impact"]["baseMetricV3"]["impactScore"], + dct["cvssData"]["baseScore"], + dct["cvssData"]["baseSeverity"], + dct["exploitabilityScore"], + dct["impactScore"], ) - elif "baseMetricV2" in dct["impact"]: + if dct["cvssData"]["version"] == "3.0": return cls( - dct["impact"]["baseMetricV2"]["cvssV2"]["baseScore"], - dct["impact"]["baseMetricV2"]["severity"], - dct["impact"]["baseMetricV2"]["exploitabilityScore"], - dct["impact"]["baseMetricV2"]["impactScore"], + dct["cvssData"]["baseScore"], + dct["cvssData"]["baseSeverity"], + dct["exploitabilityScore"], + dct["impactScore"], ) - raise ValueError("NIST Dict for CVE Impact badly formatted.") + if dct["cvssData"]["version"] == "2.0": + return cls( + dct["cvssData"]["baseScore"], + dct["baseSeverity"], + dct["exploitabilityScore"], + dct["impactScore"], + ) + raise ValueError(f"Unknown CVSS version occured ({dct['cvssData']['version']}) when parsing CVSS metrics.") cve_id: str - vulnerable_cpes: list[CPE] - vulnerable_cpe_configurations: list[CPEConfiguration] - impact: Impact + vulnerable_criteria: list[CPEMatchCriteria] + vulnerable_criteria_configurations: list[CPEMatchCriteriaConfiguration] + metrics: Metrics published_date: datetime.datetime | None cwe_ids: set[str] | None - __slots__ = ["cve_id", "vulnerable_cpes", "vulnerable_cpe_configurations", "impact", "published_date", "cwe_ids"] + __slots__ = [ + "cve_id", + "vulnerable_criteria", + "vulnerable_criteria_configurations", + "metrics", + "published_date", + "cwe_ids", + ] pandas_columns: ClassVar[list[str]] = [ "cve_id", @@ -88,11 +127,11 @@ class CVE(PandasSerializableType, ComplexSerializableType): def pandas_tuple(self): return ( self.cve_id, - self.vulnerable_cpes, - self.impact.base_score, - self.impact.severity, - self.impact.exploitability_score, - self.impact.impact_score, + self.vulnerable_criteria, + self.metrics.base_score, + self.metrics.severity, + self.metrics.exploitability_score, + self.metrics.impact_score, self.published_date, self.cwe_ids, ) @@ -100,9 +139,9 @@ class CVE(PandasSerializableType, ComplexSerializableType): def to_dict(self) -> dict[str, Any]: return { "cve_id": self.cve_id, - "vulnerable_cpes": self.vulnerable_cpes, - "vulnerable_cpe_configurations": self.vulnerable_cpe_configurations, - "impact": self.impact, + "vulnerable_cpes": self.vulnerable_criteria, + "vulnerable_criteria_configurations": self.vulnerable_criteria_configurations, + "impact": self.metrics, "published_date": self.published_date.isoformat() if self.published_date else None, "cwe_ids": self.cwe_ids, } @@ -115,7 +154,7 @@ class CVE(PandasSerializableType, ComplexSerializableType): return cls( dct["cve_id"], dct["vulnerable_cpes"], - dct["vulnerable_cpe_configurations"], + dct["vulnerable_criteria_configurations"], dct["impact"], date_to_take, dct["cwe_ids"], @@ -123,92 +162,51 @@ class CVE(PandasSerializableType, ComplexSerializableType): @classmethod def from_nist_dict(cls, dct: dict) -> CVE: - cve_id = dct["cve"]["CVE_data_meta"]["ID"] - impact = cls.Impact.from_nist_dict(dct) - published_date = isoparse(dct["publishedDate"]) + cve_id = dct["id"] + metrics = cls.Metrics.from_nist_dict(dct) + published_date = datetime.datetime.fromisoformat(dct["published"]) cwe_ids = cls.parse_cwe_data(dct) - cpes, cpe_configurations = CVE.get_cpe_data_from_nodes_list(dct["configurations"]["nodes"]) - - return cls(cve_id, cpes, cpe_configurations, impact, published_date, cwe_ids) - - @staticmethod - def _parse_nist_cpe_dicts(dictionaries: Iterable[dict[str, Any]]) -> list[CPE]: - cpes: list[CPE] = [] - - for x in dictionaries: - cpe_uri = x["cpe23Uri"] - version_start: tuple[str, str] | None - version_end: tuple[str, str] | None - if "versionStartIncluding" in x and x["versionStartIncluding"]: - version_start = ("including", x["versionStartIncluding"]) - elif "versionStartExcluding" in x and x["versionStartExcluding"]: - version_start = ("excluding", x["versionStartExcluding"]) - else: - version_start = None - - if "versionEndIncluding" in x and x["versionEndIncluding"]: - version_end = ("including", x["versionEndIncluding"]) - elif "versionEndExcluding" in x and x["versionEndExcluding"]: - version_end = ("excluding", x["versionEndExcluding"]) - else: - version_end = None - - cpes.append(cached_cpe(cpe_uri, start_version=version_start, end_version=version_end)) - - return cpes - - @staticmethod - def _parse_nist_dict(cpe_list: list[dict[str, Any]], parse_only_vulnerable_cpes: bool) -> list[CPE]: - """ - Method parses list of CPE dicts to the list of CPE objects. - The <parse_only_vulnerable_cpes> parameter specifies if we want to - parse only vulnerable CPEs or not. - """ - return CVE._parse_nist_cpe_dicts(dct for dct in cpe_list if dct["vulnerable"] or not parse_only_vulnerable_cpes) + vulnerable_criteria, vulnerable_criteria_configurations = CVE.parse_configurations(dct) + return cls(cve_id, vulnerable_criteria, vulnerable_criteria_configurations, metrics, published_date, cwe_ids) @staticmethod def parse_cwe_data(dct: dict) -> set[str] | None: - descriptions = dct["cve"]["problemtype"]["problemtype_data"][0]["description"] + if "weaknesses" not in dct: + return None + assert dct["weaknesses"][0]["type"] == "Primary" + descriptions = dct["weaknesses"][0]["description"] return {x["value"] for x in descriptions} if descriptions else None @staticmethod - def get_cpe_data_from_nodes_list(lst: list) -> tuple[list[CPE], list[CPEConfiguration]]: - or_nodes = [x for x in lst if x["operator"] == "OR"] - and_nodes = [x for x in lst if x["operator"] == "AND"] - return CVE.get_simple_cpes_from_nodes_list(or_nodes), CVE.get_cpe_configurations_from_node_list(and_nodes) + def parse_configurations( + dct: dict[str, Any], + ) -> tuple[list[CPEMatchCriteria], list[CPEMatchCriteriaConfiguration]]: + criteria = [] + criteria_configurations = [] - @staticmethod - def get_simple_cpes_from_nodes_list(lst: list) -> list[CPE]: - return list( - itertools.chain.from_iterable( - CVE._parse_nist_dict(node["cpe_match"], parse_only_vulnerable_cpes=True) for node in lst - ) - ) + configurations = dct.get("configurations", []) + for conf in configurations: + new_criteria, new_criteria_configuration = CVE.parse_single_configuration(conf) + criteria.extend(new_criteria) + if new_criteria_configuration: + criteria_configurations.append(new_criteria_configuration) + return criteria, criteria_configurations @staticmethod - def get_cpe_configurations_from_node_list(lst: list) -> list[CPEConfiguration]: - """ - Retrieves only running on/with configurations, not the advanced ones. - See more at https://nvd.nist.gov/vuln/vulnerability-detail-pages, section `Configurations` - """ - configurations = [CVE.get_cpe_confiugration_from_node(x) for x in lst] - return [x for x in configurations if x] + def parse_single_configuration( + configuration: dict[str, Any] + ) -> tuple[list[CPEMatchCriteria], CPEMatchCriteriaConfiguration | None]: + if "operator" not in configuration or configuration["operator"] == "OR": + assert len(configuration["nodes"]) == 1 and "cpeMatch" in configuration["nodes"][0] + return CVE.get_criteria_from_node(configuration["nodes"][0]["cpeMatch"]), None - @staticmethod - def get_cpe_confiugration_from_node(node: dict) -> CPEConfiguration | None: - if node["children"]: - if len(node["children"]) != 2: - return None + return [], CVE.get_configuration_criteria_from_nodes(configuration["nodes"]) - # Deep variant should have two children, get CPEs from the first one and declare that product, second is platform - cpes = CVE._parse_nist_dict(node["children"][0]["cpe_match"], parse_only_vulnerable_cpes=True) - platform = CVE._parse_nist_dict(node["children"][1]["cpe_match"], parse_only_vulnerable_cpes=False) - return CPEConfiguration(platform[0], cpes) - else: - # Shallow variant should have exactly 2 matching CPEs, we declare one a platform, second one the vuln. thing - cpes = CVE._parse_nist_dict(node["cpe_match"], parse_only_vulnerable_cpes=True) - - if len(cpes) != 2: - return None + @staticmethod + def get_configuration_criteria_from_nodes(nodes) -> CPEMatchCriteriaConfiguration: + assert all("cpeMatch" in x for x in nodes) # the next layer are matches + return CPEMatchCriteriaConfiguration([CVE.get_criteria_from_node(x["cpeMatch"]) for x in nodes]) - return CPEConfiguration(cpes[0], [cpes[1]]) + @staticmethod + def get_criteria_from_node(cpe_matches: list[dict[str, Any]]) -> list[CPEMatchCriteria]: + return [CPEMatchCriteria.from_nist_dict(x) for x in cpe_matches] |
