314 lines
9.2 KiB
Python
314 lines
9.2 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from urllib.parse import parse_qs
|
|
from mitmproxy import http
|
|
|
|
def env_csv_set(name: str, default: str) -> set[str]:
    """Read a comma-separated environment variable into a set of entries.

    Whitespace around each entry is stripped and empty entries are dropped,
    so a value such as ``"a.com, b.com,"`` yields ``{"a.com", "b.com"}``
    instead of an entry with a leading space that can never match.

    Args:
        name: Environment variable to read.
        default: Comma-separated value used when the variable is unset.

    Returns:
        The set of non-empty, stripped entries.
    """
    entries = (entry.strip() for entry in os.environ.get(name, default).split(","))
    return {entry for entry in entries if entry}


# JSONL file that receives one redacted event per logged flow.
OUT = os.environ.get("MITM_REDACT_LOG", "/tmp/codebuddy-mitm-events.jsonl")
# File overwritten with the latest full chat-completion capture.
FULL_CHAT_OUT = os.environ.get("MITM_FULL_CHAT_OUT", "captures/codebuddy-chat-completion-full.redacted.json")
# File overwritten with the system prompt extracted from the latest capture.
SYSTEM_PROMPT_OUT = os.environ.get("MITM_SYSTEM_PROMPT_OUT", "captures/codebuddy-system-prompt.txt")
# Hosts and paths (both must match) for which a full capture is written.
FULL_CAPTURE_HOSTS = env_csv_set(
    "MITM_FULL_CAPTURE_HOSTS",
    "copilot.tencent.com,api.openai.com",
)
FULL_CAPTURE_PATHS = env_csv_set(
    "MITM_FULL_CAPTURE_PATHS",
    "/v2/chat/completions,/v1/chat/completions,/v1/responses",
)
|
|
# Lower-case names whose values are always replaced with "<redacted>".
# Grouped by origin purely for readability; membership is what matters.
SENSITIVE_KEYS = {
    # HTTP credential headers
    "authorization", "proxy-authorization", "cookie", "set-cookie",
    # API keys
    "x-api-key", "api-key", "apikey",
    # Tokens
    "token", "access_token", "refresh_token", "id_token",
    # OAuth / device-flow parameters
    "code", "code_verifier", "device_code", "user_code", "client_secret",
    # Session identifiers
    "ticket", "sid", "session", "state",
    # CodeBuddy-specific credentials
    "codebuddy_api_key", "codebuddy_auth_token",
}
|
|
|
|
|
|
def request(flow: http.HTTPFlow) -> None:
    """Stamp the flow with the wall-clock time at which it was first seen.

    The value is stored under ``flow.metadata["started_at"]`` and read back
    by ``response`` to compute the flow duration.
    """
    started = time.time()
    flow.metadata["started_at"] = started
|
|
|
|
|
|
def response(flow: http.HTTPFlow) -> None:
    """Log a redacted summary of a completed flow.

    Appends one event to the JSONL log and, when the request matches the
    full-capture host/path lists, also writes the full chat capture.
    """
    req, resp = flow.request, flow.response

    timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    # Falls back to "now" (duration ~0) if the request hook never stamped the flow.
    elapsed_seconds = time.time() - flow.metadata.get("started_at", time.time())
    request_type = req.headers.get("content-type", "")
    response_type = resp.headers.get("content-type", "")

    event = {
        "ts": timestamp,
        "duration_ms": round(elapsed_seconds * 1000),
        "method": req.method,
        "scheme": req.scheme,
        "host": req.pretty_host,
        "port": req.port,
        # Only the path and the query parameter names are logged, never values.
        "path": req.path.split("?")[0],
        "query_keys": sorted(req.query.keys()),
        "request_headers": sanitize_headers(req.headers),
        "request_body": summarize_body(request_type, safe_content(req)),
        "status_code": resp.status_code,
        "response_headers": sanitize_headers(resp.headers),
        "response_body": summarize_body(response_type, safe_content(resp)),
    }
    append(event)

    if not should_write_full_capture(req):
        return
    write_full_chat(req, resp, event)
