# wsdl_importer.py
#
# Copyright 2025 Pavel Baksy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Import SOAP operations from WSDL 1.1 / WSDL 2.0 documents.

``parse_wsdl`` turns a WSDL document into a ``WsdlParseResult`` whose
operations each carry a ready-to-edit SOAP envelope template, and
``build_http_request`` converts one such operation into an ``HttpRequest``.
"""

import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from xml.sax.saxutils import quoteattr

# WSDL 1.1
_WSDL = 'http://schemas.xmlsoap.org/wsdl/'
_SOAP11 = 'http://schemas.xmlsoap.org/wsdl/soap/'
_SOAP12 = 'http://schemas.xmlsoap.org/wsdl/soap12/'
_ENV11 = 'http://schemas.xmlsoap.org/soap/envelope/'
_ENV12 = 'http://www.w3.org/2003/05/soap-envelope'

# WSDL 2.0
_WSDL2 = 'http://www.w3.org/ns/wsdl'
_WSDL2_SOAP = 'http://www.w3.org/ns/wsdl/soap'

# XML Schema
_XS = 'http://www.w3.org/2001/XMLSchema'

# XSD simple-type → human hint
_XS_HINTS: Dict[str, str] = {
    'string': 'string',
    'normalizedString': 'string',
    'token': 'string',
    'int': 'int',
    'integer': 'int',
    'nonNegativeInteger': 'int',
    'positiveInteger': 'int',
    'negativeInteger': 'int',
    'short': 'int',
    'byte': 'int',
    'unsignedByte': 'int',
    'unsignedInt': 'int',
    'unsignedShort': 'int',
    'long': 'long',
    'unsignedLong': 'long',
    'boolean': 'boolean',
    'float': 'float',
    'double': 'float',
    'decimal': 'decimal',
    'dateTime': 'datetime',
    'date': 'date',
    'time': 'time',
    'base64Binary': 'base64',
    'hexBinary': 'hex',
    'anyType': 'any',
    'anySimpleType': 'any',
    'duration': 'duration',
    'guid': 'guid',
}

# Well-known namespace → preferred short prefix.
# Matching is by ``startswith`` in insertion order, so the more specific
# XMLSchema-instance URI must stay ahead of the plain XMLSchema URI.
_KNOWN_NS_PREFIXES: Dict[str, str] = {
    'http://schemas.datacontract.org': 'dc',
    'http://schemas.microsoft.com/2003/10/Serialization/': 'ser',
    'http://www.w3.org/2001/XMLSchema-instance': 'xsi',
    'http://www.w3.org/2001/XMLSchema': 'xs',
}

# URL fragments that make meaningless namespace prefixes.
_GENERIC_SEGMENTS = frozenset(
    {'org', 'com', 'net', 'gov', 'www', 'http', 'wsdl', 'soap', ''}
)

# Prefix the generated envelope uses for the SOAP envelope namespace.
_ENVELOPE_PREFIX = 'soap'


def _q(ns: str, tag: str) -> str:
    """Return *tag* in ElementTree's ``{namespace}tag`` notation."""
    return f'{{{ns}}}{tag}'


def _local(tag: str) -> str:
    """Strip the ``{namespace}`` part from an ElementTree tag, if present."""
    return tag.split('}')[-1] if '}' in tag else tag


def _hint(xs_local: str, optional: bool) -> str:
    """Return a placeholder such as ``[int]`` or ``[string?]`` for an XSD type.

    Unknown type names are used verbatim; a trailing ``?`` marks optional
    values.
    """
    base = _XS_HINTS.get(xs_local, xs_local)
    return f'[{base}{"?" if optional else ""}]'


# ---------------------------------------------------------------------------
# Internal parameter tree
# ---------------------------------------------------------------------------

@dataclass
class _Param:
    """Tree node for building a typed SOAP body element."""
    name: str
    ns: str = ''                 # element namespace; '' = inherit op namespace
    hint: Optional[str] = None   # leaf text like '[string]'; None = container node
    children: List['_Param'] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Public data classes
# ---------------------------------------------------------------------------

