aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/sec_certs/utils
diff options
context:
space:
mode:
authoradamjanovsky2022-12-09 17:10:19 +0100
committerGitHub2022-12-09 17:10:19 +0100
commit73b3b0c361f9545450fa188bec50606d64bb1afd (patch)
tree0a1f9034c309ba88e5f72a31634b014c23a57df5 /src/sec_certs/utils
parent19338dc9fd9ab257c36cfa277994abe202e97de2 (diff)
downloadsec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.gz
sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.zst
sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.zip
flat -> src layout (#294)
- Some mypy fixes - Flat layout -> src layout - Ditch `setup.py` and `setup.cfg` in favour of `pyproject.toml` - Non-pinned requirements moved from `requirements/*.in` to `pyproject.toml`
Diffstat (limited to 'src/sec_certs/utils')
-rw-r--r--src/sec_certs/utils/__init__.py0
-rw-r--r--src/sec_certs/utils/extract.py817
-rw-r--r--src/sec_certs/utils/helpers.py239
-rw-r--r--src/sec_certs/utils/pandas.py542
-rw-r--r--src/sec_certs/utils/parallel_processing.py43
-rw-r--r--src/sec_certs/utils/pdf.py275
-rw-r--r--src/sec_certs/utils/sanitization.py51
-rw-r--r--src/sec_certs/utils/tables.py62
-rw-r--r--src/sec_certs/utils/tqdm.py9
9 files changed, 2038 insertions, 0 deletions
diff --git a/src/sec_certs/utils/__init__.py b/src/sec_certs/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/sec_certs/utils/__init__.py
diff --git a/src/sec_certs/utils/extract.py b/src/sec_certs/utils/extract.py
new file mode 100644
index 00000000..09460fb7
--- /dev/null
+++ b/src/sec_certs/utils/extract.py
@@ -0,0 +1,817 @@
+from __future__ import annotations
+
+import logging
+import os
+import re
+from collections import Counter
+from enum import Enum
+from pathlib import Path
+from typing import Any, Iterator
+
+import numpy as np
+
+from sec_certs import constants as constants
+from sec_certs.cert_rules import REGEXEC_SEP, cc_rules
+from sec_certs.constants import FILE_ERRORS_STRATEGY, LINE_SEPARATOR, MAX_ALLOWED_MATCH_LENGTH
+
+logger = logging.getLogger(__name__)
+
+
def search_only_headers_anssi(filepath: Path):  # noqa: C901
    """
    Parse the front-page header fields of an ANSSI certification report.

    The text of `filepath` is loaded with `load_text_file` and matched against a
    list of known header layouts (French and English variants). Each capture
    group of a matching layout is normalized with `normalize_match_string` and
    stored under the corresponding `constants.TAG_*` key. When several matches
    are found, values from later matches overwrite earlier ones.

    :param filepath: Path to the text file of the certification report.
    :return: `(constants.RETURNCODE_OK, items_found)` on success (the dictionary
             is empty when no layout matched), `(error_msg, None)` when any
             exception is raised during parsing.
    """

    # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!!
    class HEADER_TYPE(Enum):
        HEADER_FULL = 1
        HEADER_MISSING_CERT_ITEM_VERSION = 2
        HEADER_MISSING_PROTECTION_PROFILES = 3
        HEADER_DUPLICITIES = 4

    # (layout type, regular expression) pairs; the capture groups are consumed
    # positionally in the order: cert id, item, [item version], [protection
    # profiles], CC version, security level, developer, cert lab.
    rules_certificate_preface = [
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)()Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom des produits(.+)Référence/version des produits(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur\\(s\\)(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom des produits(.+)Référence/version des produits(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de reconnaissance",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à des profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur\\(s\\)(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à des profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification Report(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profisl de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centres d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Versions du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Recognition arrangements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Mutual Recognition Agreements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer\\(s\\)(.+)Evaluation facility(.+)Recognition arrangements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Products names(.+)Products references(.+)protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)Product name \\(reference / version\\)(.+)TOE name \\(reference / version\\)(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements",
        ),
        (
            HEADER_TYPE.HEADER_FULL,
            "Certification report reference(.+)TOE name(.+)Product's reference/ version(.+)TOE's reference/ version(.+)Conformité à un profil de protection(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Recognition arrangements",
        ),
        # corrupted text (duplicities)
        (
            HEADER_TYPE.HEADER_DUPLICITIES,
            "Référencce du rapport de d certification n(.+)Nom du p produit(.+)Référencce/version du produit(.+)Conformiité à un profil de d protection(.+)Critères d d’évaluation ett version(.+)Niveau d’’évaluation(.+)Développ peurs(.+)Centre d’’évaluation(.+)Accords d de reconnaisssance applicab bles",
        ),
        # rules without product version
        (
            HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION,
            "Référence du rapport de certification(.+)Nom et version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION,
            "Référence du rapport de certification(.+)Nom et version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        (
            HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION,
            "Référence du rapport de certification(.+)Nom du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
        # rules without protection profile
        (
            HEADER_TYPE.HEADER_MISSING_PROTECTION_PROFILES,
            "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables",
        ),
    ]

    items_found = {}  # type: ignore # noqa

    try:
        whole_text, _, _ = load_text_file(filepath)

        # for ANSSI and DCSSI certificates, front page starts only on third page after 2 newpage signs
        # NOTE(review): the second find() starts at `pos` itself, so it returns the
        # same index as the first one; presumably `pos + 1` (and a page-break
        # marker rather than a space) was intended - confirm before changing.
        pos = whole_text.find(" ")
        if pos != -1:
            pos = whole_text.find(" ", pos)
            if pos != -1:
                whole_text = whole_text[pos:]

        other_rule_already_match = False
        for header_type, pattern in rules_certificate_preface:
            rule_and_sep = pattern + REGEXEC_SEP

            for m in re.finditer(rule_and_sep, whole_text):
                # The list of matched rules is created lazily on the first match.
                matched_rules = items_found.setdefault(constants.TAG_HEADER_MATCH_RULES, [])

                # Record each pattern once. The list holds pattern strings, so the
                # membership test must use the pattern, not the (type, pattern) tuple.
                if pattern not in matched_rules:
                    matched_rules.append(pattern)

                if other_rule_already_match:
                    logger.warning(f"WARNING: multiple rules are matching same certification document: {filepath}")
                other_rule_already_match = True

                match_groups = m.groups()
                index_next_item = 0
                items_found[constants.TAG_CERT_ID] = normalize_match_string(match_groups[index_next_item])
                index_next_item += 1

                items_found[constants.TAG_CERT_ITEM] = normalize_match_string(match_groups[index_next_item])
                index_next_item += 1

                # Layouts lacking a field have one capture group fewer, so the
                # group index is only advanced when the field is present.
                if header_type == HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION:
                    items_found[constants.TAG_CERT_ITEM_VERSION] = ""
                else:
                    items_found[constants.TAG_CERT_ITEM_VERSION] = normalize_match_string(match_groups[index_next_item])
                    index_next_item += 1

                if header_type == HEADER_TYPE.HEADER_MISSING_PROTECTION_PROFILES:
                    items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = ""
                else:
                    items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = normalize_match_string(
                        match_groups[index_next_item]
                    )
                    index_next_item += 1

                items_found[constants.TAG_CC_VERSION] = normalize_match_string(match_groups[index_next_item])
                index_next_item += 1

                items_found[constants.TAG_CC_SECURITY_LEVEL] = normalize_match_string(match_groups[index_next_item])
                index_next_item += 1

                items_found[constants.TAG_DEVELOPER] = normalize_match_string(match_groups[index_next_item])
                index_next_item += 1

                items_found[constants.TAG_CERT_LAB] = normalize_match_string(match_groups[index_next_item])
    except Exception as e:
        # Only the last four path components are reported to keep the message short.
        relative_filepath = "/".join(str(filepath).split("/")[-4:])
        error_msg = f"Failed to parse ANSSI frontpage headers from {relative_filepath}; {e}"
        logger.error(error_msg)
        return error_msg, None

    return constants.RETURNCODE_OK, items_found
+
+
def search_only_headers_bsi(filepath: Path):  # noqa: C901
    """
    Parse header fields of a BSI certification report.

    The first lines of the file are searched for the certificate id, the
    certified item and the developer. The whole file is then searched for the
    "PP Conformance / Functionality / Assurance" section.

    :param filepath: Path to the text file of the certification report.
    :return: `(constants.RETURNCODE_OK, items_found)` on success,
             `(error_msg, None)` when any exception is raised during parsing.
    """
    # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!!
    LINE_SEPARATOR_STRICT = " "
    NUM_LINES_TO_INVESTIGATE = 15
    frontpage_rules = [
        "(BSI-DSZ-CC-.+?) (?:for|For) (.+?) from (.*)",
        "(BSI-DSZ-CC-.+?) zu (.+?) der (.*)",
    ]
    # Keywords that separate the certified item from the developer.
    from_keywords = [" from ", " der "]
    # Markers after which the developer string is cut off, tried in this order.
    developer_end_markers = ("\f-", "\fBSI", "Bundesamt")

    items_found = {}  # type: ignore # noqa

    try:
        # Process front page with info: cert_id, certified_item and developer
        frontpage_text, _, _ = load_text_file(filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT)

        for rule in frontpage_rules:
            for m in re.finditer(rule + REGEXEC_SEP, frontpage_text):
                # insert rule if at least one match for it was found
                matched_rules = items_found.setdefault(constants.TAG_HEADER_MATCH_RULES, [])
                if rule not in matched_rules:
                    matched_rules.append(rule)

                groups = m.groups()
                cert_id, certified_item, developer = groups[0], groups[1], groups[2]

                # A separator keyword inside the item means the item swallowed the
                # developer; split at its first occurrence.
                for from_keyword in from_keywords:
                    head, separator, tail = certified_item.partition(from_keyword)
                    if separator:
                        logger.warning(
                            f"string {from_keyword} detected in certified item - shall not be here, fixing..."
                        )
                        certified_item, developer = head, tail

                # Cut the developer at the first end marker that is present.
                for marker in developer_end_markers:
                    end_pos = developer.find(marker)
                    if end_pos != -1:
                        developer = developer[:end_pos]
                        break

                items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id)
                items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item)
                items_found[constants.TAG_DEVELOPER] = normalize_match_string(developer)
                items_found[constants.TAG_CERT_LAB] = "BSI"

        # Process page with more detailed sample info
        # PP Conformance, Functionality, Assurance
        detail_rules = ["PP Conformance: (.+)Functionality: (.+)Assurance: (.+)The IT Product identified"]

        full_text, _, _ = load_text_file(filepath)

        for rule in detail_rules:
            for m in re.finditer(rule + REGEXEC_SEP, full_text):
                # check if previous rules had at least one match
                if constants.TAG_CERT_ID not in items_found:
                    logger.error(f"ERROR: front page not found for file: {filepath}")

                groups = m.groups()
                items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = normalize_match_string(groups[0])
                items_found[constants.TAG_CC_VERSION] = normalize_match_string(groups[1])
                items_found[constants.TAG_CC_SECURITY_LEVEL] = normalize_match_string(groups[2])
    except Exception as e:
        relative_filepath = "/".join(str(filepath).split("/")[-4:])
        error_msg = f"Failed to parse BSI headers from frontpage: {relative_filepath}; {e}"
        logger.error(error_msg)
        return error_msg, None

    return constants.RETURNCODE_OK, items_found
+
+
def search_only_headers_nscib(filepath: Path):  # noqa: C901
    """
    Parse front-page header fields of an NSCIB certification report.

    Only the first `NUM_LINES_TO_INVESTIGATE` lines are inspected. The certified
    item is assembled from the lines between the report heading and the sponsor
    line; developer, evaluation facility and report number are read from the
    text following their labels.

    :param filepath: Path to the text file of the certification report.
    :return: `(constants.RETURNCODE_OK, items_found)` on success (the dictionary
             is empty when no sponsor line was found), `(error_msg, None)` when
             any exception is raised during parsing.
    """
    # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!!
    LINE_SEPARATOR_STRICT = " "
    NUM_LINES_TO_INVESTIGATE = 60
    SPONSORDEVELOPER_STR = "Sponsor and developer:"
    SPONSOR_STR = "Sponsor:"
    DEVELOPER_STR = "Developer:"
    CERTLAB_STR = "Evaluation facility:"
    REPORTNUM_STR = "Report number:"
    items_found: dict[str, str] = {}

    def text_after(line: str, label: str) -> str:
        """Return the part of `line` that follows the first occurrence of `label`."""
        return line[line.find(label) + len(label) :]

    try:
        # Process front page with info: cert_id, certified_item and developer
        _, whole_text_with_newlines, _ = load_text_file(filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT)

        certified_item = ""
        developer = ""
        cert_lab = ""
        cert_id = ""

        lines = whole_text_with_newlines.splitlines()
        sponsor_found = False
        # Index of the first line after the report heading. Starts at 0 (not -1)
        # so that a missing heading yields "all lines above the sponsor line"
        # instead of wrapping around to the last loaded line via a negative index.
        item_offset = 0

        for line_index, line in enumerate(lines):
            if "Certification Report" in line or "Assurance Continuity Maintenance Report" in line:
                item_offset = line_index + 1

            if SPONSORDEVELOPER_STR in line:
                sponsor_found = True
                # all lines above till 'Certification Report' or 'Assurance Continuity Maintenance Report'
                certified_item = "".join(name_line + " " for name_line in lines[item_offset:line_index])
                developer = text_after(line, SPONSORDEVELOPER_STR)

            if SPONSOR_STR in line:
                sponsor_found = True
                # all lines above till 'Certification Report' or 'Assurance Continuity Maintenance Report'
                certified_item = "".join(name_line + " " for name_line in lines[item_offset:line_index])

            if DEVELOPER_STR in line:
                developer = text_after(line, DEVELOPER_STR)

            if CERTLAB_STR in line:
                cert_lab = text_after(line, CERTLAB_STR)

            if REPORTNUM_STR in line:
                cert_id = text_after(line, REPORTNUM_STR)

        if sponsor_found:
            items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id)
            items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item)
            items_found[constants.TAG_DEVELOPER] = normalize_match_string(developer)
            items_found[constants.TAG_CERT_LAB] = cert_lab

    except Exception as e:
        error_msg = f"Failed to parse NSCIB headers from frontpage: {filepath}; {e}"
        logger.error(error_msg)
        return error_msg, None

    return constants.RETURNCODE_OK, items_found