|
|
|
|
|
|
def error(flow: http.HTTPFlow) -> None:
    """Log a minimal event for a flow that ended with an error.

    Only request coordinates and the stringified error are recorded; headers
    and bodies are not included.
    """
    req = flow.request
    failure = str(flow.error) if flow.error else "unknown"
    event = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "method": req.method,
        "scheme": req.scheme,
        "host": req.pretty_host,
        "port": req.port,
        "path": req.path.split("?")[0],
        "error": failure,
    }
    append(event)
|
|
|
|
|
|
def sanitize_headers(headers) -> dict:
    """Return a plain dict of headers that is safe to log.

    Values of sensitive headers become ``"<redacted>"``; all other values are
    passed through ``trim``. Header names keep their original casing.
    """
    return {
        name: "<redacted>" if is_sensitive_header(name.lower()) else trim(value)
        for name, value in headers.items()
    }
|
|
|
|
|
|
def should_write_full_capture(req: http.Request) -> bool:
    """Tell whether the request targets a host and path selected for full capture."""
    if req.pretty_host not in FULL_CAPTURE_HOSTS:
        return False
    # Compare the path without its query string.
    bare_path = req.path.split("?")[0]
    return bare_path in FULL_CAPTURE_PATHS
|
|
|
|
|
|
def write_full_chat(req: http.Request, resp: http.Response, event: dict) -> None:
    """Write the full capture of one chat exchange, plus its system prompt.

    The capture (metadata, sanitized headers, request body with sensitive keys
    redacted but strings preserved, and the raw response text) overwrites
    ``FULL_CHAT_OUT``. If the request body contains a system message, its text
    overwrites ``SYSTEM_PROMPT_OUT``.

    Args:
        req: The captured request.
        resp: The captured response.
        event: The summary event already built for this flow; its ``ts`` and
            ``duration_ms`` are reused in the capture metadata.
    """
    # safe_content() may return None (summarize_body accepts None for the same
    # value), so treat a missing body as empty instead of crashing on .decode.
    request_text = (safe_content(req) or b"").decode("utf-8", errors="replace")
    response_text = (safe_content(resp) or b"").decode("utf-8", errors="replace")
    try:
        request_body = sanitize_json(json.loads(request_text), preserve_strings=True)
    except (ValueError, RecursionError):
        # Not valid JSON (JSONDecodeError is a ValueError) or too deeply nested.
        # NOTE(review): the raw text is stored as-is here, without key redaction.
        request_body = request_text
    full = {
        "metadata": {
            "ts": event["ts"],
            "duration_ms": event["duration_ms"],
            "method": req.method,
            "scheme": req.scheme,
            "host": req.pretty_host,
            "port": req.port,
            "path": req.path.split("?")[0],
            "status_code": resp.status_code,
        },
        "request_headers": sanitize_headers(req.headers),
        "request_body": request_body,
        "response_headers": sanitize_headers(resp.headers),
        "response_sse": response_text,
    }
    write_json_file(FULL_CHAT_OUT, full)
    system_prompt = extract_system_prompt(request_body)
    if system_prompt:
        write_text_file(SYSTEM_PROMPT_OUT, system_prompt)
|
|
|
|
|
|
def safe_content(message):
    """Return the message body, preferring ``content`` over ``raw_content``.

    If reading ``message.content`` raises (presumably when the body cannot be
    decoded — confirm against mitmproxy), ``message.raw_content`` is returned
    instead.
    """
    try:
        body = message.content
    except Exception:
        body = message.raw_content
    return body
