roster/src/script_executor.py

441 lines
15 KiB
Python

# script_executor.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import subprocess
import json
import tempfile
import re
from pathlib import Path
from typing import Tuple, Optional, Dict, List, TYPE_CHECKING
from dataclasses import dataclass, field
from .constants import SCRIPT_EXECUTION_TIMEOUT_SECONDS
if TYPE_CHECKING:
from .project_manager import ProjectManager
from .models import HttpRequest
@dataclass
class ScriptResult:
"""Result of script execution."""
success: bool
output: str # console.log output or error message
error: Optional[str] = None
variable_updates: Dict[str, str] = field(default_factory=dict) # Variables set by script
warnings: List[str] = field(default_factory=list) # Warnings (e.g., no env selected)
modified_request: Optional['HttpRequest'] = None # Modified request from preprocessing
@dataclass
class ScriptContext:
"""Context passed to script executor for variable updates."""
project_id: Optional[str]
environment_id: Optional[str]
project_manager: 'ProjectManager'
class ScriptExecutor:
"""Executes JavaScript code using gjs (GNOME JavaScript)."""
TIMEOUT_SECONDS = SCRIPT_EXECUTION_TIMEOUT_SECONDS
@staticmethod
def execute_postprocessing_script(
script_code: str,
response,
context: Optional[ScriptContext] = None
) -> ScriptResult:
"""
Execute postprocessing script with response object.
Args:
script_code: JavaScript code to execute
response: HttpResponse object
context: Optional context for variable updates
Returns:
ScriptResult with output, variable updates, or error
"""
if not script_code.strip():
return ScriptResult(success=True, output="")
# Create response object for JavaScript
response_obj = ScriptExecutor._create_response_object(response)
# Build complete script with context
script_with_context = f"""
const response = {json.dumps(response_obj)};
// Roster API for variable management
const roster = {{
__variables: {{}},
setVariable: function(name, value) {{
this.__variables[String(name)] = String(value);
}},
setVariables: function(obj) {{
for (let key in obj) {{
if (obj.hasOwnProperty(key)) {{
this.__variables[String(key)] = String(obj[key]);
}}
}}
}}
}};
// Capture console.log output
let consoleOutput = [];
const console = {{
log: function(...args) {{
consoleOutput.push(args.map(arg => String(arg)).join(' '));
}}
}};
// User script
try {{
{script_code}
}} catch (error) {{
consoleOutput.push('Error: ' + error.message);
throw error;
}}
// Output results: console logs + separator + variables JSON
print(consoleOutput.join('\\n'));
print('___ROSTER_VARIABLES___');
print(JSON.stringify(roster.__variables));
"""
# Execute with gjs
output, error, returncode = ScriptExecutor._run_gjs_script(
script_with_context,
timeout=ScriptExecutor.TIMEOUT_SECONDS
)
# Parse output and extract variables
console_output = ""
variable_updates = {}
warnings = []
if returncode == 0:
# Split output into console logs and variables JSON
parts = output.split('___ROSTER_VARIABLES___')
console_output = parts[0].strip()
if len(parts) > 1:
try:
variables_json = parts[1].strip()
raw_variables = json.loads(variables_json)
# Validate variable names and add to updates
for name, value in raw_variables.items():
if ScriptExecutor._is_valid_variable_name(name):
variable_updates[name] = value
else:
warnings.append(f"Invalid variable name '{name}' (must be alphanumeric + underscore)")
except json.JSONDecodeError:
warnings.append("Failed to parse variable updates from script")
return ScriptResult(
success=True,
output=console_output,
variable_updates=variable_updates,
warnings=warnings
)
else:
error_msg = error.strip() if error else "Script execution failed"
return ScriptResult(success=False, output="", error=error_msg)
@staticmethod
def execute_preprocessing_script(
script_code: str,
request,
context: Optional[ScriptContext] = None
) -> ScriptResult:
"""
Execute preprocessing script with request object.
Args:
script_code: JavaScript code to execute
request: HttpRequest object (will be modified)
context: Optional context for variable access and updates
Returns:
ScriptResult with output, modified request, variable updates, or error
"""
if not script_code.strip():
return ScriptResult(success=True, output="")
# Import here to avoid circular import
from .models import HttpRequest
# Create request object for JavaScript
request_obj = ScriptExecutor._create_request_object(request)
# Get environment variables (read-only)
env_vars = ScriptExecutor._get_environment_variables(context)
# Get project metadata
project_meta = ScriptExecutor._get_project_metadata(context)
# Build complete script with context
script_with_context = f"""
const request = {json.dumps(request_obj)};
// Roster API for variable management
const roster = {{
__variables: {{}}, // For setVariable
__envVariables: {json.dumps(env_vars)}, // Read-only current environment
getVariable: function(name) {{
return this.__envVariables[name] || null;
}},
setVariable: function(name, value) {{
this.__variables[String(name)] = String(value);
}},
setVariables: function(obj) {{
for (let key in obj) {{
if (obj.hasOwnProperty(key)) {{
this.__variables[String(key)] = String(obj[key]);
}}
}}
}},
project: {json.dumps(project_meta)}
}};
// Capture console.log output
let consoleOutput = [];
const console = {{
log: function(...args) {{
consoleOutput.push(args.map(arg => String(arg)).join(' '));
}}
}};
// User script
try {{
{script_code}
}} catch (error) {{
consoleOutput.push('Error: ' + error.message);
throw error;
}}
// Output results: modified request + separator + console logs + variables
print(JSON.stringify(request));
print('___ROSTER_SEPARATOR___');
print(consoleOutput.join('\\n'));
print('___ROSTER_VARIABLES___');
print(JSON.stringify(roster.__variables));
"""
# Execute with gjs
output, error, returncode = ScriptExecutor._run_gjs_script(
script_with_context,
timeout=ScriptExecutor.TIMEOUT_SECONDS
)
# Parse output and extract results
console_output = ""
variable_updates = {}
warnings = []
modified_request = None
if returncode == 0:
# Split output into: request JSON + console logs + variables JSON
parts = output.split('___ROSTER_SEPARATOR___')
if len(parts) >= 2:
request_json_str = parts[0].strip()
rest = parts[1]
# Parse modified request
try:
request_data = json.loads(request_json_str)
# Validate required fields
if not request_data.get('url') or not request_data.get('url').strip():
warnings.append("Modified request has empty URL")
return ScriptResult(
success=False,
output="",
error="Script produced invalid request: URL is empty",
warnings=warnings
)
# Create HttpRequest from modified data
modified_request = HttpRequest(
method=request_data.get('method', 'GET'),
url=request_data.get('url', ''),
headers=request_data.get('headers', {}),
body=request_data.get('body', ''),
syntax=request.syntax # Preserve syntax from original
)
except json.JSONDecodeError as e:
warnings.append(f"Failed to parse modified request: {str(e)}")
return ScriptResult(
success=False,
output="",
error="Script produced invalid request JSON",
warnings=warnings
)
# Parse console output and variables
var_parts = rest.split('___ROSTER_VARIABLES___')
console_output = var_parts[0].strip()
if len(var_parts) > 1:
try:
variables_json = var_parts[1].strip()
raw_variables = json.loads(variables_json)
# Validate variable names and add to updates
for name, value in raw_variables.items():
if ScriptExecutor._is_valid_variable_name(name):
variable_updates[name] = value
else:
warnings.append(f"Invalid variable name '{name}' (must be alphanumeric + underscore)")
except json.JSONDecodeError:
warnings.append("Failed to parse variable updates from script")
return ScriptResult(
success=True,
output=console_output,
variable_updates=variable_updates,
warnings=warnings,
modified_request=modified_request
)
else:
error_msg = error.strip() if error else "Script execution failed"
return ScriptResult(success=False, output="", error=error_msg)
@staticmethod
def _create_response_object(response) -> dict:
"""Convert HttpResponse to JavaScript-compatible dict."""
# Parse headers text into dict
headers = {}
if response.headers:
for line in response.headers.split('\n')[1:]: # Skip status line
line = line.strip()
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
return {
'body': response.body,
'headers': headers,
'statusCode': response.status_code,
'statusText': response.status_text,
'responseTime': response.response_time_ms
}
@staticmethod
def _create_request_object(request) -> dict:
"""Convert HttpRequest to JavaScript-compatible dict."""
return {
'method': request.method,
'url': request.url,
'headers': dict(request.headers), # Convert to regular dict
'body': request.body
}
@staticmethod
def _get_environment_variables(context: Optional[ScriptContext]) -> dict:
"""Get environment variables from context."""
if not context or not context.project_id or not context.environment_id:
return {}
# Load projects and find the environment
projects = context.project_manager.load_projects()
for project in projects:
if project.id == context.project_id:
for env in project.environments:
if env.id == context.environment_id:
return env.variables
return {}
@staticmethod
def _get_project_metadata(context: Optional[ScriptContext]) -> dict:
"""Get project metadata (name and environments)."""
if not context or not context.project_id:
return {'name': '', 'environments': []}
# Load projects and find the project
projects = context.project_manager.load_projects()
for project in projects:
if project.id == context.project_id:
return {
'name': project.name,
'environments': [env.name for env in project.environments]
}
return {'name': '', 'environments': []}
@staticmethod
def _is_valid_variable_name(name: str) -> bool:
"""
Validate variable name (alphanumeric + underscore).
Args:
name: Variable name to validate
Returns:
True if valid, False otherwise
"""
return bool(re.match(r'^\w+$', name))
@staticmethod
def _run_gjs_script(script_code: str, timeout: int = 5) -> Tuple[str, str, int]:
"""
Run JavaScript code using gjs.
Args:
script_code: Complete JavaScript code
timeout: Execution timeout in seconds
Returns:
Tuple of (stdout, stderr, returncode)
"""
try:
# Write script to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
f.write(script_code)
script_path = f.name
try:
# Execute with gjs
result = subprocess.run(
['gjs', script_path],
capture_output=True,
text=True,
timeout=timeout
)
return result.stdout, result.stderr, result.returncode
finally:
# Clean up temp file
Path(script_path).unlink(missing_ok=True)
except subprocess.TimeoutExpired:
return "", f"Script execution timeout ({timeout}s)", 1
except FileNotFoundError:
return "", "gjs not found. Please ensure GNOME JavaScript is installed.", 1
except Exception as e:
return "", f"Script execution error: {str(e)}", 1