# File: codebuddy2api1ts/scripts/mitm-redact.py
# 2026-04-23 04:36:57 +08:00
#
# 314 lines
# 9.2 KiB
# Python

import json
import os
import time
from urllib.parse import parse_qs
from mitmproxy import http
# Path of the JSONL log that receives one redacted event per line.
OUT = os.environ.get("MITM_REDACT_LOG", "/tmp/codebuddy-mitm-events.jsonl")

# Path of the JSON file holding the most recent full chat capture.
FULL_CHAT_OUT = os.environ.get(
    "MITM_FULL_CHAT_OUT",
    "captures/codebuddy-chat-completion-full.redacted.json",
)

# Path of the text file holding the system prompt of the most recent capture.
SYSTEM_PROMPT_OUT = os.environ.get(
    "MITM_SYSTEM_PROMPT_OUT",
    "captures/codebuddy-system-prompt.txt",
)

# Hosts eligible for a full capture: comma-separated, empty entries dropped.
FULL_CAPTURE_HOSTS = {
    host
    for host in os.environ.get(
        "MITM_FULL_CAPTURE_HOSTS",
        "copilot.tencent.com,api.openai.com",
    ).split(",")
    if host
}

# Request paths (query string excluded) eligible for a full capture.
FULL_CAPTURE_PATHS = {
    path
    for path in os.environ.get(
        "MITM_FULL_CAPTURE_PATHS",
        "/v2/chat/completions,/v1/chat/completions,/v1/responses",
    ).split(",")
    if path
}
# Lower-case names that is_sensitive_header() matches exactly; the value of a
# header whose lower-cased name appears here is logged as "<redacted>".
SENSITIVE_KEYS = {
    "authorization",
    "proxy-authorization",
    "cookie",
    "set-cookie",
    "x-api-key",
    "api-key",
    "apikey",
    "token",
    "access_token",
    "refresh_token",
    "id_token",
    "code",
    "code_verifier",
    "device_code",
    "user_code",
    "client_secret",
    "ticket",
    "sid",
    "session",
    "state",
    "codebuddy_api_key",
    "codebuddy_auth_token",
}
def request(flow: http.HTTPFlow) -> None:
    """Stamp the flow with its start time (mitmproxy ``request`` hook).

    The timestamp is stored under ``flow.metadata["started_at"]`` so that
    ``response`` can compute how long the exchange took.
    """
    started_at = time.time()
    flow.metadata["started_at"] = started_at
def response(flow: http.HTTPFlow) -> None:
    """Log a redacted summary of a finished flow (mitmproxy ``response`` hook).

    One event is appended to the JSONL log for every flow. Headers are
    sanitized and bodies are reduced to a shape summary. When the request
    targets one of the configured host/path pairs, a full capture is written
    as well.
    """
    req, resp = flow.request, flow.response

    # Without a recorded start time the duration collapses to zero.
    now = time.time()
    started = flow.metadata.get("started_at", now)

    request_type = req.headers.get("content-type", "")
    response_type = resp.headers.get("content-type", "")

    event = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "duration_ms": round((now - started) * 1000),
        "method": req.method,
        "scheme": req.scheme,
        "host": req.pretty_host,
        "port": req.port,
        # Only the path and the names of the query parameters are logged.
        "path": req.path.split("?")[0],
        "query_keys": sorted(req.query.keys()),
        "request_headers": sanitize_headers(req.headers),
        "request_body": summarize_body(request_type, safe_content(req)),
        "status_code": resp.status_code,
        "response_headers": sanitize_headers(resp.headers),
        "response_body": summarize_body(response_type, safe_content(resp)),
    }
    append(event)

    if should_write_full_capture(req):
        write_full_chat(req, resp, event)
def error(flow: http.HTTPFlow) -> None:
    """Log a failed flow (mitmproxy ``error`` hook).

    The event carries the request coordinates and the stringified flow error,
    or "unknown" when the flow has no error object.
    """
    req = flow.request
    reason = str(flow.error) if flow.error else "unknown"
    bare_path = req.path.split("?")[0]

    record = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "method": req.method,
        "scheme": req.scheme,
        "host": req.pretty_host,
        "port": req.port,
        "path": bare_path,
        "error": reason,
    }
    append(record)
def sanitize_headers(headers) -> dict:
    """Return a plain dict of headers that is safe to log.

    Values of sensitive headers become "<redacted>"; every other value is
    passed through ``trim``. Header names keep their original casing.
    """
    return {
        name: "<redacted>" if is_sensitive_header(name.lower()) else trim(content)
        for name, content in headers.items()
    }
def should_write_full_capture(req: http.Request) -> bool:
    """Tell whether the request matches a configured full-capture host and path.

    The path is compared without its query string.
    """
    if req.pretty_host not in FULL_CAPTURE_HOSTS:
        return False
    bare_path = req.path.partition("?")[0]
    return bare_path in FULL_CAPTURE_PATHS
