Compare commits

...

6 Commits

10 changed files with 982 additions and 1 deletions

View File

@ -17,4 +17,8 @@ These are dynamically linked system libraries available on https://gitlab.gnome.
Icons are loaded at runtime from the system and are not distributed with this application. Icons are loaded at runtime from the system and are not distributed with this application.
## GNOME Icon Development Kit (CC0-1.0)
- `papyrus-vertical-symbolic.svg` — from https://gitlab.gnome.org/Teams/Design/icon-development-kit
- License: Creative Commons Zero v1.0 Universal (public domain dedication)
All licenses are compatible with GPL-3.0-or-later. All licenses are compatible with GPL-3.0-or-later.

View File

@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" height="16px" viewBox="0 0 16 16" width="16px"><path d="m 7.007812 -0.0078125 c -0.519531 0 -1.035156 0.1367185 -1.5 0.4023435 c -0.925781 0.535157 -1.5 1.527344 -1.5 2.597657 v 7 h -3.9999995 v 3 c 0 1.070312 0.5742185 2.066406 1.4999995 2.601562 c 0.464844 0.265625 0.984376 0.398438 1.5 0.398438 v 0.007812 l 7 -0.003906 v -0.003906 h 0.011719 c 0.796875 0 1.5625 -0.316407 2.121094 -0.875 c 0.5625 -0.5625 0.878906 -1.328126 0.878906 -2.125 c 0.003907 -0.058594 -0.003906 -0.117188 -0.011719 -0.175782 v -6.824218 h 3 v -3 c 0 -0.796876 -0.316406 -1.558594 -0.878906 -2.121094 s -1.324218 -0.8789065 -2.121094 -0.8789065 z m 0 2.0000005 c 0.171876 0 0.34375 0.046874 0.5 0.136718 c 0.3125 0.175782 0.5 0.503906 0.5 0.863282 v 3 h 3 v 7 h 0.011719 c 0 0.265624 -0.101562 0.519531 -0.292969 0.707031 c -0.1875 0.1875 -0.441406 0.292969 -0.707031 0.292969 c -0.023437 0 -0.046875 0 -0.070312 0.003906 l -4.121094 0.003906 c 0.113281 -0.320312 0.179687 -0.660156 0.179687 -1.007812 v -10 c 0 -0.359376 0.1875 -0.6875 0.5 -0.863282 c 0.15625 -0.089844 0.328126 -0.136718 0.5 -0.136718 z m 2.820313 0 h 3.179687 c 0.265626 0 0.519532 0.105468 0.707032 0.292968 s 0.292968 0.441406 0.292968 0.707032 v 1 h -4 v -1 c 0 -0.347657 -0.066406 -0.683594 -0.179687 -1 z m -7.820313 10 h 2 v 1 c 0 0.359374 -0.1875 0.6875 -0.5 0.867187 s -0.6875 0.179687 -1 0 s -0.5 -0.507813 -0.5 -0.867187 z m 0 0" fill="#222222"/></svg>

After

Width:  |  Height:  |  Size: 1.5 KiB

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" height="16px" viewBox="0 0 16 16" width="16px">
<g fill="none" stroke="#222222" stroke-linecap="round" stroke-linejoin="round">
<!-- Rounded rectangle frame -->
<rect x="1.5" y="1.5" width="13" height="13" rx="2.25" ry="2.25" stroke-width="1.5"/>
<!-- Bold W letter -->
<path stroke-width="2.1" d="M 3.25 5 L 5.25 11 L 8 8 L 10.75 11 L 12.75 5"/>
</g>
</svg>

After

Width:  |  Height:  |  Size: 458 B

View File

@ -1,5 +1,5 @@
project('roster', project('roster',
version: '0.8.4', version: '0.9.0',
meson_version: '>= 1.0.0', meson_version: '>= 1.0.0',
default_options: [ 'warning_level=2', 'werror=false', ], default_options: [ 'warning_level=2', 'werror=false', ],
) )

View File

@ -48,6 +48,16 @@
</style> </style>
</object> </object>
</child> </child>
<child type="end">
<object class="GtkButton" id="import_wsdl_button">
<property name="icon-name">papyrus-vertical-symbolic</property>
<property name="tooltip-text">Import from WSDL</property>
<signal name="clicked" handler="on_import_wsdl_clicked"/>
<style>
<class name="flat"/>
</style>
</object>
</child>
</object> </object>
</child> </child>

View File