@dataclass
class WsdlOperation:
    """One SOAP operation extracted from a WSDL document."""
    name: str
    soap_action: str
    endpoint_url: str
    soap_version: str            # '1.1' or '1.2'
    target_namespace: str
    body_template: str           # complete SOAP envelope with placeholder values


@dataclass
class WsdlParseResult:
    """Outcome of ``parse_wsdl``; ``error`` is set when parsing failed."""
    service_name: str
    endpoint_url: str
    operations: List[WsdlOperation] = field(default_factory=list)
    error: Optional[str] = None


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------

def parse_wsdl(xml_content: str) -> WsdlParseResult:
    """Auto-detect WSDL 1.1 / 2.0 and return service info + operations.

    The flavour is chosen from the root element's local name
    (``definitions`` → 1.1, ``description`` → 2.0).  Malformed XML and
    unrecognised roots are reported through ``WsdlParseResult.error``
    rather than raised.
    """
    # NOTE(review): xml.etree.ElementTree is documented as not secure against
    # maliciously constructed data — confirm WSDL input is trusted, or switch
    # to a hardened parser.
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError as e:
        return WsdlParseResult(service_name='', endpoint_url='',
                               error=f'Invalid XML: {e}')

    local = _local(root.tag)
    if local == 'definitions':
        return _parse_wsdl11(root)
    if local == 'description':
        return _parse_wsdl20(root)
    return WsdlParseResult(
        service_name='',
        endpoint_url='',
        error='Document is not a valid WSDL 1.1 (definitions) or WSDL 2.0 (description) file'
    )


def build_http_request(operation: WsdlOperation):
    """Convert a WsdlOperation into an HttpRequest ready to send.

    SOAP 1.2 carries the action as a parameter of the ``Content-Type``
    header; SOAP 1.1 uses ``text/xml`` plus a separate quoted ``SOAPAction``
    header.
    """
    from .models import HttpRequest

    headers: Dict[str, str] = {}
    if operation.soap_version == '1.2':
        ct = 'application/soap+xml; charset=utf-8'
        if operation.soap_action:
            ct += f'; action="{operation.soap_action}"'
        headers['Content-Type'] = ct
    else:
        headers['Content-Type'] = 'text/xml; charset=utf-8'
        if operation.soap_action:
            headers['SOAPAction'] = f'"{operation.soap_action}"'

    return HttpRequest(
        method='POST',
        url=operation.endpoint_url,
        headers=headers,
        body=operation.body_template,
        syntax='XML',
    )


# ---------------------------------------------------------------------------
# WSDL 1.1 parser
# ---------------------------------------------------------------------------

def _parse_wsdl11(root: ET.Element) -> WsdlParseResult:
    """Parse a WSDL 1.1 ``definitions`` root into a ``WsdlParseResult``."""
    target_ns = root.get('targetNamespace', '')
    service_name = root.get('name', '') or 'WSDL Service'

    # Service name + endpoint URL (also accept an un-namespaced <service>)
    service_el = root.find(_q(_WSDL, 'service'))
    if service_el is None:
        service_el = root.find('service')
    if service_el is not None and service_el.get('name'):
        service_name = service_el.get('name')

    endpoint_url = ''
    default_soap_ver = '1.1'
    if service_el is not None:
        # The first port carrying a SOAP address wins.
        for port in service_el:
            addr11 = port.find(_q(_SOAP11, 'address'))
            if addr11 is not None:
                endpoint_url = addr11.get('location', '')
                default_soap_ver = '1.1'
                break
            addr12 = port.find(_q(_SOAP12, 'address'))
            if addr12 is not None:
                endpoint_url = addr12.get('location', '')
                default_soap_ver = '1.2'
                break

    # Collect (soap_action, soap_version) per operation from bindings
    op_info: Dict[str, Tuple[str, str]] = {}
    for binding in root.iter():
        if _local(binding.tag) != 'binding':
            continue
        is11 = binding.find(_q(_SOAP11, 'binding')) is not None
        is12 = binding.find(_q(_SOAP12, 'binding')) is not None
        if not is11 and not is12:
            # Not a SOAP binding (this also skips the nested soap:binding
            # elements themselves, which share the local name).
            continue
        bv = '1.2' if is12 else '1.1'
        for op in binding:
            if _local(op.tag) != 'operation':
                continue
            op_name = _local(op.get('name', ''))
            if not op_name:
                continue
            soap_op = op.find(_q(_SOAP11, 'operation'))
            if soap_op is None:
                soap_op = op.find(_q(_SOAP12, 'operation'))
            action = soap_op.get('soapAction', '') if soap_op is not None else ''
            # First binding that mentions an operation decides its details.
            if op_name not in op_info:
                op_info[op_name] = (action, bv)

    # Fallback: portType when no SOAP binding found
    if not op_info:
        for pt in root.iter():
            if _local(pt.tag) != 'portType':
                continue
            for op in pt:
                if _local(op.tag) == 'operation':
                    op_name = op.get('name', '')
                    if op_name and op_name not in op_info:
                        op_info[op_name] = ('', default_soap_ver)

    if not op_info:
        return WsdlParseResult(
            service_name=service_name,
            endpoint_url=endpoint_url,
            error='No SOAP operations found in this WSDL document'
        )

    # Build schema maps: name → (element, namespace)
    elem_map, type_map = _build_schema_maps(root, _WSDL)

    operations = []
    for op_name, (action, ver) in op_info.items():
        params = _extract_params_wsdl11(root, op_name, elem_map, type_map)
        body = _build_envelope(op_name, target_ns, ver, params)
        operations.append(WsdlOperation(
            name=op_name,
            soap_action=action,
            endpoint_url=endpoint_url,
            soap_version=ver,
            target_namespace=target_ns,
            body_template=body,
        ))

    return WsdlParseResult(
        service_name=service_name or 'WSDL Service',
        endpoint_url=endpoint_url,
        operations=operations,
    )


def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> list:
    """Return list[_Param] for the input of a WSDL 1.1 operation."""
    input_elem_name = _find_input_elem_wsdl11(root, op_name)

    # Naming-convention fallback
    if not input_elem_name:
        for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
            if candidate in elem_map:
                input_elem_name = candidate
                break

    if not input_elem_name or input_elem_name not in elem_map:
        return []

    elem, elem_ns = elem_map[input_elem_name]
    return _parse_element(elem, elem_ns, elem_map, type_map)


def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]:
    """Walk portType → input message → part/@element and return local element name.

    Returns ``None`` when the operation, its input, its message, or a part
    with an ``element`` attribute cannot be found.
    """
    for pt in root.iter():
        if _local(pt.tag) != 'portType':
            continue
        for op in pt:
            if _local(op.tag) != 'operation' or op.get('name') != op_name:
                continue
            inp = op.find(_q(_WSDL, 'input'))
            if inp is None:
                inp = op.find('input')
            if inp is None:
                return None
            # message="tns:Foo" → "Foo"
            msg_local = (inp.get('message') or '').split(':')[-1]
            for msg in root.iter():
                if _local(msg.tag) != 'message' or msg.get('name') != msg_local:
                    continue
                for part in msg:
                    if _local(part.tag) != 'part':
                        continue
                    elem_ref = part.get('element', '')
                    if elem_ref:
                        return elem_ref.split(':')[-1]
            return None
    return None


# ---------------------------------------------------------------------------
# WSDL 2.0 parser
# ---------------------------------------------------------------------------