+
+
def search_only_headers_niap(filepath: Path):
    """
    Parse front-page header fields of a NIAP validation report.

    Only the first `NUM_LINES_TO_INVESTIGATE` lines are inspected. The certified
    item is assembled from the lines between the "Validation Report" heading and
    the first "Report Number:" line; the certificate id is the text following
    that label.

    :param filepath: Path to the text file of the validation report.
    :return: `(constants.RETURNCODE_OK, items_found)` on success (the dictionary
             is empty when no report number line was found), `(error_msg, None)`
             when any exception is raised during parsing.
    """
    # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!!
    LINE_SEPARATOR_STRICT = " "
    NUM_LINES_TO_INVESTIGATE = 15
    REPORTNUM_STR = "Report Number:"
    items_found: dict[str, str] = {}

    try:
        # Process front page with info: cert_id and certified_item
        _, whole_text_with_newlines, _ = load_text_file(filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT)

        certified_item = ""
        cert_id = ""

        lines = whole_text_with_newlines.splitlines()
        report_number_found = False
        # Index of the first line after the heading. Starts at 0 (not -1) so that
        # a missing heading does not pull in the last loaded line through a
        # negative index.
        item_offset = 0

        for line_index, line in enumerate(lines):
            if "Validation Report" in line:
                item_offset = line_index + 1

            if REPORTNUM_STR in line:
                report_number_found = True
                # all lines above till 'Validation Report'
                certified_item = "".join(name_line + " " for name_line in lines[item_offset:line_index])
                cert_id = line[line.find(REPORTNUM_STR) + len(REPORTNUM_STR) :]
                # Only the first report number line is used.
                break

        if report_number_found:
            items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id)
            items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item)
            items_found[constants.TAG_CERT_LAB] = "US NIAP"

    except Exception as e:
        error_msg = f"Failed to parse NIAP headers from frontpage: {filepath}; {e}"
        logger.error(error_msg)
        return error_msg, None

    return constants.RETURNCODE_OK, items_found
+
+
def search_only_headers_canada(filepath: Path):  # noqa: C901
    """
    Parse the certificate id from the front page of a Canadian certification report.

    Only the first `NUM_LINES_TO_INVESTIGATE` lines are inspected and three
    front-page layouts are recognised; scanning stops at the first layout that
    yields a result.

    :param filepath: Path to the text file of the certification report.
    :return: `(constants.RETURNCODE_OK, items_found)` on success (the dictionary
             is empty when no non-empty id was found), `(error_msg, None)` when
             any exception is raised during parsing, including a line lookup
             past the end of the loaded lines.
    """
    # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!!
    LINE_SEPARATOR_STRICT = " "
    NUM_LINES_TO_INVESTIGATE = 20
    items_found: dict[str, str] = {}
    try:
        _, text_with_newlines, _ = load_text_file(filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT)

        cert_id = ""
        matched = False
        lines = text_with_newlines.splitlines()

        for idx, line in enumerate(lines):
            # Layout 1: the number label starts the line right after the heading.
            if "Government of Canada, Communications Security Establishment" in line:
                following = lines[idx + 1]
                label = ""
                for candidate in ("Evaluation number:", "Document number:"):
                    if following.startswith(candidate):
                        label = candidate
                if label != "":
                    matched = True
                    cert_id = following[following.find(label) + len(label) :]
                    break

            # Layout 2: the id line sits four lines above the first following
            # line that contains "UNCLASSIFIED".
            if (
                "Government of Canada. This document is the property of the Government of Canada. It shall not be altered,"
                in line
            ):
                label = "Evaluation number:"
                for offset in range(1, 20):
                    if "UNCLASSIFIED" in lines[idx + offset]:
                        matched = True
                        id_line = lines[idx + offset - 4]
                        cert_id = id_line[id_line.find(label) + len(label) :]
                        break
                if matched:
                    break

            # Layout 3: the id is the whole line between the classification banner
            # and the report title.
            if (
                "UNCLASSIFIED / NON CLASSIFIÉ" in line
                and "COMMON CRITERIA CERTIFICATION REPORT" in lines[idx + 2]
            ):
                matched = True
                cert_id = lines[idx + 1]
                break

        if matched and cert_id:
            items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id)
            items_found[constants.TAG_CERT_LAB] = "CANADA"

    except Exception as e:
        error_msg = f"Failed to parse Canada headers from frontpage: {filepath}; {e}"
        logger.error(error_msg)
        return error_msg, None

    return constants.RETURNCODE_OK, items_found
+
+
def search_files(folder: str | Path) -> Iterator[str]:
    """
    Yield the path of every file found under `folder`, walking it recursively.

    :param folder: Root directory to walk.
    :return: Iterator over file paths, each joined with its containing directory.
    """
    for directory, _, file_names in os.walk(str(folder)):
        for file_name in file_names:
            yield os.path.join(directory, file_name)
+
+
def flatten_matches(dct: dict) -> dict:
    """
    Flatten a nested dictionary of matches into a single level.

    Nested dictionaries are flattened recursively and their counts are added to
    the running totals; a non-dictionary value is assigned to its key directly.

    Turns
    ```
    {"a": {"cc": 3}, "b": {}, "d": {"dd": 4, "cc": 2}}
    ```
    into
    ```
    {"cc": 5, "dd": 4}
    ```

    :param dct: Dictionary to flatten
    :return: Flattened dictionary
    """
    totals: Counter[Any] = Counter()
    for name, entry in dct.items():
        if not isinstance(entry, dict):
            totals[name] = entry
            continue
        totals.update(flatten_matches(entry))
    return dict(totals)
+
+
def prune_matches(dct: dict) -> dict:
    """
    Remove empty sub-dictionaries from a dictionary of matches.

    Dictionaries that are empty, or become empty after pruning, are dropped.
    The exception is a first-level entry that only becomes empty through
    pruning, which is kept as `{}`. When nothing remains at the top level the
    result is `None`.

    Turns
    ```
    {"a": {"cc": 3}, "b": {"aa": {}, "bb": {}}, "d": {"dd": 4, "cc": 2}}
    ```
    into
    ```
    {"a": {"cc": 3}, "b": {}, "d": {"dd": 4, "cc": 2}}
    ```

    :param dct: The dictionary of matches.
    :return: The pruned dictionary.
    """

    def _prune(node, level):
        # Leaves are passed through untouched.
        if not isinstance(node, dict):
            return node
        if not node:
            return None
        kept = {}
        for key, child in node.items():
            pruned = _prune(child, level + 1)
            if pruned is not None:
                kept[key] = pruned
        if kept or level == 1:
            return kept
        return None

    return _prune(dct, 0)
+
+
def extract_keywords(filepath: Path, search_rules) -> dict[str, dict[str, int]] | None:
    """
    Extract keywords from the text of `filepath` using the search rules.

    The rules are walked recursively: a dictionary maps to a dictionary of
    results, a list of rules maps to the summed match counts of its members,
    and a compiled pattern maps to the list of its normalized `match` groups.
    The result is passed through `prune_matches`.

    :param filepath: Path to the text file to search.
    :param search_rules: Nested structure of dictionaries, lists and compiled patterns.
    :return: The pruned match counts, or None when any exception is raised.
    """

    try:
        text, _, _ = load_text_file(filepath, -1, LINE_SEPARATOR)

        def collect(node):
            if isinstance(node, re.Pattern):
                found = []
                for hit in node.finditer(text):
                    normalized = normalize_match_string(hit.group("match"))
                    match_len = len(normalized)
                    # Over-long matches are reported but still kept.
                    if match_len > MAX_ALLOWED_MATCH_LENGTH:
                        logger.warning(f"Excessive match with length of {match_len} detected for rule {node.pattern}")
                    found.append(normalized)
                return found
            if isinstance(node, list):
                per_rule = [collect(member) for member in node]
                counts = Counter()
                for member_matches in per_rule:
                    counts += Counter(member_matches)
                return dict(counts)
            if isinstance(node, dict):
                return {name: collect(member) for name, member in node.items()}
            return None

        return prune_matches(collect(search_rules))
    except Exception as e:
        relative_filepath = "/".join(str(filepath).split("/")[-4:])
        error_msg = f"Failed to parse keywords from: {relative_filepath}; {e}"
        logger.error(error_msg)
        return None