def write_full_chat(req: http.Request, resp: http.Response, event: dict) -> None:
    """Write the full capture of one chat exchange and its system prompt.

    The capture (metadata, sanitized headers, request body, raw response
    text) overwrites ``FULL_CHAT_OUT``. If the request body contains a
    system message, its text overwrites ``SYSTEM_PROMPT_OUT``.

    Args:
        req: The request of the flow being captured.
        resp: The response of the flow being captured.
        event: The already-built log event; its ``ts`` and ``duration_ms``
            values are reused in the capture metadata.
    """
    # safe_content() may hand back None for a body-less message (the same
    # case summarize_body() accepts); treat that as an empty body instead of
    # failing on ``None.decode``.
    request_text = (safe_content(req) or b"").decode("utf-8", errors="replace")
    response_text = (safe_content(resp) or b"").decode("utf-8", errors="replace")

    try:
        # Sensitive keys are redacted, all other strings are kept verbatim.
        request_body = sanitize_json(json.loads(request_text), preserve_strings=True)
    except Exception:
        # NOTE(review): a body that is not valid JSON is stored as raw text,
        # i.e. without any key-based redaction.
        request_body = request_text

    full = {
        "metadata": {
            "ts": event["ts"],
            "duration_ms": event["duration_ms"],
            "method": req.method,
            "scheme": req.scheme,
            "host": req.pretty_host,
            "port": req.port,
            "path": req.path.split("?")[0],
            "status_code": resp.status_code,
        },
        "request_headers": sanitize_headers(req.headers),
        "request_body": request_body,
        "response_headers": sanitize_headers(resp.headers),
        "response_sse": response_text,
    }
    write_json_file(FULL_CHAT_OUT, full)

    system_prompt = extract_system_prompt(request_body)
    if system_prompt:
        write_text_file(SYSTEM_PROMPT_OUT, system_prompt)
def safe_content(message):
    """Return ``message.content``, or ``message.raw_content`` if that fails.

    Any exception raised while reading ``content`` triggers the fallback.
    Presumably this covers bodies mitmproxy cannot decode; confirm against
    the mitmproxy documentation.
    """
    try:
        body = message.content
    except Exception:
        body = message.raw_content
    return body
def summarize_body(content_type: str, raw: bytes | None):
if not raw:
return {"bytes": 0}
if len(raw) > 2_000_000:
return {"bytes": len(raw), "too_large": True}
text = raw.decode("utf-8", errors="replace")
content_type_lower = content_type.lower()
if "application/x-www-form-urlencoded" in content_type_lower:
return {"bytes": len(raw), "form_shape": sanitize_form(text)}
if "json" in content_type_lower or looks_like_json(text):
try:
return {"bytes": len(raw), "json_shape": sanitize_json(json.loads(text))}
except Exception:
pass
if "text/event-stream" in content_type_lower:
return {"bytes": len(raw), "sse_events": summarize_sse(text)}
return {"bytes": len(raw), "preview": trim(text)}
# Lower-case key names whose string values sanitize_json() and sanitize_form()
# keep verbatim; strings under any other key are replaced by a "<str:N>"
# length placeholder.
KEEP_STRING_KEYS = {
    "model",
    "role",
    "type",
    "name",
    "object",
    "finish_reason",
    "reasoning_effort",
    "grant_type",
    "response_type",
    "client_id",
    "scope",
    "redirect_uri",
    "method_id",
    "environment",
    "endpoint",
}
def sanitize_json(value, key_context: str | None = None, preserve_strings: bool = False):
if isinstance(value, dict):
out = {}
for key, item in value.items():
lower = str(key).lower()
if is_sensitive_json_key(lower):
out[key] = "<redacted>"
else:
out[key] = sanitize_json(item, lower, preserve_strings)
return out
if isinstance(value, list):
items = value if preserve_strings else value[:20]
return [sanitize_json(item, key_context, preserve_strings) for item in items]
if isinstance(value, str):
if preserve_strings:
return value
if key_context in KEEP_STRING_KEYS:
return value
return f"<str:{len(value)}>"
if isinstance(value, (int, float, bool)) or value is None:
return value
return f"<{type(value).__name__}>"
def summarize_sse(text: str):
    """Summarize a server-sent-event stream as a list of redacted events.

    Only the first 200 lines are read and at most 20 events are returned.
    An event is emitted for every ``data:`` line; a preceding ``event:`` line
    contributes its name to that event. Other lines are ignored.
    """
    collected = []
    pending = {}
    for raw_line in text.splitlines()[:200]:
        if raw_line.startswith("event:"):
            pending["event"] = raw_line[len("event:"):].strip()
            continue
        if not raw_line.startswith("data:"):
            continue
        pending["data"] = summarize_data_line(raw_line[len("data:"):].strip())
        collected.append(pending)
        pending = {}
    return collected[:20]
