aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/utils/pandas.py
blob: 890cce99944c9be8aa3690b70963e22c661b5227 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
from __future__ import annotations

import copy
import functools
import logging
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass
from pathlib import Path
from shutil import copyfile
from typing import Any, Final

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy import stats
from tqdm.notebook import tqdm

from sec_certs.dataset.cve import CVEDataset
from sec_certs.sample.sar import SAR
from sec_certs.utils import helpers

logger = logging.getLogger(__name__)


@dataclass(eq=True, frozen=True)
class SecondarySFPCluster:
    """A secondary SFP cluster: its name and the CWE ids that are its members."""

    name: str
    children: frozenset[int]

    @classmethod
    def from_xml_id(cls, xml_categories: list[ET.Element], cwe_id: int):
        """
        Builds the cluster from the XML `Category` element whose ID equals `cwe_id`.

        :param xml_categories: all `Category` elements of the CWE-888 view
        :param cwe_id: numeric ID of the category to parse
        :raises ValueError: when no category with the given ID exists
        """
        cat = cls.find_correct_category(xml_categories, cwe_id)
        name = cat.attrib["Name"]
        members = cat.find("{http://cwe.mitre.org/cwe-6}Relationships")

        assert members is not None
        member_ids = frozenset(
            int(x.attrib["CWE_ID"]) for x in members if x.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        )
        return cls(name, member_ids)

    @staticmethod
    def find_correct_category(xml_categories: list[ET.Element], cwe_id: int) -> ET.Element:
        """Returns the category element with the given ID, raising ValueError when absent."""
        for cat in xml_categories:
            if cat.attrib["ID"] == str(cwe_id):
                return cat
        # Bug fix: the message previously read "Category with ID {cwe_id} found.", omitting "not".
        raise ValueError(f"Category with ID {cwe_id} not found.")


@dataclass(eq=True, frozen=True)
class PrimarySFPCluster:
    """A primary SFP cluster: its name, its secondary clusters, and CWE ids attached to it directly."""

    name: str
    secondary_clusters: frozenset[SecondarySFPCluster]
    cwe_ids: frozenset[int]

    @classmethod
    def from_xml(cls, xml_categories: list[ET.Element], primary_cluster_element: ET.Element):
        """
        Parses a primary-cluster `Category` element. Members that resolve to a secondary cluster
        are stored as such; the remaining member ids are kept as plain CWE ids.
        """
        cluster_name = primary_cluster_element.attrib["Name"].split("SFP Primary Cluster: ")[1]
        relationships = primary_cluster_element.find("{http://cwe.mitre.org/cwe-6}Relationships")

        assert relationships is not None
        ids = {
            int(member.attrib["CWE_ID"])
            for member in relationships
            if member.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        }

        secondary = []
        plain_cwes = []
        for cid in ids:
            try:
                secondary.append(SecondarySFPCluster.from_xml_id(xml_categories, cid))
            except ValueError:
                # Not a secondary cluster -> the member is a leaf CWE of this primary cluster.
                plain_cwes.append(cid)

        return cls(cluster_name, frozenset(secondary), frozenset(plain_cwes))


