# secret_manager.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""
Manager for storing and retrieving sensitive variable values using GNOME Keyring.

This module provides a clean interface to libsecret for storing
environment variable values that contain sensitive data (API keys,
passwords, tokens).

Uses async APIs for compatibility with the XDG Secrets Portal in
sandboxed environments (Flatpak).

Each secret is identified by:
- project_id: UUID of the project
- environment_id: UUID of the environment
- variable_name: Name of the variable
"""

import logging
from typing import Callable, Optional, Any

import gi
gi.require_version('Secret', '1')
from gi.repository import Secret, GLib, Gio

logger = logging.getLogger(__name__)

# Define schema for Roster environment variables
# This separates them from other secrets in GNOME Keyring
ROSTER_SCHEMA = Secret.Schema.new(
    "cz.bugsy.roster.EnvironmentVariable",
    Secret.SchemaFlags.NONE,
    {
        "project_id": Secret.SchemaAttributeType.STRING,
        "environment_id": Secret.SchemaAttributeType.STRING,
        "variable_name": Secret.SchemaAttributeType.STRING,
    }
)


def _notify(callback: Optional[Callable[[bool], None]], success: bool) -> None:
    """Invoke an optional completion callback with the operation outcome."""
    if callback:
        callback(success)


class SecretManager:
    """Manages sensitive variable storage using GNOME Keyring via libsecret.

    All methods use async APIs for compatibility with the XDG Secrets Portal.
    Results are delivered through callbacks; nothing is returned directly.
    Completion callbacks are invoked outside the ``try`` block that guards
    the libsecret ``*_finish`` call, so an exception raised by a callback is
    never mistaken for a keyring failure (and the callback is never invoked
    twice for one operation).
    """

    def __init__(self):
        """Initialize the secret manager."""
        self.schema = ROSTER_SCHEMA

    def store_secret(self, project_id: str, environment_id: str,
                     variable_name: str, value: str,
                     project_name: str = "", environment_name: str = "",
                     callback: Optional[Callable[[bool], None]] = None):
        """
        Store a sensitive variable value in GNOME Keyring (async).

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            variable_name: Name of the variable
            value: The secret value to store
            project_name: Optional human-readable project name (for display in Seahorse)
            environment_name: Optional human-readable environment name (for display)
            callback: Optional callback function called with success boolean
        """
        # Create a descriptive label for the keyring UI
        if project_name:
            label = f"Roster: {project_name}/{environment_name}/{variable_name}"
        else:
            label = f"Roster Variable: {variable_name}"

        # Build attributes dictionary for the secret
        attributes = {
            "project_id": project_id,
            "environment_id": environment_id,
            "variable_name": variable_name,
        }

        def on_store_complete(source, result, user_data):
            try:
                Secret.password_store_finish(result)
            except GLib.Error as e:
                logger.error("Failed to store secret: %s", e.message)
                _notify(callback, False)
            else:
                logger.info("Stored secret for %s in %s",
                            variable_name, environment_id)
                _notify(callback, True)

        # Store the secret asynchronously
        Secret.password_store(
            self.schema,
            attributes,
            Secret.COLLECTION_DEFAULT,
            label,
            value,
            None,  # cancellable
            on_store_complete,
            None   # user_data
        )

    def retrieve_secret(self, project_id: str, environment_id: str,
                        variable_name: str,
                        callback: Callable[[Optional[str]], None]):
        """
        Retrieve a sensitive variable value from GNOME Keyring (async).

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            variable_name: Name of the variable
            callback: Callback function called with the secret value
                (or None if not found or if the lookup failed)
        """
        attributes = {
            "project_id": project_id,
            "environment_id": environment_id,
            "variable_name": variable_name,
        }

        def on_lookup_complete(source, result, user_data):
            try:
                value = Secret.password_lookup_finish(result)
            except GLib.Error as e:
                logger.error("Failed to retrieve secret: %s", e.message)
                callback(None)
                return

            if value is None:
                logger.debug("No secret found for %s", variable_name)
            else:
                logger.debug("Retrieved secret for %s", variable_name)
            callback(value)

        # Retrieve the secret asynchronously
        Secret.password_lookup(
            self.schema,
            attributes,
            None,  # cancellable
            on_lookup_complete,
            None   # user_data
        )

    def delete_secret(self, project_id: str, environment_id: str,
                      variable_name: str,
                      callback: Optional[Callable[[bool], None]] = None):
        """
        Delete a sensitive variable value from GNOME Keyring (async).

        A missing secret is not an error: the callback still receives True.

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            variable_name: Name of the variable
            callback: Optional callback function called with success boolean
        """
        attributes = {
            "project_id": project_id,
            "environment_id": environment_id,
            "variable_name": variable_name,
        }

        def on_clear_complete(source, result, user_data):
            try:
                deleted = Secret.password_clear_finish(result)
            except GLib.Error as e:
                logger.error("Failed to delete secret: %s", e.message)
                _notify(callback, False)
                return

            if deleted:
                logger.info("Deleted secret for %s", variable_name)
            else:
                logger.debug("No secret to delete for %s", variable_name)
            _notify(callback, True)

        # Delete the secret asynchronously
        Secret.password_clear(
            self.schema,
            attributes,
            None,  # cancellable
            on_clear_complete,
            None   # user_data
        )

    def delete_all_project_secrets(self, project_id: str,
                                   callback: Optional[Callable[[bool], None]] = None):
        """
        Delete all secrets for a project (when project is deleted).

        Args:
            project_id: UUID of the project
            callback: Optional callback function called with success boolean
        """
        # Matching on project_id alone selects every secret of the project.
        attributes = {
            "project_id": project_id,
        }

        def on_clear_complete(source, result, user_data):
            try:
                Secret.password_clear_finish(result)
            except GLib.Error as e:
                logger.error("Failed to delete project secrets: %s", e.message)
                _notify(callback, False)
            else:
                logger.info("Deleted all secrets for project %s", project_id)
                _notify(callback, True)

        # Delete all matching secrets asynchronously
        Secret.password_clear(
            self.schema,
            attributes,
            None,  # cancellable
            on_clear_complete,
            None   # user_data
        )

    def delete_all_environment_secrets(self, project_id: str, environment_id: str,
                                       callback: Optional[Callable[[bool], None]] = None):
        """
        Delete all secrets for an environment (when environment is deleted).

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            callback: Optional callback function called with success boolean
        """
        attributes = {
            "project_id": project_id,
            "environment_id": environment_id,
        }

        def on_clear_complete(source, result, user_data):
            try:
                Secret.password_clear_finish(result)
            except GLib.Error as e:
                logger.error("Failed to delete environment secrets: %s", e.message)
                _notify(callback, False)
            else:
                logger.info("Deleted all secrets for environment %s",
                            environment_id)
                _notify(callback, True)

        Secret.password_clear(
            self.schema,
            attributes,
            None,  # cancellable
            on_clear_complete,
            None   # user_data
        )

    def rename_variable_secrets(self, project_id: str, old_name: str, new_name: str,
                                callback: Optional[Callable[[bool], None]] = None):
        """
        Rename a variable across all environments (updates secret keys).

        This is called when a variable is renamed. We need to:
        1. Find all secrets with old_name
        2. Store them with new_name
        3. Delete old secrets

        Args:
            project_id: UUID of the project
            old_name: Current variable name
            new_name: New variable name
            callback: Optional callback function called with success boolean
                (True only if every matching secret was renamed)
        """
        # Renaming to the same name must be a no-op: otherwise the secret is
        # re-stored under the same key and then removed by the "delete old"
        # step, losing the value.
        if old_name == new_name:
            logger.debug("Variable name %s unchanged, nothing to rename", old_name)
            _notify(callback, True)
            return

        attributes = {
            "project_id": project_id,
            "variable_name": old_name,
        }

        def on_search_complete(source, result, user_data):
            try:
                secrets = Secret.password_search_finish(result)
            except GLib.Error as e:
                logger.error("Failed to search for variable secrets: %s", e.message)
                _notify(callback, False)
                return

            if not secrets:
                logger.debug("No secrets found for variable %s", old_name)
                _notify(callback, True)
                return

            # Track how many secrets we still need to process
            remaining = len(secrets)
            all_succeeded = True

            def on_rename_done(success):
                nonlocal remaining, all_succeeded
                remaining -= 1
                if not success:
                    all_succeeded = False
                if remaining:
                    return
                if all_succeeded:
                    logger.info("Renamed variable secrets from %s to %s",
                                old_name, new_name)
                else:
                    logger.warning(
                        "Some secrets could not be renamed from %s to %s",
                        old_name, new_name)
                _notify(callback, all_succeeded)

            # For each secret, retrieve value, store with new name, delete old
            for item in secrets:
                self._rename_single_secret(item, project_id, old_name,
                                           new_name, on_rename_done)

        # Search for matching secrets asynchronously
        Secret.password_search(
            self.schema,
            attributes,
            Secret.SearchFlags.ALL,
            None,  # cancellable
            on_search_complete,
            None   # user_data
        )

    def _rename_single_secret(self, item, project_id: str, old_name: str,
                              new_name: str, callback: Callable[[bool], None]):
        """Helper to rename a single secret item.

        Args:
            item: A search result from Secret.password_search. Only the
                Secret.Retrievable interface is used, because that is the
                type the search is documented to return; Secret.Item-only
                methods such as load_secret() are not guaranteed to exist
                on every result.
            project_id: UUID of the project
            old_name: Current variable name
            new_name: New variable name
            callback: Called with True once the value is stored under
                new_name and the old entry is deleted, False otherwise
        """
        def on_retrieve_complete(source, result, user_data):
            try:
                secret = item.retrieve_secret_finish(result)
            except GLib.Error as e:
                logger.error("Failed to load secret for rename: %s", e.message)
                callback(False)
                return

            if secret is None:
                callback(False)
                return

            value = secret.get_text()
            if value is None:
                # Non-text secret: nothing we can re-store as a password.
                logger.warning("Secret value is not text, skipping")
                callback(False)
                return

            attrs = item.get_attributes()
            environment_id = attrs.get("environment_id")
            if not environment_id:
                logger.warning("Secret missing environment_id, skipping")
                callback(False)
                return

            # Store with new name, then delete old. The old entry is only
            # removed once the new one is safely stored.
            def on_store_done(success):
                if success:
                    self.delete_secret(project_id, environment_id,
                                       old_name, callback)
                else:
                    callback(False)

            self.store_secret(
                project_id=project_id,
                environment_id=environment_id,
                variable_name=new_name,
                value=value,
                callback=on_store_done
            )

        item.retrieve_secret(None, on_retrieve_complete, None)

    # ========== Batch Operations ==========

    def retrieve_multiple_secrets(self, project_id: str, environment_id: str,
                                  variable_names: list,
                                  callback: Callable[[dict], None]):
        """
        Retrieve multiple secrets at once (async).

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            variable_names: List of variable names to retrieve
            callback: Callback function called with dict of
                {variable_name: value}; value is None for a variable whose
                secret was not found or could not be read
        """
        if not variable_names:
            callback({})
            return

        results = {}
        remaining = len(variable_names)

        def make_handler(var_name):
            # Bind var_name per lookup so each handler records its own key.
            def handler(value):
                nonlocal remaining
                results[var_name] = value
                remaining -= 1
                if remaining == 0:
                    callback(results)
            return handler

        for var_name in variable_names:
            self.retrieve_secret(
                project_id=project_id,
                environment_id=environment_id,
                variable_name=var_name,
                callback=make_handler(var_name)
            )

    def store_multiple_secrets(self, project_id: str, environment_id: str,
                               variables: dict,
                               project_name: str = "", environment_name: str = "",
                               callback: Optional[Callable[[bool], None]] = None):
        """
        Store multiple secrets at once (async).

        Args:
            project_id: UUID of the project
            environment_id: UUID of the environment
            variables: Dict of {variable_name: value} to store
            project_name: Optional project name for display
            environment_name: Optional environment name for display
            callback: Optional callback called with overall success boolean
                (True only if every store succeeded)
        """
        if not variables:
            _notify(callback, True)
            return

        remaining = len(variables)
        all_succeeded = True

        def on_single_store(success):
            nonlocal remaining, all_succeeded
            if not success:
                all_succeeded = False
            remaining -= 1
            if remaining == 0:
                _notify(callback, all_succeeded)

        for var_name, value in variables.items():
            self.store_secret(
                project_id=project_id,
                environment_id=environment_id,
                variable_name=var_name,
                value=value,
                project_name=project_name,
                environment_name=environment_name,
                callback=on_single_store
            )


# Singleton instance
_secret_manager = None


def get_secret_manager() -> SecretManager:
    """Get the global SecretManager instance, creating it on first use."""
    global _secret_manager
    if _secret_manager is None:
        _secret_manager = SecretManager()
    return _secret_manager