@ -44,6 +44,8 @@ roster_sources = [
'preferences_dialog.py', 'preferences_dialog.py',
'request_tab_widget.py', 'request_tab_widget.py',
'script_executor.py', 'script_executor.py',
'wsdl_importer.py',
'wsdl_import_dialog.py',
] ]
install_data(roster_sources, install_dir: moduledir) install_data(roster_sources, install_dir: moduledir)

View File

@ -8,6 +8,8 @@
<file preprocess="xml-stripblanks">environments-dialog.ui</file> <file preprocess="xml-stripblanks">environments-dialog.ui</file>
<file preprocess="xml-stripblanks">export-dialog.ui</file> <file preprocess="xml-stripblanks">export-dialog.ui</file>
<file alias="icons/export-symbolic.svg">../data/icons/hicolor/symbolic/apps/export-symbolic.svg</file> <file alias="icons/export-symbolic.svg">../data/icons/hicolor/symbolic/apps/export-symbolic.svg</file>
<file alias="icons/wsdl-import-symbolic.svg">../data/icons/hicolor/symbolic/apps/wsdl-import-symbolic.svg</file>
<file alias="icons/papyrus-vertical-symbolic.svg">../data/icons/hicolor/symbolic/apps/papyrus-vertical-symbolic.svg</file>
<file preprocess="xml-stripblanks">widgets/header-row.ui</file> <file preprocess="xml-stripblanks">widgets/header-row.ui</file>
<file preprocess="xml-stripblanks">widgets/history-item.ui</file> <file preprocess="xml-stripblanks">widgets/history-item.ui</file>
<file preprocess="xml-stripblanks">widgets/project-item.ui</file> <file preprocess="xml-stripblanks">widgets/project-item.ui</file>

View File

@ -36,6 +36,7 @@ from .project_manager import ProjectManager
from .tab_manager import TabManager from .tab_manager import TabManager
from .icon_picker_dialog import IconPickerDialog from .icon_picker_dialog import IconPickerDialog
from .environments_dialog import EnvironmentsDialog from .environments_dialog import EnvironmentsDialog
from .wsdl_import_dialog import WsdlImportDialog
from .request_tab_widget import RequestTabWidget from .request_tab_widget import RequestTabWidget
from .widgets.history_item import HistoryItem from .widgets.history_item import HistoryItem
from .widgets.project_item import ProjectItem from .widgets.project_item import ProjectItem
@ -67,6 +68,8 @@ class RosterWindow(Adw.ApplicationWindow):
projects_listbox = Gtk.Template.Child() projects_listbox = Gtk.Template.Child()
add_project_button = Gtk.Template.Child() add_project_button = Gtk.Template.Child()
import_wsdl_button = Gtk.Template.Child()
# History (hidden but kept for compatibility) # History (hidden but kept for compatibility)
history_listbox = Gtk.Template.Child() history_listbox = Gtk.Template.Child()
@ -1410,6 +1413,26 @@ class RosterWindow(Adw.ApplicationWindow):
widget.original_request = None widget.original_request = None
widget.modified = True widget.modified = True
@Gtk.Template.Callback()
def on_import_wsdl_clicked(self, button):
"""Open the WSDL import dialog."""
projects = self.project_manager.load_projects()
dialog = WsdlImportDialog(self.project_manager, projects)
dialog.connect('import-completed', self._on_wsdl_import_completed)
dialog.present(self)
def _on_wsdl_import_completed(self, dialog, project_id: str):
"""Refresh sidebar after a successful WSDL import."""
self._load_projects()
projects = self.project_manager.load_projects()
count = 0
for p in projects:
if p.id == project_id:
count = len(p.requests)
name = p.name
break
self._show_toast(f"Imported {count} operation(s) into '{name}'")
def _on_delete_request(self, widget, saved_request, project): def _on_delete_request(self, widget, saved_request, project):
"""Delete saved request with confirmation.""" """Delete saved request with confirmation."""
dialog = Adw.AlertDialog() dialog = Adw.AlertDialog()

355
src/wsdl_import_dialog.py Normal file
View File