class SFPModel:
    """In-memory representation of MITRE's SFP view (CWE-888): primary clusters with their members."""

    URL: Final[str] = "https://cwe.mitre.org/data/xml/views/888.xml.zip"
    XML_FILENAME: Final[str] = "888.xml"
    XML_ZIP_NAME: Final[str] = "888.xml.zip"

    def __init__(self, primary_clusters: frozenset[PrimarySFPCluster]):
        self.primary_clusters = primary_clusters

    @classmethod
    def from_xml(cls, xml_filepath: str | Path):
        """Builds the model from a local copy of the 888.xml view."""
        root = ET.parse(xml_filepath).getroot()
        container = root.find("{http://cwe.mitre.org/cwe-6}Categories")

        assert container is not None
        all_categories = container.findall("{http://cwe.mitre.org/cwe-6}Category")

        # The XML contains two weird primary clusters not specified in
        # https://samate.nist.gov/BF/Enlightenment/SFP.html. After manual inspection, we skip those.
        skipped_names = {
            "SFP Primary Cluster: Failure to Release Memory",
            "SFP Primary Cluster: Faulty Resource Release",
        }
        primary = frozenset(
            PrimarySFPCluster.from_xml(all_categories, elem)
            for elem in all_categories
            if "SFP Primary Cluster" in elem.attrib["Name"] and elem.attrib["Name"] not in skipped_names
        )

        return cls(primary)

    @classmethod
    def from_web(cls):
        """Downloads the zipped 888.xml view, extracts it into a temp dir and parses it."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            zip_path = Path(tmp_dir) / cls.XML_ZIP_NAME
            helpers.download_file(cls.URL, zip_path)

            with zipfile.ZipFile(zip_path, "r") as zip_handle:
                zip_handle.extractall(tmp_dir)

            return cls.from_xml(Path(tmp_dir) / cls.XML_FILENAME)

    def search_cwe(self, cwe_id: int) -> tuple[str | None, str | None]:
        """
        Returns (primary cluster name, secondary cluster name) containing `cwe_id`.
        The secondary name is None when the CWE hangs directly off a primary cluster;
        both are None when the CWE is not part of the model.
        """
        for primary in self.primary_clusters:
            secondary_hit = next((sec for sec in primary.secondary_clusters if cwe_id in sec.children), None)
            if secondary_hit is not None:
                return primary.name, secondary_hit.name
            if cwe_id in primary.cwe_ids:
                return primary.name, None
        return None, None


def discover_sar_families(ser: pd.Series) -> list[str]:
    """
    Collects the names of all SAR families that occur in the Series, whose entries are
    expected to be sets of SAR objects (or NaN for certificates without SARs).
    """
    found: set[str] = set()
    for entry in ser.tolist():
        # NaN entries mean "no SARs extracted" and contribute nothing.
        if not pd.isnull(entry):
            found.update(sar.family for sar in entry)
    return list(found)


def get_sar_level_from_set(sars: set[SAR], sar_family: str) -> int | None:
    """
    Given a set of SARs and a family name, returns the level of the sought SAR from the set,
    or None when the family is absent or the input is empty/NaN.
    """
    by_family = {}
    # Guard against both an empty set and a NaN cell coming from a DataFrame.
    if sars and not pd.isnull(sars):
        by_family = {sar.family: sar for sar in sars}
    hit = by_family.get(sar_family)
    return None if hit is None else hit.level


def compute_cve_correlations(
    df: pd.DataFrame,
    exclude_vuln_free_certs: bool = False,
    sar_families: list[str] | None = None,
    output_path: str | Path | None = None,
    filter_nans: bool = True,
) -> pd.DataFrame:
    """
    Computes Spearman correlations of EAL and different SARs with three columns: (n_cves, worst_cve_score, avg_cve_score).
    Few assumptions about the passed dataframe:
    - EAL column must be categorical data type
    - SAR column must be a set of SARs
    - `n_cves` and `worst_cve_score`, `avg_cve_score` columns must be present in the dataframe
    Optionally, it can filter out rows where some correlation is NaN (due to division by zero or super low supports.)
    Choosing the correct minimal support is tricky: SAR levels often have huge support while being heavily imbalanced
    in favor of a single value that is rarely modified. We recommend choosing 100 and discarding any row where some
    column would result in NaN.

    :param df: DataFrame derived from the CC dataset, see assumptions above
    :param exclude_vuln_free_certs: when True, certificates with zero CVEs are dropped before the analysis
    :param sar_families: SAR families to analyse; discovered from the data when None
    :param output_path: when set, the resulting DataFrame is also dumped as CSV to this path
    :param filter_nans: when True, rows with NaN in any correlation column are dropped
    :return: DataFrame indexed by "eal" + SAR family names with correlation, p-value and support columns
    """
    df_sar = df.loc[:, ["eal", "extracted_sars", "worst_cve_score", "avg_cve_score", "n_cves", "category"]]
    # NOTE(review): smartcards are deliberately excluded from this analysis — presumably to avoid
    # the category biasing the results; confirm the rationale with the dataset authors.
    df_sar = df_sar.loc[df_sar.category != "ICs, Smart Cards and Smart Card-Related Devices and Systems"]

    if exclude_vuln_free_certs:
        df_sar = df_sar.loc[df_sar.n_cves > 0]

    families = sar_families if sar_families else discover_sar_families(df_sar.extracted_sars)

    # One-sided test (alternative="less"): the alternative hypothesis is a negative correlation.
    # nan_policy="omit" discards pairs with missing values.
    spearmanr = functools.partial(stats.spearmanr, nan_policy="omit", alternative="less")

    # Convert ordered EAL categories to integer codes; code -1 marks a missing category and is mapped back to NaN.
    df_sar.eal = df_sar.eal.cat.codes
    df_sar.eal = df_sar.eal.map(lambda x: np.nan if x == -1 else x)

    # The EAL row is computed first; SAR families are appended below in the same list order.
    n_cves_eal_corr, n_cves_eal_pvalue = spearmanr(df_sar.eal, df_sar.n_cves)
    n_cves_corrs = [n_cves_eal_corr]
    n_cves_pvalues = [n_cves_eal_pvalue]

    worst_cve_eal_corr, worst_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.worst_cve_score)
    worst_cve_corrs = [worst_cve_eal_corr]
    worst_cve_pvalues = [worst_cve_eal_pvalue]

    avg_cve_eal_corr, avg_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.avg_cve_score)
    avg_cve_corrs = [avg_cve_eal_corr]
    avg_cve_pvalues = [avg_cve_eal_pvalue]

    # Support = number of certificates with a non-null EAL (resp. non-null SAR level below).
    supports = [df_sar.loc[~df_sar["eal"].isnull()].shape[0]]

    for family in tqdm(families):
        # Materialize the level of the given SAR family as a new column (NaN when the family is absent).
        # The lambda is applied eagerly by .map, so closing over the loop variable is safe here.
        df_sar[family] = df_sar.extracted_sars.map(lambda x: get_sar_level_from_set(x, family))

        n_cves_corr, n_cves_pvalue = spearmanr(df_sar[family], df_sar.n_cves)
        n_cves_corrs.append(n_cves_corr)
        n_cves_pvalues.append(n_cves_pvalue)

        worst_cve_corr, worst_cve_pvalue = spearmanr(df_sar[family], df_sar.worst_cve_score)
        worst_cve_corrs.append(worst_cve_corr)
        worst_cve_pvalues.append(worst_cve_pvalue)

        avg_cve_corr, avg_cve_pvalue = spearmanr(df_sar[family], df_sar.avg_cve_score)
        avg_cve_corrs.append(avg_cve_corr)
        avg_cve_pvalues.append(avg_cve_pvalue)

        supports.append(df_sar.loc[~df_sar[family].isnull()].shape[0])

    # NOTE(review): presumably defragments the frame after the repeated column insertions above — confirm.
    df_sar = df_sar.copy()

    # Assemble one row per analysed variable: (corrs, p-values, support), keyed by "eal" + family names.
    tuples = list(
        zip(n_cves_corrs, n_cves_pvalues, worst_cve_corrs, worst_cve_pvalues, avg_cve_corrs, avg_cve_pvalues, supports)
    )
    dct = dict(zip(["eal"] + families, tuples))
    df_corr = pd.DataFrame.from_dict(
        dct,
        orient="index",
        columns=[
            "n_cves_corr",
            "n_cves_pvalue",
            "worst_cve_score_corr",
            "worst_cve_pvalue",
            "avg_cve_score_corr",
            "avg_cve_pvalue",
            "support",
        ],
    )
    # NOTE(review): the Styler returned by set_caption is discarded here; this line has no effect
    # on the returned DataFrame — confirm whether the caption is still wanted.
    df_corr.style.set_caption("Correlations between EAL, SARs and CVEs")
    df_corr = df_corr.sort_values(by="support", ascending=False)

    if filter_nans:
        df_corr = df_corr.dropna(how="any", subset=["n_cves_corr", "worst_cve_score_corr", "avg_cve_score_corr"])

    if output_path:
        df_corr.to_csv(output_path)

    return df_corr


def find_earliest_maintenance_after_cve(row):
    """Given a dataframe row, returns the first maintenance date succeeding the first published CVE related to the certificate if it exists, else np.nan."""
    candidates = (date for date in row["maintenance_dates"] if date > row["earliest_cve"])
    earliest = min(candidates, default=None)
    return np.nan if earliest is None else earliest


def filter_to_cves_within_validity_period(cc_df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Narrows the `related_cves` column in the `cc_df` DataFrame down to CVEs that were published
    strictly within the validity period of the studied certificate.
    """

    def filter_cves(
        cve_dset: CVEDataset, cves: set[str], not_valid_before: pd.Timestamp, not_valid_after: pd.Timestamp
    ) -> set[str] | float:
        # The Optional published_date is resolved by the truthiness check before being dereferenced.
        kept: set[str] = set()
        for cve_id in cves:
            published = cve_dset[cve_id].published_date
            if not published:
                continue
            published_day = pd.Timestamp(published.date())
            if not_valid_before < published_day < not_valid_after:
                kept.add(cve_id)
        return kept if kept else np.nan

    # Refuse to run when some certificate has CVEs but lacks a validity boundary.
    n_unfilterable = cc_df.loc[
        (cc_df.related_cves.notnull()) & ((cc_df.not_valid_before.isna()) | (cc_df.not_valid_after.isna()))
    ].shape[0]
    if n_unfilterable > 0:
        raise ValueError(
            "Cannot filter CVEs on certificates that have NaNs in not_valid_after or not_valid_before fields."
        )

    def filter_row(row):
        if pd.isna(row["related_cves"]):
            return row["related_cves"]
        return filter_cves(cve_dset, row["related_cves"], row["not_valid_before"], row["not_valid_after"])

    cc_df["related_cves"] = cc_df.apply(filter_row, axis=1)

    return cc_df


