Improve WSDL import: WSDL 2.0 support, param extraction, UI cleanup

This commit is contained in:
Pavel Baksy 2026-05-12 12:46:42 +02:00
parent da0f2e02f8
commit 037691eb5c
2 changed files with 536 additions and 163 deletions

View File

@ -31,7 +31,6 @@ class WsdlImportDialog(Adw.Dialog):
__gtype_name__ = 'WsdlImportDialog'
__gsignals__ = {
# Emitted after a successful import; carries the target project_id
'import-completed': (GObject.SIGNAL_RUN_FIRST, None, (str,)),
}
@ -100,7 +99,7 @@ class WsdlImportDialog(Adw.Dialog):
title.add_css_class("title-2")
box.append(title)
subtitle = Gtk.Label(label="Enter the URL of a WSDL document to discover SOAP operations")
subtitle = Gtk.Label(label="Enter the URL of a WSDL 1.1 or 2.0 document to discover SOAP operations")
subtitle.add_css_class("dim-label")
subtitle.set_wrap(True)
subtitle.set_justify(Gtk.Justification.CENTER)
@ -129,83 +128,83 @@ class WsdlImportDialog(Adw.Dialog):
return box
def _make_ops_page(self) -> Gtk.Widget:
outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
outer.set_margin_top(16)
outer.set_margin_bottom(16)
outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
outer.set_margin_top(14)
outer.set_margin_bottom(14)
outer.set_margin_start(16)
outer.set_margin_end(16)
# Service info summary
self._svc_group = Adw.PreferencesGroup()
self._svc_group.set_title("Service")
self._svc_name_row = Adw.ActionRow()
self._svc_name_row.set_title("Name")
self._svc_group.add(self._svc_name_row)
# --- Endpoint row (Name removed per UX feedback) ---
svc_grp = Adw.PreferencesGroup()
svc_grp.set_title("Service")
self._svc_url_row = Adw.ActionRow()
self._svc_url_row.set_title("Endpoint")
self._svc_group.add(self._svc_url_row)
outer.append(self._svc_group)
svc_grp.add(self._svc_url_row)
outer.append(svc_grp)
# Operations header row
# --- Operations header ---
ops_hdr = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
ops_hdr.set_margin_top(4)
ops_hdr.set_margin_top(2)
ops_lbl = Gtk.Label(label="Operations")
ops_lbl.add_css_class("title-4")
ops_lbl.set_hexpand(True)
ops_lbl.set_xalign(0)
ops_hdr.append(ops_lbl)
self._sel_all_btn = Gtk.Button(label="Select All")
self._sel_all_btn = Gtk.Button(label="Deselect All")
self._sel_all_btn.add_css_class("flat")
self._sel_all_btn.connect("clicked", self._on_select_all)
ops_hdr.append(self._sel_all_btn)
outer.append(ops_hdr)
# Scrollable operations list
# --- Scrollable operations list ---
scroll = Gtk.ScrolledWindow()
scroll.set_vexpand(True)
scroll.set_min_content_height(100)
scroll.set_min_content_height(80)
self._ops_list = Gtk.ListBox()
self._ops_list.add_css_class("boxed-list")
self._ops_list.set_selection_mode(Gtk.SelectionMode.NONE)
scroll.set_child(self._ops_list)
outer.append(scroll)
# Import target
# --- Import target (compact: 1 row per option with suffix widget) ---
tgt_grp = Adw.PreferencesGroup()
tgt_grp.set_title("Import to")
tgt_grp.set_margin_top(4)
tgt_grp.set_margin_top(2)
# Row: [radio] Create new project [entry: name]
new_row = Adw.ActionRow()
new_row.set_title("Create new project")
self._new_radio = Gtk.CheckButton()
self._new_radio.set_active(True)
new_row.add_prefix(self._new_radio)
new_row.set_activatable_widget(self._new_radio)
self._new_name_entry = Gtk.Entry()
self._new_name_entry.set_placeholder_text("Project name")
self._new_name_entry.set_hexpand(True)
self._new_name_entry.set_valign(Gtk.Align.CENTER)
new_row.add_suffix(self._new_name_entry)
tgt_grp.add(new_row)
self._new_name_row = Adw.EntryRow()
self._new_name_row.set_title("Project name")
tgt_grp.add(self._new_name_row)
if self._existing_projects:
# Row: [radio] Add to existing project [dropdown]
exist_row = Adw.ActionRow()
exist_row.set_title("Add to existing project")
self._exist_radio = Gtk.CheckButton()
self._exist_radio.set_group(self._new_radio)
exist_row.add_prefix(self._exist_radio)
exist_row.set_activatable_widget(self._exist_radio)
tgt_grp.add(exist_row)
self._proj_combo = Adw.ComboRow()
self._proj_combo.set_title("Project")
self._proj_combo.set_model(Gtk.StringList.new([p.name for p in self._existing_projects]))
self._proj_combo.set_sensitive(False)
tgt_grp.add(self._proj_combo)
proj_list = Gtk.StringList.new([p.name for p in self._existing_projects])
self._proj_dd = Gtk.DropDown(model=proj_list)
self._proj_dd.set_valign(Gtk.Align.CENTER)
self._proj_dd.set_sensitive(False)
exist_row.add_suffix(self._proj_dd)
tgt_grp.add(exist_row)
self._new_radio.connect("toggled", self._on_target_toggled)
else:
self._exist_radio = None
self._proj_combo = None
self._proj_dd = None
outer.append(tgt_grp)
return outer
@ -228,15 +227,16 @@ class WsdlImportDialog(Adw.Dialog):
def _on_target_toggled(self, _radio):
is_new = self._new_radio.get_active()
self._new_name_row.set_sensitive(is_new)
if self._proj_combo:
self._proj_combo.set_sensitive(not is_new)
self._new_name_entry.set_sensitive(is_new)
if self._proj_dd:
self._proj_dd.set_sensitive(not is_new)
def _on_select_all(self, _btn):
all_on = all(cb.get_active() for _, cb in self._op_checkboxes)
new_state = not all_on
for _, cb in self._op_checkboxes:
cb.set_active(not all_on)
self._sel_all_btn.set_label("Deselect All" if not all_on else "Select All")
cb.set_active(new_state)
self._sel_all_btn.set_label("Deselect All" if new_state else "Select All")
def _on_fetch(self, _btn):
url = self._url_row.get_text().strip()
@ -298,14 +298,14 @@ class WsdlImportDialog(Adw.Dialog):
return
if self._new_radio.get_active():
name = self._new_name_row.get_text().strip()
name = self._new_name_entry.get_text().strip()
if not name:
return
project = self._pm.add_project(name)
project_id = project.id
elif self._exist_radio and self._exist_radio.get_active() and self._proj_combo:
idx = self._proj_combo.get_selected()
if idx == Gtk.INVALID_LIST_POSITION or idx >= len(self._existing_projects):
elif self._exist_radio and self._exist_radio.get_active() and self._proj_dd:
idx = self._proj_dd.get_selected()
if idx >= len(self._existing_projects):
return
project_id = self._existing_projects[idx].id
else:
@ -331,14 +331,14 @@ class WsdlImportDialog(Adw.Dialog):
self._set_err(msg)
def _populate_ops_page(self, result: WsdlParseResult):
self._svc_name_row.set_subtitle(result.service_name)
self._svc_url_row.set_subtitle(result.endpoint_url or "Not specified")
while child := self._ops_list.get_first_child():
self._ops_list.remove(child)
self._op_checkboxes.clear()
for op in result.operations:
# Sort alphabetically
for op in sorted(result.operations, key=lambda o: o.name.lower()):
row = Adw.ActionRow()
row.set_title(op.name)
if op.soap_action:
@ -350,5 +350,6 @@ class WsdlImportDialog(Adw.Dialog):
self._ops_list.append(row)
self._op_checkboxes.append((op, cb))
self._new_name_row.set_text(result.service_name)
self._sel_all_btn.set_label("Select All")
# Pre-fill project name; all ops checked → "Deselect All"
self._new_name_entry.set_text(result.service_name)
self._sel_all_btn.set_label("Deselect All")

