Rewriting Project Claw Code - Python port with Rust on the way

This commit is contained in:
instructkr
2026-03-31 08:03:46 -07:00
parent 507c2460b9
commit 01bf54ad15
31 changed files with 1207 additions and 111 deletions

.github/FUNDING.yml (vendored, new file)

@@ -0,0 +1 @@
github: instructkr


@@ -1,88 +0,0 @@
# Is legal the same as legitimate: AI reimplementation and the erosion of copyleft
- **Date:** March 9, 2026
- **Author:** Hong Minhee
- **Source context:** _Hong Minhee on Things_ (published in English, Japanese, Korean in mixed Hanja-Hangul script, and Korean in Hangul)
- **Archive note:** This copy was normalized from user-provided text for this repository's research/archive context. Site navigation/footer language links were converted into metadata.
Last week, Dan Blanchard, the maintainer of chardet—a Python library for detecting text encodings, downloaded roughly 130 million times a month—released a new version. Version 7.0 is 48 times faster than its predecessor, supports multiple cores, and was redesigned from the ground up. Anthropic's Claude is listed as a contributor. The license changed from LGPL to MIT.
Blanchard's account is that he never looked at the existing source code directly. He fed only the API and the test suite to Claude and asked it to reimplement the library from scratch. The resulting code shares less than 1.3% similarity with any prior version, as measured by JPlag. His conclusion: this is an independent new work, and he is under no obligation to carry forward the LGPL. Mark Pilgrim, the library's original author, opened a GitHub issue to object. The LGPL requires that modifications be distributed under the same license, and a reimplementation produced with ample exposure to the original codebase cannot, in Pilgrim's view, pass as a clean-room effort.
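For context on that similarity figure: tools like JPlag compare token streams rather than raw text. As a toy illustration of the underlying idea only (this is not JPlag's actual greedy-string-tiling algorithm, and the two code snippets below are invented), a Jaccard similarity over token bigrams already separates an identical file from an independent rewrite:

```python
def bigrams(source: str) -> set[tuple[str, str]]:
    # crude whitespace tokenizer; real plagiarism detectors use language-aware lexers
    tokens = source.split()
    return set(zip(tokens, tokens[1:]))

def similarity(a: str, b: str) -> float:
    # Jaccard index over shared bigrams: 1.0 for identical token streams
    ga, gb = bigrams(a), bigrams(b)
    if not ga and not gb:
        return 1.0
    return len(ga & gb) / len(ga | gb)

original = "def detect(data): return best_guess(data)"
rewrite = "def sniff(buf): table = score(buf); return argmax(table)"
print(similarity(original, original))  # 1.0
print(similarity(original, rewrite))   # 0.0
```

A sub-1.3% score on a real measure of this kind is consistent with Blanchard's claim of near-zero textual overlap, though it says nothing about the legal and ethical questions discussed below.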
The dispute drew responses from two prominent figures in the open source world. Armin Ronacher, the creator of Flask, welcomed the relicensing. Salvatore Sanfilippo (antirez), the creator of Redis, published a broader defense of AI reimplementation, grounding it in copyright law and the history of the GNU project. Both conclude, by different routes, that what Blanchard did is legitimate. I respect both writers, and I think both are wrong—or more precisely, both are evading the question that actually matters.
That question is this: does legal mean legitimate? Neither piece answers it. Both move from “this is legally permissible” to “this is therefore fine,” without pausing at the gap between those two claims. Law sets a floor; clearing it does not mean the conduct is right. That gap is where this essay begins.
## The analogy points the wrong way
Antirez builds his case on history. When the GNU project reimplemented the UNIX userspace, it was lawful. So was Linux. Copyright law prohibits copying “protected expressions”—the actual code, its structure, its specific mechanisms—but it does not protect ideas or behavior. AI-assisted reimplementation occupies the same legal ground. Therefore, it is lawful.
The legal analysis is largely correct, and I am not disputing it. The problem lies in what antirez does next: he presents the legal conclusion as if it were also a social one, and uses a historical analogy that, examined more carefully, argues against his own position.
When GNU reimplemented the UNIX userspace, the vector ran from proprietary to free. Stallman was using the limits of copyright law to turn proprietary software into free software. The ethical force of that project did not come from its legal permissibility—it came from the direction it was moving, from the fact that it was expanding the commons. That is why people cheered.
The vector in the chardet case runs the other way. Software protected by a copyleft license—one that guarantees users the right to study, modify, and redistribute derivative works under the same terms—has been reimplemented under a permissive license that carries no such guarantee. This is not a reimplementation that expands the commons. It is one that removes the fencing that protected the commons. Derivative works built on chardet 7.0 are under no obligation to share their source code. That obligation, which applied to a library downloaded 130 million times a month, is now gone.
Antirez does not address this directional difference. He invokes the GNU precedent, but that precedent is a counterexample to his conclusion, not a supporting one.
## Does the GPL work against sharing?
Ronacher's argument is different. He discloses upfront that he has a stake in the outcome: “I personally have a horse in the race here because I too wanted chardet to be under a non-GPL license for many years. So consider me a very biased person in that regard.” He goes on to write that he considers “the GPL to run against that spirit by restricting what can be done with it”—the spirit being that society is better off when we share.
This claim rests on a fundamental misreading of what the GPL does.
Start with what the GPL actually prohibits. It does not prohibit keeping source code private. It imposes no constraint on privately modifying GPL software and using it yourself. The GPL's conditions are triggered only by distribution: if you distribute modified code (the AGPL extends this trigger to offering it as a networked service), you must make the source available under the same terms. This is not a restriction on sharing. It is a condition placed on sharing: if you share, you must share in kind.
The requirement that improvements be returned to the commons is not a mechanism that suppresses sharing. It is a mechanism that makes sharing recursive and self-reinforcing. The claim that imposing contribution obligations on users of a commons undermines sharing culture does not hold together logically.
The contrast with the MIT license clarifies the point. Under MIT, anyone may take code, improve it, and close it off into a proprietary product. You can receive from the commons without giving back. If Ronacher calls this structure “more share-friendly,” he is using a concept of sharing with a specific directionality built in: sharing flows toward whoever has more capital and more engineers to take advantage of it.
The historical record bears this out. In the 1990s, companies routinely absorbed GPL code into proprietary products—not because they had chosen permissive licenses, but because copyleft enforcement was slack. The strengthening of copyleft mechanisms closed that gap. For individual developers and small projects without the resources to compete on anything but reciprocity, copyleft was what made the exchange approximately fair.
The creator of Flask knows this distinction. If he elides it anyway, the argument is not naïve—it is convenient.
## A self-refuting example
The most interesting moment in Ronacher's piece is not the argument but a detail he mentions in passing: Vercel reimplemented GNU Bash using AI and published it, then got visibly upset when Cloudflare reimplemented Next.js the same way.
Ronacher notes this as an irony and moves on. But the irony cuts deeper than he lets on. Next.js is MIT licensed. Cloudflare's vinext did not violate any license—it did exactly what Ronacher calls a contribution to the culture of openness, applied to a permissively licensed codebase. Vercel's reaction had nothing to do with license infringement; it was purely competitive and territorial. The implicit position is: reimplementing GPL software as MIT is a victory for sharing, but having our own MIT software reimplemented by a competitor is cause for outrage. This is what the claim that permissive licensing is “more share-friendly” than copyleft looks like in practice. The spirit of sharing, it turns out, runs in one direction only: outward from oneself.
Ronacher registers the contradiction and does not stop. “This development plays into my worldview,” he writes. When you present evidence that cuts against your own position, acknowledge it, and then proceed to your original conclusion unchanged, that is a signal that the conclusion preceded the argument.
## Legality and social legitimacy are different registers
Back to the question posed at the start. Is legal the same as legitimate?
Antirez closes his careful legal analysis as though it settles the matter. Ronacher acknowledges that “there is an obvious moral question here, but that isn't necessarily what I'm interested in.” Both pieces treat legal permissibility as a proxy for social legitimacy. But law only says what conduct it will not prevent—it does not certify that conduct as right. Aggressive tax minimization that never crosses into illegality may still be widely regarded as antisocial. A pharmaceutical company that legally acquires a patent on a long-generic drug and raises the price a hundredfold has done something legal, but that does not make it fine. Legality is a necessary condition; it is not a sufficient one.
In the chardet case, the distinction is sharper still. What the LGPL protected was not Blanchard's labor alone. It was a social compact agreed to by everyone who contributed to the library over twelve years. The terms of that compact were: if you take this and build on it, you share back under the same terms. This compact operated as a legal instrument, yes, but it was also the foundation of trust that made contribution rational. The fact that a reimplementation may qualify legally as a new work, and the fact that it breaks faith with the original contributors, are separate questions. If a court eventually rules in Blanchard's favor, that ruling will tell us what the law permits. It will not tell us that the act was right.
Zoë Kooyman, executive director of the FSF, put it plainly: “Refusing to grant others the rights you yourself received as a user is highly antisocial, no matter what method you use.”
## Whose perspective is the default?
Reading this debate, I keep returning to a question about position. From where are these two writers looking at the situation?
Antirez created Redis. Ronacher created Flask. Both are figures at the center of the open source ecosystem, with large audiences and well-established reputations. For them, falling costs of AI reimplementation means something specific: it is easier to reimplement things they want in a different form. Ronacher says explicitly that he had begun reimplementing GNU Readline precisely because of its copyleft terms.
For the people who have spent years contributing to a library like chardet, the same shift in costs means something else entirely: the copyleft protection around their contributions can be removed. The two writers are speaking from the former position to people in the latter, telling them that this was always lawful, that historical precedent supports it, and that the appropriate response is adaptation.
When positional asymmetry of this kind is ignored, and the argument is presented as universal analysis, what you get is not analysis but rationalization. Both writers arrive at conclusions that align precisely with their own interests. Readers should hold that fact in mind.
## What this fight points toward
Bruce Perens, who wrote the original Open Source Definition, told The Register: “The entire economics of software development are dead, gone, over, kaput!” He meant it as an alarm. Antirez, from a similar assessment of the situation, draws the conclusion: adapt. Ronacher says he finds the direction exciting.
None of the three responses addresses the central question. When copyleft becomes technically easier to circumvent, does that make it less necessary, or more?
I think more. What the GPL protected was not the scarcity of code but the freedom of users. The fact that producing code has become cheaper does not make it acceptable to use that code as a vehicle for eroding freedom. If anything, as the friction of reimplementation disappears, so does the friction of stripping copyleft from anything left exposed. The erosion of enforcement capacity is a legal problem. It does not touch the underlying normative judgment.
That judgment is this: those who take from the commons owe something back to the commons. The principle does not change depending on whether a reimplementation takes five years or five days. No court ruling on AI-generated code will alter its social weight.
This is where law and community norms diverge. Law is made slowly, after the fact, reflecting existing power arrangements. The norms that open source communities built over decades did not wait for court approval. People chose the GPL when the law offered them no guarantee of its enforcement, because it expressed the values of the communities they wanted to belong to. Those values do not expire when the law changes.
In previous writing, I argued for a training copyleft (TGPL) as the next step in this line of development. The chardet situation suggests the argument has to go further: to a specification copyleft covering the layer below source code. If source code can now be generated from a specification, the specification is where the essential intellectual content of a GPL project resides. Blanchard's own claim—that he worked only from the test suite and API without reading the source—is, paradoxically, an argument for protecting that test suite and API specification under copyleft terms.
The history of the GPL is the history of licensing tools evolving in response to new forms of exploitation: GPLv2 to GPLv3, then AGPL. What drove each evolution was not a court ruling but a community reaching a value judgment first and then seeking legal instruments to express it. The same sequence is available now. Whatever courts eventually decide about AI reimplementation, the question we need to answer first is not a legal one. It is a social one. Do those who take from the commons owe something back? I think they do. That judgment does not require a verdict.
What makes the pieces by antirez and Ronacher worth reading is not that they are right. It is that they make visible, with unusual clarity, what they are choosing not to see. When legality is used as a substitute for a value judgment, the question that actually matters gets buried in the footnotes of a law it has already outgrown.


@@ -1,6 +1,51 @@
# Rewriting Project Claw Code

<p align="center">
<img src="assets/clawd-hero.jpeg" alt="Claw" width="300" />
</p>
<p align="center">
<strong>Better Harness Tools, not merely storing the archive of leaked Claude Code</strong>
</p>
<p align="center">
<a href="https://github.com/sponsors/instructkr"><img src="https://img.shields.io/badge/Sponsor-%E2%9D%A4-pink?logo=github&style=for-the-badge" alt="Sponsor on GitHub" /></a>
</p>
> [!IMPORTANT]
> **Rust port is now in progress** on the [`dev/rust`](https://github.com/instructkr/claw-code/tree/dev/rust) branch and is expected to be merged into main today. The Rust implementation aims to deliver a faster, memory-safe harness runtime. Stay tuned — this will be the definitive version of the project.
> If you find this work useful, consider [sponsoring @instructkr on GitHub](https://github.com/sponsors/instructkr) to support continued open-source harness engineering research.
---
## Backstory
At 4 AM on March 31, 2026, I woke up to my phone blowing up with notifications. The Claude Code source had been exposed, and the entire dev community was in a frenzy. My girlfriend in Korea was genuinely worried I might face legal action from Anthropic just for having the code on my machine — so I did what any engineer would do under pressure: I sat down, ported the core features to Python from scratch, and pushed it before the sun came up.
The whole thing was orchestrated end-to-end using [oh-my-codex (OmX)](https://github.com/Yeachan-Heo/oh-my-codex) by [@bellman_ych](https://x.com/bellman_ych) — a workflow layer built on top of OpenAI's Codex ([@OpenAIDevs](https://x.com/OpenAIDevs)). I used `$team` mode for parallel code review and `$ralph` mode for persistent execution loops with architect-level verification. The entire porting session — from reading the original harness structure to producing a working Python tree with tests — was driven through OmX orchestration.
The result is a clean-room Python rewrite that captures the architectural patterns of Claude Code's agent harness without copying any proprietary source. I'm now actively collaborating with [@bellman_ych](https://x.com/bellman_ych) — the creator of OmX himself — to push this further. The basic Python foundation is already in place and functional, but we're just getting started. **Stay tuned — a much more capable version is on the way.**
https://github.com/instructkr/claw-code
![Tweet screenshot](assets/tweet-screenshot.png)
## Featured in The Wall Street Journal (for Avid Claude Code Fans)
I've been deeply interested in **harness engineering** — studying how agent systems wire tools, orchestrate tasks, and manage runtime context. This isn't a sudden thing. The Wall Street Journal featured my work earlier this month, documenting how I've been one of the most active power users exploring these systems:
> AI startup worker Sigrid Jin, who attended the Seoul dinner, single-handedly used 25 billion Claude Code tokens last year. At the time, usage limits were looser, allowing early enthusiasts to reach tens of billions of tokens at a very low cost.
>
> Despite his countless hours with Claude Code, Jin isn't faithful to any one AI lab. The tools available have different strengths and weaknesses, he said. Codex is better at reasoning, while Claude Code generates cleaner, more shareable code.
>
> Jin flew to San Francisco in February for Claude Code's first birthday party, where attendees waited in line to compare notes with Cherny. The crowd included a practicing cardiologist from Belgium who had built an app to help patients navigate care, and a California lawyer who made a tool for automating building permit approvals using Claude Code.
>
> "It was basically like a sharing party," Jin said. "There were lawyers, there were doctors, there were dentists. They did not have software engineering backgrounds."
>
> — *The Wall Street Journal*, March 21, 2026, [*"The Trillion Dollar Race to Automate Our Entire Lives"*](https://lnkd.in/gs9td3qd)
![WSJ Feature](assets/wsj-feature.png)
---
@@ -93,11 +138,6 @@ python3 -m src.main tools --limit 10
The port now mirrors the archived root-entry file surface, top-level subsystem names, and command/tool inventories much more closely than before. However, it is **not yet** a full runtime-equivalent replacement for the original TypeScript system; the Python tree still contains fewer executable runtime slices than the archived source.
## Related Essay
- [*Is legal the same as legitimate: AI reimplementation and the erosion of copyleft*](https://writings.hongminhee.org/2026/03/legal-vs-legitimate/)
The essay is dated **March 9, 2026**, so it should be read as companion analysis that predates the **March 31, 2026** source exposure that motivated this rewrite direction.
## Built with `oh-my-codex`
@@ -117,6 +157,30 @@ The restructuring and documentation work on this repository was AI-assisted and
*Split-pane review and verification flow during the final README wording pass.*
## Community
<p align="center">
<a href="https://instruct.kr/"><img src="assets/instructkr.png" alt="instructkr" width="400" /></a>
</p>
Join the [**instructkr Discord**](https://instruct.kr/) — the best Korean language model community. Come chat about LLMs, harness engineering, agent workflows, and everything in between.
[![Discord](https://img.shields.io/badge/Join%20Discord-instruct.kr-5865F2?logo=discord&style=for-the-badge)](https://instruct.kr/)
## Star History
This repository became **the fastest GitHub repo in history to surpass 30K stars**, reaching the milestone in just a few hours after publication.
<a href="https://star-history.com/#instructkr/claw-code&Date">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=instructkr/claw-code&type=Date&theme=dark" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=instructkr/claw-code&type=Date" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=instructkr/claw-code&type=Date" />
</picture>
</a>
![Star History Screenshot](assets/star-history.png)
## Ownership / Affiliation Disclaimer
- This repository does **not** claim ownership of the original Claude Code source material.

BIN assets/clawd-hero.jpeg (new file, 233 KiB)

BIN assets/instructkr.png (new file, 4.8 KiB)

BIN assets/star-history.png (new file, 312 KiB)

BIN assets/tweet-screenshot.png (new file, 812 KiB)

BIN assets/wsj-feature.png (new file, 873 KiB)


@@ -3,17 +3,27 @@
from .commands import PORTED_COMMANDS, build_command_backlog
from .parity_audit import ParityAuditResult, run_parity_audit
from .port_manifest import PortManifest, build_port_manifest
from .query_engine import QueryEnginePort, TurnResult
from .runtime import PortRuntime, RuntimeSession
from .session_store import StoredSession, load_session, save_session
from .system_init import build_system_init_message
from .tools import PORTED_TOOLS, build_tool_backlog

__all__ = [
    'ParityAuditResult',
    'PortManifest',
    'PortRuntime',
    'QueryEnginePort',
    'RuntimeSession',
    'StoredSession',
    'TurnResult',
    'PORTED_COMMANDS',
    'PORTED_TOOLS',
    'build_command_backlog',
    'build_port_manifest',
    'build_system_init_message',
    'build_tool_backlog',
    'load_session',
    'run_parity_audit',
    'save_session',
]

src/bootstrap_graph.py (new file, +27 lines)

@@ -0,0 +1,27 @@
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class BootstrapGraph:
    stages: tuple[str, ...]

    def as_markdown(self) -> str:
        lines = ['# Bootstrap Graph', '']
        lines.extend(f'- {stage}' for stage in self.stages)
        return '\n'.join(lines)


def build_bootstrap_graph() -> BootstrapGraph:
    return BootstrapGraph(
        stages=(
            'top-level prefetch side effects',
            'warning handler and environment guards',
            'CLI parser and pre-action trust gate',
            'setup() + commands/agents parallel load',
            'deferred init after trust',
            'mode routing: local / remote / ssh / teleport / direct-connect / deep-link',
            'query engine submit loop',
        )
    )
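The `as_markdown` helper flattens the staged boot order into a bullet list. A quick standalone check (the class is inlined here, mirroring the definition in `src/bootstrap_graph.py`, so the snippet runs without the package on the path):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class BootstrapGraph:
    # mirror of the class defined in src/bootstrap_graph.py
    stages: tuple[str, ...]

    def as_markdown(self) -> str:
        lines = ['# Bootstrap Graph', '']
        lines.extend(f'- {stage}' for stage in self.stages)
        return '\n'.join(lines)

graph = BootstrapGraph(stages=(
    'CLI parser and pre-action trust gate',
    'query engine submit loop',
))
print(graph.as_markdown())
```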

src/command_graph.py (new file, +34 lines)

@@ -0,0 +1,34 @@
from __future__ import annotations
from dataclasses import dataclass
from .commands import get_commands
from .models import PortingModule
@dataclass(frozen=True)
class CommandGraph:
builtins: tuple[PortingModule, ...]
plugin_like: tuple[PortingModule, ...]
skill_like: tuple[PortingModule, ...]
def flattened(self) -> tuple[PortingModule, ...]:
return self.builtins + self.plugin_like + self.skill_like
def as_markdown(self) -> str:
lines = [
'# Command Graph',
'',
f'Builtins: {len(self.builtins)}',
f'Plugin-like commands: {len(self.plugin_like)}',
f'Skill-like commands: {len(self.skill_like)}',
]
return '\n'.join(lines)
def build_command_graph() -> CommandGraph:
commands = get_commands()
builtins = tuple(module for module in commands if 'plugin' not in module.source_hint.lower() and 'skills' not in module.source_hint.lower())
plugin_like = tuple(module for module in commands if 'plugin' in module.source_hint.lower())
skill_like = tuple(module for module in commands if 'skills' in module.source_hint.lower())
return CommandGraph(builtins=builtins, plugin_like=plugin_like, skill_like=skill_like)
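The partition rule is purely substring-based on `source_hint`. A standalone check with a minimal stand-in for `PortingModule` (the module names and hints below are invented for illustration):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PortingModule:
    # minimal stand-in for src/models.PortingModule
    name: str
    source_hint: str

modules = (
    PortingModule('commit', 'src/commands/commit.ts'),
    PortingModule('review', 'plugins/review/index.ts'),
    PortingModule('brainstorm', 'skills/brainstorm.ts'),
)
# same substring predicates as build_command_graph
plugin_like = tuple(m for m in modules if 'plugin' in m.source_hint.lower())
skill_like = tuple(m for m in modules if 'skills' in m.source_hint.lower())
builtins = tuple(m for m in modules if m not in plugin_like and m not in skill_like)
print(len(builtins), len(plugin_like), len(skill_like))  # 1 1 1
```

A hint that matches neither substring lands in `builtins`, mirroring the double-negative filter above.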


@@ -1,6 +1,7 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
@@ -9,6 +10,15 @@ from .models import PortingBacklog, PortingModule
SNAPSHOT_PATH = Path(__file__).resolve().parent / 'reference_data' / 'commands_snapshot.json'


@dataclass(frozen=True)
class CommandExecution:
    name: str
    source_hint: str
    prompt: str
    handled: bool
    message: str


@lru_cache(maxsize=1)
def load_command_snapshot() -> tuple[PortingModule, ...]:
    raw_entries = json.loads(SNAPSHOT_PATH.read_text())
@@ -26,6 +36,11 @@ def load_command_snapshot() -> tuple[PortingModule, ...]:
PORTED_COMMANDS = load_command_snapshot()


@lru_cache(maxsize=1)
def built_in_command_names() -> frozenset[str]:
    return frozenset(module.name for module in PORTED_COMMANDS)


def build_command_backlog() -> PortingBacklog:
    return PortingBacklog(title='Command surface', modules=list(PORTED_COMMANDS))
@@ -42,12 +57,29 @@ def get_command(name: str) -> PortingModule | None:
    return None


def get_commands(cwd: str | None = None, include_plugin_commands: bool = True, include_skill_commands: bool = True) -> tuple[PortingModule, ...]:
    commands = list(PORTED_COMMANDS)
    if not include_plugin_commands:
        commands = [module for module in commands if 'plugin' not in module.source_hint.lower()]
    if not include_skill_commands:
        commands = [module for module in commands if 'skills' not in module.source_hint.lower()]
    return tuple(commands)


def find_commands(query: str, limit: int = 20) -> list[PortingModule]:
    needle = query.lower()
    matches = [module for module in PORTED_COMMANDS if needle in module.name.lower() or needle in module.source_hint.lower()]
    return matches[:limit]


def execute_command(name: str, prompt: str = '') -> CommandExecution:
    module = get_command(name)
    if module is None:
        return CommandExecution(name=name, source_hint='', prompt=prompt, handled=False, message=f'Unknown mirrored command: {name}')
    action = f"Mirrored command '{module.name}' from {module.source_hint} would handle prompt {prompt!r}."
    return CommandExecution(name=module.name, source_hint=module.source_hint, prompt=prompt, handled=True, message=action)


def render_command_index(limit: int = 20, query: str | None = None) -> str:
    modules = find_commands(query, limit) if query else list(PORTED_COMMANDS[:limit])
    lines = [f'Command entries: {len(PORTED_COMMANDS)}', '']


@@ -9,8 +9,39 @@ class PortContext:
    source_root: Path
    tests_root: Path
    assets_root: Path
    archive_root: Path
    python_file_count: int
    test_file_count: int
    asset_file_count: int
    archive_available: bool


def build_port_context(base: Path | None = None) -> PortContext:
    root = base or Path(__file__).resolve().parent.parent
    source_root = root / 'src'
    tests_root = root / 'tests'
    assets_root = root / 'assets'
    archive_root = root / 'archive' / 'claude_code_ts_snapshot' / 'src'
    return PortContext(
        source_root=source_root,
        tests_root=tests_root,
        assets_root=assets_root,
        archive_root=archive_root,
        python_file_count=sum(1 for path in source_root.rglob('*.py') if path.is_file()),
        test_file_count=sum(1 for path in tests_root.rglob('*.py') if path.is_file()),
        asset_file_count=sum(1 for path in assets_root.rglob('*') if path.is_file()),
        archive_available=archive_root.exists(),
    )


def render_context(context: PortContext) -> str:
    return '\n'.join([
        f'Source root: {context.source_root}',
        f'Test root: {context.tests_root}',
        f'Assets root: {context.assets_root}',
        f'Archive root: {context.archive_root}',
        f'Python files: {context.python_file_count}',
        f'Test files: {context.test_file_count}',
        f'Assets: {context.asset_file_count}',
        f'Archive available: {context.archive_available}',
    ])
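The counting logic relies on `Path.rglob`, which walks recursively, so nested modules are included. It can be sanity-checked against a throwaway directory tree (the file names here are invented for the check):

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / 'src'
    (src / 'pkg').mkdir(parents=True)
    (src / 'a.py').write_text('')
    (src / 'pkg' / 'b.py').write_text('')
    (src / 'notes.txt').write_text('')
    # same expression build_port_context uses for python_file_count
    count = sum(1 for path in src.rglob('*.py') if path.is_file())
    print(count)  # 2
```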

src/deferred_init.py (new file, +31 lines)

@@ -0,0 +1,31 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class DeferredInitResult:
trusted: bool
plugin_init: bool
skill_init: bool
mcp_prefetch: bool
session_hooks: bool
def as_lines(self) -> tuple[str, ...]:
return (
f'- plugin_init={self.plugin_init}',
f'- skill_init={self.skill_init}',
f'- mcp_prefetch={self.mcp_prefetch}',
f'- session_hooks={self.session_hooks}',
)
def run_deferred_init(trusted: bool) -> DeferredInitResult:
enabled = bool(trusted)
return DeferredInitResult(
trusted=trusted,
plugin_init=enabled,
skill_init=enabled,
mcp_prefetch=enabled,
session_hooks=enabled,
)
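Every deferred subsystem is gated by the single trust boolean, so an untrusted session disables all four flags at once. Inlining the definitions so the check runs standalone:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DeferredInitResult:
    # mirror of src/deferred_init.py
    trusted: bool
    plugin_init: bool
    skill_init: bool
    mcp_prefetch: bool
    session_hooks: bool

def run_deferred_init(trusted: bool) -> DeferredInitResult:
    enabled = bool(trusted)
    return DeferredInitResult(trusted=trusted, plugin_init=enabled,
                              skill_init=enabled, mcp_prefetch=enabled,
                              session_hooks=enabled)

print(run_deferred_init(False).plugin_init)   # False
print(run_deferred_init(True).mcp_prefetch)   # True
```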

src/direct_modes.py (new file, +21 lines)

@@ -0,0 +1,21 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class DirectModeReport:
mode: str
target: str
active: bool
def as_text(self) -> str:
return f'mode={self.mode}\ntarget={self.target}\nactive={self.active}'
def run_direct_connect(target: str) -> DirectModeReport:
return DirectModeReport(mode='direct-connect', target=target, active=True)
def run_deep_link(target: str) -> DirectModeReport:
return DirectModeReport(mode='deep-link', target=target, active=True)

src/execution_registry.py (new file, +51 lines)

@@ -0,0 +1,51 @@
from __future__ import annotations
from dataclasses import dataclass
from .commands import PORTED_COMMANDS, execute_command
from .tools import PORTED_TOOLS, execute_tool
@dataclass(frozen=True)
class MirroredCommand:
name: str
source_hint: str
def execute(self, prompt: str) -> str:
return execute_command(self.name, prompt).message
@dataclass(frozen=True)
class MirroredTool:
name: str
source_hint: str
def execute(self, payload: str) -> str:
return execute_tool(self.name, payload).message
@dataclass(frozen=True)
class ExecutionRegistry:
commands: tuple[MirroredCommand, ...]
tools: tuple[MirroredTool, ...]
def command(self, name: str) -> MirroredCommand | None:
lowered = name.lower()
for command in self.commands:
if command.name.lower() == lowered:
return command
return None
def tool(self, name: str) -> MirroredTool | None:
lowered = name.lower()
for tool in self.tools:
if tool.name.lower() == lowered:
return tool
return None
def build_execution_registry() -> ExecutionRegistry:
return ExecutionRegistry(
commands=tuple(MirroredCommand(module.name, module.source_hint) for module in PORTED_COMMANDS),
tools=tuple(MirroredTool(module.name, module.source_hint) for module in PORTED_TOOLS),
)


@@ -15,3 +15,8 @@ class HistoryLog:
def add(self, title: str, detail: str) -> None:
self.events.append(HistoryEvent(title=title, detail=detail))
def as_markdown(self) -> str:
lines = ['# Session History', '']
lines.extend(f'- {event.title}: {event.detail}' for event in self.events)
return '\n'.join(lines)


@@ -2,12 +2,20 @@ from __future__ import annotations
import argparse
from .bootstrap_graph import build_bootstrap_graph
from .command_graph import build_command_graph
from .commands import execute_command, get_command, get_commands, render_command_index
from .direct_modes import run_deep_link, run_direct_connect
from .parity_audit import run_parity_audit
from .permissions import ToolPermissionContext
from .port_manifest import build_port_manifest
from .query_engine import QueryEnginePort
from .remote_runtime import run_remote_mode, run_ssh_mode, run_teleport_mode
from .runtime import PortRuntime
from .session_store import load_session
from .setup import run_setup
from .tool_pool import assemble_tool_pool
from .tools import execute_tool, get_tool, get_tools, render_tool_index
def build_parser() -> argparse.ArgumentParser:
@@ -16,21 +24,70 @@ def build_parser() -> argparse.ArgumentParser:
subparsers.add_parser('summary', help='render a Markdown summary of the Python porting workspace')
subparsers.add_parser('manifest', help='print the current Python workspace manifest')
subparsers.add_parser('parity-audit', help='compare the Python workspace against the local ignored TypeScript archive when available')
subparsers.add_parser('setup-report', help='render the startup/prefetch setup report')
subparsers.add_parser('command-graph', help='show command graph segmentation')
subparsers.add_parser('tool-pool', help='show assembled tool pool with default settings')
subparsers.add_parser('bootstrap-graph', help='show the mirrored bootstrap/runtime graph stages')
list_parser = subparsers.add_parser('subsystems', help='list the current Python modules in the workspace')
list_parser.add_argument('--limit', type=int, default=32)
commands_parser = subparsers.add_parser('commands', help='list mirrored command entries from the archived snapshot')
commands_parser.add_argument('--limit', type=int, default=20)
commands_parser.add_argument('--query') commands_parser.add_argument('--query')
commands_parser.add_argument('--no-plugin-commands', action='store_true')
commands_parser.add_argument('--no-skill-commands', action='store_true')
tools_parser = subparsers.add_parser('tools', help='list mirrored tool entries from the archived snapshot')
tools_parser.add_argument('--limit', type=int, default=20)
tools_parser.add_argument('--query')
tools_parser.add_argument('--simple-mode', action='store_true')
tools_parser.add_argument('--no-mcp', action='store_true')
tools_parser.add_argument('--deny-tool', action='append', default=[])
tools_parser.add_argument('--deny-prefix', action='append', default=[])
route_parser = subparsers.add_parser('route', help='route a prompt across mirrored command/tool inventories')
route_parser.add_argument('prompt')
route_parser.add_argument('--limit', type=int, default=5)
bootstrap_parser = subparsers.add_parser('bootstrap', help='build a runtime-style session report from the mirrored inventories')
bootstrap_parser.add_argument('prompt')
bootstrap_parser.add_argument('--limit', type=int, default=5)
loop_parser = subparsers.add_parser('turn-loop', help='run a small stateful turn loop for the mirrored runtime')
loop_parser.add_argument('prompt')
loop_parser.add_argument('--limit', type=int, default=5)
loop_parser.add_argument('--max-turns', type=int, default=3)
loop_parser.add_argument('--structured-output', action='store_true')
flush_parser = subparsers.add_parser('flush-transcript', help='persist and flush a temporary session transcript')
flush_parser.add_argument('prompt')
load_session_parser = subparsers.add_parser('load-session', help='load a previously persisted session')
load_session_parser.add_argument('session_id')
remote_parser = subparsers.add_parser('remote-mode', help='simulate remote-control runtime branching')
remote_parser.add_argument('target')
ssh_parser = subparsers.add_parser('ssh-mode', help='simulate SSH runtime branching')
ssh_parser.add_argument('target')
teleport_parser = subparsers.add_parser('teleport-mode', help='simulate teleport runtime branching')
teleport_parser.add_argument('target')
direct_parser = subparsers.add_parser('direct-connect-mode', help='simulate direct-connect runtime branching')
direct_parser.add_argument('target')
deep_link_parser = subparsers.add_parser('deep-link-mode', help='simulate deep-link runtime branching')
deep_link_parser.add_argument('target')
show_command = subparsers.add_parser('show-command', help='show one mirrored command entry by exact name')
show_command.add_argument('name')
show_tool = subparsers.add_parser('show-tool', help='show one mirrored tool entry by exact name')
show_tool.add_argument('name')
exec_command_parser = subparsers.add_parser('exec-command', help='execute a mirrored command shim by exact name')
exec_command_parser.add_argument('name')
exec_command_parser.add_argument('prompt')
exec_tool_parser = subparsers.add_parser('exec-tool', help='execute a mirrored tool shim by exact name')
exec_tool_parser.add_argument('name')
exec_tool_parser.add_argument('payload')
return parser
@@ -47,15 +104,40 @@ def main(argv: list[str] | None = None) -> int:
if args.command == 'parity-audit':
print(run_parity_audit().to_markdown())
return 0
if args.command == 'setup-report':
print(run_setup().as_markdown())
return 0
if args.command == 'command-graph':
print(build_command_graph().as_markdown())
return 0
if args.command == 'tool-pool':
print(assemble_tool_pool().as_markdown())
return 0
if args.command == 'bootstrap-graph':
print(build_bootstrap_graph().as_markdown())
return 0
if args.command == 'subsystems':
for subsystem in manifest.top_level_modules[: args.limit]:
print(f'{subsystem.name}\t{subsystem.file_count}\t{subsystem.notes}')
return 0
if args.command == 'commands':
if args.query:
print(render_command_index(limit=args.limit, query=args.query))
else:
commands = get_commands(include_plugin_commands=not args.no_plugin_commands, include_skill_commands=not args.no_skill_commands)
output_lines = [f'Command entries: {len(commands)}', '']
output_lines.extend(f'- {module.name}{module.source_hint}' for module in commands[: args.limit])
print('\n'.join(output_lines))
return 0
if args.command == 'tools':
if args.query:
print(render_tool_index(limit=args.limit, query=args.query))
else:
permission_context = ToolPermissionContext.from_iterables(args.deny_tool, args.deny_prefix)
tools = get_tools(simple_mode=args.simple_mode, include_mcp=not args.no_mcp, permission_context=permission_context)
output_lines = [f'Tool entries: {len(tools)}', '']
output_lines.extend(f'- {module.name}{module.source_hint}' for module in tools[: args.limit])
print('\n'.join(output_lines))
return 0
if args.command == 'route':
matches = PortRuntime().route_prompt(args.prompt, limit=args.limit)
@@ -65,20 +147,64 @@ def main(argv: list[str] | None = None) -> int:
for match in matches:
print(f'{match.kind}\t{match.name}\t{match.score}\t{match.source_hint}')
return 0
if args.command == 'bootstrap':
print(PortRuntime().bootstrap_session(args.prompt, limit=args.limit).as_markdown())
return 0
if args.command == 'turn-loop':
results = PortRuntime().run_turn_loop(args.prompt, limit=args.limit, max_turns=args.max_turns, structured_output=args.structured_output)
for idx, result in enumerate(results, start=1):
print(f'## Turn {idx}')
print(result.output)
print(f'stop_reason={result.stop_reason}')
return 0
if args.command == 'flush-transcript':
engine = QueryEnginePort.from_workspace()
engine.submit_message(args.prompt)
path = engine.persist_session()
print(path)
print(f'flushed={engine.transcript_store.flushed}')
return 0
if args.command == 'load-session':
session = load_session(args.session_id)
print(f'{session.session_id}\n{len(session.messages)} messages\nin={session.input_tokens} out={session.output_tokens}')
return 0
if args.command == 'remote-mode':
print(run_remote_mode(args.target).as_text())
return 0
if args.command == 'ssh-mode':
print(run_ssh_mode(args.target).as_text())
return 0
if args.command == 'teleport-mode':
print(run_teleport_mode(args.target).as_text())
return 0
if args.command == 'direct-connect-mode':
print(run_direct_connect(args.target).as_text())
return 0
if args.command == 'deep-link-mode':
print(run_deep_link(args.target).as_text())
return 0
if args.command == 'show-command':
module = get_command(args.name)
if module is None:
print(f'Command not found: {args.name}')
return 1
print('\n'.join([module.name, module.source_hint, module.responsibility]))
return 0
if args.command == 'show-tool':
module = get_tool(args.name)
if module is None:
print(f'Tool not found: {args.name}')
return 1
print('\n'.join([module.name, module.source_hint, module.responsibility]))
return 0
if args.command == 'exec-command':
result = execute_command(args.name, args.prompt)
print(result.message)
return 0 if result.handled else 1
if args.command == 'exec-tool':
result = execute_tool(args.name, args.payload)
print(result.message)
return 0 if result.handled else 1
parser.error(f'unknown command: {args.command}')
return 2


@@ -19,6 +19,24 @@ class PortingModule:
status: str = 'planned'
@dataclass(frozen=True)
class PermissionDenial:
tool_name: str
reason: str
@dataclass(frozen=True)
class UsageSummary:
input_tokens: int = 0
output_tokens: int = 0
def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
return UsageSummary(
input_tokens=self.input_tokens + len(prompt.split()),
output_tokens=self.output_tokens + len(output.split()),
)
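`UsageSummary.add_turn` above approximates token counts with whitespace word counts and, because the dataclass is frozen, returns a new value rather than mutating in place. A standalone sketch (re-declaring the class so the snippet runs on its own):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class UsageSummary:
    input_tokens: int = 0
    output_tokens: int = 0

    def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
        # Word-count stand-in for real tokenization; returns a new frozen value.
        return UsageSummary(
            input_tokens=self.input_tokens + len(prompt.split()),
            output_tokens=self.output_tokens + len(output.split()),
        )

usage = UsageSummary().add_turn('list the tools', 'two tools matched')
print(usage.input_tokens, usage.output_tokens)  # 3 3
```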
@dataclass
class PortingBacklog:
title: str

src/permissions.py Normal file

@@ -0,0 +1,20 @@
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(frozen=True)
class ToolPermissionContext:
deny_names: frozenset[str] = field(default_factory=frozenset)
deny_prefixes: tuple[str, ...] = ()
@classmethod
def from_iterables(cls, deny_names: list[str] | None = None, deny_prefixes: list[str] | None = None) -> 'ToolPermissionContext':
return cls(
deny_names=frozenset(name.lower() for name in (deny_names or [])),
deny_prefixes=tuple(prefix.lower() for prefix in (deny_prefixes or [])),
)
def blocks(self, tool_name: str) -> bool:
lowered = tool_name.lower()
return lowered in self.deny_names or any(lowered.startswith(prefix) for prefix in self.deny_prefixes)
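The deny-list semantics above combine exact lowered-name matches with lowered-prefix matches. A minimal standalone sketch (class re-declared so the snippet runs on its own, with a hypothetical `mcp__` prefix rule):

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ToolPermissionContext:
    deny_names: frozenset[str] = field(default_factory=frozenset)
    deny_prefixes: tuple[str, ...] = ()

    def blocks(self, tool_name: str) -> bool:
        # Exact lowered-name match or any lowered-prefix match blocks the tool.
        lowered = tool_name.lower()
        return lowered in self.deny_names or any(
            lowered.startswith(prefix) for prefix in self.deny_prefixes
        )

ctx = ToolPermissionContext(deny_names=frozenset({'bash'}), deny_prefixes=('mcp__',))
assert ctx.blocks('Bash')            # exact name, case-insensitive
assert ctx.blocks('mcp__github_pr')  # prefix match
assert not ctx.blocks('read')
```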

src/prefetch.py Normal file

@@ -0,0 +1,23 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class PrefetchResult:
name: str
started: bool
detail: str
def start_mdm_raw_read() -> PrefetchResult:
return PrefetchResult('mdm_raw_read', True, 'Simulated MDM raw-read prefetch for workspace bootstrap')
def start_keychain_prefetch() -> PrefetchResult:
return PrefetchResult('keychain_prefetch', True, 'Simulated keychain prefetch for trusted startup path')
def start_project_scan(root: Path) -> PrefetchResult:
return PrefetchResult('project_scan', True, f'Scanned project root {root}')


@@ -1,20 +1,173 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from uuid import uuid4
from .commands import build_command_backlog
from .models import PermissionDenial, UsageSummary
from .port_manifest import PortManifest, build_port_manifest
from .session_store import StoredSession, load_session, save_session
from .tools import build_tool_backlog
from .transcript import TranscriptStore
@dataclass(frozen=True)
class QueryEngineConfig:
max_turns: int = 8
max_budget_tokens: int = 2000
compact_after_turns: int = 12
structured_output: bool = False
structured_retry_limit: int = 2
@dataclass(frozen=True)
class TurnResult:
prompt: str
output: str
matched_commands: tuple[str, ...]
matched_tools: tuple[str, ...]
permission_denials: tuple[PermissionDenial, ...]
usage: UsageSummary
stop_reason: str
@dataclass
class QueryEnginePort:
manifest: PortManifest
config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
session_id: str = field(default_factory=lambda: uuid4().hex)
mutable_messages: list[str] = field(default_factory=list)
permission_denials: list[PermissionDenial] = field(default_factory=list)
total_usage: UsageSummary = field(default_factory=UsageSummary)
transcript_store: TranscriptStore = field(default_factory=TranscriptStore)
@classmethod
def from_workspace(cls) -> 'QueryEnginePort':
return cls(manifest=build_port_manifest())
@classmethod
def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
stored = load_session(session_id)
transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
return cls(
manifest=build_port_manifest(),
session_id=stored.session_id,
mutable_messages=list(stored.messages),
total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
transcript_store=transcript,
)
def submit_message(
self,
prompt: str,
matched_commands: tuple[str, ...] = (),
matched_tools: tuple[str, ...] = (),
denied_tools: tuple[PermissionDenial, ...] = (),
) -> TurnResult:
if len(self.mutable_messages) >= self.config.max_turns:
output = f'Max turns reached before processing prompt: {prompt}'
return TurnResult(
prompt=prompt,
output=output,
matched_commands=matched_commands,
matched_tools=matched_tools,
permission_denials=denied_tools,
usage=self.total_usage,
stop_reason='max_turns_reached',
)
summary_lines = [
f'Prompt: {prompt}',
f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
f'Permission denials: {len(denied_tools)}',
]
output = self._format_output(summary_lines)
projected_usage = self.total_usage.add_turn(prompt, output)
stop_reason = 'completed'
if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
stop_reason = 'max_budget_reached'
self.mutable_messages.append(prompt)
self.transcript_store.append(prompt)
self.permission_denials.extend(denied_tools)
self.total_usage = projected_usage
self.compact_messages_if_needed()
return TurnResult(
prompt=prompt,
output=output,
matched_commands=matched_commands,
matched_tools=matched_tools,
permission_denials=denied_tools,
usage=self.total_usage,
stop_reason=stop_reason,
)
def stream_submit_message(
self,
prompt: str,
matched_commands: tuple[str, ...] = (),
matched_tools: tuple[str, ...] = (),
denied_tools: tuple[PermissionDenial, ...] = (),
):
yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
if matched_commands:
yield {'type': 'command_match', 'commands': matched_commands}
if matched_tools:
yield {'type': 'tool_match', 'tools': matched_tools}
if denied_tools:
yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
yield {'type': 'message_delta', 'text': result.output}
yield {
'type': 'message_stop',
'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
'stop_reason': result.stop_reason,
'transcript_size': len(self.transcript_store.entries),
}
def compact_messages_if_needed(self) -> None:
if len(self.mutable_messages) > self.config.compact_after_turns:
self.mutable_messages[:] = self.mutable_messages[-self.config.compact_after_turns :]
self.transcript_store.compact(self.config.compact_after_turns)
def replay_user_messages(self) -> tuple[str, ...]:
return self.transcript_store.replay()
def flush_transcript(self) -> None:
self.transcript_store.flush()
def persist_session(self) -> str:
self.flush_transcript()
path = save_session(
StoredSession(
session_id=self.session_id,
messages=tuple(self.mutable_messages),
input_tokens=self.total_usage.input_tokens,
output_tokens=self.total_usage.output_tokens,
)
)
return str(path)
def _format_output(self, summary_lines: list[str]) -> str:
if self.config.structured_output:
payload = {
'summary': summary_lines,
'session_id': self.session_id,
}
return self._render_structured_output(payload)
return '\n'.join(summary_lines)
def _render_structured_output(self, payload: dict[str, object]) -> str:
last_error: Exception | None = None
for _ in range(self.config.structured_retry_limit):
try:
return json.dumps(payload, indent=2)
except (TypeError, ValueError) as exc: # pragma: no cover - defensive branch
last_error = exc
payload = {'summary': ['structured output retry'], 'session_id': self.session_id}
raise RuntimeError('structured output rendering failed') from last_error
def render_summary(self) -> str:
command_backlog = build_command_backlog()
tool_backlog = build_tool_backlog()
@@ -23,10 +176,18 @@ class QueryEnginePort:
'',
self.manifest.to_markdown(),
'',
f'Command surface: {len(command_backlog.modules)} mirrored entries',
*command_backlog.summary_lines()[:10],
'',
f'Tool surface: {len(tool_backlog.modules)} mirrored entries',
*tool_backlog.summary_lines()[:10],
'',
f'Session id: {self.session_id}',
f'Conversation turns stored: {len(self.mutable_messages)}',
f'Permission denials tracked: {len(self.permission_denials)}',
f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
f'Max turns: {self.config.max_turns}',
f'Max budget tokens: {self.config.max_budget_tokens}',
f'Transcript flushed: {self.transcript_store.flushed}',
]
return '\n'.join(sections)
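The engine's budget stop condition compares projected word-count usage against `max_budget_tokens`. A minimal sketch of just that decision, with the totals passed in explicitly (hypothetical numbers; the real config defaults to 2000):

```python
def stop_reason_for(total_in: int, total_out: int, prompt: str, output: str,
                    max_budget_tokens: int = 2000) -> str:
    # Project next-turn usage the way QueryEnginePort.submit_message does:
    # whitespace word counts added to the running totals.
    projected_in = total_in + len(prompt.split())
    projected_out = total_out + len(output.split())
    if projected_in + projected_out > max_budget_tokens:
        return 'max_budget_reached'
    return 'completed'

assert stop_reason_for(0, 0, 'short prompt', 'short output') == 'completed'
assert stop_reason_for(1990, 0, 'a b c d e f', 'g h i j k l') == 'max_budget_reached'
```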

src/remote_runtime.py Normal file

@@ -0,0 +1,25 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class RuntimeModeReport:
mode: str
connected: bool
detail: str
def as_text(self) -> str:
return f'mode={self.mode}\nconnected={self.connected}\ndetail={self.detail}'
def run_remote_mode(target: str) -> RuntimeModeReport:
return RuntimeModeReport('remote', True, f'Remote control placeholder prepared for {target}')
def run_ssh_mode(target: str) -> RuntimeModeReport:
return RuntimeModeReport('ssh', True, f'SSH proxy placeholder prepared for {target}')
def run_teleport_mode(target: str) -> RuntimeModeReport:
return RuntimeModeReport('teleport', True, f'Teleport resume/create placeholder prepared for {target}')


@@ -3,8 +3,14 @@ from __future__ import annotations
from dataclasses import dataclass
from .commands import PORTED_COMMANDS
from .context import PortContext, build_port_context, render_context
from .history import HistoryLog
from .models import PermissionDenial, PortingModule
from .query_engine import QueryEngineConfig, QueryEnginePort, TurnResult
from .setup import SetupReport, WorkspaceSetup, run_setup
from .system_init import build_system_init_message
from .tools import PORTED_TOOLS
from .execution_registry import build_execution_registry
@dataclass(frozen=True)
@@ -15,6 +21,71 @@ class RoutedMatch:
score: int
@dataclass
class RuntimeSession:
prompt: str
context: PortContext
setup: WorkspaceSetup
setup_report: SetupReport
system_init_message: str
history: HistoryLog
routed_matches: list[RoutedMatch]
turn_result: TurnResult
command_execution_messages: tuple[str, ...]
tool_execution_messages: tuple[str, ...]
stream_events: tuple[dict[str, object], ...]
persisted_session_path: str
def as_markdown(self) -> str:
lines = [
'# Runtime Session',
'',
f'Prompt: {self.prompt}',
'',
'## Context',
render_context(self.context),
'',
'## Setup',
f'- Python: {self.setup.python_version} ({self.setup.implementation})',
f'- Platform: {self.setup.platform_name}',
f'- Test command: {self.setup.test_command}',
'',
'## Startup Steps',
*(f'- {step}' for step in self.setup.startup_steps()),
'',
'## System Init',
self.system_init_message,
'',
'## Routed Matches',
]
if self.routed_matches:
lines.extend(
f'- [{match.kind}] {match.name} ({match.score}) — {match.source_hint}'
for match in self.routed_matches
)
else:
lines.append('- none')
lines.extend([
'',
'## Command Execution',
*(self.command_execution_messages or ('none',)),
'',
'## Tool Execution',
*(self.tool_execution_messages or ('none',)),
'',
'## Stream Events',
*(f"- {event['type']}: {event}" for event in self.stream_events),
'',
'## Turn Result',
self.turn_result.output,
'',
f'Persisted session path: {self.persisted_session_path}',
'',
self.history.as_markdown(),
])
return '\n'.join(lines)
class PortRuntime:
def route_prompt(self, prompt: str, limit: int = 5) -> list[RoutedMatch]:
tokens = {token.lower() for token in prompt.replace('/', ' ').replace('-', ' ').split() if token}
@@ -24,7 +95,6 @@ class PortRuntime:
}
selected: list[RoutedMatch] = []
for kind in ('command', 'tool'):
if by_kind[kind]:
selected.append(by_kind[kind].pop(0))
@@ -36,6 +106,73 @@ class PortRuntime:
selected.extend(leftovers[: max(0, limit - len(selected))])
return selected[:limit]
def bootstrap_session(self, prompt: str, limit: int = 5) -> RuntimeSession:
context = build_port_context()
setup_report = run_setup(trusted=True)
setup = setup_report.setup
history = HistoryLog()
engine = QueryEnginePort.from_workspace()
history.add('context', f'python_files={context.python_file_count}, archive_available={context.archive_available}')
history.add('registry', f'commands={len(PORTED_COMMANDS)}, tools={len(PORTED_TOOLS)}')
matches = self.route_prompt(prompt, limit=limit)
registry = build_execution_registry()
command_execs = tuple(registry.command(match.name).execute(prompt) for match in matches if match.kind == 'command' and registry.command(match.name))
tool_execs = tuple(registry.tool(match.name).execute(prompt) for match in matches if match.kind == 'tool' and registry.tool(match.name))
denials = tuple(self._infer_permission_denials(matches))
stream_events = tuple(engine.stream_submit_message(
prompt,
matched_commands=tuple(match.name for match in matches if match.kind == 'command'),
matched_tools=tuple(match.name for match in matches if match.kind == 'tool'),
denied_tools=denials,
))
turn_result = engine.submit_message(
prompt,
matched_commands=tuple(match.name for match in matches if match.kind == 'command'),
matched_tools=tuple(match.name for match in matches if match.kind == 'tool'),
denied_tools=denials,
)
persisted_session_path = engine.persist_session()
history.add('routing', f'matches={len(matches)} for prompt={prompt!r}')
history.add('execution', f'command_execs={len(command_execs)} tool_execs={len(tool_execs)}')
history.add('turn', f'commands={len(turn_result.matched_commands)} tools={len(turn_result.matched_tools)} denials={len(turn_result.permission_denials)} stop={turn_result.stop_reason}')
history.add('session_store', persisted_session_path)
return RuntimeSession(
prompt=prompt,
context=context,
setup=setup,
setup_report=setup_report,
system_init_message=build_system_init_message(trusted=True),
history=history,
routed_matches=matches,
turn_result=turn_result,
command_execution_messages=command_execs,
tool_execution_messages=tool_execs,
stream_events=stream_events,
persisted_session_path=persisted_session_path,
)
def run_turn_loop(self, prompt: str, limit: int = 5, max_turns: int = 3, structured_output: bool = False) -> list[TurnResult]:
engine = QueryEnginePort.from_workspace()
engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output)
matches = self.route_prompt(prompt, limit=limit)
command_names = tuple(match.name for match in matches if match.kind == 'command')
tool_names = tuple(match.name for match in matches if match.kind == 'tool')
results: list[TurnResult] = []
for turn in range(max_turns):
turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]'
result = engine.submit_message(turn_prompt, command_names, tool_names, ())
results.append(result)
if result.stop_reason != 'completed':
break
return results
def _infer_permission_denials(self, matches: list[RoutedMatch]) -> list[PermissionDenial]:
denials: list[PermissionDenial] = []
for match in matches:
if match.kind == 'tool' and 'bash' in match.name.lower():
denials.append(PermissionDenial(tool_name=match.name, reason='destructive shell execution remains gated in the Python port'))
return denials
def _collect_matches(self, tokens: set[str], modules: tuple[PortingModule, ...], kind: str) -> list[RoutedMatch]:
matches: list[RoutedMatch] = []
for module in modules:

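`route_prompt` tokenizes the prompt treating `/` and `-` as separators before matching. A standalone sketch of that tokenizer plus a simple shared-token score (the real scoring lives in `_collect_matches`, whose body is truncated above, so the score here is a hypothetical stand-in):

```python
def prompt_tokens(prompt: str) -> set[str]:
    # Mirrors PortRuntime.route_prompt: '/' and '-' count as separators.
    return {token.lower() for token in prompt.replace('/', ' ').replace('-', ' ').split() if token}

def overlap_score(prompt: str, module_words: set[str]) -> int:
    # Hypothetical stand-in for _collect_matches scoring: shared-token count.
    return len(prompt_tokens(prompt) & module_words)

assert prompt_tokens('/resume-session now') == {'resume', 'session', 'now'}
assert overlap_score('run the bash tool', {'bash', 'tool'}) == 2
```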
src/session_store.py Normal file

@@ -0,0 +1,35 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
@dataclass(frozen=True)
class StoredSession:
session_id: str
messages: tuple[str, ...]
input_tokens: int
output_tokens: int
DEFAULT_SESSION_DIR = Path('.port_sessions')
def save_session(session: StoredSession, directory: Path | None = None) -> Path:
target_dir = directory or DEFAULT_SESSION_DIR
target_dir.mkdir(parents=True, exist_ok=True)
path = target_dir / f'{session.session_id}.json'
path.write_text(json.dumps(asdict(session), indent=2))
return path
def load_session(session_id: str, directory: Path | None = None) -> StoredSession:
target_dir = directory or DEFAULT_SESSION_DIR
data = json.loads((target_dir / f'{session_id}.json').read_text())
return StoredSession(
session_id=data['session_id'],
messages=tuple(data['messages']),
input_tokens=data['input_tokens'],
output_tokens=data['output_tokens'],
)
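The store above serializes a frozen dataclass to pretty-printed JSON and rebuilds the tuple on load. A round-trip sketch using a temporary directory (classes and functions re-declared so the snippet runs standalone):

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path

@dataclass(frozen=True)
class StoredSession:
    session_id: str
    messages: tuple[str, ...]
    input_tokens: int
    output_tokens: int

def save_session(session: StoredSession, directory: Path) -> Path:
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f'{session.session_id}.json'
    path.write_text(json.dumps(asdict(session), indent=2))
    return path

def load_session(session_id: str, directory: Path) -> StoredSession:
    data = json.loads((directory / f'{session_id}.json').read_text())
    return StoredSession(data['session_id'], tuple(data['messages']),
                         data['input_tokens'], data['output_tokens'])

with tempfile.TemporaryDirectory() as tmp:
    original = StoredSession('abc123', ('hello',), 1, 2)
    save_session(original, Path(tmp))
    restored = load_session('abc123', Path(tmp))
assert restored == original  # JSON round-trip preserves the session
```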


@@ -1,9 +1,77 @@
from __future__ import annotations
import platform
import sys
from dataclasses import dataclass
from pathlib import Path
from .deferred_init import DeferredInitResult, run_deferred_init
from .prefetch import PrefetchResult, start_keychain_prefetch, start_mdm_raw_read, start_project_scan
@dataclass(frozen=True)
class WorkspaceSetup:
python_version: str
implementation: str
platform_name: str
test_command: str = 'python3 -m unittest discover -s tests -v'
def startup_steps(self) -> tuple[str, ...]:
return (
'start top-level prefetch side effects',
'build workspace context',
'load mirrored command snapshot',
'load mirrored tool snapshot',
'prepare parity audit hooks',
'apply trust-gated deferred init',
)
@dataclass(frozen=True)
class SetupReport:
setup: WorkspaceSetup
prefetches: tuple[PrefetchResult, ...]
deferred_init: DeferredInitResult
trusted: bool
cwd: Path
def as_markdown(self) -> str:
lines = [
'# Setup Report',
'',
f'- Python: {self.setup.python_version} ({self.setup.implementation})',
f'- Platform: {self.setup.platform_name}',
f'- Trusted mode: {self.trusted}',
f'- CWD: {self.cwd}',
'',
'Prefetches:',
*(f'- {prefetch.name}: {prefetch.detail}' for prefetch in self.prefetches),
'',
'Deferred init:',
*self.deferred_init.as_lines(),
]
return '\n'.join(lines)
def build_workspace_setup() -> WorkspaceSetup:
return WorkspaceSetup(
python_version='.'.join(str(part) for part in sys.version_info[:3]),
implementation=platform.python_implementation(),
platform_name=platform.platform(),
)
def run_setup(cwd: Path | None = None, trusted: bool = True) -> SetupReport:
root = cwd or Path(__file__).resolve().parent.parent
prefetches = [
start_mdm_raw_read(),
start_keychain_prefetch(),
start_project_scan(root),
]
return SetupReport(
setup=build_workspace_setup(),
prefetches=tuple(prefetches),
deferred_init=run_deferred_init(trusted=trusted),
trusted=trusted,
cwd=root,
)
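The interpreter probes that `build_workspace_setup` relies on (replacing the old hard-coded `'3.13+'` default) are standard library calls and can be exercised standalone:

```python
import platform
import sys

# Version triple from sys.version_info, e.g. '3.12.4'.
python_version = '.'.join(str(part) for part in sys.version_info[:3])
# Interpreter implementation, e.g. 'CPython' or 'PyPy'.
implementation = platform.python_implementation()
# Full platform string, e.g. 'Linux-6.1.0-x86_64-with-glibc2.36'.
platform_name = platform.platform()

print(python_version, implementation, platform_name)
```

This is why the diff drops the default values for these three fields: they are always computed at runtime rather than pinned in the dataclass.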

23 src/system_init.py Normal file

@@ -0,0 +1,23 @@
from __future__ import annotations
from .commands import built_in_command_names, get_commands
from .setup import run_setup
from .tools import get_tools

def build_system_init_message(trusted: bool = True) -> str:
    setup = run_setup(trusted=trusted)
    commands = get_commands()
    tools = get_tools()
    lines = [
        '# System Init',
        '',
        f'Trusted: {setup.trusted}',
        f'Built-in command names: {len(built_in_command_names())}',
        f'Loaded command entries: {len(commands)}',
        f'Loaded tool entries: {len(tools)}',
        '',
        'Startup steps:',
        *(f'- {step}' for step in setup.setup.startup_steps()),
    ]
    return '\n'.join(lines)
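The report-building idiom used here (and in `SetupReport.as_markdown`) is worth isolating: collect markdown lines in a list, splice a generated section in with `*`, and join once at the end. A minimal sketch with made-up step names:

```python
# Hypothetical steps; the real ones come from WorkspaceSetup.startup_steps().
steps = ('load snapshot', 'prepare hooks', 'apply deferred init')

lines = [
    '# System Init',
    '',
    f'Loaded steps: {len(steps)}',
    '',
    'Startup steps:',
    # Unpack a generator of bullet lines directly into the list literal.
    *(f'- {step}' for step in steps),
]
message = '\n'.join(lines)
print(message)
```

Joining once avoids the quadratic cost of repeated string concatenation and keeps the line structure explicit.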

37 src/tool_pool.py Normal file

@@ -0,0 +1,37 @@
from __future__ import annotations
from dataclasses import dataclass
from .models import PortingModule
from .permissions import ToolPermissionContext
from .tools import get_tools

@dataclass(frozen=True)
class ToolPool:
    tools: tuple[PortingModule, ...]
    simple_mode: bool
    include_mcp: bool

    def as_markdown(self) -> str:
        lines = [
            '# Tool Pool',
            '',
            f'Simple mode: {self.simple_mode}',
            f'Include MCP: {self.include_mcp}',
            f'Tool count: {len(self.tools)}',
        ]
        lines.extend(f'- {tool.name}{tool.source_hint}' for tool in self.tools[:15])
        return '\n'.join(lines)

def assemble_tool_pool(
    simple_mode: bool = False,
    include_mcp: bool = True,
    permission_context: ToolPermissionContext | None = None,
) -> ToolPool:
    return ToolPool(
        tools=get_tools(simple_mode=simple_mode, include_mcp=include_mcp, permission_context=permission_context),
        simple_mode=simple_mode,
        include_mcp=include_mcp,
    )


@@ -1,14 +1,25 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from .models import PortingBacklog, PortingModule
from .permissions import ToolPermissionContext

SNAPSHOT_PATH = Path(__file__).resolve().parent / 'reference_data' / 'tools_snapshot.json'

@dataclass(frozen=True)
class ToolExecution:
    name: str
    source_hint: str
    payload: str
    handled: bool
    message: str

@lru_cache(maxsize=1)
def load_tool_snapshot() -> tuple[PortingModule, ...]:
    raw_entries = json.loads(SNAPSHOT_PATH.read_text())
@@ -42,12 +53,39 @@ def get_tool(name: str) -> PortingModule | None:
    return None

def filter_tools_by_permission_context(tools: tuple[PortingModule, ...], permission_context: ToolPermissionContext | None = None) -> tuple[PortingModule, ...]:
    if permission_context is None:
        return tools
    return tuple(module for module in tools if not permission_context.blocks(module.name))

def get_tools(
    simple_mode: bool = False,
    include_mcp: bool = True,
    permission_context: ToolPermissionContext | None = None,
) -> tuple[PortingModule, ...]:
    tools = list(PORTED_TOOLS)
    if simple_mode:
        tools = [module for module in tools if module.name in {'BashTool', 'FileReadTool', 'FileEditTool'}]
    if not include_mcp:
        tools = [module for module in tools if 'mcp' not in module.name.lower() and 'mcp' not in module.source_hint.lower()]
    return filter_tools_by_permission_context(tuple(tools), permission_context)

def find_tools(query: str, limit: int = 20) -> list[PortingModule]:
    needle = query.lower()
    matches = [module for module in PORTED_TOOLS if needle in module.name.lower() or needle in module.source_hint.lower()]
    return matches[:limit]

def execute_tool(name: str, payload: str = '') -> ToolExecution:
    module = get_tool(name)
    if module is None:
        return ToolExecution(name=name, source_hint='', payload=payload, handled=False, message=f'Unknown mirrored tool: {name}')
    action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}."
    return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=True, message=action)

def render_tool_index(limit: int = 20, query: str | None = None) -> str:
    modules = find_tools(query, limit) if query else list(PORTED_TOOLS[:limit])
    lines = [f'Tool entries: {len(PORTED_TOOLS)}', '']
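The filtering pipeline in `get_tools` (allowlist for simple mode, then a name/hint substring check to drop MCP tools) can be demonstrated on a hypothetical miniature catalog; `Module` here is a stand-in for `PortingModule`:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Module:  # stand-in for PortingModule
    name: str
    source_hint: str

CATALOG = (
    Module('BashTool', 'core shell'),
    Module('FileReadTool', 'core fs'),
    Module('MCPTool', 'mcp bridge'),
    Module('NotebookTool', 'extras'),
)

def get_tools(simple_mode: bool = False, include_mcp: bool = True) -> tuple[Module, ...]:
    tools = list(CATALOG)
    if simple_mode:
        # Fixed allowlist: only the core file/shell tools survive.
        tools = [m for m in tools if m.name in {'BashTool', 'FileReadTool', 'FileEditTool'}]
    if not include_mcp:
        # Drop anything MCP-flavored by name or by source hint.
        tools = [m for m in tools if 'mcp' not in m.name.lower() and 'mcp' not in m.source_hint.lower()]
    return tuple(tools)

assert [m.name for m in get_tools(simple_mode=True)] == ['BashTool', 'FileReadTool']
assert all('mcp' not in m.name.lower() for m in get_tools(include_mcp=False))
```

The real function additionally threads the result through `filter_tools_by_permission_context`, which is a no-op when no permission context is supplied.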

23 src/transcript.py Normal file

@@ -0,0 +1,23 @@
from __future__ import annotations
from dataclasses import dataclass, field

@dataclass
class TranscriptStore:
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        self.flushed = False

    def compact(self, keep_last: int = 10) -> None:
        if len(self.entries) > keep_last:
            self.entries[:] = self.entries[-keep_last:]

    def replay(self) -> tuple[str, ...]:
        return tuple(self.entries)

    def flush(self) -> None:
        self.flushed = True
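A standalone copy of `TranscriptStore` shows the intended lifecycle: `compact` keeps only the newest entries in place, and any `append` invalidates a prior `flush`:

```python
from dataclasses import dataclass, field

@dataclass
class TranscriptStore:  # copied verbatim from src/transcript.py above
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        self.flushed = False  # new content means the store is dirty again

    def compact(self, keep_last: int = 10) -> None:
        if len(self.entries) > keep_last:
            # Slice-assign so external references to the list stay valid.
            self.entries[:] = self.entries[-keep_last:]

    def replay(self) -> tuple[str, ...]:
        return tuple(self.entries)

    def flush(self) -> None:
        self.flushed = True

store = TranscriptStore()
for i in range(12):
    store.append(f'turn {i}')
store.compact(keep_last=10)
assert store.replay() == tuple(f'turn {i}' for i in range(2, 12))
store.flush()
assert store.flushed
```

The slice assignment in `compact` (rather than rebinding `self.entries`) is deliberate: it mutates the existing list object, so any caller holding a reference sees the truncation too.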


@@ -3,6 +3,7 @@ from __future__ import annotations
import subprocess
import sys
import unittest
from pathlib import Path
from src.commands import PORTED_COMMANDS
from src.parity_audit import run_parity_audit
@@ -100,6 +101,148 @@ class PortingWorkspaceTests(unittest.TestCase):
        self.assertIn('review', show_command.stdout.lower())
        self.assertIn('mcptool', show_tool.stdout.lower())
    def test_bootstrap_cli_runs(self) -> None:
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'bootstrap', 'review MCP tool', '--limit', '5'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('Runtime Session', result.stdout)
        self.assertIn('Startup Steps', result.stdout)
        self.assertIn('Routed Matches', result.stdout)

    def test_bootstrap_session_tracks_turn_state(self) -> None:
        from src.runtime import PortRuntime
        session = PortRuntime().bootstrap_session('review MCP tool', limit=5)
        self.assertGreaterEqual(len(session.turn_result.matched_tools), 1)
        self.assertIn('Prompt:', session.turn_result.output)
        self.assertGreaterEqual(session.turn_result.usage.input_tokens, 1)

    def test_exec_command_and_tool_cli_run(self) -> None:
        command_result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'exec-command', 'review', 'inspect security review'],
            check=True,
            capture_output=True,
            text=True,
        )
        tool_result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'exec-tool', 'MCPTool', 'fetch resource list'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn("Mirrored command 'review'", command_result.stdout)
        self.assertIn("Mirrored tool 'MCPTool'", tool_result.stdout)

    def test_setup_report_and_registry_filters_run(self) -> None:
        setup_result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'setup-report'],
            check=True,
            capture_output=True,
            text=True,
        )
        command_result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'commands', '--limit', '5', '--no-plugin-commands'],
            check=True,
            capture_output=True,
            text=True,
        )
        tool_result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'tools', '--limit', '5', '--simple-mode', '--no-mcp'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('Setup Report', setup_result.stdout)
        self.assertIn('Command entries:', command_result.stdout)
        self.assertIn('Tool entries:', tool_result.stdout)

    def test_load_session_cli_runs(self) -> None:
        from src.runtime import PortRuntime
        session = PortRuntime().bootstrap_session('review MCP tool', limit=5)
        session_id = Path(session.persisted_session_path).stem
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'load-session', session_id],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn(session_id, result.stdout)
        self.assertIn('messages', result.stdout)

    def test_tool_permission_filtering_cli_runs(self) -> None:
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'tools', '--limit', '10', '--deny-prefix', 'mcp'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('Tool entries:', result.stdout)
        self.assertNotIn('MCPTool', result.stdout)

    def test_turn_loop_cli_runs(self) -> None:
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'turn-loop', 'review MCP tool', '--max-turns', '2', '--structured-output'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('## Turn 1', result.stdout)
        self.assertIn('stop_reason=', result.stdout)

    def test_remote_mode_clis_run(self) -> None:
        remote_result = subprocess.run([sys.executable, '-m', 'src.main', 'remote-mode', 'workspace'], check=True, capture_output=True, text=True)
        ssh_result = subprocess.run([sys.executable, '-m', 'src.main', 'ssh-mode', 'workspace'], check=True, capture_output=True, text=True)
        teleport_result = subprocess.run([sys.executable, '-m', 'src.main', 'teleport-mode', 'workspace'], check=True, capture_output=True, text=True)
        self.assertIn('mode=remote', remote_result.stdout)
        self.assertIn('mode=ssh', ssh_result.stdout)
        self.assertIn('mode=teleport', teleport_result.stdout)

    def test_flush_transcript_cli_runs(self) -> None:
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'flush-transcript', 'review MCP tool'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('flushed=True', result.stdout)

    def test_command_graph_and_tool_pool_cli_run(self) -> None:
        command_graph = subprocess.run([sys.executable, '-m', 'src.main', 'command-graph'], check=True, capture_output=True, text=True)
        tool_pool = subprocess.run([sys.executable, '-m', 'src.main', 'tool-pool'], check=True, capture_output=True, text=True)
        self.assertIn('Command Graph', command_graph.stdout)
        self.assertIn('Tool Pool', tool_pool.stdout)

    def test_setup_report_mentions_deferred_init(self) -> None:
        result = subprocess.run(
            [sys.executable, '-m', 'src.main', 'setup-report'],
            check=True,
            capture_output=True,
            text=True,
        )
        self.assertIn('Deferred init:', result.stdout)
        self.assertIn('plugin_init=True', result.stdout)

    def test_execution_registry_runs(self) -> None:
        from src.execution_registry import build_execution_registry
        registry = build_execution_registry()
        self.assertGreaterEqual(len(registry.commands), 150)
        self.assertGreaterEqual(len(registry.tools), 100)
        self.assertIn('Mirrored command', registry.command('review').execute('review security'))
        self.assertIn('Mirrored tool', registry.tool('MCPTool').execute('fetch mcp resources'))

    def test_bootstrap_graph_and_direct_modes_run(self) -> None:
        graph_result = subprocess.run([sys.executable, '-m', 'src.main', 'bootstrap-graph'], check=True, capture_output=True, text=True)
        direct_result = subprocess.run([sys.executable, '-m', 'src.main', 'direct-connect-mode', 'workspace'], check=True, capture_output=True, text=True)
        deep_link_result = subprocess.run([sys.executable, '-m', 'src.main', 'deep-link-mode', 'workspace'], check=True, capture_output=True, text=True)
        self.assertIn('Bootstrap Graph', graph_result.stdout)
        self.assertIn('mode=direct-connect', direct_result.stdout)
        self.assertIn('mode=deep-link', deep_link_result.stdout)

if __name__ == '__main__':
    unittest.main()