diff options
| author | J08nY | 2021-12-23 21:17:06 +0100 |
|---|---|---|
| committer | J08nY | 2021-12-23 21:17:31 +0100 |
| commit | 699ae4babc3de9c71596a77d97453fac138af14f (patch) | |
| tree | b9d5bf6aee620faa136fa351fbc7a79c8f2799d7 | |
| parent | ea302d256ead14555194d110ac822e83a441bde6 (diff) | |
| download | sec-certs-699ae4babc3de9c71596a77d97453fac138af14f.tar.gz sec-certs-699ae4babc3de9c71596a77d97453fac138af14f.tar.zst sec-certs-699ae4babc3de9c71596a77d97453fac138af14f.zip | |
Use dgst field for FIPS instead of cert_id.
Fixes #125.
| -rw-r--r-- | sec_certs/dataset/fips.py | 54 | ||||
| -rw-r--r-- | sec_certs/helpers.py | 4 | ||||
| -rw-r--r-- | sec_certs/sample/fips.py | 10 | ||||
| -rw-r--r-- | tests/test_fips_oop.py | 125 |
4 files changed, 104 insertions, 89 deletions
diff --git a/sec_certs/dataset/fips.py b/sec_certs/dataset/fips.py index d1ce2dea..6a664544 100644 --- a/sec_certs/dataset/fips.py +++ b/sec_certs/dataset/fips.py @@ -15,6 +15,7 @@ from sec_certs import parallel_processing as cert_processing from sec_certs.config.configuration import config from sec_certs.dataset.dataset import Dataset from sec_certs.dataset.fips_algorithm import FIPSAlgorithmDataset +from sec_certs.helpers import fips_dgst from sec_certs.sample.certificate import Certificate from sec_certs.sample.fips import FIPSCertificate from sec_certs.serialization.json import ComplexSerializableType, serialize @@ -111,7 +112,7 @@ class FIPSDataset(Dataset, ComplexSerializableType): raise RuntimeError("You need to provide cert ids to FIPS download PDFs functionality.") for cert_id in cert_ids: if not (self.policies_dir / f"{cert_id}.pdf").exists() or ( - cert_id in self.certs and not self.certs[cert_id].state.txt_state + cert_id in self.certs and not self.certs[fips_dgst(cert_id)].state.txt_state ): sp_urls.append( f"https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp{cert_id}.pdf" @@ -221,20 +222,21 @@ class FIPSDataset(Dataset, ComplexSerializableType): return entries @serialize - def web_scan(self, cert_ids: Set[str], redo: bool = False): + def web_scan(self, cert_ids: Set[int], redo: bool = False): logger.info("Entering web scan.") for cert_id in cert_ids: - self.certs[cert_id] = FIPSCertificate.html_from_file( + dgst = fips_dgst(cert_id) + self.certs[dgst] = FIPSCertificate.html_from_file( self.web_dir / f"{cert_id}.html", FIPSCertificate.State( - (self.policies_dir / cert_id).with_suffix(".pdf"), - (self.web_dir / cert_id).with_suffix(".html"), - (self.fragments_dir / cert_id).with_suffix(".txt"), + (self.policies_dir / str(cert_id)).with_suffix(".pdf"), + (self.web_dir / str(cert_id)).with_suffix(".html"), + (self.fragments_dir / str(cert_id)).with_suffix(".txt"), False, None, False, ), - self.certs[cert_id] if cert_id in self.certs else None, + self.certs.get(dgst), redo=redo, ) @@ -356,7 +358,8 @@ class FIPSDataset(Dataset, ComplexSerializableType): # returns True if candidates should _not_ be matched def _compare_certs(self, current_certificate: "FIPSCertificate", other_id: str): - other_cert = self.certs[other_id] + other_dgst = fips_dgst(other_id) + other_cert = self.certs[other_dgst] if ( current_certificate.web_scan.date_validation is None or other_cert is None @@ -395,29 +398,30 @@ class FIPSDataset(Dataset, ComplexSerializableType): and cert_id != current_cert.cert_id ] - def _validate_id(self, processed_cert: FIPSCertificate, cert_candidate: str) -> bool: - if cert_candidate not in self.certs or not cert_candidate.isdecimal(): + def _validate_id(self, processed_cert: FIPSCertificate, cert_candidate_id: str) -> bool: + candidate_dgst = fips_dgst(cert_candidate_id) + if candidate_dgst not in self.certs or not cert_candidate_id.isdecimal(): return False # "< number" still needs to be used, because of some old certs being revalidated - if int(cert_candidate) < config.smallest_certificate_id_to_connect or self._compare_certs( - processed_cert, cert_candidate + if int(cert_candidate_id) < config.smallest_certificate_id_to_connect or self._compare_certs( + processed_cert, cert_candidate_id ): return False if self.algorithms is None: raise RuntimeError("Dataset was probably not built correctly - this should not be happening.") - if cert_candidate not in self.algorithms.certs: + if cert_candidate_id not in self.algorithms.certs: return True for cert_alg in processed_cert.heuristics.algorithms: for certificate in cert_alg["Certificate"]: curr_id = "".join(filter(str.isdigit, certificate)) - if curr_id == cert_candidate: + if curr_id == cert_candidate_id: return False - algs = self.algorithms.certs[cert_candidate] + algs = self.algorithms.certs[candidate_dgst] for current_alg in algs: if current_alg.vendor is None or processed_cert.web_scan.vendor is None: raise RuntimeError("Dataset was probably not built correctly - this should not be happening.") @@ -476,8 +480,8 @@ class FIPSDataset(Dataset, ComplexSerializableType): self.compute_cpe_heuristics() self.compute_related_cves(use_nist_cpe_matching_dict=use_nist_cpe_matching_dict) - def _highlight_vendor_in_dot(self, dot: Digraph, current_key: str, highlighted_vendor: str): - current_cert = self.certs[current_key] + def _highlight_vendor_in_dot(self, dot: Digraph, current_dgst: str, highlighted_vendor: str): + current_cert = self.certs[current_dgst] if current_cert.web_scan.vendor != highlighted_vendor: return @@ -488,24 +492,24 @@ class FIPSDataset(Dataset, ComplexSerializableType): if current_cert.web_scan.status == "Historical": dot.attr("node", color="gold3") - def _add_colored_node(self, dot: Digraph, current_key: str, highlighted_vendor: str): - current_cert = self.certs[current_key] + def _add_colored_node(self, dot: Digraph, current_dgst: str, highlighted_vendor: str): + current_cert = self.certs[current_dgst] dot.attr("node", color="lightgreen") if current_cert.web_scan.status == "Revoked": dot.attr("node", color="lightgrey") if current_cert.web_scan.status == "Historical": dot.attr("node", color="gold") - self._highlight_vendor_in_dot(dot, current_key, highlighted_vendor) + self._highlight_vendor_in_dot(dot, current_dgst, highlighted_vendor) dot.node( - current_key, - label=current_key + " " + current_cert.web_scan.vendor + str(current_cert.cert_id), + label=str(current_cert.cert_id) + " " + current_cert.web_scan.vendor if current_cert.web_scan.vendor is not None else "" + " " + (current_cert.web_scan.module_name if current_cert.web_scan.module_name else ""), ) - def _get_processed_list(self, connection_list: str, key: str): + def _get_processed_list(self, connection_list: str, dgst: str): attr = {"pdf": "pdf_scan", "web": "web_scan", "heuristics": "heuristics"}[connection_list] - return getattr(self.certs[key], attr).connections + return getattr(self.certs[dgst], attr).connections def get_dot_graph( self, @@ -537,6 +541,7 @@ class FIPSDataset(Dataset, ComplexSerializableType): for key in self.certs: cert = self.certs[key] + # TODO: What? How can a key from self.certs be "Not Found"? if key == "Not found" or not cert.state.file_status: continue @@ -558,6 +563,7 @@ class FIPSDataset(Dataset, ComplexSerializableType): for key in self.certs: cert = self.certs[key] + # TODO: What? How can a key from self.certs be "Not Found"? if key == "Not found" or not cert.state.file_status: continue processed = self._get_processed_list(connection_list, key) diff --git a/sec_certs/helpers.py b/sec_certs/helpers.py index 3637fe38..1cc60368 100644 --- a/sec_certs/helpers.py +++ b/sec_certs/helpers.py @@ -71,6 +71,10 @@ def download_parallel(items: Sequence[Tuple[str, Path]], num_threads: int) -> Se return responses +def fips_dgst(cert_id: Union[int, str]): + return get_first_16_bytes_sha256(str(cert_id)) + + def get_first_16_bytes_sha256(string: str) -> str: return hashlib.sha256(string.encode("utf-8")).hexdigest()[:16] diff --git a/sec_certs/sample/fips.py b/sec_certs/sample/fips.py index b66d11df..5c47c4d7 100644 --- a/sec_certs/sample/fips.py +++ b/sec_certs/sample/fips.py @@ -16,7 +16,7 @@ from sec_certs.cert_rules import REGEXEC_SEP, fips_common_rules, fips_rules from sec_certs.config.configuration import config from sec_certs.constants import LINE_SEPARATOR from sec_certs.dataset.cpe import CPEDataset -from sec_certs.helpers import load_cert_file, normalize_match_string, save_modified_cert_file +from sec_certs.helpers import fips_dgst, load_cert_file, normalize_match_string, save_modified_cert_file from sec_certs.model.cpe_matching import CPEClassifier from sec_certs.sample.certificate import Certificate, logger from sec_certs.sample.cpe import CPE @@ -193,7 +193,7 @@ class FIPSCertificate(Certificate, ComplexSerializableType): @property def dgst(self) -> str: - return self.cert_id + return fips_dgst(self.cert_id) @property def label_studio_title(self): @@ -219,7 +219,7 @@ class FIPSCertificate(Certificate, ComplexSerializableType): def __init__( self, - cert_id: str, + cert_id: int, web_scan: "FIPSCertificate.WebScan", pdf_scan: "FIPSCertificate.PdfScan", heuristics: "FIPSCertificate.FIPSHeuristics", @@ -440,7 +440,7 @@ class FIPSCertificate(Certificate, ComplexSerializableType): } if not initialized: items_found = FIPSCertificate.initialize_dictionary() - items_found["cert_id"] = file.stem + items_found["cert_id"] = int(file.stem) else: items_found = initialized.web_scan.__dict__ items_found["cert_id"] = initialized.cert_id @@ -454,7 +454,7 @@ class FIPSCertificate(Certificate, ComplexSerializableType): if redo: items_found = FIPSCertificate.initialize_dictionary() - items_found["cert_id"] = file.stem + items_found["cert_id"] = int(file.stem) text = helpers.load_cert_html_file(file) soup = BeautifulSoup(text, "html.parser") diff --git a/tests/test_fips_oop.py b/tests/test_fips_oop.py index 17b3325a..bb592e08 100644 --- a/tests/test_fips_oop.py +++ b/tests/test_fips_oop.py @@ -8,6 +8,7 @@ import tests.data.test_fips_oop from sec_certs.config.configuration import config from sec_certs.dataset.fips import FIPSDataset from sec_certs.dataset.fips_algorithm import FIPSAlgorithmDataset +from sec_certs.helpers import fips_dgst from tests.fips_test_utils import generate_html @@ -112,84 +113,88 @@ class TestFipsOOP(TestCase): with TemporaryDirectory() as tmp_dir: dataset = _set_up_dataset_for_full(tmp_dir, certs, self.cpe_dset_path, self.cve_dset_path) - self.assertEqual(set(dataset.certs["3095"].heuristics.connections), {"3093", "3096", "3094"}) - self.assertEqual(set(dataset.certs["3651"].heuristics.connections), {"3615"}) - self.assertEqual(set(dataset.certs["3093"].heuristics.connections), {"3090", "3091"}) - self.assertEqual(set(dataset.certs["3090"].heuristics.connections), {"3089"}) + self.assertEqual(set(dataset.certs[fips_dgst("3095")].heuristics.connections), {"3093", "3096", "3094"}) + self.assertEqual(set(dataset.certs[fips_dgst("3651")].heuristics.connections), {"3615"}) + self.assertEqual(set(dataset.certs[fips_dgst("3093")].heuristics.connections), {"3090", "3091"}) + self.assertEqual(set(dataset.certs[fips_dgst("3090")].heuristics.connections), {"3089"}) self.assertEqual( - set(dataset.certs["3197"].heuristics.connections), {x for x in ["3195", "3096", "3196", "3644", "3651"]} + set(dataset.certs[fips_dgst("3197")].heuristics.connections), + {x for x in ["3195", "3096", "3196", "3644", "3651"]}, ) self.assertEqual( - set(dataset.certs["3196"].heuristics.connections), {x for x in ["3194", "3091", "3480", "3615"]} + set(dataset.certs[fips_dgst("3196")].heuristics.connections), + {x for x in ["3194", "3091", "3480", "3615"]}, ) - self.assertEqual(set(dataset.certs["3089"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["3195"].heuristics.connections), {"3194", "3091", "3480"}) - self.assertEqual(set(dataset.certs["3480"].heuristics.connections), {"3089"}) - self.assertEqual(set(dataset.certs["3615"].heuristics.connections), {"3089"}) - self.assertEqual(set(dataset.certs["3194"].heuristics.connections), {"3089"}) - self.assertEqual(set(dataset.certs["3091"].heuristics.connections), {"3089"}) - self.assertEqual(set(dataset.certs["3690"].heuristics.connections), {"3644", "3196", "3651"}) - self.assertEqual(set(dataset.certs["3644"].heuristics.connections), {"3615"}) - self.assertEqual(set(dataset.certs["3527"].heuristics.connections), {"3090", "3091"}) - self.assertEqual(set(dataset.certs["3094"].heuristics.connections), {"3090", "3091"}) - self.assertEqual(set(dataset.certs["3544"].heuristics.connections), {"3093", "3096", "3527"}) - self.assertEqual(set(dataset.certs["3096"].heuristics.connections), {"3090", "3194", "3091", "3480"}) + self.assertEqual(set(dataset.certs[fips_dgst("3089")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("3195")].heuristics.connections), {"3194", "3091", "3480"}) + self.assertEqual(set(dataset.certs[fips_dgst("3480")].heuristics.connections), {"3089"}) + self.assertEqual(set(dataset.certs[fips_dgst("3615")].heuristics.connections), {"3089"}) + self.assertEqual(set(dataset.certs[fips_dgst("3194")].heuristics.connections), {"3089"}) + self.assertEqual(set(dataset.certs[fips_dgst("3091")].heuristics.connections), {"3089"}) + self.assertEqual(set(dataset.certs[fips_dgst("3690")].heuristics.connections), {"3644", "3196", "3651"}) + self.assertEqual(set(dataset.certs[fips_dgst("3644")].heuristics.connections), {"3615"}) + self.assertEqual(set(dataset.certs[fips_dgst("3527")].heuristics.connections), {"3090", "3091"}) + self.assertEqual(set(dataset.certs[fips_dgst("3094")].heuristics.connections), {"3090", "3091"}) + self.assertEqual(set(dataset.certs[fips_dgst("3544")].heuristics.connections), {"3093", "3096", "3527"}) self.assertEqual( - set(dataset.certs["3092"].heuristics.connections), {"3093", "3195", "3096", "3644", "3651"} + set(dataset.certs[fips_dgst("3096")].heuristics.connections), {"3090", "3194", "3091", "3480"} + ) + self.assertEqual( + set(dataset.certs[fips_dgst("3092")].heuristics.connections), {"3093", "3195", "3096", "3644", "3651"} ) def test_connections_redhat(self): certs = self.certs_to_parse["redhat"] with TemporaryDirectory() as tmp_dir: dataset = _set_up_dataset_for_full(tmp_dir, certs, self.cpe_dset_path, self.cve_dset_path) - self.assertEqual(set(dataset.certs["2630"].heuristics.connections), {"2441"}) - self.assertEqual(set(dataset.certs["2633"].heuristics.connections), {"2441"}) - self.assertEqual(set(dataset.certs["2441"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["2997"].heuristics.connections), {"2711"}) - self.assertEqual(set(dataset.certs["2446"].heuristics.connections), {"2441"}) - self.assertEqual(set(dataset.certs["2447"].heuristics.connections), {"2441"}) - self.assertEqual(set(dataset.certs["3733"].heuristics.connections), {"2441"}) - self.assertEqual(set(dataset.certs["2441"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["2711"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["2908"].heuristics.connections), {"2711"}) - self.assertEqual(set(dataset.certs["3613"].heuristics.connections), {"2997"}) - self.assertEqual(set(dataset.certs["2721"].heuristics.connections), {"2441", "2711"}) - self.assertEqual(set(dataset.certs["2798"].heuristics.connections), {"2721", "2711"}) - self.assertEqual(set(dataset.certs["2711"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["2997"].heuristics.connections), {"2711"}) - self.assertEqual(set(dataset.certs["2742"].heuristics.connections), {"2721", "2711"}) - self.assertEqual(set(dataset.certs["2721"].heuristics.connections), {"2441", "2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2630")].heuristics.connections), {"2441"}) + self.assertEqual(set(dataset.certs[fips_dgst("2633")].heuristics.connections), {"2441"}) + self.assertEqual(set(dataset.certs[fips_dgst("2441")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("2997")].heuristics.connections), {"2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2446")].heuristics.connections), {"2441"}) + self.assertEqual(set(dataset.certs[fips_dgst("2447")].heuristics.connections), {"2441"}) + self.assertEqual(set(dataset.certs[fips_dgst("3733")].heuristics.connections), {"2441"}) + self.assertEqual(set(dataset.certs[fips_dgst("2441")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("2711")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("2908")].heuristics.connections), {"2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("3613")].heuristics.connections), {"2997"}) + self.assertEqual(set(dataset.certs[fips_dgst("2721")].heuristics.connections), {"2441", "2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2798")].heuristics.connections), {"2721", "2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2711")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("2997")].heuristics.connections), {"2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2742")].heuristics.connections), {"2721", "2711"}) + self.assertEqual(set(dataset.certs[fips_dgst("2721")].heuristics.connections), {"2441", "2711"}) def test_docusign_chunk(self): certs = self.certs_to_parse["docusign"] with TemporaryDirectory() as tmp_dir: dataset = _set_up_dataset_for_full(tmp_dir, certs, self.cpe_dset_path, self.cve_dset_path) - self.assertEqual(set(dataset.certs["3850"].heuristics.connections), {"3518", "1883"}) - self.assertEqual(set(dataset.certs["2779"].heuristics.connections), {"1883"}) - self.assertEqual(set(dataset.certs["2860"].heuristics.connections), {"1883"}) - self.assertEqual(set(dataset.certs["2665"].heuristics.connections), {"1883"}) - self.assertEqual(set(dataset.certs["1883"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["3518"].heuristics.connections), {"1883"}) - self.assertEqual(set(dataset.certs["3141"].heuristics.connections), {"1883"}) - self.assertEqual(set(dataset.certs["2590"].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("3850")].heuristics.connections), {"3518", "1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("2779")].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("2860")].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("2665")].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("1883")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("3518")].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("3141")].heuristics.connections), {"1883"}) + self.assertEqual(set(dataset.certs[fips_dgst("2590")].heuristics.connections), {"1883"}) def test_openssl_chunk(self): certs = self.certs_to_parse["referencing_openssl"] with TemporaryDirectory() as tmp_dir: dataset = _set_up_dataset_for_full(tmp_dir, certs, self.cpe_dset_path, self.cve_dset_path) - self.assertEqual(set(dataset.certs["3493"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3495"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3711"].heuristics.connections), {"3220"}) - self.assertEqual(set(dataset.certs["3176"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3488"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3126"].heuristics.connections), {"3126", "2398"}) - self.assertEqual(set(dataset.certs["3269"].heuristics.connections), {"3269", "3220"}) - self.assertEqual(set(dataset.certs["3524"].heuristics.connections), {"3220"}) - self.assertEqual(set(dataset.certs["3220"].heuristics.connections), {"3220", "2398"}) - self.assertEqual(set(dataset.certs["2398"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["3543"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["2676"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3313"].heuristics.connections), {"3313", "3220"}) - self.assertEqual(set(dataset.certs["3363"].heuristics.connections), set()) - self.assertEqual(set(dataset.certs["3608"].heuristics.connections), {"2398"}) - self.assertEqual(set(dataset.certs["3158"].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3493")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3495")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3711")].heuristics.connections), {"3220"}) + self.assertEqual(set(dataset.certs[fips_dgst("3176")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3488")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3126")].heuristics.connections), {"3126", "2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3269")].heuristics.connections), {"3269", "3220"}) + self.assertEqual(set(dataset.certs[fips_dgst("3524")].heuristics.connections), {"3220"}) + self.assertEqual(set(dataset.certs[fips_dgst("3220")].heuristics.connections), {"3220", "2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("2398")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("3543")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("2676")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3313")].heuristics.connections), {"3313", "3220"}) + self.assertEqual(set(dataset.certs[fips_dgst("3363")].heuristics.connections), set()) + self.assertEqual(set(dataset.certs[fips_dgst("3608")].heuristics.connections), {"2398"}) + self.assertEqual(set(dataset.certs[fips_dgst("3158")].heuristics.connections), {"2398"}) |