@ -0,0 +1,355 @@
# wsdl_import_dialog.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import gi
gi.require_version('Soup', '3.0')
from gi.repository import Adw, Gtk, GObject, GLib, Soup
from typing import List
from .wsdl_importer import parse_wsdl, build_http_request, WsdlParseResult, WsdlOperation
class WsdlImportDialog(Adw.Dialog):
"""Two-step dialog: fetch WSDL URL → select operations → import."""
__gtype_name__ = 'WsdlImportDialog'
__gsignals__ = {
'import-completed': (GObject.SIGNAL_RUN_FIRST, None, (str,)),
}
def __init__(self, project_manager, existing_projects):
super().__init__()
self.set_title("Import from WSDL")
self.set_content_width(560)
self.set_content_height(540)
self._pm = project_manager
self._existing_projects = list(existing_projects)
self._wsdl_result: WsdlParseResult | None = None
self._op_checkboxes: List[tuple[WsdlOperation, Gtk.CheckButton]] = []
self._soup = Soup.Session.new()
self._build_ui()
# ------------------------------------------------------------------ build
def _build_ui(self):
tv = Adw.ToolbarView()
tv.add_top_bar(Adw.HeaderBar())
self._stack = Gtk.Stack()
self._stack.set_transition_type(Gtk.StackTransitionType.SLIDE_LEFT_RIGHT)
self._stack.set_transition_duration(200)
self._stack.add_named(self._make_url_page(), "url")
self._stack.add_named(self._make_ops_page(), "ops")
tv.set_content(self._stack)
ab = Gtk.ActionBar()
self._back_btn = Gtk.Button(label="Back")
self._back_btn.connect("clicked", lambda _: self._show_url_page())
ab.pack_start(self._back_btn)
self._fetch_btn = Gtk.Button(label="Fetch WSDL")
self._fetch_btn.add_css_class("suggested-action")
self._fetch_btn.connect("clicked", self._on_fetch)
ab.pack_end(self._fetch_btn)
self._import_btn = Gtk.Button(label="Import")
self._import_btn.add_css_class("suggested-action")
self._import_btn.connect("clicked", self._on_import)
ab.pack_end(self._import_btn)
tv.add_bottom_bar(ab)
self.set_child(tv)
self._show_url_page()
def _make_url_page(self) -> Gtk.Widget:
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=16)
box.set_valign(Gtk.Align.CENTER)
box.set_vexpand(True)
box.set_margin_top(32)
box.set_margin_bottom(24)
box.set_margin_start(24)
box.set_margin_end(24)
icon = Gtk.Image.new_from_icon_name("network-server-symbolic")
icon.set_pixel_size(64)
icon.add_css_class("dim-label")
box.append(icon)
title = Gtk.Label(label="Import from WSDL")
title.add_css_class("title-2")
box.append(title)
subtitle = Gtk.Label(label="Enter the URL of a WSDL 1.1 or 2.0 document to discover SOAP operations")
subtitle.add_css_class("dim-label")
subtitle.set_wrap(True)
subtitle.set_justify(Gtk.Justification.CENTER)
box.append(subtitle)
grp = Adw.PreferencesGroup()
self._url_row = Adw.EntryRow()
self._url_row.set_title("WSDL URL")
self._url_row.set_input_purpose(Gtk.InputPurpose.URL)
self._url_row.connect("entry-activated", lambda _: self._on_fetch(None))
grp.add(self._url_row)
box.append(grp)
self._err_label = Gtk.Label()
self._err_label.add_css_class("error")
self._err_label.set_wrap(True)
self._err_label.set_visible(False)
box.append(self._err_label)
self._spinner = Gtk.Spinner()
self._spinner.set_size_request(32, 32)
self._spinner.set_halign(Gtk.Align.CENTER)
self._spinner.set_visible(False)
box.append(self._spinner)
return box
def _make_ops_page(self) -> Gtk.Widget:
outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
outer.set_margin_top(14)
outer.set_margin_bottom(14)
outer.set_margin_start(16)
outer.set_margin_end(16)
# --- Endpoint row (Name removed per UX feedback) ---
svc_grp = Adw.PreferencesGroup()
svc_grp.set_title("Service")
self._svc_url_row = Adw.ActionRow()
self._svc_url_row.set_title("Endpoint")
svc_grp.add(self._svc_url_row)
outer.append(svc_grp)
# --- Operations header ---
ops_hdr = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
ops_hdr.set_margin_top(2)
ops_lbl = Gtk.Label(label="Operations")
ops_lbl.add_css_class("title-4")
ops_lbl.set_hexpand(True)
ops_lbl.set_xalign(0)
ops_hdr.append(ops_lbl)
self._sel_all_btn = Gtk.Button(label="Deselect All")
self._sel_all_btn.add_css_class("flat")
self._sel_all_btn.connect("clicked", self._on_select_all)
ops_hdr.append(self._sel_all_btn)
outer.append(ops_hdr)
# --- Scrollable operations list ---
scroll = Gtk.ScrolledWindow()
scroll.set_vexpand(True)
scroll.set_min_content_height(80)
self._ops_list = Gtk.ListBox()
self._ops_list.add_css_class("boxed-list")
self._ops_list.set_selection_mode(Gtk.SelectionMode.NONE)
scroll.set_child(self._ops_list)
outer.append(scroll)
# --- Import target (compact: 1 row per option with suffix widget) ---
tgt_grp = Adw.PreferencesGroup()
tgt_grp.set_title("Import to")
tgt_grp.set_margin_top(2)
# Row: [radio] Create new project [entry: name]
new_row = Adw.ActionRow()
new_row.set_title("Create new project")
self._new_radio = Gtk.CheckButton()
self._new_radio.set_active(True)
new_row.add_prefix(self._new_radio)
new_row.set_activatable_widget(self._new_radio)
self._new_name_entry = Gtk.Entry()
self._new_name_entry.set_placeholder_text("Project name")
self._new_name_entry.set_hexpand(True)
self._new_name_entry.set_valign(Gtk.Align.CENTER)
new_row.add_suffix(self._new_name_entry)
tgt_grp.add(new_row)
if self._existing_projects:
# Row: [radio] Add to existing project [dropdown]
exist_row = Adw.ActionRow()
exist_row.set_title("Add to existing project")
self._exist_radio = Gtk.CheckButton()
self._exist_radio.set_group(self._new_radio)
exist_row.add_prefix(self._exist_radio)
exist_row.set_activatable_widget(self._exist_radio)
proj_list = Gtk.StringList.new([p.name for p in self._existing_projects])
self._proj_dd = Gtk.DropDown(model=proj_list)
self._proj_dd.set_valign(Gtk.Align.CENTER)
self._proj_dd.set_sensitive(False)
exist_row.add_suffix(self._proj_dd)
tgt_grp.add(exist_row)
self._new_radio.connect("toggled", self._on_target_toggled)
else:
self._exist_radio = None
self._proj_dd = None
outer.append(tgt_grp)
return outer
# ----------------------------------------------------------- page control
def _show_url_page(self):
self._stack.set_visible_child_name("url")
self._back_btn.set_visible(False)
self._fetch_btn.set_visible(True)
self._import_btn.set_visible(False)
def _show_ops_page(self):
self._stack.set_visible_child_name("ops")
self._back_btn.set_visible(True)
self._fetch_btn.set_visible(False)
self._import_btn.set_visible(True)
# --------------------------------------------------------- event handlers
def _on_target_toggled(self, _radio):
is_new = self._new_radio.get_active()
self._new_name_entry.set_sensitive(is_new)
if self._proj_dd:
self._proj_dd.set_sensitive(not is_new)
def _on_select_all(self, _btn):
all_on = all(cb.get_active() for _, cb in self._op_checkboxes)
new_state = not all_on
for _, cb in self._op_checkboxes:
cb.set_active(new_state)
self._sel_all_btn.set_label("Deselect All" if new_state else "Select All")
def _on_fetch(self, _btn):
url = self._url_row.get_text().strip()
if not url:
self._set_err("Please enter a WSDL URL")
return
if not url.startswith(('http://', 'https://')):
self._set_err("URL must start with http:// or https://")
return
self._err_label.set_visible(False)
self._spinner.set_visible(True)
self._spinner.start()
self._fetch_btn.set_sensitive(False)
try:
msg = Soup.Message.new("GET", url)
except Exception as e:
self._fetch_failed(f"Invalid URL: {e}")
return
msg.get_request_headers().append("Accept", "text/xml, application/xml, */*")
self._soup.send_and_read_async(msg, GLib.PRIORITY_DEFAULT, None,
self._on_downloaded, msg)
def _on_downloaded(self, session, async_result, msg):
self._spinner.stop()
self._spinner.set_visible(False)
self._fetch_btn.set_sensitive(True)
try:
bytes_data = session.send_and_read_finish(async_result)
except GLib.Error as e:
self._set_err(f"Download failed: {e.message}")
return
status = msg.get_status()
if not (200 <= status < 300):
self._set_err(f"Server returned HTTP {status}")
return
xml = bytes_data.get_data().decode('utf-8', errors='replace')
parsed = parse_wsdl(xml)
if parsed.error:
self._set_err(parsed.error)
return
self._wsdl_result = parsed
self._populate_ops_page(parsed)
self._show_ops_page()
def _on_import(self, _btn):
if not self._wsdl_result:
return
selected = [op for op, cb in self._op_checkboxes if cb.get_active()]
if not selected:
return
if self._new_radio.get_active():
name = self._new_name_entry.get_text().strip()
if not name:
return
project = self._pm.add_project(name)
project_id = project.id
elif self._exist_radio and self._exist_radio.get_active() and self._proj_dd:
idx = self._proj_dd.get_selected()
if idx >= len(self._existing_projects):
return
project_id = self._existing_projects[idx].id
else:
return
for op in selected:
http_req = build_http_request(op)
self._pm.add_request(project_id, op.name, http_req)
self.emit('import-completed', project_id)
self.close()
# ----------------------------------------------------------------- helpers
def _set_err(self, msg: str):
self._err_label.set_text(msg)
self._err_label.set_visible(True)
def _fetch_failed(self, msg: str):
self._spinner.stop()
self._spinner.set_visible(False)
self._fetch_btn.set_sensitive(True)
self._set_err(msg)
def _populate_ops_page(self, result: WsdlParseResult):
self._svc_url_row.set_subtitle(result.endpoint_url or "Not specified")
while child := self._ops_list.get_first_child():
self._ops_list.remove(child)
self._op_checkboxes.clear()
# Sort alphabetically
for op in sorted(result.operations, key=lambda o: o.name.lower()):
row = Adw.ActionRow()
row.set_title(op.name)
if op.soap_action:
row.set_subtitle(f"SOAPAction: {op.soap_action}")
cb = Gtk.CheckButton()
cb.set_active(True)
row.add_prefix(cb)
row.set_activatable_widget(cb)
self._ops_list.append(row)
self._op_checkboxes.append((op, cb))
# Pre-fill project name; all ops checked → "Deselect All"
self._new_name_entry.set_text(result.service_name)
self._sel_all_btn.set_label("Deselect All")