def expand_df_with_cve_cols(df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Returns a copy of `df` enriched with CVE-derived columns computed from `related_cves`:
    `n_cves`, `cve_published_dates`, `earliest_cve`, `worst_cve_score` and `avg_cve_score`.
    Rows with NaN in `related_cves` get 0 for `n_cves` and NaN for the remaining columns.
    """
    df = df.copy()
    df["n_cves"] = df.related_cves.map(lambda x: 0 if pd.isna(x) else len(x))
    df["cve_published_dates"] = df.related_cves.map(
        lambda x: [cve_dset[y].published_date.date() for y in x] if not pd.isna(x) else np.nan  # type: ignore
    )

    df["earliest_cve"] = df.cve_published_dates.map(lambda x: min(x) if isinstance(x, list) else np.nan)
    df["worst_cve_score"] = df.related_cves.map(
        lambda x: max(cve_dset[cve].metrics.base_score for cve in x) if not pd.isna(x) else np.nan
    )

    # NOTE: Technically, a CVE can have 0 base score. This happens when the CVE is discarded from
    # the database, which could skew the results. During the May 2022 analysis we encountered a
    # single CVE with such score, therefore we do not treat this case. To properly treat it, the
    # average should be taken across CVEs with >0 base_score.
    # (This used to be a stray string literal — a no-op expression statement — now a real comment.)
    df["avg_cve_score"] = df.related_cves.map(
        lambda x: np.mean([cve_dset[cve].metrics.base_score for cve in x]) if not pd.isna(x) else np.nan
    )
    return df


def prepare_cwe_df(
    cc_df: pd.DataFrame, cve_dset: CVEDataset, fine_grained: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    This function does the following:
    1. Filter CC DF to columns relevant for CWE examination (eal, related_cves, category)
    2. Parses CWE webpage of CWE categories and weaknesses, fetches CWE descriptions and names from there
    3. Explodes the CC DF so that each row corresponds to single CVE
    4. Joins CC DF with CWE DF obtained from CVEDataset
    5. Explodes resulting DF again so that each row corresponds to single CWE

    :param pd.DataFrame cc_df: DataFrame obtained from CCDataset, should be limited to rows with >0 vulnerabilities
    :param CVEDataset cve_dset: CVEDataset instance to retrieve CWE data from
    :param bool fine_grained: If set to True, CWEs won't be merged into weaknesses of higher abstraction
    :return Tuple[pd.DataFrame, pd.DataFrame]: returns two dataframes:
        - DF obtained from CC Dataset, fully exploded to CWEs
        - DF obtained from CWE webpage, contains IDs, names, types, urls of all CWEs
    """
    # Explode CVE_IDs and CWE_IDs so that we have right counts on duplicated CVEs. Measure how much data for analysis we have left.
    vulns = cve_dset.to_pandas()
    df_cwe_relevant = (
        cc_df[["eal", "related_cves", "category"]]
        .explode(column="related_cves")
        .rename(columns={"related_cves": "cve_id"})
    )
    # Attach the CWE ids of each CVE, then explode so that one row == one (cert, CVE, CWE) triple.
    df_cwe_relevant["cwe_ids"] = df_cwe_relevant.cve_id.map(lambda x: vulns.cwe_ids[x])
    df_cwe_relevant = (
        df_cwe_relevant.explode(column="cwe_ids")
        .reset_index()
        .rename(columns={"cwe_ids": "cwe_id", "index": "cert_dgst"})
    )

    # Map NVD's pseudo-identifiers (e.g. NVD-CWE-noinfo) to NaN — they carry no real CWE information.
    df_cwe_relevant.cwe_id = df_cwe_relevant.cwe_id.replace(r"NVD-CWE-*", np.nan, regex=True)
    print(
        f"Filtering {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs that have no CWE assigned. This affects {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates"
    )
    print(
        f"Still left with analysis of {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs in {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates."
    )
    df_cwe_relevant = df_cwe_relevant.dropna()

    # Load CWE IDs and descriptions from CWE website
    with tempfile.TemporaryDirectory() as tmp_dir:
        xml_zip_path = Path(tmp_dir) / "cwec_latest.xml.zip"
        helpers.download_file("https://cwe.mitre.org/data/xml/cwec_latest.xml.zip", xml_zip_path)

        with zipfile.ZipFile(xml_zip_path, "r") as zip_handle:
            zip_handle.extractall(tmp_dir)
            # The archive is expected to contain a single XML file whose name varies with the CWE version.
            xml_filename = zip_handle.namelist()[0]

        root = ET.parse(Path(tmp_dir) / xml_filename).getroot()

    weaknesses = root.find("{http://cwe.mitre.org/cwe-6}Weaknesses")
    categories = root.find("{http://cwe.mitre.org/cwe-6}Categories")
    # Column accumulators for the CWE metadata frame; one entry per weakness/category.
    dct: dict[str, Any] = {
        "cwe_id": [],
        "cwe_name": [],
        "cwe_description": [],
        "type": [],
        "child_of": [],
    }

    # NOTE(review): truthiness of an Element is deprecated (False when it has no children);
    # `is not None` would be the safer check here and below — confirm intended semantics.
    assert weaknesses
    for weakness in weaknesses:
        assert weakness
        description = weakness.find("{http://cwe.mitre.org/cwe-6}Description")
        related_weaknesses = weakness.find("{http://cwe.mitre.org/cwe-6}Related_Weaknesses")

        dct["cwe_id"].append("CWE-" + weakness.attrib["ID"])
        dct["cwe_name"].append(weakness.attrib["Name"])
        dct["cwe_description"].append(description.text if description is not None else None)
        dct["type"].append("weakness")

        # Record the direct parents ("ChildOf" relations); used later by get_coarse_grained_cwes.
        if related_weaknesses:
            dct["child_of"].append(
                {
                    "CWE-" + x.attrib["CWE_ID"]
                    for x in related_weaknesses
                    if x.tag == "{http://cwe.mitre.org/cwe-6}Related_Weakness" and x.attrib["Nature"] == "ChildOf"
                }
            )
        else:
            dct["child_of"].append(np.nan)

    assert categories
    for category in categories:
        assert category
        summary = category.find("{http://cwe.mitre.org/cwe-6}Summary")

        dct["cwe_id"].append("CWE-" + category.attrib["ID"])
        dct["cwe_name"].append(category.attrib["Name"])
        dct["cwe_description"].append(summary.text if summary is not None else None)
        dct["type"].append("category")
        # Categories get no parent set; only weaknesses carry ChildOf relations here.
        dct["child_of"].append(np.nan)

    cwe_df = pd.DataFrame(dct).set_index("cwe_id")
    cwe_df["url"] = cwe_df.index.map(lambda x: "https://cwe.mitre.org/data/definitions/" + x.split("-")[1] + ".html")
    # Normalize newlines inside the text fields to spaces.
    cwe_df = cwe_df.replace(r"\n", " ", regex=True)

    if fine_grained:
        return df_cwe_relevant, cwe_df
    else:
        return get_coarse_grained_cwes(df_cwe_relevant, cwe_df), cwe_df


def get_coarse_grained_cwes(fine_grained_df: pd.DataFrame, cwe_df: pd.DataFrame) -> pd.DataFrame:
    """
    NVD mixes CWEs of different abstraction levels (e.g. CWE-787 is a child of CWE-119 which is a
    child of CWE-20), which makes frequency statistics misleading unless the CWEs are aligned to
    the top-most level that actually occurs in the dataset.

    The alignment runs as a fixed-point iteration: each CWE is repeatedly replaced by those of its
    parents that occur in the dataset; when a full pass makes no replacement, the loop terminates.

    :param pd.DataFrame fine_grained_df: First element of the output of `prepare_cwe_df` function
    :param pd.DataFrame cwe_df: Second element of the output of `prepare_cwe_df` function
    :return pd.DataFrame: DF obtained from CC Dataset, fully exploded to coarse-grained CWEs
    """
    observed_cwes = set(fine_grained_df.cwe_id.unique())
    parents_of = cwe_df.child_of.to_dict()
    current = set(fine_grained_df.cwe_id.unique())
    # fine -> set of coarse CWEs; starts as identity and is rewritten during the iteration.
    fine_to_coarse = {cwe: {cwe} for cwe in current}

    while True:
        snapshot = set(current)
        for cwe in snapshot:
            parents = parents_of[cwe]
            if parents and parents is not np.nan and any(p in observed_cwes for p in parents):
                replacements = {p for p in parents if p in observed_cwes}
                current.remove(cwe)
                current.update(replacements)
                # Keep the fine->coarse mapping consistent with the replacement just made.
                for targets in fine_to_coarse.values():
                    if cwe in targets:
                        targets.remove(cwe)
                        targets.update(replacements)
        if current == snapshot:
            break

    # The mapping of fine_grained -> coarse_grained CWEs is now complete.
    coarse_df = fine_grained_df.copy()
    coarse_df.cwe_id = coarse_df.cwe_id.map(fine_to_coarse)

    return coarse_df.explode(column="cwe_id")


def get_top_n_cwes(
    df: pd.DataFrame, cwe_df: pd.DataFrame, category: str | None = None, eal: str | None = None, n_cwes: int = 10
) -> pd.DataFrame:
    """
    Fetches top-n CWEs, overall, per category, or per EAL.

    :param df: CWE-exploded DataFrame, first element of `prepare_cwe_df` output
    :param cwe_df: CWE metadata DataFrame, second element of `prepare_cwe_df` output
    :param category: when set, restrict the counting to certificates of this category
    :param eal: when set, restrict the counting to certificates of this EAL
    :param n_cwes: number of most frequent CWEs to return
    :return: DataFrame indexed by cwe_id with frequency, cwe_name, cwe_description, url and type columns
    """
    top_n = df.copy()

    if category:
        top_n = top_n.loc[top_n.category == category].copy()
    if eal:
        top_n = top_n.loc[top_n.eal == eal].copy()

    # Bug fix: name the counts column explicitly via to_frame(name=...). The previous
    # rename(columns={"cwe_id": "frequency"}) relied on pandas < 2.0 naming the value_counts
    # column after the series; pandas >= 2.0 names it "count", leaving "frequency" absent.
    top_n = top_n.cwe_id.value_counts().head(n_cwes).to_frame(name="frequency").rename_axis("cwe_id")
    top_n["cwe_name"] = top_n.index.map(lambda x: cwe_df.loc[x].cwe_name)
    top_n["cwe_description"] = top_n.index.map(lambda x: cwe_df.loc[x].cwe_description)
    top_n["url"] = top_n.index.map(lambda x: cwe_df.loc[x].url)
    top_n["type"] = top_n.index.map(lambda x: cwe_df.loc[x].type)

    return top_n


def compute_maintenances_that_come_after_vulns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Given a pre-processed CCDataset DataFrame (expanded with MU & CVE cols), selects certificates
    that have both CVEs and maintenance updates and computes, per certificate, the earliest
    maintenance date following the first CVE.
    """
    with_both = df.loc[(df.n_cves > 0) & (df.n_maintenances > 0)].copy()
    # Normalize maintenance timestamps to plain dates so they compare against CVE dates.
    with_both.maintenance_dates = with_both.maintenance_dates.map(lambda dates: [d.date() for d in dates])
    with_both.loc[:, "earliest_maintenance_after_vuln"] = with_both.apply(find_earliest_maintenance_after_cve, axis=1)
    with_both.index.name = "dgst"
    return with_both


def move_fixing_mu_to_directory(
    df_fixed: pd.DataFrame, main_df: pd.DataFrame, outdir: str | Path, inpath: str | Path
) -> pd.Series:
    """
    Localizes reports of maintenance updates that should fix some vulnerability and copies them into a directory.

    :param df_fixed: output of `compute_maintenances_that_come_after_vulns`, indexed by certificate digest
    :param main_df: maintenance-updates DataFrame indexed by MU digest, with `related_cert_digest`
        and `maintenance_date` columns
    :param outdir: destination directory for the PDF reports (created if missing)
    :param inpath: directory holding the source `<dgst>.pdf` reports
    :return: Series of copied PDF filenames (the annotation previously claimed a DataFrame;
        a Series is what is actually returned)
    """
    # Build (cert digest, maintenance date) keys for certificates that have a fixing maintenance.
    fixed_df_index = (
        df_fixed.loc[~df_fixed.earliest_maintenance_after_vuln.isnull()]
        .reset_index()
        .set_index(["dgst", "earliest_maintenance_after_vuln"])
        .index.to_flat_index()
    )
    # Align the maintenance-updates table on the same (cert digest, date) key.
    main_df.maintenance_date = main_df.maintenance_date.map(lambda x: x.date())
    main_prefiltered = main_df.reset_index().set_index(["related_cert_digest", "maintenance_date"])
    mu_filenames = main_prefiltered.loc[main_prefiltered.index.isin(fixed_df_index), "dgst"]
    mu_filenames = mu_filenames.map(lambda x: x + ".pdf")

    # Bug fix: ensure the *destination* directory exists. Previously the *source* directory
    # (inpath) was created when missing, which could never help the copy below succeed.
    outdir = Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    inpath = Path(inpath)
    for filename in mu_filenames:
        copyfile(inpath / filename, outdir / filename)

    return mu_filenames


def plot_dataframe_graph(
    data: dict,
    label: str,
    file_name: str,
    density: bool = False,
    cumulative: bool = False,
    bins: int = 50,
    log: bool = True,
    show: bool = True,
) -> None:
    """
    Plots a histogram of `data` and saves it to `file_name`.

    :param data: mapping turned into a pd.Series and histogrammed
    :param label: label passed to the histogram
    :param file_name: path the figure is saved to
    :param density: passed through to `Series.hist`
    :param cumulative: passed through to `Series.hist`
    :param bins: number of histogram bins
    :param log: when True, logs the value counts occurring more than once
    :param show: when True, displays the figure via plt.show()
    """
    pd_data = pd.Series(data)
    pd_data.hist(bins=bins, label=label, density=density, cumulative=cumulative)
    plt.savefig(file_name)
    if show:
        plt.show()

    if log:
        sorted_data = pd_data.value_counts(ascending=True)
        # Bug fix: this logging line previously sat outside the `if log:` branch and referenced
        # `sorted_data`, raising NameError whenever log=False.
        logger.info(sorted_data.where(sorted_data > 1).dropna())