diff options
| author | adamjanovsky | 2022-12-09 17:10:19 +0100 |
|---|---|---|
| committer | GitHub | 2022-12-09 17:10:19 +0100 |
| commit | 73b3b0c361f9545450fa188bec50606d64bb1afd (patch) | |
| tree | 0a1f9034c309ba88e5f72a31634b014c23a57df5 /src/sec_certs/utils | |
| parent | 19338dc9fd9ab257c36cfa277994abe202e97de2 (diff) | |
| download | sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.gz sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.tar.zst sec-certs-73b3b0c361f9545450fa188bec50606d64bb1afd.zip | |
flat -> src layout (#294)
- Some mypy fixes
- Flat layout -> src layout
- Ditch `setup.py` and `setup.cfg` in favour of `pyproject.toml`
- Non-pinned requirements moved from `requirements/*.in` to `pyproject.toml`
Diffstat (limited to 'src/sec_certs/utils')
| -rw-r--r-- | src/sec_certs/utils/__init__.py | 0 | ||||
| -rw-r--r-- | src/sec_certs/utils/extract.py | 817 | ||||
| -rw-r--r-- | src/sec_certs/utils/helpers.py | 239 | ||||
| -rw-r--r-- | src/sec_certs/utils/pandas.py | 542 | ||||
| -rw-r--r-- | src/sec_certs/utils/parallel_processing.py | 43 | ||||
| -rw-r--r-- | src/sec_certs/utils/pdf.py | 275 | ||||
| -rw-r--r-- | src/sec_certs/utils/sanitization.py | 51 | ||||
| -rw-r--r-- | src/sec_certs/utils/tables.py | 62 | ||||
| -rw-r--r-- | src/sec_certs/utils/tqdm.py | 9 |
9 files changed, 2038 insertions, 0 deletions
diff --git a/src/sec_certs/utils/__init__.py b/src/sec_certs/utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/sec_certs/utils/__init__.py diff --git a/src/sec_certs/utils/extract.py b/src/sec_certs/utils/extract.py new file mode 100644 index 00000000..09460fb7 --- /dev/null +++ b/src/sec_certs/utils/extract.py @@ -0,0 +1,817 @@ +from __future__ import annotations + +import logging +import os +import re +from collections import Counter +from enum import Enum +from pathlib import Path +from typing import Any, Iterator + +import numpy as np + +from sec_certs import constants as constants +from sec_certs.cert_rules import REGEXEC_SEP, cc_rules +from sec_certs.constants import FILE_ERRORS_STRATEGY, LINE_SEPARATOR, MAX_ALLOWED_MATCH_LENGTH + +logger = logging.getLogger(__name__) + + +def search_only_headers_anssi(filepath: Path): # noqa: C901 + # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!! + class HEADER_TYPE(Enum): + HEADER_FULL = 1 + HEADER_MISSING_CERT_ITEM_VERSION = 2 + HEADER_MISSING_PROTECTION_PROFILES = 3 + HEADER_DUPLICITIES = 4 + + rules_certificate_preface = [ + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)()Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de 
reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom des produits(.+)Référence/version des produits(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur\\(s\\)(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom des produits(.+)Référence/version des produits(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de reconnaissance", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à des profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du 
produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur\\(s\\)(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de 
protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à des profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit \\(référence/version\\)(.+)Nom de la TOE \\(référence/version\\)(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification Report(.+)Nom du produit(.+)Référence/version du produit(.*)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de 
reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profisl de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centres d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Version du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Conformité aux profils de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur\\(s\\)(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Versions du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeur (.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence du produit(.+)Conformité à un profil de protection(.+)Critères d’évaluation et version(.+)Niveau d’évaluation(.+)Développeurs(.+)Centre d’évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile 
conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Recognition arrangements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Mutual Recognition Agreements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Product name(.+)Product reference(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer\\(s\\)(.+)Evaluation facility(.+)Recognition arrangements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Products names(.+)Products references(.+)protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)Product name \\(reference / version\\)(.+)TOE name \\(reference / version\\)(.+)Protection profile conformity(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developers(.+)Evaluation facility(.+)Recognition arrangements", + ), + ( + HEADER_TYPE.HEADER_FULL, + "Certification report reference(.+)TOE name(.+)Product's reference/ version(.+)TOE's reference/ version(.+)Conformité à un profil de protection(.+)Evaluation criteria and version(.+)Evaluation level(.+)Developer (.+)Evaluation facility(.+)Recognition arrangements", + ), + # corrupted text (duplicities) + ( + HEADER_TYPE.HEADER_DUPLICITIES, + "Référencce du rapport de d certification n(.+)Nom du p produit(.+)Référencce/version du produit(.+)Conformiité à un 
profil de d protection(.+)Critères d d’évaluation ett version(.+)Niveau d’’évaluation(.+)Développ peurs(.+)Centre d’’évaluation(.+)Accords d de reconnaisssance applicab bles", + ), + # rules without product version + ( + HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION, + "Référence du rapport de certification(.+)Nom et version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION, + "Référence du rapport de certification(.+)Nom et version du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeur (.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ( + HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION, + "Référence du rapport de certification(.+)Nom du produit(.+)Conformité à un profil de protection(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + # rules without protection profile + ( + HEADER_TYPE.HEADER_MISSING_PROTECTION_PROFILES, + "Référence du rapport de certification(.+)Nom du produit(.+)Référence/version du produit(.+)Critères d'évaluation et version(.+)Niveau d'évaluation(.+)Développeurs(.+)Centre d'évaluation(.+)Accords de reconnaissance applicables", + ), + ] + + # statistics about rules success rate + num_rules_hits = {} + for rule in rules_certificate_preface: + num_rules_hits[rule[1]] = 0 + + items_found = {} # type: ignore # noqa + + try: + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file(filepath) + + # for ANSII and DCSSI certificates, front page starts only on third page after 2 newpage signs + pos = whole_text.find("") + if pos != -1: + pos = whole_text.find("", pos) + if pos != -1: + whole_text = whole_text[pos:] + + no_match_yet = True + 
other_rule_already_match = False + rule_index = -1 + for rule in rules_certificate_preface: + rule_index += 1 + rule_and_sep = rule[1] + REGEXEC_SEP + + for m in re.finditer(rule_and_sep, whole_text): + if no_match_yet: + items_found[constants.TAG_HEADER_MATCH_RULES] = [] + no_match_yet = False + + # insert rule if at least one match for it was found + if rule not in items_found[constants.TAG_HEADER_MATCH_RULES]: + items_found[constants.TAG_HEADER_MATCH_RULES].append(rule[1]) + + if not other_rule_already_match: + other_rule_already_match = True + else: + logger.warning(f"WARNING: multiple rules are matching same certification document: {filepath}") + + num_rules_hits[rule[1]] += 1 # add hit to this rule + match_groups = m.groups() + index_next_item = 0 + items_found[constants.TAG_CERT_ID] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + items_found[constants.TAG_CERT_ITEM] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + if rule[0] == HEADER_TYPE.HEADER_MISSING_CERT_ITEM_VERSION: + items_found[constants.TAG_CERT_ITEM_VERSION] = "" + else: + items_found[constants.TAG_CERT_ITEM_VERSION] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + if rule[0] == HEADER_TYPE.HEADER_MISSING_PROTECTION_PROFILES: + items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = "" + else: + items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = normalize_match_string( + match_groups[index_next_item] + ) + index_next_item += 1 + + items_found[constants.TAG_CC_VERSION] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + items_found[constants.TAG_CC_SECURITY_LEVEL] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + items_found[constants.TAG_DEVELOPER] = normalize_match_string(match_groups[index_next_item]) + index_next_item += 1 + + items_found[constants.TAG_CERT_LAB] = normalize_match_string(match_groups[index_next_item]) 
+ index_next_item += 1 + except Exception as e: + relative_filepath = "/".join(str(filepath).split("/")[-4:]) + error_msg = f"Failed to parse ANSSI frontpage headers from {relative_filepath}; {e}" + logger.error(error_msg) + return error_msg, None + + # if True: + # print('# hits for rule') + # sorted_rules = sorted(num_rules_hits.items(), + # key=operator.itemgetter(1), reverse=True) + # used_rules = [] + # for rule in sorted_rules: + # print('{:4d} : {}'.format(rule[1], rule[0])) + # if rule[1] > 0: + # used_rules.append(rule[0]) + + return constants.RETURNCODE_OK, items_found + + +def search_only_headers_bsi(filepath: Path): # noqa: C901 + # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!! + LINE_SEPARATOR_STRICT = " " + NUM_LINES_TO_INVESTIGATE = 15 + rules_certificate_preface = [ + "(BSI-DSZ-CC-.+?) (?:for|For) (.+?) from (.*)", + "(BSI-DSZ-CC-.+?) zu (.+?) der (.*)", + ] + + items_found = {} # type: ignore # noqa + no_match_yet = True + + try: + # Process front page with info: cert_id, certified_item and developer + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file( + filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT + ) + + for rule in rules_certificate_preface: + rule_and_sep = rule + REGEXEC_SEP + + for m in re.finditer(rule_and_sep, whole_text): + if no_match_yet: + items_found[constants.TAG_HEADER_MATCH_RULES] = [] + no_match_yet = False + + # insert rule if at least one match for it was found + if rule not in items_found[constants.TAG_HEADER_MATCH_RULES]: + items_found[constants.TAG_HEADER_MATCH_RULES].append(rule) + + match_groups = m.groups() + cert_id = match_groups[0] + certified_item = match_groups[1] + developer = match_groups[2] + + FROM_KEYWORD_LIST = [" from ", " der "] + for from_keyword in FROM_KEYWORD_LIST: + from_keyword_len = len(from_keyword) + if certified_item.find(from_keyword) != -1: + logger.warning( + f"string {from_keyword} detected in certified item - shall not be here, fixing..." 
+ ) + certified_item_first = certified_item[: certified_item.find(from_keyword)] + developer = certified_item[certified_item.find(from_keyword) + from_keyword_len :] + certified_item = certified_item_first + continue + + end_pos = developer.find("\f-") + if end_pos == -1: + end_pos = developer.find("\fBSI") + if end_pos == -1: + end_pos = developer.find("Bundesamt") + if end_pos != -1: + developer = developer[:end_pos] + + items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id) + items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item) + items_found[constants.TAG_DEVELOPER] = normalize_match_string(developer) + items_found[constants.TAG_CERT_LAB] = "BSI" + + # Process page with more detailed sample info + # PP Conformance, Functionality, Assurance + rules_certificate_third = ["PP Conformance: (.+)Functionality: (.+)Assurance: (.+)The IT Product identified"] + + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file(filepath) + + for rule in rules_certificate_third: + rule_and_sep = rule + REGEXEC_SEP + + for m in re.finditer(rule_and_sep, whole_text): + # check if previous rules had at least one match + if constants.TAG_CERT_ID not in items_found.keys(): + logger.error(f"ERROR: front page not found for file: {filepath}") + + match_groups = m.groups() + ref_protection_profiles = match_groups[0] + cc_version = match_groups[1] + cc_security_level = match_groups[2] + + items_found[constants.TAG_REFERENCED_PROTECTION_PROFILES] = normalize_match_string( + ref_protection_profiles + ) + items_found[constants.TAG_CC_VERSION] = normalize_match_string(cc_version) + items_found[constants.TAG_CC_SECURITY_LEVEL] = normalize_match_string(cc_security_level) + + # print('\n*** Certificates without detected preface:') + # for file_name in files_without_match: + # print('No hits for {}'.format(file_name)) + # print('Total no hits files: {}'.format(len(files_without_match))) + # print('\n**********************************') + 
except Exception as e: + relative_filepath = "/".join(str(filepath).split("/")[-4:]) + error_msg = f"Failed to parse BSI headers from frontpage: {relative_filepath}; {e}" + logger.error(error_msg) + return error_msg, None + + return constants.RETURNCODE_OK, items_found + + +def search_only_headers_nscib(filepath: Path): # noqa: C901 + # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!! + LINE_SEPARATOR_STRICT = " " + NUM_LINES_TO_INVESTIGATE = 60 + items_found: dict[str, str] = {} + + try: + # Process front page with info: cert_id, certified_item and developer + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file( + filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT + ) + + certified_item = "" + developer = "" + cert_lab = "" + cert_id = "" + + lines = whole_text_with_newlines.splitlines() + no_match_yet = True + item_offset = -1 + + for line_index in range(0, len(lines)): + line = lines[line_index] + + if "Certification Report" in line: + item_offset = line_index + 1 + if "Assurance Continuity Maintenance Report" in line: + item_offset = line_index + 1 + + SPONSORDEVELOPER_STR = "Sponsor and developer:" + + if SPONSORDEVELOPER_STR in line: + if no_match_yet: + items_found = {} + no_match_yet = False + + # all lines above till 'Certification Report' or 'Assurance Continuity Maintenance Report' + certified_item = "" + for name_index in range(item_offset, line_index): + certified_item += lines[name_index] + " " + developer = line[line.find(SPONSORDEVELOPER_STR) + len(SPONSORDEVELOPER_STR) :] + + SPONSOR_STR = "Sponsor:" + + if SPONSOR_STR in line: + if no_match_yet: + items_found = {} + no_match_yet = False + + # all lines above till 'Certification Report' or 'Assurance Continuity Maintenance Report' + certified_item = "" + for name_index in range(item_offset, line_index): + certified_item += lines[name_index] + " " + + DEVELOPER_STR = "Developer:" + if DEVELOPER_STR in line: + developer = line[line.find(DEVELOPER_STR) + 
len(DEVELOPER_STR) :] + + CERTLAB_STR = "Evaluation facility:" + if CERTLAB_STR in line: + cert_lab = line[line.find(CERTLAB_STR) + len(CERTLAB_STR) :] + + REPORTNUM_STR = "Report number:" + if REPORTNUM_STR in line: + cert_id = line[line.find(REPORTNUM_STR) + len(REPORTNUM_STR) :] + + if not no_match_yet: + items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id) + items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item) + items_found[constants.TAG_DEVELOPER] = normalize_match_string(developer) + items_found[constants.TAG_CERT_LAB] = cert_lab + + except Exception as e: + error_msg = f"Failed to parse NSCIB headers from frontpage: {filepath}; {e}" + logger.error(error_msg) + return error_msg, None + + return constants.RETURNCODE_OK, items_found + + +def search_only_headers_niap(filepath: Path): + # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!! + LINE_SEPARATOR_STRICT = " " + NUM_LINES_TO_INVESTIGATE = 15 + items_found: dict[str, str] = {} + + try: + # Process front page with info: cert_id, certified_item and developer + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file( + filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT + ) + + certified_item = "" + cert_id = "" + + lines = whole_text_with_newlines.splitlines() + no_match_yet = True + item_offset = -1 + + for line_index in range(0, len(lines)): + line = lines[line_index] + + if "Validation Report" in line: + item_offset = line_index + 1 + + REPORTNUM_STR = "Report Number:" + if REPORTNUM_STR in line: + if no_match_yet: + items_found = {} + no_match_yet = False + + # all lines above till 'Certification Report' or 'Assurance Continuity Maintenance Report' + certified_item = "" + for name_index in range(item_offset, line_index): + certified_item += lines[name_index] + " " + cert_id = line[line.find(REPORTNUM_STR) + len(REPORTNUM_STR) :] + break + + if not no_match_yet: + items_found[constants.TAG_CERT_ID] = 
normalize_match_string(cert_id) + items_found[constants.TAG_CERT_ITEM] = normalize_match_string(certified_item) + items_found[constants.TAG_CERT_LAB] = "US NIAP" + + except Exception as e: + error_msg = f"Failed to parse NIAP headers from frontpage: {filepath}; {e}" + logger.error(error_msg) + return error_msg, None + + return constants.RETURNCODE_OK, items_found + + +def search_only_headers_canada(filepath: Path): # noqa: C901 + # TODO: Please, refactor me. I reallyyyyyyyyyyyyy need it!!!!!! + LINE_SEPARATOR_STRICT = " " + NUM_LINES_TO_INVESTIGATE = 20 + items_found: dict[str, str] = {} + try: + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file( + filepath, NUM_LINES_TO_INVESTIGATE, LINE_SEPARATOR_STRICT + ) + + cert_id = "" + + lines = whole_text_with_newlines.splitlines() + no_match_yet = True + for line_index in range(0, len(lines)): + line = lines[line_index] + if "Government of Canada, Communications Security Establishment" in line: + REPORTNUM_STR1 = "Evaluation number:" + REPORTNUM_STR2 = "Document number:" + matched_number_str = "" + line_certid = lines[line_index + 1] + if line_certid.startswith(REPORTNUM_STR1): + matched_number_str = REPORTNUM_STR1 + if line_certid.startswith(REPORTNUM_STR2): + matched_number_str = REPORTNUM_STR2 + if matched_number_str != "": + if no_match_yet: + items_found = {} + no_match_yet = False + + cert_id = line_certid[line_certid.find(matched_number_str) + len(matched_number_str) :] + break + + if ( + "Government of Canada. This document is the property of the Government of Canada. 
It shall not be altered," + in line + ): + REPORTNUM_STR = "Evaluation number:" + for offset in range(1, 20): + line_certid = lines[line_index + offset] + if "UNCLASSIFIED" in line_certid: + if no_match_yet: + items_found = {} + no_match_yet = False + line_certid = lines[line_index + offset - 4] + cert_id = line_certid[line_certid.find(REPORTNUM_STR) + len(REPORTNUM_STR) :] + break + if not no_match_yet: + break + + if ( + "UNCLASSIFIED / NON CLASSIFIÉ" in line + and "COMMON CRITERIA CERTIFICATION REPORT" in lines[line_index + 2] + ): + line_certid = lines[line_index + 1] + if no_match_yet: + items_found = {} + no_match_yet = False + cert_id = line_certid + break + + if not no_match_yet and cert_id: + items_found[constants.TAG_CERT_ID] = normalize_match_string(cert_id) + items_found[constants.TAG_CERT_LAB] = "CANADA" + + except Exception as e: + error_msg = f"Failed to parse Canada headers from frontpage: {filepath}; {e}" + logger.error(error_msg) + return error_msg, None + + return constants.RETURNCODE_OK, items_found + + +def search_files(folder: str | Path) -> Iterator[str]: + for root, _, files in os.walk(str(folder)): + yield from [os.path.join(root, x) for x in files] + + +def flatten_matches(dct: dict) -> dict: + """ + Function to flatten dictionary of matches. + + Turns + ``` + {"a": {"cc": 3}, "b": {}, "d": {"dd": 4, "cc": 2}} + ``` + into + ``` + {"cc": 5, "dd": 4} + ``` + + :param dct: Dictionary to flatten + :return: Flattened dictionary + """ + result: Counter[Any] = Counter() + for key, value in dct.items(): + if isinstance(value, dict): + result.update(flatten_matches(value)) + else: + result[key] = value + return dict(result) + + +def prune_matches(dct: dict) -> dict: + """ + Prune a dictionary of matches. + + Turns + ``` + {"a": {"cc": 3}, "b": {"aa": {}, "bb": {}}, "d": {"dd": 4, "cc": 2}} + ``` + into + ``` + {"a": {"cc": 3}, "b": {}, "d": {"dd": 4, "cc": 2}} + ``` + + :param dct: The dictionary of matches. + :return: The pruned dictionary. 
+ """ + + def walk(obj, depth): + if isinstance(obj, dict): + if not obj: + return None + res = {} + for k, v in obj.items(): + r = walk(v, depth + 1) + if r is not None: + res[k] = r + return res if res or depth == 1 else None + else: + return obj + + return walk(dct, 0) + + +def extract_keywords(filepath: Path, search_rules) -> dict[str, dict[str, int]] | None: + """ + Extract keywords from filepath using the search rules. + + :param filepath: + :param search_rules: + :return: + """ + + try: + whole_text, whole_text_with_newlines, was_unicode_decode_error = load_text_file(filepath, -1, LINE_SEPARATOR) + + def extract(rules): + if isinstance(rules, dict): + return {k: extract(v) for k, v in rules.items()} + elif isinstance(rules, list): + matches = [extract(rule) for rule in rules] + c = Counter() + for match_list in matches: + c += Counter(match_list) + return dict(c) + elif isinstance(rules, re.Pattern): + rule = rules + matches = [] + for match in rule.finditer(whole_text): + match = match.group("match") + match = normalize_match_string(match) + + match_len = len(match) + if match_len > MAX_ALLOWED_MATCH_LENGTH: + logger.warning(f"Excessive match with length of {match_len} detected for rule {rule.pattern}") + matches.append(match) + return matches + + result = extract(search_rules) + return prune_matches(result) + except Exception as e: + relative_filepath = "/".join(str(filepath).split("/")[-4:]) + error_msg = f"Failed to parse keywords from: {relative_filepath}; {e}" + logger.error(error_msg) + return None + + +def normalize_match_string(match: str) -> str: + match = match.strip().strip("[];.”\"':)(,").rstrip(os.sep).replace(" ", " ") + return "".join(filter(str.isprintable, match)) + + +def load_text_file( + file_name: str | Path, limit_max_lines: int = -1, line_separator: str = LINE_SEPARATOR +) -> tuple[str, str, bool]: + """ + Load the text contents of a file at `file_name`, upto `limit_max_lines` of lines, replace + newlines in the text with 
`line_separator`. + + :param file_name: The file_name to load. + :param limit_max_lines: The limit on number of lines to return. + :param line_separator: The string to replace newlines with. + :return: A tuple of three elements (the text with replaced newlines, the text and a boolean whether a unicode + decoding error happened). + """ + lines = [] + was_unicode_decode_error = False + with Path(file_name).open("r", errors=FILE_ERRORS_STRATEGY) as f: + try: + lines = f.readlines() + except UnicodeDecodeError: + was_unicode_decode_error = True + logger.warning("UnicodeDecodeError, opening as utf8") + + if was_unicode_decode_error: + with open(file_name, encoding="utf8", errors=FILE_ERRORS_STRATEGY) as f2: + # coding failure, try line by line + line = " " + while line: + try: + line = f2.readline() + lines.append(line) + except UnicodeDecodeError: + # ignore error + continue + + whole_text = "" + whole_text_with_newlines = "" + lines_included = 0 + for line in lines: + if limit_max_lines != -1 and lines_included >= limit_max_lines: + break + + whole_text_with_newlines += line + line = line.replace("\n", "") + whole_text += line + whole_text += line_separator + lines_included += 1 + + return whole_text, whole_text_with_newlines, was_unicode_decode_error + + +def load_cert_html_file(file_name: str) -> str: + with open(file_name, errors=FILE_ERRORS_STRATEGY) as f: + try: + return f.read() + except UnicodeDecodeError: + logger.warning("UnicodeDecodeError, opening as utf8") + + with open(file_name, encoding="utf8", errors=FILE_ERRORS_STRATEGY) as f2: + try: + return f2.read() + except UnicodeDecodeError: + logger.error(f"Failed to read file {file_name}") + return "" + + +def rules_get_subset(desired_path: str) -> dict: + """ + Recursively applies cc_certs.get(key) on tokens from desired_path, + returns the keys of the inner-most layer. 
def get_sum_of_values_from_dict_path(dct: dict | None, path: str, default: float = np.nan) -> float:
    """
    Sum the values stored in the inner-most dictionary reached by following `path` in `dct`.

    :param dct: Nested dictionary to traverse, may be `None` or empty.
    :param path: Dot-separated sequence of keys, e.g. `"symmetric_crypto.AES"`.
    :param default: Value returned when the dictionary is empty/`None` or the path cannot be followed.
    :return: Sum of the values of the dictionary at `path`, or `default`.
    """
    if not dct:
        # Honour the caller-supplied default instead of a hard-coded NaN.
        return default

    res = dct
    try:
        for token in path.split("."):
            res = res[token]
    except (KeyError, TypeError):
        # KeyError: key is missing. TypeError: an intermediate value is not subscriptable by a key
        # (e.g. None or a number), so the path cannot exist either.
        return default

    return sum(res.values())
+ """ + cc_rules_subset_to_search = rules_get_subset(path) + paths_to_search = extract_key_paths(cc_rules_subset_to_search, path) + return {x: get_sum_of_values_from_dict_path(dct, x, np.nan) for x in paths_to_search} diff --git a/src/sec_certs/utils/helpers.py b/src/sec_certs/utils/helpers.py new file mode 100644 index 00000000..302f4e6a --- /dev/null +++ b/src/sec_certs/utils/helpers.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import hashlib +import logging +import re +import time +from contextlib import nullcontext +from datetime import datetime +from functools import partial +from pathlib import Path +from typing import Any, Collection + +import numpy as np +import pkgconfig +import requests + +import sec_certs.constants as constants +from sec_certs.config.configuration import config +from sec_certs.utils import parallel_processing +from sec_certs.utils.tqdm import tqdm + +logger = logging.getLogger(__name__) + + +def download_file( + url: str, output: Path, delay: float = 0, show_progress_bar: bool = False, progress_bar_desc: str | None = None +) -> str | int: + try: + time.sleep(delay) + # See https://github.com/psf/requests/issues/3953 for header justification + r = requests.get( + url, allow_redirects=True, timeout=constants.REQUEST_TIMEOUT, stream=True, headers={"Accept-Encoding": None} # type: ignore + ) + ctx: Any + if show_progress_bar: + ctx = partial( + tqdm, + total=int(r.headers.get("content-length", 0)), + unit="B", + unit_scale=True, + unit_divisor=1024, + desc=progress_bar_desc, + ) + else: + ctx = nullcontext + + if r.status_code == requests.codes.ok: + with ctx() as pbar: + with output.open("wb") as f: + for data in r.iter_content(1024): + f.write(data) + if show_progress_bar: + pbar.update(len(data)) + + return r.status_code + except requests.exceptions.Timeout: + return requests.codes.timeout + except Exception as e: + logger.error(f"Failed to download from {url}; {e}") + return constants.RETURNCODE_NOK + return 
def is_in_dict(target_dict: dict, path: Collection[str]) -> bool:
    """
    Check whether the nested keys given by `path` can be followed in `target_dict`.

    :param target_dict: Nested dictionary to inspect.
    :param path: Keys to follow, outer-most first. The path is iterated element by element,
                 so a plain string is treated as a sequence of single-character keys.
    :return: True if every key of `path` is present at its level, False otherwise.
    """
    current_level = target_dict
    for item in path:
        try:
            if item not in current_level:
                return False
            current_level = current_level[item]
        except TypeError:
            # The path descends into a value that is not a container keyed by `item`
            # (e.g. a number or a string leaf), so the path does not exist.
            return False
    return True
def tokenize(string: str, keywords: set[str]) -> str:
    """
    Keep only those whitespace-separated tokens of `string` whose lowercase form is in `keywords`.

    :param string: Text to filter.
    :param keywords: Lowercase keywords that are allowed to stay.
    :return: The surviving tokens (in their original casing) joined by single spaces.
    """
    words = string.split()
    kept_words = filter(lambda word: word.lower() in keywords, words)
    return " ".join(kept_words)
# Credit: https://stackoverflow.com/questions/18092354/
def split_unescape(s: str, delim: str, escape: str = "\\", unescape: bool = True) -> list[str]:
    """
    Split `s` on `delim`, treating a character preceded by `escape` as a literal.

    >>> split_unescape('foo,bar', ',')
    ['foo', 'bar']
    >>> split_unescape('foo$,bar', ',', '$')
    ['foo,bar']
    >>> split_unescape('foo$$,bar', ',', '$', unescape=True)
    ['foo$', 'bar']
    >>> split_unescape('foo$$,bar', ',', '$', unescape=False)
    ['foo$$', 'bar']
    >>> split_unescape('foo$', ',', '$', unescape=True)
    ['foo$']

    :param s: The string to split.
    :param delim: Single-character delimiter.
    :param escape: Single-character escape symbol.
    :param unescape: If True, escape symbols are dropped from the output, otherwise they are kept.
    :return: List of the split parts.
    """
    parts: list[str] = []
    buffer: list[str] = []
    position = 0
    length = len(s)

    while position < length:
        char = s[position]
        position += 1

        if char == escape:
            has_escaped_char = position < length
            # The escape symbol is kept when requested, or when it dangles at the very end.
            if not unescape or not has_escaped_char:
                buffer.append(escape)
            if has_escaped_char:
                # Take the escaped character literally, whatever it is.
                buffer.append(s[position])
                position += 1
        elif char == delim:
            parts.append("".join(buffer))
            buffer = []
        else:
            buffer.append(char)

    parts.append("".join(buffer))
    return parts
def choose_lowest_eal(eals: set[str] | None) -> str | None:
    """
    Given a set of EAL strings, choose the lowest one.

    If exactly one string carries the lowest level, it is returned unchanged (e.g. `"EAL2+"`).
    If several strings share the lowest level (e.g. `"EAL2"` and `"EAL2+"`), the plain
    `"EAL<level>"` is returned.

    :param eals: Set of EAL strings, may be empty or `None`.
    :return: The lowest EAL, or `None` when there is no input or no string contains a level number.
    """
    if not eals:
        return None

    levels: dict[str, int] = {}
    for eal in eals:
        match = re.search(r"\d+", eal)
        if match:
            levels[eal] = int(match.group())

    if not levels:
        # None of the strings carries a level number, there is nothing to compare.
        return None

    min_number = min(levels.values())
    # Compare the parsed numbers, not substrings, so that "1" does not match "EAL10".
    candidates = [eal for eal, level in levels.items() if level == min_number]
    return candidates[0] if len(candidates) == 1 else "EAL" + str(min_number)
@dataclass(eq=True, frozen=True)
class PrimarySFPCluster:
    # Human-readable cluster name, without the "SFP Primary Cluster: " prefix.
    name: str
    # Secondary clusters that are members of this primary cluster.
    secondary_clusters: frozenset[SecondarySFPCluster]
    # Member IDs for which no category (secondary cluster) was found in the XML.
    cwe_ids: frozenset[int]

    @classmethod
    def from_xml(cls, xml_categories: list[ET.Element], primary_cluster_element: ET.Element):
        """
        Build the primary cluster from its XML category element.

        :param xml_categories: All category elements of the XML, used to look up secondary clusters.
        :param primary_cluster_element: Category element describing this primary cluster.
        :return: The populated PrimarySFPCluster.
        """
        cluster_name = primary_cluster_element.attrib["Name"].split("SFP Primary Cluster: ")[1]
        relationships = primary_cluster_element.find("{http://cwe.mitre.org/cwe-6}Relationships")

        assert relationships is not None
        member_ids = {
            int(member.attrib["CWE_ID"])
            for member in relationships
            if member.tag == "{http://cwe.mitre.org/cwe-6}Has_Member"
        }

        nested_clusters = set()
        direct_cwe_ids = set()
        for member_id in member_ids:
            try:
                cluster = SecondarySFPCluster.from_xml_id(xml_categories, member_id)
            except ValueError:
                # The member is not a category, so it is kept as a plain ID.
                direct_cwe_ids.add(member_id)
            else:
                nested_clusters.add(cluster)

        return cls(cluster_name, frozenset(nested_clusters), frozenset(direct_cwe_ids))
def discover_sar_families(ser: pd.Series) -> list[str]:
    """
    Collect all SAR families that occur in the pandas Series, where each entry is a set of SAR objects
    (or a null value, which is skipped).

    :param ser: Series whose non-null entries are iterables of objects with a `family` attribute.
    :return: List of distinct families, in no particular order.
    """
    found_families = set()
    for sar_set in ser.tolist():
        if pd.isnull(sar_set):
            continue
        found_families.update(sar.family for sar in sar_set)
    return list(found_families)
+ """ + family_sars_dict = {x.family: x for x in sars} if (sars and not pd.isnull(sars)) else dict() + if sar_family not in family_sars_dict.keys(): + return None + return family_sars_dict[sar_family].level + + +def compute_cve_correlations( + df: pd.DataFrame, + exclude_vuln_free_certs: bool = False, + sar_families: list[str] | None = None, + output_path: str | Path | None = None, + filter_nans: bool = True, +) -> pd.DataFrame: + """ + Computes correlations of EAL and different SARs and two columns: (n_cves, worst_cve_score, avg_cve_score). Few assumptions about the passed dataframe: + - EAL column must be categorical data type + - SAR column must be a set of SARs + - `n_cves` and `worst_cve_score`, `avg_cve_score` columns must be present in the dataframe + Possibly, it can filter columns will both values NaN (due to division by zero or super low supports.) + To choose correct minimal support is tricky, this is because SAR levels often having huge support, but being imbalanced themselves heavily in the favor + of a single value that is rarely modified. 
We recommend choosing 100 and discarding any row where some column would result into NaN + """ + df_sar = df.loc[:, ["eal", "extracted_sars", "worst_cve_score", "avg_cve_score", "n_cves", "category"]] + df_sar = df_sar.loc[df_sar.category != "ICs, Smart Cards and Smart Card-Related Devices and Systems"] + + if exclude_vuln_free_certs: + df_sar = df_sar.loc[df_sar.n_cves > 0] + + families = sar_families if sar_families else discover_sar_families(df_sar.extracted_sars) + + spearmanr = functools.partial(stats.spearmanr, nan_policy="omit", alternative="less") + + df_sar.eal = df_sar.eal.cat.codes + df_sar.eal = df_sar.eal.map(lambda x: np.NaN if x == -1 else x) + + n_cves_eal_corr, n_cves_eal_pvalue = spearmanr(df_sar.eal, df_sar.n_cves) + n_cves_corrs = [n_cves_eal_corr] + n_cves_pvalues = [n_cves_eal_pvalue] + + worst_cve_eal_corr, worst_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.worst_cve_score) + worst_cve_corrs = [worst_cve_eal_corr] + worst_cve_pvalues = [worst_cve_eal_pvalue] + + avg_cve_eal_corr, avg_cve_eal_pvalue = spearmanr(df_sar.eal, df_sar.avg_cve_score) + avg_cve_corrs = [avg_cve_eal_corr] + avg_cve_pvalues = [avg_cve_eal_pvalue] + + supports = [df_sar.loc[~df_sar["eal"].isnull()].shape[0]] + + for family in tqdm(families): + df_sar[family] = df_sar.extracted_sars.map(lambda x: get_sar_level_from_set(x, family)) + + n_cves_corr, n_cves_pvalue = spearmanr(df_sar[family], df_sar.n_cves) + n_cves_corrs.append(n_cves_corr) + n_cves_pvalues.append(n_cves_pvalue) + + worst_cve_corr, worst_cve_pvalue = spearmanr(df_sar[family], df_sar.worst_cve_score) + worst_cve_corrs.append(worst_cve_corr) + worst_cve_pvalues.append(worst_cve_pvalue) + + avg_cve_corr, avg_cve_pvalue = spearmanr(df_sar[family], df_sar.avg_cve_score) + avg_cve_corrs.append(avg_cve_corr) + avg_cve_pvalues.append(avg_cve_pvalue) + + supports.append(df_sar.loc[~df_sar[family].isnull()].shape[0]) + + df_sar = df_sar.copy() + + tuples = list( + zip(n_cves_corrs, n_cves_pvalues, worst_cve_corrs, 
def find_earliest_maintenance_after_cve(row):
    """
    Given a dataframe row, return the first maintenance date that strictly succeeds the earliest
    published CVE related to the certificate, or np.nan if there is no such maintenance.

    :param row: Mapping-like row with keys `maintenance_dates` and `earliest_cve`.
    :return: The earliest qualifying maintenance date, else np.nan.
    """
    later_maintenances = (date for date in row["maintenance_dates"] if date > row["earliest_cve"])
    return min(later_maintenances, default=np.nan)
def expand_df_with_cve_cols(df: pd.DataFrame, cve_dset: CVEDataset) -> pd.DataFrame:
    """
    Return a copy of `df` expanded with columns derived from its `related_cves` column:
    `n_cves`, `cve_published_dates`, `earliest_cve`, `worst_cve_score` and `avg_cve_score`.

    Rows without CVEs (any null marker or an empty collection) get `n_cves == 0` and NaN elsewhere.

    :param df: DataFrame with a `related_cves` column holding collections of CVE IDs or null values.
    :param cve_dset: Object indexable by CVE ID whose items expose `published_date` and `impact.base_score`.
    :return: Expanded copy of the DataFrame, the input is not modified.
    """

    def has_cves(value) -> bool:
        # Null markers (np.nan, float("nan"), None, pd.NA) are not list-like; an empty collection
        # is treated as "no CVEs" as well, because min/max of an empty collection would raise.
        return pd.api.types.is_list_like(value) and len(value) > 0

    df = df.copy()

    df["n_cves"] = df.related_cves.map(lambda x: len(x) if has_cves(x) else 0)
    df["cve_published_dates"] = df.related_cves.map(
        lambda x: [cve_dset[y].published_date.date() for y in x] if has_cves(x) else np.nan  # type: ignore
    )

    df["earliest_cve"] = df.cve_published_dates.map(lambda x: min(x) if isinstance(x, list) else np.nan)
    df["worst_cve_score"] = df.related_cves.map(
        lambda x: max([cve_dset[cve].impact.base_score for cve in x]) if has_cves(x) else np.nan
    )

    # Note: Technically, CVE can have 0 base score. This happens when the CVE is discarded from the database.
    # This could skew the results. During May 2022 analysis, we encountered a single CVE with such score.
    # Therefore, we do not treat this case.
    # To properly treat this, the average should be taken across CVEs with >0 base_score.
    df["avg_cve_score"] = df.related_cves.map(
        lambda x: np.mean([cve_dset[cve].impact.base_score for cve in x]) if has_cves(x) else np.nan
    )
    return df
+ vulns = cve_dset.to_pandas() + df_cwe_relevant = ( + cc_df[["eal", "related_cves", "category"]] + .explode(column="related_cves") + .rename(columns={"related_cves": "cve_id"}) + ) + df_cwe_relevant["cwe_ids"] = df_cwe_relevant.cve_id.map(lambda x: vulns.cwe_ids[x]) + df_cwe_relevant = ( + df_cwe_relevant.explode(column="cwe_ids") + .reset_index() + .rename(columns={"cwe_ids": "cwe_id", "index": "cert_dgst"}) + ) + + df_cwe_relevant.cwe_id = df_cwe_relevant.cwe_id.replace(r"NVD-CWE-*", np.nan, regex=True) + print( + f"Filtering {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs that have no CWE assigned. This affects {df_cwe_relevant.loc[df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates" + ) + print( + f"Still left with analysis of {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cve_id'].nunique()} CVEs in {df_cwe_relevant.loc[~df_cwe_relevant.cwe_id.isna(), 'cert_dgst'].nunique()} certificates." + ) + df_cwe_relevant = df_cwe_relevant.dropna() + + # Load CWE IDs and descriptions from CWE website + with tempfile.TemporaryDirectory() as tmp_dir: + xml_zip_path = Path(tmp_dir) / "cwec_latest.xml.zip" + helpers.download_file("https://cwe.mitre.org/data/xml/cwec_latest.xml.zip", xml_zip_path) + + with zipfile.ZipFile(xml_zip_path, "r") as zip_handle: + zip_handle.extractall(tmp_dir) + xml_filename = zip_handle.namelist()[0] + + root = ET.parse(Path(tmp_dir) / xml_filename).getroot() + + weaknesses = root.find("{http://cwe.mitre.org/cwe-6}Weaknesses") + categories = root.find("{http://cwe.mitre.org/cwe-6}Categories") + dct: dict[str, Any] = { + "cwe_id": [], + "cwe_name": [], + "cwe_description": [], + "type": [], + "child_of": [], + } + + assert weaknesses + for weakness in weaknesses: + assert weakness + description = weakness.find("{http://cwe.mitre.org/cwe-6}Description") + related_weaknesses = weakness.find("{http://cwe.mitre.org/cwe-6}Related_Weaknesses") + + dct["cwe_id"].append("CWE-" + weakness.attrib["ID"]) + 
dct["cwe_name"].append(weakness.attrib["Name"]) + dct["cwe_description"].append(description.text if description is not None else None) + dct["type"].append("weakness") + + if related_weaknesses: + dct["child_of"].append( + { + "CWE-" + x.attrib["CWE_ID"] + for x in related_weaknesses + if x.tag == "{http://cwe.mitre.org/cwe-6}Related_Weakness" and x.attrib["Nature"] == "ChildOf" + } + ) + else: + dct["child_of"].append(np.nan) + + assert categories + for category in categories: + assert category + summary = category.find("{http://cwe.mitre.org/cwe-6}Summary") + + dct["cwe_id"].append("CWE-" + category.attrib["ID"]) + dct["cwe_name"].append(category.attrib["Name"]) + dct["cwe_description"].append(summary.text if summary is not None else None) + dct["type"].append("category") + dct["child_of"].append(np.nan) + + cwe_df = pd.DataFrame(dct).set_index("cwe_id") + cwe_df["url"] = cwe_df.index.map(lambda x: "https://cwe.mitre.org/data/definitions/" + x.split("-")[1] + ".html") + cwe_df = cwe_df.replace(r"\n", " ", regex=True) + + if fine_grained: + return df_cwe_relevant, cwe_df + else: + return get_coarse_grained_cwes(df_cwe_relevant, cwe_df), cwe_df + + +def get_coarse_grained_cwes(fine_grained_df: pd.DataFrame, cwe_df: pd.DataFrame) -> pd.DataFrame: + """ + Oddly enough, NVD contains CWEs at different levels of abstraction, which makes it difficult to compare between them. + Among others, some three different CWEs appear in the CVEDataset: CWE-20, CWE-119, CWE-787. Problem is that CWE-787 + is child of CWE-119, which in turn is child of CWE-20. It makes no sense to compute stats of most prevalent CWEs + unless categories are aligned to the top-most level. + + This function aligns the categories to the top-most level. It works in loop. When an iteration is performed without + replacing any CWEs with their parents, the algorithm terminates. + The algorithm inspects every CWE and replaces it with all its parents on condition that they appear in the CVE Dataset. 
def get_top_n_cwes(
    df: pd.DataFrame, cwe_df: pd.DataFrame, category: str | None = None, eal: str | None = None, n_cwes: int = 10
) -> pd.DataFrame:
    """
    Fetch top-n CWEs, overall, per category, or per EAL.

    :param df: DataFrame with columns `cwe_id`, `category` and `eal`, one row per CWE occurrence.
    :param cwe_df: DataFrame indexed by CWE ID with columns `cwe_name`, `cwe_description`, `url` and `type`.
    :param category: If given, only rows of this category are counted.
    :param eal: If given, only rows of this EAL are counted.
    :param n_cwes: Number of the most frequent CWEs to return.
    :return: DataFrame indexed by `cwe_id` with columns `frequency`, `cwe_name`, `cwe_description`,
             `url` and `type`, sorted by descending frequency.
    """
    top_n = df.copy()

    if category:
        top_n = top_n.loc[top_n.category == category].copy()
    if eal:
        top_n = top_n.loc[top_n.eal == eal].copy()

    # Name the counts explicitly: the name that value_counts() gives to its result differs between
    # pandas versions ("cwe_id" before 2.0, "count" since), so renaming a column by its old name
    # is not reliable.
    frequencies = top_n.cwe_id.value_counts().head(n_cwes).rename("frequency")
    top_n = frequencies.to_frame().rename_axis("cwe_id")

    for column in ("cwe_name", "cwe_description", "url", "type"):
        top_n[column] = top_n.index.map(lambda cwe, column=column: cwe_df.loc[cwe, column])

    return top_n
def move_fixing_mu_to_directory(
    df_fixed: pd.DataFrame, main_df: pd.DataFrame, outdir: str | Path, inpath: str | Path
) -> pd.Series:
    """
    Localizes reports of maintenance updates that should fix some vulnerability and copies them into a directory.
    df_fixed should be the output of compute_maintenances_that_come_after_vulns method.

    :param df_fixed: DataFrame indexed by `dgst` with column `earliest_maintenance_after_vuln`.
    :param main_df: DataFrame of maintenance updates indexed by `dgst`, with columns `related_cert_digest`
                    and `maintenance_date`. NOTE: its `maintenance_date` column is converted to plain dates
                    in place.
    :param outdir: Directory to copy the reports into, created (with parents) when missing.
    :param inpath: Directory with the `<dgst>.pdf` reports to copy from.
    :return: Series with the file names of the copied reports.
    """
    fixed_df_index = (
        df_fixed.loc[~df_fixed.earliest_maintenance_after_vuln.isnull()]
        .reset_index()
        .set_index(["dgst", "earliest_maintenance_after_vuln"])
        .index.to_flat_index()
    )
    main_df.maintenance_date = main_df.maintenance_date.map(lambda x: x.date())
    main_prefiltered = main_df.reset_index().set_index(["related_cert_digest", "maintenance_date"])
    mu_filenames = main_prefiltered.loc[main_prefiltered.index.isin(fixed_df_index), "dgst"]
    mu_filenames = mu_filenames.map(lambda x: x + ".pdf")

    inpath = Path(inpath)
    outdir = Path(outdir)
    # It is the destination that may not exist yet; creating the source would only
    # produce an empty directory with nothing to copy from.
    outdir.mkdir(parents=True, exist_ok=True)

    for filename in mu_filenames:
        copyfile(inpath / filename, outdir / filename)

    return mu_filenames
def process_parallel(
    func: Callable,
    items: Iterable,
    max_workers: int,
    callback: Callable | None = None,
    use_threading: bool = True,
    progress_bar: bool = True,
    unpack: bool = False,
    progress_bar_desc: str | None = None,
) -> list[Any]:
    """
    Apply `func` to every item of `items` in a pool of workers and collect the results.

    :param func: Callable to run for each item.
    :param items: Items to process.
    :param max_workers: Number of workers in the pool.
    :param callback: Optional callable invoked with the result of each finished task.
    :param use_threading: If True a thread pool is used, otherwise a process pool.
    :param progress_bar: If True (and `items` is truthy), a progress bar is shown while waiting.
    :param unpack: If True each item is unpacked into positional arguments of `func`.
    :param progress_bar_desc: Description shown next to the progress bar.
    :return: Results of `func` in the order of `items`.
    """
    pool: Pool | ThreadPool
    if use_threading:
        pool = ThreadPool(max_workers)
    else:
        pool = Pool(max_workers)

    results = []
    for item in items:
        arguments = (*item,) if unpack else (item,)
        results.append(pool.apply_async(func, arguments, callback=callback))

    if progress_bar is True and items:
        bar = tqdm(total=len(results), desc=progress_bar_desc)
        while True:
            ready_flags = [result.ready() for result in results]
            if all(ready_flags):
                break
            # Advance the bar by the number of tasks finished since the last poll.
            bar.update(sum(ready_flags) - bar.n)
            time.sleep(1)
        bar.update(len(results) - bar.n)
        bar.close()

    pool.close()
    pool.join()
    pool.terminate()

    return [result.get() for result in results]
def ocr_pdf_file(pdf_path: Path) -> str:
    """
    OCR a PDF file and return its text contents, uses `pdftoppm` and `tesseract`.

    :param pdf_path: The PDF file to OCR.
    :raises ValueError: If `pdftoppm` or `tesseract` exits with a non-zero return code.
    :return: The text contents, pages concatenated in page order.
    """

    def page_number(txt_path: Path) -> int:
        # Page images are named "image-<number>.ppm" and tesseract keeps that base name;
        # the number must be taken from the file name, not from the full path.
        return int(txt_path.stem.rsplit("-", 1)[-1])

    with TemporaryDirectory() as tmpdir:
        tmppath = Path(tmpdir)
        ppm = subprocess.run(
            ["pdftoppm", pdf_path, tmppath / "image"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        if ppm.returncode != 0:
            raise ValueError(f"pdftoppm failed: {ppm.returncode}")

        for ppm_path in map(Path, glob.glob(str(tmppath / "image*.ppm"))):
            base = ppm_path.with_suffix("")
            tes = subprocess.run(
                ["tesseract", "-l", "eng+deu+fra", ppm_path, base], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
            if tes.returncode != 0:
                raise ValueError(f"tesseract failed: {tes.returncode}")

        txt_paths = sorted(map(Path, glob.glob(str(tmppath / "image*.txt"))), key=page_number)
        return "".join(txt_path.read_text(encoding="utf-8") for txt_path in txt_paths)
def parse_pdf_date(dateval: bytes | None) -> datetime | None:
    """
    Parse PDF metadata date format:

    ```
    parse_pdf_date(b"D:20110617082321-04'00'")
    ```
    into
    ```
    datetime.datetime(2011, 6, 17, 8, 23, 21, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=72000)))
    ```

    A value without a timezone part (or with a bare `Z`) yields a naive datetime.

    :param dateval: The date as in the PDF metadata.
    :return: The parsed datetime, if successful, else `None`.
    """
    if dateval is None:
        return None
    try:
        clean = dateval.decode("utf-8").replace("D:", "")
    except UnicodeDecodeError:
        return None

    tz = ""
    tzoff = 1
    # Split on the first timezone marker only. `partition` never raises, so malformed values
    # with several markers fall through to `strptime` and end up as `None`.
    for marker, sign in (("+", 1), ("-", -1), ("Z", 1)):
        if marker in clean:
            clean, _, tz = clean.partition(marker)
            tzoff = sign
            break

    try:
        res_datetime = datetime.strptime(clean, "%Y%m%d%H%M%S")
        if tz:
            tz_datetime = datetime.strptime(tz, "%H'%M'")
            delta = tzoff * timedelta(hours=tz_datetime.hour, minutes=tz_datetime.minute)
            res_datetime = res_datetime.replace(tzinfo=timezone(delta))
        return res_datetime
    except ValueError:
        return None
def text_is_garbage(
    text: str,
    *,
    lines_threshold: int | None = None,
    size_threshold: int | None = None,
    avg_llen_threshold: float | None = None,
    every_second_char_threshold: int | None = None,
    alpha_chars_threshold: float | None = None,
) -> bool:
    """
    Detect whether the given text is "garbage". A series of tests is applied,
    using the number of lines, average line length, total size, every second character on a line
    and the ratio of alphabetic characters (`str.isalpha`).

    Each threshold may be overridden per call; when left as `None` the corresponding
    `GARBAGE_*` module constant is used.

    :param text: The tested text.
    :param lines_threshold: Minimal number of lines, defaults to `GARBAGE_LINES_THRESHOLD`.
    :param size_threshold: Minimal number of characters, defaults to `GARBAGE_SIZE_THRESHOLD`.
    :param avg_llen_threshold: Minimal average line length, defaults to `GARBAGE_AVG_LLEN_THRESHOLD`.
    :param every_second_char_threshold: Minimal number of lines whose every-second characters are
                                        not all identical, defaults to `GARBAGE_EVERY_SECOND_CHAR_THRESHOLD`.
    :param alpha_chars_threshold: Minimal ratio of alphabetic characters to all characters,
                                  defaults to `GARBAGE_ALPHA_CHARS_THRESHOLD`.
    :return: Whether the text is a "garbage" result of pdftotext conversion.
    """
    lines_limit = GARBAGE_LINES_THRESHOLD if lines_threshold is None else lines_threshold
    size_limit = GARBAGE_SIZE_THRESHOLD if size_threshold is None else size_threshold
    avg_llen_limit = GARBAGE_AVG_LLEN_THRESHOLD if avg_llen_threshold is None else avg_llen_threshold
    every_second_limit = (
        GARBAGE_EVERY_SECOND_CHAR_THRESHOLD if every_second_char_threshold is None else every_second_char_threshold
    )
    alpha_limit = GARBAGE_ALPHA_CHARS_THRESHOLD if alpha_chars_threshold is None else alpha_chars_threshold

    size = len(text)
    lines = text.splitlines()
    line_count = len(lines)
    avg_line_len = sum(map(len, lines)) / line_count if line_count else 0
    alpha_ratio = sum(map(str.isalpha, text)) / size if size else 0
    # Lines where the characters at odd positions are not all the same. Text with a space
    # inserted after every character (the ANSSI spacing issue) scores zero here.
    varied_lines = sum(1 for line in lines if len(set(line[1::2])) > 1)

    return (
        line_count < lines_limit  # too few lines
        or size < size_limit  # too little text overall
        or avg_line_len < avg_llen_limit  # lines are too short on average
        or varied_lines < every_second_limit  # ANSSI spacing issue
        or alpha_ratio < alpha_limit  # too few alphabetic characters
    )
def parse_list_of_tables(txt: str) -> set[int]:
    """
    Collect the trailing page numbers of "list of tables" lines that mention a function,
    an algorithm or security functions.

    :param txt: text of the list of tables taken from a policy txt
    :return: set of page numbers found at the end of the matching lines
    """
    pattern = r"^.+?(?:[Ff]unction|[Aa]lgorithm|[Ss]ecurity [Ff]unctions?).+?(?P<page_num>\d+)$"
    page_numbers: set[int] = set()
    for match in re.finditer(pattern, txt, flags=re.MULTILINE):
        page_numbers.add(int(match.group("page_num")))
    return page_numbers


def get_table_rich_page_numbers_from_footer(file_text: str) -> set[int]:
    """
    Guess which pages of a policy txt may hold tables with algorithm data.

    Pages are counted by form feeds (every line containing one starts a new page). Each line
    starting with "Table " or "Exhibit" marks its page and the following one, plus the previous
    one when the current page is greater than 2.

    :param file_text: text of the whole policy document
    :return: set of candidate page numbers, with at most one page at or beyond the final
             page counter dropped
    """
    page_no = 1
    candidates = set()

    for row in file_text.split("\n"):
        if "\f" in row:
            page_no += 1
        if not row.startswith(("Table ", "Exhibit")):
            continue
        candidates.add(page_no)
        candidates.add(page_no + 1)
        if page_no > 2:
            candidates.add(page_no - 1)

    # Only the first candidate at or beyond the final page counter is discarded.
    overflow = next((page for page in candidates if page >= page_no), None)
    if overflow is None:
        return candidates
    return candidates - {overflow}
def get_algs_from_table(dataframe_text: str) -> set[str]:
    """
    Find algorithm certificate references (e.g. "#1234", "Cert. 567", "A 12") in table text.

    :param dataframe_text: text of a table
    :return: set of the matched substrings, prefixes such as "#" or "Cert." included
    """
    # NOTE(review): the named group `id` is never read, the whole match is collected instead.
    # Confirm with callers whether that is intended.
    pattern = r"(?:#?\s?|(?:Cert)\.?[^. ]*?\s?)(?:[CcAa]\s)?(?P<id>[CcAa]? ?\d+)"
    found: set[str] = set()
    for match in re.finditer(pattern, dataframe_text):
        found.add(match.group())
    return found