|
|
|
|
|
|
def summarize_body(content_type: str, raw: bytes | None):
|
|
if not raw:
|
|
return {"bytes": 0}
|
|
if len(raw) > 2_000_000:
|
|
return {"bytes": len(raw), "too_large": True}
|
|
text = raw.decode("utf-8", errors="replace")
|
|
content_type_lower = content_type.lower()
|
|
if "application/x-www-form-urlencoded" in content_type_lower:
|
|
return {"bytes": len(raw), "form_shape": sanitize_form(text)}
|
|
if "json" in content_type_lower or looks_like_json(text):
|
|
try:
|
|
return {"bytes": len(raw), "json_shape": sanitize_json(json.loads(text))}
|
|
except Exception:
|
|
pass
|
|
if "text/event-stream" in content_type_lower:
|
|
return {"bytes": len(raw), "sse_events": summarize_sse(text)}
|
|
return {"bytes": len(raw), "preview": trim(text)}
|
|
|
|
|
|
# Lower-case keys whose string values are logged verbatim instead of being
# replaced by a "<str:N>" length placeholder.
KEEP_STRING_KEYS = {
    # Chat/completion payload descriptors
    "model", "role", "type", "name", "object", "finish_reason", "reasoning_effort",
    # OAuth request parameters that are not secrets
    "grant_type", "response_type", "client_id", "scope", "redirect_uri",
    # Miscellaneous identifiers
    "method_id", "environment", "endpoint",
}
|
|
|
|
|
|
def sanitize_json(value, key_context: str | None = None, preserve_strings: bool = False):
|
|
if isinstance(value, dict):
|
|
out = {}
|
|
for key, item in value.items():
|
|
lower = str(key).lower()
|
|
if is_sensitive_json_key(lower):
|
|
out[key] = "<redacted>"
|
|
else:
|
|
out[key] = sanitize_json(item, lower, preserve_strings)
|
|
return out
|
|
if isinstance(value, list):
|
|
items = value if preserve_strings else value[:20]
|
|
return [sanitize_json(item, key_context, preserve_strings) for item in items]
|
|
if isinstance(value, str):
|
|
if preserve_strings:
|
|
return value
|
|
if key_context in KEEP_STRING_KEYS:
|
|
return value
|
|
return f"<str:{len(value)}>"
|
|
if isinstance(value, (int, float, bool)) or value is None:
|
|
return value
|
|
return f"<{type(value).__name__}>"
|
|
|
|
|
|
def summarize_sse(text: str):
    """Summarize a server-sent-events body as a list of sanitized events.

    Only the first 200 lines are inspected and at most 20 events are returned.
    An event is emitted for every ``data:`` line, carrying the most recent
    ``event:`` name seen since the previous emission, if any.
    """
    collected = []
    pending = {}
    for raw_line in text.splitlines()[:200]:
        if raw_line.startswith("event:"):
            pending["event"] = raw_line[len("event:"):].strip()
            continue
        if not raw_line.startswith("data:"):
            # Any other line is ignored.
            continue
        pending["data"] = summarize_data_line(raw_line[len("data:"):].strip())
        collected.append(pending)
        pending = {}
    return collected[:20]
|
|
|
|
|
|
def summarize_data_line(text: str):
    """Sanitize the payload of one SSE ``data:`` line.

    The ``[DONE]`` sentinel is returned unchanged. JSON payloads are reduced
    with ``sanitize_json``; anything that fails that path is returned as a
    trimmed preview.
    """
    if text != "[DONE]":
        try:
            return sanitize_json(json.loads(text))
        except Exception:
            return trim(text)
    return text
|
|
|
|
|
|
def looks_like_json(text: str) -> bool:
    """Tell whether the text, ignoring surrounding whitespace, opens with ``{`` or ``[``."""
    return text.strip().startswith(("{", "["))
|
|
|
|
|
|
def sanitize_form(text: str) -> dict:
    """Describe a URL-encoded form body without exposing its values.

    Sensitive fields map to ``"<redacted>"``. Fields listed in
    ``KEEP_STRING_KEYS`` keep their value (a single string when there is one
    value, otherwise the list). Every other field maps to a list of
    ``"<str:N>"`` length placeholders.
    """
    shape = {}
    for field, values in parse_qs(text, keep_blank_values=True).items():
        field_lower = field.lower()
        if is_sensitive_json_key(field_lower):
            shape[field] = "<redacted>"
            continue
        if field_lower not in KEEP_STRING_KEYS:
            shape[field] = [f"<str:{len(entry)}>" for entry in values]
            continue
        shape[field] = values if len(values) != 1 else values[0]
    return shape
