Migrate storage to per-file format, add backup UI

This commit is contained in:
Pavel Baksy 2026-05-27 15:10:01 +02:00
parent 101b6217bc
commit 1e4fa5f6ee
7 changed files with 527 additions and 188 deletions

View File

@ -11,5 +11,15 @@
<summary>Request timeout</summary> <summary>Request timeout</summary>
<description>Timeout for HTTP requests in seconds</description> <description>Timeout for HTTP requests in seconds</description>
</key> </key>
<key name="backup-folder" type="s">
<default>''</default>
<summary>Backup folder path</summary>
<description>Path to the folder where project data is exported as a backup</description>
</key>
<key name="backup-auto-export" type="b">
<default>false</default>
<summary>Auto-export on save</summary>
<description>Automatically export project data to the backup folder after each save</description>
</key>
</schema> </schema>
</schemalist> </schemalist>

View File

@ -82,7 +82,7 @@ class RosterApplication(Adw.Application):
def on_preferences_action(self, widget, _): def on_preferences_action(self, widget, _):
"""Callback for the app.preferences action.""" """Callback for the app.preferences action."""
window = self.props.active_window window = self.props.active_window
preferences = PreferencesDialog(history_manager=window.history_manager) preferences = PreferencesDialog(history_manager=window.history_manager, project_manager=window.project_manager)
preferences.set_transient_for(window) preferences.set_transient_for(window)
preferences.connect('history-cleared', lambda d: window._load_history()) preferences.connect('history-cleared', lambda d: window._load_history())
preferences.present() preferences.present()

View File