+
+
def normalize_match_string(match: str) -> str:
    """
    Normalize a matched string.

    Strips surrounding whitespace and punctuation, removes trailing path
    separators, replaces double spaces with single ones and drops all
    non-printable characters.

    :param match: The raw matched string.
    :return: The normalized string.
    """
    # The replace must target a double space; replacing a single space with a
    # single space would be a no-op.
    match = match.strip().strip("[];.”\"':)(,").rstrip(os.sep).replace("  ", " ")
    return "".join(filter(str.isprintable, match))
+
+
def load_text_file(
    file_name: str | Path, limit_max_lines: int = -1, line_separator: str = LINE_SEPARATOR
) -> tuple[str, str, bool]:
    """
    Read the text of `file_name`, keeping at most `limit_max_lines` lines, and
    build two variants of it: one where every newline is replaced by
    `line_separator` and one with the lines as read.

    :param file_name: The file_name to load.
    :param limit_max_lines: The limit on number of lines to return, -1 means no limit.
    :param line_separator: The string to replace newlines with.
    :return: A tuple of three elements (the text with replaced newlines, the text and a boolean whether a unicode
        decoding error happened).
    """
    lines = []
    was_unicode_decode_error = False
    with Path(file_name).open("r", errors=FILE_ERRORS_STRATEGY) as handle:
        try:
            lines = handle.readlines()
        except UnicodeDecodeError:
            was_unicode_decode_error = True
            logger.warning("UnicodeDecodeError, opening as utf8")

    if was_unicode_decode_error:
        with open(file_name, encoding="utf8", errors=FILE_ERRORS_STRATEGY) as utf8_handle:
            # coding failure, try line by line
            while True:
                try:
                    current = utf8_handle.readline()
                except UnicodeDecodeError:
                    # ignore error
                    continue
                lines.append(current)
                # An empty string marks the end of the file (it is still appended).
                if not current:
                    break

    # Any limit other than -1 caps the number of lines; a non-positive limit keeps none.
    selected = lines if limit_max_lines == -1 else lines[: max(limit_max_lines, 0)]

    whole_text_with_newlines = "".join(selected)
    whole_text = "".join(entry.replace("\n", "") + line_separator for entry in selected)

    return whole_text, whole_text_with_newlines, was_unicode_decode_error
+
+
def load_cert_html_file(file_name: str) -> str:
    """
    Read the whole content of `file_name` as text.

    The file is first opened without an explicit encoding; on a
    UnicodeDecodeError it is read again as utf8.

    :param file_name: Path of the file to read.
    :return: The file content, or an empty string when both attempts fail to decode.
    """
    try:
        with open(file_name, errors=FILE_ERRORS_STRATEGY) as handle:
            return handle.read()
    except UnicodeDecodeError:
        logger.warning("UnicodeDecodeError, opening as utf8")

    try:
        with open(file_name, encoding="utf8", errors=FILE_ERRORS_STRATEGY) as utf8_handle:
            return utf8_handle.read()
    except UnicodeDecodeError:
        logger.error(f"Failed to read file {file_name}")
        return ""
+
+
def rules_get_subset(desired_path: str) -> dict:
    """
    Descend into `cc_rules` along the dot-separated `desired_path` and return
    whatever is stored at the end of that path.

    :param desired_path: Dot-separated keys, e.g. "a.b.c".
    :return: The object found under the last key.
    """
    node = cc_rules
    for part in desired_path.split("."):
        node = node[part]
    return node
+
+
def extract_key_paths(dct: dict, current_path: str) -> list[str]:
    """
    Compute the dot-separated paths to all list-valued leaves of `dct`.

    Nested dictionaries are descended recursively, every path is prefixed with
    `current_path`, and values that are neither a dictionary nor a list are
    skipped.

    :param dct: Subset of the rules dictionary to inspect.
    :param current_path: Path prefix that leads to `dct`.
    :return: Paths to the list-valued leaves, in iteration order.
    """
    collected = []
    for name, child in dct.items():
        if isinstance(child, dict):
            collected.extend(extract_key_paths(child, current_path + "." + name))
            continue
        if isinstance(child, list):
            collected.append(current_path + "." + name)
    return collected
+
+
def get_sum_of_values_from_dict_path(dct: dict | None, path: str, default: float = np.nan) -> float:
    """
    Sum the values stored in the innermost dictionary reached by following the
    dot-separated `path` through `dct`.

    :param dct: Nested dictionary to search; may be None or empty.
    :param path: Dot-separated keys leading to the innermost dictionary.
    :param default: Value returned when `dct` is None or empty, or when a key of
                    the path is missing.
    :return: Sum of the values of the innermost dictionary, or `default`.
    """
    # A missing or empty dictionary cannot contain the path, so it is treated
    # the same way as a missing key.
    if not dct:
        return default

    res = dct

    try:
        for token in path.split("."):
            res = res[token]
    except KeyError:
        return default

    return sum(res.values())
+
+
def get_sums_for_rules_subset(dct: dict | None, path: str) -> dict[str, float]:
    """
    For the part of cc_rules selected by `path` (e.g., "symmetric_crypto"),
    enumerate every leaf path and sum the values found under that path in `dct`.

    :param dct: Dictionary of matches to examine; may be None.
    :param path: Dot-separated path into cc_rules.
    :return: Mapping from each leaf path to the sum returned by
             `get_sum_of_values_from_dict_path` (called with np.nan as default).
    """
    rules_subset = rules_get_subset(path)
    sums = {}
    for leaf_path in extract_key_paths(rules_subset, path):
        sums[leaf_path] = get_sum_of_values_from_dict_path(dct, leaf_path, np.nan)
    return sums
diff --git a/src/sec_certs/utils/helpers.py b/src/sec_certs/utils/helpers.py
new file mode 100644
index 00000000..302f4e6a
--- /dev/null
+++ b/src/sec_certs/utils/helpers.py
@@ -0,0 +1,239 @@
+from __future__ import annotations
+
+import hashlib
+import logging
+import re
+import time
+from contextlib import nullcontext
+from datetime import datetime
+from functools import partial
+from pathlib import Path
+from typing import Any, Collection
+
+import numpy as np
+import pkgconfig
+import requests
+
+import sec_certs.constants as constants
+from sec_certs.config.configuration import config
+from sec_certs.utils import parallel_processing
+from sec_certs.utils.tqdm import tqdm
+
+logger = logging.getLogger(__name__)
+
+
def download_file(
    url: str, output: Path, delay: float = 0, show_progress_bar: bool = False, progress_bar_desc: str | None = None
) -> str | int:
    """
    Downloads `url` into the file `output`.

    :param str url: address to download from
    :param Path output: file the response body is written to
    :param float delay: seconds to sleep before issuing the request
    :param bool show_progress_bar: if True, a tqdm progress bar tracks the written bytes
    :param str | None progress_bar_desc: description of the progress bar
    :return str | int: HTTP status code of the response, `requests.codes.timeout` on timeout,
        `constants.RETURNCODE_NOK` on any other failure
    """
    try:
        time.sleep(delay)
        # See https://github.com/psf/requests/issues/3953 for header justification
        # The response is used as a context manager so that the streamed connection
        # is released even when the body is never read (non-OK status) or writing fails.
        with requests.get(
            url, allow_redirects=True, timeout=constants.REQUEST_TIMEOUT, stream=True, headers={"Accept-Encoding": None}  # type: ignore
        ) as r:
            if r.status_code != requests.codes.ok:
                return r.status_code

            ctx: Any
            if show_progress_bar:
                ctx = partial(
                    tqdm,
                    total=int(r.headers.get("content-length", 0)),
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    desc=progress_bar_desc,
                )
            else:
                ctx = nullcontext

            with ctx() as pbar, output.open("wb") as f:
                for data in r.iter_content(1024):
                    f.write(data)
                    if show_progress_bar:
                        pbar.update(len(data))

            return r.status_code
    except requests.exceptions.Timeout:
        return requests.codes.timeout
    except Exception as e:
        logger.error(f"Failed to download from {url}; {e}")
        return constants.RETURNCODE_NOK
+
+
def download_parallel(
    urls: Collection[str], paths: Collection[Path], progress_bar_desc: str | None = None
) -> list[int]:
    """
    Downloads several files in parallel with `download_file`.

    :param Collection[str] urls: addresses to download from
    :param Collection[Path] paths: output files, paired with `urls` by position
    :param str | None progress_bar_desc: description of the progress bar
    :return list[int]: exit code of every download
    """
    jobs = list(zip(urls, paths))
    exit_codes = parallel_processing.process_parallel(
        download_file, jobs, config.n_threads, unpack=True, progress_bar_desc=progress_bar_desc
    )

    failures = [(url, code) for url, code in zip(urls, exit_codes) if code != requests.codes.ok]
    n_successful = sum(1 for code in exit_codes if code == requests.codes.ok)
    logger.info(f"Successfully downloaded {n_successful} files, {len(exit_codes) - n_successful} failed.")

    for url, e in failures:
        logger.error(f"Failed to download {url}, exit code: {e}")

    return exit_codes
+
+
def fips_dgst(cert_id: int | str) -> str:
    """
    Computes the digest of a certificate id: the id is converted to a string
    and passed to `get_first_16_bytes_sha256`.
    """
    cert_id_text = str(cert_id)
    return get_first_16_bytes_sha256(cert_id_text)
+
+
def get_first_16_bytes_sha256(string: str) -> str:
    """
    Returns the first 16 hexadecimal characters of the SHA-256 digest of the
    UTF-8 encoded `string`. (Despite the name, that is 8 bytes of the digest.)
    """
    digest = hashlib.sha256()
    digest.update(string.encode("utf-8"))
    return digest.hexdigest()[:16]
+
+
def get_sha256_filepath(filepath: str | Path) -> str:
    """
    Computes the SHA-256 hex digest of a file, reading it in 4096-byte chunks.

    :param str | Path filepath: file to hash
    :return str: hexadecimal digest of the file contents
    """
    hasher = hashlib.sha256()
    with Path(filepath).open("rb") as handle:
        while chunk := handle.read(4096):
            hasher.update(chunk)
    return hasher.hexdigest()
