323 lines
10 KiB
Python
323 lines
10 KiB
Python
# secret_manager.py
|
|
#
|
|
# Copyright 2025 Pavel Baksy
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
"""
|
|
Manager for storing and retrieving sensitive variable values using GNOME Keyring.
|
|
|
|
This module provides a clean interface to libsecret for storing environment
|
|
variable values that contain sensitive data (API keys, passwords, tokens).
|
|
|
|
Each secret is identified by:
|
|
- project_id: UUID of the project
|
|
- environment_id: UUID of the environment
|
|
- variable_name: Name of the variable
|
|
"""
|
|
|
|
import gi
|
|
gi.require_version('Secret', '1')
|
|
from gi.repository import Secret, GLib
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)

# Dedicated libsecret schema for Roster environment variables, which keeps
# these items apart from other secrets stored in GNOME Keyring.
ROSTER_SCHEMA = Secret.Schema.new(
    "cz.bugsy.roster.EnvironmentVariable",
    Secret.SchemaFlags.NONE,
    # All three lookup attributes are plain strings.
    dict.fromkeys(
        ("project_id", "environment_id", "variable_name"),
        Secret.SchemaAttributeType.STRING,
    ),
)
|
|
|
|
|
|
class SecretManager:
|
|
"""Manages sensitive variable storage using GNOME Keyring via libsecret."""
|
|
|
|
def __init__(self):
|
|
"""Initialize the secret manager."""
|
|
self.schema = ROSTER_SCHEMA
|
|
|
|
def store_secret(self, project_id: str, environment_id: str,
|
|
variable_name: str, value: str, project_name: str = "",
|
|
environment_name: str = "") -> bool:
|
|
"""
|
|
Store a sensitive variable value in GNOME Keyring.
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
environment_id: UUID of the environment
|
|
variable_name: Name of the variable
|
|
value: The secret value to store
|
|
project_name: Optional human-readable project name (for display in Seahorse)
|
|
environment_name: Optional human-readable environment name (for display)
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
# Create a descriptive label for the keyring UI
|
|
label = f"Roster: {project_name}/{environment_name}/{variable_name}" if project_name else \
|
|
f"Roster Variable: {variable_name}"
|
|
|
|
# Build attributes dictionary for the secret
|
|
attributes = {
|
|
"project_id": project_id,
|
|
"environment_id": environment_id,
|
|
"variable_name": variable_name,
|
|
}
|
|
|
|
try:
|
|
# Store the secret synchronously
|
|
# Using PASSWORD collection (default keyring)
|
|
Secret.password_store_sync(
|
|
self.schema,
|
|
attributes,
|
|
Secret.COLLECTION_DEFAULT,
|
|
label,
|
|
value,
|
|
None # cancellable
|
|
)
|
|
logger.info(f"Stored secret for {variable_name} in {environment_id}")
|
|
return True
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to store secret: {e.message}")
|
|
return False
|
|
|
|
def retrieve_secret(self, project_id: str, environment_id: str,
|
|
variable_name: str) -> str | None:
|
|
"""
|
|
Retrieve a sensitive variable value from GNOME Keyring.
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
environment_id: UUID of the environment
|
|
variable_name: Name of the variable
|
|
|
|
Returns:
|
|
The secret value if found, None otherwise
|
|
"""
|
|
attributes = {
|
|
"project_id": project_id,
|
|
"environment_id": environment_id,
|
|
"variable_name": variable_name,
|
|
}
|
|
|
|
try:
|
|
# Retrieve the secret synchronously
|
|
value = Secret.password_lookup_sync(
|
|
self.schema,
|
|
attributes,
|
|
None # cancellable
|
|
)
|
|
|
|
if value is None:
|
|
logger.debug(f"No secret found for {variable_name}")
|
|
else:
|
|
logger.debug(f"Retrieved secret for {variable_name}")
|
|
|
|
return value
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to retrieve secret: {e.message}")
|
|
return None
|
|
|
|
def delete_secret(self, project_id: str, environment_id: str,
|
|
variable_name: str) -> bool:
|
|
"""
|
|
Delete a sensitive variable value from GNOME Keyring.
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
environment_id: UUID of the environment
|
|
variable_name: Name of the variable
|
|
|
|
Returns:
|
|
True if deleted (or didn't exist), False on error
|
|
"""
|
|
attributes = {
|
|
"project_id": project_id,
|
|
"environment_id": environment_id,
|
|
"variable_name": variable_name,
|
|
}
|
|
|
|
try:
|
|
# Delete the secret synchronously
|
|
deleted = Secret.password_clear_sync(
|
|
self.schema,
|
|
attributes,
|
|
None # cancellable
|
|
)
|
|
|
|
if deleted:
|
|
logger.info(f"Deleted secret for {variable_name}")
|
|
else:
|
|
logger.debug(f"No secret to delete for {variable_name}")
|
|
|
|
return True
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to delete secret: {e.message}")
|
|
return False
|
|
|
|
def delete_all_project_secrets(self, project_id: str) -> bool:
|
|
"""
|
|
Delete all secrets for a project (when project is deleted).
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
attributes = {
|
|
"project_id": project_id,
|
|
}
|
|
|
|
try:
|
|
# Delete all matching secrets
|
|
deleted = Secret.password_clear_sync(
|
|
self.schema,
|
|
attributes,
|
|
None
|
|
)
|
|
|
|
logger.info(f"Deleted all secrets for project {project_id}")
|
|
return True
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to delete project secrets: {e.message}")
|
|
return False
|
|
|
|
def delete_all_environment_secrets(self, project_id: str, environment_id: str) -> bool:
|
|
"""
|
|
Delete all secrets for an environment (when environment is deleted).
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
environment_id: UUID of the environment
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
attributes = {
|
|
"project_id": project_id,
|
|
"environment_id": environment_id,
|
|
}
|
|
|
|
try:
|
|
deleted = Secret.password_clear_sync(
|
|
self.schema,
|
|
attributes,
|
|
None
|
|
)
|
|
|
|
logger.info(f"Deleted all secrets for environment {environment_id}")
|
|
return True
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to delete environment secrets: {e.message}")
|
|
return False
|
|
|
|
def rename_variable_secrets(self, project_id: str, old_name: str, new_name: str) -> bool:
|
|
"""
|
|
Rename a variable across all environments (updates secret keys).
|
|
|
|
This is called when a variable is renamed. We need to:
|
|
1. Find all secrets with old_name
|
|
2. Store them with new_name
|
|
3. Delete old secrets
|
|
|
|
Args:
|
|
project_id: UUID of the project
|
|
old_name: Current variable name
|
|
new_name: New variable name
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
# Search for all secrets with the old variable name
|
|
attributes = {
|
|
"project_id": project_id,
|
|
"variable_name": old_name,
|
|
}
|
|
|
|
try:
|
|
# Search for matching secrets
|
|
# This returns a list of Secret.Item objects
|
|
secrets = Secret.password_search_sync(
|
|
self.schema,
|
|
attributes,
|
|
Secret.SearchFlags.ALL,
|
|
None
|
|
)
|
|
|
|
if not secrets:
|
|
logger.debug(f"No secrets found for variable {old_name}")
|
|
return True
|
|
|
|
# For each secret, retrieve value, store with new name, delete old
|
|
for item in secrets:
|
|
# Get the secret value
|
|
item.load_secret_sync(None)
|
|
value = item.get_secret().get_text()
|
|
|
|
# Get environment_id from attributes
|
|
attrs = item.get_attributes()
|
|
environment_id = attrs.get("environment_id")
|
|
|
|
if not environment_id:
|
|
logger.warning(f"Secret missing environment_id, skipping")
|
|
continue
|
|
|
|
# Store with new name
|
|
self.store_secret(
|
|
project_id=project_id,
|
|
environment_id=environment_id,
|
|
variable_name=new_name,
|
|
value=value
|
|
)
|
|
|
|
# Delete old secret
|
|
self.delete_secret(
|
|
project_id=project_id,
|
|
environment_id=environment_id,
|
|
variable_name=old_name
|
|
)
|
|
|
|
logger.info(f"Renamed variable secrets from {old_name} to {new_name}")
|
|
return True
|
|
|
|
except GLib.Error as e:
|
|
logger.error(f"Failed to rename variable secrets: {e.message}")
|
|
return False
|
|
|
|
|
|
# Process-wide SecretManager, created lazily by get_secret_manager()
_secret_manager = None


def get_secret_manager() -> SecretManager:
    """Return the shared SecretManager, creating it on first use."""
    global _secret_manager
    if _secret_manager is not None:
        return _secret_manager
    _secret_manager = SecretManager()
    return _secret_manager
|