def summarize_data_line(text: str):
    """Summarize the payload of one SSE ``data:`` line.

    The "[DONE]" sentinel is returned unchanged, a JSON payload is returned
    as its sanitized shape, and anything else as trimmed text.
    """
    if text == "[DONE]":
        return "[DONE]"
    try:
        payload = json.loads(text)
        return sanitize_json(payload)
    except Exception:
        return trim(text)
def looks_like_json(text: str) -> bool:
    """Return True if the text, ignoring surrounding whitespace, opens with ``{`` or ``[``."""
    return text.strip().startswith(("{", "["))
def sanitize_form(text: str) -> dict:
    """Summarize a URL-encoded form body without exposing its values.

    Sensitive fields map to "<redacted>". Fields listed in
    ``KEEP_STRING_KEYS`` keep their value (a single string, or the list when
    the field repeats). Every other field maps to a list of "<str:N>"
    placeholders, one per value.
    """
    shape = {}
    for field, entries in parse_qs(text, keep_blank_values=True).items():
        folded = field.lower()
        if is_sensitive_json_key(folded):
            shape[field] = "<redacted>"
            continue
        if folded not in KEEP_STRING_KEYS:
            shape[field] = [f"<str:{len(entry)}>" for entry in entries]
            continue
        shape[field] = entries[0] if len(entries) == 1 else entries
    return shape
def is_sensitive_header(lower: str) -> bool:
    """Decide whether a header's value must be redacted.

    Args:
        lower: The header name, already lower-cased.

    Returns:
        True when the name is listed in ``SENSITIVE_KEYS``, ends in "-key" or
        "-token", or contains "authorization", "cookie", "api-key" or
        "secret".
    """
    if lower in SENSITIVE_KEYS:
        return True
    # Credential headers are commonly named "<something>-key" or
    # "<something>-token" (X-Auth-Token, X-CSRF-Token, ...). The check is a
    # suffix match rather than a substring match so that counters such as
    # "x-ratelimit-remaining-tokens" stay visible in the log.
    if lower.endswith(("-key", "-token")):
        return True
    return any(
        marker in lower
        for marker in ("authorization", "cookie", "api-key", "secret")
    )
def is_sensitive_json_key(lower: str) -> bool:
    """Decide whether a JSON or form key names a secret.

    Args:
        lower: The key, already lower-cased.

    Returns:
        True when the key, with hyphens treated as underscores, exactly
        matches one of the known credential names.
    """
    canonical = lower.replace("-", "_")
    return canonical in {
        # HTTP-style credentials
        "authorization",
        "proxy_authorization",
        "cookie",
        "set_cookie",
        # API keys and generic secrets
        "x_api_key",
        "api_key",
        "apikey",
        "key",
        "client_secret",
        "secret",
        # Tokens, with and without separators
        "token",
        "access_token",
        "accesstoken",
        "refresh_token",
        "refreshtoken",
        "id_token",
        "idtoken",
        # OAuth / device-flow codes and session identifiers
        "code",
        "code_verifier",
        "device_code",
        "user_code",
        "ticket",
        "sid",
        "session",
        "state",
        # CodeBuddy-specific names
        "codebuddy_api_key",
        "codebuddyapikey",
        "codebuddy_auth_token",
        "codebuddyauthtoken",
    }
def trim(text: str, limit: int = 240) -> str:
    """Make text single-line and cap its length for logging.

    Carriage returns and newlines are replaced by the two-character escapes
    ``\\r`` and ``\\n``. If the escaped text is longer than ``limit``
    characters it is cut there and "...<truncated>" is appended.
    """
    escaped = text.translate({ord("\r"): "\\r", ord("\n"): "\\n"})
    if len(escaped) > limit:
        return f"{escaped[:limit]}...<truncated>"
    return escaped
def append(event: dict) -> None:
    """Append one event to the JSONL log at ``OUT`` as a single JSON line."""
    line = json.dumps(event, ensure_ascii=False) + "\n"
    with open(OUT, "a", encoding="utf-8") as log:
        log.write(line)
def extract_system_prompt(request_body) -> str:
    """Return the text of the first system message with string content.

    An empty string is returned when the body is not a dict, has no
    ``messages`` list, or contains no system message whose ``content`` is a
    string.
    """
    messages = request_body.get("messages") if isinstance(request_body, dict) else None
    if not isinstance(messages, list):
        return ""

    system_texts = (
        entry["content"]
        for entry in messages
        if isinstance(entry, dict)
        and entry.get("role") == "system"
        and isinstance(entry.get("content"), str)
    )
    return next(system_texts, "")
def write_json_file(path: str, value: dict) -> None:
    """Write ``value`` to ``path`` as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created. An existing file is overwritten.

    Args:
        path: Destination file; may be a bare file name.
        value: JSON-serializable data to store.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(value, f, ensure_ascii=False, indent=2)
        f.write("\n")
def write_text_file(path: str, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8, creating parent directories.

    An existing file is overwritten.

    Args:
        path: Destination file; may be a bare file name.
        text: Content to store.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)