import json import os import time from mitmproxy import http OUT = os.environ.get("MITM_REDACT_LOG", "/tmp/codebuddy-mitm-events.jsonl") SENSITIVE_KEYS = { "authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key", "api-key", "apikey", "token", "access_token", "refresh_token", "id_token", "codebuddy_api_key", "codebuddy_auth_token", } def request(flow: http.HTTPFlow) -> None: flow.metadata["started_at"] = time.time() def response(flow: http.HTTPFlow) -> None: req = flow.request resp = flow.response event = { "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "duration_ms": round((time.time() - flow.metadata.get("started_at", time.time())) * 1000), "method": req.method, "scheme": req.scheme, "host": req.pretty_host, "port": req.port, "path": req.path.split("?")[0], "query_keys": sorted(req.query.keys()), "request_headers": sanitize_headers(req.headers), "request_body": summarize_body(req.headers.get("content-type", ""), safe_content(req)), "status_code": resp.status_code, "response_headers": sanitize_headers(resp.headers), "response_body": summarize_body(resp.headers.get("content-type", ""), safe_content(resp)), } append(event) def error(flow: http.HTTPFlow) -> None: req = flow.request append({ "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "method": req.method, "scheme": req.scheme, "host": req.pretty_host, "port": req.port, "path": req.path.split("?")[0], "error": str(flow.error) if flow.error else "unknown", }) def sanitize_headers(headers) -> dict: clean = {} for key, value in headers.items(): lower = key.lower() clean[key] = "" if lower in SENSITIVE_KEYS or "token" in lower or "secret" in lower or "key" in lower else trim(value) return clean def safe_content(message): try: return message.content except Exception: return message.raw_content def summarize_body(content_type: str, raw: bytes | None): if not raw: return {"bytes": 0} if len(raw) > 2_000_000: return {"bytes": len(raw), "too_large": True} text = raw.decode("utf-8", errors="replace") if "json" in content_type.lower() or looks_like_json(text): try: return {"bytes": len(raw), "json_shape": sanitize_json(json.loads(text))} except Exception: pass if "text/event-stream" in content_type.lower(): return {"bytes": len(raw), "sse_events": summarize_sse(text)} return {"bytes": len(raw), "preview": trim(text)} KEEP_STRING_KEYS = { "model", "role", "type", "name", "object", "finish_reason", "reasoning_effort", } def sanitize_json(value, key_context: str | None = None): if isinstance(value, dict): out = {} for key, item in value.items(): lower = str(key).lower() if lower in SENSITIVE_KEYS or "token" in lower or "secret" in lower or "key" in lower: out[key] = "" else: out[key] = sanitize_json(item, lower) return out if isinstance(value, list): return [sanitize_json(item, key_context) for item in value[:20]] if isinstance(value, str): if key_context in KEEP_STRING_KEYS: return value return f"" if isinstance(value, (int, float, bool)) or value is None: return value return f"<{type(value).__name__}>" def summarize_sse(text: str): events = [] current = {} for line in text.splitlines()[:200]: if line.startswith("event:"): current["event"] = line[6:].strip() elif line.startswith("data:"): current["data"] = summarize_data_line(line[5:].strip()) events.append(current) current = {} return events[:20] def summarize_data_line(text: str): if text == "[DONE]": return text try: return sanitize_json(json.loads(text)) except Exception: return trim(text) def looks_like_json(text: str) -> bool: stripped = text.strip() return stripped.startswith("{") or stripped.startswith("[") def trim(text: str, limit: int = 240) -> str: text = text.replace("\r", "\\r").replace("\n", "\\n") return text if len(text) <= limit else text[:limit] + "..." def append(event: dict) -> None: with open(OUT, "a", encoding="utf-8") as f: f.write(json.dumps(event, ensure_ascii=False) + "\n")