|
|
|
|
|
|
def is_sensitive_header(lower: str) -> bool:
    """Tell whether a header's value must be redacted before logging.

    Args:
        lower: The header name, already lower-cased.

    Returns:
        True when the name is listed in ``SENSITIVE_KEYS`` or looks like a
        credential carrier: it contains "authorization", "cookie", "api-key",
        "secret" or "token", or ends with "-key".
    """
    return (
        "authorization" in lower
        or "cookie" in lower
        or "api-key" in lower
        or lower.endswith("-key")
        or "secret" in lower
        # Catches x-auth-token, x-csrf-token, x-amz-security-token and similar,
        # which the exact-name list alone would let through in clear text.
        or "token" in lower
        or lower in SENSITIVE_KEYS
    )
|
|
|
|
|
|
# Key names (lower-case, hyphens already mapped to underscores) whose values
# are redacted in JSON and form bodies.
_SENSITIVE_JSON_KEYS = frozenset((
    "authorization", "proxy_authorization", "cookie", "set_cookie",
    "x_api_key", "api_key", "apikey", "key",
    "token", "access_token", "accesstoken",
    "refresh_token", "refreshtoken", "id_token", "idtoken",
    "code", "code_verifier", "device_code", "user_code",
    "client_secret", "secret",
    "ticket", "sid", "session", "state",
    "codebuddy_api_key", "codebuddyapikey",
    "codebuddy_auth_token", "codebuddyauthtoken",
))


def is_sensitive_json_key(lower: str) -> bool:
    """Tell whether a body key names a value that must be redacted.

    Args:
        lower: The key, already lower-cased. Hyphens are treated as
            underscores, so "x-api-key" and "x_api_key" are equivalent.
    """
    return lower.replace("-", "_") in _SENSITIVE_JSON_KEYS
|
|
|
|
|
|
def trim(text: str, limit: int = 240) -> str:
    """Flatten text to a single line and cap its length.

    Carriage returns and newlines are replaced by the literal two-character
    sequences ``\\r`` and ``\\n``. If the flattened text exceeds ``limit``
    characters it is cut there and ``"...<truncated>"`` is appended.
    """
    flattened = text.replace("\r", "\\r").replace("\n", "\\n")
    if len(flattened) > limit:
        return flattened[:limit] + "...<truncated>"
    return flattened
|
|
|
|
|
|
def append(event: dict) -> None:
    """Append one event to the ``OUT`` log as a single JSON line.

    Non-ASCII characters are written as-is (UTF-8) rather than escaped.
    """
    with open(OUT, mode="a", encoding="utf-8") as sink:
        serialized = json.dumps(event, ensure_ascii=False)
        sink.write(f"{serialized}\n")
|
|
|
|
|
|
def extract_system_prompt(request_body) -> str:
    """Return the text of the first system message in a chat request body.

    Args:
        request_body: Decoded request body; anything other than a dict with a
            ``messages`` list yields "".

    Returns:
        The ``content`` of the first message whose ``role`` is "system" and
        whose ``content`` is a string, or "" when there is none.
    """
    messages = request_body.get("messages") if isinstance(request_body, dict) else None
    if not isinstance(messages, list):
        return ""

    for entry in messages:
        if not isinstance(entry, dict) or entry.get("role") != "system":
            continue
        content = entry.get("content")
        if isinstance(content, str):
            return content
    return ""
|
|
|
|
|
|
def write_json_file(path: str, value: dict) -> None:
    """Write ``value`` to ``path`` as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created. Any existing file is overwritten.

    Args:
        path: Destination file; may be a bare file name in the current directory.
        value: JSON-serializable data to write.
    """
    parent = os.path.dirname(path)
    # A bare file name has no directory part, and os.makedirs("") raises
    # FileNotFoundError, so only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(value, f, ensure_ascii=False, indent=2)
        f.write("\n")
|
|
|
|
|
|
def write_text_file(path: str, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8, replacing any existing file.

    Missing parent directories are created.

    Args:
        path: Destination file; may be a bare file name in the current directory.
        text: Content to write verbatim.
    """
    parent = os.path.dirname(path)
    # A bare file name has no directory part, and os.makedirs("") raises
    # FileNotFoundError, so only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
|