stage-2: secrets manager with keyring
Credential broker via keyring (zero plaintext on disk), CLI secrets subcommand, 29 new tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
90
cli.py
Executable file
90
cli.py
Executable file
@@ -0,0 +1,90 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Echo Core CLI tool."""
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
|
||||
from src.secrets import set_secret, get_secret, list_secrets, delete_secret, check_secrets
|
||||
|
||||
|
||||
def cmd_secrets(args):
|
||||
"""Handle secrets subcommand."""
|
||||
if args.secrets_action == "set":
|
||||
if args.file:
|
||||
path = Path(args.file)
|
||||
if not path.exists():
|
||||
print(f"Error: file {args.file} not found")
|
||||
sys.exit(1)
|
||||
value = path.read_text().strip()
|
||||
set_secret(args.name, value)
|
||||
path.unlink() # Delete source file after storing
|
||||
print(f"Secret '{args.name}' set from file (file deleted)")
|
||||
else:
|
||||
value = getpass.getpass(f"Enter value for '{args.name}': ")
|
||||
set_secret(args.name, value)
|
||||
print(f"Secret '{args.name}' set")
|
||||
|
||||
elif args.secrets_action == "list":
|
||||
names = list_secrets()
|
||||
if not names:
|
||||
print("No secrets stored")
|
||||
else:
|
||||
for name in names:
|
||||
print(f" - {name}")
|
||||
|
||||
elif args.secrets_action == "delete":
|
||||
if delete_secret(args.name):
|
||||
print(f"Secret '{args.name}' deleted")
|
||||
else:
|
||||
print(f"Secret '{args.name}' not found")
|
||||
|
||||
elif args.secrets_action == "test":
|
||||
results = check_secrets()
|
||||
for name, exists in results.items():
|
||||
print(f" {name}: {'OK' if exists else 'MISSING'}")
|
||||
if all(results.values()):
|
||||
print("\nAll required secrets present.")
|
||||
else:
|
||||
print("\nWARNING: Some required secrets are missing!")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(prog="echo", description="Echo Core CLI")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
|
||||
# secrets
|
||||
secrets_parser = sub.add_parser("secrets", help="Manage secrets")
|
||||
secrets_sub = secrets_parser.add_subparsers(dest="secrets_action")
|
||||
|
||||
set_p = secrets_sub.add_parser("set", help="Set a secret")
|
||||
set_p.add_argument("name", help="Secret name")
|
||||
set_p.add_argument("--file", help="Read value from file (file deleted after)")
|
||||
|
||||
secrets_sub.add_parser("list", help="List secret names")
|
||||
|
||||
del_p = secrets_sub.add_parser("delete", help="Delete a secret")
|
||||
del_p.add_argument("name", help="Secret name")
|
||||
|
||||
secrets_sub.add_parser("test", help="Check required secrets")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command is None:
|
||||
parser.print_help()
|
||||
sys.exit(0)
|
||||
|
||||
if args.command == "secrets":
|
||||
if args.secrets_action is None:
|
||||
secrets_parser.print_help()
|
||||
sys.exit(0)
|
||||
cmd_secrets(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
69
src/secrets.py
Normal file
69
src/secrets.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Echo Core secrets manager — keyring wrapper."""
|
||||
|
||||
import json
|
||||
import keyring
|
||||
|
||||
SERVICE = "echo-core"
|
||||
|
||||
# Required secrets that should exist for full functionality
|
||||
REQUIRED_SECRETS = ["discord_token"]
|
||||
|
||||
|
||||
def set_secret(name: str, value: str) -> None:
|
||||
"""Store a secret in the system keyring."""
|
||||
keyring.set_password(SERVICE, name, value)
|
||||
# Track secret name in the registry
|
||||
names = _get_registry()
|
||||
if name not in names:
|
||||
names.append(name)
|
||||
_save_registry(names)
|
||||
|
||||
|
||||
def get_secret(name: str) -> str | None:
|
||||
"""Retrieve a secret from keyring. Returns None if not found."""
|
||||
return keyring.get_password(SERVICE, name)
|
||||
|
||||
|
||||
def get_json_secret(name: str) -> dict | None:
|
||||
"""Retrieve and JSON-deserialize a secret."""
|
||||
raw = get_secret(name)
|
||||
if raw is None:
|
||||
return None
|
||||
return json.loads(raw)
|
||||
|
||||
|
||||
def delete_secret(name: str) -> bool:
|
||||
"""Delete a secret. Returns True if existed."""
|
||||
try:
|
||||
keyring.delete_password(SERVICE, name)
|
||||
except keyring.errors.PasswordDeleteError:
|
||||
pass
|
||||
names = _get_registry()
|
||||
if name in names:
|
||||
names.remove(name)
|
||||
_save_registry(names)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def list_secrets() -> list[str]:
|
||||
"""List all stored secret names."""
|
||||
return _get_registry()
|
||||
|
||||
|
||||
def check_secrets() -> dict[str, bool]:
|
||||
"""Check which required secrets exist. Returns {name: exists}."""
|
||||
return {name: get_secret(name) is not None for name in REQUIRED_SECRETS}
|
||||
|
||||
|
||||
def _get_registry() -> list[str]:
|
||||
"""Get list of secret names from keyring registry."""
|
||||
raw = keyring.get_password(SERVICE, "_registry")
|
||||
if raw is None:
|
||||
return []
|
||||
return json.loads(raw)
|
||||
|
||||
|
||||
def _save_registry(names: list[str]) -> None:
|
||||
"""Save secret names registry to keyring."""
|
||||
keyring.set_password(SERVICE, "_registry", json.dumps(sorted(set(names))))
|
||||
257
tests/test_secrets.py
Normal file
257
tests/test_secrets.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Comprehensive tests for src/secrets.py and cli.py secrets subcommand."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
from pathlib import Path
|
||||
|
||||
from src.secrets import (
|
||||
SERVICE,
|
||||
REQUIRED_SECRETS,
|
||||
set_secret,
|
||||
get_secret,
|
||||
get_json_secret,
|
||||
delete_secret,
|
||||
list_secrets,
|
||||
check_secrets,
|
||||
_get_registry,
|
||||
_save_registry,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper: a fake keyring store backed by a plain dict
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class FakeKeyring:
|
||||
"""In-memory keyring replacement for tests."""
|
||||
|
||||
def __init__(self):
|
||||
self.store: dict[tuple[str, str], str] = {}
|
||||
|
||||
def get_password(self, service, name):
|
||||
return self.store.get((service, name))
|
||||
|
||||
def set_password(self, service, name, value):
|
||||
self.store[(service, name)] = value
|
||||
|
||||
def delete_password(self, service, name):
|
||||
key = (service, name)
|
||||
if key not in self.store:
|
||||
import keyring.errors
|
||||
raise keyring.errors.PasswordDeleteError(name)
|
||||
del self.store[key]
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_keyring():
|
||||
"""Patch keyring globally for every test so the real keyring is never touched."""
|
||||
fake = FakeKeyring()
|
||||
with (
|
||||
patch("src.secrets.keyring.get_password", side_effect=fake.get_password),
|
||||
patch("src.secrets.keyring.set_password", side_effect=fake.set_password),
|
||||
patch("src.secrets.keyring.delete_password", side_effect=fake.delete_password),
|
||||
):
|
||||
yield fake
|
||||
|
||||
|
||||
# ===== _get_registry / _save_registry ======================================
|
||||
|
||||
class TestRegistry:
|
||||
def test_get_registry_empty(self, mock_keyring):
|
||||
assert _get_registry() == []
|
||||
|
||||
def test_save_and_get_registry(self, mock_keyring):
|
||||
_save_registry(["alpha", "beta"])
|
||||
result = _get_registry()
|
||||
assert result == ["alpha", "beta"]
|
||||
|
||||
def test_save_registry_deduplicates_and_sorts(self, mock_keyring):
|
||||
_save_registry(["zeta", "alpha", "zeta"])
|
||||
result = _get_registry()
|
||||
assert result == ["alpha", "zeta"]
|
||||
|
||||
|
||||
# ===== set_secret ==========================================================
|
||||
|
||||
class TestSetSecret:
|
||||
def test_stores_value(self, mock_keyring):
|
||||
set_secret("mykey", "myvalue")
|
||||
assert mock_keyring.store[(SERVICE, "mykey")] == "myvalue"
|
||||
|
||||
def test_updates_registry(self, mock_keyring):
|
||||
set_secret("mykey", "myvalue")
|
||||
assert "mykey" in _get_registry()
|
||||
|
||||
def test_set_multiple_secrets(self, mock_keyring):
|
||||
set_secret("a", "1")
|
||||
set_secret("b", "2")
|
||||
reg = _get_registry()
|
||||
assert "a" in reg
|
||||
assert "b" in reg
|
||||
|
||||
def test_set_same_secret_twice_no_duplicate_registry(self, mock_keyring):
|
||||
set_secret("dup", "v1")
|
||||
set_secret("dup", "v2")
|
||||
assert _get_registry().count("dup") == 1
|
||||
assert mock_keyring.store[(SERVICE, "dup")] == "v2"
|
||||
|
||||
|
||||
# ===== get_secret ==========================================================
|
||||
|
||||
class TestGetSecret:
|
||||
def test_returns_value(self, mock_keyring):
|
||||
set_secret("k", "v")
|
||||
assert get_secret("k") == "v"
|
||||
|
||||
def test_returns_none_when_missing(self, mock_keyring):
|
||||
assert get_secret("nonexistent") is None
|
||||
|
||||
|
||||
# ===== get_json_secret =====================================================
|
||||
|
||||
class TestGetJsonSecret:
|
||||
def test_returns_deserialized_dict(self, mock_keyring):
|
||||
payload = {"host": "localhost", "port": 5432}
|
||||
set_secret("db", json.dumps(payload))
|
||||
assert get_json_secret("db") == payload
|
||||
|
||||
def test_returns_none_when_missing(self, mock_keyring):
|
||||
assert get_json_secret("nope") is None
|
||||
|
||||
def test_raises_on_invalid_json(self, mock_keyring):
|
||||
set_secret("bad", "not-json{{{")
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
get_json_secret("bad")
|
||||
|
||||
|
||||
# ===== delete_secret =======================================================
|
||||
|
||||
class TestDeleteSecret:
|
||||
def test_delete_existing_returns_true(self, mock_keyring):
|
||||
set_secret("rm_me", "val")
|
||||
assert delete_secret("rm_me") is True
|
||||
|
||||
def test_delete_existing_removes_from_keyring(self, mock_keyring):
|
||||
set_secret("rm_me", "val")
|
||||
delete_secret("rm_me")
|
||||
assert (SERVICE, "rm_me") not in mock_keyring.store
|
||||
|
||||
def test_delete_existing_removes_from_registry(self, mock_keyring):
|
||||
set_secret("rm_me", "val")
|
||||
delete_secret("rm_me")
|
||||
assert "rm_me" not in _get_registry()
|
||||
|
||||
def test_delete_nonexistent_returns_false(self, mock_keyring):
|
||||
assert delete_secret("ghost") is False
|
||||
|
||||
|
||||
# ===== list_secrets ========================================================
|
||||
|
||||
class TestListSecrets:
|
||||
def test_empty_when_nothing_stored(self, mock_keyring):
|
||||
assert list_secrets() == []
|
||||
|
||||
def test_returns_names(self, mock_keyring):
|
||||
set_secret("x", "1")
|
||||
set_secret("y", "2")
|
||||
names = list_secrets()
|
||||
assert "x" in names
|
||||
assert "y" in names
|
||||
|
||||
|
||||
# ===== check_secrets ========================================================
|
||||
|
||||
class TestCheckSecrets:
|
||||
def test_missing_required(self, mock_keyring):
|
||||
result = check_secrets()
|
||||
assert result == {"discord_token": False}
|
||||
|
||||
def test_present_required(self, mock_keyring):
|
||||
set_secret("discord_token", "tok-123")
|
||||
result = check_secrets()
|
||||
assert result == {"discord_token": True}
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# CLI tests — exercise cmd_secrets via main() with mocked sys.argv
|
||||
# ===========================================================================
|
||||
|
||||
import sys
|
||||
from cli import main as cli_main
|
||||
|
||||
|
||||
class TestCLISecretsList:
|
||||
def test_list_empty(self, mock_keyring, capsys):
|
||||
with patch("sys.argv", ["echo", "secrets", "list"]):
|
||||
cli_main()
|
||||
assert "No secrets stored" in capsys.readouterr().out
|
||||
|
||||
def test_list_populated(self, mock_keyring, capsys):
|
||||
set_secret("foo", "bar")
|
||||
with patch("sys.argv", ["echo", "secrets", "list"]):
|
||||
cli_main()
|
||||
assert "foo" in capsys.readouterr().out
|
||||
|
||||
|
||||
class TestCLISecretsSet:
|
||||
def test_set_from_file(self, mock_keyring, tmp_path, capsys):
|
||||
secret_file = tmp_path / "token.txt"
|
||||
secret_file.write_text("super-secret\n")
|
||||
with patch("sys.argv", ["echo", "secrets", "set", "my_token", "--file", str(secret_file)]):
|
||||
cli_main()
|
||||
out = capsys.readouterr().out
|
||||
assert "set from file" in out
|
||||
assert get_secret("my_token") == "super-secret"
|
||||
# file should be deleted after set
|
||||
assert not secret_file.exists()
|
||||
|
||||
def test_set_interactive(self, mock_keyring, capsys):
|
||||
with (
|
||||
patch("sys.argv", ["echo", "secrets", "set", "my_token"]),
|
||||
patch("cli.getpass.getpass", return_value="interactive-val"),
|
||||
):
|
||||
cli_main()
|
||||
out = capsys.readouterr().out
|
||||
assert "set" in out.lower()
|
||||
assert get_secret("my_token") == "interactive-val"
|
||||
|
||||
def test_set_from_missing_file_exits(self, mock_keyring, tmp_path):
|
||||
with (
|
||||
patch("sys.argv", ["echo", "secrets", "set", "x", "--file", str(tmp_path / "nope.txt")]),
|
||||
pytest.raises(SystemExit) as exc_info,
|
||||
):
|
||||
cli_main()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
|
||||
class TestCLISecretsDelete:
|
||||
def test_delete_existing(self, mock_keyring, capsys):
|
||||
set_secret("d", "val")
|
||||
with patch("sys.argv", ["echo", "secrets", "delete", "d"]):
|
||||
cli_main()
|
||||
assert "deleted" in capsys.readouterr().out
|
||||
|
||||
def test_delete_nonexistent(self, mock_keyring, capsys):
|
||||
with patch("sys.argv", ["echo", "secrets", "delete", "nope"]):
|
||||
cli_main()
|
||||
assert "not found" in capsys.readouterr().out
|
||||
|
||||
|
||||
class TestCLISecretsTest:
|
||||
def test_missing_required_exits_1(self, mock_keyring, capsys):
|
||||
with (
|
||||
patch("sys.argv", ["echo", "secrets", "test"]),
|
||||
pytest.raises(SystemExit) as exc_info,
|
||||
):
|
||||
cli_main()
|
||||
assert exc_info.value.code == 1
|
||||
assert "MISSING" in capsys.readouterr().out
|
||||
|
||||
def test_all_present(self, mock_keyring, capsys):
|
||||
set_secret("discord_token", "tok")
|
||||
with patch("sys.argv", ["echo", "secrets", "test"]):
|
||||
cli_main()
|
||||
out = capsys.readouterr().out
|
||||
assert "OK" in out
|
||||
assert "All required secrets present" in out
|
||||
Reference in New Issue
Block a user