{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Annotating certificate references with LLMs\n", "## Before you launch\n", "1. Create a file named `openai_api_key` in the project root directory and paste your OpenAI API key into it.\n", "2. Make sure the CC dataset is extracted into the `cc_data` directory in the project root.\n", "## What is being done here\n", "1. Obtain names, certificate IDs, digests, and old digests from the CCDataset.\n", "2. Create an annotations dataframe based on the annotations in `src/sec_certs/data/reference_annotations/final/train.csv`.\n", "3. Enrich the annotations dataframe with the raw texts in `cc_data/certs/reports/txt`.\n", "4. Enrich the annotations dataframe with the current digests (the annotation file still uses the old ones), target cert IDs, names and target names.\n", "5. Call the OpenAI API with structured outputs to label each certificate reference based on the instructions in `notebooks/llm_annotation_prompt.txt`.\n", "6. Analyze the metrics with a confusion matrix." 
] },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sec_certs.dataset.cc import CCDataset\n",
    "from sec_certs.sample import CCCertificate\n",
    "import pandas as pd\n",
    "import csv\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n",
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n",
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n",
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n",
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n",
      "Method auxiliary_datasets_dir can only be called on backed dataset.\n"
     ]
    }
   ],
   "source": [
    "# Build (once) and load the name / cert_id / dgst / old_dgst mapping.\n",
    "# The CSV acts as a cache: delete `dgst_map.csv` to force a rebuild from the dataset.\n",
    "if not os.path.exists(\"dgst_map.csv\"):\n",
    "    # get dgst mappings\n",
    "    if os.path.exists(\"../../cc_data/dataset.json\"):\n",
    "        dset = CCDataset.from_json(\"../../cc_data/dataset.json\")\n",
    "    else:\n",
    "        dset = CCDataset.from_web()\n",
    "    data = [(row.name, row.heuristics.cert_id, row.dgst, row.old_dgst) for row in dset]\n",
    "    # Write UTF-8 explicitly: pd.read_csv below decodes as UTF-8 by default, while open()\n",
    "    # would otherwise use the locale encoding and certificate names contain non-ASCII characters.\n",
    "    with open('dgst_map.csv', 'w', newline='', encoding='utf-8') as file:\n",
    "        writer = csv.writer(file)\n",
    "        writer.writerow(['name', 'cert_id', 'dgst', 'old_dgst'])  # Header\n",
    "        writer.writerows(data)\n",
    "dgst_map_df = pd.read_csv(\"dgst_map.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "# `dset` is only created above when the digest map had to be rebuilt; load it otherwise.\n",
    "if 'dset' not in globals():\n",
    "    dset = CCDataset.from_json(\"../../cc_data/dataset.json\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sec_certs.model.references_nlp.segment_extractor import ReferenceSegmentExtractor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "reports_path = '../../cc_data/certs/reports/txt'\n",
    "# Fail early with an actionable message instead of a bare error from os.listdir below.\n",
    "if not os.path.exists(reports_path):\n",
    "    raise 
FileNotFoundError(f\"The reports directory {os.path.abspath(reports_path)} does not exist, make sure you have the CC dataset unpacked.\")\n",
    "\n",
    "reports_files = os.listdir(reports_path)\n",
    "\n",
    "\n",
    "def add_text_content(dgst: str) -> str:\n",
    "    \"\"\"Return the contents of `<dgst>.txt` from `reports_path`, or \"\" when no such file is listed.\"\"\"\n",
    "    if f\"{dgst}.txt\" not in reports_files:\n",
    "        return \"\"\n",
    "    # Context manager so the handle is closed deterministically (this runs once per annotation row).\n",
    "    with open(reports_path + \"/\" + dgst + \".txt\") as report_file:\n",
    "        return report_file.read()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The annotation file is keyed by old digests: map them to current digests and attach the\n",
    "# name of the referenced (target) certificate via its cert_id.\n",
    "df_annotations = (\n",
    "    pd.read_csv(\"../../src/sec_certs/data/reference_annotations/final/train.csv\")\n",
    "    .rename(columns={\"dgst\": \"old_dgst\", \"canonical_reference_keyword\": \"target_cert_id\"})\n",
    "    .merge(dgst_map_df, on=\"old_dgst\")\n",
    "    .merge(\n",
    "        dgst_map_df[[\"name\", \"cert_id\"]].rename(columns={\"name\": \"target_name\", \"cert_id\": \"target_cert_id\"}),\n",
    "        on=\"target_cert_id\",\n",
    "    )\n",
    ")\n",
    "\n",
    "## get fulltext\n",
    "df_annotations[\"text_content\"] = df_annotations[\"dgst\"].apply(add_text_content)\n",
    "\n",
    "## get segments\n",
    "# Segment extraction is cached: delete `ref_segments.parquet` to recompute it.\n",
    "if not os.path.exists(\"ref_segments.parquet\"):\n",
    "    annotated_dgsts = set(df_annotations[\"dgst\"])\n",
    "    used_certs = [cert for cert in dset if cert.dgst in annotated_dgsts]\n",
    "    extracted_refs = ReferenceSegmentExtractor()(used_certs)\n",
    "    extracted_refs.to_parquet(\"ref_segments.parquet\")\n",
    "ref_segments = pd.read_parquet(\"ref_segments.parquet\")\n",
    "\n",
    "ref_segment_columns = [\"dgst\", \"target_cert_id\", \"segments\", \"actual_reference_keywords\"]\n",
    "df_annotations = (\n",
    "    ref_segments.rename(columns={\"canonical_reference_keyword\": \"target_cert_id\"})[ref_segment_columns]\n",
    "    .merge(df_annotations, on=[\"dgst\", \"target_cert_id\"])\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [],
   "source": [
    "from enum import Enum\n",
    "from pydantic import Field, BaseModel\n",
    "from openai import AsyncOpenAI, RateLimitError\n",
    "import asyncio\n",
    "from openai.lib._pydantic import to_strict_json_schema\n",
    "\n",
    "\n",
    "class LabelType(Enum):\n",
    "    \"\"\"Fine-grained relationship labels used in the annotations and requested from the model.\"\"\"\n",
    "    COMPONENT_USED = \"component_used\"\n",
    "    RE_EVALUATION = \"re-evaluation\"\n",
    "    EVALUATION_REUSED = \"evaluation_reused\"\n",
    "    PREVIOUS_VERSION = \"previous_version\"\n",
    "    COMPONENT_SHARED = \"component_shared\"\n",
    "    IRRELEVANT = \"irrelevant\"\n",
    "    NONE = \"none\"\n",
    "\n",
    "\n",
    "class SimplifiedLabelType(Enum):\n",
    "    \"\"\"Coarse labels that `convert_label_type` collapses `LabelType` into.\"\"\"\n",
    "    COMPONENT_REUSE = \"component_reuse\"\n",
    "    PREDECESSOR = \"predecessor\"\n",
    "    NONE = \"none\"\n",
    "\n",
    "\n",
    "def convert_label_type(label: LabelType|str) -> SimplifiedLabelType:\n",
    "    \"\"\"Collapse a fine-grained label into component_reuse / predecessor / none.\n",
    "\n",
    "    Accepts a `LabelType` member or a raw value; a value that `LabelType(...)` rejects\n",
    "    with ValueError maps to `SimplifiedLabelType.NONE`.\n",
    "    \"\"\"\n",
    "    if isinstance(label, LabelType):\n",
    "        label_value = label\n",
    "    else:\n",
    "        try:\n",
    "            label_value = LabelType(label)\n",
    "        except ValueError:\n",
    "            return SimplifiedLabelType.NONE\n",
    "\n",
    "    if label_value in [LabelType.COMPONENT_USED, LabelType.EVALUATION_REUSED, LabelType.COMPONENT_SHARED]:\n",
    "        return SimplifiedLabelType.COMPONENT_REUSE\n",
    "    elif label_value in [LabelType.RE_EVALUATION, LabelType.PREVIOUS_VERSION]:\n",
    "        return SimplifiedLabelType.PREDECESSOR\n",
    "    return SimplifiedLabelType.NONE\n",
    "\n",
    "\n",
    "class LabelDetectionResult(BaseModel):\n",
    "    \"\"\"Structured-output schema passed as `response_format` to the chat completion call.\"\"\"\n",
    "    explanation: str = Field(description=\"Explain why this label was chosen.\")\n",
    "    label: LabelType = Field(description=\"Categorization of the relationship between the examined and referenced certificates.\")\n",
    "\n",
    "#label_detection_strict_schema = to_strict_json_schema(LabelDetectionResult)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "label\n",
       "component_used 58\n",
       "re-evaluation 16\n",
       "evaluation_reused 11\n",
       "previous_version 6\n",
       "component_shared 5\n",
       "irrelevant 2\n",
       "Name: count, dtype: int64"
      ]
     },
     "execution_count": 46,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_annotations.label.value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "label\n",
       "component_reuse 74\n",
       "predecessor 22\n",
       "none 3\n",
       "Name: count, dtype: int64"
      ]
     },
     "execution_count": 47,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_annotations.label.apply(convert_label_type).apply(lambda x: x.value).value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "API key loaded.\n"
     ]
    }
   ],
   "source": [
    "def load_api_key(filename: str) -> str:\n",
    "    \"\"\"Return the first line of `filename`, stripped of surrounding whitespace.\n",
    "\n",
    "    Raises:\n",
    "        Exception: if the file does not exist or its first line is empty.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        with open(filename, 'r') as file:\n",
    "            api_key = file.readline().strip()\n",
    "    except FileNotFoundError as err:\n",
    "        raise Exception(f\"OpenAI API key not found at {filename}.\") from err\n",
    "    if not api_key:\n",
    "        # An empty key would only surface later as an authentication failure on every request.\n",
    "        raise Exception(f\"OpenAI API key file {filename} is empty.\")\n",
    "    return api_key\n",
    "\n",
    "api_key = load_api_key('../../openai_api_key')\n",
    "openapi_model = \"gpt-4o-mini\"\n",
    "# Do not echo any part of the key: cell outputs are saved with the notebook.\n",
    "print(\"API key loaded.\")\n",
    "client = AsyncOpenAI(api_key=api_key)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import Counter\n",
    "\n",
    "from openai import APIConnectionError, InternalServerError, RateLimitError\n",
    "from tqdm.asyncio import tqdm\n",
    "import nest_asyncio  # otherwise async tasks can't be called from a running event loop (jupyter)\n",
    "nest_asyncio.apply()\n",
    "import asyncio\n",
    "\n",
    "# Errors that may go away on their own. Anything else (invalid key, bad request, ...)\n",
    "# is re-raised immediately instead of being retried forever.\n",
    "RETRYABLE_ERRORS = (RateLimitError, APIConnectionError, InternalServerError)\n",
    "\n",
    "with open(\"llm_annotation_prompt.txt\") as prompt_file:\n",
    "    system_message = {\n",
    "        \"role\": \"system\",\n",
    "        \"content\": prompt_file.read()\n",
    "    }\n",
    "\n",
    "\n",
    "async def get_ai_label(text_content: str, name: str, target_name: str, cert_id: str, target_cert_id: str, verbose=False, max_retries: int = 10, wait_time: float = 60):\n",
    "    \"\"\"Ask the model how certificate `cert_id` refers to certificate `target_cert_id`.\n",
    "\n",
    "    Args:\n",
    "        text_content: text of the examined certificate (full report or a single segment).\n",
    "        name: name of the examined certificate.\n",
    "        target_name: name of the referenced certificate.\n",
    "        cert_id: ID of the examined certificate.\n",
    "        target_cert_id: ID of the referenced certificate.\n",
    "        verbose: print a message before each retry.\n",
    "        max_retries: maximum number of attempts before the last retryable error is re-raised.\n",
    "        wait_time: seconds to sleep between attempts.\n",
    "\n",
    "    Returns:\n",
    "        `completion.choices[0].message.parsed` for the `LabelDetectionResult` response format.\n",
    "\n",
    "    Raises:\n",
    "        The last error from `RETRYABLE_ERRORS` once `max_retries` attempts failed; any other\n",
    "        exception from the API call propagates immediately.\n",
    "    \"\"\"\n",
    "    messages = [\n",
    "        system_message,\n",
    "        {\"role\": \"user\", \"content\": f\"\"\"\n",
    "        Task: Classify how certificate {cert_id} refers to certificate {target_cert_id}. Note that you are to classify ONLY this relationship, and not relationships to other mentioned certificates or their components. Refer to the instructions and examples in the system message.\n",
    "\n",
    "        Target: {target_cert_id} ({target_name})\n",
    "        Reviewed: {cert_id} ({name})\n",
    "\n",
    "        Text from the examined certificate {cert_id}:\n",
    "        '{text_content}'\n",
    "        \"\"\"}\n",
    "    ]\n",
    "    attempt = 0\n",
    "    while True:\n",
    "        attempt += 1\n",
    "        try:\n",
    "            completion = await client.beta.chat.completions.parse(\n",
    "                model=openapi_model,\n",
    "                messages=messages,\n",
    "                response_format=LabelDetectionResult\n",
    "            )\n",
    "            return completion.choices[0].message.parsed\n",
    "        except RETRYABLE_ERRORS as e:\n",
    "            if attempt >= max_retries:\n",
    "                raise\n",
    "            if verbose:\n",
    "                print(f\"Error: {e}, retrying in {wait_time}s..\")\n",
    "            await asyncio.sleep(wait_time)\n",
    "\n",
    "\n",
    "async def process_row_async(text_content: str, name: str, target_name: str, cert_id: str, target_cert_id: str):\n",
    "    \"\"\"Classify one reference and return it as a `(label value, explanation)` tuple.\"\"\"\n",
    "    result = await get_ai_label(text_content, name, target_name, cert_id, target_cert_id)\n",
    "    if result is None:\n",
    "        # NOTE(review): `parsed` is presumably None when the model refuses to answer - confirm\n",
    "        # against the OpenAI structured-output docs. Fall back to \"none\" rather than crashing.\n",
    "        return LabelType.NONE.value, \"The model returned no parsed result.\"\n",
    "    return result.label.value, result.explanation\n",
    "\n",
    "\n",
    "async def _gather_in_order(coroutines):\n",
    "    \"\"\"Run the coroutines concurrently with a progress bar; return results in submission order.\"\"\"\n",
    "    async def _tagged(position, coroutine):\n",
    "        return position, await coroutine\n",
    "\n",
    "    tagged = [_tagged(position, coroutine) for position, coroutine in enumerate(coroutines)]\n",
    "    ordered = [None] * len(tagged)\n",
    "    # as_completed yields in completion order, so every result carries its original position.\n",
    "    for finished in tqdm(asyncio.as_completed(tagged), total=len(tagged)):  # tracks actual completion\n",
    "        position, value = await finished\n",
    "        ordered[position] = value\n",
    "    return ordered\n",
    "\n",
    "\n",
    "async def process_dataframe_async_fulltext(df):\n",
    "    \"\"\"Label every row of `df` from its full report text.\n",
    "\n",
    "    Returns a list of `(label, explanation)` tuples, one per row, in the row order of `df`.\n",
    "    \"\"\"\n",
    "    tasks = [process_row_async(\n",
    "        row[\"text_content\"],\n",
    "        row[\"name\"],\n",
    "        row[\"target_name\"],\n",
    "        row[\"cert_id\"],\n",
    "        row[\"target_cert_id\"]) for _, row in df.iterrows()]\n",
    "    return await _gather_in_order(tasks)\n",
    "\n",
    "\n",
    "async def process_dataframe_async_segments(df):\n",
    "    \"\"\"Label every row of `df` by classifying each of its segments and taking a majority vote.\n",
    "\n",
    "    Returns a list of `(label, explanation)` tuples, one per row, in the row order of `df`.\n",
    "    The explanation is the one from the first segment that voted for the winning label. Rows\n",
    "    without segments get the \"none\" label, so the result always has `len(df)` entries.\n",
    "    \"\"\"\n",
    "    all_tasks = []\n",
    "    task_rows = []  # task position -> positional row number in df\n",
    "\n",
    "    # flatten for async\n",
    "    for row_pos, (_, row) in enumerate(df.iterrows()):\n",
    "        segments = row[\"segments\"]\n",
    "        if segments is None:\n",
    "            continue\n",
    "        for segment in segments:\n",
    "            all_tasks.append(process_row_async(\n",
    "                segment,\n",
    "                row[\"name\"],\n",
    "                row[\"target_name\"],\n",
    "                row[\"cert_id\"],\n",
    "                row[\"target_cert_id\"]\n",
    "            ))\n",
    "            task_rows.append(row_pos)\n",
    "\n",
    "    # async\n",
    "    results = await _gather_in_order(all_tasks)\n",
    "\n",
    "    # reorganize and group\n",
    "    labels_per_row = [[] for _ in range(len(df))]\n",
    "    explanations_per_row = [[] for _ in range(len(df))]\n",
    "    for row_pos, (label, explanation) in zip(task_rows, results):\n",
    "        labels_per_row[row_pos].append(label)\n",
    "        explanations_per_row[row_pos].append(explanation)\n",
    "\n",
    "    # vote on labels\n",
    "    final_results = []\n",
    "    for labels, explanations in zip(labels_per_row, explanations_per_row):\n",
    "        if labels:\n",
    "            final_label = Counter(labels).most_common(1)[0][0]\n",
    "            # get explanation for the winning label\n",
    "            explanation = explanations[labels.index(final_label)]\n",
    "        else:\n",
    "            final_label = LabelType.NONE.value\n",
    "            explanation = \"No segments available for analysis\"\n",
    "        final_results.append((final_label, explanation))\n",
    "\n",
    "    return final_results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 406/406 [10:06<00:00, 1.49s/it] \n"
     ]
    }
   ],
   "source": [
    "# Results are cached on disk; delete the parquet file to re-run the (paid) API calls.\n",
    "# NOTE: a cache produced while results were still collected in completion order has labels\n",
    "# attached to the wrong rows - delete it and regenerate.\n",
    "if not os.path.exists(\"llm_annotation_results_segments.parquet\"):\n",
    "    results = asyncio.run(process_dataframe_async_segments(df_annotations))\n",
    "    df_annotations[\"gpt4omini_label\"], df_annotations[\"gpt4omini_reason\"] = zip(*results)\n",
    "    df_annotations.to_parquet(\"llm_annotation_results_segments.parquet\")\n",
    "df_results = pd.read_parquet(\"llm_annotation_results_segments.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# running this cell costs about 0.15EUR with gpt4o-mini\n",
    "\"\"\"\n",
    "if \"llm_annotation_results.parquet\" not in os.listdir(): # delete the file if you want to re-run things\n",
    "    results = asyncio.run(process_dataframe_async_fulltext(df_annotations))\n",
    "    df_annotations[\"gpt4omini_label\"], df_annotations[\"gpt4omini_reason\"] = zip(*results)\n",
    "    df_annotations.to_parquet(\"llm_annotation_results.parquet\")\n",
    "df_results = pd.read_parquet(\"llm_annotation_results.parquet\")\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [ "
| \n", " | target_cert_id | \n", "cert_id | \n", "name | \n", "target_name | \n", "label | \n", "comment | \n", "gpt4omini_label | \n", "gpt4omini_reason | \n", "
|---|---|---|---|---|---|---|---|---|
| 0 | \n", "BSI-DSZ-CC-1040-2019 | \n", "NSCIB-CC-0229284-CR | \n", "NXP eDoc Suite v3.5 on JCOP4 P71 - cryptovisio... | \n", "NXP Secure Smart Card Controller N7121 with IC... | \n", "component_used | \n", "None | \n", "re-evaluation | \n", "The text indicates that BSI-DSZ-CC-0961-2017 i... | \n", "
| 1 | \n", "BSI-DSZ-CC-0827-V8-2020 | \n", "BSI-DSZ-CC-1158-2020 | \n", "Digital Tachograph DTCO 1381, Release 4.0e | \n", "Infineon Technologies Smart Card IC (Security ... | \n", "irrelevant | \n", "unclear | \n", "component_used | \n", "The relationship between ANSSI-CC-2018/18 and ... | \n", "
| 2 | \n", "BSI-DSZ-CC-0782-2012 | \n", "BSI-DSZ-CC-0879-2014 | \n", "Infineon Security Controller M7893 B11 with op... | \n", "Infineon Security Controller M7892 B11 with op... | \n", "re-evaluation | \n", "None | \n", "component_used | \n", "The referenced certificate BSI-DSZ-CC-0891-V3-... | \n", "
| 3 | \n", "BSI-DSZ-CC-0957-V2-2016 | \n", "BSI-DSZ-CC-1035-2017 | \n", "TCOS Secure Crypto Module Version 1.0 Release ... | \n", "TCOS Smart Meter Security Module Version 1.0 R... | \n", "evaluation_reused | \n", "unclear, mentions that this is a “re-evaluatio... | \n", "component_used | \n", "The certificate BSI-DSZ-CC-1040-2019 is refere... | \n", "
| 4 | \n", "ANSSI-CC-2012/68 | \n", "ANSSI-CC-2014/25 | \n", "SOMA801STM - application BAC, version 1.0 | \n", "Microcontrôleurs sécurisés SA23YR80/48 et SB23... | \n", "component_used | \n", "None | \n", "re-evaluation | \n", "The text indicates that BSI-DSZ-CC-0680-2010 i... | \n", "
| ... | \n", "... | \n", "... | \n", "... | \n", "... | \n", "... | \n", "... | \n", "... | \n", "... | \n", "
| 94 | \n", "ANSSI-CC-2018/40 | \n", "ANSSI-CC-2020/71 | \n", "S3FV9RR/S3FV9RQ/S3FV9RP/S3FV9RK 32-bit RISC Mi... | \n", "S3FV9RR/S3FV9RQ/S3FV9RP/S3FV9RK 32-bit RISC Mi... | \n", "previous_version | \n", "None | \n", "component_used | \n", "The examined certificate ANSSI-CC-2020/43 incl... | \n", "
| 95 | \n", "BSI-DSZ-CC-0835-V2-2017 | \n", "BSI-DSZ-CC-0836-V2-2017 | \n", "TCOS Residence Permit Card Version 1.1 Release... | \n", "TCOS Residence Permit Card Version 1.1 Release... | \n", "component_shared | \n", "BAC, EAC thingy | \n", "re-evaluation | \n", "The evaluated certificate CSEC2014007 referenc... | \n", "
| 96 | \n", "ANSSI-CC-2013/55 | \n", "ANSSI-CC-2013/64 | \n", "Carte à puce SLJ 52 Gxx yyy AL : application p... | \n", "Plateforme jTOP INFv#46 masquée sur composants... | \n", "component_used | \n", "None | \n", "none | \n", "The text from the examined certificate BSI-DSZ... | \n", "
| 97 | \n", "ANSSI-CC-2018/52 | \n", "ANSSI-CC-2018/55 | \n", "P73N2M0B0.2C2 | \n", "P73N2M0B0.202 | \n", "component_used | \n", "None | \n", "component_used | \n", "The ANSSI-CC-2020/43 certificate makes a refer... | \n", "
| 98 | \n", "BSI-DSZ-CC-0410-2007 | \n", "ANSSI-CC-2008/14 | \n", "ID-One EPass 64 v2.0 with EAC ECC | \n", "NXP Secure Smart Card Controller P5CD080V0B, P... | \n", "component_used | \n", "None | \n", "previous_version | \n", "The certificate BSI-DSZ-CC-0911-2014 is refere... | \n", "
99 rows × 8 columns
\n", "