aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/sample
diff options
context:
space:
mode:
author: Adam Janovsky, 2023-04-07 14:16:03 +0200
committer: Adam Janovsky, 2023-04-07 14:16:03 +0200
commit: b98b491c9ebf087fc73416922bf7572e83c54816 (patch)
tree: 8039f8353aa14bd84cff4a43c3058cd8ab6004e1 /src/sec_certs/sample
parent: 6373128e4ebc33548d014c3a07b99ee024d5f9e2 (diff)
download: sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.tar.gz
sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.tar.zst
sec-certs-b98b491c9ebf087fc73416922bf7572e83c54816.zip
WiP new cve and cpe dataset handling
Diffstat (limited to 'src/sec_certs/sample')
-rw-r--r-- src/sec_certs/sample/cpe.py 67
-rw-r--r-- src/sec_certs/sample/cve.py 212
2 files changed, 169 insertions, 110 deletions
diff --git a/src/sec_certs/sample/cpe.py b/src/sec_certs/sample/cpe.py
index d56535c2..055373c1 100644
--- a/src/sec_certs/sample/cpe.py
+++ b/src/sec_certs/sample/cpe.py
@@ -11,6 +11,59 @@ from sec_certs.utils import helpers
@dataclass
+class CPEMatchCriteria(ComplexSerializableType):
+ """
+ A single CPE match criterion: the `criteria` string, its `criteria_id`, the `vulnerable` flag and optional
+ version bounds. Hashing, equality and ordering are all determined solely by `criteria_id`.
+ """
+
+ vulnerable: bool
+ criteria: str
+ criteria_id: str
+ # Version bounds are ("including" | "excluding", version) tuples, or None when no bound was given.
+ version_start: tuple[str, str] | None
+ version_end: tuple[str, str] | None
+
+ __slots__ = ["vulnerable", "criteria", "criteria_id", "version_start", "version_end"]
+
+ # We cannot use frozen=True. It does not work with __slots__ prior to Python 3.10 dataclasses
+ # Hence we manually provide __hash__ and __eq__ despite not guaranteeing immutability
+ def __hash__(self) -> int:
+ return hash(self.criteria_id)
+
+ def __eq__(self, other: object) -> bool:
+ return isinstance(other, CPEMatchCriteria) and self.criteria_id == other.criteria_id
+
+ def __lt__(self, other: CPEMatchCriteria) -> bool:
+ return self.criteria_id < other.criteria_id
+
+ @classmethod
+ def from_nist_dict(cls, dct: dict[str, Any]) -> CPEMatchCriteria:
+ """
+ Builds the criterion from a dictionary with the keys `vulnerable`, `criteria`, `matchCriteriaId` and the
+ optional keys `versionStartIncluding`/`versionStartExcluding` and `versionEndIncluding`/`versionEndExcluding`.
+ """
+ # For each bound the "Including" key wins over the "Excluding" one; missing or empty values yield None.
+ if dct.get("versionStartIncluding", None):
+ version_start = ("including", dct["versionStartIncluding"])
+ elif dct.get("versionStartExcluding"):
+ version_start = ("excluding", dct["versionStartExcluding"])
+ else:
+ version_start = None
+
+ if dct.get("versionEndIncluding", None):
+ version_end = ("including", dct["versionEndIncluding"])
+ elif dct.get("versionEndExcluding", None):
+ version_end = ("excluding", dct["versionEndExcluding"])
+ else:
+ version_end = None
+
+ return cls(dct["vulnerable"], dct["criteria"], dct["matchCriteriaId"], version_start, version_end)
+
+
+@dataclass
+class CPEMatchCriteriaConfiguration(ComplexSerializableType):
+ """
+ This class represents a set of sets of `CPEMatchCriteria` objects, where there's an OR relation between the
+ elements of the set.
+ Our experiments confirm that there are only 3 distinct CVEs in the database that allow AND configuration between
+ the elements. Simplifying to ORs enables a much simpler implementation.
+ """
+
+ # Stored as a list of lists: each inner list holds the match criteria parsed from one configuration node.
+ components: list[list[CPEMatchCriteria]]
+ __slots__ = ["components"]
+
+
+@dataclass
class CPEConfiguration(ComplexSerializableType):
__slots__ = ["platform", "cpes"]
@@ -41,6 +94,7 @@ class CPEConfiguration(ComplexSerializableType):
@dataclass
class CPE(PandasSerializableType, ComplexSerializableType):
+ cpe_id: str
uri: str
version: str
vendor: str
@@ -49,10 +103,10 @@ class CPE(PandasSerializableType, ComplexSerializableType):
start_version: tuple[str, str] | None
end_version: tuple[str, str] | None
- __slots__ = ["uri", "version", "vendor", "item_name", "title", "start_version", "end_version"]
+ __slots__ = ["cpe_id", "uri", "version", "vendor", "item_name", "title", "start_version", "end_version"]
pandas_columns: ClassVar[list[str]] = [
- "uri",
+ "cpe_id", "uri",
"vendor",
"item_name",
"version",
@@ -61,12 +115,14 @@ class CPE(PandasSerializableType, ComplexSerializableType):
def __init__(
self,
+ cpe_id: str,
uri: str,
title: str | None = None,
start_version: tuple[str, str] | None = None,
end_version: tuple[str, str] | None = None,
):
super().__init__()
+ self.cpe_id = cpe_id
self.uri = uri
splitted = helpers.split_unescape(self.uri, ":")
@@ -97,9 +153,14 @@ class CPE(PandasSerializableType, ComplexSerializableType):
dct["end_version"] = tuple(dct["end_version"])
return super().from_dict(dct)
+ @classmethod
+ def from_nvd_dict(cls, dct: dict[str, Any]) -> CPE:
+ """
+ Builds a CPE from a record dictionary carrying `cpeNameId`, `cpeName` and, optionally, `titles`.
+ The first English title is used when present; otherwise the title is left as None.
+ """
+ # `[...][0]` raised IndexError for records without an English title; `title` is allowed to be None.
+ titles = dct.get("titles") or []
+ title = next((x["title"] for x in titles if x.get("lang") == "en"), None)
+ return cls(dct["cpeNameId"], dct["cpeName"], title, None, None)
+
@property
def serialized_attributes(self) -> list[str]:
- return ["uri", "title", "start_version", "end_version"]
+ return ["cpe_id", "uri", "title", "start_version", "end_version"]
@property
def update(self) -> str:
diff --git a/src/sec_certs/sample/cve.py b/src/sec_certs/sample/cve.py
index 2289b0e1..3e4c3603 100644
--- a/src/sec_certs/sample/cve.py
+++ b/src/sec_certs/sample/cve.py
@@ -1,13 +1,12 @@
from __future__ import annotations
import datetime
-import itertools
from dataclasses import dataclass
-from typing import Any, ClassVar, Iterable
+from typing import Any, ClassVar
from dateutil.parser import isoparse
-from sec_certs.sample.cpe import CPE, CPEConfiguration, cached_cpe
+from sec_certs.sample.cpe import CPEMatchCriteria, CPEMatchCriteriaConfiguration
from sec_certs.serialization.json import ComplexSerializableType
from sec_certs.serialization.pandas import PandasSerializableType
@@ -15,7 +14,7 @@ from sec_certs.serialization.pandas import PandasSerializableType
@dataclass
class CVE(PandasSerializableType, ComplexSerializableType):
@dataclass
- class Impact(ComplexSerializableType):
+ class Metrics(ComplexSerializableType):
base_score: float
severity: str
exploitability_score: float
@@ -24,36 +23,76 @@ class CVE(PandasSerializableType, ComplexSerializableType):
__slots__ = ["base_score", "severity", "exploitability_score", "impact_score"]
@classmethod
- def from_nist_dict(cls, dct: dict[str, Any]) -> CVE.Impact:
+ def from_nist_dict(cls, dct: dict[str, Any]) -> CVE.Metrics:
"""
- Will load Impact from dictionary defined at https://nvd.nist.gov/feeds/json/cve/1.1
+ Loads metrics from dictionary
"""
- if not dct["impact"]:
+ if not dct["metrics"]:
return cls(0, "", 0, 0)
- elif "baseMetricV3" in dct["impact"]:
+ metric_dct = CVE.Metrics.find_metrics_to_use(dct["metrics"])
+ if not metric_dct:
+ raise ValueError(f"Metrics dictionary for cve {dct['id']} present, but no suitable entry found.")
+ return CVE.Metrics.from_metrics_dct(metric_dct)
+
+ @staticmethod
+ def find_metrics_to_use(dct: dict) -> dict | None:
+ """
+ Selects a single entry from the `cvssMetricV31`, `cvssMetricV30` and `cvssMetricV2` lists of `dct`.
+ any `Primary` entry available > any `nvd@nist.gov` entry available > just return the first entry if exists.
+ Returns None when none of the lists contains an entry.
+ """
+ # Concatenation order is the tie-breaker: v3.1 entries are examined before v3.0, and v3.0 before v2.
+ all_metrics = dct.get("cvssMetricV31", []) + dct.get("cvssMetricV30", []) + dct.get("cvssMetricV2", [])
+
+ for element in all_metrics:
+ if element["type"] == "Primary":
+ return element
+ for element in all_metrics:
+ if element["source"] == "nvd@nist.gov":
+ return element
+
+ if all_metrics:
+ return all_metrics[0]
+
+ return None
+
+ @classmethod
+ def from_metrics_dct(cls, dct: dict) -> CVE.Metrics:
+ if dct["cvssData"]["version"] == "3.1":
return cls(
- dct["impact"]["baseMetricV3"]["cvssV3"]["baseScore"],
- dct["impact"]["baseMetricV3"]["cvssV3"]["baseSeverity"],
- dct["impact"]["baseMetricV3"]["exploitabilityScore"],
- dct["impact"]["baseMetricV3"]["impactScore"],
+ dct["cvssData"]["baseScore"],
+ dct["cvssData"]["baseSeverity"],
+ dct["exploitabilityScore"],
+ dct["impactScore"],
)
- elif "baseMetricV2" in dct["impact"]:
+ if dct["cvssData"]["version"] == "3.0":
return cls(
- dct["impact"]["baseMetricV2"]["cvssV2"]["baseScore"],
- dct["impact"]["baseMetricV2"]["severity"],
- dct["impact"]["baseMetricV2"]["exploitabilityScore"],
- dct["impact"]["baseMetricV2"]["impactScore"],
+ dct["cvssData"]["baseScore"],
+ dct["cvssData"]["baseSeverity"],
+ dct["exploitabilityScore"],
+ dct["impactScore"],
)
- raise ValueError("NIST Dict for CVE Impact badly formatted.")
+ if dct["cvssData"]["version"] == "2.0":
+ return cls(
+ dct["cvssData"]["baseScore"],
+ dct["baseSeverity"],
+ dct["exploitabilityScore"],
+ dct["impactScore"],
+ )
+ raise ValueError(f"Unknown CVSS version occurred ({dct['cvssData']['version']}) when parsing CVSS metrics.")
cve_id: str
- vulnerable_cpes: list[CPE]
- vulnerable_cpe_configurations: list[CPEConfiguration]
- impact: Impact
+ vulnerable_criteria: list[CPEMatchCriteria]
+ vulnerable_criteria_configurations: list[CPEMatchCriteriaConfiguration]
+ metrics: Metrics
published_date: datetime.datetime | None
cwe_ids: set[str] | None
- __slots__ = ["cve_id", "vulnerable_cpes", "vulnerable_cpe_configurations", "impact", "published_date", "cwe_ids"]
+ __slots__ = [
+ "cve_id",
+ "vulnerable_criteria",
+ "vulnerable_criteria_configurations",
+ "metrics",
+ "published_date",
+ "cwe_ids",
+ ]
pandas_columns: ClassVar[list[str]] = [
"cve_id",
@@ -88,11 +127,11 @@ class CVE(PandasSerializableType, ComplexSerializableType):
def pandas_tuple(self):
return (
self.cve_id,
- self.vulnerable_cpes,
- self.impact.base_score,
- self.impact.severity,
- self.impact.exploitability_score,
- self.impact.impact_score,
+ self.vulnerable_criteria,
+ self.metrics.base_score,
+ self.metrics.severity,
+ self.metrics.exploitability_score,
+ self.metrics.impact_score,
self.published_date,
self.cwe_ids,
)
@@ -100,9 +139,9 @@ class CVE(PandasSerializableType, ComplexSerializableType):
def to_dict(self) -> dict[str, Any]:
return {
"cve_id": self.cve_id,
- "vulnerable_cpes": self.vulnerable_cpes,
- "vulnerable_cpe_configurations": self.vulnerable_cpe_configurations,
- "impact": self.impact,
+ "vulnerable_cpes": self.vulnerable_criteria,
+ "vulnerable_criteria_configurations": self.vulnerable_criteria_configurations,
+ "impact": self.metrics,
"published_date": self.published_date.isoformat() if self.published_date else None,
"cwe_ids": self.cwe_ids,
}
@@ -115,7 +154,7 @@ class CVE(PandasSerializableType, ComplexSerializableType):
return cls(
dct["cve_id"],
dct["vulnerable_cpes"],
- dct["vulnerable_cpe_configurations"],
+ dct["vulnerable_criteria_configurations"],
dct["impact"],
date_to_take,
dct["cwe_ids"],
@@ -123,92 +162,51 @@ class CVE(PandasSerializableType, ComplexSerializableType):
@classmethod
def from_nist_dict(cls, dct: dict) -> CVE:
- cve_id = dct["cve"]["CVE_data_meta"]["ID"]
- impact = cls.Impact.from_nist_dict(dct)
- published_date = isoparse(dct["publishedDate"])
+ cve_id = dct["id"]
+ metrics = cls.Metrics.from_nist_dict(dct)
+ published_date = datetime.datetime.fromisoformat(dct["published"])
cwe_ids = cls.parse_cwe_data(dct)
- cpes, cpe_configurations = CVE.get_cpe_data_from_nodes_list(dct["configurations"]["nodes"])
-
- return cls(cve_id, cpes, cpe_configurations, impact, published_date, cwe_ids)
-
- @staticmethod
- def _parse_nist_cpe_dicts(dictionaries: Iterable[dict[str, Any]]) -> list[CPE]:
- cpes: list[CPE] = []
-
- for x in dictionaries:
- cpe_uri = x["cpe23Uri"]
- version_start: tuple[str, str] | None
- version_end: tuple[str, str] | None
- if "versionStartIncluding" in x and x["versionStartIncluding"]:
- version_start = ("including", x["versionStartIncluding"])
- elif "versionStartExcluding" in x and x["versionStartExcluding"]:
- version_start = ("excluding", x["versionStartExcluding"])
- else:
- version_start = None
-
- if "versionEndIncluding" in x and x["versionEndIncluding"]:
- version_end = ("including", x["versionEndIncluding"])
- elif "versionEndExcluding" in x and x["versionEndExcluding"]:
- version_end = ("excluding", x["versionEndExcluding"])
- else:
- version_end = None
-
- cpes.append(cached_cpe(cpe_uri, start_version=version_start, end_version=version_end))
-
- return cpes
-
- @staticmethod
- def _parse_nist_dict(cpe_list: list[dict[str, Any]], parse_only_vulnerable_cpes: bool) -> list[CPE]:
- """
- Method parses list of CPE dicts to the list of CPE objects.
- The <parse_only_vulnerable_cpes> parameter specifies if we want to
- parse only vulnerable CPEs or not.
- """
- return CVE._parse_nist_cpe_dicts(dct for dct in cpe_list if dct["vulnerable"] or not parse_only_vulnerable_cpes)
+ vulnerable_criteria, vulnerable_criteria_configurations = CVE.parse_configurations(dct)
+ return cls(cve_id, vulnerable_criteria, vulnerable_criteria_configurations, metrics, published_date, cwe_ids)
@staticmethod
def parse_cwe_data(dct: dict) -> set[str] | None:
- descriptions = dct["cve"]["problemtype"]["problemtype_data"][0]["description"]
+ """
+ Returns the set of `value` strings from the `description` list of one `weaknesses` entry of `dct`,
+ or None when there is no weakness entry or its description list is empty.
+ """
+ weaknesses = dct.get("weaknesses")
+ if not weaknesses:
+ return None
+ # Prefer a `Primary` entry and fall back to the first one, mirroring `Metrics.find_metrics_to_use`.
+ # The former bare `assert` vanished under `python -O` and crashed on records whose first entry is not Primary.
+ weakness = next((x for x in weaknesses if x.get("type") == "Primary"), weaknesses[0])
+ descriptions = weakness.get("description")
return {x["value"] for x in descriptions} if descriptions else None
@staticmethod
- def get_cpe_data_from_nodes_list(lst: list) -> tuple[list[CPE], list[CPEConfiguration]]:
- or_nodes = [x for x in lst if x["operator"] == "OR"]
- and_nodes = [x for x in lst if x["operator"] == "AND"]
- return CVE.get_simple_cpes_from_nodes_list(or_nodes), CVE.get_cpe_configurations_from_node_list(and_nodes)
+ def parse_configurations(
+ dct: dict[str, Any],
+ ) -> tuple[list[CPEMatchCriteria], list[CPEMatchCriteriaConfiguration]]:
+ criteria = []
+ criteria_configurations = []
- @staticmethod
- def get_simple_cpes_from_nodes_list(lst: list) -> list[CPE]:
- return list(
- itertools.chain.from_iterable(
- CVE._parse_nist_dict(node["cpe_match"], parse_only_vulnerable_cpes=True) for node in lst
- )
- )
+ configurations = dct.get("configurations", [])
+ for conf in configurations:
+ new_criteria, new_criteria_configuration = CVE.parse_single_configuration(conf)
+ criteria.extend(new_criteria)
+ if new_criteria_configuration:
+ criteria_configurations.append(new_criteria_configuration)
+ return criteria, criteria_configurations
@staticmethod
- def get_cpe_configurations_from_node_list(lst: list) -> list[CPEConfiguration]:
- """
- Retrieves only running on/with configurations, not the advanced ones.
- See more at https://nvd.nist.gov/vuln/vulnerability-detail-pages, section `Configurations`
- """
- configurations = [CVE.get_cpe_confiugration_from_node(x) for x in lst]
- return [x for x in configurations if x]
+ def parse_single_configuration(
+ configuration: dict[str, Any]
+ ) -> tuple[list[CPEMatchCriteria], CPEMatchCriteriaConfiguration | None]:
+ if "operator" not in configuration or configuration["operator"] == "OR":
+ assert len(configuration["nodes"]) == 1 and "cpeMatch" in configuration["nodes"][0]
+ return CVE.get_criteria_from_node(configuration["nodes"][0]["cpeMatch"]), None
- @staticmethod
- def get_cpe_confiugration_from_node(node: dict) -> CPEConfiguration | None:
- if node["children"]:
- if len(node["children"]) != 2:
- return None
+ return [], CVE.get_configuration_criteria_from_nodes(configuration["nodes"])
- # Deep variant should have two children, get CPEs from the first one and declare that product, second is platform
- cpes = CVE._parse_nist_dict(node["children"][0]["cpe_match"], parse_only_vulnerable_cpes=True)
- platform = CVE._parse_nist_dict(node["children"][1]["cpe_match"], parse_only_vulnerable_cpes=False)
- return CPEConfiguration(platform[0], cpes)
- else:
- # Shallow variant should have exactly 2 matching CPEs, we declare one a platform, second one the vuln. thing
- cpes = CVE._parse_nist_dict(node["cpe_match"], parse_only_vulnerable_cpes=True)
-
- if len(cpes) != 2:
- return None
+ @staticmethod
+ def get_configuration_criteria_from_nodes(nodes) -> CPEMatchCriteriaConfiguration:
+ assert all("cpeMatch" in x for x in nodes) # the next layer are matches
+ return CPEMatchCriteriaConfiguration([CVE.get_criteria_from_node(x["cpeMatch"]) for x in nodes])
- return CPEConfiguration(cpes[0], [cpes[1]])
+ @staticmethod
+ def get_criteria_from_node(cpe_matches: list[dict[str, Any]]) -> list[CPEMatchCriteria]:
+ return [CPEMatchCriteria.from_nist_dict(x) for x in cpe_matches]