@ -34,6 +34,7 @@ roster_sources = [
'variable_substitution.py', 'variable_substitution.py',
'http_client.py', 'http_client.py',
'history_manager.py', 'history_manager.py',
'migration.py',
'project_manager.py', 'project_manager.py',
'secret_manager.py', 'secret_manager.py',
'tab_manager.py', 'tab_manager.py',

115
src/migration.py Normal file
View File

@ -0,0 +1,115 @@
# migration.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
import re
import shutil
from pathlib import Path
logger = logging.getLogger(__name__)
def _sanitize_name(name: str) -> str:
s = name.lower().strip()
s = re.sub(r'[^\w\s-]', '', s)
s = re.sub(r'[\s_]+', '-', s)
s = re.sub(r'-+', '-', s)
return s.strip('-') or 'unnamed'
def _unique_name(used: set, base: str) -> str:
if base not in used:
return base
i = 2
while f"{base}-{i}" in used:
i += 1
return f"{base}-{i}"
def needs_migration(projects_dir: Path, legacy_file: Path) -> bool:
return legacy_file.exists() and not projects_dir.exists()
def run_migration(legacy_file: Path, projects_dir: Path) -> bool:
"""
Read requests.json, write new per-file directory structure.
Atomic: writes to a tmp dir first, then renames.
On success renames requests.json requests.json.bak.
On failure leaves no partial state and returns False.
"""
tmp_dir = projects_dir.parent / 'projects.tmp'
try:
with open(legacy_file, 'r') as f:
data = json.load(f)
projects = data.get('projects', [])
if tmp_dir.exists():
shutil.rmtree(tmp_dir)
tmp_dir.mkdir(parents=True)
used_project_dirs: set = set()
for project in projects:
proj_base = _sanitize_name(project.get('name', ''))
proj_dir_name = _unique_name(used_project_dirs, proj_base)
used_project_dirs.add(proj_dir_name)
proj_dir = tmp_dir / proj_dir_name
proj_dir.mkdir()
requests = project.get('requests', [])
request_order = []
used_req_basenames: set = set()
for req in requests:
req_base = _sanitize_name(req.get('name', ''))
req_basename = _unique_name(used_req_basenames, req_base)
used_req_basenames.add(req_basename)
req_filename = req_basename + '.json'
request_order.append(req_filename)
with open(proj_dir / req_filename, 'w') as f:
json.dump(req, f, indent=2)
project_meta = {k: v for k, v in project.items() if k != 'requests'}
project_meta['request_order'] = request_order
with open(proj_dir / '_project.json', 'w') as f:
json.dump(project_meta, f, indent=2)
# Atomic rename
tmp_dir.rename(projects_dir)
bak_file = legacy_file.parent / (legacy_file.name + '.bak')
legacy_file.rename(bak_file)
logger.info("Migration successful: %d projects migrated, backup at %s", len(projects), bak_file)
return True
except Exception as e:
logger.error("Migration failed: %s", e)
try:
if tmp_dir.exists():
shutil.rmtree(tmp_dir)
except Exception:
pass
return False

View File

@ -67,6 +67,65 @@
</child> </child>
</object> </object>
</child> </child>
<child>
<object class="AdwPreferencesGroup">
<property name="title" translatable="yes">Data</property>
<property name="description" translatable="yes">Project storage and backup</property>
<child>
<object class="AdwActionRow" id="data_folder_row">
<property name="title" translatable="yes">Data folder</property>
<child>
<object class="GtkButton" id="open_folder_button">
<property name="label" translatable="yes">Open</property>
<property name="valign">center</property>
<signal name="clicked" handler="on_open_folder_clicked" swapped="no"/>
</object>
</child>
</object>
</child>
<child>
<object class="AdwActionRow" id="backup_folder_row">
<property name="title" translatable="yes">Backup folder</property>
<child>
<object class="GtkBox">
<property name="spacing">6</property>
<property name="valign">center</property>
<child>
<object class="GtkButton" id="choose_backup_folder_button">
<property name="label" translatable="yes">Choose…</property>
<signal name="clicked" handler="on_choose_backup_folder_clicked" swapped="no"/>
</object>
</child>
</object>
</child>
</object>
</child>
<child>
<object class="AdwSwitchRow" id="auto_backup_row">
<property name="title" translatable="yes">Auto-backup on save</property>
<property name="subtitle" translatable="yes">Copy project files to the backup folder after each save</property>
</object>
</child>
<child>
<object class="AdwActionRow">
<property name="title" translatable="yes">Backup now</property>
<property name="subtitle" translatable="yes">Copy all project files to the backup folder</property>
<child>
<object class="GtkButton" id="backup_now_button">
<property name="label" translatable="yes">Backup</property>
<property name="valign">center</property>
<signal name="clicked" handler="on_backup_now_clicked" swapped="no"/>
</object>
</child>
</object>
</child>
</object>
</child>
</object> </object>
</child> </child>
</template> </template>

View File

@ -7,7 +7,6 @@
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. # (at your option) any later version.
# #
#
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
@ -18,8 +17,11 @@
# #
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
import logging
from gi.repository import Adw, Gtk, Gio, GObject from gi.repository import Adw, Gtk, Gio, GObject
logger = logging.getLogger(__name__)
@Gtk.Template(resource_path='/cz/bugsy/roster/preferences-dialog.ui') @Gtk.Template(resource_path='/cz/bugsy/roster/preferences-dialog.ui')
class PreferencesDialog(Adw.PreferencesWindow): class PreferencesDialog(Adw.PreferencesWindow):
@ -28,41 +30,70 @@ class PreferencesDialog(Adw.PreferencesWindow):
tls_verification_row = Gtk.Template.Child() tls_verification_row = Gtk.Template.Child()
timeout_row = Gtk.Template.Child() timeout_row = Gtk.Template.Child()
clear_history_button = Gtk.Template.Child() clear_history_button = Gtk.Template.Child()
data_folder_row = Gtk.Template.Child()
open_folder_button = Gtk.Template.Child()
backup_folder_row = Gtk.Template.Child()
choose_backup_folder_button = Gtk.Template.Child()
auto_backup_row = Gtk.Template.Child()
backup_now_button = Gtk.Template.Child()
__gsignals__ = { __gsignals__ = {
'history-cleared': (GObject.SIGNAL_RUN_FIRST, None, ()) 'history-cleared': (GObject.SIGNAL_RUN_FIRST, None, ())
} }
def __init__(self, history_manager=None, **kwargs): def __init__(self, history_manager=None, project_manager=None, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.history_manager = history_manager self.history_manager = history_manager
self.project_manager = project_manager
# Get settings
self.settings = Gio.Settings.new('cz.bugsy.roster') self.settings = Gio.Settings.new('cz.bugsy.roster')
# Bind settings to UI
self.settings.bind( self.settings.bind(
'force-tls-verification', 'force-tls-verification',
self.tls_verification_row, self.tls_verification_row,
'active', 'active',
Gio.SettingsBindFlags.DEFAULT Gio.SettingsBindFlags.DEFAULT
) )
self.settings.bind( self.settings.bind(
'request-timeout', 'request-timeout',
self.timeout_row, self.timeout_row,
'value', 'value',
Gio.SettingsBindFlags.DEFAULT Gio.SettingsBindFlags.DEFAULT
) )
self.settings.bind(
'backup-auto-export',
self.auto_backup_row,
'active',
Gio.SettingsBindFlags.DEFAULT
)
# Populate data folder path
if project_manager is not None:
self.data_folder_row.set_subtitle(str(project_manager.data_dir))
# Populate backup folder path
self._update_backup_folder_subtitle()
self.settings.connect('changed::backup-folder', lambda *_: self._update_backup_folder_subtitle())
# Disable backup buttons when no backup folder is set
self._update_backup_button_sensitivity()
self.settings.connect('changed::backup-folder', lambda *_: self._update_backup_button_sensitivity())
def _update_backup_folder_subtitle(self):
folder = self.settings.get_string('backup-folder')
self.backup_folder_row.set_subtitle(folder if folder else 'Not set')
def _update_backup_button_sensitivity(self):
has_folder = bool(self.settings.get_string('backup-folder'))
self.backup_now_button.set_sensitive(has_folder)
self.auto_backup_row.set_sensitive(has_folder)
@Gtk.Template.Callback() @Gtk.Template.Callback()
def on_clear_history_clicked(self, button): def on_clear_history_clicked(self, button):
"""Clear all history after confirmation."""
if not self.history_manager: if not self.history_manager:
return return
# Create confirmation dialog
dialog = Adw.AlertDialog.new( dialog = Adw.AlertDialog.new(
"Clear All History?", "Clear All History?",
"This will permanently delete all saved request and response history. This action cannot be undone." "This will permanently delete all saved request and response history. This action cannot be undone."
@ -75,10 +106,59 @@ class PreferencesDialog(Adw.PreferencesWindow):
def on_response(dialog, response): def on_response(dialog, response):
if response == "clear": if response == "clear":
# Clear the history
self.history_manager.clear_history() self.history_manager.clear_history()
# Notify the main window to refresh
self.emit('history-cleared') self.emit('history-cleared')
dialog.connect("response", on_response) dialog.connect("response", on_response)
dialog.present(self) dialog.present(self)
@Gtk.Template.Callback()
def on_open_folder_clicked(self, button):
if self.project_manager is None:
return
Gio.AppInfo.launch_default_for_uri(
f"file://{self.project_manager.data_dir}", None
)
@Gtk.Template.Callback()
def on_choose_backup_folder_clicked(self, button):
chooser = Gtk.FileChooserNative.new(
"Choose Backup Folder",
self,
Gtk.FileChooserAction.SELECT_FOLDER,
"Select",
"Cancel"
)
current = self.settings.get_string('backup-folder')
if current:
try:
chooser.set_current_folder(Gio.File.new_for_path(current))
except Exception:
pass
def on_response(chooser, response):
if response == Gtk.ResponseType.ACCEPT:
folder = chooser.get_file()
if folder:
self.settings.set_string('backup-folder', folder.get_path())
chooser.connect('response', on_response)
chooser.show()
@Gtk.Template.Callback()
def on_backup_now_clicked(self, button):
if self.project_manager is None:
return
folder = self.settings.get_string('backup-folder')
if not folder:
return
try:
self.project_manager.export_to_folder(folder)
toast = Adw.Toast.new("Backup completed")
self.add_toast(toast)
except Exception as e:
logger.error("Backup failed: %s", e)
toast = Adw.Toast.new("Backup failed")
self.add_toast(toast)

View File

@ -19,6 +19,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
import json import json
import shutil
import uuid import uuid
import logging import logging
from pathlib import Path from pathlib import Path
@ -29,85 +30,265 @@ gi.require_version('GLib', '2.0')
from gi.repository import GLib from gi.repository import GLib
from .models import Project, SavedRequest, HttpRequest, Environment from .models import Project, SavedRequest, HttpRequest, Environment
from .secret_manager import get_secret_manager from .secret_manager import get_secret_manager
from .migration import _sanitize_name, needs_migration, run_migration
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _unique_filename(base: str, exclude: set) -> str:
"""Return a unique .json filename whose basename is not in `exclude`."""
if base not in exclude:
return base + '.json'
i = 2
while f"{base}-{i}" in exclude:
i += 1
return f"{base}-{i}.json"
class ProjectManager: class ProjectManager:
"""Manages project and saved request persistence.""" """Manages project and saved request persistence."""
def __init__(self): def __init__(self):
# Use XDG data directory (works for both Flatpak and native)
# Flatpak: ~/.var/app/cz.bugsy.roster/data/cz.bugsy.roster
# Native: ~/.local/share/cz.bugsy.roster
self.data_dir = Path(GLib.get_user_data_dir()) / 'cz.bugsy.roster' self.data_dir = Path(GLib.get_user_data_dir()) / 'cz.bugsy.roster'
self.projects_file = self.data_dir / 'requests.json' self.projects_dir = self.data_dir / 'projects'
self.legacy_file = self.data_dir / 'requests.json'
self._ensure_data_dir() self._ensure_data_dir()
# In-memory cache for projects to reduce disk I/O
self._projects_cache: Optional[List[Project]] = None self._projects_cache: Optional[List[Project]] = None
# project_id → directory name under projects_dir
self._project_dirs: dict = {}
# request_id → filename (e.g. "get-users.json")
self._request_filenames: dict = {}
self._legacy_mode = False
if needs_migration(self.projects_dir, self.legacy_file):
if not run_migration(self.legacy_file, self.projects_dir):
logger.error("Migration failed falling back to legacy mode")
self._legacy_mode = True
def _ensure_data_dir(self): def _ensure_data_dir(self):
"""Create data directory if it doesn't exist."""
self.data_dir.mkdir(parents=True, exist_ok=True) self.data_dir.mkdir(parents=True, exist_ok=True)
# ===== Loading =====
def load_projects(self, force_reload: bool = False) -> List[Project]: def load_projects(self, force_reload: bool = False) -> List[Project]:
"""
Load projects from JSON file with in-memory caching.
Args:
force_reload: If True, bypass cache and reload from disk
Returns:
List of Project objects
"""
# Return cached data if available and not forcing reload
if self._projects_cache is not None and not force_reload: if self._projects_cache is not None and not force_reload:
return self._projects_cache return self._projects_cache
# Load from disk if self._legacy_mode or not self.projects_dir.exists():
if not self.projects_file.exists(): return self._load_legacy()
return self._load_from_dirs()
def _load_from_dirs(self) -> List[Project]:
projects = []
self._project_dirs.clear()
self._request_filenames.clear()
try:
entries = sorted(self.projects_dir.iterdir())
except Exception as e:
logger.error("Cannot iterate projects dir: %s", e)
self._projects_cache = [] self._projects_cache = []
return [] return []
for proj_dir in entries:
if not proj_dir.is_dir():
continue
meta_file = proj_dir / '_project.json'
if not meta_file.exists():
continue
try: try:
with open(self.projects_file, 'r') as f: with open(meta_file, 'r') as f:
meta = json.load(f)
request_order = meta.get('request_order', [])
requests = []
loaded_files: set = set()
for filename in request_order:
req_file = proj_dir / filename
if not req_file.exists():
continue
try:
with open(req_file, 'r') as f:
req_data = json.load(f)
req = SavedRequest.from_dict(req_data)
requests.append(req)
self._request_filenames[req.id] = filename
loaded_files.add(filename)
except Exception as e:
logger.error("Error loading request %s: %s", req_file, e)
# Also pick up files not listed in request_order
for req_file in sorted(proj_dir.iterdir()):
if req_file.name.startswith('_') or req_file.suffix != '.json':
continue
if req_file.name in loaded_files:
continue
try:
with open(req_file, 'r') as f:
req_data = json.load(f)
req = SavedRequest.from_dict(req_data)
requests.append(req)
self._request_filenames[req.id] = req_file.name
except Exception as e:
logger.error("Error loading extra request %s: %s", req_file, e)
project = Project(
id=meta['id'],
name=meta['name'],
requests=requests,
created_at=meta['created_at'],
icon=meta.get('icon', 'folder-symbolic'),
variable_names=meta.get('variable_names', []),
environments=[Environment.from_dict(e) for e in meta.get('environments', [])],
sensitive_variables=meta.get('sensitive_variables', []),
)
projects.append(project)
self._project_dirs[project.id] = proj_dir.name
except Exception as e:
logger.error("Error loading project %s: %s", proj_dir, e)
self._projects_cache = projects
return projects
def _load_legacy(self) -> List[Project]:
if not self.legacy_file.exists():
self._projects_cache = []
return []
try:
with open(self.legacy_file, 'r') as f:
data = json.load(f) data = json.load(f)
self._projects_cache = [Project.from_dict(p) for p in data.get('projects', [])] self._projects_cache = [Project.from_dict(p) for p in data.get('projects', [])]
return self._projects_cache return self._projects_cache
except Exception as e: except Exception as e:
logger.error(f"Error loading projects: {e}") logger.error("Error loading legacy projects: %s", e)
self._projects_cache = [] self._projects_cache = []
return [] return []
def save_projects(self, projects: List[Project]): # ===== Saving =====
"""Save projects to JSON file and update cache."""
try: def _save_project(self, project: Project):
data = { if self._legacy_mode:
'version': 1, self._save_all_legacy()
'projects': [p.to_dict() for p in projects] return
}
with open(self.projects_file, 'w') as f: try:
json.dump(data, f, indent=2) old_dir_name = self._project_dirs.get(project.id)
new_dir_name = _sanitize_name(project.name)
# Handle project rename
if old_dir_name and old_dir_name != new_dir_name:
old_dir = self.projects_dir / old_dir_name
if old_dir.exists():
# Ensure new name is unique
candidate = new_dir_name
i = 2
while (self.projects_dir / candidate).exists():
candidate = f"{new_dir_name}-{i}"
i += 1
new_dir_name = candidate
old_dir.rename(self.projects_dir / new_dir_name)
project_dir = self.projects_dir / new_dir_name
project_dir.mkdir(parents=True, exist_ok=True)
self._project_dirs[project.id] = new_dir_name
# Assign filenames preserving stable names where possible
request_order = []
used_basenames: set = set()
for req in project.requests:
old_filename = self._request_filenames.get(req.id)
desired_base = _sanitize_name(req.name)
if old_filename is not None:
old_base = old_filename[:-5] if old_filename.endswith('.json') else old_filename
if old_base == desired_base and old_base not in used_basenames:
filename = old_filename
else:
filename = _unique_filename(desired_base, used_basenames)
else:
filename = _unique_filename(desired_base, used_basenames)
used_basenames.add(filename[:-5] if filename.endswith('.json') else filename)
request_order.append(filename)
self._request_filenames[req.id] = filename
# Write request files
for req, filename in zip(project.requests, request_order):
with open(project_dir / filename, 'w') as f:
json.dump(req.to_dict(), f, indent=2)
# Remove orphaned request files
for existing in list(project_dir.iterdir()):
if existing.name.startswith('_') or existing.suffix != '.json':
continue
if existing.name not in request_order:
existing.unlink()
# Write project metadata
meta = {
'id': project.id,
'name': project.name,
'created_at': project.created_at,
'icon': project.icon,
'variable_names': project.variable_names,
'sensitive_variables': project.sensitive_variables,
'environments': [e.to_dict() for e in project.environments],
'request_order': request_order,
}
with open(project_dir / '_project.json', 'w') as f:
json.dump(meta, f, indent=2)
self._trigger_auto_backup()
# Update cache with the saved data
self._projects_cache = projects
except Exception as e: except Exception as e:
logger.error(f"Error saving projects: {e}") logger.error("Error saving project %s: %s", project.name, e)
def _save_all_legacy(self):
"""Fallback: write all cached projects to the monolithic JSON file."""
try:
projects = self._projects_cache or []
data = {'version': 1, 'projects': [p.to_dict() for p in projects]}
with open(self.legacy_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.error("Error saving projects (legacy): %s", e)
def _trigger_auto_backup(self):
try:
import gi
gi.require_version('Gio', '2.0')
from gi.repository import Gio
settings = Gio.Settings.new('cz.bugsy.roster')
if settings.get_boolean('backup-auto-export'):
folder = settings.get_string('backup-folder')
if folder:
self.export_to_folder(folder)
except Exception as e:
logger.error("Auto-backup failed: %s", e)
# ===== Backup / export =====
def export_to_folder(self, destination: str):
dest = Path(destination) / 'roster-backup'
shutil.copytree(self.projects_dir, dest, dirs_exist_ok=True)
# ===== Public API =====
def add_project(self, name: str) -> Project: def add_project(self, name: str) -> Project:
"""Create new project with default environment."""
projects = self.load_projects() projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
# Create default environment
default_env = Environment( default_env = Environment(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
name="Default", name="Default",
variables={}, variables={},
created_at=now created_at=now
) )
project = Project( project = Project(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
name=name, name=name,
@ -117,11 +298,10 @@ class ProjectManager:
environments=[default_env] environments=[default_env]
) )
projects.append(project) projects.append(project)
self.save_projects(projects) self._save_project(project)
return project return project
def update_project(self, project_id: str, new_name: str = None, new_icon: str = None): def update_project(self, project_id: str, new_name: str = None, new_icon: str = None):
"""Update a project's name and/or icon."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
@ -129,20 +309,30 @@ class ProjectManager:
p.name = new_name p.name = new_name
if new_icon is not None: if new_icon is not None:
p.icon = new_icon p.icon = new_icon
self._save_project(p)
break break
self.save_projects(projects)
def delete_project(self, project_id: str, def delete_project(self, project_id: str,
callback: Optional[Callable[[], None]] = None): callback: Optional[Callable[[], None]] = None):
"""Delete a project and all its requests (async for secret cleanup)."""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
# Remove project from list and save immediately for p in projects:
projects = [p for p in projects if p.id != project_id] if p.id == project_id:
self.save_projects(projects) if not self._legacy_mode:
dir_name = self._project_dirs.get(project_id)
if dir_name:
project_dir = self.projects_dir / dir_name
if project_dir.exists():
shutil.rmtree(project_dir)
self._project_dirs.pop(project_id, None)
break
self._projects_cache = [p for p in projects if p.id != project_id]
if self._legacy_mode:
self._save_all_legacy()
# Delete all secrets for this project asynchronously
def on_secrets_deleted(success): def on_secrets_deleted(success):
if callback: if callback:
callback() callback()
@ -150,7 +340,6 @@ class ProjectManager:
secret_manager.delete_all_project_secrets(project_id, on_secrets_deleted) secret_manager.delete_all_project_secrets(project_id, on_secrets_deleted)
def add_request(self, project_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest: def add_request(self, project_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest:
"""Add request to a project."""
projects = self.load_projects() projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
saved_request = SavedRequest( saved_request = SavedRequest(
@ -164,12 +353,11 @@ class ProjectManager:
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
p.requests.append(saved_request) p.requests.append(saved_request)
self._save_project(p)
break break
self.save_projects(projects)
return saved_request return saved_request
def find_request_by_name(self, project_id: str, name: str) -> Optional[SavedRequest]: def find_request_by_name(self, project_id: str, name: str) -> Optional[SavedRequest]:
"""Find a request by name within a project. Returns None if not found."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
@ -180,7 +368,6 @@ class ProjectManager:
return None return None
def update_request(self, project_id: str, request_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest: def update_request(self, project_id: str, request_id: str, name: str, request: HttpRequest, scripts=None) -> SavedRequest:
"""Update an existing request."""
projects = self.load_projects() projects = self.load_projects()
updated_request = None updated_request = None
for p in projects: for p in projects:
@ -193,29 +380,27 @@ class ProjectManager:
req.modified_at = datetime.now(timezone.utc).isoformat() req.modified_at = datetime.now(timezone.utc).isoformat()
updated_request = req updated_request = req
break break
break
if updated_request: if updated_request:
self.save_projects(projects) self._save_project(p)
break
return updated_request return updated_request
def delete_request(self, project_id: str, request_id: str): def delete_request(self, project_id: str, request_id: str):
"""Delete a saved request."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
p.requests = [r for r in p.requests if r.id != request_id] p.requests = [r for r in p.requests if r.id != request_id]
self._request_filenames.pop(request_id, None)
self._save_project(p)
break break
self.save_projects(projects)
def add_environment(self, project_id: str, name: str) -> Environment: def add_environment(self, project_id: str, name: str) -> Environment:
"""Add environment to a project."""
projects = self.load_projects() projects = self.load_projects()
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
environment = None environment = None
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Create environment with empty values for all variables
variables = {var_name: "" for var_name in p.variable_names} variables = {var_name: "" for var_name in p.variable_names}
environment = Environment( environment = Environment(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
@ -224,14 +409,13 @@ class ProjectManager:
created_at=now created_at=now
) )
p.environments.append(environment) p.environments.append(environment)
self._save_project(p)
break break
self.save_projects(projects)
return environment return environment
def copy_environment(self, project_id: str, source_env_id: str, new_name: str, def copy_environment(self, project_id: str, source_env_id: str, new_name: str,
callback: Optional[Callable] = None): callback: Optional[Callable] = None):
"""Copy an environment including all variables (sensitive ones via keyring)."""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
@ -252,7 +436,7 @@ class ProjectManager:
created_at=now created_at=now
) )
p.environments.append(new_env) p.environments.append(new_env)
self.save_projects(projects) self._save_project(p)
sensitive_vars = [v for v in p.sensitive_variables if v in source_env.variables] sensitive_vars = [v for v in p.sensitive_variables if v in source_env.variables]
if not sensitive_vars: if not sensitive_vars:
@ -297,7 +481,6 @@ class ProjectManager:
callback(None) callback(None)
def update_environment(self, project_id: str, env_id: str, name: str = None, variables: dict = None): def update_environment(self, project_id: str, env_id: str, name: str = None, variables: dict = None):
"""Update an environment's name and/or variables."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
@ -308,24 +491,20 @@ class ProjectManager:
if variables is not None: if variables is not None:
env.variables = variables env.variables = variables
break break
self._save_project(p)
break break
self.save_projects(projects)
def delete_environment(self, project_id: str, env_id: str, def delete_environment(self, project_id: str, env_id: str,
callback: Optional[Callable[[], None]] = None): callback: Optional[Callable[[], None]] = None):
"""Delete an environment (async for secret cleanup)."""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Remove environment from list
p.environments = [e for e in p.environments if e.id != env_id] p.environments = [e for e in p.environments if e.id != env_id]
self._save_project(p)
break break
self.save_projects(projects)
# Delete all secrets for this environment asynchronously
def on_secrets_deleted(success): def on_secrets_deleted(success):
if callback: if callback:
callback() callback()
@ -333,47 +512,39 @@ class ProjectManager:
secret_manager.delete_all_environment_secrets(project_id, env_id, on_secrets_deleted) secret_manager.delete_all_environment_secrets(project_id, env_id, on_secrets_deleted)
def add_variable(self, project_id: str, variable_name: str): def add_variable(self, project_id: str, variable_name: str):
"""Add a variable to the project and all environments."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
if variable_name not in p.variable_names: if variable_name not in p.variable_names:
p.variable_names.append(variable_name) p.variable_names.append(variable_name)
# Add empty value to all environments
for env in p.environments: for env in p.environments:
env.variables[variable_name] = "" env.variables[variable_name] = ""
self._save_project(p)
break break
self.save_projects(projects)
def rename_variable(self, project_id: str, old_name: str, new_name: str, def rename_variable(self, project_id: str, old_name: str, new_name: str,
callback: Optional[Callable[[], None]] = None): callback: Optional[Callable[[], None]] = None):
"""Rename a variable in the project and all environments (async for secrets)."""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
is_sensitive = False is_sensitive = False
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Update variable_names list
if old_name in p.variable_names: if old_name in p.variable_names:
idx = p.variable_names.index(old_name) idx = p.variable_names.index(old_name)
p.variable_names[idx] = new_name p.variable_names[idx] = new_name
# Check if sensitive and update list
if old_name in p.sensitive_variables: if old_name in p.sensitive_variables:
is_sensitive = True is_sensitive = True
idx_sensitive = p.sensitive_variables.index(old_name) idx_sensitive = p.sensitive_variables.index(old_name)
p.sensitive_variables[idx_sensitive] = new_name p.sensitive_variables[idx_sensitive] = new_name
# Update all environments
for env in p.environments: for env in p.environments:
if old_name in env.variables: if old_name in env.variables:
env.variables[new_name] = env.variables.pop(old_name) env.variables[new_name] = env.variables.pop(old_name)
self._save_project(p)
break break
self.save_projects(projects)
# Rename secrets in keyring if sensitive
if is_sensitive: if is_sensitive:
secret_manager.rename_variable_secrets(project_id, old_name, new_name, lambda success: callback() if callback else None) secret_manager.rename_variable_secrets(project_id, old_name, new_name, lambda success: callback() if callback else None)
elif callback: elif callback:
@ -381,7 +552,6 @@ class ProjectManager:
def delete_variable(self, project_id: str, variable_name: str, def delete_variable(self, project_id: str, variable_name: str,
callback: Optional[Callable[[], None]] = None): callback: Optional[Callable[[], None]] = None):
"""Delete a variable from the project and all environments (async for secrets)."""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
is_sensitive = False is_sensitive = False
@ -389,24 +559,19 @@ class ProjectManager:
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Remove from variable_names
if variable_name in p.variable_names: if variable_name in p.variable_names:
p.variable_names.remove(variable_name) p.variable_names.remove(variable_name)
# Check if sensitive and get environments for cleanup
if variable_name in p.sensitive_variables: if variable_name in p.sensitive_variables:
is_sensitive = True is_sensitive = True
p.sensitive_variables.remove(variable_name) p.sensitive_variables.remove(variable_name)
environments_to_cleanup = [env.id for env in p.environments] environments_to_cleanup = [env.id for env in p.environments]
# Remove from all environments
for env in p.environments: for env in p.environments:
env.variables.pop(variable_name, None) env.variables.pop(variable_name, None)
self._save_project(p)
break break
self.save_projects(projects)
# Delete secrets for this variable asynchronously
if is_sensitive and environments_to_cleanup: if is_sensitive and environments_to_cleanup:
pending_count = [len(environments_to_cleanup)] pending_count = [len(environments_to_cleanup)]
@ -421,7 +586,6 @@ class ProjectManager:
callback() callback()
def update_environment_variable(self, project_id: str, env_id: str, variable_name: str, value: str): def update_environment_variable(self, project_id: str, env_id: str, variable_name: str, value: str):
"""Update a single variable value in an environment."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
@ -429,58 +593,34 @@ class ProjectManager:
if env.id == env_id: if env.id == env_id:
env.variables[variable_name] = value env.variables[variable_name] = value
break break
self._save_project(p)
break break
self.save_projects(projects)
def batch_update_environment_variables(self, project_id: str, env_id: str, variables: dict): def batch_update_environment_variables(self, project_id: str, env_id: str, variables: dict):
"""
Update multiple variables in an environment in one operation.
Args:
project_id: Project ID
env_id: Environment ID
variables: Dict of variable_name -> value pairs to update
"""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
for env in p.environments: for env in p.environments:
if env.id == env_id: if env.id == env_id:
# Update all variables
for var_name, var_value in variables.items(): for var_name, var_value in variables.items():
env.variables[var_name] = var_value env.variables[var_name] = var_value
break break
self._save_project(p)
break break
self.save_projects(projects)
# ========== Sensitive Variable Management ========== # ===== Sensitive Variable Management =====
def mark_variable_as_sensitive(self, project_id: str, variable_name: str, def mark_variable_as_sensitive(self, project_id: str, variable_name: str,
callback: Optional[Callable[[bool], None]] = None): callback: Optional[Callable[[bool], None]] = None):
"""
Mark a variable as sensitive (move values from JSON to keyring) - async.
This will:
1. Add variable to sensitive_variables list
2. Move all environment values to keyring
3. Clear values from JSON (store empty string as placeholder)
Args:
project_id: Project ID
variable_name: Variable name to mark as sensitive
callback: Optional callback called with success boolean
"""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
secrets_to_store = [] secrets_to_store = []
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Add to sensitive list if not already there
if variable_name not in p.sensitive_variables: if variable_name not in p.sensitive_variables:
p.sensitive_variables.append(variable_name) p.sensitive_variables.append(variable_name)
# Collect all environment values to store in keyring
for env in p.environments: for env in p.environments:
if variable_name in env.variables: if variable_name in env.variables:
value = env.variables[variable_name] value = env.variables[variable_name]
@ -490,12 +630,10 @@ class ProjectManager:
'value': value, 'value': value,
'project_name': p.name 'project_name': p.name
}) })
# Clear from JSON (keep empty placeholder)
env.variables[variable_name] = "" env.variables[variable_name] = ""
self.save_projects(projects) self._save_project(p)
# Store all secrets asynchronously
if not secrets_to_store: if not secrets_to_store:
if callback: if callback:
callback(True) callback(True)
@ -528,19 +666,6 @@ class ProjectManager:
def mark_variable_as_nonsensitive(self, project_id: str, variable_name: str, def mark_variable_as_nonsensitive(self, project_id: str, variable_name: str,
callback: Optional[Callable[[bool], None]] = None): callback: Optional[Callable[[bool], None]] = None):
"""
Mark a variable as non-sensitive (move values from keyring to JSON) - async.
This will:
1. Remove variable from sensitive_variables list
2. Move all environment values from keyring to JSON
3. Delete values from keyring
Args:
project_id: Project ID
variable_name: Variable name to mark as non-sensitive
callback: Optional callback called with success boolean
"""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
@ -550,10 +675,8 @@ class ProjectManager:
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
target_project = p target_project = p
# Remove from sensitive list
if variable_name in p.sensitive_variables: if variable_name in p.sensitive_variables:
p.sensitive_variables.remove(variable_name) p.sensitive_variables.remove(variable_name)
environments_to_migrate = list(p.environments) environments_to_migrate = list(p.environments)
break break
@ -562,17 +685,14 @@ class ProjectManager:
callback(False) callback(False)
return return
# Retrieve all secrets, then update JSON, then delete from keyring
pending_count = [len(environments_to_migrate)] pending_count = [len(environments_to_migrate)]
all_success = [True] all_success = [True]
def on_single_migrate(env): def on_single_migrate(env):
def on_retrieve(value): def on_retrieve(value):
# Store in JSON
env.variables[variable_name] = value or "" env.variables[variable_name] = value or ""
self.save_projects(projects) self._save_project(target_project)
# Delete from keyring
def on_delete(success): def on_delete(success):
if not success: if not success:
all_success[0] = False all_success[0] = False
@ -597,7 +717,6 @@ class ProjectManager:
) )
def is_variable_sensitive(self, project_id: str, variable_name: str) -> bool: def is_variable_sensitive(self, project_id: str, variable_name: str) -> bool:
"""Check if a variable is marked as sensitive."""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
@ -606,24 +725,11 @@ class ProjectManager:
def get_variable_value(self, project_id: str, env_id: str, variable_name: str, def get_variable_value(self, project_id: str, env_id: str, variable_name: str,
callback: Callable[[str], None]): callback: Callable[[str], None]):
"""
Get variable value from appropriate storage (keyring or JSON) - async.
This is a convenience method that handles both sensitive and non-sensitive variables.
Args:
project_id: Project ID
env_id: Environment ID
variable_name: Variable name
callback: Callback called with variable value (empty string if not found)
"""
projects = self.load_projects() projects = self.load_projects()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Check if sensitive
if variable_name in p.sensitive_variables: if variable_name in p.sensitive_variables:
# Retrieve from keyring asynchronously
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
def on_retrieve(value): def on_retrieve(value):
@ -637,7 +743,6 @@ class ProjectManager:
) )
return return
else: else:
# Retrieve from JSON synchronously
for env in p.environments: for env in p.environments:
if env.id == env_id: if env.id == env_id:
callback(env.variables.get(variable_name, "")) callback(env.variables.get(variable_name, ""))
@ -647,26 +752,12 @@ class ProjectManager:
def set_variable_value(self, project_id: str, env_id: str, variable_name: str, value: str, def set_variable_value(self, project_id: str, env_id: str, variable_name: str, value: str,
callback: Optional[Callable[[bool], None]] = None): callback: Optional[Callable[[bool], None]] = None):
"""
Set variable value in appropriate storage (keyring or JSON) - async.
This is a convenience method that handles both sensitive and non-sensitive variables.
Args:
project_id: Project ID
env_id: Environment ID
variable_name: Variable name
value: Value to set
callback: Optional callback called with success boolean
"""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
for p in projects: for p in projects:
if p.id == project_id: if p.id == project_id:
# Check if sensitive
if variable_name in p.sensitive_variables: if variable_name in p.sensitive_variables:
# Store in keyring asynchronously
for env in p.environments: for env in p.environments:
if env.id == env_id: if env.id == env_id:
secret_manager.store_secret( secret_manager.store_secret(
@ -678,18 +769,15 @@ class ProjectManager:
environment_name=env.name, environment_name=env.name,
callback=callback callback=callback
) )
# Keep empty placeholder in JSON
env.variables[variable_name] = "" env.variables[variable_name] = ""
self.save_projects(projects) self._save_project(p)
return return
else: else:
# Store in JSON synchronously
for env in p.environments: for env in p.environments:
if env.id == env_id: if env.id == env_id:
env.variables[variable_name] = value env.variables[variable_name] = value
break break
self._save_project(p)
self.save_projects(projects)
if callback: if callback:
callback(True) callback(True)
return return
@ -699,17 +787,6 @@ class ProjectManager:
def get_environment_with_secrets(self, project_id: str, env_id: str, def get_environment_with_secrets(self, project_id: str, env_id: str,
callback: Callable[[Optional[Environment]], None]): callback: Callable[[Optional[Environment]], None]):
"""
Get an environment with all variable values (including secrets from keyring) - async.
This is useful for variable substitution - it returns a complete Environment
object with all values populated from both JSON and keyring.
Args:
project_id: Project ID
env_id: Environment ID
callback: Callback called with Environment object (or None if not found)
"""
projects = self.load_projects() projects = self.load_projects()
secret_manager = get_secret_manager() secret_manager = get_secret_manager()
@ -717,7 +794,6 @@ class ProjectManager:
if p.id == project_id: if p.id == project_id:
for env in p.environments: for env in p.environments:
if env.id == env_id: if env.id == env_id:
# Create a copy of the environment
complete_env = Environment( complete_env = Environment(
id=env.id, id=env.id,
name=env.name, name=env.name,
@ -725,12 +801,10 @@ class ProjectManager:
created_at=env.created_at created_at=env.created_at
) )
# If no sensitive variables, return immediately
if not p.sensitive_variables: if not p.sensitive_variables:
callback(complete_env) callback(complete_env)
return return
# Fill in sensitive variable values from keyring
def on_secrets_retrieved(secrets_dict): def on_secrets_retrieved(secrets_dict):
for var_name, value in secrets_dict.items(): for var_name, value in secrets_dict.items():
if value is not None: if value is not None: