diff --git a/src/wsdl_import_dialog.py b/src/wsdl_import_dialog.py index 5abf0da..1f889df 100644 --- a/src/wsdl_import_dialog.py +++ b/src/wsdl_import_dialog.py @@ -31,7 +31,6 @@ class WsdlImportDialog(Adw.Dialog): __gtype_name__ = 'WsdlImportDialog' __gsignals__ = { - # Emitted after a successful import; carries the target project_id 'import-completed': (GObject.SIGNAL_RUN_FIRST, None, (str,)), } @@ -100,7 +99,7 @@ class WsdlImportDialog(Adw.Dialog): title.add_css_class("title-2") box.append(title) - subtitle = Gtk.Label(label="Enter the URL of a WSDL document to discover SOAP operations") + subtitle = Gtk.Label(label="Enter the URL of a WSDL 1.1 or 2.0 document to discover SOAP operations") subtitle.add_css_class("dim-label") subtitle.set_wrap(True) subtitle.set_justify(Gtk.Justification.CENTER) @@ -129,83 +128,83 @@ class WsdlImportDialog(Adw.Dialog): return box def _make_ops_page(self) -> Gtk.Widget: - outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) - outer.set_margin_top(16) - outer.set_margin_bottom(16) + outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10) + outer.set_margin_top(14) + outer.set_margin_bottom(14) outer.set_margin_start(16) outer.set_margin_end(16) - # Service info summary - self._svc_group = Adw.PreferencesGroup() - self._svc_group.set_title("Service") - self._svc_name_row = Adw.ActionRow() - self._svc_name_row.set_title("Name") - self._svc_group.add(self._svc_name_row) + # --- Endpoint row (Name removed per UX feedback) --- + svc_grp = Adw.PreferencesGroup() + svc_grp.set_title("Service") self._svc_url_row = Adw.ActionRow() self._svc_url_row.set_title("Endpoint") - self._svc_group.add(self._svc_url_row) - outer.append(self._svc_group) + svc_grp.add(self._svc_url_row) + outer.append(svc_grp) - # Operations header row + # --- Operations header --- ops_hdr = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) - ops_hdr.set_margin_top(4) + ops_hdr.set_margin_top(2) ops_lbl = Gtk.Label(label="Operations") ops_lbl.add_css_class("title-4") ops_lbl.set_hexpand(True) ops_lbl.set_xalign(0) ops_hdr.append(ops_lbl) - self._sel_all_btn = Gtk.Button(label="Select All") + self._sel_all_btn = Gtk.Button(label="Deselect All") self._sel_all_btn.add_css_class("flat") self._sel_all_btn.connect("clicked", self._on_select_all) ops_hdr.append(self._sel_all_btn) outer.append(ops_hdr) - # Scrollable operations list + # --- Scrollable operations list --- scroll = Gtk.ScrolledWindow() scroll.set_vexpand(True) - scroll.set_min_content_height(100) + scroll.set_min_content_height(80) self._ops_list = Gtk.ListBox() self._ops_list.add_css_class("boxed-list") self._ops_list.set_selection_mode(Gtk.SelectionMode.NONE) scroll.set_child(self._ops_list) outer.append(scroll) - # Import target + # --- Import target (compact: 1 row per option with suffix widget) --- tgt_grp = Adw.PreferencesGroup() tgt_grp.set_title("Import to") - tgt_grp.set_margin_top(4) + tgt_grp.set_margin_top(2) + # Row: [radio] Create new project [entry: name] new_row = Adw.ActionRow() new_row.set_title("Create new project") self._new_radio = Gtk.CheckButton() self._new_radio.set_active(True) new_row.add_prefix(self._new_radio) new_row.set_activatable_widget(self._new_radio) + self._new_name_entry = Gtk.Entry() + self._new_name_entry.set_placeholder_text("Project name") + self._new_name_entry.set_hexpand(True) + self._new_name_entry.set_valign(Gtk.Align.CENTER) + new_row.add_suffix(self._new_name_entry) tgt_grp.add(new_row) - self._new_name_row = Adw.EntryRow() - self._new_name_row.set_title("Project name") - tgt_grp.add(self._new_name_row) - if self._existing_projects: + # Row: [radio] Add to existing project [dropdown] exist_row = Adw.ActionRow() exist_row.set_title("Add to existing project") self._exist_radio = Gtk.CheckButton() self._exist_radio.set_group(self._new_radio) exist_row.add_prefix(self._exist_radio) exist_row.set_activatable_widget(self._exist_radio) - tgt_grp.add(exist_row) - self._proj_combo = Adw.ComboRow() - self._proj_combo.set_title("Project") - self._proj_combo.set_model(Gtk.StringList.new([p.name for p in self._existing_projects])) - self._proj_combo.set_sensitive(False) - tgt_grp.add(self._proj_combo) + proj_list = Gtk.StringList.new([p.name for p in self._existing_projects]) + self._proj_dd = Gtk.DropDown(model=proj_list) + self._proj_dd.set_valign(Gtk.Align.CENTER) + self._proj_dd.set_sensitive(False) + exist_row.add_suffix(self._proj_dd) + tgt_grp.add(exist_row) self._new_radio.connect("toggled", self._on_target_toggled) else: self._exist_radio = None - self._proj_combo = None + self._proj_dd = None outer.append(tgt_grp) return outer @@ -228,15 +227,16 @@ class WsdlImportDialog(Adw.Dialog): def _on_target_toggled(self, _radio): is_new = self._new_radio.get_active() - self._new_name_row.set_sensitive(is_new) - if self._proj_combo: - self._proj_combo.set_sensitive(not is_new) + self._new_name_entry.set_sensitive(is_new) + if self._proj_dd: + self._proj_dd.set_sensitive(not is_new) def _on_select_all(self, _btn): all_on = all(cb.get_active() for _, cb in self._op_checkboxes) + new_state = not all_on for _, cb in self._op_checkboxes: - cb.set_active(not all_on) - self._sel_all_btn.set_label("Deselect All" if not all_on else "Select All") + cb.set_active(new_state) + self._sel_all_btn.set_label("Deselect All" if new_state else "Select All") def _on_fetch(self, _btn): url = self._url_row.get_text().strip() @@ -298,14 +298,14 @@ class WsdlImportDialog(Adw.Dialog): return if self._new_radio.get_active(): - name = self._new_name_row.get_text().strip() + name = self._new_name_entry.get_text().strip() if not name: return - project = self._pm.add_project(name) + project = self._pm.add_project(name) project_id = project.id - elif self._exist_radio and self._exist_radio.get_active() and self._proj_combo: - idx = self._proj_combo.get_selected() - if idx == Gtk.INVALID_LIST_POSITION or idx >= len(self._existing_projects): + elif self._exist_radio and self._exist_radio.get_active() and self._proj_dd: + idx = self._proj_dd.get_selected() + if idx >= len(self._existing_projects): return project_id = self._existing_projects[idx].id else: @@ -331,14 +331,14 @@ class WsdlImportDialog(Adw.Dialog): self._set_err(msg) def _populate_ops_page(self, result: WsdlParseResult): - self._svc_name_row.set_subtitle(result.service_name) self._svc_url_row.set_subtitle(result.endpoint_url or "Not specified") while child := self._ops_list.get_first_child(): self._ops_list.remove(child) self._op_checkboxes.clear() - for op in result.operations: + # Sort alphabetically + for op in sorted(result.operations, key=lambda o: o.name.lower()): row = Adw.ActionRow() row.set_title(op.name) if op.soap_action: @@ -350,5 +350,6 @@ class WsdlImportDialog(Adw.Dialog): self._ops_list.append(row) self._op_checkboxes.append((op, cb)) - self._new_name_row.set_text(result.service_name) - self._sel_all_btn.set_label("Select All") + # Pre-fill project name; all ops checked → "Deselect All" + self._new_name_entry.set_text(result.service_name) + self._sel_all_btn.set_label("Deselect All") diff --git a/src/wsdl_importer.py b/src/wsdl_importer.py index 77cb1e3..7336a91 100644 --- a/src/wsdl_importer.py +++ b/src/wsdl_importer.py @@ -21,11 +21,35 @@ import xml.etree.ElementTree as ET from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple -_WSDL = 'http://schemas.xmlsoap.org/wsdl/' -_SOAP11 = 'http://schemas.xmlsoap.org/wsdl/soap/' -_SOAP12 = 'http://schemas.xmlsoap.org/wsdl/soap12/' -_ENV11 = 'http://schemas.xmlsoap.org/soap/envelope/' -_ENV12 = 'http://www.w3.org/2003/05/soap-envelope' +# WSDL 1.1 +_WSDL = 'http://schemas.xmlsoap.org/wsdl/' +_SOAP11 = 'http://schemas.xmlsoap.org/wsdl/soap/' +_SOAP12 = 'http://schemas.xmlsoap.org/wsdl/soap12/' +_ENV11 = 'http://schemas.xmlsoap.org/soap/envelope/' +_ENV12 = 'http://www.w3.org/2003/05/soap-envelope' + +# WSDL 2.0 +_WSDL2 = 'http://www.w3.org/ns/wsdl' +_WSDL2_SOAP = 'http://www.w3.org/ns/wsdl/soap' + +# XML Schema +_XS = 'http://www.w3.org/2001/XMLSchema' + +# XSD simple-type → human hint +_XS_HINTS: Dict[str, str] = { + 'string': 'string', 'normalizedString': 'string', 'token': 'string', + 'int': 'int', 'integer': 'int', 'nonNegativeInteger': 'int', + 'positiveInteger': 'int', 'negativeInteger': 'int', + 'short': 'int', 'byte': 'int', 'unsignedByte': 'int', + 'unsignedInt': 'int', 'unsignedShort': 'int', + 'long': 'long', 'unsignedLong': 'long', + 'boolean': 'boolean', + 'float': 'float', 'double': 'float', 'decimal': 'decimal', + 'dateTime': 'datetime', 'date': 'date', 'time': 'time', + 'base64Binary': 'base64', 'hexBinary': 'hex', + 'anyType': 'any', 'anySimpleType': 'any', + 'duration': 'duration', 'guid': 'guid', +} def _q(ns: str, tag: str) -> str: @@ -36,6 +60,15 @@ def _local(tag: str) -> str: return tag.split('}')[-1] if '}' in tag else tag +def _hint(xs_local: str, optional: bool) -> str: + base = _XS_HINTS.get(xs_local, xs_local) + return f'[{base}{"?" if optional else ""}]' + + +# --------------------------------------------------------------------------- +# Public data classes +# --------------------------------------------------------------------------- + @dataclass class WsdlOperation: name: str @@ -54,129 +87,28 @@ class WsdlParseResult: error: Optional[str] = None +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + def parse_wsdl(xml_content: str) -> WsdlParseResult: - """Parse WSDL 1.1 XML and return service info + list of operations.""" + """Auto-detect WSDL 1.1 / 2.0 and return service info + operations.""" try: root = ET.fromstring(xml_content) except ET.ParseError as e: return WsdlParseResult(service_name='', endpoint_url='', error=f'Invalid XML: {e}') - if _local(root.tag) != 'definitions': + local = _local(root.tag) + if local == 'definitions': + return _parse_wsdl11(root) + elif local == 'description': + return _parse_wsdl20(root) + else: return WsdlParseResult( service_name='', endpoint_url='', - error='Document is not a WSDL 1.1 definitions element' + error='Document is not a valid WSDL 1.1 (definitions) or WSDL 2.0 (description) file' ) - target_ns = root.get('targetNamespace', '') - service_name = root.get('name', 'WSDL Service') - - # --- Service element: name + endpoint URL --- - service_el = root.find(_q(_WSDL, 'service')) - if service_el is None: - service_el = root.find('service') - - if service_el is not None and service_el.get('name'): - service_name = service_el.get('name') - - endpoint_url = '' - default_soap_ver = '1.1' - - if service_el is not None: - for port in service_el: - addr11 = port.find(_q(_SOAP11, 'address')) - if addr11 is not None: - endpoint_url = addr11.get('location', '') - default_soap_ver = '1.1' - break - addr12 = port.find(_q(_SOAP12, 'address')) - if addr12 is not None: - endpoint_url = addr12.get('location', '') - default_soap_ver = '1.2' - break - - # --- Bindings: collect (soap_action, soap_version) per operation name --- - # op_name -> (soap_action, soap_version) - op_info: Dict[str, Tuple[str, str]] = {} - - for binding in root.iter(): - if _local(binding.tag) != 'binding': - continue - - is11 = binding.find(_q(_SOAP11, 'binding')) is not None - is12 = binding.find(_q(_SOAP12, 'binding')) is not None - if not is11 and not is12: - continue - bv = '1.2' if is12 else '1.1' - - for op in binding: - if _local(op.tag) != 'operation': - continue - raw_name = op.get('name', '') - op_name = _local(raw_name) - if not op_name: - continue - - soap_op = op.find(_q(_SOAP11, 'operation')) - if soap_op is None: - soap_op = op.find(_q(_SOAP12, 'operation')) - action = soap_op.get('soapAction', '') if soap_op is not None else '' - - if op_name not in op_info: - op_info[op_name] = (action, bv) - - # Fallback: portType operations when no SOAP binding was found - if not op_info: - for pt in root.iter(): - if _local(pt.tag) != 'portType': - continue - for op in pt: - if _local(op.tag) == 'operation': - op_name = op.get('name', '') - if op_name and op_name not in op_info: - op_info[op_name] = ('', default_soap_ver) - - if not op_info: - return WsdlParseResult( - service_name=service_name, - endpoint_url=endpoint_url, - error='No SOAP operations found in this WSDL document' - ) - - operations = [ - WsdlOperation( - name=name, - soap_action=action, - endpoint_url=endpoint_url, - soap_version=ver, - target_namespace=target_ns, - body_template=_build_envelope(name, target_ns, ver), - ) - for name, (action, ver) in op_info.items() - ] - - return WsdlParseResult( - service_name=service_name or 'WSDL Service', - endpoint_url=endpoint_url, - operations=operations, - ) - - -def _build_envelope(op_name: str, target_ns: str, soap_version: str) -> str: - env_ns = _ENV12 if soap_version == '1.2' else _ENV11 - ns_decl = f'\n xmlns:tns="{target_ns}"' if target_ns else '' - op_el = f'tns:{op_name}' if target_ns else op_name - return ( - f'\n' - f'\n' - f' \n' - f' \n' - f' <{op_el}>\n' - f' \n' - f' \n' - f' \n' - f'' - ) - def build_http_request(operation: WsdlOperation): """Convert a WsdlOperation into an HttpRequest ready to send.""" @@ -200,3 +132,443 @@ def build_http_request(operation: WsdlOperation): body=operation.body_template, syntax='XML', ) + + +# --------------------------------------------------------------------------- +# WSDL 1.1 parser +# --------------------------------------------------------------------------- + +def _parse_wsdl11(root: ET.Element) -> WsdlParseResult: + target_ns = root.get('targetNamespace', '') + service_name = root.get('name', '') or 'WSDL Service' + + # Service name + endpoint URL + service_el = root.find(_q(_WSDL, 'service')) + if service_el is None: + service_el = root.find('service') + if service_el is not None and service_el.get('name'): + service_name = service_el.get('name') + + endpoint_url = '' + default_soap_ver = '1.1' + if service_el is not None: + for port in service_el: + addr11 = port.find(_q(_SOAP11, 'address')) + if addr11 is not None: + endpoint_url = addr11.get('location', '') + default_soap_ver = '1.1' + break + addr12 = port.find(_q(_SOAP12, 'address')) + if addr12 is not None: + endpoint_url = addr12.get('location', '') + default_soap_ver = '1.2' + break + + # Collect (soap_action, soap_version) per operation from bindings + op_info: Dict[str, Tuple[str, str]] = {} + for binding in root.iter(): + if _local(binding.tag) != 'binding': + continue + is11 = binding.find(_q(_SOAP11, 'binding')) is not None + is12 = binding.find(_q(_SOAP12, 'binding')) is not None + if not is11 and not is12: + continue + bv = '1.2' if is12 else '1.1' + for op in binding: + if _local(op.tag) != 'operation': + continue + op_name = _local(op.get('name', '')) + if not op_name: + continue + soap_op = op.find(_q(_SOAP11, 'operation')) + if soap_op is None: + soap_op = op.find(_q(_SOAP12, 'operation')) + action = soap_op.get('soapAction', '') if soap_op is not None else '' + if op_name not in op_info: + op_info[op_name] = (action, bv) + + # Fallback: portType when no SOAP binding found + if not op_info: + for pt in root.iter(): + if _local(pt.tag) != 'portType': + continue + for op in pt: + if _local(op.tag) == 'operation': + op_name = op.get('name', '') + if op_name and op_name not in op_info: + op_info[op_name] = ('', default_soap_ver) + + if not op_info: + return WsdlParseResult( + service_name=service_name, endpoint_url=endpoint_url, + error='No SOAP operations found in this WSDL document' + ) + + # Build schema maps for parameter extraction + elem_map, type_map = _build_schema_maps(root, _WSDL) + + operations = [] + for op_name, (action, ver) in op_info.items(): + params = _extract_params_wsdl11(root, op_name, elem_map, type_map) + body = _build_envelope(op_name, target_ns, ver, params) + operations.append(WsdlOperation( + name=op_name, soap_action=action, endpoint_url=endpoint_url, + soap_version=ver, target_namespace=target_ns, body_template=body, + )) + + return WsdlParseResult( + service_name=service_name or 'WSDL Service', + endpoint_url=endpoint_url, + operations=operations, + ) + + +def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: + """Return [(param_name, hint), …] for the input of a WSDL 1.1 operation.""" + # Walk portType → message → part element + input_elem_name = _find_input_elem_wsdl11(root, op_name) + + # Naming-convention fallback + if not input_elem_name: + for candidate in [op_name, op_name + 'Request', op_name + 'Input']: + if candidate in elem_map: + input_elem_name = candidate + break + + if not input_elem_name or input_elem_name not in elem_map: + return [] + + return _parse_element(elem_map[input_elem_name], elem_map, type_map) + + +def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]: + """Walk portType → input message → part/@element and return local element name.""" + for pt in root.iter(): + if _local(pt.tag) != 'portType': + continue + for op in pt: + if _local(op.tag) != 'operation' or op.get('name') != op_name: + continue + inp = op.find(_q(_WSDL, 'input')) + if inp is None: + inp = op.find('input') + if inp is None: + return None + msg_local = (inp.get('message') or '').split(':')[-1] + for msg in root.iter(): + if _local(msg.tag) != 'message' or msg.get('name') != msg_local: + continue + for part in msg: + if _local(part.tag) != 'part': + continue + elem_ref = part.get('element', '') + if elem_ref: + return elem_ref.split(':')[-1] + return None + return None + + +# --------------------------------------------------------------------------- +# WSDL 2.0 parser +# --------------------------------------------------------------------------- + +def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: + target_ns = root.get('targetNamespace', '') + service_name = root.get('name', '') or 'WSDL Service' + + # Service element + service_el = root.find(_q(_WSDL2, 'service')) + if service_el is None: + service_el = root.find('service') + if service_el is not None and service_el.get('name'): + service_name = service_el.get('name') + + # Endpoint address + endpoint_url = '' + if service_el is not None: + for ep in service_el: + if _local(ep.tag) == 'endpoint': + addr = ep.get('address', '') + if addr: + endpoint_url = addr + break + + # Determine which binding the service uses + service_binding_local = None + if service_el is not None: + for ep in service_el: + if _local(ep.tag) == 'endpoint': + b_ref = ep.get('binding', '') + service_binding_local = b_ref.split(':')[-1] + break + + # Collect (soap_action) per operation from SOAP bindings + # binding_local_name → {op_local → soap_action} + binding_ops: Dict[str, Dict[str, str]] = {} + for binding in root.iter(): + if _local(binding.tag) != 'binding': + continue + b_name = binding.get('name', '') + b_type = binding.get('type', '') + # Check for SOAP binding type or SOAP child elements + is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower()) + if not is_soap: + is_soap = any( + _WSDL2_SOAP in (child.tag or '') or 'soap' in _local(child.tag).lower() + for child in binding + ) + if not is_soap: + continue + + ops_actions: Dict[str, str] = {} + for child in binding: + if _local(child.tag) != 'operation': + continue + ref = (child.get('ref') or '').split(':')[-1] + # SOAPAction may be a namespaced attribute + action = ( + child.get(_q(_WSDL2_SOAP, 'action')) + or child.get('action') + or '' + ) + if ref: + ops_actions[ref] = action + binding_ops[b_name] = ops_actions + + # Choose the right binding's operations + op_info: Dict[str, str] = {} # op_name → soap_action + if service_binding_local and service_binding_local in binding_ops: + op_info = binding_ops[service_binding_local] + else: + # Merge all SOAP binding operations + for ops in binding_ops.values(): + for op_name, action in ops.items(): + if op_name not in op_info: + op_info[op_name] = action + + # Fallback: collect from interface operations + if not op_info: + for iface in root.iter(): + if _local(iface.tag) != 'interface': + continue + for op in iface: + if _local(op.tag) == 'operation': + op_name = op.get('name', '') + if op_name and op_name not in op_info: + op_info[op_name] = '' + + if not op_info: + return WsdlParseResult( + service_name=service_name, endpoint_url=endpoint_url, + error='No SOAP operations found in WSDL 2.0 document' + ) + + # Build schema maps + elem_map, type_map = _build_schema_maps(root, _WSDL2) + + operations = [] + for op_name, action in op_info.items(): + params = _extract_params_wsdl20(root, op_name, elem_map, type_map) + body = _build_envelope(op_name, target_ns, '1.2', params) + operations.append(WsdlOperation( + name=op_name, soap_action=action, endpoint_url=endpoint_url, + soap_version='1.2', target_namespace=target_ns, body_template=body, + )) + + return WsdlParseResult( + service_name=service_name or 'WSDL Service', + endpoint_url=endpoint_url, + operations=operations, + ) + + +def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: + """Return [(param_name, hint), …] for the input of a WSDL 2.0 operation.""" + for iface in root.iter(): + if _local(iface.tag) != 'interface': + continue + for op in iface: + if _local(op.tag) != 'operation' or op.get('name') != op_name: + continue + for child in op: + if _local(child.tag) == 'input': + elem_ref = (child.get('element') or '').split(':')[-1] + if elem_ref in elem_map: + return _parse_element(elem_map[elem_ref], elem_map, type_map) + + # Naming-convention fallback + for candidate in [op_name, op_name + 'Request', op_name + 'Input']: + if candidate in elem_map: + return _parse_element(elem_map[candidate], elem_map, type_map) + + return [] + + +# --------------------------------------------------------------------------- +# XSD schema helpers +# --------------------------------------------------------------------------- + +def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]: + """Build {local_name: element} and {local_name: complexType} maps from .""" + elem_map: Dict[str, ET.Element] = {} + type_map: Dict[str, ET.Element] = {} + + types_el = root.find(_q(wsdl_ns, 'types')) + if types_el is None: + types_el = root.find('types') + if types_el is None: + return elem_map, type_map + + for node in types_el.iter(): + if _local(node.tag) != 'schema': + continue + for child in node: + name = child.get('name', '') + if not name: + continue + loc = _local(child.tag) + if loc == 'element': + elem_map[name] = child + elif loc == 'complexType': + type_map[name] = child + + return elem_map, type_map + + +def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict, + depth: int = 0) -> List[Tuple[str, str]]: + """Extract [(name, hint)] from an xs:element (inline complexType or type=ref).""" + if depth > 4: + return [] + + # Inline complexType + ct = elem.find(_q(_XS, 'complexType')) + if ct is not None: + return _parse_complex_type(ct, elem_map, type_map, depth) + + # Named type reference + type_ref = elem.get('type', '') + type_local = type_ref.split(':')[-1] if type_ref else '' + if type_local: + if type_local in _XS_HINTS: + return [] # simple scalar — not a parameter container + ct = type_map.get(type_local) + if ct is not None: + return _parse_complex_type(ct, elem_map, type_map, depth) + + return [] + + +def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict, + depth: int = 0) -> List[Tuple[str, str]]: + """Extract [(name, hint)] from an xs:complexType.""" + params: List[Tuple[str, str]] = [] + + # xs:complexContent / xs:extension (inheritance) + cc = ct.find(_q(_XS, 'complexContent')) + if cc is not None: + ext = cc.find(_q(_XS, 'extension')) + if ext is not None: + base_local = (ext.get('base') or '').split(':')[-1] + base_ct = type_map.get(base_local) + if base_ct is not None: + params.extend(_parse_complex_type(base_ct, elem_map, type_map, depth + 1)) + for tag in ('sequence', 'all', 'choice'): + seq = ext.find(_q(_XS, tag)) + if seq is not None: + params.extend(_parse_sequence(seq, elem_map, type_map, depth)) + break + return params + + # Direct sequence / all / choice + for tag in ('sequence', 'all', 'choice'): + seq = ct.find(_q(_XS, tag)) + if seq is not None: + params.extend(_parse_sequence(seq, elem_map, type_map, depth)) + break + + return params + + +def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict, + depth: int = 0) -> List[Tuple[str, str]]: + """Extract [(name, hint)] from xs:sequence / xs:all / xs:choice.""" + params: List[Tuple[str, str]] = [] + choice_optional = _local(seq.tag) == 'choice' + + for child in seq: + loc = _local(child.tag) + + if loc == 'element': + name = child.get('name', '') + if not name: + ref_local = (child.get('ref') or '').split(':')[-1] + if ref_local: + name = ref_local + + if not name: + continue + + optional = choice_optional or child.get('minOccurs', '1') == '0' + type_ref = child.get('type', '') + type_local = type_ref.split(':')[-1] if type_ref else '' + + if type_local and type_local in _XS_HINTS: + params.append((name, _hint(type_local, optional))) + else: + # Inline or referenced complex type — mark as [any] at this depth + inline_ct = child.find(_q(_XS, 'complexType')) + if inline_ct is not None and depth < 2: + sub = _parse_complex_type(inline_ct, elem_map, type_map, depth + 1) + params.extend(sub) if sub else params.append((name, _hint('anyType', optional))) + elif type_local and type_local in type_map and depth < 2: + sub = _parse_complex_type(type_map[type_local], elem_map, type_map, depth + 1) + params.extend(sub) if sub else params.append((name, _hint('anyType', optional))) + else: + params.append((name, _hint('anyType', optional))) + + elif loc in ('sequence', 'all', 'choice') and depth < 3: + params.extend(_parse_sequence(child, elem_map, type_map, depth + 1)) + + return params + + +# --------------------------------------------------------------------------- +# SOAP envelope builder +# --------------------------------------------------------------------------- + +def _build_envelope(op_name: str, target_ns: str, soap_version: str, + params: Optional[List[Tuple[str, str]]] = None) -> str: + env_ns = _ENV12 if soap_version == '1.2' else _ENV11 + + if params: + # Default-namespace style → parameters inherit namespace, no prefix needed + ns_attr = f' xmlns="{target_ns}"' if target_ns else '' + lines = [f' <{op_name}{ns_attr}>'] + for pname, phint in params: + lines.append(f' <{pname}>{phint}') + lines.append(f' ') + body = '\n'.join(lines) + return ( + f'\n' + f'\n' + f' \n' + f' \n' + f'{body}\n' + f' \n' + f'' + ) + else: + ns_decl = f'\n xmlns:tns="{target_ns}"' if target_ns else '' + op_el = f'tns:{op_name}' if target_ns else op_name + return ( + f'\n' + f'\n' + f' \n' + f' \n' + f' <{op_el}>\n' + f' \n' + f' \n' + f' \n' + f'' + )