def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
    """Parse a WSDL 2.0 ``description`` root into a ``WsdlParseResult``.

    Every operation is emitted as SOAP 1.2.
    """
    target_ns = root.get('targetNamespace', '')
    service_name = root.get('name', '') or 'WSDL Service'

    # Service element
    service_el = root.find(_q(_WSDL2, 'service'))
    if service_el is None:
        service_el = root.find('service')
    if service_el is not None and service_el.get('name'):
        service_name = service_el.get('name')

    # Endpoint address: first endpoint with a non-empty address
    endpoint_url = ''
    if service_el is not None:
        for ep in service_el:
            if _local(ep.tag) == 'endpoint':
                addr = ep.get('address', '')
                if addr:
                    endpoint_url = addr
                    break

    # Determine which binding the service uses (first endpoint decides)
    service_binding_local = None
    if service_el is not None:
        for ep in service_el:
            if _local(ep.tag) == 'endpoint':
                b_ref = ep.get('binding', '')
                service_binding_local = b_ref.split(':')[-1]
                break

    # Collect (soap_action) per operation from SOAP bindings
    binding_ops: Dict[str, Dict[str, str]] = {}
    for binding in root.iter():
        if _local(binding.tag) != 'binding':
            continue
        b_name = binding.get('name', '')
        b_type = binding.get('type', '')
        is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower())
        if not is_soap:
            # Otherwise look for SOAP-flavoured child elements.
            is_soap = any(
                _WSDL2_SOAP in (child.tag or '') or 'soap' in _local(child.tag).lower()
                for child in binding
            )
        if not is_soap:
            continue

        ops_actions: Dict[str, str] = {}
        for child in binding:
            if _local(child.tag) != 'operation':
                continue
            ref = (child.get('ref') or '').split(':')[-1]
            action = (
                child.get(_q(_WSDL2_SOAP, 'action'))
                or child.get('action')
                or ''
            )
            if ref:
                ops_actions[ref] = action
        binding_ops[b_name] = ops_actions

    # Choose the right binding's operations
    op_info: Dict[str, str] = {}  # op_name → soap_action
    if service_binding_local and service_binding_local in binding_ops:
        op_info = binding_ops[service_binding_local]
    else:
        for ops in binding_ops.values():
            for op_name, action in ops.items():
                if op_name not in op_info:
                    op_info[op_name] = action

    # Fallback: collect from interface operations
    if not op_info:
        for iface in root.iter():
            if _local(iface.tag) != 'interface':
                continue
            for op in iface:
                if _local(op.tag) == 'operation':
                    op_name = op.get('name', '')
                    if op_name and op_name not in op_info:
                        op_info[op_name] = ''

    if not op_info:
        return WsdlParseResult(
            service_name=service_name,
            endpoint_url=endpoint_url,
            error='No SOAP operations found in WSDL 2.0 document'
        )

    elem_map, type_map = _build_schema_maps(root, _WSDL2)

    operations = []
    for op_name, action in op_info.items():
        params = _extract_params_wsdl20(root, op_name, elem_map, type_map)
        body = _build_envelope(op_name, target_ns, '1.2', params)
        operations.append(WsdlOperation(
            name=op_name,
            soap_action=action,
            endpoint_url=endpoint_url,
            soap_version='1.2',
            target_namespace=target_ns,
            body_template=body,
        ))

    return WsdlParseResult(
        service_name=service_name or 'WSDL Service',
        endpoint_url=endpoint_url,
        operations=operations,
    )


def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> list:
    """Return list[_Param] for the input of a WSDL 2.0 operation."""
    for iface in root.iter():
        if _local(iface.tag) != 'interface':
            continue
        for op in iface:
            if _local(op.tag) != 'operation' or op.get('name') != op_name:
                continue
            for child in op:
                if _local(child.tag) == 'input':
                    elem_ref = (child.get('element') or '').split(':')[-1]
                    if elem_ref in elem_map:
                        elem, elem_ns = elem_map[elem_ref]
                        return _parse_element(elem, elem_ns, elem_map, type_map)

    # Naming-convention fallback
    for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
        if candidate in elem_map:
            elem, elem_ns = elem_map[candidate]
            return _parse_element(elem, elem_ns, elem_map, type_map)

    return []


# ---------------------------------------------------------------------------
# XSD schema helpers
# ---------------------------------------------------------------------------

def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]:
    """Build {name: (element, ns)} and {name: (complexType, ns)} maps.

    Only top-level ``element`` / ``complexType`` children of the schemas
    inside the WSDL ``types`` section are indexed.  Each map value is a
    (ET.Element, targetNamespace) tuple so callers can track which schema
    namespace every element / type belongs to.  Names are keyed by local
    name only, so a later schema overrides an earlier one on a clash.
    """
    elem_map: Dict[str, Tuple[ET.Element, str]] = {}
    type_map: Dict[str, Tuple[ET.Element, str]] = {}

    types_el = root.find(_q(wsdl_ns, 'types'))
    if types_el is None:
        types_el = root.find('types')
    if types_el is None:
        return elem_map, type_map

    for node in types_el.iter():
        if _local(node.tag) != 'schema':
            continue
        schema_ns = node.get('targetNamespace', '')
        for child in node:
            name = child.get('name', '')
            if not name:
                continue
            loc = _local(child.tag)
            if loc == 'element':
                elem_map[name] = (child, schema_ns)
            elif loc == 'complexType':
                type_map[name] = (child, schema_ns)

    return elem_map, type_map


def _parse_element(elem: ET.Element, elem_ns: str,
                   elem_map: Dict, type_map: Dict, depth: int = 0) -> list:
    """Extract list[_Param] children from an xs:element.

    Returns ``[]`` for scalar elements, unresolved type references, and
    when *depth* exceeds the nesting limit.
    """
    if depth > 4:
        return []

    # Inline complexType
    ct = elem.find(_q(_XS, 'complexType'))
    if ct is not None:
        return _parse_complex_type(ct, elem_ns, elem_map, type_map, depth)

    # Named type reference
    type_ref = elem.get('type', '')
    type_local = type_ref.split(':')[-1] if type_ref else ''
    if type_local:
        if type_local in _XS_HINTS:
            return []  # simple scalar — not a parameter container
        entry = type_map.get(type_local)
        if entry is not None:
            ct, type_ns = entry
            return _parse_complex_type(ct, type_ns, elem_map, type_map, depth)

    return []


def _parse_complex_type(ct: ET.Element, ns: str,
                        elem_map: Dict, type_map: Dict, depth: int = 0,
                        _seen: Optional[frozenset] = None) -> list:
    """Extract list[_Param] from an xs:complexType.

    Handles ``complexContent/extension`` (base-type members first, then the
    extension's own members) and a direct ``sequence`` / ``all`` /
    ``choice``.  *_seen* is internal: it holds the identities of the types
    already visited on the current inheritance chain so that a type which
    extends itself, directly or indirectly, cannot recurse forever.
    """
    params: list = []
    seen = (_seen or frozenset()) | {id(ct)}

    # xs:complexContent / xs:extension (inheritance)
    cc = ct.find(_q(_XS, 'complexContent'))
    if cc is not None:
        ext = cc.find(_q(_XS, 'extension'))
        if ext is not None:
            base_local = (ext.get('base') or '').split(':')[-1]
            entry = type_map.get(base_local)
            # Skip bases already on the chain: cyclic schemas must not
            # blow the stack.
            if entry is not None and id(entry[0]) not in seen:
                base_ct, base_ns = entry
                params.extend(_parse_complex_type(
                    base_ct, base_ns, elem_map, type_map, depth + 1, seen))
            for tag in ('sequence', 'all', 'choice'):
                seq = ext.find(_q(_XS, tag))
                if seq is not None:
                    params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth))
                    break
        return params

    # Direct sequence / all / choice
    for tag in ('sequence', 'all', 'choice'):
        seq = ct.find(_q(_XS, tag))
        if seq is not None:
            params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth))
            break

    return params


def _parse_sequence(seq: ET.Element, ns: str,
                    elem_map: Dict, type_map: Dict, depth: int = 0) -> list:
    """Extract list[_Param] from xs:sequence / xs:all / xs:choice.

    Complex child elements are kept as container _Param nodes (preserving
    the wrapper element), rather than being flattened into the parent list.
    Child elements of a referenced type carry that type's namespace.
    Every member of an ``xs:choice`` is treated as optional.
    """
    params: list = []
    choice_optional = _local(seq.tag) == 'choice'

    for child in seq:
        loc = _local(child.tag)
        if loc == 'element':
            name = child.get('name', '')
            if not name:
                # <xs:element ref="tns:Foo"/> — fall back to the referenced name
                ref_local = (child.get('ref') or '').split(':')[-1]
                if ref_local:
                    name = ref_local
            if not name:
                continue

            optional = choice_optional or child.get('minOccurs', '1') == '0'
            type_ref = child.get('type', '')
            type_local = type_ref.split(':')[-1] if type_ref else ''

            if type_local and type_local in _XS_HINTS:
                params.append(_Param(name=name, ns=ns, hint=_hint(type_local, optional)))
            else:
                inline_ct = child.find(_q(_XS, 'complexType'))
                if inline_ct is not None and depth < 3:
                    sub = _parse_complex_type(inline_ct, ns, elem_map, type_map, depth + 1)
                    if sub:
                        params.append(_Param(name=name, ns=ns, children=sub))
                    else:
                        params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))
                elif type_local and type_local in type_map and depth < 3:
                    child_ct, child_ns = type_map[type_local]
                    sub = _parse_complex_type(child_ct, child_ns, elem_map, type_map, depth + 1)
                    if sub:
                        # Keep wrapper element; children carry child_ns namespace
                        params.append(_Param(name=name, ns=ns, children=sub))
                    else:
                        params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))
                else:
                    # Unknown / too deeply nested type: generic placeholder
                    params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))

        elif loc in ('sequence', 'all', 'choice') and depth < 4:
            params.extend(_parse_sequence(child, ns, elem_map, type_map, depth + 1))

    return params


# ---------------------------------------------------------------------------
# SOAP envelope builder
# ---------------------------------------------------------------------------

def _prefix_from_segment(segment: str) -> str:
    """Return up to four lowercase letters taken from a URL path segment."""
    return ''.join(c for c in segment.lower() if c.isalpha())[:4]


def _is_poor_prefix(prefix: str) -> bool:
    """Tell whether *prefix* is meaningless or reserved by XML (``xml…``)."""
    return prefix in _GENERIC_SEGMENTS or prefix.startswith('xml')


def _assign_ns_prefix(ns: str, ns_to_pfx: Dict[str, str], used_pfx: set) -> str:
    """Return an existing or newly-assigned XML prefix for *ns*.

    New assignments are recorded in *ns_to_pfx* and *used_pfx* (both are
    mutated).  Preference order: well-known namespaces, then a short name
    derived from the URI, then ``ns``; a numeric suffix resolves clashes.
    """
    if ns in ns_to_pfx:
        return ns_to_pfx[ns]

    # Check well-known namespaces first (prefix-match)
    candidate = ''
    for known_ns, known_pfx in _KNOWN_NS_PREFIXES.items():
        if ns.startswith(known_ns):
            candidate = known_pfx
            break

    if not candidate:
        # Derive a short name from the last meaningful URL path segment
        trimmed = ns.rstrip('/')
        base = _prefix_from_segment(trimmed.rsplit('/', 1)[-1])
        if _is_poor_prefix(base):
            for part in reversed(trimmed.split('/')):
                seg = _prefix_from_segment(part)
                if seg and not _is_poor_prefix(seg):
                    base = seg
                    break
        # Prefixes starting with "xml" are reserved by the XML namespaces
        # specification; never emit one even if nothing better was found.
        if base.startswith('xml'):
            base = ''
        candidate = base or 'ns'

    # Ensure uniqueness
    orig, i = candidate, 1
    while candidate in used_pfx:
        candidate = f'{orig}{i}'
        i += 1

    ns_to_pfx[ns] = candidate
    used_pfx.add(candidate)
    return candidate


def _build_envelope(op_name: str, target_ns: str, soap_version: str,
                    params: Optional[List[_Param]] = None) -> str:
    """Return a complete SOAP envelope template for one operation.

    *soap_version* ``'1.2'`` selects the SOAP 1.2 envelope namespace; any
    other value selects SOAP 1.1.  With *params*, the operation element
    declares every namespace used by the parameter tree and contains one
    child element per parameter (leaf text is the type hint).  Without
    parameters an empty operation element is emitted.  The operation
    element is prefixed ``tns`` whenever *target_ns* is non-empty.
    """
    env_ns = _ENV12 if soap_version == '1.2' else _ENV11

    if params:
        # --- collect all unique namespaces in tree order ---
        ns_order: List[str] = []
        ns_seen: set = set()

        def _collect_ns(ps):
            for p in ps:
                if p.ns and p.ns not in ns_seen:
                    ns_order.append(p.ns)
                    ns_seen.add(p.ns)
                _collect_ns(p.children)

        if target_ns and target_ns not in ns_seen:
            ns_order.append(target_ns)
            ns_seen.add(target_ns)
        _collect_ns(params)

        # --- assign prefixes ---
        ns_to_pfx: Dict[str, str] = {}
        # Reserve the envelope's own prefix so a parameter namespace can
        # never shadow it inside the body.
        used_pfx: set = {_ENVELOPE_PREFIX}
        # Target namespace always gets 'tns' (consistent with the no-params branch)
        if target_ns:
            ns_to_pfx[target_ns] = 'tns'
            used_pfx.add('tns')
        for ns in ns_order:
            if ns not in ns_to_pfx:
                _assign_ns_prefix(ns, ns_to_pfx, used_pfx)

        # --- namespace declarations on the operation element ---
        # quoteattr escapes &, <, > and quotes so unusual namespace URIs
        # still yield well-formed XML.
        ns_decls = ' '.join(
            f'xmlns:{ns_to_pfx[ns]}={quoteattr(ns)}'
            for ns in ns_order
            if ns in ns_to_pfx
        )

        # --- recursive XML renderer ---
        def _render(ps, indent: str) -> List[str]:
            lines: List[str] = []
            for p in ps:
                pfx = ns_to_pfx.get(p.ns or target_ns, '')
                tag = f'{pfx}:{p.name}' if pfx else p.name
                if p.hint is not None:
                    lines.append(f'{indent}<{tag}>{p.hint}</{tag}>')
                elif p.children:
                    lines.append(f'{indent}<{tag}>')
                    lines.extend(_render(p.children, indent + '  '))
                    lines.append(f'{indent}</{tag}>')
                else:
                    lines.append(f'{indent}<{tag}/>')
            return lines

        op_pfx = ns_to_pfx.get(target_ns, '')
        op_tag = f'{op_pfx}:{op_name}' if op_pfx else op_name
        op_open = f'<{op_tag} {ns_decls}>' if ns_decls else f'<{op_tag}>'

        body_lines = [f'    {op_open}']
        body_lines.extend(_render(params, '      '))
        body_lines.append(f'    </{op_tag}>')
        body = '\n'.join(body_lines)

        return (
            f'<?xml version="1.0" encoding="utf-8"?>\n'
            f'<{_ENVELOPE_PREFIX}:Envelope xmlns:{_ENVELOPE_PREFIX}="{env_ns}">\n'
            f'  <{_ENVELOPE_PREFIX}:Header/>\n'
            f'  <{_ENVELOPE_PREFIX}:Body>\n'
            f'{body}\n'
            f'  </{_ENVELOPE_PREFIX}:Body>\n'
            f'</{_ENVELOPE_PREFIX}:Envelope>'
        )

    # No known parameters: declare tns on the envelope and emit an empty
    # operation element for the user to fill in.
    ns_decl = f'\n               xmlns:tns={quoteattr(target_ns)}' if target_ns else ''
    op_el = f'tns:{op_name}' if target_ns else op_name
    return (
        f'<?xml version="1.0" encoding="utf-8"?>\n'
        f'<{_ENVELOPE_PREFIX}:Envelope xmlns:{_ENVELOPE_PREFIX}="{env_ns}"{ns_decl}>\n'
        f'  <{_ENVELOPE_PREFIX}:Header/>\n'
        f'  <{_ENVELOPE_PREFIX}:Body>\n'
        f'    <{op_el}>\n'
        f'    </{op_el}>\n'
        f'  </{_ENVELOPE_PREFIX}:Body>\n'
        f'</{_ENVELOPE_PREFIX}:Envelope>'
    )