Secrets management for Crux — pluggable backend system.
Provides a SecretsBackend protocol and concrete implementations
- MacOSKeychainBackend (macOS Keychain via
security CLI)
- LinuxSecretServiceBackend (freedesktop Secret Service via
secretstorage)
- AgeEncryptedBackend (age-encrypted JSON file for headless Linux)
The active backend is determined by config.toml [secrets] backend = "...".
SecretsBackend
Bases: Protocol
Protocol that all secrets backends must implement.
MacOSKeychainBackend
Secrets backend using macOS Keychain via the security CLI.
set
set(mcp_name, key, value)
Store a secret in macOS Keychain via security add-generic-password.
Source code in src/crux_cli/secrets.py
| def set(self, mcp_name: str, key: str, value: str) -> None: # noqa: A003
"""Store a secret in macOS Keychain via ``security add-generic-password``."""
service = self._service(mcp_name)
cmd = ["security", "add-generic-password", "-U", "-s", service, "-a", key, "-w"] # noqa: S607
result = subprocess.run(cmd, input=value.encode(), capture_output=True) # noqa: S603
if result.returncode != 0:
msg = result.stderr.decode().strip() if result.stderr else "unknown error"
print(f"Keychain write failed: {msg}", file=sys.stderr)
sys.exit(1)
_index_add_key(mcp_name, key)
|
get
Retrieve a secret from macOS Keychain. Returns None if not found.
Source code in src/crux_cli/secrets.py
| def get(self, mcp_name: str, key: str) -> str | None: # noqa: A003
"""Retrieve a secret from macOS Keychain. Returns ``None`` if not found."""
cmd = ["security", "find-generic-password", "-s", self._service(mcp_name), "-a", key, "-w"] # noqa: S607
result = subprocess.run(cmd, capture_output=True, text=True) # noqa: S603
if result.returncode == 0:
return result.stdout.strip()
return None
|
delete
Delete a secret from macOS Keychain.
Source code in src/crux_cli/secrets.py
| def delete(self, mcp_name: str, key: str) -> None:
"""Delete a secret from macOS Keychain."""
cmd = ["security", "delete-generic-password", "-s", self._service(mcp_name), "-a", key] # noqa: S607
subprocess.run(cmd, capture_output=True) # noqa: S603
_index_remove_key(mcp_name, key)
|
list_keys
List stored secret key names, optionally filtered by MCP.
Source code in src/crux_cli/secrets.py
| def list_keys(self, mcp_name: str | None = None) -> dict[str, list[str]]:
"""List stored secret key names, optionally filtered by MCP."""
index = load_secrets_index()
if mcp_name:
keys = index.get(mcp_name, [])
return {mcp_name: keys} if keys else {}
return index
|
LinuxSecretServiceBackend
LinuxSecretServiceBackend()
Secrets backend using freedesktop.org Secret Service (via secretstorage).
Initialize with lazy D-Bus connection.
Source code in src/crux_cli/secrets.py
| def __init__(self) -> None:
"""Initialize with lazy D-Bus connection."""
self._connection: Any = None
self._available: bool | None = None
|
set
set(mcp_name, key, value)
Store a secret via Secret Service, falling back to age encryption.
Source code in src/crux_cli/secrets.py
| def set(self, mcp_name: str, key: str, value: str) -> None: # noqa: A003
"""Store a secret via Secret Service, falling back to age encryption."""
collection = self._get_collection()
if collection is None:
self._fallback().set(mcp_name, key, value)
return
label = f"crux.{mcp_name}/{key}"
attrs = {"service": f"crux.{mcp_name}", "account": key}
collection.create_item(label, attrs, value.encode(), replace=True)
_index_add_key(mcp_name, key)
|
get
Retrieve a secret from Secret Service. Returns None if not found.
Source code in src/crux_cli/secrets.py
| def get(self, mcp_name: str, key: str) -> str | None: # noqa: A003
"""Retrieve a secret from Secret Service. Returns ``None`` if not found."""
collection = self._get_collection()
if collection is None:
return self._fallback().get(mcp_name, key)
attrs = {"service": f"crux.{mcp_name}", "account": key}
items = list(collection.search_items(attrs))
if items:
return items[0].get_secret().decode()
return None
|
delete
Delete a secret from Secret Service.
Source code in src/crux_cli/secrets.py
| def delete(self, mcp_name: str, key: str) -> None:
"""Delete a secret from Secret Service."""
collection = self._get_collection()
if collection is None:
self._fallback().delete(mcp_name, key)
return
attrs = {"service": f"crux.{mcp_name}", "account": key}
items = list(collection.search_items(attrs))
for item in items:
item.delete()
_index_remove_key(mcp_name, key)
|
list_keys
List stored secret key names, optionally filtered by MCP.
Source code in src/crux_cli/secrets.py
| def list_keys(self, mcp_name: str | None = None) -> dict[str, list[str]]:
"""List stored secret key names, optionally filtered by MCP."""
index = load_secrets_index()
if mcp_name:
keys = index.get(mcp_name, [])
return {mcp_name: keys} if keys else {}
return index
|
AgeEncryptedBackend
Secrets backend using age-encrypted JSON file.
set
set(mcp_name, key, value)
Store a secret in the age-encrypted store.
Source code in src/crux_cli/secrets.py
| def set(self, mcp_name: str, key: str, value: str) -> None: # noqa: A003
"""Store a secret in the age-encrypted store."""
self._ensure_identity()
store = self._load_store()
store.setdefault(mcp_name, {})
store[mcp_name][key] = value
self._save_store(store)
_index_add_key(mcp_name, key)
|
get
Retrieve a secret from the age-encrypted store. Returns None if not found.
Source code in src/crux_cli/secrets.py
| def get(self, mcp_name: str, key: str) -> str | None: # noqa: A003
"""Retrieve a secret from the age-encrypted store. Returns ``None`` if not found."""
store = self._load_store()
return store.get(mcp_name, {}).get(key)
|
delete
Delete a secret from the age-encrypted store.
Source code in src/crux_cli/secrets.py
| def delete(self, mcp_name: str, key: str) -> None:
"""Delete a secret from the age-encrypted store."""
store = self._load_store()
if mcp_name in store:
store[mcp_name].pop(key, None)
if not store[mcp_name]:
del store[mcp_name]
self._save_store(store)
_index_remove_key(mcp_name, key)
|
list_keys
List stored secret key names, optionally filtered by MCP.
Source code in src/crux_cli/secrets.py
| def list_keys(self, mcp_name: str | None = None) -> dict[str, list[str]]:
"""List stored secret key names, optionally filtered by MCP."""
index = load_secrets_index()
if mcp_name:
keys = index.get(mcp_name, [])
return {mcp_name: keys} if keys else {}
return index
|
load_secrets_index
Load the secrets index from disk, returning empty dict if absent.
Source code in src/crux_cli/secrets.py
| def load_secrets_index() -> dict[str, list[str]]:
"""Load the secrets index from disk, returning empty dict if absent."""
idx_path = secrets_path()
if idx_path.exists():
with open(idx_path) as f:
return json.load(f)
return {}
|
save_secrets_index
save_secrets_index(index)
Atomically write the secrets index to disk (temp file + rename).
Source code in src/crux_cli/secrets.py
| def save_secrets_index(index: dict[str, list[str]]) -> None:
"""Atomically write the secrets index to disk (temp file + rename)."""
idx_path = secrets_path()
idx_path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=idx_path.parent, suffix=".tmp")
try:
with open(fd, "w") as f:
json.dump(index, f, indent=2, sort_keys=True)
f.write("\n")
Path(tmp).replace(idx_path)
except BaseException:
Path(tmp).unlink(missing_ok=True)
raise
|
get_backend
Return the appropriate SecretsBackend based on configuration.
Source code in src/crux_cli/secrets.py
| def get_backend(config: dict[str, Any] | None = None) -> SecretsBackend:
"""Return the appropriate SecretsBackend based on configuration."""
if config is None:
from crux_cli.config import load_config
config = load_config()
name = config.get("secrets", {}).get("backend", "keychain")
cls = _BACKENDS.get(name)
if cls is None:
msg = f"Unknown secrets backend {name!r}. Known: {', '.join(_BACKENDS)}"
raise ValueError(msg)
return cls()
|