author     J08nY  2024-11-04 21:45:51 +0100
committer  J08nY  2024-11-04 21:45:51 +0100
commit     e303b6f131f05f4ca700edd6d4241c0d660d0b86 (patch)
tree       f6853f7113f2170d598135d0301170ac656e2967 /src
parent     89190211442f8ef340439ed9ff2bfcd5817abf8f (diff)
Improve scheme parsing.
Only match if category matches. Disregard unwanted warnings. Add progress bars everywhere.
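
A rough usage sketch of the new dataset-level entry point (the scheme key "FR" is only an illustrative choice; getters that do not take these keyword arguments keep their defaults):

from sec_certs.dataset.cc_scheme import CCSchemeDataset

# Illustrative only: fetch a single scheme, request enhanced data, but skip
# the slow artifact-hash downloads.
dataset = CCSchemeDataset.from_web(only_schemes={"FR"}, enhanced=True, artifacts=False)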
Diffstat (limited to 'src')
-rw-r--r--  src/sec_certs/dataset/cc_scheme.py    6
-rw-r--r--  src/sec_certs/model/cc_matching.py   23
-rw-r--r--  src/sec_certs/sample/cc_scheme.py   227
3 files changed, 166 insertions, 90 deletions
diff --git a/src/sec_certs/dataset/cc_scheme.py b/src/sec_certs/dataset/cc_scheme.py
index e099f9de..19f1a3de 100644
--- a/src/sec_certs/dataset/cc_scheme.py
+++ b/src/sec_certs/dataset/cc_scheme.py
@@ -49,13 +49,15 @@ class CCSchemeDataset(JSONPathDataset, ComplexSerializableType):
return cls(dct["schemes"])
@classmethod
- def from_web(cls, only_schemes: set[str] | None = None) -> CCSchemeDataset:
+ def from_web(
+ cls, only_schemes: set[str] | None = None, enhanced: bool | None = None, artifacts: bool | None = None
+ ) -> CCSchemeDataset:
schemes = {}
for scheme, sources in CCScheme.methods.items():
if only_schemes is not None and scheme not in only_schemes:
continue
try:
- schemes[scheme] = CCScheme.from_web(scheme, sources.keys())
+ schemes[scheme] = CCScheme.from_web(scheme, sources.keys(), enhanced=enhanced, artifacts=artifacts)
except Exception as e:
logger.warning(f"Could not download CC scheme: {scheme} due to error {e}.")
return cls(schemes)
diff --git a/src/sec_certs/model/cc_matching.py b/src/sec_certs/model/cc_matching.py
index 21d94b74..9930fb66 100644
--- a/src/sec_certs/model/cc_matching.py
+++ b/src/sec_certs/model/cc_matching.py
@@ -10,6 +10,24 @@ from sec_certs.sample.cc import CCCertificate
from sec_certs.sample.cc_certificate_id import CertificateId
from sec_certs.utils.strings import fully_sanitize_string
+CATEGORIES = {
+ "ICs, Smart Cards and Smart Card-Related Devices and Systems",
+ "Other Devices and Systems",
+ "Network and Network-Related Devices and Systems",
+ "Multi-Function Devices",
+ "Boundary Protection Devices and Systems",
+ "Data Protection",
+ "Operating Systems",
+ "Products for Digital Signatures",
+ "Access Control Devices and Systems",
+ "Mobility",
+ "Databases",
+ "Trusted Computing",
+ "Detection Devices and Systems",
+ "Key Management Systems",
+ "Biometric Systems and Devices",
+}
+
class CCSchemeMatcher(AbstractMatcher[CCCertificate]):
"""
@@ -46,6 +64,8 @@ class CCSchemeMatcher(AbstractMatcher[CCCertificate]):
if vendor_name := self._get_from_entry("vendor", "developer", "manufacturer", "supplier"):
self._vendor = fully_sanitize_string(vendor_name)
+ self._category = self._get_from_entry("category")
+
self._report_hash = self._get_from_entry("report_hash")
self._target_hash = self._get_from_entry("target_hash")
@@ -69,6 +89,9 @@ class CCSchemeMatcher(AbstractMatcher[CCCertificate]):
# We need to have something to match to.
if self._product is None or self._vendor is None or cert.name is None or cert.manufacturer is None:
return 0
+ # It is a correctly parsed category but the wrong one.
+ if self._category in CATEGORIES and self._category != cert.category:
+ return 0
cert_name = fully_sanitize_string(cert.name)
cert_manufacturer = fully_sanitize_string(cert.manufacturer)
# If we match exactly, return early.
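
The category gate added above can be read in isolation; a minimal, self-contained sketch of the check (CATEGORIES abbreviated, names simplified, not the actual matcher code):

# Abbreviated stand-in for the CATEGORIES set introduced in cc_matching.py.
CATEGORIES = {
    "Operating Systems",
    "Databases",
    "Multi-Function Devices",
}

def category_allows_match(parsed_category: str | None, cert_category: str) -> bool:
    # A correctly parsed but different category rules the pair out early;
    # an unrecognised or missing category is ignored and matching continues.
    return not (parsed_category in CATEGORIES and parsed_category != cert_category)

assert category_allows_match(None, "Operating Systems")             # unknown -> keep matching
assert category_allows_match("Databases", "Databases")              # same category -> keep matching
assert not category_allows_match("Databases", "Operating Systems")  # mismatch -> score 0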
diff --git a/src/sec_certs/sample/cc_scheme.py b/src/sec_certs/sample/cc_scheme.py
index 080e3f52..4c46d67e 100644
--- a/src/sec_certs/sample/cc_scheme.py
+++ b/src/sec_certs/sample/cc_scheme.py
@@ -11,6 +11,7 @@ from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
+from inspect import signature
from pathlib import Path
from typing import Any, ClassVar
from urllib.parse import urljoin
@@ -19,7 +20,7 @@ import requests
import tabula
from bs4 import BeautifulSoup, NavigableString, Tag
from dateutil.parser import isoparse
-from requests import Response
+from requests import ConnectionError, HTTPError, Response
from urllib3.connectionpool import InsecureRequestWarning
from sec_certs import constants
@@ -50,6 +51,8 @@ __all__ = [
"get_korea_certified",
"get_korea_suspended",
"get_korea_archived",
+ "get_poland_certified",
+ "get_poland_ineval",
"get_singapore_certified",
"get_singapore_in_evaluation",
"get_singapore_archived",
@@ -75,7 +78,21 @@ SPOOF_HEADERS = {
"Cache-Control": "no-cache",
"Dnt": "1",
"Pragma": "no-cache",
- "Priority": "u=0, i",
+ "Priority": "u=0, i", # 'ICs, Smart Cards and Smart Card-Related Devices and Systems': 1978,
+ # 'Other Devices and Systems': 1043,
+ # 'Network and Network-Related Devices and Systems': 835,
+ # 'Multi-Function Devices': 671,
+ # 'Boundary Protection Devices and Systems': 253,
+ # 'Data Protection': 240,
+ # 'Operating Systems': 237,
+ # 'Products for Digital Signatures': 179,
+ # 'Access Control Devices and Systems': 155,
+ # 'Mobility': 115,
+ # 'Databases': 103,
+ # 'Trusted Computing': 78,
+ # 'Detection Devices and Systems': 77,
+ # 'Key Management Systems': 59,
+ # 'Biometric Systems and Devices'
"Sec-Ch-Ua": 'Not?A_Brand";v="99", "Chromium";v="130',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Linux"',
@@ -109,12 +126,15 @@ def _get_page(url: str, session=None, **kwargs) -> BeautifulSoup:
return BeautifulSoup(_get(url, session, **kwargs).content, "html5lib")
-def _get_hash(url: str, session=None, **kwargs) -> bytes:
- resp = _get(url, session, **kwargs)
+def _get_hash(url: str, session=None, **kwargs) -> str | None:
+ try:
+ resp = _get(url, session, **kwargs)
+ except (HTTPError, ConnectionError):
+ return None
h = hashlib.sha256()
for chunk in resp.iter_content():
h.update(chunk)
- return h.digest()
+ return h.digest().hex()
def get_australia_in_evaluation( # noqa: C901
@@ -240,7 +260,7 @@ def get_canada_in_evaluation() -> list[dict[str, Any]]:
return results
-def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
+def _get_france(url, enhanced, artifacts, name) -> list[dict[str, Any]]: # noqa: C901
session = requests.session()
challenge_soup = _get_page(constants.CC_ANSSI_BASE_URL, session=session)
bln_script = challenge_soup.find("head").find_all("script")[1]
@@ -256,6 +276,7 @@ def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
raise ValueError
pages = int(last_page_a.group())
results = []
+ pbar = tqdm(desc=f"Get FR scheme {name}.")
for page in range(pages + 1):
soup = _get_page(url + f"?page={page}", session=session)
for row in soup.find_all("article", class_="node--type-produit-certifie-cc"):
@@ -317,17 +338,19 @@ def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
if "Rapport de certification" in a.text:
e["report_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"])
if artifacts:
- e["report_hash"] = _get_hash(e["report_link"], session=session).hex()
+ e["report_hash"] = _get_hash(e["report_link"], session=session)
elif "Cible de sécurité" in a.text:
e["target_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"])
if artifacts:
- e["target_hash"] = _get_hash(e["target_link"], session=session).hex()
+ e["target_hash"] = _get_hash(e["target_link"], session=session)
elif "Certificat" in a.text:
e["cert_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"])
if artifacts:
- e["cert_hash"] = _get_hash(e["cert_link"], session=session).hex()
+ e["cert_hash"] = _get_hash(e["cert_link"], session=session)
cert["enhanced"] = e
+ pbar.update()
results.append(cert)
+ pbar.close()
return results
@@ -339,7 +362,7 @@ def get_france_certified(enhanced: bool = True, artifacts: bool = False) -> list
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_france(constants.CC_ANSSI_CERTIFIED_URL, enhanced, artifacts)
+ return _get_france(constants.CC_ANSSI_CERTIFIED_URL, enhanced, artifacts, "certified")
def get_france_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: # noqa: C901
@@ -350,7 +373,7 @@ def get_france_archived(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_france(constants.CC_ANSSI_ARCHIVED_URL, enhanced, artifacts)
+ return _get_france(constants.CC_ANSSI_ARCHIVED_URL, enhanced, artifacts, "archived")
def get_germany_certified( # noqa: C901
@@ -439,15 +462,15 @@ def get_germany_certified( # noqa: C901
if "Certification Report" in title:
e["report_link"] = href
if artifacts:
- e["report_hash"] = _get_hash(href).hex()
+ e["report_hash"] = _get_hash(href)
elif "Security Target" in title:
e["target_link"] = href
if artifacts:
- e["target_hash"] = _get_hash(href).hex()
+ e["target_hash"] = _get_hash(href)
elif "Certificate" in title:
e["cert_link"] = href
if artifacts:
- e["cert_hash"] = _get_hash(href).hex()
+ e["cert_hash"] = _get_hash(href)
description = content.find("div", attrs={"lang": "en"})
if description:
e["description"] = sns(description.text)
@@ -471,6 +494,7 @@ def get_india_certified() -> list[dict[str, Any]]:
pages = {0}
seen_pages = set()
results = []
+ pbar = tqdm(desc="Get IN scheme certified.")
while pages:
page = pages.pop()
seen_pages.add(page)
@@ -510,7 +534,9 @@ def get_india_certified() -> list[dict[str, Any]]:
"cert_link": urljoin(constants.CC_INDIA_BASE_URL, _fix_india_link(cert_a["href"])),
"cert_name": sns(cert_a.text),
}
+ pbar.update()
results.append(cert)
+ pbar.close()
return results
@@ -523,6 +549,7 @@ def get_india_archived() -> list[dict[str, Any]]:
pages = {0}
seen_pages = set()
results = []
+ pbar = tqdm(desc="Get IN scheme archived.")
while pages:
page = pages.pop()
seen_pages.add(page)
@@ -564,7 +591,9 @@ def get_india_archived() -> list[dict[str, Any]]:
if report_a:
cert["report_link"] = urljoin(constants.CC_INDIA_BASE_URL, _fix_india_link(report_a["href"]))
cert["report_name"] = sns(report_a.text)
+ pbar.update()
results.append(cert)
+ pbar.close()
return results
@@ -577,7 +606,7 @@ def get_italy_certified() -> list[dict[str, Any]]: # noqa: C901
soup = _get_page(constants.CC_ITALY_CERTIFIED_URL)
div = soup.find("div", class_="certificati")
results = []
- for cert_div in div.find_all("div", recursive=False):
+ for cert_div in tqdm(div.find_all("div", recursive=False), desc="Get IT scheme certified."):
title = cert_div.find("h3").text
data_div = cert_div.find("div", class_="collapse")
cert = {"title": title}
@@ -619,7 +648,7 @@ def get_italy_in_evaluation() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_ITALY_INEVAL_URL)
div = soup.find("div", class_="valutazioni")
results = []
- for cert_div in div.find_all("div", recursive=False):
+ for cert_div in tqdm(div.find_all("div", recursive=False), desc="Get IT scheme in evaluation."):
title = cert_div.find("h3").text
data_div = cert_div.find("div", class_="collapse")
cert = {"title": title}
@@ -639,12 +668,12 @@ def get_italy_in_evaluation() -> list[dict[str, Any]]:
return results
-def _get_japan(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
+def _get_japan(url, enhanced, artifacts, name) -> list[dict[str, Any]]: # noqa: C901
soup = _get_page(url)
table = soup.find("table", class_="cert-table")
results = []
trs = list(table.find_all("tr"))
- for tr in trs:
+ for tr in tqdm(trs, desc=f"Get JP scheme {name}."):
tds = tr.find_all("td")
if not tds:
continue
@@ -738,15 +767,15 @@ def _get_japan(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
if "Report" in name:
e["report_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
if artifacts:
- e["report_hash"] = _get_hash(e["report_link"]).hex()
+ e["report_hash"] = _get_hash(e["report_link"])
elif "Certificate" in name:
e["cert_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
if artifacts:
- e["cert_hash"] = _get_hash(e["cert_link"]).hex()
+ e["cert_hash"] = _get_hash(e["cert_link"])
elif "Target" in name:
e["target_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
if artifacts:
- e["target_hash"] = _get_hash(e["target_link"]).hex()
+ e["target_hash"] = _get_hash(e["target_link"])
e["description"] = sns(main.find("div", id="overviewsbox").text)
cert["enhanced"] = e
return results
@@ -760,8 +789,8 @@ def get_japan_certified(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- japan_hw = _get_japan(constants.CC_JAPAN_CERTIFIED_HW_URL, enhanced, artifacts)
- japan_sw = _get_japan(constants.CC_JAPAN_CERTIFIED_SW_URL, enhanced, artifacts)
+ japan_hw = _get_japan(constants.CC_JAPAN_CERTIFIED_HW_URL, enhanced, artifacts, "certified HW")
+ japan_sw = _get_japan(constants.CC_JAPAN_CERTIFIED_SW_URL, enhanced, artifacts, "certified SW")
return japan_sw + japan_hw
@@ -773,7 +802,7 @@ def get_japan_archived(enhanced: bool = True, artifacts: bool = False) -> list[d
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_japan(constants.CC_JAPAN_ARCHIVED_SW_URL, enhanced, artifacts)
+ return _get_japan(constants.CC_JAPAN_ARCHIVED_SW_URL, enhanced, artifacts, "archived SW")
def get_japan_in_evaluation() -> list[dict[str, Any]]:
@@ -785,7 +814,7 @@ def get_japan_in_evaluation() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_JAPAN_INEVAL_URL)
table = soup.find("table")
results = []
- for tr in table.find_all("tr"):
+ for tr in tqdm(table.find_all("tr"), desc="Get JP scheme in evaluation."):
tds = tr.find_all("td")
if not tds:
continue
@@ -800,13 +829,14 @@ def get_japan_in_evaluation() -> list[dict[str, Any]]:
return results
-def _get_malaysia(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901
+def _get_malaysia(url, enhanced, artifacts, name) -> list[dict[str, Any]]: # noqa: C901
soup = _get_page(url)
pages_re = re.search("Page [0-9]+ of ([0-9]+)", soup.find("form").text)
if not pages_re:
raise ValueError
total_pages = int(pages_re.group(1))
results = []
+ pbar = tqdm(desc=f"Get MY scheme {name}.")
for i in range(total_pages):
soup = _get_page(url + f"?start={i * 10}")
table = soup.find("table", class_="directoryTable")
@@ -857,17 +887,19 @@ def _get_malaysia(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C9
if "ST" in a.text:
e["target_link"] = urljoin(constants.CC_MALAYSIA_BASE_URL, a["href"])
if artifacts:
- e["target_hash"] = _get_hash(e["target_link"]).hex()
+ e["target_hash"] = _get_hash(e["target_link"])
elif "CR" in a.text:
e["report_link"] = urljoin(constants.CC_MALAYSIA_BASE_URL, a["href"])
if artifacts:
- e["report_hash"] = _get_hash(e["report_link"]).hex()
+ e["report_hash"] = _get_hash(e["report_link"])
elif "Maintenance" in title:
pass
elif "Status" in title:
e["status"] = value
cert["enhanced"] = e
+ pbar.update()
results.append(cert)
+ pbar.close()
return results
@@ -879,7 +911,7 @@ def get_malaysia_certified(enhanced: bool = True, artifacts: bool = False) -> li
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_malaysia(constants.CC_MALAYSIA_CERTIFIED_URL, enhanced, artifacts)
+ return _get_malaysia(constants.CC_MALAYSIA_CERTIFIED_URL, enhanced, artifacts, "certified")
def get_malaysia_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -890,7 +922,7 @@ def get_malaysia_archived(enhanced: bool = True, artifacts: bool = False) -> lis
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_malaysia(constants.CC_MALAYSIA_ARCHIVED_URL, enhanced, artifacts)
+ return _get_malaysia(constants.CC_MALAYSIA_ARCHIVED_URL, enhanced, artifacts, "archived")
def get_malaysia_in_evaluation() -> list[dict[str, Any]]:
@@ -903,7 +935,7 @@ def get_malaysia_in_evaluation() -> list[dict[str, Any]]:
main_div = soup.find("div", attrs={"itemprop": "articleBody"})
table = main_div.find("table")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get MY scheme in evaluation."):
tds = tr.find_all("td")
if len(tds) != 5:
continue
@@ -926,7 +958,7 @@ def _get_netherlands_certified_old( # noqa: C901
rows = main_div.find_all("div", class_="row", recursive=False)
modals = main_div.find_all("div", class_="modal", recursive=False)
results = []
- for row, modal in zip(rows, modals):
+ for row, modal in tqdm(zip(rows, modals), desc="Get NL scheme certified (old)."):
row_entries = row.find_all("a")
modal_trs = modal.find_all("tr")
cert: dict[str, Any] = {
@@ -945,19 +977,19 @@ def _get_netherlands_certified_old( # noqa: C901
elif "Certificate" in th_text:
cert["cert_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"])
if artifacts:
- cert["cert_hash"] = _get_hash(cert["cert_link"]).hex()
+ cert["cert_hash"] = _get_hash(cert["cert_link"])
elif "Certificationreport" in th_text:
cert["report_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"])
if artifacts:
- cert["report_hash"] = _get_hash(cert["report_link"]).hex()
+ cert["report_hash"] = _get_hash(cert["report_link"])
elif "Securitytarget" in th_text:
cert["target_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"])
if artifacts:
- cert["target_hash"] = _get_hash(cert["target_link"]).hex()
+ cert["target_hash"] = _get_hash(cert["target_link"])
elif "Maintenance report" in th_text:
cert["maintenance_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"])
if artifacts:
- cert["maintenance_hash"] = _get_hash(cert["maintenance_link"]).hex()
+ cert["maintenance_hash"] = _get_hash(cert["maintenance_link"])
results.append(cert)
return results
@@ -968,7 +1000,7 @@ def _get_netherlands_certified_new( # noqa: C901
soup = _get_page(constants.CC_NETHERLANDS_NEW_CERTIFIED_URL)
table = soup.find("table", class_="wpDataTable")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get NL scheme certified (new)."):
tds = tr.find_all("td")
cert = {
"cert_id": sns(tds[0].text).replace("\n", ""), # type: ignore
@@ -985,7 +1017,7 @@ def _get_netherlands_certified_new( # noqa: C901
href = urljoin(constants.CC_NETHERLANDS_NEW_BASE_URL, a["href"])
cert[f"{name}_link"] = href
if artifacts:
- cert[f"{name}_hash"] = _get_hash(href).hex()
+ cert[f"{name}_hash"] = _get_hash(href)
results.append(cert)
return results
@@ -1006,7 +1038,7 @@ def _get_netherlands_in_evaluation_old() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_NETHERLANDS_OLD_INEVAL_URL)
table = soup.find("table")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get NL scheme in evaluation (old)."):
tds = tr.find_all("td")
cert = {
"developer": sns(tds[0].text),
@@ -1023,7 +1055,7 @@ def _get_netherlands_in_evaluation_new() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_NETHERLANDS_NEW_INEVAL_URL)
table = soup.find("table", class_="wpDataTable")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get NL scheme in evaluation (new)."):
tds = tr.find_all("td")
cert = {
"cert_id": sns(tds[0].text),
@@ -1048,11 +1080,11 @@ def get_netherlands_in_evaluation() -> list[dict[str, Any]]:
def _get_norway( # noqa: C901
- url: str, enhanced: bool, artifacts: bool
+ url: str, enhanced: bool, artifacts: bool, name
) -> list[dict[str, Any]]:
soup = _get_page(url)
results = []
- for tr in soup.find_all("tr", class_="certified-product"):
+ for tr in tqdm(soup.find_all("tr", class_="certified-product"), desc=f"Get NO scheme {name}."):
tds = tr.find_all("td")
cert: dict[str, Any] = {
"product": sns(tds[0].text),
@@ -1116,7 +1148,7 @@ def _get_norway( # noqa: C901
a = link.find("a")
entry = {"href": urljoin(constants.CC_NORWAY_BASE_URL, a["href"])}
if artifacts:
- entry["hash"] = _get_hash(entry["href"]).hex()
+ entry["hash"] = _get_hash(entry["href"]) # type: ignore
entries.append(entry)
e["documents"][doc_type] = entries
cert["enhanced"] = e
@@ -1132,7 +1164,7 @@ def get_norway_certified(enhanced: bool = True, artifacts: bool = False) -> list
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_norway(constants.CC_NORWAY_CERTIFIED_URL, enhanced, artifacts)
+ return _get_norway(constants.CC_NORWAY_CERTIFIED_URL, enhanced, artifacts, "certified")
def get_norway_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1143,24 +1175,29 @@ def get_norway_archived(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_norway(constants.CC_NORWAY_ARCHIVED_URL, enhanced, artifacts)
+ return _get_norway(constants.CC_NORWAY_ARCHIVED_URL, enhanced, artifacts, "archived")
def _get_korea( # noqa: C901
- product_class: int, enhanced: bool, artifacts: bool
+ product_class: int, enhanced: bool, artifacts: bool, name
) -> list[dict[str, Any]]:
session = requests.session()
- session.get(constants.CC_KOREA_EN_URL, verify=False)
+ _get_page(constants.CC_KOREA_EN_URL, session=session)
# Get base page
url = constants.CC_KOREA_CERTIFIED_URL + f"?product_class={product_class}"
soup = _get_page(url, session=session)
seen_pages = set()
pages = {1}
results = []
+ pbar = tqdm(desc=f"Get KR scheme {name}.")
while pages:
page = pages.pop()
csrf = soup.find("form", id="fm").find("input", attrs={"name": "csrf"})["value"]
- resp = session.post(url, data={"csrf": csrf, "selectPage": page, "product_class": product_class}, verify=False)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", category=InsecureRequestWarning)
+ resp = session.post(
+ url, data={"csrf": csrf, "selectPage": page, "product_class": product_class}, verify=False
+ )
soup = BeautifulSoup(resp.content, "html5lib")
tbody = soup.find("table", class_="cpl").find("tbody")
for tr in tbody.find_all("tr"):
@@ -1222,20 +1259,21 @@ def _get_korea( # noqa: C901
elif "Certificate" in title and a:
v["cert_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
if artifacts:
- v["cert_hash"] = _get_hash(v["cert_link"], session).hex()
+ v["cert_hash"] = _get_hash(v["cert_link"], session)
elif "Security Target" in title and a:
v["target_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
if artifacts:
- v["target_hash"] = _get_hash(v["target_link"], session).hex()
+ v["target_hash"] = _get_hash(v["target_link"], session)
elif "Certification Report" in title and a:
v["report_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
if artifacts:
- v["report_hash"] = _get_hash(v["report_link"], session).hex()
+ v["report_hash"] = _get_hash(v["report_link"], session)
elif "Maintenance Report" in title and a:
v["maintenance_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
if artifacts:
- v["maintenance_hash"] = _get_hash(v["maintenance_link"], session).hex()
+ v["maintenance_hash"] = _get_hash(v["maintenance_link"], session)
cert["enhanced"] = e
+ pbar.update()
results.append(cert)
seen_pages.add(page)
page_links = soup.find("div", class_="paginate").find_all("a", class_="number_off")
@@ -1246,6 +1284,7 @@ def _get_korea( # noqa: C901
pages.add(new_page)
except Exception:
pass
+ pbar.close()
return results
@@ -1257,7 +1296,7 @@ def get_korea_certified(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_korea(product_class=1, enhanced=enhanced, artifacts=artifacts)
+ return _get_korea(product_class=1, enhanced=enhanced, artifacts=artifacts, name="certified")
def get_korea_suspended(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1268,7 +1307,7 @@ def get_korea_suspended(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_korea(product_class=2, enhanced=enhanced, artifacts=artifacts)
+ return _get_korea(product_class=2, enhanced=enhanced, artifacts=artifacts, name="suspended")
def get_korea_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1279,7 +1318,7 @@ def get_korea_archived(enhanced: bool = True, artifacts: bool = False) -> list[d
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_korea(product_class=4, enhanced=enhanced, artifacts=artifacts)
+ return _get_korea(product_class=4, enhanced=enhanced, artifacts=artifacts, name="archived")
def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1292,7 +1331,7 @@ def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]:
soup = _get_page(constants.CC_POLAND_CERTIFIED_URL)
table = soup.find("table", class_="cert_tb")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get PL scheme certified."):
tds = tr.find_all("td")
cert = {
"client": sns(tds[0].text),
@@ -1316,7 +1355,7 @@ def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]:
href = urljoin(constants.CC_POLAND_BASE_URL, a["href"])
cert[f"{name}_link"] = href
if artifacts:
- cert[f"{name}_hash"] = _get_hash(href).hex()
+ cert[f"{name}_hash"] = _get_hash(href)
results.append(cert)
return results
@@ -1332,7 +1371,7 @@ def get_poland_ineval() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_POLAND_INEVAL_URL)
table = soup.find("table", class_="cert_tb")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get PL scheme in evaluation."):
tds = tr.find_all("td")
cert = {
"client": sns(tds[0].text),
@@ -1351,7 +1390,7 @@ def get_poland_ineval() -> list[dict[str, Any]]:
return results
-def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]:
+def _get_singapore(url: str, artifacts: bool, name) -> list[dict[str, Any]]:
soup = _get_page(url)
page_id = str(soup.find("input", id="CurrentPageId").value)
page = 1
@@ -1368,6 +1407,7 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]:
api_json = api_call.json()
total = api_json["total"]
results: list[dict[str, Any]] = []
+ pbar = tqdm(total=total, desc=f"Get SG scheme {name}.")
while len(results) != total:
for obj in api_json["objects"]:
cert: dict[str, Any] = {
@@ -1389,9 +1429,10 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]:
"target_link": urljoin(constants.CC_SINGAPORE_BASE_URL, obj["securityTarget"]["mediaUrl"]),
}
if artifacts:
- cert["cert_hash"] = _get_hash(cert["cert_link"]).hex()
- cert["report_hash"] = _get_hash(cert["report_link"]).hex()
- cert["target_hash"] = _get_hash(cert["target_link"]).hex()
+ cert["cert_hash"] = _get_hash(cert["cert_link"])
+ cert["report_hash"] = _get_hash(cert["report_link"])
+ cert["target_hash"] = _get_hash(cert["target_link"])
+ pbar.update()
results.append(cert)
page += 1
api_call = requests.post(
@@ -1405,6 +1446,7 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]:
},
)
api_json = api_call.json()
+ pbar.close()
return results
@@ -1415,7 +1457,7 @@ def get_singapore_certified(artifacts: bool = False) -> list[dict[str, Any]]:
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_singapore(constants.CC_SINGAPORE_CERTIFIED_URL, artifacts)
+ return _get_singapore(constants.CC_SINGAPORE_CERTIFIED_URL, artifacts, "certified")
def get_singapore_in_evaluation() -> list[dict[str, Any]]:
@@ -1433,7 +1475,7 @@ def get_singapore_in_evaluation() -> list[dict[str, Any]]:
else:
raise ValueError("Cannot find table.")
results = []
- for tr in table.find_all("tr")[1:]:
+ for tr in tqdm(table.find_all("tr")[1:], desc="Get SG scheme in evaluation."):
tds = tr.find_all("td")
cert = {
"name": sns(tds[0].text),
@@ -1451,7 +1493,7 @@ def get_singapore_archived(artifacts: bool = False) -> list[dict[str, Any]]:
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_singapore(constants.CC_SINGAPORE_ARCHIVED_URL, artifacts)
+ return _get_singapore(constants.CC_SINGAPORE_ARCHIVED_URL, artifacts, "archived")
def get_spain_certified() -> list[dict[str, Any]]:
@@ -1463,7 +1505,7 @@ def get_spain_certified() -> list[dict[str, Any]]:
soup = _get_page(constants.CC_SPAIN_CERTIFIED_URL)
tbody = soup.find("table", class_="djc_items_table").find("tbody")
results = []
- for tr in tbody.find_all("tr", recursive=False):
+ for tr in tqdm(tbody.find_all("tr", recursive=False), desc="Get ES scheme certified."):
tds = tr.find_all("td")
cert = {
"product": sns(tds[0].text),
@@ -1477,12 +1519,12 @@ def get_spain_certified() -> list[dict[str, Any]]:
def _get_sweden( # noqa: C901
- url: str, enhanced: bool, artifacts: bool
+ url: str, enhanced: bool, artifacts: bool, name
) -> list[dict[str, Any]]:
soup = _get_page(url)
nav = soup.find("main").find("nav", class_="component-nav-box__list")
results = []
- for link in nav.find_all("a"):
+ for link in tqdm(nav.find_all("a"), desc=f"Get SE scheme {name}."):
cert: dict[str, Any] = {
"product": sns(link.text),
"url": urljoin(constants.CC_SWEDEN_BASE_URL, link["href"]),
@@ -1527,15 +1569,15 @@ def _get_sweden( # noqa: C901
elif "Security Target" in title and a:
e["target_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"])
if artifacts:
- e["target_hash"] = _get_hash(e["target_link"]).hex()
+ e["target_hash"] = _get_hash(e["target_link"])
elif "Certifieringsrapport" in title and a:
e["report_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"])
if artifacts:
- e["report_hash"] = _get_hash(e["report_hash"]).hex()
+ e["report_hash"] = _get_hash(e["report_hash"])
elif "Certifikat" in title and a:
e["cert_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"])
if artifacts:
- e["cert_hash"] = _get_hash(e["cert_link"]).hex()
+ e["cert_hash"] = _get_hash(e["cert_link"])
cert["enhanced"] = e
results.append(cert)
return results
@@ -1549,7 +1591,7 @@ def get_sweden_certified(enhanced: bool = True, artifacts: bool = False) -> list
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_sweden(constants.CC_SWEDEN_CERTIFIED_URL, enhanced, artifacts)
+ return _get_sweden(constants.CC_SWEDEN_CERTIFIED_URL, enhanced, artifacts, "certified")
def get_sweden_in_evaluation(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1560,7 +1602,7 @@ def get_sweden_in_evaluation(enhanced: bool = True, artifacts: bool = False) ->
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_sweden(constants.CC_SWEDEN_INEVAL_URL, enhanced, artifacts)
+ return _get_sweden(constants.CC_SWEDEN_INEVAL_URL, enhanced, artifacts, "in evaluation")
def get_sweden_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1571,7 +1613,7 @@ def get_sweden_archived(enhanced: bool = True, artifacts: bool = False) -> list[
:param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
:return: The entries.
"""
- return _get_sweden(constants.CC_SWEDEN_ARCHIVED_URL, enhanced, artifacts)
+ return _get_sweden(constants.CC_SWEDEN_ARCHIVED_URL, enhanced, artifacts, "archived")
def get_turkey_certified() -> list[dict[str, Any]]:
@@ -1588,7 +1630,7 @@ def get_turkey_certified() -> list[dict[str, Any]]:
with pdf_path.open("wb") as f:
f.write(resp.content)
dfs = tabula.read_pdf(str(pdf_path), pages="all")
- for df in dfs:
+ for df in tqdm(dfs, desc="Get TR scheme certified."):
for line in df.values: # type: ignore
values = [value if not (isinstance(value, float) and math.isnan(value)) else None for value in line]
cert = {
@@ -1608,7 +1650,7 @@ def get_turkey_certified() -> list[dict[str, Any]]:
return results
-def _get_usa(args, enhanced: bool, artifacts: bool): # noqa: C901
+def _get_usa(args, enhanced: bool, artifacts: bool, name): # noqa: C901
# TODO: There is more information in the API (like about PPs, etc.)
def map_cert(cert, files=None): # noqa: C901
result = {
@@ -1629,23 +1671,23 @@ def _get_usa(args, enhanced: bool, artifacts: bool): # noqa: C901
result["id"] += f"-{dt.year}"
result["report_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}"
if artifacts:
- result["report_hash"] = _get_hash(result["report_link"]).hex()
+ result["report_hash"] = _get_hash(result["report_link"])
elif file["file_label"] == "CC Certificate":
result["cert_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}"
if artifacts:
- result["cert_hash"] = _get_hash(result["cert_link"]).hex()
+ result["cert_hash"] = _get_hash(result["cert_link"])
elif file["file_label"] == "Security Target":
result["target_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}"
if artifacts:
- result["target_hash"] = _get_hash(result["target_link"]).hex()
+ result["target_hash"] = _get_hash(result["target_link"])
elif file["file_label"] == "Assurance Activity Report (AAR)":
result["aar_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}"
if artifacts:
- result["aar_hash"] = _get_hash(result["aar_link"]).hex()
+ result["aar_hash"] = _get_hash(result["aar_link"])
elif file["file_label"] == "Administrative Guide (AGD)":
result["agd_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}"
if artifacts:
- result["agd_hash"] = _get_hash(result["agd_link"]).hex()
+ result["agd_hash"] = _get_hash(result["agd_link"])
return result
@@ -1653,6 +1695,7 @@ def _get_usa(args, enhanced: bool, artifacts: bool): # noqa: C901
results = []
offset = 0
got = 0
+ pbar = tqdm(desc=f"Get US scheme {name}.")
while True:
resp = _getq(
constants.CC_USA_PRODUCTS_URL,
@@ -1673,10 +1716,12 @@ def _get_usa(args, enhanced: bool, artifacts: bool): # noqa: C901
session,
)
files = resp.json()
+ pbar.update()
results.append(map_cert(cert, files))
offset += 100
if got >= count:
break
+ pbar.close()
return results
@@ -1691,9 +1736,7 @@ def get_usa_certified( # noqa: C901
:return: The entries.
"""
return _get_usa(
- {"certification_status": "Certified", "publish_status": "Published"},
- enhanced,
- artifacts,
+ {"certification_status": "Certified", "publish_status": "Published"}, enhanced, artifacts, "certified"
)
@@ -1703,7 +1746,7 @@ def get_usa_in_evaluation() -> list[dict[str, Any]]:
:return: The entries.
"""
- return _get_usa({"status": "In Progress", "publish_status": "Published"}, False, False)
+ return _get_usa({"status": "In Progress", "publish_status": "Published"}, False, False, "in evaluation")
def get_usa_archived() -> list[dict[str, Any]]:
@@ -1712,7 +1755,7 @@ def get_usa_archived() -> list[dict[str, Any]]:
:return: The entries.
"""
- return _get_usa({"status": "Archived", "publish_status": "Published"}, False, False)
+ return _get_usa({"status": "Archived", "publish_status": "Published"}, False, False, "archived")
class EntryType(Enum):
@@ -1817,7 +1860,9 @@ class CCScheme(ComplexSerializableType):
}
@classmethod
- def from_web(cls, scheme: str, entry_types: Iterable[EntryType]) -> CCScheme:
+ def from_web(
+ cls, scheme: str, entry_types: Iterable[EntryType], enhanced: bool | None = None, artifacts: bool | None = None
+ ) -> CCScheme:
if not (scheme_lists := cls.methods.get(scheme)):
raise ValueError("Unknown scheme.")
entries = {}
@@ -1825,5 +1870,11 @@ class CCScheme(ComplexSerializableType):
for each_type in entry_types:
if not (method := scheme_lists.get(each_type)):
raise ValueError("Wrong entry_type for scheme.")
- entries[each_type] = method()
+ sig = signature(method)
+ args = {}
+ if enhanced is not None and "enhanced" in sig.parameters:
+ args["enhanced"] = enhanced
+ if artifacts is not None and "artifacts" in sig.parameters:
+ args["artifacts"] = artifacts
+ entries[each_type] = method(**args)
return cls(scheme, timestamp, entries)
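
For reference, the keyword forwarding at the end of from_web relies only on inspect.signature; a self-contained sketch of the same pattern, with hypothetical getters standing in for the real scheme methods:

from inspect import signature
from typing import Any, Callable

def call_with_supported_kwargs(method: Callable[..., Any], **optional: Any) -> Any:
    """Forward only those optional keyword arguments the callable accepts and that were set."""
    sig = signature(method)
    kwargs = {name: value for name, value in optional.items() if value is not None and name in sig.parameters}
    return method(**kwargs)

# Hypothetical getters illustrating the two shapes of scheme methods.
def with_flags(enhanced: bool = True, artifacts: bool = False) -> str:
    return f"enhanced={enhanced}, artifacts={artifacts}"

def without_flags() -> str:
    return "no optional flags"

print(call_with_supported_kwargs(with_flags, enhanced=False, artifacts=None))    # enhanced=False, artifacts=False
print(call_with_supported_kwargs(without_flags, enhanced=False, artifacts=None)) # no optional flags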