import json import os import time from urllib.parse import parse_qs from mitmproxy import http OUT = os.environ.get("MITM_REDACT_LOG", "/tmp/codebuddy-mitm-events.jsonl") FULL_CHAT_OUT = os.environ.get("MITM_FULL_CHAT_OUT", "captures/codebuddy-chat-completion-full.redacted.json") SYSTEM_PROMPT_OUT = os.environ.get("MITM_SYSTEM_PROMPT_OUT", "captures/codebuddy-system-prompt.txt") FULL_CAPTURE_HOSTS = set(filter(None, os.environ.get( "MITM_FULL_CAPTURE_HOSTS", "copilot.tencent.com,api.openai.com", ).split(","))) FULL_CAPTURE_PATHS = set(filter(None, os.environ.get( "MITM_FULL_CAPTURE_PATHS", "/v2/chat/completions,/v1/chat/completions,/v1/responses", ).split(","))) SENSITIVE_KEYS = { "authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key", "api-key", "apikey", "token", "access_token", "refresh_token", "id_token", "code", "code_verifier", "device_code", "user_code", "client_secret", "ticket", "sid", "session", "state", "codebuddy_api_key", "codebuddy_auth_token", } def request(flow: http.HTTPFlow) -> None: flow.metadata["started_at"] = time.time() def response(flow: http.HTTPFlow) -> None: req = flow.request resp = flow.response event = { "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "duration_ms": round((time.time() - flow.metadata.get("started_at", time.time())) * 1000), "method": req.method, "scheme": req.scheme, "host": req.pretty_host, "port": req.port, "path": req.path.split("?")[0], "query_keys": sorted(req.query.keys()), "request_headers": sanitize_headers(req.headers), "request_body": summarize_body(req.headers.get("content-type", ""), safe_content(req)), "status_code": resp.status_code, "response_headers": sanitize_headers(resp.headers), "response_body": summarize_body(resp.headers.get("content-type", ""), safe_content(resp)), } append(event) if should_write_full_capture(req): write_full_chat(req, resp, event) def error(flow: http.HTTPFlow) -> None: req = flow.request append({ "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "method": req.method, "scheme": req.scheme, "host": req.pretty_host, "port": req.port, "path": req.path.split("?")[0], "error": str(flow.error) if flow.error else "unknown", }) def sanitize_headers(headers) -> dict: clean = {} for key, value in headers.items(): lower = key.lower() clean[key] = "" if is_sensitive_header(lower) else trim(value) return clean def should_write_full_capture(req: http.Request) -> bool: return req.pretty_host in FULL_CAPTURE_HOSTS and req.path.split("?")[0] in FULL_CAPTURE_PATHS def write_full_chat(req: http.Request, resp: http.Response, event: dict) -> None: request_text = safe_content(req).decode("utf-8", errors="replace") response_text = safe_content(resp).decode("utf-8", errors="replace") try: request_body = sanitize_json(json.loads(request_text), preserve_strings=True) except Exception: request_body = request_text full = { "metadata": { "ts": event["ts"], "duration_ms": event["duration_ms"], "method": req.method, "scheme": req.scheme, "host": req.pretty_host, "port": req.port, "path": req.path.split("?")[0], "status_code": resp.status_code, }, "request_headers": sanitize_headers(req.headers), "request_body": request_body, "response_headers": sanitize_headers(resp.headers), "response_sse": response_text, } write_json_file(FULL_CHAT_OUT, full) system_prompt = extract_system_prompt(request_body) if system_prompt: write_text_file(SYSTEM_PROMPT_OUT, system_prompt) def safe_content(message): try: return message.content except Exception: return message.raw_content def summarize_body(content_type: str, raw: bytes | None): if not raw: return {"bytes": 0} if len(raw) > 2_000_000: return {"bytes": len(raw), "too_large": True} text = raw.decode("utf-8", errors="replace") content_type_lower = content_type.lower() if "application/x-www-form-urlencoded" in content_type_lower: return {"bytes": len(raw), "form_shape": sanitize_form(text)} if "json" in content_type_lower or looks_like_json(text): try: return {"bytes": len(raw), "json_shape": sanitize_json(json.loads(text))} except Exception: pass if "text/event-stream" in content_type_lower: return {"bytes": len(raw), "sse_events": summarize_sse(text)} return {"bytes": len(raw), "preview": trim(text)} KEEP_STRING_KEYS = { "model", "role", "type", "name", "object", "finish_reason", "reasoning_effort", "grant_type", "response_type", "client_id", "scope", "redirect_uri", "method_id", "environment", "endpoint", } def sanitize_json(value, key_context: str | None = None, preserve_strings: bool = False): if isinstance(value, dict): out = {} for key, item in value.items(): lower = str(key).lower() if is_sensitive_json_key(lower): out[key] = "" else: out[key] = sanitize_json(item, lower, preserve_strings) return out if isinstance(value, list): items = value if preserve_strings else value[:20] return [sanitize_json(item, key_context, preserve_strings) for item in items] if isinstance(value, str): if preserve_strings: return value if key_context in KEEP_STRING_KEYS: return value return f"" if isinstance(value, (int, float, bool)) or value is None: return value return f"<{type(value).__name__}>" def summarize_sse(text: str): events = [] current = {} for line in text.splitlines()[:200]: if line.startswith("event:"): current["event"] = line[6:].strip() elif line.startswith("data:"): current["data"] = summarize_data_line(line[5:].strip()) events.append(current) current = {} return events[:20] def summarize_data_line(text: str): if text == "[DONE]": return text try: return sanitize_json(json.loads(text)) except Exception: return trim(text) def looks_like_json(text: str) -> bool: stripped = text.strip() return stripped.startswith("{") or stripped.startswith("[") def sanitize_form(text: str) -> dict: parsed = parse_qs(text, keep_blank_values=True) out = {} for key, values in parsed.items(): lower = key.lower() if is_sensitive_json_key(lower): out[key] = "" elif lower in KEEP_STRING_KEYS: out[key] = values[0] if len(values) == 1 else values else: out[key] = [f"" for value in values] return out def is_sensitive_header(lower: str) -> bool: return ( lower in SENSITIVE_KEYS or "authorization" in lower or "cookie" in lower or "api-key" in lower or lower.endswith("-key") or "secret" in lower ) def is_sensitive_json_key(lower: str) -> bool: normalized = lower.replace("-", "_") return normalized in { "authorization", "proxy_authorization", "cookie", "set_cookie", "x_api_key", "api_key", "apikey", "key", "token", "access_token", "accesstoken", "refresh_token", "refreshtoken", "id_token", "idtoken", "code", "code_verifier", "device_code", "user_code", "client_secret", "secret", "ticket", "sid", "session", "state", "codebuddy_api_key", "codebuddyapikey", "codebuddy_auth_token", "codebuddyauthtoken", } def trim(text: str, limit: int = 240) -> str: text = text.replace("\r", "\\r").replace("\n", "\\n") return text if len(text) <= limit else text[:limit] + "..." def append(event: dict) -> None: with open(OUT, "a", encoding="utf-8") as f: f.write(json.dumps(event, ensure_ascii=False) + "\n") def extract_system_prompt(request_body) -> str: if not isinstance(request_body, dict): return "" messages = request_body.get("messages") if not isinstance(messages, list): return "" for message in messages: if isinstance(message, dict) and message.get("role") == "system" and isinstance(message.get("content"), str): return message["content"] return "" def write_json_file(path: str, value: dict) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(value, f, ensure_ascii=False, indent=2) f.write("\n") def write_text_file(path: str, text: str) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(text)