直连接口测试成功

This commit is contained in:
2026-04-23 04:36:57 +08:00
parent 5bc69bcd5b
commit a1587b8d12
15 changed files with 2694 additions and 7 deletions

View File

@@ -1,9 +1,20 @@
import json
import os
import time
from urllib.parse import parse_qs
from mitmproxy import http
OUT = os.environ.get("MITM_REDACT_LOG", "/tmp/codebuddy-mitm-events.jsonl")
FULL_CHAT_OUT = os.environ.get("MITM_FULL_CHAT_OUT", "captures/codebuddy-chat-completion-full.redacted.json")
SYSTEM_PROMPT_OUT = os.environ.get("MITM_SYSTEM_PROMPT_OUT", "captures/codebuddy-system-prompt.txt")
FULL_CAPTURE_HOSTS = set(filter(None, os.environ.get(
"MITM_FULL_CAPTURE_HOSTS",
"copilot.tencent.com,api.openai.com",
).split(",")))
FULL_CAPTURE_PATHS = set(filter(None, os.environ.get(
"MITM_FULL_CAPTURE_PATHS",
"/v2/chat/completions,/v1/chat/completions,/v1/responses",
).split(",")))
SENSITIVE_KEYS = {
"authorization",
"proxy-authorization",
@@ -16,6 +27,15 @@ SENSITIVE_KEYS = {
"access_token",
"refresh_token",
"id_token",
"code",
"code_verifier",
"device_code",
"user_code",
"client_secret",
"ticket",
"sid",
"session",
"state",
"codebuddy_api_key",
"codebuddy_auth_token",
}
@@ -44,6 +64,8 @@ def response(flow: http.HTTPFlow) -> None:
"response_body": summarize_body(resp.headers.get("content-type", ""), safe_content(resp)),
}
append(event)
if should_write_full_capture(req):
write_full_chat(req, resp, event)
def error(flow: http.HTTPFlow) -> None:
@@ -63,10 +85,43 @@ def sanitize_headers(headers) -> dict:
clean = {}
for key, value in headers.items():
lower = key.lower()
clean[key] = "<redacted>" if lower in SENSITIVE_KEYS or "token" in lower or "secret" in lower or "key" in lower else trim(value)
clean[key] = "<redacted>" if is_sensitive_header(lower) else trim(value)
return clean
def should_write_full_capture(req: http.Request) -> bool:
return req.pretty_host in FULL_CAPTURE_HOSTS and req.path.split("?")[0] in FULL_CAPTURE_PATHS
def write_full_chat(req: http.Request, resp: http.Response, event: dict) -> None:
request_text = safe_content(req).decode("utf-8", errors="replace")
response_text = safe_content(resp).decode("utf-8", errors="replace")
try:
request_body = sanitize_json(json.loads(request_text), preserve_strings=True)
except Exception:
request_body = request_text
full = {
"metadata": {
"ts": event["ts"],
"duration_ms": event["duration_ms"],
"method": req.method,
"scheme": req.scheme,
"host": req.pretty_host,
"port": req.port,
"path": req.path.split("?")[0],
"status_code": resp.status_code,
},
"request_headers": sanitize_headers(req.headers),
"request_body": request_body,
"response_headers": sanitize_headers(resp.headers),
"response_sse": response_text,
}
write_json_file(FULL_CHAT_OUT, full)
system_prompt = extract_system_prompt(request_body)
if system_prompt:
write_text_file(SYSTEM_PROMPT_OUT, system_prompt)
def safe_content(message):
try:
return message.content
@@ -80,12 +135,15 @@ def summarize_body(content_type: str, raw: bytes | None):
if len(raw) > 2_000_000:
return {"bytes": len(raw), "too_large": True}
text = raw.decode("utf-8", errors="replace")
if "json" in content_type.lower() or looks_like_json(text):
content_type_lower = content_type.lower()
if "application/x-www-form-urlencoded" in content_type_lower:
return {"bytes": len(raw), "form_shape": sanitize_form(text)}
if "json" in content_type_lower or looks_like_json(text):
try:
return {"bytes": len(raw), "json_shape": sanitize_json(json.loads(text))}
except Exception:
pass
if "text/event-stream" in content_type.lower():
if "text/event-stream" in content_type_lower:
return {"bytes": len(raw), "sse_events": summarize_sse(text)}
return {"bytes": len(raw), "preview": trim(text)}
@@ -98,22 +156,33 @@ KEEP_STRING_KEYS = {
"object",
"finish_reason",
"reasoning_effort",
"grant_type",
"response_type",
"client_id",
"scope",
"redirect_uri",
"method_id",
"environment",
"endpoint",
}
def sanitize_json(value, key_context: str | None = None):
def sanitize_json(value, key_context: str | None = None, preserve_strings: bool = False):
if isinstance(value, dict):
out = {}
for key, item in value.items():
lower = str(key).lower()
if lower in SENSITIVE_KEYS or "token" in lower or "secret" in lower or "key" in lower:
if is_sensitive_json_key(lower):
out[key] = "<redacted>"
else:
out[key] = sanitize_json(item, lower)
out[key] = sanitize_json(item, lower, preserve_strings)
return out
if isinstance(value, list):
return [sanitize_json(item, key_context) for item in value[:20]]
items = value if preserve_strings else value[:20]
return [sanitize_json(item, key_context, preserve_strings) for item in items]
if isinstance(value, str):
if preserve_strings:
return value
if key_context in KEEP_STRING_KEYS:
return value
return f"<str:{len(value)}>"
@@ -149,6 +218,66 @@ def looks_like_json(text: str) -> bool:
return stripped.startswith("{") or stripped.startswith("[")
def sanitize_form(text: str) -> dict:
parsed = parse_qs(text, keep_blank_values=True)
out = {}
for key, values in parsed.items():
lower = key.lower()
if is_sensitive_json_key(lower):
out[key] = "<redacted>"
elif lower in KEEP_STRING_KEYS:
out[key] = values[0] if len(values) == 1 else values
else:
out[key] = [f"<str:{len(value)}>" for value in values]
return out
def is_sensitive_header(lower: str) -> bool:
return (
lower in SENSITIVE_KEYS
or "authorization" in lower
or "cookie" in lower
or "api-key" in lower
or lower.endswith("-key")
or "secret" in lower
)
def is_sensitive_json_key(lower: str) -> bool:
normalized = lower.replace("-", "_")
return normalized in {
"authorization",
"proxy_authorization",
"cookie",
"set_cookie",
"x_api_key",
"api_key",
"apikey",
"key",
"token",
"access_token",
"accesstoken",
"refresh_token",
"refreshtoken",
"id_token",
"idtoken",
"code",
"code_verifier",
"device_code",
"user_code",
"client_secret",
"secret",
"ticket",
"sid",
"session",
"state",
"codebuddy_api_key",
"codebuddyapikey",
"codebuddy_auth_token",
"codebuddyauthtoken",
}
def trim(text: str, limit: int = 240) -> str:
text = text.replace("\r", "\\r").replace("\n", "\\n")
return text if len(text) <= limit else text[:limit] + "...<truncated>"
@@ -157,3 +286,28 @@ def trim(text: str, limit: int = 240) -> str:
def append(event: dict) -> None:
with open(OUT, "a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
def extract_system_prompt(request_body) -> str:
if not isinstance(request_body, dict):
return ""
messages = request_body.get("messages")
if not isinstance(messages, list):
return ""
for message in messages:
if isinstance(message, dict) and message.get("role") == "system" and isinstance(message.get("content"), str):
return message["content"]
return ""
def write_json_file(path: str, value: dict) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(value, f, ensure_ascii=False, indent=2)
f.write("\n")
def write_text_file(path: str, text: str) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(text)