roster/src/project_manager.py

823 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# project_manager.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import shutil
import uuid
import logging
from pathlib import Path
from typing import List, Optional, Callable
from datetime import datetime, timezone
import gi
gi.require_version('GLib', '2.0')
from gi.repository import GLib
from .models import Project, SavedRequest, HttpRequest, Environment
from .secret_manager import get_secret_manager
from .migration import _sanitize_name, needs_migration, run_migration
logger = logging.getLogger(__name__)
def _unique_filename(base: str, exclude: set) -> str:
"""Return a unique .json filename whose basename is not in `exclude`."""
if base not in exclude:
return base + '.json'
i = 2
while f"{base}-{i}" in exclude:
i += 1
return f"{base}-{i}.json"
class ProjectManager:
"""Manages project and saved request persistence."""
def __init__(self):
self.data_dir = Path(GLib.get_user_data_dir()) / 'cz.bugsy.roster'
self.projects_dir = self.data_dir / 'projects'
self.legacy_file = self.data_dir / 'requests.json'
self._ensure_data_dir()
self._projects_cache: Optional[List[Project]] = None
# project_id → directory name under projects_dir
self._project_dirs: dict = {}
# request_id → filename (e.g. "get-users.json")
self._request_filenames: dict = {}
self._legacy_mode = False
if needs_migration(self.projects_dir, self.legacy_file):
if not run_migration(self.legacy_file, self.projects_dir):
logger.error("Migration failed falling back to legacy mode")
self._legacy_mode = True
def _ensure_data_dir(self):
self.data_dir.mkdir(parents=True, exist_ok=True)
# ===== Loading =====
def load_projects(self, force_reload: bool = False) -> List[Project]:
if self._projects_cache is not None and not force_reload:
return self._projects_cache
if self._legacy_mode or not self.projects_dir.exists():
return self._load_legacy()
return self._load_from_dirs()
def _load_from_dirs(self) -> List[Project]:
projects = []
self._project_dirs.clear()
self._request_filenames.clear()
try:
entries = sorted(self.projects_dir.iterdir())
except Exception as e:
logger.error("Cannot iterate projects dir: %s", e)
self._projects_cache = []
return []
for proj_dir in entries:
if not proj_dir.is_dir():
continue
meta_file = proj_dir / '_project.json'
if not meta_file.exists():
continue
try:
with open(meta_file, 'r') as f:
meta = json.load(f)
request_order = meta.get('request_order', [])
requests = []
loaded_files: set = set()
for filename in request_order:
req_file = proj_dir / filename
if not req_file.exists():
continue
try:
with open(req_file, 'r') as f:
req_data = json.load(f)
req = SavedRequest.from_dict(req_data)
requests.append(req)
self._request_filenames[req.id] = filename
loaded_files.add(filename)
except Exception as e:
logger.error("Error loading request %s: %s", req_file, e)
# Also pick up files not listed in request_order
for req_file in sorted(proj_dir.iterdir()):
if req_file.name.startswith('_') or req_file.suffix != '.json':
continue
if req_file.name in loaded_files:
continue
try:
with open(req_file, 'r') as f:
req_data = json.load(f)
req = SavedRequest.from_dict(req_data)
requests.append(req)
self._request_filenames[req.id] = req_file.name
except Exception as e:
logger.error("Error loading extra request %s: %s", req_file, e)
project = Project(
id=meta['id'],
name=meta['name'],
requests=requests,
created_at=meta['created_at'],
icon=meta.get('icon', 'folder-symbolic'),
variable_names=meta.get('variable_names', []),
environments=[Environment.from_dict(e) for e in meta.get('environments', [])],
sensitive_variables=meta.get('sensitive_variables', []),
)
projects.append(project)
self._project_dirs[project.id] = proj_dir.name
except Exception as e:
logger.error("Error loading project %s: %s", proj_dir, e)
self._projects_cache = projects
return projects
def _load_legacy(self) -> List[Project]:
if not self.legacy_file.exists():
self._projects_cache = []
return []
try:
with open(self.legacy_file, 'r') as f:
data = json.load(f)
self._projects_cache = [Project.from_dict(p) for p in data.get('projects', [])]
return self._projects_cache
except Exception as e:
logger.error("Error loading legacy projects: %s", e)
self._projects_cache = []
return []
# ===== Saving =====
def _save_project(self, project: Project):
if self._legacy_mode:
self._save_all_legacy()
return
try:
old_dir_name = self._project_dirs.get(project.id)
new_dir_name = _sanitize_name(project.name)
# Handle project rename
if old_dir_name and old_dir_name != new_dir_name:
old_dir = self.projects_dir / old_dir_name
if old_dir.exists():
# Ensure new name is unique
candidate = new_dir_name
i = 2
while (self.projects_dir / candidate).exists():
candidate = f"{new_dir_name}-{i}"
i += 1
new_dir_name = candidate
old_dir.rename(self.projects_dir / new_dir_name)
project_dir = self.projects_dir / new_dir_name
project_dir.mkdir(parents=True, exist_ok=True)
self._project_dirs[project.id] = new_dir_name
# Assign filenames preserving stable names where possible
request_order = []
used_basenames: set = set()
for req in project.requests:
old_filename = self._request_filenames.get(req.id)
desired_base = _sanitize_name(req.name)
if old_filename is not None:
old_base = old_filename[:-5] if old_filename.endswith('.json') else old_filename
if old_base == desired_base and old_base not in used_basenames:
filename = old_filename
else:
filename = _unique_filename(desired_base, used_basenames)
else:
filename = _unique_filename(desired_base, used_basenames)
used_basenames.add(filename[:-5] if filename.endswith('.json') else filename)
request_order.append(filename)
self._request_filenames[req.id] = filename
# Write request files
for req, filename in zip(project.requests, request_order):
with open(project_dir / filename, 'w') as f:
json.dump(req.to_dict(), f, indent=2)
# Remove orphaned request files
for existing in list(project_dir.iterdir()):
if existing.name.startswith('_') or existing.suffix != '.json':
continue
if existing.name not in request_order:
existing.unlink()
# Write project metadata
meta = {
'id': project.id,
'name': project.name,
'created_at': project.created_at,
'icon': project.icon,
'variable_names': project.variable_names,
'sensitive_variables': project.sensitive_variables,
'environments': [e.to_dict() for e in project.environments],
'request_order': request_order,
}
with open(project_dir / '_project.json', 'w') as f:
json.dump(meta, f, indent=2)
self._trigger_auto_backup()
except Exception as e:
logger.error("Error saving project %s: %s", project.name, e)
def _save_all_legacy(self):
"""Fallback: write all cached projects to the monolithic JSON file."""
try:
projects = self._projects_cache or []
data = {'version': 1, 'projects': [p.to_dict() for p in projects]}
with open(self.legacy_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.error("Error saving projects (legacy): %s", e)
def _trigger_auto_backup(self):
try:
import gi
gi.require_version('Gio', '2.0')
from gi.repository import Gio
settings = Gio.Settings.new('cz.bugsy.roster')
if settings.get_boolean('backup-auto-export'):
folder = settings.get_string('backup-folder')
if folder:
self.export_to_folder(folder)
except Exception as e:
logger.error("Auto-backup failed: %s", e)
# ===== Backup / export =====
def export_to_folder(self, destination: str):
dest = Path(destination) / 'roster-backup'
shutil.copytree(self.projects_dir, dest, dirs_exist_ok=True)
# ===== Public API =====
def add_project(self, name: str) -> Project:
projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat()
default_env = Environment(
id=str(uuid.uuid4()),
name="Default",
variables={},
created_at=now
)
project = Project(
id=str(uuid.uuid4()),
name=name,
requests=[],
created_at=now,
variable_names=[],
environments=[default_env]
)
projects.append(project)
self._save_project(project)
return project
def update_project(self, project_id: str, new_name: str = None, new_icon: str = None):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
if new_name is not None:
p.name = new_name
if new_icon is not None:
p.icon = new_icon
self._save_project(p)
break
def delete_project(self, project_id: str,
callback: Optional[Callable[[], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
for p in projects:
if p.id == project_id:
if not self._legacy_mode:
dir_name = self._project_dirs.get(project_id)
if dir_name:
project_dir = self.projects_dir / dir_name
if project_dir.exists():
shutil.rmtree(project_dir)
self._project_dirs.pop(project_id, None)
break
self._projects_cache = [p for p in projects if p.id != project_id]
if self._legacy_mode:
self._save_all_legacy()
def on_secrets_deleted(success):
if callback:
callback()
secret_manager.delete_all_project_secrets(project_id, on_secrets_deleted)
def add_request(self, project_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest:
projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat()
saved_request = SavedRequest(
id=str(uuid.uuid4()),
name=name,
request=request,
created_at=now,
modified_at=now,
scripts=scripts
)
for p in projects:
if p.id == project_id:
p.requests.append(saved_request)
self._save_project(p)
break
return saved_request
def find_request_by_name(self, project_id: str, name: str) -> Optional[SavedRequest]:
projects = self.load_projects()
for p in projects:
if p.id == project_id:
for req in p.requests:
if req.name == name:
return req
break
return None
def update_request(self, project_id: str, request_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest:
projects = self.load_projects()
updated_request = None
for p in projects:
if p.id == project_id:
for req in p.requests:
if req.id == request_id:
req.name = name
req.request = request
req.scripts = scripts
req.modified_at = datetime.now(timezone.utc).isoformat()
updated_request = req
break
if updated_request:
self._save_project(p)
break
return updated_request
def delete_request(self, project_id: str, request_id: str):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
p.requests = [r for r in p.requests if r.id != request_id]
self._request_filenames.pop(request_id, None)
self._save_project(p)
break
def add_environment(self, project_id: str, name: str) -> Environment:
projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat()
environment = None
for p in projects:
if p.id == project_id:
variables = {var_name: "" for var_name in p.variable_names}
environment = Environment(
id=str(uuid.uuid4()),
name=name,
variables=variables,
created_at=now
)
p.environments.append(environment)
self._save_project(p)
break
return environment
def copy_environment(self, project_id: str, source_env_id: str, new_name: str,
callback: Optional[Callable] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
for p in projects:
if p.id == project_id:
source_env = next((e for e in p.environments if e.id == source_env_id), None)
if source_env is None:
if callback:
callback(None)
return
now = datetime.now(timezone.utc).isoformat()
new_env_id = str(uuid.uuid4())
new_env = Environment(
id=new_env_id,
name=new_name,
variables=dict(source_env.variables),
created_at=now
)
p.environments.append(new_env)
self._save_project(p)
sensitive_vars = [v for v in p.sensitive_variables if v in source_env.variables]
if not sensitive_vars:
if callback:
callback(new_env)
return
pending_count = [len(sensitive_vars)]
def make_copy_handler(var_name, project_name, env_name):
def on_store_done(success=True):
pending_count[0] -= 1
if pending_count[0] == 0 and callback:
callback(new_env)
def on_retrieve(value):
if value:
secret_manager.store_secret(
project_id=project_id,
environment_id=new_env_id,
variable_name=var_name,
value=value,
project_name=project_name,
environment_name=env_name,
callback=lambda success: on_store_done(success)
)
else:
on_store_done()
return on_retrieve
for var_name in sensitive_vars:
secret_manager.retrieve_secret(
project_id=project_id,
environment_id=source_env_id,
variable_name=var_name,
callback=make_copy_handler(var_name, p.name, new_name)
)
return
if callback:
callback(None)
def update_environment(self, project_id: str, env_id: str, name: str = None, variables: dict = None):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
for env in p.environments:
if env.id == env_id:
if name is not None:
env.name = name
if variables is not None:
env.variables = variables
break
self._save_project(p)
break
def delete_environment(self, project_id: str, env_id: str,
callback: Optional[Callable[[], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
for p in projects:
if p.id == project_id:
p.environments = [e for e in p.environments if e.id != env_id]
self._save_project(p)
break
def on_secrets_deleted(success):
if callback:
callback()
secret_manager.delete_all_environment_secrets(project_id, env_id, on_secrets_deleted)
def add_variable(self, project_id: str, variable_name: str):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
if variable_name not in p.variable_names:
p.variable_names.append(variable_name)
for env in p.environments:
env.variables[variable_name] = ""
self._save_project(p)
break
def rename_variable(self, project_id: str, old_name: str, new_name: str,
callback: Optional[Callable[[], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
is_sensitive = False
for p in projects:
if p.id == project_id:
if old_name in p.variable_names:
idx = p.variable_names.index(old_name)
p.variable_names[idx] = new_name
if old_name in p.sensitive_variables:
is_sensitive = True
idx_sensitive = p.sensitive_variables.index(old_name)
p.sensitive_variables[idx_sensitive] = new_name
for env in p.environments:
if old_name in env.variables:
env.variables[new_name] = env.variables.pop(old_name)
self._save_project(p)
break
if is_sensitive:
secret_manager.rename_variable_secrets(project_id, old_name, new_name, lambda success: callback() if callback else None)
elif callback:
callback()
def delete_variable(self, project_id: str, variable_name: str,
callback: Optional[Callable[[], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
is_sensitive = False
environments_to_cleanup = []
for p in projects:
if p.id == project_id:
if variable_name in p.variable_names:
p.variable_names.remove(variable_name)
if variable_name in p.sensitive_variables:
is_sensitive = True
p.sensitive_variables.remove(variable_name)
environments_to_cleanup = [env.id for env in p.environments]
for env in p.environments:
env.variables.pop(variable_name, None)
self._save_project(p)
break
if is_sensitive and environments_to_cleanup:
pending_count = [len(environments_to_cleanup)]
def on_single_delete(success):
pending_count[0] -= 1
if pending_count[0] == 0 and callback:
callback()
for env_id in environments_to_cleanup:
secret_manager.delete_secret(project_id, env_id, variable_name, on_single_delete)
elif callback:
callback()
def update_environment_variable(self, project_id: str, env_id: str, variable_name: str, value: str):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
for env in p.environments:
if env.id == env_id:
env.variables[variable_name] = value
break
self._save_project(p)
break
def batch_update_environment_variables(self, project_id: str, env_id: str, variables: dict):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
for env in p.environments:
if env.id == env_id:
for var_name, var_value in variables.items():
env.variables[var_name] = var_value
break
self._save_project(p)
break
# ===== Sensitive Variable Management =====
def mark_variable_as_sensitive(self, project_id: str, variable_name: str,
callback: Optional[Callable[[bool], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
secrets_to_store = []
for p in projects:
if p.id == project_id:
if variable_name not in p.sensitive_variables:
p.sensitive_variables.append(variable_name)
for env in p.environments:
if variable_name in env.variables:
value = env.variables[variable_name]
secrets_to_store.append({
'environment_id': env.id,
'environment_name': env.name,
'value': value,
'project_name': p.name
})
env.variables[variable_name] = ""
self._save_project(p)
if not secrets_to_store:
if callback:
callback(True)
return
pending_count = [len(secrets_to_store)]
all_success = [True]
def on_single_store(success):
if not success:
all_success[0] = False
pending_count[0] -= 1
if pending_count[0] == 0 and callback:
callback(all_success[0])
for secret_info in secrets_to_store:
secret_manager.store_secret(
project_id=project_id,
environment_id=secret_info['environment_id'],
variable_name=variable_name,
value=secret_info['value'],
project_name=secret_info['project_name'],
environment_name=secret_info['environment_name'],
callback=on_single_store
)
return
if callback:
callback(False)
def mark_variable_as_nonsensitive(self, project_id: str, variable_name: str,
callback: Optional[Callable[[bool], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
target_project = None
environments_to_migrate = []
for p in projects:
if p.id == project_id:
target_project = p
if variable_name in p.sensitive_variables:
p.sensitive_variables.remove(variable_name)
environments_to_migrate = list(p.environments)
break
if not target_project or not environments_to_migrate:
if callback:
callback(False)
return
pending_count = [len(environments_to_migrate)]
all_success = [True]
def on_single_migrate(env):
def on_retrieve(value):
env.variables[variable_name] = value or ""
self._save_project(target_project)
def on_delete(success):
if not success:
all_success[0] = False
pending_count[0] -= 1
if pending_count[0] == 0 and callback:
callback(all_success[0])
secret_manager.delete_secret(
project_id=project_id,
environment_id=env.id,
variable_name=variable_name,
callback=on_delete
)
return on_retrieve
for env in environments_to_migrate:
secret_manager.retrieve_secret(
project_id=project_id,
environment_id=env.id,
variable_name=variable_name,
callback=on_single_migrate(env)
)
def is_variable_sensitive(self, project_id: str, variable_name: str) -> bool:
projects = self.load_projects()
for p in projects:
if p.id == project_id:
return variable_name in p.sensitive_variables
return False
def get_variable_value(self, project_id: str, env_id: str, variable_name: str,
callback: Callable[[str], None]):
projects = self.load_projects()
for p in projects:
if p.id == project_id:
if variable_name in p.sensitive_variables:
secret_manager = get_secret_manager()
def on_retrieve(value):
callback(value or "")
secret_manager.retrieve_secret(
project_id=project_id,
environment_id=env_id,
variable_name=variable_name,
callback=on_retrieve
)
return
else:
for env in p.environments:
if env.id == env_id:
callback(env.variables.get(variable_name, ""))
return
callback("")
def set_variable_value(self, project_id: str, env_id: str, variable_name: str, value: str,
callback: Optional[Callable[[bool], None]] = None):
projects = self.load_projects()
secret_manager = get_secret_manager()
for p in projects:
if p.id == project_id:
if variable_name in p.sensitive_variables:
for env in p.environments:
if env.id == env_id:
secret_manager.store_secret(
project_id=project_id,
environment_id=env.id,
variable_name=variable_name,
value=value,
project_name=p.name,
environment_name=env.name,
callback=callback
)
env.variables[variable_name] = ""
self._save_project(p)
return
else:
for env in p.environments:
if env.id == env_id:
env.variables[variable_name] = value
break
self._save_project(p)
if callback:
callback(True)
return
if callback:
callback(False)
def get_environment_with_secrets(self, project_id: str, env_id: str,
callback: Callable[[Optional[Environment]], None]):
projects = self.load_projects()
secret_manager = get_secret_manager()
for p in projects:
if p.id == project_id:
for env in p.environments:
if env.id == env_id:
complete_env = Environment(
id=env.id,
name=env.name,
variables=env.variables.copy(),
created_at=env.created_at
)
if not p.sensitive_variables:
callback(complete_env)
return
def on_secrets_retrieved(secrets_dict):
for var_name, value in secrets_dict.items():
if value is not None:
complete_env.variables[var_name] = value
callback(complete_env)
secret_manager.retrieve_multiple_secrets(
project_id=project_id,
environment_id=env_id,
variable_names=list(p.sensitive_variables),
callback=on_secrets_retrieved
)
return
callback(None)