+
+
def to_utc(timestamp: datetime) -> datetime:
    """
    Converts a timezone-aware timestamp to a naive one expressed in UTC.

    Timestamps without UTC offset (naive ones) are returned unchanged.
    """
    offset = timestamp.utcoffset()
    if offset is not None:
        # Shift by the offset, then drop the timezone information.
        return (timestamp - offset).replace(tzinfo=None)
    return timestamp
+
+
def is_in_dict(target_dict: dict, path: Collection[str]) -> bool:
    """
    Checks whether a chain of nested keys exists in a dictionary.

    :param dict target_dict: (nested) dictionary to search in
    :param Collection[str] path: keys to follow, outer-most first. Note that a plain string
        is iterated character by character.
    :return bool: True if every key of `path` can be followed, False otherwise
    """
    current_level = target_dict
    for item in path:
        # Reaching a non-dict value before the path is exhausted means the path
        # does not exist; without this guard indexing such a value raises TypeError.
        if not isinstance(current_level, dict) or item not in current_level:
            return False
        current_level = current_level[item]
    return True
+
+
def compute_heuristics_version(cert_name: str) -> set[str]:
    """
    Extracts possible version strings from the name of a sample.

    Dotted numbers (optionally prefixed by "v" or "version") are preferred; when none is found,
    standalone numbers are used instead. Each match is then reduced to its numeric part.

    :param str cert_name: name to search for versions
    :return set[str]: extracted versions, or `{constants.CPE_VERSION_NA}` if nothing matched
    """
    just_numbers = r"(\d{1,5})(\.\d{1,5})"
    alternatives = [
        r"(" + just_numbers + r"+)",  # bare dotted number
        r"(" + r"\bv\s*" + just_numbers + r"+)",  # "v" prefix
        r"(" + r"(\bversion)\s*" + just_numbers + r"+)",  # "version" prefix
    ]

    def longest_groups(pattern: str) -> list[str]:
        # findall yields one tuple of groups per match; the longest group is the whole match.
        return [max(groups, key=len) for groups in re.findall(pattern, cert_name, re.IGNORECASE)]

    candidates = longest_groups(r"|".join(alternatives))
    if not candidates:
        candidates = longest_groups(r"(\b(\d)+\b)")

    # Drop repeated matches while keeping the order of first occurrence.
    unique_candidates = list(dict.fromkeys(candidates))
    if not unique_candidates:
        return {constants.CPE_VERSION_NA}

    normalized = (re.search(r"(\d+\.*)+", candidate) for candidate in unique_candidates)
    return {found.group() for found in normalized if found is not None}
+
+
def tokenize_dataset(dset: list[str], keywords: set[str]) -> np.ndarray:
    """
    Applies `tokenize` to every string of `dset` and returns the results as a numpy array.
    """
    tokenized = [tokenize(sample, keywords) for sample in dset]
    return np.array(tokenized)
+
+
def tokenize(string: str, keywords: set[str]) -> str:
    """
    Keeps only the whitespace-separated words of `string` whose lower-cased form
    is in `keywords`, and joins them with single spaces.
    """
    kept_words = filter(lambda word: word.lower() in keywords, string.split())
    return " ".join(kept_words)
+
+
def normalize_fips_vendor(string: str) -> str:
    """
    "Normalizes" FIPS vendor. Precisely:
        - Removes "(R)", "®" and commas
        - Replaces "-" and "+" with spaces
        - Returns only the first 4 whitespace-separated tokens, joined by single spaces
    # TODO: The rationale of the steps outlined above should be investigated
    """
    # "(R)" is a multi-character marker, so it is removed first; the remaining
    # substitutions are all single characters and can be applied in one pass.
    single_char_table = str.maketrans({",": None, "®": None, "-": " ", "+": " "})
    cleaned = string.replace("(R)", "").translate(single_char_table)
    tokens = cleaned.split()
    return " ".join(tokens[:4])
+
+
+# Credit: https://stackoverflow.com/questions/18092354/
def split_unescape(s: str, delim: str, escape: str = "\\", unescape: bool = True) -> list[str]:
    """
    Splits `s` on `delim`, except where the delimiter is preceded by `escape`.

    With `unescape=True` the escape characters are dropped from the output, otherwise they are kept.
    An escape character at the very end of the input is kept in both modes.

    >>> split_unescape('foo,bar', ',')
    ['foo', 'bar']
    >>> split_unescape('foo$,bar', ',', '$')
    ['foo,bar']
    >>> split_unescape('foo$$,bar', ',', '$', unescape=True)
    ['foo$', 'bar']
    >>> split_unescape('foo$$,bar', ',', '$', unescape=False)
    ['foo$$', 'bar']
    >>> split_unescape('foo$', ',', '$', unescape=True)
    ['foo$']
    """
    parts: list[str] = []
    buffer: list[str] = []
    position = 0
    length = len(s)

    while position < length:
        char = s[position]
        position += 1

        if char == escape:
            if position == length:
                # Dangling escape with nothing to escape: keep it literally.
                buffer.append(escape)
                continue
            if not unescape:
                buffer.append(escape)
            # The escaped character is taken verbatim, even if it is the delimiter.
            buffer.append(s[position])
            position += 1
        elif char == delim:
            parts.append("".join(buffer))
            buffer = []
        else:
            buffer.append(char)

    parts.append("".join(buffer))
    return parts
+
+
def warn_if_missing_poppler() -> None:
    """
    Logs a warning when pkg-config does not report `poppler-cpp` in version >=0.30,
    or when pkg-config itself cannot be run.
    """
    try:
        poppler_found = pkgconfig.installed("poppler-cpp", ">=0.30")
    except OSError:
        logger.warning("Attempting to find poppler-cpp, but pkg-config was not found.")
        return

    if not poppler_found:
        logger.warning(
            "Attempting to run pipeline with pdf->txt conversion, but poppler-cpp dependency was not found."
        )
+
+
def warn_if_missing_tesseract() -> None:
    """
    Logs a warning when pkg-config does not report `tesseract` in version >=5.0.0,
    or when pkg-config itself cannot be run.
    """
    try:
        tesseract_found = pkgconfig.installed("tesseract", ">=5.0.0")
    except OSError:
        logger.warning("Attempting to find tesseract, but pkg-config was not found.")
        return

    if not tesseract_found:
        logger.warning(
            "Attempting to run pipeline with pdf->txt conversion, that requires tesseract, but tesseract was not found."
        )
+
+
def choose_lowest_eal(eals: set[str] | None) -> str | None:
    """
    Given a set of EAL strings, chooses the lowest one.

    The level of each string is the first number found in it; strings without any number are ignored.
    When exactly two strings share the lowest level (e.g. "EAL2" and "EAL2+"), the plain "EAL<level>"
    is returned.

    :param set[str] | None eals: EAL strings to choose from
    :return str | None: the lowest EAL, or None if `eals` is empty or no string contains a number
    """
    if not eals:
        return None

    # Parse each level once and compare numbers, not substrings, so that e.g.
    # "EAL10" is not mistaken for a level-1 candidate.
    levels: dict[str, int] = {}
    for eal in eals:
        match = re.search(r"\d+", eal)
        if match:
            levels[eal] = int(match.group())

    if not levels:
        return None

    min_number = min(levels.values())
    candidates = [eal for eal, level in levels.items() if level == min_number]
    return "EAL" + str(min_number) if len(candidates) == 2 else candidates[0]
diff --git a/src/sec_certs/utils/pandas.py b/src/sec_certs/utils/pandas.py
new file mode 100644
index 00000000..97068e77
--- /dev/null
+++ b/src/sec_certs/utils/pandas.py
@@ -0,0 +1,542 @@
+from __future__ import annotations
+
+import copy
+import functools
+import logging
+import tempfile
+import xml.etree.ElementTree as ET
+import zipfile
+from dataclasses import dataclass
+from pathlib import Path
+from shutil import copyfile
+from typing import Any, Final
+
+import numpy as np
+import pandas as pd
+from matplotlib import pyplot as plt
+from scipy import stats
+from tqdm.notebook import tqdm
+
+from sec_certs.dataset.cve import CVEDataset
+from sec_certs.sample.sar import SAR
+from sec_certs.utils import helpers
+
+logger = logging.getLogger(__name__)
+
+
@dataclass(eq=True, frozen=True)
class SecondarySFPCluster:
    """
    Secondary SFP cluster: a named CWE category together with the IDs of its member CWEs.
    """

    name: str
    children: frozenset[int]

    @classmethod
    def from_xml_id(cls, xml_categories: list[ET.Element], cwe_id: int):
        """
        Builds the cluster from the category with ID `cwe_id`.

        :param list[ET.Element] xml_categories: `Category` elements of the CWE XML
        :param int cwe_id: ID of the category that represents this cluster
        :raises ValueError: if no category with such ID exists
        """
        cat = cls.find_correct_category(xml_categories, cwe_id)
        name = cat.attrib["Name"]
        members = cat.find("{http://cwe.mitre.org/cwe-6}Relationships")

        assert members is not None
        member_ids = frozenset(
            int(x.attrib["CWE_ID"]) for x in members if x.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        )
        return cls(name, member_ids)

    @staticmethod
    def find_correct_category(xml_categories: list[ET.Element], cwe_id: int) -> ET.Element:
        """
        Returns the first category whose `ID` attribute equals `cwe_id`.

        :raises ValueError: if no such category is present
        """
        for cat in xml_categories:
            if cat.attrib["ID"] == str(cwe_id):
                return cat
        raise ValueError(f"Category with ID {cwe_id} not found.")
+
+
@dataclass(eq=True, frozen=True)
class PrimarySFPCluster:
    """
    Primary SFP cluster: holds its secondary clusters and the CWE IDs that are direct members.
    """

    name: str
    secondary_clusters: frozenset[SecondarySFPCluster]
    cwe_ids: frozenset[int]

    @classmethod
    def from_xml(cls, xml_categories: list[ET.Element], primary_cluster_element: ET.Element):
        """
        Builds the cluster from its XML `Category` element.

        Every member that resolves to a category becomes a secondary cluster;
        the remaining member IDs are stored as plain CWE IDs.

        :param list[ET.Element] xml_categories: all `Category` elements of the CWE XML
        :param ET.Element primary_cluster_element: category describing this primary cluster
        """
        ns_prefix = "{http://cwe.mitre.org/cwe-6}"
        name = primary_cluster_element.attrib["Name"].split("SFP Primary Cluster: ")[1]
        relationships = primary_cluster_element.find(ns_prefix + "Relationships")

        assert relationships is not None
        member_ids = {int(rel.attrib["CWE_ID"]) for rel in relationships if rel.tag == ns_prefix + "Has_Member"}

        secondary: set[SecondarySFPCluster] = set()
        direct_cwes: set[int] = set()
        for member_id in member_ids:
            try:
                cluster = SecondarySFPCluster.from_xml_id(xml_categories, member_id)
            except ValueError:
                # Not a category, hence a CWE that belongs directly to the primary cluster.
                direct_cwes.add(member_id)
            else:
                secondary.add(cluster)

        return cls(name, frozenset(secondary), frozenset(direct_cwes))
