Skip to content

Health

health

health.py — MCP server health probing via JSON-RPC handshake and environment diagnostics.

CheckResult

CheckResult(label, *, passed, warning=False, fix_hint=None)

Single diagnostic check result.

Create a check result.

PARAMETER DESCRIPTION
label

Human-readable description of what was checked.

TYPE: str

passed

Whether the check passed.

TYPE: bool

warning

If True, treat as a non-fatal warning instead of failure.

TYPE: bool DEFAULT: False

fix_hint

Actionable command or instruction to resolve the issue.

TYPE: str | None DEFAULT: None

Source code in src/crux_cli/health.py
def __init__(self, label: str, *, passed: bool, warning: bool = False, fix_hint: str | None = None):
    """Create a check result.

    Args:
        label: Human-readable description of what was checked.
        passed: Whether the check passed.
        warning: If ``True``, treat as a non-fatal warning instead of failure.
        fix_hint: Actionable command or instruction to resolve the issue.
    """
    self.label = label
    self.passed = passed
    self.warning = warning
    self.fix_hint = fix_hint

probe_mcp_server

probe_mcp_server(config)

Probe an MCP server via MCP initialize + tools/list handshake. Returns (status, reason).

Source code in src/crux_cli/health.py
def probe_mcp_server(config: dict[str, Any]) -> tuple[str, str]:
    """Run the detailed MCP probe and reduce it to a ``(status, reason)`` pair.

    Args:
        config: Server configuration, forwarded unchanged to
            ``probe_mcp_server_detailed``.

    Returns:
        The ``"status"`` and ``"detail"`` entries of the detailed probe result.
    """
    detailed = probe_mcp_server_detailed(config)
    status = detailed["status"]
    reason = detailed["detail"]
    return status, reason

probe_mcp_server_detailed

probe_mcp_server_detailed(config)

Extended probe returning a dict with status, detail, tools_count, server_info.