574
src/wsdl_importer.py Normal file
View File

@ -0,0 +1,574 @@
# wsdl_importer.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
# WSDL 1.1
_WSDL = 'http://schemas.xmlsoap.org/wsdl/'
_SOAP11 = 'http://schemas.xmlsoap.org/wsdl/soap/'
_SOAP12 = 'http://schemas.xmlsoap.org/wsdl/soap12/'
_ENV11 = 'http://schemas.xmlsoap.org/soap/envelope/'
_ENV12 = 'http://www.w3.org/2003/05/soap-envelope'
# WSDL 2.0
_WSDL2 = 'http://www.w3.org/ns/wsdl'
_WSDL2_SOAP = 'http://www.w3.org/ns/wsdl/soap'
# XML Schema
_XS = 'http://www.w3.org/2001/XMLSchema'
# XSD simple-type → human hint
_XS_HINTS: Dict[str, str] = {
'string': 'string', 'normalizedString': 'string', 'token': 'string',
'int': 'int', 'integer': 'int', 'nonNegativeInteger': 'int',
'positiveInteger': 'int', 'negativeInteger': 'int',
'short': 'int', 'byte': 'int', 'unsignedByte': 'int',
'unsignedInt': 'int', 'unsignedShort': 'int',
'long': 'long', 'unsignedLong': 'long',
'boolean': 'boolean',
'float': 'float', 'double': 'float', 'decimal': 'decimal',
'dateTime': 'datetime', 'date': 'date', 'time': 'time',
'base64Binary': 'base64', 'hexBinary': 'hex',
'anyType': 'any', 'anySimpleType': 'any',
'duration': 'duration', 'guid': 'guid',
}
def _q(ns: str, tag: str) -> str:
return f'{{{ns}}}{tag}'
def _local(tag: str) -> str:
return tag.split('}')[-1] if '}' in tag else tag
def _hint(xs_local: str, optional: bool) -> str:
base = _XS_HINTS.get(xs_local, xs_local)
return f'[{base}{"?" if optional else ""}]'
# ---------------------------------------------------------------------------
# Public data classes
# ---------------------------------------------------------------------------
@dataclass
class WsdlOperation:
name: str
soap_action: str
endpoint_url: str
soap_version: str # '1.1' or '1.2'
target_namespace: str
body_template: str
@dataclass
class WsdlParseResult:
service_name: str
endpoint_url: str
operations: List[WsdlOperation] = field(default_factory=list)
error: Optional[str] = None
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def parse_wsdl(xml_content: str) -> WsdlParseResult:
"""Auto-detect WSDL 1.1 / 2.0 and return service info + operations."""
try:
root = ET.fromstring(xml_content)
except ET.ParseError as e:
return WsdlParseResult(service_name='', endpoint_url='', error=f'Invalid XML: {e}')
local = _local(root.tag)
if local == 'definitions':
return _parse_wsdl11(root)
elif local == 'description':
return _parse_wsdl20(root)
else:
return WsdlParseResult(
service_name='', endpoint_url='',
error='Document is not a valid WSDL 1.1 (definitions) or WSDL 2.0 (description) file'
)
def build_http_request(operation: WsdlOperation):
"""Convert a WsdlOperation into an HttpRequest ready to send."""
from .models import HttpRequest
headers: Dict[str, str] = {}
if operation.soap_version == '1.2':
ct = 'application/soap+xml; charset=utf-8'
if operation.soap_action:
ct += f'; action="{operation.soap_action}"'
headers['Content-Type'] = ct
else:
headers['Content-Type'] = 'text/xml; charset=utf-8'
if operation.soap_action:
headers['SOAPAction'] = f'"{operation.soap_action}"'
return HttpRequest(
method='POST',
url=operation.endpoint_url,
headers=headers,
body=operation.body_template,
syntax='XML',
)
# ---------------------------------------------------------------------------
# WSDL 1.1 parser
# ---------------------------------------------------------------------------
def _parse_wsdl11(root: ET.Element) -> WsdlParseResult:
target_ns = root.get('targetNamespace', '')
service_name = root.get('name', '') or 'WSDL Service'
# Service name + endpoint URL
service_el = root.find(_q(_WSDL, 'service'))
if service_el is None:
service_el = root.find('service')
if service_el is not None and service_el.get('name'):
service_name = service_el.get('name')
endpoint_url = ''
default_soap_ver = '1.1'
if service_el is not None:
for port in service_el:
addr11 = port.find(_q(_SOAP11, 'address'))
if addr11 is not None:
endpoint_url = addr11.get('location', '')
default_soap_ver = '1.1'
break
addr12 = port.find(_q(_SOAP12, 'address'))
if addr12 is not None:
endpoint_url = addr12.get('location', '')
default_soap_ver = '1.2'
break
# Collect (soap_action, soap_version) per operation from bindings
op_info: Dict[str, Tuple[str, str]] = {}
for binding in root.iter():
if _local(binding.tag) != 'binding':
continue
is11 = binding.find(_q(_SOAP11, 'binding')) is not None
is12 = binding.find(_q(_SOAP12, 'binding')) is not None
if not is11 and not is12:
continue
bv = '1.2' if is12 else '1.1'
for op in binding:
if _local(op.tag) != 'operation':
continue
op_name = _local(op.get('name', ''))
if not op_name:
continue
soap_op = op.find(_q(_SOAP11, 'operation'))
if soap_op is None:
soap_op = op.find(_q(_SOAP12, 'operation'))
action = soap_op.get('soapAction', '') if soap_op is not None else ''
if op_name not in op_info:
op_info[op_name] = (action, bv)
# Fallback: portType when no SOAP binding found
if not op_info:
for pt in root.iter():
if _local(pt.tag) != 'portType':
continue
for op in pt:
if _local(op.tag) == 'operation':
op_name = op.get('name', '')
if op_name and op_name not in op_info:
op_info[op_name] = ('', default_soap_ver)
if not op_info:
return WsdlParseResult(
service_name=service_name, endpoint_url=endpoint_url,
error='No SOAP operations found in this WSDL document'
)
# Build schema maps for parameter extraction
elem_map, type_map = _build_schema_maps(root, _WSDL)
operations = []
for op_name, (action, ver) in op_info.items():
params = _extract_params_wsdl11(root, op_name, elem_map, type_map)
body = _build_envelope(op_name, target_ns, ver, params)
operations.append(WsdlOperation(
name=op_name, soap_action=action, endpoint_url=endpoint_url,
soap_version=ver, target_namespace=target_ns, body_template=body,
))
return WsdlParseResult(
service_name=service_name or 'WSDL Service',
endpoint_url=endpoint_url,
operations=operations,
)
def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]:
"""Return [(param_name, hint), …] for the input of a WSDL 1.1 operation."""
# Walk portType → message → part element
input_elem_name = _find_input_elem_wsdl11(root, op_name)
# Naming-convention fallback
if not input_elem_name:
for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
if candidate in elem_map:
input_elem_name = candidate
break
if not input_elem_name or input_elem_name not in elem_map:
return []
return _parse_element(elem_map[input_elem_name], elem_map, type_map)
def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]:
"""Walk portType → input message → part/@element and return local element name."""
for pt in root.iter():
if _local(pt.tag) != 'portType':
continue
for op in pt:
if _local(op.tag) != 'operation' or op.get('name') != op_name:
continue
inp = op.find(_q(_WSDL, 'input'))
if inp is None:
inp = op.find('input')
if inp is None:
return None
msg_local = (inp.get('message') or '').split(':')[-1]
for msg in root.iter():
if _local(msg.tag) != 'message' or msg.get('name') != msg_local:
continue
for part in msg:
if _local(part.tag) != 'part':
continue
elem_ref = part.get('element', '')
if elem_ref:
return elem_ref.split(':')[-1]
return None
return None
# ---------------------------------------------------------------------------
# WSDL 2.0 parser
# ---------------------------------------------------------------------------
def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
target_ns = root.get('targetNamespace', '')
service_name = root.get('name', '') or 'WSDL Service'
# Service element
service_el = root.find(_q(_WSDL2, 'service'))
if service_el is None:
service_el = root.find('service')
if service_el is not None and service_el.get('name'):
service_name = service_el.get('name')
# Endpoint address
endpoint_url = ''
if service_el is not None:
for ep in service_el:
if _local(ep.tag) == 'endpoint':
addr = ep.get('address', '')
if addr:
endpoint_url = addr
break
# Determine which binding the service uses
service_binding_local = None
if service_el is not None:
for ep in service_el:
if _local(ep.tag) == 'endpoint':
b_ref = ep.get('binding', '')
service_binding_local = b_ref.split(':')[-1]
break
# Collect (soap_action) per operation from SOAP bindings
# binding_local_name → {op_local → soap_action}
binding_ops: Dict[str, Dict[str, str]] = {}
for binding in root.iter():
if _local(binding.tag) != 'binding':
continue
b_name = binding.get('name', '')
b_type = binding.get('type', '')
# Check for SOAP binding type or SOAP child elements
is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower())
if not is_soap:
is_soap = any(
_WSDL2_SOAP in (child.tag or '') or 'soap' in _local(child.tag).lower()
for child in binding
)
if not is_soap:
continue
ops_actions: Dict[str, str] = {}
for child in binding:
if _local(child.tag) != 'operation':
continue
ref = (child.get('ref') or '').split(':')[-1]
# SOAPAction may be a namespaced attribute
action = (
child.get(_q(_WSDL2_SOAP, 'action'))
or child.get('action')
or ''
)
if ref:
ops_actions[ref] = action
binding_ops[b_name] = ops_actions
# Choose the right binding's operations
op_info: Dict[str, str] = {} # op_name → soap_action
if service_binding_local and service_binding_local in binding_ops:
op_info = binding_ops[service_binding_local]
else:
# Merge all SOAP binding operations
for ops in binding_ops.values():
for op_name, action in ops.items():
if op_name not in op_info:
op_info[op_name] = action
# Fallback: collect from interface operations
if not op_info:
for iface in root.iter():
if _local(iface.tag) != 'interface':
continue
for op in iface:
if _local(op.tag) == 'operation':
op_name = op.get('name', '')
if op_name and op_name not in op_info:
op_info[op_name] = ''
if not op_info:
return WsdlParseResult(
service_name=service_name, endpoint_url=endpoint_url,
error='No SOAP operations found in WSDL 2.0 document'
)
# Build schema maps
elem_map, type_map = _build_schema_maps(root, _WSDL2)
operations = []
for op_name, action in op_info.items():
params = _extract_params_wsdl20(root, op_name, elem_map, type_map)
body = _build_envelope(op_name, target_ns, '1.2', params)
operations.append(WsdlOperation(
name=op_name, soap_action=action, endpoint_url=endpoint_url,
soap_version='1.2', target_namespace=target_ns, body_template=body,
))
return WsdlParseResult(
service_name=service_name or 'WSDL Service',
endpoint_url=endpoint_url,
operations=operations,
)
def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]:
"""Return [(param_name, hint), …] for the input of a WSDL 2.0 operation."""
for iface in root.iter():
if _local(iface.tag) != 'interface':
continue
for op in iface:
if _local(op.tag) != 'operation' or op.get('name') != op_name:
continue
for child in op:
if _local(child.tag) == 'input':
elem_ref = (child.get('element') or '').split(':')[-1]
if elem_ref in elem_map:
return _parse_element(elem_map[elem_ref], elem_map, type_map)
# Naming-convention fallback
for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
if candidate in elem_map:
return _parse_element(elem_map[candidate], elem_map, type_map)
return []
# ---------------------------------------------------------------------------
# XSD schema helpers
# ---------------------------------------------------------------------------
def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]:
"""Build {local_name: element} and {local_name: complexType} maps from <types>."""
elem_map: Dict[str, ET.Element] = {}
type_map: Dict[str, ET.Element] = {}
types_el = root.find(_q(wsdl_ns, 'types'))
if types_el is None:
types_el = root.find('types')
if types_el is None:
return elem_map, type_map
for node in types_el.iter():
if _local(node.tag) != 'schema':
continue
for child in node:
name = child.get('name', '')
if not name:
continue
loc = _local(child.tag)
if loc == 'element':
elem_map[name] = child
elif loc == 'complexType':
type_map[name] = child
return elem_map, type_map
def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from an xs:element (inline complexType or type=ref)."""
if depth > 4:
return []
# Inline complexType
ct = elem.find(_q(_XS, 'complexType'))
if ct is not None:
return _parse_complex_type(ct, elem_map, type_map, depth)
# Named type reference
type_ref = elem.get('type', '')
type_local = type_ref.split(':')[-1] if type_ref else ''
if type_local:
if type_local in _XS_HINTS:
return [] # simple scalar — not a parameter container
ct = type_map.get(type_local)
if ct is not None:
return _parse_complex_type(ct, elem_map, type_map, depth)
return []
def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from an xs:complexType."""
params: List[Tuple[str, str]] = []
# xs:complexContent / xs:extension (inheritance)
cc = ct.find(_q(_XS, 'complexContent'))
if cc is not None:
ext = cc.find(_q(_XS, 'extension'))
if ext is not None:
base_local = (ext.get('base') or '').split(':')[-1]
base_ct = type_map.get(base_local)
if base_ct is not None:
params.extend(_parse_complex_type(base_ct, elem_map, type_map, depth + 1))
for tag in ('sequence', 'all', 'choice'):
seq = ext.find(_q(_XS, tag))
if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth))
break
return params
# Direct sequence / all / choice
for tag in ('sequence', 'all', 'choice'):
seq = ct.find(_q(_XS, tag))
if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth))
break
return params
def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from xs:sequence / xs:all / xs:choice."""
params: List[Tuple[str, str]] = []
choice_optional = _local(seq.tag) == 'choice'
for child in seq:
loc = _local(child.tag)
if loc == 'element':
name = child.get('name', '')
if not name:
ref_local = (child.get('ref') or '').split(':')[-1]
if ref_local:
name = ref_local
if not name:
continue
optional = choice_optional or child.get('minOccurs', '1') == '0'
type_ref = child.get('type', '')
type_local = type_ref.split(':')[-1] if type_ref else ''
if type_local and type_local in _XS_HINTS:
params.append((name, _hint(type_local, optional)))
else:
# Inline or referenced complex type — mark as [any] at this depth
inline_ct = child.find(_q(_XS, 'complexType'))
if inline_ct is not None and depth < 2:
sub = _parse_complex_type(inline_ct, elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional)))
elif type_local and type_local in type_map and depth < 2:
sub = _parse_complex_type(type_map[type_local], elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional)))
else:
params.append((name, _hint('anyType', optional)))
elif loc in ('sequence', 'all', 'choice') and depth < 3:
params.extend(_parse_sequence(child, elem_map, type_map, depth + 1))
return params
# ---------------------------------------------------------------------------
# SOAP envelope builder
# ---------------------------------------------------------------------------
def _build_envelope(op_name: str, target_ns: str, soap_version: str,
params: Optional[List[Tuple[str, str]]] = None) -> str:
env_ns = _ENV12 if soap_version == '1.2' else _ENV11
if params:
# Default-namespace style → parameters inherit namespace, no prefix needed
ns_attr = f' xmlns="{target_ns}"' if target_ns else ''
lines = [f' <{op_name}{ns_attr}>']
for pname, phint in params:
lines.append(f' <{pname}>{phint}</{pname}>')
lines.append(f' </{op_name}>')
body = '\n'.join(lines)
return (
f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}">\n'
f' <soap:Header/>\n'
f' <soap:Body>\n'
f'{body}\n'
f' </soap:Body>\n'
f'</soap:Envelope>'
)
else:
ns_decl = f'\n xmlns:tns="{target_ns}"' if target_ns else ''
op_el = f'tns:{op_name}' if target_ns else op_name
return (
f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}"{ns_decl}>\n'
f' <soap:Header/>\n'
f' <soap:Body>\n'
f' <{op_el}>\n'
f' <!-- Add parameters here -->\n'
f' </{op_el}>\n'
f' </soap:Body>\n'
f'</soap:Envelope>'
)