-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add Promptfoo (LLM eval & red-teaming) parser #15081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Dashtid
wants to merge
1
commit into
DefectDojo:dev
Choose a base branch
from
Dashtid:promptfoo-parser
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,199
−0
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| --- | ||
| title: "Promptfoo (LLM eval & red-teaming)" | ||
| toc_hide: true | ||
| --- | ||
| Input Type: | ||
| - | ||
| This parser imports the JSON results file produced by [promptfoo](https://promptfoo.dev), an LLM evaluation and red-teaming tool. | ||
|
|
||
| Generate the file with `promptfoo eval -o results.json` or, for an adversarial scan, `promptfoo redteam run -o results.json`, and upload that JSON file. | ||
|
|
||
| Tested against the promptfoo results schema (`results.version == 3`). | ||
|
|
||
| Things to note about the Promptfoo parser: | ||
| - | ||
|
|
||
| - **Inverted pass/fail semantics.** promptfoo reports `success: true` when every assertion passes; for a red-team probe that means the target model *defended* the attack, so it is **not** a finding. Only results with `success: false` (a failed assertion / a successful attack) become Findings. | ||
| - **Aggregation:** failures for the same red-team plugin (`pluginId`) against the same target (provider) are aggregated into a single Finding, with `nb_occurences` reflecting the number of failed attempts and the most severe rung retained. | ||
| - **Severity** comes from the red-team `metadata.severity` (`critical`/`high`/`medium`/`low`). A plain `promptfoo eval` failure carries no severity metadata and defaults to **Medium**. | ||
| - **CWE** is mapped from the plugin / harm category as a starter mapping (refined over time): | ||
| - SQL-injection plugin -> **CWE-89**; shell/command-injection plugin -> **CWE-78** | ||
| - prompt-injection / prompt-extraction plugins -> **CWE-1427** (Improper Neutralization of Input Used for LLM Prompting) | ||
| - PII / privacy plugins -> **CWE-200** (Exposure of Sensitive Information to an Unauthorized Actor) | ||
| - everything else -> **CWE-1426** (Improper Validation of Generative AI Output) | ||
| - **Errored results** (`failureReason == 2`, a provider/eval error rather than an assertion failure) are skipped: they indicate the test could not run, not a vulnerability. | ||
|
|
||
| ### Sample Scan Data | ||
| Sample scan data for testing purposes can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/promptfoo). | ||
|
|
||
| ### Deduplication | ||
| The "Promptfoo Scan" scan type uses the `hash_code` [deduplication algorithm](https://docs.defectdojo.com/en/working_with_findings/finding_deduplication/about_deduplication/) with the following fields: | ||
|
|
||
| - title (the harm category and plugin id, e.g. *Hate (harmful:hate)*) | ||
| - component_name (the scanned provider / target model) | ||
|
|
||
| `description` and `severity` are intentionally **excluded** from the hashcode. `description` holds the specific attack input and model output, which promptfoo varies per run. `severity` is an aggregate value that can shift as the set of failed attempts changes between scans. Including either would stop the same weakness from deduplicating across repeated scans of the same target. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,243 @@ | ||
| import json | ||
| import logging | ||
|
|
||
| from dojo.models import Finding | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # promptfoo red-team severity strings -> DefectDojo severity. | ||
| SEVERITY_MAP = { | ||
| "critical": "Critical", | ||
| "high": "High", | ||
| "medium": "Medium", | ||
| "low": "Low", | ||
| } | ||
| # Severity for a failed result that carries no red-team severity (a plain `promptfoo eval` | ||
| # assertion failure has no pluginId/severity metadata). Medium is a neutral middle rung. | ||
| DEFAULT_SEVERITY = "Medium" | ||
|
|
||
| # Ascending severity ranking, used only to keep the most severe rung when aggregating. | ||
| SEVERITY_RANK = {"Info": 0, "Low": 1, "Medium": 2, "High": 3, "Critical": 4} | ||
|
|
||
| # Starter plugin/category -> CWE mapping, matched by substring (most specific first) against | ||
| # the red-team pluginId and harmCategory. Verified against MITRE CWE 4.x: | ||
| # CWE-89 Improper Neutralization of Special Elements used in an SQL Command | ||
| # CWE-78 Improper Neutralization of Special Elements used in an OS Command | ||
| # CWE-1427 Improper Neutralization of Input Used for LLM Prompting (prompt injection) | ||
| # CWE-200 Exposure of Sensitive Information to an Unauthorized Actor (PII / data leak) | ||
| # CWE-1426 Improper Validation of Generative AI Output (default / output safety) | ||
| # promptfoo plugin ids look like "harmful:hate", "pii:direct", "indirect-prompt-injection", | ||
| # "sql-injection". Order matters: the specific *-injection rules must precede the broad | ||
| # "injection" rule. Intentionally coarse; refine as promptfoo's plugin taxonomy is mapped | ||
| # more finely. | ||
| PLUGIN_CWE_RULES = [ | ||
| ("sql-injection", 89), | ||
| ("shell-injection", 78), | ||
| ("injection", 1427), | ||
| ("prompt-extraction", 1427), | ||
| ("pii", 200), | ||
| ("privacy", 200), | ||
| ] | ||
| DEFAULT_CWE = 1426 | ||
|
|
||
| # promptfoo ResultFailureReason enum (src/types/index.ts): NONE=0, ASSERT=1, ERROR=2. | ||
| # A failed assertion is a finding; an ERROR is a provider/eval failure (the test could not | ||
| # run), which is operational noise rather than a vulnerability, so it is skipped. | ||
| FAILURE_REASON_ERROR = 2 | ||
|
|
||
|
|
||
| class PromptfooParser: | ||
|
|
||
| """ | ||
| Parser for promptfoo (https://promptfoo.dev), an LLM evaluation and red-teaming tool. | ||
|
|
||
| Consumes the JSON results file written by ``promptfoo eval -o results.json`` (and | ||
| ``promptfoo redteam run -o results.json``). promptfoo's semantics are inverted relative | ||
| to most scanners: a result with ``success: true`` means every assertion passed -- for a | ||
| red-team probe that means the target model defended successfully -- so it is NOT a | ||
| finding. A result with ``success: false`` is an assertion failure (for a red-team probe, | ||
| the attack succeeded) and becomes a Finding. Failures for the same red-team plugin against | ||
| the same target are aggregated into one Finding. Verified against the promptfoo results | ||
| schema (``results.version == 3``). | ||
| """ | ||
|
|
||
| def get_scan_types(self): | ||
| return ["Promptfoo Scan"] | ||
|
|
||
| def get_label_for_scan_types(self, scan_type): | ||
| return "Promptfoo Scan" | ||
|
|
||
| def get_description_for_scan_types(self, scan_type): | ||
| return ( | ||
| "Import the JSON results file produced by `promptfoo eval -o results.json` " | ||
| "(or `promptfoo redteam run -o results.json`). Each failed evaluation result " | ||
| "(a failed assertion / successful red-team attack) becomes a Finding; failures " | ||
| "for the same plugin and target are aggregated into one Finding." | ||
| ) | ||
|
|
||
| def get_findings(self, file, test): | ||
| self.dupes = {} | ||
| data = self._load(file) | ||
| share_url = data.get("shareableUrl") if isinstance(data, dict) else None | ||
| for result in self._extract_results(data): | ||
| if not isinstance(result, dict): | ||
| continue | ||
| if result.get("success"): | ||
| continue # all assertions passed (model defended) -> not a finding | ||
| if result.get("failureReason") == FAILURE_REASON_ERROR: | ||
| continue # provider/eval error, not a security finding | ||
| self._process_failure(result, share_url, test) | ||
| return list(self.dupes.values()) | ||
|
|
||
| def _load(self, file): | ||
| if file is None: | ||
| return {} | ||
| content = file.read() | ||
| # Uploads may arrive as bytes (binary handle) and may carry a UTF-8 BOM; utf-8-sig | ||
| # strips it. A text handle is BOM-stripped explicitly. | ||
| if isinstance(content, bytes): | ||
| content = content.decode("utf-8-sig") | ||
| elif content[:1] == "\ufeff": | ||
| content = content[1:] | ||
| try: | ||
| return json.loads(content) | ||
| except json.JSONDecodeError as e: | ||
| msg = ( | ||
| "Invalid promptfoo results file: expected the JSON produced by " | ||
| "`promptfoo eval -o results.json` (or `promptfoo redteam run -o results.json`)." | ||
| ) | ||
| raise ValueError(msg) from e | ||
|
|
||
| def _extract_results(self, data): | ||
| # promptfoo nests the EvaluateResult array under results.results. Accept a top-level | ||
| # "results" list or a bare list as lenient fallbacks for hand-trimmed exports. | ||
| if isinstance(data, dict): | ||
| results = data.get("results") | ||
| if isinstance(results, dict) and isinstance(results.get("results"), list): | ||
| return results["results"] | ||
| if isinstance(results, list): | ||
| return results | ||
| elif isinstance(data, list): | ||
| return data | ||
| return [] | ||
|
|
||
| def _process_failure(self, result, share_url, test): | ||
| metadata = result.get("metadata") or {} | ||
| plugin_id = metadata.get("pluginId") | ||
| harm_category = metadata.get("harmCategory") | ||
| provider = self._provider_name(result.get("provider")) | ||
| severity = self._severity(metadata.get("severity")) | ||
|
|
||
| # Weakness identity for aggregation: red-team failures share a pluginId; a plain-eval | ||
| # failure falls back to the failed assertion type. Aggregate same weakness + same target. | ||
| weakness_id = plugin_id or self._assertion_type(result) or "assertion-failure" | ||
| dupe_key = f"{weakness_id}::{provider}" | ||
| if dupe_key in self.dupes: | ||
| finding = self.dupes[dupe_key] | ||
| finding.nb_occurences += 1 | ||
| if SEVERITY_RANK.get(severity, 0) > SEVERITY_RANK.get(finding.severity, 0): | ||
| finding.severity = severity | ||
| return | ||
|
|
||
| title = self._title(plugin_id, harm_category, result) | ||
| finding = Finding( | ||
| test=test, | ||
| title=title, | ||
| description=self._build_description(result, metadata), | ||
| severity=severity, | ||
| cwe=self._cwe(plugin_id, harm_category), | ||
| references=share_url or None, | ||
| component_name=provider or None, | ||
| vuln_id_from_tool=weakness_id, | ||
| unique_id_from_tool=dupe_key, | ||
| static_finding=True, | ||
| dynamic_finding=False, | ||
| nb_occurences=1, | ||
| ) | ||
| finding.unsaved_tags = [tag for tag in ["promptfoo", plugin_id, harm_category] if tag] | ||
| self.dupes[dupe_key] = finding | ||
|
|
||
| def _severity(self, raw): | ||
| if isinstance(raw, str): | ||
| return SEVERITY_MAP.get(raw.strip().lower(), DEFAULT_SEVERITY) | ||
| return DEFAULT_SEVERITY | ||
|
|
||
| def _cwe(self, plugin_id, harm_category): | ||
| haystack = f"{plugin_id or ''} {harm_category or ''}".lower() | ||
| for needle, cwe in PLUGIN_CWE_RULES: | ||
| if needle in haystack: | ||
| return cwe | ||
| return DEFAULT_CWE | ||
|
|
||
| def _title(self, plugin_id, harm_category, result): | ||
| if plugin_id: | ||
| title = f"{harm_category} ({plugin_id})" if harm_category else plugin_id | ||
| else: | ||
| assertion_type = self._assertion_type(result) | ||
| title = f"Failed assertion: {assertion_type}" if assertion_type else "promptfoo assertion failure" | ||
| if len(title) > 255: | ||
| title = title[:252] + "..." | ||
| return title | ||
|
|
||
| def _provider_name(self, provider): | ||
| if isinstance(provider, dict): | ||
| return provider.get("label") or provider.get("id") or "" | ||
| if isinstance(provider, str): | ||
| return provider | ||
| return "" | ||
|
|
||
| def _assertion_type(self, result): | ||
| components = (result.get("gradingResult") or {}).get("componentResults") or [] | ||
| failed = [c for c in components if isinstance(c, dict) and not c.get("pass")] | ||
| # Prefer the assertion that actually failed; fall back to the first component. | ||
| for component in failed or components: | ||
| if isinstance(component, dict): | ||
| assertion = component.get("assertion") or {} | ||
| label = assertion.get("metric") or assertion.get("type") | ||
| if label: | ||
| return label | ||
| return None | ||
|
|
||
| def _build_description(self, result, metadata): | ||
| parts = [] | ||
| plugin_id = metadata.get("pluginId") | ||
| if plugin_id: | ||
| parts.append(f"**Plugin:** {plugin_id}") | ||
| if metadata.get("harmCategory"): | ||
| parts.append(f"**Harm category:** {metadata['harmCategory']}") | ||
| if metadata.get("goal"): | ||
| parts.append(f"**Goal:** {metadata['goal']}") | ||
| provider = self._provider_name(result.get("provider")) | ||
| if provider: | ||
| parts.append(f"**Target:** {provider}") | ||
| reason = (result.get("gradingResult") or {}).get("reason") | ||
| if reason: | ||
| parts.append(f"**Why it failed:** {reason}") | ||
| prompt_text = self._prompt_text(result) | ||
| if prompt_text: | ||
| parts.append(f"**Attack input:**\n```\n{prompt_text}\n```") | ||
| output_text = self._output_text(result) | ||
| if output_text: | ||
| parts.append(f"**Model output:**\n```\n{output_text}\n```") | ||
| return "\n\n".join(parts) | ||
|
|
||
| def _prompt_text(self, result): | ||
| variables = result.get("vars") | ||
| if isinstance(variables, dict) and variables: | ||
| return "\n".join(f"{key}: {value}" for key, value in variables.items()) | ||
| if isinstance(variables, str): | ||
| return variables | ||
| prompt = result.get("prompt") | ||
| if isinstance(prompt, dict): | ||
| return prompt.get("raw") or prompt.get("label") or "" | ||
| return "" | ||
|
|
||
| def _output_text(self, result): | ||
| response = result.get("response") | ||
| if isinstance(response, dict): | ||
| output = response.get("output") | ||
| if isinstance(output, str): | ||
| return output | ||
| if output is not None: | ||
| return json.dumps(output, indent=2) | ||
| return "" | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this throw an "Unknown format" exception or some other exception with a hint of why there are no findings imported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, I think we should. I would sure go look at the logs if an import successfully created zero findings.