Source code in src/crux_cli/health.py
def probe_mcp_server_detailed(config: dict[str, Any]) -> dict[str, Any]:
    """Extended probe returning a dict with status, detail, tools_count, server_info."""
    command = config.get("command", "")
    args_list = config.get("args", [])
    env_overrides = config.get("env", {})
    auth = config.get("auth", {})

    if not shutil.which(command):
        return {
            "status": "failed",
            "detail": f"command not found: '{command}'",
            "tools_count": None,
            "server_info": None,
        }

    if command.startswith("http://") or command.startswith("https://"):
        return {"status": "! needs authentication", "detail": command, "tools_count": None, "server_info": None}

    if auth.get("check_cmd"):
        try:
            result = subprocess.run(auth["check_cmd"], capture_output=True, timeout=5)  # noqa: S603
            if result.returncode != 0:
                return {
                    "status": "auth_required",
                    "detail": auth.get("fix_description", "authentication required"),
                    "tools_count": None,
                    "server_info": None,
                }
        except Exception:
            return {
                "status": "auth_required",
                "detail": auth.get("fix_description", "authentication check failed"),
                "tools_count": None,
                "server_info": None,
            }

    env = os.environ.copy()
    env.update(env_overrides)
    full_cmd = [command] + [str(a) for a in args_list]

    messages = (
        json.dumps(
            {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "crux", "version": "0.1.0"},
                },
            }
        )
        + "\n"
        + json.dumps({"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}})
        + "\n"
    )

    try:
        proc = subprocess.Popen(  # noqa: S603
            full_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
        )
        stdout, _ = proc.communicate(input=messages.encode(), timeout=10)
        lines = [line for line in stdout.decode().splitlines() if line.strip()]

        server_info_detail = None
        tools_count = None
        for line in lines:
            try:
                resp = json.loads(line)
            except json.JSONDecodeError:
                continue
            if resp.get("id") == 1 and "result" in resp:
                si = resp["result"].get("serverInfo", {})
                name = si.get("name", "")
                version = si.get("version", "")
                server_info_detail = f"{name} {version}".strip() if name else "connected"
            if resp.get("id") == 2:
                if "error" in resp:
                    msg = resp["error"].get("message", "")
                    auth_keywords = ["auth", "login", "credential", "token", "unauthorized", "permission"]
                    if any(k in msg.lower() for k in auth_keywords):
                        return {
                            "status": "auth_required",
                            "detail": msg,
                            "tools_count": None,
                            "server_info": server_info_detail,
                        }
                    return {"status": "error", "detail": msg, "tools_count": None, "server_info": server_info_detail}
                if "result" in resp:
                    tools = resp["result"].get("tools", [])
                    tools_count = len(tools)
                    return {
                        "status": "connected",
                        "detail": server_info_detail or "connected",
                        "tools_count": tools_count,
                        "server_info": server_info_detail,
                    }

        return {
            "status": "running",
            "detail": server_info_detail or " ".join(full_cmd),
            "tools_count": None,
            "server_info": server_info_detail,
        }

    except subprocess.TimeoutExpired:
        proc.kill()
        return {
            "status": "timeout",
            "detail": "server started but did not respond in time",
            "tools_count": None,
            "server_info": None,
        }
    except Exception as e:
        return {"status": "failed", "detail": str(e), "tools_count": None, "server_info": None}

check_python_version

check_python_version(min_version=(3, 11))

Check that the running Python meets the minimum version.

Source code in src/crux_cli/health.py
def check_python_version(min_version: tuple[int, int] = (3, 11)) -> CheckResult:
    """Compare the running interpreter's major.minor version with a minimum.

    Args:
        min_version: Lowest acceptable ``(major, minor)`` pair.

    Returns:
        A ``CheckResult`` whose label shows the required and found versions and
        whose fix hint points at python.org.
    """
    found = sys.version_info[:2]
    required_text = f"{min_version[0]}.{min_version[1]}"
    found_text = f"{found[0]}.{found[1]}"
    return CheckResult(
        f"Python >= {required_text} (found {found_text})",
        passed=found >= min_version,
        fix_hint=f"Install Python {required_text}+: https://python.org",
    )

check_tool_installed

check_tool_installed(name, *, fix_hint=None)

Check that an external tool is on PATH.

Source code in src/crux_cli/health.py
def check_tool_installed(name: str, *, fix_hint: str | None = None) -> CheckResult:
    """Report whether ``shutil.which`` can resolve an external tool.

    Args:
        name: Executable name to look up.
        fix_hint: Hint passed through to the resulting ``CheckResult``.

    Returns:
        A ``CheckResult`` labelled ``"<name> installed"``.
    """
    resolved = shutil.which(name)
    return CheckResult(f"{name} installed", passed=resolved is not None, fix_hint=fix_hint)

check_directory_structure

check_directory_structure(crux_root)

Verify that essential Crux directories exist.

Source code in src/crux_cli/health.py
def check_directory_structure(crux_root: Path) -> list[CheckResult]:
    """Check that the Crux root and its expected subdirectories are directories.

    Args:
        crux_root: Root directory of the Crux checkout.

    Returns:
        One ``CheckResult`` per expected directory, each with a ``mkdir -p``
        fix hint for that path.
    """
    marketplace = crux_root / "marketplace"
    required = (
        (crux_root, "Crux root exists"),
        (marketplace, "marketplace/ directory"),
        (marketplace / "mcps", "marketplace/mcps/ directory"),
        (marketplace / "skills", "marketplace/skills/ directory"),
        (crux_root / "src", "src/ directory"),
    )
    return [
        CheckResult(description, passed=directory.is_dir(), fix_hint=f"mkdir -p {directory}")
        for directory, description in required
    ]

check_registry_valid

check_registry_valid(registry_path)

Verify the registry JSON is valid and parseable.

Source code in src/crux_cli/health.py
def check_registry_valid(registry_path: Path) -> CheckResult:
    """Verify that the registry file exists and contains a JSON object.

    Args:
        registry_path: Location of the registry JSON file.

    Returns:
        A passing ``CheckResult`` when the file parses to a JSON object;
        otherwise a failing one whose ``fix_hint`` describes the problem
        (missing file, read/parse error, or non-object top level).
    """
    if not registry_path.exists():
        return CheckResult("Registry JSON exists", passed=False, fix_hint=f"File not found: {registry_path}")
    try:
        # JSON is read as UTF-8 explicitly; the platform default encoding
        # would make the result depend on the machine's locale.
        with open(registry_path, encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so an undecodable file is reported instead of crashing the checks.
        return CheckResult("Registry JSON valid", passed=False, fix_hint=f"Parse error: {e}")
    if not isinstance(data, dict):
        return CheckResult("Registry JSON valid", passed=False, fix_hint="Registry is not a JSON object")
    return CheckResult("Registry JSON valid", passed=True)

check_mcp_sources_present

check_mcp_sources_present(crux_root, mcp_definitions)

Check that cloned MCP source directories exist on disk.

Source code in src/crux_cli/health.py
def check_mcp_sources_present(crux_root: Path, mcp_definitions: dict[str, Any]) -> list[CheckResult]:
    """Check that each MCP's ``source_dir`` exists as a directory under the root.

    Args:
        crux_root: Directory that ``source_dir`` values are resolved against.
        mcp_definitions: Mapping of MCP name to its definition; definitions
            with a falsy or absent ``source_dir`` are skipped.

    Returns:
        One ``CheckResult`` per MCP that declares a ``source_dir``.
    """
    return [
        CheckResult(
            f"MCP source: {mcp_name} ({source_dir})",
            passed=(crux_root / source_dir).is_dir(),
            fix_hint=f"Run: crux mcp add {mcp_name}",
        )
        for mcp_name, defn in mcp_definitions.items()
        if (source_dir := defn.get("source_dir"))
    ]

check_build_artifacts

check_build_artifacts(crux_root, mcp_definitions)

Check that MCPs with build_cmd have their build artifacts.

Source code in src/crux_cli/health.py
def check_build_artifacts(crux_root: Path, mcp_definitions: dict[str, Any]) -> list[CheckResult]:
    """Check that MCPs declaring a build command have some build output on disk.

    An MCP counts as built when its source directory contains at least one of
    ``node_modules``, ``dist``, ``.venv`` or ``build`` as a directory.

    Args:
        crux_root: Directory that ``source_dir`` values are resolved against.
        mcp_definitions: Mapping of MCP name to its definition; only entries
            with both ``build_cmd`` and ``source_dir`` are checked.

    Returns:
        One ``CheckResult`` per checked MCP; a missing artifact is flagged as
        a warning and carries the build command as the fix hint.
    """
    artifact_dir_names = ("node_modules", "dist", ".venv", "build")
    results = []
    for mcp_name, defn in mcp_definitions.items():
        build_cmd = defn.get("build_cmd")
        source_dir = defn.get("source_dir")
        if not (build_cmd and source_dir):
            continue
        src = crux_root / source_dir
        built = any((src / dir_name).is_dir() for dir_name in artifact_dir_names)
        results.append(
            CheckResult(
                f"Build artifacts: {mcp_name}",
                passed=built,
                warning=not built,
                fix_hint=f"Run: cd {src} && {build_cmd}",
            )
        )
    return results

check_crux_in_path

check_crux_in_path()

Check that the crux binary is on PATH.

Source code in src/crux_cli/health.py
def check_crux_in_path() -> CheckResult:
    """Report whether ``shutil.which`` can resolve the ``crux`` executable.

    Returns:
        A ``CheckResult`` labelled ``"crux binary in PATH"`` with a hint to
        add the crux ``bin/`` directory to PATH.
    """
    location = shutil.which("crux")
    return CheckResult(
        "crux binary in PATH",
        passed=location is not None,
        fix_hint="Add crux bin/ to PATH in ~/.zshrc",
    )

run_doctor_checks

run_doctor_checks(crux_root, registry_path=None, mcp_definitions=None)

Run all doctor checks and return a flat list of results.

Source code in src/crux_cli/health.py
def run_doctor_checks(
    crux_root: Path,
    registry_path: Path | None = None,
    mcp_definitions: dict[str, Any] | None = None,
) -> list[CheckResult]:
    """Run every doctor check in a fixed order and collect the results.

    Order: Python version, required external tools, directory structure,
    registry validity (if a path is given), MCP sources and build artifacts
    (if definitions are given), and finally the crux-on-PATH check.

    Args:
        crux_root: Root directory of the Crux checkout.
        registry_path: Registry JSON to validate; skipped when ``None``.
        mcp_definitions: MCP definitions to inspect; skipped when empty or ``None``.

    Returns:
        A flat list of ``CheckResult`` objects.
    """
    # (tool name, fix hint) pairs, in the order they are reported.
    required_tools = (
        ("uv", "curl -LsSf https://astral.sh/uv/install.sh | sh"),
        ("git", "Install git: https://git-scm.com"),
        ("node", "Install Node.js: https://nodejs.org"),
        ("npm", "Install Node.js (npm is bundled)"),
        ("claude", "Install Claude CLI: https://docs.anthropic.com/en/docs/claude-cli"),
    )

    results: list[CheckResult] = [check_python_version()]
    for tool, hint in required_tools:
        results.append(check_tool_installed(tool, fix_hint=hint))

    results += check_directory_structure(crux_root)

    if registry_path is not None:
        results.append(check_registry_valid(registry_path))

    if mcp_definitions:
        results += check_mcp_sources_present(crux_root, mcp_definitions)
        results += check_build_artifacts(crux_root, mcp_definitions)

    results.append(check_crux_in_path())
    return results