+
+
class SFPModel:
    """
    Collection of primary SFP clusters parsed from the CWE view 888 XML.
    """

    URL: Final[str] = "https://cwe.mitre.org/data/xml/views/888.xml.zip"
    XML_FILENAME: Final[str] = "888.xml"
    XML_ZIP_NAME: Final[str] = "888.xml.zip"

    def __init__(self, primary_clusters: frozenset[PrimarySFPCluster]):
        self.primary_clusters = primary_clusters

    @classmethod
    def from_xml(cls, xml_filepath: str | Path):
        """
        Parses the model from an XML file.

        :param str | Path xml_filepath: path to the XML file
        """
        root = ET.parse(xml_filepath).getroot()
        category_tag = root.find("{http://cwe.mitre.org/cwe-6}Categories")

        assert category_tag is not None
        categories = category_tag.findall("{http://cwe.mitre.org/cwe-6}Category")

        # The XML contains two weird primary clusters not specified in https://samate.nist.gov/BF/Enlightenment/SFP.html.
        # After manual inspection, we skip those
        skipped_names = {
            "SFP Primary Cluster: Failure to Release Memory",
            "SFP Primary Cluster: Faulty Resource Release",
        }

        clusters = set()
        for category in categories:
            category_name = category.attrib["Name"]
            if "SFP Primary Cluster" not in category_name or category_name in skipped_names:
                continue
            clusters.add(PrimarySFPCluster.from_xml(categories, category))

        return cls(frozenset(clusters))

    @classmethod
    def from_web(cls):
        """
        Downloads the zipped XML from `URL` into a temporary directory, extracts it and parses the model.
        """
        with tempfile.TemporaryDirectory() as tmp_dir:
            workdir = Path(tmp_dir)
            xml_zip_path = workdir / cls.XML_ZIP_NAME
            helpers.download_file(cls.URL, xml_zip_path)

            with zipfile.ZipFile(xml_zip_path, "r") as zip_handle:
                zip_handle.extractall(tmp_dir)

            return cls.from_xml(workdir / cls.XML_FILENAME)

    def search_cwe(self, cwe_id: int) -> tuple[str | None, str | None]:
        """
        Finds the clusters a CWE belongs to.

        :param int cwe_id: CWE ID to look up
        :return tuple[str | None, str | None]: (primary cluster name, secondary cluster name);
            the secondary name is None for direct members of a primary cluster,
            both are None when the CWE is not found
        """
        for primary in self.primary_clusters:
            hit = next((sec for sec in primary.secondary_clusters if cwe_id in sec.children), None)
            if hit is not None:
                return primary.name, hit.name
            if cwe_id in primary.cwe_ids:
                return primary.name, None
        return None, None
+
+
def discover_sar_families(ser: pd.Series) -> list[str]:
    """
    Returns a list of all SAR families that occur in the pandas Series, where each entry
    is either a set of SAR objects or a null value (null entries are skipped).
    The order of the returned families is not defined.
    """
    families: set[str] = set()
    for cert_sars in ser.tolist():
        if pd.isnull(cert_sars):
            continue
        families.update(sar.family for sar in cert_sars)
    return list(families)
+
+
def get_sar_level_from_set(sars: set[SAR], sar_family: str) -> int | None:
    """
    Given a set of SARs and a family name, returns the level of the SAR of that family.

    :param set[SAR] sars: SARs to search in (may be empty or a null value)
    :param str sar_family: family of the sought SAR
    :return int | None: level of the SAR, None if the family is not present
    """
    if not sars or pd.isnull(sars):
        return None

    sar_by_family = {sar.family: sar for sar in sars}
    found = sar_by_family.get(sar_family)
    return None if found is None else found.level
+
+
def compute_cve_correlations(
    df: pd.DataFrame,
    exclude_vuln_free_certs: bool = False,
    sar_families: list[str] | None = None,
    output_path: str | Path | None = None,
    filter_nans: bool = True,
) -> pd.DataFrame:
    """
    Computes Spearman correlations of EAL and different SARs with three columns: (n_cves, worst_cve_score, avg_cve_score).
    Few assumptions about the passed dataframe:
        - EAL column must be categorical data type
        - SAR column must be a set of SARs
        - `n_cves`, `worst_cve_score`, `avg_cve_score` and `category` columns must be present in the dataframe
    Rows of the category "ICs, Smart Cards and Smart Card-Related Devices and Systems" are excluded.
    Possibly, it can filter out rows with a NaN correlation (due to division by zero or super low supports.)
    To choose correct minimal support is tricky, this is because SAR levels often have huge support, but are heavily imbalanced
    in the favor of a single value that is rarely modified. We recommend choosing 100 and discarding any row where some column
    would result into NaN.

    :param pd.DataFrame df: dataframe to compute the correlations on
    :param bool exclude_vuln_free_certs: if True, only rows with `n_cves > 0` are considered
    :param list[str] | None sar_families: SAR families to examine; discovered from the data if not given
    :param str | Path | None output_path: if set, the resulting dataframe is stored there as csv
    :param bool filter_nans: if True, rows with a NaN correlation are dropped
    :return pd.DataFrame: one row for "eal" and each SAR family with correlations, p-values and support,
        sorted by support in descending order
    """
    df_sar = df.loc[:, ["eal", "extracted_sars", "worst_cve_score", "avg_cve_score", "n_cves", "category"]]
    df_sar = df_sar.loc[df_sar.category != "ICs, Smart Cards and Smart Card-Related Devices and Systems"]

    if exclude_vuln_free_certs:
        df_sar = df_sar.loc[df_sar.n_cves > 0]

    families = sar_families if sar_families else discover_sar_families(df_sar.extracted_sars)

    spearmanr = functools.partial(stats.spearmanr, nan_policy="omit", alternative="less")

    def correlate(levels: pd.Series) -> tuple:
        """Returns (corr, pvalue) against each CVE column, followed by the number of non-null levels."""
        row: list[Any] = []
        for target in ("n_cves", "worst_cve_score", "avg_cve_score"):
            corr, pvalue = spearmanr(levels, df_sar[target])
            row.extend([corr, pvalue])
        row.append(int(levels.notnull().sum()))
        return tuple(row)

    # Categorical code -1 stands for a missing EAL; np.nan is used because the
    # np.NaN alias no longer exists in NumPy 2.0.
    eal_levels = df_sar.eal.cat.codes.map(lambda code: np.nan if code == -1 else code)

    dct = {"eal": correlate(eal_levels)}
    for family in tqdm(families):
        dct[family] = correlate(df_sar.extracted_sars.map(lambda sars: get_sar_level_from_set(sars, family)))

    df_corr = pd.DataFrame.from_dict(
        dct,
        orient="index",
        columns=[
            "n_cves_corr",
            "n_cves_pvalue",
            "worst_cve_score_corr",
            "worst_cve_pvalue",
            "avg_cve_score_corr",
            "avg_cve_pvalue",
            "support",
        ],
    )
    df_corr = df_corr.sort_values(by="support", ascending=False)

    if filter_nans:
        df_corr = df_corr.dropna(how="any", subset=["n_cves_corr", "worst_cve_score_corr", "avg_cve_score_corr"])

    if output_path:
        df_corr.to_csv(output_path)

    return df_corr
+
+
def find_earliest_maintenance_after_cve(row):
    """
    Given a dataframe row, returns the earliest of `row["maintenance_dates"]` that is
    strictly greater than `row["earliest_cve"]`, or np.nan when there is none.
    """
    later_maintenances = (date for date in row["maintenance_dates"] if date > row["earliest_cve"])
    return min(later_maintenances, default=np.nan)
