aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/sample/cve.py
blob: c9d2db881cf2b42da38f887a6adc012ffd8436f8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import Any, ClassVar

from dateutil.parser import isoparse

from sec_certs.sample.cpe import CPEMatchCriteria, CPEMatchCriteriaConfiguration
from sec_certs.serialization.json import ComplexSerializableType
from sec_certs.serialization.pandas import PandasSerializableType


@dataclass
class CVE(PandasSerializableType, ComplexSerializableType):
    """
    A single CVE record: identifier, CPE match criteria it applies to, CVSS metrics,
    publication date and the CWE identifiers attached to it.

    Equality and hashing are based solely on `cve_id`. Ordering (`<`) compares the year and the
    sequence number parsed from `cve_id`.
    """

    @dataclass
    class Metrics(ComplexSerializableType):
        """
        CVSS scores attached to a CVE (base score, severity label, exploitability and impact score).
        """

        base_score: float
        severity: str
        exploitability_score: float
        impact_score: float

        __slots__ = ["base_score", "severity", "exploitability_score", "impact_score"]

        @classmethod
        def from_nist_dict(cls, dct: dict[str, Any]) -> CVE.Metrics:
            """
            Loads metrics from a CVE dictionary by inspecting its `metrics` key.

            :param dict[str, Any] dct: CVE dictionary, optionally holding the `metrics` key
            :return CVE.Metrics: parsed metrics, or zeroed metrics with empty severity when no usable
                                 metric entry is found
            """
            if not (metric_dct := CVE.Metrics.find_metrics_to_use(dct.get("metrics"))):
                return cls(0, "", 0, 0)
            return CVE.Metrics.from_metrics_dct(metric_dct)

        @staticmethod
        def find_metrics_to_use(dct: dict | None) -> dict | None:
            """
            Find any `Primary` entry available > any `nvd@nist.gov` entry available > just return the first entry that we can parse.

            The NVD API 2.0 schema stores every `cvssMetricV*` key as an array of metric entries, so the arrays
            are flattened (in the order 3.1, 3.0, 2.0) before the preference rules are applied. A bare dictionary
            under such a key is accepted as a single entry as well.

            :param dict | None dct: value of the `metrics` key of a CVE dictionary
            :return dict | None: the selected metric entry, None if there is nothing to choose from
            """
            if not dct:
                return None

            all_metrics: list[dict] = []
            for metric in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
                entries = dct.get(metric)
                if not entries:
                    continue
                if isinstance(entries, dict):
                    all_metrics.append(entries)
                else:
                    all_metrics.extend(entries)
            # TODO: cvssMetricV40 does not have exploitabilityScore and impactScore, what do we do?
            #       See https://csrc.nist.gov/schema/nvd/api/2.0/cve_api_json_2.0.schema

            # `.get` is used so that an entry lacking `type`/`source` falls through to the next rule
            # instead of aborting the whole selection.
            for element in all_metrics:
                if element.get("type") == "Primary":
                    return element
            for element in all_metrics:
                if element.get("source") == "nvd@nist.gov":
                    return element

            return all_metrics[0] if all_metrics else None

        @classmethod
        def from_metrics_dct(cls, dct: dict) -> CVE.Metrics:
            """
            Parse any 3.1, 3.0 or 2.0 CVSS metrics dictionary.

            :param dict dct: single metric entry holding `cvssData`, `exploitabilityScore` and `impactScore`
            :raises ValueError: when `cvssData.version` is none of 3.1, 3.0, 2.0
            :return CVE.Metrics: parsed metrics
            """
            cvss_data = dct["cvssData"]
            version = cvss_data["version"]

            if version in ("3.1", "3.0"):
                # CVSS 3.x keeps the severity label inside `cvssData`
                severity = cvss_data["baseSeverity"]
            elif version == "2.0":
                # CVSS 2.0 keeps the severity label next to `cvssData`
                severity = dct["baseSeverity"]
            else:
                raise ValueError(f"Unknown CVSS version occured ({version}) when parsing CVSS metrics.")

            return cls(cvss_data["baseScore"], severity, dct["exploitabilityScore"], dct["impactScore"])

    cve_id: str
    # Criteria taken from "simple" configurations (see `configuration_is_simple`)
    vulnerable_criteria: list[CPEMatchCriteria]
    # Criteria configurations built from the remaining, non-simple configurations
    vulnerable_criteria_configurations: list[CPEMatchCriteriaConfiguration]
    metrics: Metrics
    published_date: datetime.datetime | None
    cwe_ids: set[str] | None

    __slots__ = [
        "cve_id",
        "vulnerable_criteria",
        "vulnerable_criteria_configurations",
        "metrics",
        "published_date",
        "cwe_ids",
    ]

    # Column names matching, position by position, the values of `pandas_tuple`.
    # NOTE(review): "explotability_score" is misspelled; left untouched as the column name is a runtime
    # string that consumers may rely on.
    pandas_columns: ClassVar[list[str]] = [
        "cve_id",
        "vulnerable_cpes",
        "base_score",
        "severity",
        "explotability_score",
        "impact_score",
        "published_date",
        "cwe_ids",
    ]

    # We cannot use frozen=True. It does not work with __slots__ prior to Python 3.10 dataclasses
    # Hence we manually provide __hash__ and __eq__ despite not guaranteeing immutability
    def __hash__(self) -> int:
        return hash(self.cve_id)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, CVE) and self.cve_id == other.cve_id

    def __lt__(self, other: object) -> bool:
        """
        Orders CVEs by the year and then by the sequence number, both parsed as integers from the
        second and third dash-separated component of `cve_id`.

        :raises ValueError: when `other` is not a CVE
        """
        if not isinstance(other, CVE):
            raise ValueError(f"Cannot compare CVE with {type(other)} type.")

        self_parts = self.cve_id.split("-")
        self_key = (int(self_parts[1]), int(self_parts[2]))
        other_parts = other.cve_id.split("-")
        other_key = (int(other_parts[1]), int(other_parts[2]))

        return self_key < other_key

    @property
    def pandas_tuple(self):
        """
        Values of a single row, ordered as `pandas_columns`. The `vulnerable_cpes` column carries
        `vulnerable_criteria`.
        """
        return (
            self.cve_id,
            self.vulnerable_criteria,
            self.metrics.base_score,
            self.metrics.severity,
            self.metrics.exploitability_score,
            self.metrics.impact_score,
            self.published_date,
            self.cwe_ids,
        )

    def to_dict(self) -> dict[str, Any]:
        """
        Dictionary representation of the CVE. Mind the key names: `vulnerable_cpes` holds
        `vulnerable_criteria`, `impact` holds `metrics`, and the date is dumped in ISO format.
        """
        return {
            "cve_id": self.cve_id,
            "vulnerable_cpes": self.vulnerable_criteria,
            "vulnerable_criteria_configurations": self.vulnerable_criteria_configurations,
            "impact": self.metrics,
            "published_date": self.published_date.isoformat() if self.published_date else None,
            "cwe_ids": self.cwe_ids,
        }

    @classmethod
    def from_dict(cls, dct: dict[str, Any]) -> CVE:
        """
        Counterpart of `to_dict`. A string `published_date` is parsed with `isoparse`, any other
        value (e.g. None or a datetime) is passed through unchanged.
        """
        date_to_take = (
            isoparse(dct["published_date"]) if isinstance(dct["published_date"], str) else dct["published_date"]
        )
        return cls(
            dct["cve_id"],
            dct["vulnerable_cpes"],
            dct["vulnerable_criteria_configurations"],
            dct["impact"],
            date_to_take,
            dct["cwe_ids"],
        )

    @classmethod
    def from_nist_dict(cls, dct: dict) -> CVE:
        """
        Builds CVE from a dictionary that holds the `id` and `published` keys and, optionally,
        the `metrics`, `weaknesses` and `configurations` keys.
        """
        cve_id = dct["id"]
        metrics = cls.Metrics.from_nist_dict(dct)
        published_date = datetime.datetime.fromisoformat(dct["published"])
        cwe_ids = cls.parse_cwe_data(dct)
        vulnerable_criteria, vulnerable_criteria_configurations = CVE.parse_configurations(dct)
        return cls(cve_id, vulnerable_criteria, vulnerable_criteria_configurations, metrics, published_date, cwe_ids)

    @staticmethod
    def parse_cwe_data(dct: dict) -> set[str] | None:
        """
        Collects the `value` of every description of every element under the `weaknesses` key.

        :return set[str] | None: set of collected values, None if the key is missing or nothing was collected
        """
        if "weaknesses" not in dct:
            return None

        cwes = {description["value"] for weakness in dct["weaknesses"] for description in weakness["description"]}
        return cwes or None

    @staticmethod
    def parse_configurations(
        dct: dict[str, Any],
    ) -> tuple[list[CPEMatchCriteria], list[CPEMatchCriteriaConfiguration]]:
        """
        Parses all elements under the `configurations` key (if any).

        :return tuple: criteria gathered from simple configurations, and the list of criteria configurations
                       obtained from the remaining ones (configurations that yield None are left out)
        """
        criteria = []
        criteria_configurations = []

        for conf in dct.get("configurations", []):
            new_criteria, new_criteria_configuration = CVE.parse_single_configuration(conf)
            criteria.extend(new_criteria)
            if new_criteria_configuration:
                criteria_configurations.append(new_criteria_configuration)
        return criteria, criteria_configurations

    @staticmethod
    def parse_single_configuration(
        configuration: dict[str, Any],
    ) -> tuple[list[CPEMatchCriteria], CPEMatchCriteriaConfiguration | None]:
        """
        Parses single configuration. Simple configuration yields the criteria of its only node and None,
        any other configuration yields an empty list and the (possibly None) criteria configuration.
        """
        if CVE.configuration_is_simple(configuration):
            return CVE.get_simple_criteria_from_cpe_matches(configuration["nodes"][0]["cpeMatch"]), None
        return [], CVE.get_configuration_criteria_from_configuration_nodes(configuration["nodes"])

    @staticmethod
    def configuration_is_simple(configuration: dict) -> bool:
        """
        Configuration is simple iff it has exactly one node, that node has the `cpeMatch` key, and either
        the operator of the configuration is `OR` (the default when missing) or the node holds a single match.
        """
        return (
            len(configuration["nodes"]) == 1
            and "cpeMatch" in configuration["nodes"][0]
            and (configuration.get("operator", "OR") == "OR" or len(configuration["nodes"][0]["cpeMatch"]) == 1)
        )

    @staticmethod
    def get_configuration_criteria_from_configuration_nodes(
        configuration_nodes: list[dict[str, Any]],
    ) -> CPEMatchCriteriaConfiguration | None:
        """
        Retrieves complex configuration criteria from a list of configuration nodes.
        It is asserted that the nodes have two layers at most, that the top-level children are in AND relationship,
        and that the individual elements are in OR relationship (otherwise, they would be parsed by different method.)

        We cannot process configuration when elements of a single component are in AND relationship.
        Out of all configurations in dataset as of April 2023, only 3 were detected in the dataset.
        We ignore those on purpose.

        :param list[dict[str, Any]] configuration_nodes: nodes of a single configuration, each with the `cpeMatch` key
        :return CPEMatchCriteriaConfiguration | None: configuration built from the nodes whose operator is `OR`
                                                      (or missing), None if there is no such node
        """
        assert all("cpeMatch" in x for x in configuration_nodes)  # the next layer are matches
        # Nodes with any other operator (i.e. AND-ed components) are dropped on purpose, see the docstring.
        nodes = [x for x in configuration_nodes if x.get("operator", "OR") == "OR"]
        if nodes:
            return CPEMatchCriteriaConfiguration(
                [CVE.get_simple_criteria_from_cpe_matches(x["cpeMatch"]) for x in nodes]
            )
        return None

    @staticmethod
    def get_simple_criteria_from_cpe_matches(cpe_matches: list[dict[str, Any]]) -> list[CPEMatchCriteria]:
        """
        Converts each CPE match dictionary into `CPEMatchCriteria` with `CPEMatchCriteria.from_nist_dict`.
        """
        return [CPEMatchCriteria.from_nist_dict(x) for x in cpe_matches]