# script_executor.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later

import subprocess
import json
import tempfile
import re
from pathlib import Path
from typing import Tuple, Optional, Dict, List, TYPE_CHECKING
from dataclasses import dataclass, field

from .constants import SCRIPT_EXECUTION_TIMEOUT_SECONDS

if TYPE_CHECKING:
    from .project_manager import ProjectManager
    from .models import HttpRequest


@dataclass
class ScriptResult:
    """Result of script execution."""
    success: bool
    output: str  # console.log output or error message
    error: Optional[str] = None
    variable_updates: Dict[str, str] = field(default_factory=dict)  # Variables set by script
    warnings: List[str] = field(default_factory=list)  # Warnings (e.g., no env selected)
    modified_request: Optional['HttpRequest'] = None  # Modified request from preprocessing


@dataclass
class ScriptContext:
    """Context passed to script executor for variable updates."""
    project_id: Optional[str]
    environment_id: Optional[str]
    project_manager: 'ProjectManager'


class ScriptExecutor:
    """Executes JavaScript code using gjs (GNOME JavaScript).

    The user script is wrapped in a small JavaScript harness that exposes a
    ``roster`` API and a ``console.log`` replacement, written to a temporary
    file and run with the ``gjs`` executable. The harness prints its results
    to stdout as sections separated by marker lines, which are parsed back
    into a :class:`ScriptResult`.
    """

    TIMEOUT_SECONDS = SCRIPT_EXECUTION_TIMEOUT_SECONDS

    # Marker lines the harness prints between the sections of its stdout.
    _REQUEST_MARKER = '___ROSTER_SEPARATOR___'
    _VARIABLES_MARKER = '___ROSTER_VARIABLES___'

    @staticmethod
    def execute_postprocessing_script(
        script_code: str,
        response,
        context: Optional[ScriptContext] = None
    ) -> ScriptResult:
        """
        Execute postprocessing script with response object.

        Args:
            script_code: JavaScript code to execute
            response: HttpResponse object
            context: Optional context for variable updates (not read by this
                method; kept for interface compatibility)

        Returns:
            ScriptResult with output, variable updates, or error
        """
        # An empty (or missing) script is a successful no-op.
        if not script_code or not script_code.strip():
            return ScriptResult(success=True, output="")

        # Create response object for JavaScript
        response_obj = ScriptExecutor._create_response_object(response)
        variables_marker = ScriptExecutor._VARIABLES_MARKER

        # Build complete script with context
        script_with_context = f"""
const response = {json.dumps(response_obj)};

// Roster API for variable management
const roster = {{
    __variables: {{}},
    setVariable: function(name, value) {{
        this.__variables[String(name)] = String(value);
    }},
    setVariables: function(obj) {{
        for (let key in obj) {{
            if (obj.hasOwnProperty(key)) {{
                this.__variables[String(key)] = String(obj[key]);
            }}
        }}
    }}
}};

// Capture console.log output
let consoleOutput = [];
const console = {{
    log: function(...args) {{
        consoleOutput.push(args.map(arg => String(arg)).join(' '));
    }}
}};

// User script
try {{
{script_code}
}} catch (error) {{
    consoleOutput.push('Error: ' + error.message);
    throw error;
}}

// Output results: console logs + separator + variables JSON
print(consoleOutput.join('\\n'));
print('{variables_marker}');
print(JSON.stringify(roster.__variables));
"""

        # Execute with gjs
        output, error, returncode = ScriptExecutor._run_gjs_script(
            script_with_context,
            timeout=ScriptExecutor.TIMEOUT_SECONDS
        )

        if returncode != 0:
            error_msg = error.strip() if error else "Script execution failed"
            return ScriptResult(success=False, output="", error=error_msg)

        # The variables JSON is the last section, so split on the LAST marker
        # line; console output that mentions the marker stays intact.
        console_text, variables_json = ScriptExecutor._split_at_marker(
            output, variables_marker, from_end=True
        )

        variable_updates: Dict[str, str] = {}
        warnings: List[str] = []
        if variables_json is not None:
            variable_updates, warnings = ScriptExecutor._parse_variable_updates(variables_json)

        return ScriptResult(
            success=True,
            output=console_text.strip(),
            variable_updates=variable_updates,
            warnings=warnings
        )

    @staticmethod
    def execute_preprocessing_script(
        script_code: str,
        request,
        context: Optional[ScriptContext] = None
    ) -> ScriptResult:
        """
        Execute preprocessing script with request object.

        Args:
            script_code: JavaScript code to execute
            request: HttpRequest object; the script receives a copy of its
                method, url, headers and body, and a new HttpRequest is built
                from whatever the script leaves in that copy
            context: Optional context for variable access and updates

        Returns:
            ScriptResult with output, modified request, variable updates, or error
        """
        # An empty (or missing) script is a successful no-op.
        if not script_code or not script_code.strip():
            return ScriptResult(success=True, output="")

        # Import here to avoid circular import
        from .models import HttpRequest

        # Create request object for JavaScript
        request_obj = ScriptExecutor._create_request_object(request)

        # Get environment variables (read-only)
        env_vars = ScriptExecutor._get_environment_variables(context)

        # Get project metadata
        project_meta = ScriptExecutor._get_project_metadata(context)

        request_marker = ScriptExecutor._REQUEST_MARKER
        variables_marker = ScriptExecutor._VARIABLES_MARKER

        # Build complete script with context
        script_with_context = f"""
const request = {json.dumps(request_obj)};

// Roster API for variable management
const roster = {{
    __variables: {{}},  // For setVariable
    __envVariables: {json.dumps(env_vars)},  // Read-only current environment
    getVariable: function(name) {{
        return this.__envVariables[name] || null;
    }},
    setVariable: function(name, value) {{
        this.__variables[String(name)] = String(value);
    }},
    setVariables: function(obj) {{
        for (let key in obj) {{
            if (obj.hasOwnProperty(key)) {{
                this.__variables[String(key)] = String(obj[key]);
            }}
        }}
    }},
    project: {json.dumps(project_meta)}
}};

// Capture console.log output
let consoleOutput = [];
const console = {{
    log: function(...args) {{
        consoleOutput.push(args.map(arg => String(arg)).join(' '));
    }}
}};

// User script
try {{
{script_code}
}} catch (error) {{
    consoleOutput.push('Error: ' + error.message);
    throw error;
}}

// Output results: modified request + separator + console logs + variables
print(JSON.stringify(request));
print('{request_marker}');
print(consoleOutput.join('\\n'));
print('{variables_marker}');
print(JSON.stringify(roster.__variables));
"""

        # Execute with gjs
        output, error, returncode = ScriptExecutor._run_gjs_script(
            script_with_context,
            timeout=ScriptExecutor.TIMEOUT_SECONDS
        )

        if returncode != 0:
            error_msg = error.strip() if error else "Script execution failed"
            return ScriptResult(success=False, output="", error=error_msg)

        # Parse output and extract results
        console_output = ""
        variable_updates: Dict[str, str] = {}
        warnings: List[str] = []
        modified_request = None

        # The request JSON is the first section, so split on the FIRST
        # request-marker line.
        request_json_str, rest = ScriptExecutor._split_at_marker(
            output, request_marker, from_end=False
        )

        if rest is not None:
            # Parse modified request
            try:
                request_data = json.loads(request_json_str.strip())
            except json.JSONDecodeError as e:
                warnings.append(f"Failed to parse modified request: {str(e)}")
                return ScriptResult(
                    success=False,
                    output="",
                    error="Script produced invalid request JSON",
                    warnings=warnings
                )

            if not isinstance(request_data, dict):
                warnings.append("Failed to parse modified request: expected a JSON object")
                return ScriptResult(
                    success=False,
                    output="",
                    error="Script produced invalid request JSON",
                    warnings=warnings
                )

            # Validate required fields. A script may assign a non-string
            # (number, object, null) to request.url; reject it here instead of
            # crashing on .strip().
            url = request_data.get('url')
            if not isinstance(url, str) or not url.strip():
                warnings.append("Modified request has empty URL")
                return ScriptResult(
                    success=False,
                    output="",
                    error="Script produced invalid request: URL is empty",
                    warnings=warnings
                )

            # Create HttpRequest from modified data. A field the script set to
            # null (or deleted) falls back to the same default as a missing one.
            body = request_data.get('body')
            modified_request = HttpRequest(
                method=request_data.get('method') or 'GET',
                url=url,
                headers=request_data.get('headers') or {},
                body='' if body is None else body,
                syntax=request.syntax  # Preserve syntax from original
            )

            # Parse console output and variables; the variables JSON is last,
            # so split on the LAST variables-marker line.
            console_text, variables_json = ScriptExecutor._split_at_marker(
                rest, variables_marker, from_end=True
            )
            console_output = console_text.strip()
            if variables_json is not None:
                variable_updates, warnings = ScriptExecutor._parse_variable_updates(variables_json)

        return ScriptResult(
            success=True,
            output=console_output,
            variable_updates=variable_updates,
            warnings=warnings,
            modified_request=modified_request
        )

    @staticmethod
    def _split_at_marker(text: str, marker: str, *, from_end: bool) -> Tuple[str, Optional[str]]:
        """
        Split harness output at a line consisting solely of ``marker``.

        The harness prints each marker with its own print() call, so a real
        marker always sits on a line by itself. Matching the surrounding
        newlines keeps marker text embedded inside a JSON string (which
        cannot contain a raw newline) from being mistaken for a separator.

        Args:
            text: Output to split
            marker: Marker text to look for
            from_end: Split at the last marker line if True, else the first

        Returns:
            Tuple of (before, after); ``after`` is None and ``before`` is the
            whole text when no marker line is present
        """
        token = f"\n{marker}\n"
        if from_end:
            before, found, after = text.rpartition(token)
        else:
            before, found, after = text.partition(token)
        if not found:
            return text, None
        return before, after

    @staticmethod
    def _parse_variable_updates(variables_json: str) -> Tuple[Dict[str, str], List[str]]:
        """
        Parse the variables JSON printed by the harness.

        Args:
            variables_json: JSON text of the ``roster.__variables`` object

        Returns:
            Tuple of (variable updates with valid names, warnings)
        """
        updates: Dict[str, str] = {}
        warnings: List[str] = []

        try:
            raw_variables = json.loads(variables_json.strip())
        except json.JSONDecodeError:
            raw_variables = None

        # A script can overwrite roster.__variables with a non-object; treat
        # that the same as unparseable output.
        if not isinstance(raw_variables, dict):
            warnings.append("Failed to parse variable updates from script")
            return updates, warnings

        # Validate variable names and add to updates
        for name, value in raw_variables.items():
            if ScriptExecutor._is_valid_variable_name(name):
                updates[name] = value
            else:
                warnings.append(f"Invalid variable name '{name}' (must be alphanumeric + underscore)")

        return updates, warnings

    @staticmethod
    def _create_response_object(response) -> dict:
        """Convert HttpResponse to JavaScript-compatible dict."""
        # Parse headers text into dict
        headers = {}
        if response.headers:
            for line in response.headers.split('\n')[1:]:  # Skip status line
                line = line.strip()
                if ':' in line:
                    key, value = line.split(':', 1)
                    headers[key.strip()] = value.strip()

        return {
            'body': response.body,
            'headers': headers,
            'statusCode': response.status_code,
            'statusText': response.status_text,
            'responseTime': response.response_time_ms
        }

    @staticmethod
    def _create_request_object(request) -> dict:
        """Convert HttpRequest to JavaScript-compatible dict."""
        return {
            'method': request.method,
            'url': request.url,
            'headers': dict(request.headers),  # Convert to regular dict
            'body': request.body
        }

    @staticmethod
    def _get_environment_variables(context: Optional[ScriptContext]) -> dict:
        """Get environment variables from context.

        Returns an empty dict when the context, its project id or its
        environment id is missing, or when no matching environment is found.
        """
        if not context or not context.project_id or not context.environment_id:
            return {}

        # Load projects and find the environment
        projects = context.project_manager.load_projects()
        for project in projects:
            if project.id != context.project_id:
                continue
            for env in project.environments:
                if env.id == context.environment_id:
                    return env.variables

        return {}

    @staticmethod
    def _get_project_metadata(context: Optional[ScriptContext]) -> dict:
        """Get project metadata (name and environments).

        Returns an empty name and environment list when the context or its
        project id is missing, or when no matching project is found.
        """
        if not context or not context.project_id:
            return {'name': '', 'environments': []}

        # Load projects and find the project
        projects = context.project_manager.load_projects()
        for project in projects:
            if project.id == context.project_id:
                return {
                    'name': project.name,
                    'environments': [env.name for env in project.environments]
                }

        return {'name': '', 'environments': []}

    @staticmethod
    def _is_valid_variable_name(name: str) -> bool:
        """
        Validate variable name (alphanumeric + underscore).

        The whole name must consist of ``\\w`` characters. ``fullmatch`` is
        used because ``$`` in ``re.match`` also matches just before a trailing
        newline, which would let a name such as ``"foo\\n"`` through.

        Args:
            name: Variable name to validate

        Returns:
            True if valid, False otherwise
        """
        return re.fullmatch(r'\w+', name) is not None

    @staticmethod
    def _run_gjs_script(script_code: str, timeout: int = 5) -> Tuple[str, str, int]:
        """
        Run JavaScript code using gjs.

        Failures to run the script (timeout, missing gjs, any other
        exception) are reported as an error message with return code 1
        rather than raised.

        Args:
            script_code: Complete JavaScript code
            timeout: Execution timeout in seconds

        Returns:
            Tuple of (stdout, stderr, returncode)
        """
        try:
            # Write script to temporary file. UTF-8 is explicit so non-ASCII
            # script text does not depend on the locale encoding.
            script_file = tempfile.NamedTemporaryFile(
                mode='w',
                suffix='.js',
                delete=False,
                encoding='utf-8'
            )
            script_path = Path(script_file.name)

            try:
                with script_file:
                    script_file.write(script_code)

                # Execute with gjs
                result = subprocess.run(
                    ['gjs', str(script_path)],
                    capture_output=True,
                    text=True,
                    encoding='utf-8',
                    timeout=timeout
                )
                return result.stdout, result.stderr, result.returncode
            finally:
                # Clean up temp file, including when writing it failed
                script_path.unlink(missing_ok=True)

        except subprocess.TimeoutExpired:
            return "", f"Script execution timeout ({timeout}s)", 1
        except FileNotFoundError:
            return "", "gjs not found. Please ensure GNOME JavaScript is installed.", 1
        except Exception as e:
            return "", f"Script execution error: {str(e)}", 1