1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
|
from __future__ import annotations
import copy
import functools
import logging
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass
from pathlib import Path
from shutil import copyfile
from typing import Any, Final
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy import stats
from tqdm.notebook import tqdm
from sec_certs.dataset.cve import CVEDataset
from sec_certs.sample.sar import SAR
from sec_certs.utils import helpers
logger = logging.getLogger(__name__)
@dataclass(eq=True, frozen=True)
class SecondarySFPCluster:
    """Secondary SFP cluster: a named group of member CWE IDs from the CWE-888 view."""

    name: str
    children: frozenset[int]

    @classmethod
    def from_xml_id(cls, xml_categories: list[ET.Element], cwe_id: int) -> SecondarySFPCluster:
        """
        Build the cluster from the XML <Category> element whose ID equals `cwe_id`.

        :raises ValueError: if no category with the given ID exists in `xml_categories`.
        """
        cat = cls.find_correct_category(xml_categories, cwe_id)
        name = cat.attrib["Name"]
        members = cat.find("{http://cwe.mitre.org/cwe-6}Relationships")
        assert members is not None
        member_ids = frozenset(
            int(x.attrib["CWE_ID"]) for x in members if x.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        )
        return cls(name, member_ids)

    @staticmethod
    def find_correct_category(xml_categories: list[ET.Element], cwe_id: int) -> ET.Element:
        """Return the <Category> element with attribute ID == cwe_id, raising ValueError when absent."""
        for cat in xml_categories:
            if cat.attrib["ID"] == str(cwe_id):
                return cat
        # Bug fix: the original message read "Category with ID {cwe_id} found." — missing the "not".
        raise ValueError(f"Category with ID {cwe_id} not found.")
@dataclass(eq=True, frozen=True)
class PrimarySFPCluster:
    """Primary SFP cluster: a named group of secondary clusters plus loose member CWE IDs."""

    name: str
    secondary_clusters: frozenset[SecondarySFPCluster]
    cwe_ids: frozenset[int]

    @classmethod
    def from_xml(cls, xml_categories: list[ET.Element], primary_cluster_element: ET.Element):
        """
        Parse a primary-cluster <Category> element. Members that resolve to a category become
        secondary clusters; members with no matching category (ValueError) stay as plain CWE IDs.
        """
        name = primary_cluster_element.attrib["Name"].split("SFP Primary Cluster: ")[1]
        relationships = primary_cluster_element.find("{http://cwe.mitre.org/cwe-6}Relationships")
        assert relationships is not None
        member_ids = {
            int(member.attrib["CWE_ID"])
            for member in relationships
            if member.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        }
        secondary: list[SecondarySFPCluster] = []
        plain_cwes: list[int] = []
        for member_id in member_ids:
            try:
                secondary.append(SecondarySFPCluster.from_xml_id(xml_categories, member_id))
            except ValueError:
                plain_cwes.append(member_id)
        return cls(name, frozenset(secondary), frozenset(plain_cwes))
class SFPModel:
    """Model of the CWE-888 (Software Fault Patterns) view, parseable from MITRE's 888.xml."""

    URL: Final[str] = "https://cwe.mitre.org/data/xml/views/888.xml.zip"
    XML_FILENAME: Final[str] = "888.xml"
    XML_ZIP_NAME: Final[str] = "888.xml.zip"

    def __init__(self, primary_clusters: frozenset[PrimarySFPCluster]):
        self.primary_clusters = primary_clusters

    @classmethod
    def from_xml(cls, xml_filepath: str | Path):
        """Parse the SFP model from a local 888.xml file."""
        root = ET.parse(xml_filepath).getroot()
        category_tag = root.find("{http://cwe.mitre.org/cwe-6}Categories")
        assert category_tag is not None
        categories = category_tag.findall("{http://cwe.mitre.org/cwe-6}Category")
        # The XML contains two weird primary clusters not specified in
        # https://samate.nist.gov/BF/Enlightenment/SFP.html. After manual inspection, we skip those.
        skipped_names = (
            "SFP Primary Cluster: Failure to Release Memory",
            "SFP Primary Cluster: Faulty Resource Release",
        )
        primary_clusters = frozenset(
            PrimarySFPCluster.from_xml(categories, category)
            for category in categories
            if "SFP Primary Cluster" in category.attrib["Name"] and category.attrib["Name"] not in skipped_names
        )
        return cls(primary_clusters)

    @classmethod
    def from_web(cls):
        """Download the zipped 888.xml from MITRE, extract it, and parse the model."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            xml_zip_path = Path(tmp_dir) / cls.XML_ZIP_NAME
            helpers.download_file(cls.URL, xml_zip_path)
            with zipfile.ZipFile(xml_zip_path, "r") as zip_handle:
                zip_handle.extractall(tmp_dir)
            return cls.from_xml(Path(tmp_dir) / cls.XML_FILENAME)

    def search_cwe(self, cwe_id: int) -> tuple[str | None, str | None]:
        """Return (primary cluster name, secondary cluster name) containing `cwe_id`; missing parts are None."""
        for primary in self.primary_clusters:
            for secondary in primary.secondary_clusters:
                if cwe_id in secondary.children:
                    return primary.name, secondary.name
            if cwe_id in primary.cwe_ids:
                return primary.name, None
        return None, None
def discover_sar_families(ser: pd.Series) -> list[str]:
    """
    Returns a list of all SAR families that occur in the pandas Series, where each entry is a set of SAR objects.
    """
    families: set[str] = set()
    for entry in ser.tolist():
        # NaN entries mean the certificate has no extracted SARs.
        if pd.isnull(entry):
            continue
        families.update(sar.family for sar in entry)
    return list(families)
def get_sar_level_from_set(sars: set[SAR], sar_family: str) -> int | None:
    """
    Given a set of SARs and a family name, will return level of the seeked SAR from the set.
    """
    # Empty set, None, or NaN all mean there is nothing to look up.
    if not sars or pd.isnull(sars):
        return None
    by_family = {sar.family: sar for sar in sars}
    match = by_family.get(sar_family)
    return match.level if match is not None else None
def compute_cve_correlations(
    df: pd.DataFrame,
    exclude_vuln_free_certs: bool = False,
    sar_families: list[str] | None = None,
    output_path: str | Path | None = None,
    filter_nans: bool = True,
) -> pd.DataFrame:
    """
    Computes Spearman correlations of EAL and SAR levels with three CVE columns: (n_cves, worst_cve_score, avg_cve_score).
    A few assumptions about the passed dataframe:
    - `eal` column must be of categorical data type
    - `extracted_sars` column must hold a set of SARs (or NaN)
    - `n_cves`, `worst_cve_score` and `avg_cve_score` columns must be present in the dataframe
    Optionally, it can filter rows whose correlation columns are all NaN (due to division by zero or super low supports).
    Choosing the correct minimal support is tricky: SAR levels often have huge support but are heavily imbalanced in favor
    of a single value that is rarely modified. We recommend choosing 100 and discarding any row where some column would result in NaN.

    :param pd.DataFrame df: pre-processed CC dataframe (see assumptions above)
    :param bool exclude_vuln_free_certs: if True, only certificates with at least one CVE enter the analysis
    :param list[str] | None sar_families: explicit list of SAR families to analyze; discovered from the data when None
    :param str | Path | None output_path: when set, the resulting dataframe is written there as CSV
    :param bool filter_nans: when True, rows with NaN in any correlation column are dropped
    :return pd.DataFrame: one row per EAL/SAR family with correlation coefficients, p-values and support
    """
    df_sar = df.loc[:, ["eal", "extracted_sars", "worst_cve_score", "avg_cve_score", "n_cves", "category"]]
    # NOTE(review): smart-card certificates are excluded from the correlation analysis — presumably
    # because that category would skew the results; confirm against the original study.
    df_sar = df_sar.loc[df_sar.category != "ICs, Smart Cards and Smart Card-Related Devices and Systems"]
    if exclude_vuln_free_certs:
        df_sar = df_sar.loc[df_sar.n_cves > 0]
    families = sar_families if sar_families else discover_sar_families(df_sar.extracted_sars)
    # One-sided Spearman test (alternative="less", i.e. testing for negative correlation); NaN pairs are omitted.
    spearmanr = functools.partial(stats.spearmanr, nan_policy="omit", alternative="less")
    # Categorical codes encode the NaN category as -1; map it back to NaN so spearmanr omits it.
    df_sar.eal = df_sar.eal.cat.codes
    df_sar.eal = df_sar.eal.map(lambda x: np.nan if x == -1 else x)
    # The EAL correlations form the first row of the resulting dataframe.
    n_cves_eal_corr, n_cves_eal_pvalue = spearmanr(df_sar.eal, df_sar.n_cves)
    n_cves_corrs = [n_cves_eal_corr]
    n_cves_pvalues = [n_cves_eal_pvalue]
    worst_cve_eal_corr, worst_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.worst_cve_score)
    worst_cve_corrs = [worst_cve_eal_corr]
    worst_cve_pvalues = [worst_cve_eal_pvalue]
    avg_cve_eal_corr, avg_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.avg_cve_score)
    avg_cve_corrs = [avg_cve_eal_corr]
    avg_cve_pvalues = [avg_cve_eal_pvalue]
    supports = [df_sar.loc[~df_sar["eal"].isnull()].shape[0]]
    # One row per SAR family: project each certificate's SAR set onto that family's level, then correlate.
    for family in tqdm(families):
        df_sar[family] = df_sar.extracted_sars.map(lambda x: get_sar_level_from_set(x, family))
        n_cves_corr, n_cves_pvalue = spearmanr(df_sar[family], df_sar.n_cves)
        n_cves_corrs.append(n_cves_corr)
        n_cves_pvalues.append(n_cves_pvalue)
        worst_cve_corr, worst_cve_pvalue = spearmanr(df_sar[family], df_sar.worst_cve_score)
        worst_cve_corrs.append(worst_cve_corr)
        worst_cve_pvalues.append(worst_cve_pvalue)
        avg_cve_corr, avg_cve_pvalue = spearmanr(df_sar[family], df_sar.avg_cve_score)
        avg_cve_corrs.append(avg_cve_corr)
        avg_cve_pvalues.append(avg_cve_pvalue)
        # Support = number of certificates that actually define this SAR family.
        supports.append(df_sar.loc[~df_sar[family].isnull()].shape[0])
    # Re-copy to defragment the frame after inserting one column per family — TODO confirm intent.
    df_sar = df_sar.copy()
    tuples = list(
        zip(n_cves_corrs, n_cves_pvalues, worst_cve_corrs, worst_cve_pvalues, avg_cve_corrs, avg_cve_pvalues, supports)
    )
    dct = dict(zip(["eal"] + families, tuples))
    df_corr = pd.DataFrame.from_dict(
        dct,
        orient="index",
        columns=[
            "n_cves_corr",
            "n_cves_pvalue",
            "worst_cve_score_corr",
            "worst_cve_pvalue",
            "avg_cve_score_corr",
            "avg_cve_pvalue",
            "support",
        ],
    )
    # NOTE(review): the Styler produced here is discarded — set_caption has no effect on df_corr itself.
    df_corr.style.set_caption("Correlations between EAL, SARs and CVEs")
    df_corr = df_corr.sort_values(by="support", ascending=False)
    if filter_nans:
        df_corr = df_corr.dropna(how="any", subset=["n_cves_corr", "worst_cve_score_corr", "avg_cve_score_corr"])
    if output_path:
        df_corr.to_csv(output_path)
    return df_corr
def find_earliest_maintenance_after_cve(row):
    """Given a dataframe row, return the first maintenance date succeeding the first published CVE of a certificate, else np.nan."""
    candidates = (date for date in row["maintenance_dates"] if date > row["earliest_cve"])
    return min(candidates, default=np.nan)
def filter_to_cves_within_validity_period(cc_df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Filters the column `related_cves` in `cc_df` DataFrame to CVEs that were published within validity period of the
    studied certificate.
    """

    def cves_within_period(
        cve_dset: CVEDataset, cves: set[str], not_valid_before: pd.Timestamp, not_valid_after: pd.Timestamp
    ) -> set[str] | float:
        # Mypy complains, but the Optional published_date is guarded before the comparison.
        kept: set[str] = set()
        for cve_id in cves:
            published = cve_dset[cve_id].published_date
            if not published:
                continue
            published_ts = pd.Timestamp(published.date())  # type: ignore
            if not_valid_before < published_ts < not_valid_after:
                kept.add(cve_id)
        return kept if kept else np.nan

    # Certificates with CVEs but without a known validity period cannot be filtered meaningfully.
    has_cves = cc_df.related_cves.notnull()
    missing_validity = cc_df.not_valid_before.isna() | cc_df.not_valid_after.isna()
    if (has_cves & missing_validity).any():
        raise ValueError(
            "Cannot filter CVEs on certificates that have NaNs in not_valid_after or not_valid_before fields."
        )
    cc_df["related_cves"] = cc_df.apply(
        lambda row: cves_within_period(cve_dset, row["related_cves"], row["not_valid_before"], row["not_valid_after"])
        if not pd.isna(row["related_cves"])
        else row["related_cves"],
        axis=1,
    )
    return cc_df
def expand_df_with_cve_cols(df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Returns a copy of `df` expanded with n_cves, cve_published_dates, earliest_cve, worst_cve_score
    and avg_cve_score columns computed from the certificates' related CVEs.
    """
    df = df.copy()

    def published_dates(cves):
        return [cve_dset[c].published_date.date() for c in cves] if not pd.isna(cves) else np.nan  # type: ignore

    def base_scores(cves):
        return [cve_dset[c].metrics.base_score for c in cves]

    df["n_cves"] = df.related_cves.map(lambda cves: 0 if pd.isna(cves) else len(cves))
    df["cve_published_dates"] = df.related_cves.map(published_dates)
    df["earliest_cve"] = df.cve_published_dates.map(lambda dates: min(dates) if isinstance(dates, list) else np.nan)
    # NOTE: Technically, a CVE can have a 0 base score (this happens when the CVE is discarded from the
    # database), which could skew the results. During the May 2022 analysis we encountered a single such
    # CVE, therefore we do not treat this case. A proper treatment would average only CVEs with
    # base_score > 0.
    df["worst_cve_score"] = df.related_cves.map(lambda cves: max(base_scores(cves)) if not pd.isna(cves) else np.nan)
    df["avg_cve_score"] = df.related_cves.map(lambda cves: np.mean(base_scores(cves)) if not pd.isna(cves) else np.nan)
    return df
def prepare_cwe_df(
    cc_df: pd.DataFrame, cve_dset: CVEDataset, fine_grained: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    This function does the following:
    1. Filter CC DF to columns relevant for CWE examination (eal, related_cves, category)
    2. Parses CWE webpage of CWE categories and weaknesses, fetches CWE descriptions and names from there
    3. Explodes the CC DF so that each row corresponds to single CVE
    4. Joins CC DF with CWE DF obtained from CVEDataset
    5. Explodes resulting DF again so that each row corresponds to single CWE
    :param pd.DataFrame cc_df: DataFrame obtained from CCDataset, should be limited to rows with >0 vulnerabilities
    :param CVEDataset cve_dset: CVEDataset instance to retrieve CWE data from
    :param bool fine_grained: If set to True, CWEs won't be merged into weaknesses of higher abstraction
    :return Tuple[pd.DataFrame, pd.DataFrame]: returns two dataframes:
        - DF obtained from CC Dataset, fully exploded to CWEs
        - DF obtained from CWE webpage, contains IDs, names, types, urls of all CWEs
    """
    # Explode CVE_IDs and CWE_IDs so that we have right counts on duplicated CVEs. Measure how much data for analysis we have left.
    vulns = cve_dset.to_pandas()
    # One row per (certificate, CVE) pair.
    df_cwe_relevant = (
        cc_df[["eal", "related_cves", "category"]]
        .explode(column="related_cves")
        .rename(columns={"related_cves": "cve_id"})
    )
    df_cwe_relevant["cwe_ids"] = df_cwe_relevant.cve_id.map(lambda x: vulns.cwe_ids[x])
    # Second explode: one row per (certificate, CVE, CWE) triple; the certificate digest moves to a column.
    df_cwe_relevant = (
        df_cwe_relevant.explode(column="cwe_ids")
        .reset_index()
        .rename(columns={"cwe_ids": "cwe_id", "index": "cert_dgst"})
    )
    # NVD placeholder identifiers (e.g. NVD-CWE-Other, NVD-CWE-noinfo) are not real CWEs -> treat as missing.
    df_cwe_relevant.cwe_id = df_cwe_relevant.cwe_id.replace(r"NVD-CWE-*", np.nan, regex=True)
    print(
        f"Filtering {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs that have no CWE assigned. This affects {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates"
    )
    print(
        f"Still left with analysis of {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs in {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates."
    )
    df_cwe_relevant = df_cwe_relevant.dropna()
    # Load CWE IDs and descriptions from CWE website
    with tempfile.TemporaryDirectory() as tmp_dir:
        xml_zip_path = Path(tmp_dir) / "cwec_latest.xml.zip"
        helpers.download_file("https://cwe.mitre.org/data/xml/cwec_latest.xml.zip", xml_zip_path)
        with zipfile.ZipFile(xml_zip_path, "r") as zip_handle:
            zip_handle.extractall(tmp_dir)
            # The zip is assumed to contain a single XML whose name carries the CWE version — TODO confirm.
            xml_filename = zip_handle.namelist()[0]
        root = ET.parse(Path(tmp_dir) / xml_filename).getroot()
    weaknesses = root.find("{http://cwe.mitre.org/cwe-6}Weaknesses")
    categories = root.find("{http://cwe.mitre.org/cwe-6}Categories")
    # Column-oriented accumulator for the CWE dataframe built below.
    dct: dict[str, Any] = {
        "cwe_id": [],
        "cwe_name": [],
        "cwe_description": [],
        "type": [],
        "child_of": [],
    }
    assert weaknesses
    for weakness in weaknesses:
        assert weakness
        description = weakness.find("{http://cwe.mitre.org/cwe-6}Description")
        related_weaknesses = weakness.find("{http://cwe.mitre.org/cwe-6}Related_Weaknesses")
        dct["cwe_id"].append("CWE-" + weakness.attrib["ID"])
        dct["cwe_name"].append(weakness.attrib["Name"])
        dct["cwe_description"].append(description.text if description is not None else None)
        dct["type"].append("weakness")
        # Element truthiness: falsy when the element has no children (deprecated ET behavior) — so this
        # branch is taken only when at least one related weakness exists.
        if related_weaknesses:
            dct["child_of"].append(
                {
                    "CWE-" + x.attrib["CWE_ID"]
                    for x in related_weaknesses
                    if x.tag == "{http://cwe.mitre.org/cwe-6}Related_Weakness" and x.attrib["Nature"] == "ChildOf"
                }
            )
        else:
            dct["child_of"].append(np.nan)
    assert categories
    for category in categories:
        assert category
        summary = category.find("{http://cwe.mitre.org/cwe-6}Summary")
        dct["cwe_id"].append("CWE-" + category.attrib["ID"])
        dct["cwe_name"].append(category.attrib["Name"])
        dct["cwe_description"].append(summary.text if summary is not None else None)
        dct["type"].append("category")
        # Categories have no ChildOf relations in this view.
        dct["child_of"].append(np.nan)
    cwe_df = pd.DataFrame(dct).set_index("cwe_id")
    cwe_df["url"] = cwe_df.index.map(lambda x: "https://cwe.mitre.org/data/definitions/" + x.split("-")[1] + ".html")
    # Descriptions may contain literal newlines that break CSV/table rendering.
    cwe_df = cwe_df.replace(r"\n", " ", regex=True)
    if fine_grained:
        return df_cwe_relevant, cwe_df
    else:
        return get_coarse_grained_cwes(df_cwe_relevant, cwe_df), cwe_df
def get_coarse_grained_cwes(fine_grained_df: pd.DataFrame, cwe_df: pd.DataFrame) -> pd.DataFrame:
    """
    Oddly enough, NVD contains CWEs at different levels of abstraction, which makes it difficult to compare between them.
    Among others, some three different CWEs appear in the CVEDataset: CWE-20, CWE-119, CWE-787. Problem is that CWE-787
    is child of CWE-119, which in turn is child of CWE-20. It makes no sense to compute stats of most prevalent CWEs
    unless categories are aligned to the top-most level.
    This function aligns the categories to the top-most level. It works in a loop: when a full pass replaces no CWE
    with its parents, the algorithm terminates. Every CWE is replaced by those of its parents that also appear in
    the CVE dataset.
    :param pd.DataFrame fine_grained_df: First element of the output of `prepare_cwe_df` function
    :param pd.DataFrame cwe_df: Second element of the output of `prepare_cwe_df` function
    :return pd.DataFrame: DF obtained from CC Dataset, fully exploded to coarse-grained CWEs
    """
    observed_cwes = set(fine_grained_df.cwe_id.unique())
    parents_of = cwe_df.child_of.to_dict()
    current = set(fine_grained_df.cwe_id.unique())
    # mapping: fine-grained CWE -> set of coarse-grained CWEs it ends up as.
    mapping = {cwe: {cwe} for cwe in current}
    while True:
        snapshot = set(current)
        for cwe in snapshot:
            parents = parents_of[cwe]
            # Skip leafless entries (NaN or empty parent set).
            if not parents or parents is np.nan:
                continue
            replacement = {parent for parent in parents if parent in observed_cwes}
            if not replacement:
                continue
            # Replace the CWE by its parents that occur in the dataset, both in the working set
            # and in every mapping entry that still references it.
            current.remove(cwe)
            current.update(replacement)
            for targets in mapping.values():
                if cwe in targets:
                    targets.remove(cwe)
                    targets.update(replacement)
        # Fixpoint reached: the pass did not change the working set.
        if current == snapshot:
            break
    # Now we have a complete mapping of fine_grained -> coarse_grained CWEs.
    coarse_df = fine_grained_df.copy()
    coarse_df.cwe_id = coarse_df.cwe_id.map(mapping)
    return coarse_df.explode(column="cwe_id")
def get_top_n_cwes(
    df: pd.DataFrame, cwe_df: pd.DataFrame, category: str | None = None, eal: str | None = None, n_cwes: int = 10
) -> pd.DataFrame:
    """Fetches top-n CWEs, overall, per category, or per EAL"""
    subset = df.copy()
    if category:
        subset = subset.loc[subset.category == category].copy()
    if eal:
        subset = subset.loc[subset.eal == eal].copy()
    # Count occurrences, keep the n most frequent CWEs as a dataframe indexed by CWE ID.
    top_n = subset.cwe_id.value_counts().head(n_cwes).to_frame()
    top_n = top_n.rename(columns={"cwe_id": "frequency"}).rename_axis("cwe_id")
    # Enrich with metadata looked up in the CWE dataframe.
    for column in ("cwe_name", "cwe_description", "url", "type"):
        top_n[column] = top_n.index.map(lambda cwe, col=column: cwe_df.loc[cwe][col])
    return top_n
def compute_maintenances_that_come_after_vulns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Given pre-processed CCDataset DataFrame (expanded with MU & CVE cols), computes time to fix CVE and earliest CVE after some vuln.
    """
    # Restrict to certificates that have at least one CVE and at least one maintenance update.
    relevant = (df.n_cves > 0) & (df.n_maintenances > 0)
    df_fixed = df.loc[relevant].copy()
    # Normalize maintenance timestamps to plain dates before comparing against CVE dates.
    df_fixed.maintenance_dates = df_fixed.maintenance_dates.map(lambda dates: [d.date() for d in dates])
    df_fixed.loc[:, "earliest_maintenance_after_vuln"] = df_fixed.apply(find_earliest_maintenance_after_cve, axis=1)
    df_fixed.index.name = "dgst"
    return df_fixed
def move_fixing_mu_to_directory(
    df_fixed: pd.DataFrame, main_df: pd.DataFrame, outdir: str | Path, inpath: str | Path
) -> pd.DataFrame:
    """
    Localizes reports of maintenance updates that should fix some vulnerability and copies them into a directory.
    df_fixed should be the output of compute_maintenances_that_come_after_vulns method.

    :param pd.DataFrame df_fixed: dataframe indexed by `dgst` with an `earliest_maintenance_after_vuln` column
    :param pd.DataFrame main_df: maintenance-updates dataframe with `related_cert_digest`, `maintenance_date` and `dgst` columns
    :param str | Path outdir: destination directory for the copied PDF reports (created when missing)
    :param str | Path inpath: source directory holding the maintenance-update PDFs
    :return pd.DataFrame: series of the copied PDF filenames
    """
    # Index of (certificate digest, maintenance date) pairs that fix some vulnerability.
    fixed_df_index = (
        df_fixed.loc[~df_fixed.earliest_maintenance_after_vuln.isnull()]
        .reset_index()
        .set_index(["dgst", "earliest_maintenance_after_vuln"])
        .index.to_flat_index()
    )
    # Normalize timestamps to dates so the tuples match fixed_df_index entries.
    main_df.maintenance_date = main_df.maintenance_date.map(lambda x: x.date())
    main_prefiltered = main_df.reset_index().set_index(["related_cert_digest", "maintenance_date"])
    mu_filenames = main_prefiltered.loc[main_prefiltered.index.isin(fixed_df_index), "dgst"]
    mu_filenames = mu_filenames.map(lambda x: x + ".pdf")
    # Bug fix: the original created `inpath` (the *source* directory) when missing, which could not
    # help — reading from a freshly created empty directory fails anyway. It is the *destination*
    # directory that must exist for copyfile to succeed.
    outdir = Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    inpath = Path(inpath)
    for filename in mu_filenames:
        copyfile(inpath / filename, outdir / filename)
    return mu_filenames
def plot_dataframe_graph(
    data: dict,
    label: str,
    file_name: str,
    density: bool = False,
    cumulative: bool = False,
    bins: int = 50,
    log: bool = True,
    show: bool = True,
) -> None:
    """
    Plots a histogram of `data` and saves it to `file_name`; optionally shows the figure
    and logs the value counts that occur more than once.
    """
    series = pd.Series(data)
    series.hist(bins=bins, label=label, density=density, cumulative=cumulative)
    plt.savefig(file_name)
    if show:
        plt.show()
    if log:
        counts = series.value_counts(ascending=True)
        logger.info(counts.where(counts > 1).dropna())
|