+
+
def filter_to_cves_within_validity_period(cc_df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Filters the column `related_cves` in `cc_df` DataFrame to CVEs that were published strictly within
    the validity period of the studied certificate. The column is replaced in `cc_df` itself,
    which is also returned. Rows that end up with no CVE get NaN.

    :param pd.DataFrame cc_df: dataframe with `related_cves`, `not_valid_before`, `not_valid_after` columns
    :param CVEDataset cve_dset: dataset used to look up the published date of each CVE
    :raises ValueError: if a row with CVEs misses `not_valid_before` or `not_valid_after`
    :return pd.DataFrame: `cc_df` with filtered `related_cves`
    """

    def published_within(cve_id: str, start: pd.Timestamp, end: pd.Timestamp) -> bool:
        published = cve_dset[cve_id].published_date
        if not published:
            return False
        published_day = pd.Timestamp(published.date())
        return start < published_day and end > published_day

    def filter_row(row: pd.Series) -> set[str] | float:
        cves = row["related_cves"]
        if pd.isna(cves):
            return cves
        kept = {cve for cve in cves if published_within(cve, row["not_valid_before"], row["not_valid_after"])}
        return kept if kept else np.nan

    has_cves = cc_df.related_cves.notnull()
    lacks_validity_date = cc_df.not_valid_before.isna() | cc_df.not_valid_after.isna()
    if (has_cves & lacks_validity_date).any():
        raise ValueError(
            "Cannot filter CVEs on certificates that have NaNs in not_valid_after or not_valid_before fields."
        )

    cc_df["related_cves"] = cc_df.apply(filter_row, axis=1)

    return cc_df
+
+
def expand_df_with_cve_cols(df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Returns a copy of `df` expanded with columns derived from `related_cves`:
    `n_cves`, `cve_published_dates`, `earliest_cve`, `worst_cve_score`, `avg_cve_score`.
    Rows without related CVEs get `n_cves == 0` and NaN in the other new columns.

    :param pd.DataFrame df: dataframe with `related_cves` column (collections of CVE ids or missing values)
    :param CVEDataset cve_dset: dataset used to look up published dates and base scores of the CVEs
    :return pd.DataFrame: expanded copy of `df`
    """

    def is_missing(value: Any) -> bool:
        # Identity comparison with np.nan misses NaNs that are a different float object
        # (e.g. float("nan")), hence the missing values are detected by value.
        return value is None or value is pd.NA or (isinstance(value, float) and np.isnan(value))

    df = df.copy()

    df["n_cves"] = df.related_cves.map(lambda x: 0 if is_missing(x) else len(x))
    df["cve_published_dates"] = df.related_cves.map(
        lambda x: np.nan if is_missing(x) else [cve_dset[y].published_date.date() for y in x]  # type: ignore
    )

    df["earliest_cve"] = df.cve_published_dates.map(lambda x: min(x) if isinstance(x, list) else np.nan)
    df["worst_cve_score"] = df.related_cves.map(
        lambda x: np.nan if is_missing(x) else max(cve_dset[cve].impact.base_score for cve in x)
    )

    # Note: Technically, CVE can have 0 base score. This happens when the CVE is discarded from the database.
    # This could skew the results. During May 2022 analysis, we encountered a single CVE with such score.
    # Therefore, we do not treat this case.
    # To properly treat this, the average should be taken across CVEs with >0 base_score.
    df["avg_cve_score"] = df.related_cves.map(
        lambda x: np.nan if is_missing(x) else np.mean([cve_dset[cve].impact.base_score for cve in x])
    )
    return df
+
+
def prepare_cwe_df(
    cc_df: pd.DataFrame, cve_dset: CVEDataset, fine_grained: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    This function does the following:
    1. Filter CC DF to columns relevant for CWE examination (eal, related_cves, category)
    2. Downloads and parses the CWE XML with CWE categories and weaknesses, fetches CWE descriptions and names from there
    3. Explodes the CC DF so that each row corresponds to single CVE
    4. Joins CC DF with CWE DF obtained from CVEDataset
    5. Explodes resulting DF again so that each row corresponds to single CWE

    :param pd.DataFrame cc_df: DataFrame obtained from CCDataset, should be limited to rows with >0 vulnerabilities
    :param CVEDataset cve_dset: CVEDataset instance to retrieve CWE data from
    :param bool fine_grained: If set to True, CWEs won't be merged into weaknesses of higher abstraction
    :return Tuple[pd.DataFrame, pd.DataFrame]: returns two dataframes:
        - DF obtained from CC Dataset, fully exploded to CWEs
        - DF obtained from the CWE XML, contains IDs, names, descriptions, types, parents and urls of all CWEs
    """
    ns = "{http://cwe.mitre.org/cwe-6}"

    # Explode CVE_IDs and CWE_IDs so that we have right counts on duplicated CVEs. Measure how much data for analysis we have left.
    vulns = cve_dset.to_pandas()
    df_cwe_relevant = (
        cc_df[["eal", "related_cves", "category"]]
        .explode(column="related_cves")
        .rename(columns={"related_cves": "cve_id"})
    )
    df_cwe_relevant["cwe_ids"] = df_cwe_relevant.cve_id.map(lambda x: vulns.cwe_ids[x])
    df_cwe_relevant = (
        df_cwe_relevant.explode(column="cwe_ids")
        .reset_index()
        .rename(columns={"cwe_ids": "cwe_id", "index": "cert_dgst"})
    )

    df_cwe_relevant.cwe_id = df_cwe_relevant.cwe_id.replace(r"NVD-CWE-*", np.nan, regex=True)
    print(
        f"Filtering {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs that have no CWE assigned. This affects {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates"
    )
    print(
        f"Still left with analysis of {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs in {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates."
    )
    df_cwe_relevant = df_cwe_relevant.dropna()

    # Load CWE IDs and descriptions from CWE website
    with tempfile.TemporaryDirectory() as tmp_dir:
        xml_zip_path = Path(tmp_dir) / "cwec_latest.xml.zip"
        helpers.download_file("https://cwe.mitre.org/data/xml/cwec_latest.xml.zip", xml_zip_path)

        with zipfile.ZipFile(xml_zip_path, "r") as zip_handle:
            zip_handle.extractall(tmp_dir)
            xml_filename = zip_handle.namelist()[0]

        root = ET.parse(Path(tmp_dir) / xml_filename).getroot()

    weaknesses = root.find(ns + "Weaknesses")
    categories = root.find(ns + "Categories")
    dct: dict[str, Any] = {
        "cwe_id": [],
        "cwe_name": [],
        "cwe_description": [],
        "type": [],
        "child_of": [],
    }

    # Elements are compared with None explicitly: the truth value of an Element
    # only says whether it has children, and testing it is deprecated.
    assert weaknesses is not None
    for weakness in weaknesses:
        description = weakness.find(ns + "Description")
        related_weaknesses = weakness.find(ns + "Related_Weaknesses")

        dct["cwe_id"].append("CWE-" + weakness.attrib["ID"])
        dct["cwe_name"].append(weakness.attrib["Name"])
        dct["cwe_description"].append(description.text if description is not None else None)
        dct["type"].append("weakness")

        if related_weaknesses is not None and len(related_weaknesses) > 0:
            dct["child_of"].append(
                {
                    "CWE-" + x.attrib["CWE_ID"]
                    for x in related_weaknesses
                    if x.tag == ns + "Related_Weakness" and x.attrib["Nature"] == "ChildOf"
                }
            )
        else:
            dct["child_of"].append(np.nan)

    assert categories is not None
    for category in categories:
        summary = category.find(ns + "Summary")

        dct["cwe_id"].append("CWE-" + category.attrib["ID"])
        dct["cwe_name"].append(category.attrib["Name"])
        dct["cwe_description"].append(summary.text if summary is not None else None)
        dct["type"].append("category")
        dct["child_of"].append(np.nan)

    cwe_df = pd.DataFrame(dct).set_index("cwe_id")
    cwe_df["url"] = cwe_df.index.map(lambda x: "https://cwe.mitre.org/data/definitions/" + x.split("-")[1] + ".html")
    cwe_df = cwe_df.replace(r"\n", " ", regex=True)

    if fine_grained:
        return df_cwe_relevant, cwe_df
    return get_coarse_grained_cwes(df_cwe_relevant, cwe_df), cwe_df
+
+
def get_coarse_grained_cwes(fine_grained_df: pd.DataFrame, cwe_df: pd.DataFrame) -> pd.DataFrame:
    """
    Oddly enough, NVD contains CWEs at different levels of abstraction, which makes it difficult to compare between them.
    Among others, some three different CWEs appear in the CVEDataset: CWE-20, CWE-119, CWE-787. Problem is that CWE-787
    is child of CWE-119, which in turn is child of CWE-20. It makes no sense to compute stats of most prevalent CWEs
    unless categories are aligned to the top-most level.

    This function aligns the categories to the top-most level. It works in a loop: every CWE is replaced with those of its
    parents that appear in `fine_grained_df`. When a pass over the CWEs replaces nothing, the algorithm terminates.

    :param pd.DataFrame fine_grained_df: First element of the output of `prepare_cwe_df` function
    :param pd.DataFrame cwe_df: Second element of the output of `prepare_cwe_df` function
    :return pd.DataFrame: DF obtained from CC Dataset, fully exploded to coarse-grained CWEs
    """
    observed_cwes = set(fine_grained_df.cwe_id.unique())
    parents_of = cwe_df.child_of.to_dict()
    current_cwes = set(observed_cwes)
    mapping = {cwe: {cwe} for cwe in current_cwes}

    changed = True
    while changed:
        snapshot = set(current_cwes)
        for cwe in snapshot:
            parents = parents_of[cwe]
            if not parents or parents is np.nan:
                continue

            observed_parents = {parent for parent in parents if parent in observed_cwes}
            if not observed_parents:
                continue

            # Lift the CWE to its observed parents, both in the working set
            # and in every mapping entry that currently points to it.
            current_cwes.remove(cwe)
            current_cwes.update(observed_parents)
            for targets in mapping.values():
                if cwe in targets:
                    targets.remove(cwe)
                    targets.update(observed_parents)
        changed = current_cwes != snapshot

    # Now we should have complete mapping of fine_grained -> coarse_grained CWEs
    coarse_df = fine_grained_df.copy()
    coarse_df.cwe_id = coarse_df.cwe_id.map(mapping)

    return coarse_df.explode(column="cwe_id")
+
+
def get_top_n_cwes(
    df: pd.DataFrame, cwe_df: pd.DataFrame, category: str | None = None, eal: str | None = None, n_cwes: int = 10
) -> pd.DataFrame:
    """
    Fetches top-n CWEs, overall, per category, or per EAL.

    :param pd.DataFrame df: dataframe with `cwe_id`, `category` and `eal` columns, one row per CWE occurrence
    :param pd.DataFrame cwe_df: dataframe indexed by CWE id with `cwe_name`, `cwe_description`, `url`, `type` columns
    :param str | None category: if set, only rows of this category are counted
    :param str | None eal: if set, only rows of this EAL are counted
    :param int n_cwes: number of most frequent CWEs to return
    :return pd.DataFrame: dataframe indexed by `cwe_id` with `frequency` and the columns taken from `cwe_df`
    """
    top_n = df.copy()

    if category:
        top_n = top_n.loc[top_n.category == category].copy()
    if eal:
        top_n = top_n.loc[top_n.eal == eal].copy()

    # The counts Series is renamed before it becomes a frame: its default name
    # differs between pandas versions ("cwe_id" before 2.0, "count" since),
    # so renaming the resulting column by its old name is not reliable.
    top_n = top_n.cwe_id.value_counts().head(n_cwes).rename("frequency").to_frame().rename_axis("cwe_id")

    for column in ("cwe_name", "cwe_description", "url", "type"):
        top_n[column] = top_n.index.map(lambda x: cwe_df.loc[x, column])

    return top_n
+
+
def compute_maintenances_that_come_after_vulns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Given pre-processed CCDataset DataFrame (expanded with MU & CVE cols), selects the rows with at least one CVE and
    at least one maintenance, converts their maintenance dates with `.date()`, and adds the column
    `earliest_maintenance_after_vuln` computed by `find_earliest_maintenance_after_cve`.
    The index of the result is named "dgst".
    """
    has_cves_and_maintenances = (df.n_cves > 0) & (df.n_maintenances > 0)
    df_fixed = df.loc[has_cves_and_maintenances].copy()

    df_fixed.maintenance_dates = df_fixed.maintenance_dates.map(lambda dates: [stamp.date() for stamp in dates])
    df_fixed.loc[:, "earliest_maintenance_after_vuln"] = df_fixed.apply(find_earliest_maintenance_after_cve, axis=1)

    df_fixed.index.name = "dgst"
    return df_fixed
+
+
def move_fixing_mu_to_directory(
    df_fixed: pd.DataFrame, main_df: pd.DataFrame, outdir: str | Path, inpath: str | Path
) -> pd.Series:
    """
    Localizes reports of maintenance updates that should fix some vulnerability and copies them into a directory.
    df_fixed should be the output of compute_maintenances_that_come_after_vulns method.

    :param pd.DataFrame df_fixed: dataframe indexed by "dgst" with `earliest_maintenance_after_vuln` column
    :param pd.DataFrame main_df: dataframe of maintenance updates indexed by "dgst", with `related_cert_digest`
        and `maintenance_date` columns. It is not modified.
    :param str | Path outdir: directory the reports are copied to, created if missing
    :param str | Path inpath: directory with the `<dgst>.pdf` reports
    :return pd.Series: filenames of the copied reports
    """
    fixed_df_index = (
        df_fixed.loc[~df_fixed.earliest_maintenance_after_vuln.isnull()]
        .reset_index()
        .set_index(["dgst", "earliest_maintenance_after_vuln"])
        .index.to_flat_index()
    )

    # The dates are converted on a derived frame: converting the caller's column
    # in place would make a second call fail, as the values would already be dates.
    main_prefiltered = (
        main_df.assign(maintenance_date=main_df.maintenance_date.map(lambda x: x.date()))
        .reset_index()
        .set_index(["related_cert_digest", "maintenance_date"])
    )
    mu_filenames = main_prefiltered.loc[main_prefiltered.index.isin(fixed_df_index), "dgst"]
    mu_filenames = mu_filenames.map(lambda x: x + ".pdf")

    inpath = Path(inpath)
    outdir = Path(outdir)
    # The destination is what may legitimately be missing; creating an empty
    # source directory would only postpone the failure to the copy below.
    outdir.mkdir(parents=True, exist_ok=True)

    for filename in mu_filenames:
        copyfile(inpath / filename, outdir / filename)

    return mu_filenames
+
+
def plot_dataframe_graph(
    data: dict,
    label: str,
    file_name: str,
    density: bool = False,
    cumulative: bool = False,
    bins: int = 50,
    log: bool = True,
    show: bool = True,
) -> None:
    """
    Plots a histogram of `data` and saves the figure into `file_name`.

    :param dict data: data to plot, converted to a pandas Series
    :param str label: label passed to the histogram
    :param str file_name: path the figure is saved to
    :param bool density: passed to the histogram
    :param bool cumulative: passed to the histogram
    :param int bins: number of histogram bins
    :param bool log: if True, values that occur more than once are written to the logger with their counts
    :param bool show: if True, the figure is also shown
    """
    series = pd.Series(data)
    series.hist(bins=bins, label=label, density=density, cumulative=cumulative)
    plt.savefig(file_name)
    if show:
        plt.show()

    if not log:
        return

    value_counts = series.value_counts(ascending=True)
    repeated_values = value_counts.where(value_counts > 1).dropna()
    logger.info(repeated_values)
diff --git a/src/sec_certs/utils/parallel_processing.py b/src/sec_certs/utils/parallel_processing.py
new file mode 100644
index 00000000..50806a67
--- /dev/null
+++ b/src/sec_certs/utils/parallel_processing.py
@@ -0,0 +1,43 @@
+from __future__ import annotations
+
+import time
+from multiprocessing.pool import ThreadPool
+from typing import Any, Callable, Iterable
+
+from billiard.pool import Pool
+
+from sec_certs.utils.tqdm import tqdm
+
+
def process_parallel(
    func: Callable,
    items: Iterable,
    max_workers: int,
    callback: Callable | None = None,
    use_threading: bool = True,
    progress_bar: bool = True,
    unpack: bool = False,
    progress_bar_desc: str | None = None,
) -> list[Any]:
    """
    Applies `func` to every item in a pool of workers and returns the results in the order of `items`.

    :param Callable func: function to call
    :param Iterable items: arguments of the individual calls
    :param int max_workers: size of the pool
    :param Callable | None callback: called with the result of each finished call
    :param bool use_threading: if True, a thread pool is used, otherwise a process pool
    :param bool progress_bar: if True, a progress bar is shown while waiting for the results
    :param bool unpack: if True, each item is unpacked into positional arguments of `func`
    :param str | None progress_bar_desc: description of the progress bar
    :return list[Any]: results of the calls; an exception raised by a call is re-raised here
    """
    pool: Pool | ThreadPool = ThreadPool(max_workers) if use_threading else Pool(max_workers)
    try:
        results = [pool.apply_async(func, tuple(i) if unpack else (i,), callback=callback) for i in items]

        # `results` is checked rather than `items`: a generator is always truthy,
        # even when it yields nothing.
        if progress_bar is True and results:
            bar = tqdm(total=len(results), desc=progress_bar_desc)
            while not all(all_done := [x.ready() for x in results]):
                bar.update(sum(all_done) - bar.n)
                time.sleep(1)
            bar.update(len(results) - bar.n)
            bar.close()

        pool.close()
        pool.join()
    finally:
        # Also reached when submitting or waiting fails (e.g. KeyboardInterrupt),
        # so that the workers do not outlive the call.
        pool.terminate()

    return [r.get() for r in results]
diff --git a/src/sec_certs/utils/pdf.py b/src/sec_certs/utils/pdf.py
new file mode 100644
index 00000000..1d04a697
--- /dev/null
+++ b/src/sec_certs/utils/pdf.py
@@ -0,0 +1,275 @@
+from __future__ import annotations
+
+import glob
+import logging
+import subprocess
+from datetime import datetime, timedelta, timezone
+from functools import reduce
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from typing import Any
+
+import pdftotext
+import pikepdf
+from PyPDF2 import PdfFileReader
+from PyPDF2.generic import BooleanObject, ByteStringObject, FloatObject, IndirectObject, NumberObject, TextStringObject
+
+from sec_certs import constants as constants
+from sec_certs.constants import (
+ GARBAGE_ALPHA_CHARS_THRESHOLD,
+ GARBAGE_AVG_LLEN_THRESHOLD,
+ GARBAGE_EVERY_SECOND_CHAR_THRESHOLD,
+ GARBAGE_LINES_THRESHOLD,
+ GARBAGE_SIZE_THRESHOLD,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def repair_pdf(file: Path) -> None:
+    """
+    Some pdfs can't be opened by PyPDF2 - opening them with pikepdf and then saving them fixes this issue.
+    The file is rewritten in place; nothing is returned.
+
+    :param file: path to the pdf file to repair
+    """
+    # The context manager closes the pikepdf handle even when saving fails.
+    with pikepdf.Pdf.open(file, allow_overwriting_input=True) as pdf:
+        pdf.save(file)
+
+
+def ocr_pdf_file(pdf_path: Path) -> str:
+    """
+    OCR a PDF file and return its text contents, uses `pdftoppm` and `tesseract`.
+
+    :param pdf_path: The PDF file to OCR.
+    :return: The text contents, pages concatenated in page order.
+    :raises ValueError: If `pdftoppm` or `tesseract` exits with a non-zero return code.
+    """
+    with TemporaryDirectory() as tmpdir:
+        tmppath = Path(tmpdir)
+        ppm = subprocess.run(
+            ["pdftoppm", pdf_path, tmppath / "image"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+        )
+        if ppm.returncode != 0:
+            raise ValueError(f"pdftoppm failed: {ppm.returncode}")
+        for ppm_path in map(Path, glob.glob(str(tmppath / "image*.ppm"))):
+            base = ppm_path.with_suffix("")
+            tes = subprocess.run(
+                ["tesseract", "-l", "eng+deu+fra", ppm_path, base], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+            )
+            if tes.returncode != 0:
+                raise ValueError(f"tesseract failed: {tes.returncode}")
+
+        txt_paths = map(Path, glob.glob(str(tmppath / "image*.txt")))
+        # Files are named `image-<page>.txt`. The page number is taken from the file stem,
+        # because the globbed paths are absolute and cannot be sliced at fixed offsets.
+        ordered_paths = sorted(txt_paths, key=lambda path: int(path.stem.rsplit("-", 1)[-1]))
+        return "".join(path.read_text(encoding="utf-8") for path in ordered_paths)
+
+
+def convert_pdf_file(pdf_path: Path, txt_path: Path) -> tuple[bool, bool]:
+    """
+    Convert a PDF file to text and save it on the `txt_path`.
+
+    Plain `pdftotext` extraction is attempted first. When it fails or produces garbage, OCR is tried;
+    if OCR fails as well, whatever text the first attempt produced (if any) is kept.
+
+    :param pdf_path: Path to the to-be-converted PDF file.
+    :param txt_path: Path to the resulting text file.
+    :return: A tuple of two results, whether OCR was attempted and whether any text was written (OK/NOK).
+    """
+    text = None
+    used_ocr = False
+
+    try:
+        with pdf_path.open("rb") as pdf_handle:
+            # No password, Raw=True
+            text = "".join(pdftotext.PDF(pdf_handle, "", True))
+    except Exception as e:
+        logger.error(f"Error when converting pdf->txt: {e}")
+
+    if text is None or text_is_garbage(text):
+        logger.warning(f"Detected garbage during conversion of {pdf_path}")
+        used_ocr = True
+        try:
+            text = ocr_pdf_file(pdf_path)
+            logger.info(f"OCR OK for {pdf_path}")
+        except Exception as e:
+            logger.error(f"Error during OCR of {pdf_path}, using garbage: {e}")
+
+    has_text = text is not None
+    if has_text:
+        with txt_path.open("w", encoding="utf-8") as txt_handle:
+            txt_handle.write(text)
+
+    return used_ocr, has_text
+
+
+def parse_pdf_date(dateval: bytes | None) -> datetime | None:
+    """
+    Parse PDF metadata date format:
+
+    ```
+    parse_pdf_date(b"D:20110617082321-04'00'")
+    ```
+    into
+    ```
+    datetime.datetime(2011, 6, 17, 8, 23, 21, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=72000)))
+    ```
+
+    A value without a timezone offset (including a bare trailing `Z`) yields a naive datetime.
+
+    :param dateval: The date as in the PDF metadata.
+    :return: The parsed datetime, if successful, else `None`.
+    """
+    if dateval is None:
+        return None
+    try:
+        # Decoding and splitting are inside the `try` so that malformed values
+        # (undecodable bytes, several separators) give `None` instead of raising.
+        clean = dateval.decode("utf-8").replace("D:", "")
+        tz = None
+        tzoff = None
+        for marker, sign in (("+", 1), ("-", -1), ("Z", 1)):
+            if marker in clean:
+                clean, tz = clean.split(marker)
+                tzoff = sign
+                break
+
+        res_datetime = datetime.strptime(clean, "%Y%m%d%H%M%S")
+        if tz and tzoff:
+            tz_datetime = datetime.strptime(tz, "%H'%M'")
+            delta = tzoff * timedelta(hours=tz_datetime.hour, minutes=tz_datetime.minute)
+            res_datetime = res_datetime.replace(tzinfo=timezone(delta))
+        return res_datetime
+    except ValueError:
+        # Also covers UnicodeDecodeError, which is a subclass of ValueError.
+        return None
+
+
+def extract_pdf_metadata(filepath: Path) -> tuple[str, dict[str, Any] | None]:  # noqa: C901
+    """
+    Extract PDF metadata, such as the number of pages, author, title, etc.
+
+    Encrypted files are re-saved in place with pikepdf before the metadata is read.
+
+    :param filepath: The path to the PDF.
+    :return: A tuple of the result code (see constants) and the metadata dictionary. On failure the
+             first element is the error message and the second one is `None`.
+    """
+
+    def map_metadata_value(val, nope_out=False):
+        # Convert PyPDF2 objects into plain Python values.
+        if isinstance(val, BooleanObject):
+            val = val.value
+        elif isinstance(val, FloatObject):
+            val = float(val)
+        elif isinstance(val, NumberObject):
+            val = int(val)
+        elif isinstance(val, IndirectObject) and not nope_out:
+            # Let's make sure to nope out in case of cycles
+            val = map_metadata_value(val.getObject(), nope_out=True)
+        elif isinstance(val, TextStringObject):
+            val = str(val)
+        elif isinstance(val, ByteStringObject):
+            try:
+                val = val.decode("utf-8")
+            except UnicodeDecodeError:
+                val = str(val)
+        else:
+            val = str(val)
+        return val
+
+    def resolve_indirect(val, bound=10):
+        # Follow indirect references (also inside lists); `bound` limits the recursion depth.
+        if isinstance(val, list) and bound:
+            return [resolve_indirect(v, bound - 1) for v in val]
+        elif isinstance(val, IndirectObject) and bound:
+            return resolve_indirect(val.getObject(), bound - 1)
+        else:
+            return val
+
+    metadata: dict[str, Any] = {}
+
+    try:
+        metadata["pdf_file_size_bytes"] = filepath.stat().st_size
+        with filepath.open("rb") as handle:
+            pdf = PdfFileReader(handle, strict=False)
+            metadata["pdf_is_encrypted"] = pdf.getIsEncrypted()
+
+        # see https://stackoverflow.com/questions/26242952/pypdf-2-decrypt-not-working
+        if metadata["pdf_is_encrypted"]:
+            # The context manager releases the pikepdf handle once the file is re-saved.
+            with pikepdf.open(filepath, allow_overwriting_input=True) as encrypted_pdf:
+                encrypted_pdf.save()
+
+        with filepath.open("rb") as handle:
+            pdf = PdfFileReader(handle, strict=False)
+            metadata["pdf_number_of_pages"] = pdf.getNumPages()
+            pdf_document_info = pdf.getDocumentInfo()
+
+            if pdf_document_info is None:
+                raise ValueError("PDF metadata unavailable")
+
+            for key, val in pdf_document_info.items():
+                metadata[str(key)] = map_metadata_value(val)
+
+            # Get the hyperlinks in the PDF. Pages are walked one by one, so a document
+            # without pages simply produces an empty set of links.
+            links = set()
+            for page in pdf.pages:
+                page_annots = resolve_indirect(page.get("/Annots", []))
+                if not isinstance(page_annots, list):
+                    continue
+                for annot in page_annots:
+                    try:
+                        action = resolve_indirect(annot.get("/A", {}))
+                        link = resolve_indirect(action.get("/URI"))
+                        if link:
+                            links.add(map_metadata_value(link))
+                    except Exception:
+                        # Best effort: a malformed annotation must not discard the other metadata.
+                        continue
+            metadata["pdf_hyperlinks"] = links
+
+    except Exception as e:
+        # Only the last four path components are reported.
+        relative_filepath = Path(*filepath.parts[-4:])
+        error_msg = f"Failed to read metadata of {relative_filepath}, error: {e}"
+        logger.error(error_msg)
+        return error_msg, None
+
+    return constants.RETURNCODE_OK, metadata
+
+
+def text_is_garbage(text: str) -> bool:
+    """
+    Detect whether the given text is "garbage". A series of tests is applied,
+    using the number of lines, total size, average line length, every second character on a line
+    and the ratio of alphabetic characters.
+
+    :param text: The tested text.
+    :return: Whether the text is a "garbage" result of pdftotext conversion.
+    """
+    size = len(text)
+    text_lines = text.splitlines()
+    line_count = len(text_lines)
+    content_len = sum(map(len, text_lines))
+    # Number of lines that have more than one distinct character at every second position.
+    # This detects the ANSSI spacing issues.
+    varied_lines = sum(1 for line in text_lines if len(set(line[1::2])) > 1)
+    alpha_len = sum(1 for char in text if char.isalpha())
+
+    avg_line_len = content_len / line_count if line_count else 0
+    alpha_ratio = alpha_len / size if size else 0
+
+    # Falling below any single threshold marks the text as garbage.
+    return (
+        line_count < GARBAGE_LINES_THRESHOLD
+        or size < GARBAGE_SIZE_THRESHOLD
+        or avg_line_len < GARBAGE_AVG_LLEN_THRESHOLD
+        or varied_lines < GARBAGE_EVERY_SECOND_CHAR_THRESHOLD
+        or alpha_ratio < GARBAGE_ALPHA_CHARS_THRESHOLD
+    )
diff --git a/src/sec_certs/utils/sanitization.py b/src/sec_certs/utils/sanitization.py
new file mode 100644
index 00000000..2f9cd046
--- /dev/null
+++ b/src/sec_certs/utils/sanitization.py
@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+import html
+import logging
+from datetime import date
+
+import numpy as np
+import pandas as pd
+from bs4 import NavigableString
+
+logger = logging.getLogger(__name__)
+
+
+def sanitize_navigable_string(string: NavigableString | str | None) -> str | None:
+    """
+    Convert the input to `str`, strip surrounding whitespace and drop soft hyphens and non-breaking spaces.
+
+    :param string: value to sanitize
+    :return: sanitized string, or `None` for a falsy input
+    """
+    if string:
+        stripped = str(string).strip()
+        without_soft_hyphens = stripped.replace("\xad", "")
+        return without_soft_hyphens.replace("\xa0", "")
+    return None
+
+
+def sanitize_link(record: str | None) -> str | None:
+    """
+    Normalize a link: drop `:443`, percent-encode spaces and switch `http://` to `https://`.
+
+    :param record: link to sanitize
+    :return: sanitized link, or `None` for a falsy input
+    """
+    if record:
+        without_port = record.replace(":443", "")
+        encoded = without_port.replace(" ", "%20")
+        return encoded.replace("http://", "https://")
+    return None
+
+
+def sanitize_date(record: pd.Timestamp | date | np.datetime64) -> date | None:
+    """
+    Convert a date-like record to `datetime.date`.
+
+    :param record: pandas timestamp, numpy datetime64, `date` instance or a null value (`None`, `NaT`, ...)
+    :return: the corresponding date, or `None` for null values
+    :raises ValueError: if the record is of an unsupported type
+    """
+    if pd.isnull(record):
+        return None
+    if isinstance(record, pd.Timestamp):
+        return record.date()
+    if isinstance(record, np.datetime64):
+        # Advertised in the signature, so convert it instead of rejecting it.
+        return pd.Timestamp(record).date()
+    if isinstance(record, date):
+        return record
+    raise ValueError("Unsupported type given as input")
+
+
+def sanitize_string(record: str) -> str:
+    """
+    Unescape HTML entities twice, remove newlines and collapse runs of whitespace into single spaces.
+
+    :param record: string to sanitize
+    :return: sanitized string
+    """
+    # There is a sample with name 'ATMEL Secure Microcontroller AT90SC12872RCFT &#x2f; AT90SC12836RCFT rev. I &amp;&#x23;38&#x3b; J' that has to be unescaped twice
+    unescaped_once = html.unescape(record)
+    unescaped_twice = html.unescape(unescaped_once)
+    tokens = unescaped_twice.replace("\n", "").split()
+    return " ".join(tokens)
+
+
+def sanitize_security_levels(record: str | set[str]) -> set[str]:
+    """
+    Turn a comma-separated string (or a set) of security levels into a set without placeholder values.
+
+    :param record: comma-separated string or set of security levels
+    :return: set of security levels with the ignored values removed
+    """
+    ignored_levels = {"Basic", "ND-PP", "PP\xa0Compliant", "None", "Medium"}
+    levels = set(record.split(",")) if isinstance(record, str) else record
+    return levels - ignored_levels
+
+
+def sanitize_protection_profiles(record: str) -> list:
+    """
+    Split a comma-separated string of protection profiles into a list.
+
+    :param record: comma-separated protection profiles
+    :return: list of protection profiles, empty for a falsy input
+    """
+    return record.split(",") if record else []
diff --git a/src/sec_certs/utils/tables.py b/src/sec_certs/utils/tables.py
new file mode 100644
index 00000000..29def971
--- /dev/null
+++ b/src/sec_certs/utils/tables.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+import logging
+import re
+from pathlib import Path
+
+from sec_certs.cert_rules import FIPS_LIST_OF_TABLES
+
+logger = logging.getLogger(__name__)
+
+
+def parse_list_of_tables(txt: str) -> set[int]:
+    """
+    Parses list of tables in policy txt, returns page numbers of tables that mention algorithms.
+
+    :param txt: text of the list of tables
+    :return: page numbers found at the end of lines mentioning functions or algorithms
+    """
+    pattern = r"^.+?(?:[Ff]unction|[Aa]lgorithm|[Ss]ecurity [Ff]unctions?).+?(?P<page_num>\d+)$"
+    page_numbers = set()
+    for match in re.finditer(pattern, txt, flags=re.MULTILINE):
+        page_numbers.add(int(match.group("page_num")))
+    return page_numbers
+
+
+def get_table_rich_page_numbers_from_footer(file_text: str) -> set[int]:
+    """
+    Parses page numbers of policy txt pages that may contain tables with algorithm data.
+
+    Pages are counted by form feed characters. For every line starting with `Table ` or `Exhibit`,
+    the current page and the following one are recorded, plus the preceding one when the current
+    page number is greater than 2. Pages beyond `current_page - 1` (as counted at the end of the
+    text) are discarded.
+
+    :param file_text: text of the policy document, pages separated by form feeds
+    :return: page numbers that may contain tables
+    """
+    current_page = 1
+    pages = set()
+
+    for line in file_text.split("\n"):
+        if "\f" in line:
+            current_page += 1
+        if line.startswith(("Table ", "Exhibit")):
+            pages.add(current_page)
+            pages.add(current_page + 1)
+            if current_page > 2:
+                pages.add(current_page - 1)
+
+    # Drop every page past the last counted one, not just the first one that set iteration yields.
+    last_page = current_page - 1
+    return {page for page in pages if page <= last_page}
+
+
+def find_pages_with_tables(txt_filepath: Path) -> set[int]:
+    """
+    Identifies pages in txt file that may contain tables. Return their page numbers.
+
+    :param txt_filepath: path to the txt file
+    :return: page numbers, empty set when none are found
+    """
+    txt = txt_filepath.read_text(encoding="utf-8")
+
+    # Parse page numbers from list of tables if available
+    # Else look for "Table" in text and \f representing footer, then extract page number from footer
+    list_of_tables = FIPS_LIST_OF_TABLES.search(txt)
+    if list_of_tables:
+        pages = parse_list_of_tables(list_of_tables.group())
+    else:
+        pages = get_table_rich_page_numbers_from_footer(txt)
+
+    return pages or set()
+
+
+def get_algs_from_table(dataframe_text: str) -> set[str]:
+    """
+    Collect all substrings of the table text that match the algorithm certificate pattern.
+
+    :param dataframe_text: text of a table
+    :return: set of full regex matches (any prefix such as `#` included)
+    """
+    pattern = re.compile(r"(?:#?\s?|(?:Cert)\.?[^. ]*?\s?)(?:[CcAa]\s)?(?P<id>[CcAa]? ?\d+)")
+    found = set()
+    for match in pattern.finditer(dataframe_text):
+        found.add(match.group())
+    return found
diff --git a/src/sec_certs/utils/tqdm.py b/src/sec_certs/utils/tqdm.py
new file mode 100644
index 00000000..77eeae94
--- /dev/null
+++ b/src/sec_certs/utils/tqdm.py
@@ -0,0 +1,9 @@
+from tqdm import tqdm as tqdm_original
+
+from sec_certs.config.configuration import config
+
+
+def tqdm(*args, **kwargs):
+    """
+    Wrap `tqdm.tqdm`, defaulting `disable` to the inverse of `config.enable_progress_bars`.
+    An explicitly passed `disable` keyword argument takes precedence.
+    """
+    if "disable" not in kwargs:
+        kwargs["disable"] = not config.enable_progress_bars
+    return tqdm_original(*args, **kwargs)