View File

@ -21,12 +21,36 @@ import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
# WSDL 1.1
_WSDL = 'http://schemas.xmlsoap.org/wsdl/'
_SOAP11 = 'http://schemas.xmlsoap.org/wsdl/soap/'
_SOAP12 = 'http://schemas.xmlsoap.org/wsdl/soap12/'
_ENV11 = 'http://schemas.xmlsoap.org/soap/envelope/'
_ENV12 = 'http://www.w3.org/2003/05/soap-envelope'
# WSDL 2.0
_WSDL2 = 'http://www.w3.org/ns/wsdl'
_WSDL2_SOAP = 'http://www.w3.org/ns/wsdl/soap'
# XML Schema
_XS = 'http://www.w3.org/2001/XMLSchema'
# XSD simple-type → human hint
_XS_HINTS: Dict[str, str] = {
'string': 'string', 'normalizedString': 'string', 'token': 'string',
'int': 'int', 'integer': 'int', 'nonNegativeInteger': 'int',
'positiveInteger': 'int', 'negativeInteger': 'int',
'short': 'int', 'byte': 'int', 'unsignedByte': 'int',
'unsignedInt': 'int', 'unsignedShort': 'int',
'long': 'long', 'unsignedLong': 'long',
'boolean': 'boolean',
'float': 'float', 'double': 'float', 'decimal': 'decimal',
'dateTime': 'datetime', 'date': 'date', 'time': 'time',
'base64Binary': 'base64', 'hexBinary': 'hex',
'anyType': 'any', 'anySimpleType': 'any',
'duration': 'duration', 'guid': 'guid',
}
def _q(ns: str, tag: str) -> str:
return f'{{{ns}}}{tag}'
@ -36,6 +60,15 @@ def _local(tag: str) -> str:
return tag.split('}')[-1] if '}' in tag else tag
def _hint(xs_local: str, optional: bool) -> str:
base = _XS_HINTS.get(xs_local, xs_local)
return f'[{base}{"?" if optional else ""}]'
# ---------------------------------------------------------------------------
# Public data classes
# ---------------------------------------------------------------------------
@dataclass
class WsdlOperation:
name: str
@ -54,127 +87,26 @@ class WsdlParseResult:
error: Optional[str] = None
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def parse_wsdl(xml_content: str) -> WsdlParseResult:
"""Parse WSDL 1.1 XML and return service info + list of operations."""
"""Auto-detect WSDL 1.1 / 2.0 and return service info + operations."""
try:
root = ET.fromstring(xml_content)
except ET.ParseError as e:
return WsdlParseResult(service_name='', endpoint_url='', error=f'Invalid XML: {e}')
if _local(root.tag) != 'definitions':
local = _local(root.tag)
if local == 'definitions':
return _parse_wsdl11(root)
elif local == 'description':
return _parse_wsdl20(root)
else:
return WsdlParseResult(
service_name='', endpoint_url='',
error='Document is not a WSDL 1.1 definitions element'
)
target_ns = root.get('targetNamespace', '')
service_name = root.get('name', 'WSDL Service')
# --- Service element: name + endpoint URL ---
service_el = root.find(_q(_WSDL, 'service'))
if service_el is None:
service_el = root.find('service')
if service_el is not None and service_el.get('name'):
service_name = service_el.get('name')
endpoint_url = ''
default_soap_ver = '1.1'
if service_el is not None:
for port in service_el:
addr11 = port.find(_q(_SOAP11, 'address'))
if addr11 is not None:
endpoint_url = addr11.get('location', '')
default_soap_ver = '1.1'
break
addr12 = port.find(_q(_SOAP12, 'address'))
if addr12 is not None:
endpoint_url = addr12.get('location', '')
default_soap_ver = '1.2'
break
# --- Bindings: collect (soap_action, soap_version) per operation name ---
# op_name -> (soap_action, soap_version)
op_info: Dict[str, Tuple[str, str]] = {}
for binding in root.iter():
if _local(binding.tag) != 'binding':
continue
is11 = binding.find(_q(_SOAP11, 'binding')) is not None
is12 = binding.find(_q(_SOAP12, 'binding')) is not None
if not is11 and not is12:
continue
bv = '1.2' if is12 else '1.1'
for op in binding:
if _local(op.tag) != 'operation':
continue
raw_name = op.get('name', '')
op_name = _local(raw_name)
if not op_name:
continue
soap_op = op.find(_q(_SOAP11, 'operation'))
if soap_op is None:
soap_op = op.find(_q(_SOAP12, 'operation'))
action = soap_op.get('soapAction', '') if soap_op is not None else ''
if op_name not in op_info:
op_info[op_name] = (action, bv)
# Fallback: portType operations when no SOAP binding was found
if not op_info:
for pt in root.iter():
if _local(pt.tag) != 'portType':
continue
for op in pt:
if _local(op.tag) == 'operation':
op_name = op.get('name', '')
if op_name and op_name not in op_info:
op_info[op_name] = ('', default_soap_ver)
if not op_info:
return WsdlParseResult(
service_name=service_name,
endpoint_url=endpoint_url,
error='No SOAP operations found in this WSDL document'
)
operations = [
WsdlOperation(
name=name,
soap_action=action,
endpoint_url=endpoint_url,
soap_version=ver,
target_namespace=target_ns,
body_template=_build_envelope(name, target_ns, ver),
)
for name, (action, ver) in op_info.items()
]
return WsdlParseResult(
service_name=service_name or 'WSDL Service',
endpoint_url=endpoint_url,
operations=operations,
)
def _build_envelope(op_name: str, target_ns: str, soap_version: str) -> str:
env_ns = _ENV12 if soap_version == '1.2' else _ENV11
ns_decl = f'\n xmlns:tns="{target_ns}"' if target_ns else ''
op_el = f'tns:{op_name}' if target_ns else op_name
return (
f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}"{ns_decl}>\n'
f' <soap:Header/>\n'
f' <soap:Body>\n'
f' <{op_el}>\n'
f' <!-- Add parameters here -->\n'
f' </{op_el}>\n'
f' </soap:Body>\n'
f'</soap:Envelope>'
error='Document is not a valid WSDL 1.1 (definitions) or WSDL 2.0 (description) file'
)
@ -200,3 +132,443 @@ def build_http_request(operation: WsdlOperation):
body=operation.body_template,
syntax='XML',
)
# ---------------------------------------------------------------------------
# WSDL 1.1 parser
# ---------------------------------------------------------------------------
def _parse_wsdl11(root: ET.Element) -> WsdlParseResult:
target_ns = root.get('targetNamespace', '')
service_name = root.get('name', '') or 'WSDL Service'
# Service name + endpoint URL
service_el = root.find(_q(_WSDL, 'service'))
if service_el is None:
service_el = root.find('service')
if service_el is not None and service_el.get('name'):
service_name = service_el.get('name')
endpoint_url = ''
default_soap_ver = '1.1'
if service_el is not None:
for port in service_el:
addr11 = port.find(_q(_SOAP11, 'address'))
if addr11 is not None:
endpoint_url = addr11.get('location', '')
default_soap_ver = '1.1'
break
addr12 = port.find(_q(_SOAP12, 'address'))
if addr12 is not None:
endpoint_url = addr12.get('location', '')
default_soap_ver = '1.2'
break
# Collect (soap_action, soap_version) per operation from bindings
op_info: Dict[str, Tuple[str, str]] = {}
for binding in root.iter():
if _local(binding.tag) != 'binding':
continue
is11 = binding.find(_q(_SOAP11, 'binding')) is not None
is12 = binding.find(_q(_SOAP12, 'binding')) is not None
if not is11 and not is12:
continue
bv = '1.2' if is12 else '1.1'
for op in binding:
if _local(op.tag) != 'operation':
continue
op_name = _local(op.get('name', ''))
if not op_name:
continue
soap_op = op.find(_q(_SOAP11, 'operation'))
if soap_op is None:
soap_op = op.find(_q(_SOAP12, 'operation'))
action = soap_op.get('soapAction', '') if soap_op is not None else ''
if op_name not in op_info:
op_info[op_name] = (action, bv)
# Fallback: portType when no SOAP binding found
if not op_info:
for pt in root.iter():
if _local(pt.tag) != 'portType':
continue
for op in pt:
if _local(op.tag) == 'operation':
op_name = op.get('name', '')
if op_name and op_name not in op_info:
op_info[op_name] = ('', default_soap_ver)
if not op_info:
return WsdlParseResult(
service_name=service_name, endpoint_url=endpoint_url,
error='No SOAP operations found in this WSDL document'
)
# Build schema maps for parameter extraction
elem_map, type_map = _build_schema_maps(root, _WSDL)
operations = []
for op_name, (action, ver) in op_info.items():
params = _extract_params_wsdl11(root, op_name, elem_map, type_map)
body = _build_envelope(op_name, target_ns, ver, params)
operations.append(WsdlOperation(
name=op_name, soap_action=action, endpoint_url=endpoint_url,
soap_version=ver, target_namespace=target_ns, body_template=body,
))
return WsdlParseResult(
service_name=service_name or 'WSDL Service',
endpoint_url=endpoint_url,
operations=operations,
)
def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]:
"""Return [(param_name, hint), …] for the input of a WSDL 1.1 operation."""
# Walk portType → message → part element
input_elem_name = _find_input_elem_wsdl11(root, op_name)
# Naming-convention fallback
if not input_elem_name:
for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
if candidate in elem_map:
input_elem_name = candidate
break
if not input_elem_name or input_elem_name not in elem_map:
return []
return _parse_element(elem_map[input_elem_name], elem_map, type_map)
def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]:
"""Walk portType → input message → part/@element and return local element name."""
for pt in root.iter():
if _local(pt.tag) != 'portType':
continue
for op in pt:
if _local(op.tag) != 'operation' or op.get('name') != op_name:
continue
inp = op.find(_q(_WSDL, 'input'))
if inp is None:
inp = op.find('input')
if inp is None:
return None
msg_local = (inp.get('message') or '').split(':')[-1]
for msg in root.iter():
if _local(msg.tag) != 'message' or msg.get('name') != msg_local:
continue
for part in msg:
if _local(part.tag) != 'part':
continue
elem_ref = part.get('element', '')
if elem_ref:
return elem_ref.split(':')[-1]
return None
return None
# ---------------------------------------------------------------------------
# WSDL 2.0 parser
# ---------------------------------------------------------------------------
def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
target_ns = root.get('targetNamespace', '')
service_name = root.get('name', '') or 'WSDL Service'
# Service element
service_el = root.find(_q(_WSDL2, 'service'))
if service_el is None:
service_el = root.find('service')
if service_el is not None and service_el.get('name'):
service_name = service_el.get('name')
# Endpoint address
endpoint_url = ''
if service_el is not None:
for ep in service_el:
if _local(ep.tag) == 'endpoint':
addr = ep.get('address', '')
if addr:
endpoint_url = addr
break
# Determine which binding the service uses
service_binding_local = None
if service_el is not None:
for ep in service_el:
if _local(ep.tag) == 'endpoint':
b_ref = ep.get('binding', '')
service_binding_local = b_ref.split(':')[-1]
break
# Collect (soap_action) per operation from SOAP bindings
# binding_local_name → {op_local → soap_action}
binding_ops: Dict[str, Dict[str, str]] = {}
for binding in root.iter():
if _local(binding.tag) != 'binding':
continue
b_name = binding.get('name', '')
b_type = binding.get('type', '')
# Check for SOAP binding type or SOAP child elements
is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower())
if not is_soap:
is_soap = any(
_WSDL2_SOAP in (child.tag or '') or 'soap' in _local(child.tag).lower()
for child in binding
)
if not is_soap:
continue
ops_actions: Dict[str, str] = {}
for child in binding:
if _local(child.tag) != 'operation':
continue
ref = (child.get('ref') or '').split(':')[-1]
# SOAPAction may be a namespaced attribute
action = (
child.get(_q(_WSDL2_SOAP, 'action'))
or child.get('action')
or ''
)
if ref:
ops_actions[ref] = action
binding_ops[b_name] = ops_actions
# Choose the right binding's operations
op_info: Dict[str, str] = {} # op_name → soap_action
if service_binding_local and service_binding_local in binding_ops:
op_info = binding_ops[service_binding_local]
else:
# Merge all SOAP binding operations
for ops in binding_ops.values():
for op_name, action in ops.items():
if op_name not in op_info:
op_info[op_name] = action
# Fallback: collect from interface operations
if not op_info:
for iface in root.iter():
if _local(iface.tag) != 'interface':
continue
for op in iface:
if _local(op.tag) == 'operation':
op_name = op.get('name', '')
if op_name and op_name not in op_info:
op_info[op_name] = ''
if not op_info:
return WsdlParseResult(
service_name=service_name, endpoint_url=endpoint_url,
error='No SOAP operations found in WSDL 2.0 document'
)
# Build schema maps
elem_map, type_map = _build_schema_maps(root, _WSDL2)
operations = []
for op_name, action in op_info.items():
params = _extract_params_wsdl20(root, op_name, elem_map, type_map)
body = _build_envelope(op_name, target_ns, '1.2', params)
operations.append(WsdlOperation(
name=op_name, soap_action=action, endpoint_url=endpoint_url,
soap_version='1.2', target_namespace=target_ns, body_template=body,
))
return WsdlParseResult(
service_name=service_name or 'WSDL Service',
endpoint_url=endpoint_url,
operations=operations,
)
def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]:
"""Return [(param_name, hint), …] for the input of a WSDL 2.0 operation."""
for iface in root.iter():
if _local(iface.tag) != 'interface':
continue
for op in iface:
if _local(op.tag) != 'operation' or op.get('name') != op_name:
continue
for child in op:
if _local(child.tag) == 'input':
elem_ref = (child.get('element') or '').split(':')[-1]
if elem_ref in elem_map:
return _parse_element(elem_map[elem_ref], elem_map, type_map)
# Naming-convention fallback
for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
if candidate in elem_map:
return _parse_element(elem_map[candidate], elem_map, type_map)
return []
# ---------------------------------------------------------------------------
# XSD schema helpers
# ---------------------------------------------------------------------------
def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]:
"""Build {local_name: element} and {local_name: complexType} maps from <types>."""
elem_map: Dict[str, ET.Element] = {}
type_map: Dict[str, ET.Element] = {}
types_el = root.find(_q(wsdl_ns, 'types'))
if types_el is None:
types_el = root.find('types')
if types_el is None:
return elem_map, type_map
for node in types_el.iter():
if _local(node.tag) != 'schema':
continue
for child in node:
name = child.get('name', '')
if not name:
continue
loc = _local(child.tag)
if loc == 'element':
elem_map[name] = child
elif loc == 'complexType':
type_map[name] = child
return elem_map, type_map
def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from an xs:element (inline complexType or type=ref)."""
if depth > 4:
return []
# Inline complexType
ct = elem.find(_q(_XS, 'complexType'))
if ct is not None:
return _parse_complex_type(ct, elem_map, type_map, depth)
# Named type reference
type_ref = elem.get('type', '')
type_local = type_ref.split(':')[-1] if type_ref else ''
if type_local:
if type_local in _XS_HINTS:
return [] # simple scalar — not a parameter container
ct = type_map.get(type_local)
if ct is not None:
return _parse_complex_type(ct, elem_map, type_map, depth)
return []
def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from an xs:complexType."""
params: List[Tuple[str, str]] = []
# xs:complexContent / xs:extension (inheritance)
cc = ct.find(_q(_XS, 'complexContent'))
if cc is not None:
ext = cc.find(_q(_XS, 'extension'))
if ext is not None:
base_local = (ext.get('base') or '').split(':')[-1]
base_ct = type_map.get(base_local)
if base_ct is not None:
params.extend(_parse_complex_type(base_ct, elem_map, type_map, depth + 1))
for tag in ('sequence', 'all', 'choice'):
seq = ext.find(_q(_XS, tag))
if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth))
break
return params
# Direct sequence / all / choice
for tag in ('sequence', 'all', 'choice'):
seq = ct.find(_q(_XS, tag))
if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth))
break
return params
def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]:
"""Extract [(name, hint)] from xs:sequence / xs:all / xs:choice."""
params: List[Tuple[str, str]] = []
choice_optional = _local(seq.tag) == 'choice'
for child in seq:
loc = _local(child.tag)
if loc == 'element':
name = child.get('name', '')
if not name:
ref_local = (child.get('ref') or '').split(':')[-1]
if ref_local:
name = ref_local
if not name:
continue
optional = choice_optional or child.get('minOccurs', '1') == '0'
type_ref = child.get('type', '')
type_local = type_ref.split(':')[-1] if type_ref else ''
if type_local and type_local in _XS_HINTS:
params.append((name, _hint(type_local, optional)))
else:
# Inline or referenced complex type — mark as [any] at this depth
inline_ct = child.find(_q(_XS, 'complexType'))
if inline_ct is not None and depth < 2:
sub = _parse_complex_type(inline_ct, elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional)))
elif type_local and type_local in type_map and depth < 2:
sub = _parse_complex_type(type_map[type_local], elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional)))
else:
params.append((name, _hint('anyType', optional)))
elif loc in ('sequence', 'all', 'choice') and depth < 3:
params.extend(_parse_sequence(child, elem_map, type_map, depth + 1))
return params
# ---------------------------------------------------------------------------
# SOAP envelope builder
# ---------------------------------------------------------------------------
def _build_envelope(op_name: str, target_ns: str, soap_version: str,
params: Optional[List[Tuple[str, str]]] = None) -> str:
env_ns = _ENV12 if soap_version == '1.2' else _ENV11
if params:
# Default-namespace style → parameters inherit namespace, no prefix needed
ns_attr = f' xmlns="{target_ns}"' if target_ns else ''
lines = [f' <{op_name}{ns_attr}>']
for pname, phint in params:
lines.append(f' <{pname}>{phint}</{pname}>')
lines.append(f' </{op_name}>')
body = '\n'.join(lines)
return (
f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}">\n'
f' <soap:Header/>\n'
f' <soap:Body>\n'
f'{body}\n'
f' </soap:Body>\n'
f'</soap:Envelope>'
)
else:
ns_decl = f'\n xmlns:tns="{target_ns}"' if target_ns else ''
op_el = f'tns:{op_name}' if target_ns else op_name
return (
f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}"{ns_decl}>\n'
f' <soap:Header/>\n'
f' <soap:Body>\n'
f' <{op_el}>\n'
f' <!-- Add parameters here -->\n'
f' </{op_el}>\n'
f' </soap:Body>\n'
f'</soap:Envelope>'
)