diff --git a/src/wsdl_importer.py b/src/wsdl_importer.py index 7336a91..cd3a52d 100644 --- a/src/wsdl_importer.py +++ b/src/wsdl_importer.py @@ -51,6 +51,14 @@ _XS_HINTS: Dict[str, str] = { 'duration': 'duration', 'guid': 'guid', } +# Well-known namespace → preferred short prefix +_KNOWN_NS_PREFIXES: Dict[str, str] = { + 'http://schemas.datacontract.org': 'dc', + 'http://schemas.microsoft.com/2003/10/Serialization/': 'ser', + 'http://www.w3.org/2001/XMLSchema-instance': 'xsi', + 'http://www.w3.org/2001/XMLSchema': 'xs', +} + def _q(ns: str, tag: str) -> str: return f'{{{ns}}}{tag}' @@ -65,6 +73,19 @@ def _hint(xs_local: str, optional: bool) -> str: return f'[{base}{"?" if optional else ""}]' +# --------------------------------------------------------------------------- +# Internal parameter tree +# --------------------------------------------------------------------------- + +@dataclass +class _Param: + """Tree node for building a typed SOAP body element.""" + name: str + ns: str = '' # element namespace; '' = inherit op namespace + hint: Optional[str] = None # leaf text like '[string]'; None = container node + children: list = field(default_factory=list) # list[_Param] + + # --------------------------------------------------------------------------- # Public data classes # --------------------------------------------------------------------------- @@ -204,7 +225,7 @@ def _parse_wsdl11(root: ET.Element) -> WsdlParseResult: error='No SOAP operations found in this WSDL document' ) - # Build schema maps for parameter extraction + # Build schema maps: name → (element, namespace) elem_map, type_map = _build_schema_maps(root, _WSDL) operations = [] @@ -223,9 +244,8 @@ def _parse_wsdl11(root: ET.Element) -> WsdlParseResult: ) -def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: - """Return [(param_name, hint), …] for the input of a WSDL 1.1 operation.""" - # Walk portType → message → part element +def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> list: + """Return list[_Param] for the input of a WSDL 1.1 operation.""" input_elem_name = _find_input_elem_wsdl11(root, op_name) # Naming-convention fallback @@ -238,7 +258,8 @@ def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple if not input_elem_name or input_elem_name not in elem_map: return [] - return _parse_element(elem_map[input_elem_name], elem_map, type_map) + elem, elem_ns = elem_map[input_elem_name] + return _parse_element(elem, elem_ns, elem_map, type_map) def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]: @@ -303,14 +324,12 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: break # Collect (soap_action) per operation from SOAP bindings - # binding_local_name → {op_local → soap_action} binding_ops: Dict[str, Dict[str, str]] = {} for binding in root.iter(): if _local(binding.tag) != 'binding': continue b_name = binding.get('name', '') b_type = binding.get('type', '') - # Check for SOAP binding type or SOAP child elements is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower()) if not is_soap: is_soap = any( @@ -325,7 +344,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: if _local(child.tag) != 'operation': continue ref = (child.get('ref') or '').split(':')[-1] - # SOAPAction may be a namespaced attribute action = ( child.get(_q(_WSDL2_SOAP, 'action')) or child.get('action') @@ -340,7 +358,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: if service_binding_local and service_binding_local in binding_ops: op_info = binding_ops[service_binding_local] else: - # Merge all SOAP binding operations for ops in binding_ops.values(): for op_name, action in ops.items(): if op_name not in op_info: @@ -363,7 +380,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: error='No SOAP operations found in WSDL 2.0 document' ) - # Build schema maps elem_map, type_map = _build_schema_maps(root, _WSDL2) operations = [] @@ -382,8 +398,8 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult: ) -def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: - """Return [(param_name, hint), …] for the input of a WSDL 2.0 operation.""" +def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> list: + """Return list[_Param] for the input of a WSDL 2.0 operation.""" for iface in root.iter(): if _local(iface.tag) != 'interface': continue @@ -394,12 +410,14 @@ def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple if _local(child.tag) == 'input': elem_ref = (child.get('element') or '').split(':')[-1] if elem_ref in elem_map: - return _parse_element(elem_map[elem_ref], elem_map, type_map) + elem, elem_ns = elem_map[elem_ref] + return _parse_element(elem, elem_ns, elem_map, type_map) # Naming-convention fallback for candidate in [op_name, op_name + 'Request', op_name + 'Input']: if candidate in elem_map: - return _parse_element(elem_map[candidate], elem_map, type_map) + elem, elem_ns = elem_map[candidate] + return _parse_element(elem, elem_ns, elem_map, type_map) return [] @@ -409,9 +427,13 @@ def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple # --------------------------------------------------------------------------- def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]: - """Build {local_name: element} and {local_name: complexType} maps from .""" - elem_map: Dict[str, ET.Element] = {} - type_map: Dict[str, ET.Element] = {} + """Build {name: (element, ns)} and {name: (complexType, ns)} maps from . + + Each map value is a (ET.Element, targetNamespace) tuple so callers can + track which schema namespace every element / type belongs to. + """ + elem_map: Dict[str, Tuple[ET.Element, str]] = {} + type_map: Dict[str, Tuple[ET.Element, str]] = {} types_el = root.find(_q(wsdl_ns, 'types')) if types_el is None: @@ -422,47 +444,49 @@ def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]: for node in types_el.iter(): if _local(node.tag) != 'schema': continue + schema_ns = node.get('targetNamespace', '') for child in node: name = child.get('name', '') if not name: continue loc = _local(child.tag) if loc == 'element': - elem_map[name] = child + elem_map[name] = (child, schema_ns) elif loc == 'complexType': - type_map[name] = child + type_map[name] = (child, schema_ns) return elem_map, type_map -def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict, - depth: int = 0) -> List[Tuple[str, str]]: - """Extract [(name, hint)] from an xs:element (inline complexType or type=ref).""" +def _parse_element(elem: ET.Element, elem_ns: str, elem_map: Dict, type_map: Dict, + depth: int = 0) -> list: + """Extract list[_Param] children from an xs:element.""" if depth > 4: return [] # Inline complexType ct = elem.find(_q(_XS, 'complexType')) if ct is not None: - return _parse_complex_type(ct, elem_map, type_map, depth) + return _parse_complex_type(ct, elem_ns, elem_map, type_map, depth) # Named type reference - type_ref = elem.get('type', '') + type_ref = elem.get('type', '') type_local = type_ref.split(':')[-1] if type_ref else '' if type_local: if type_local in _XS_HINTS: return [] # simple scalar — not a parameter container - ct = type_map.get(type_local) - if ct is not None: - return _parse_complex_type(ct, elem_map, type_map, depth) + entry = type_map.get(type_local) + if entry is not None: + ct, type_ns = entry + return _parse_complex_type(ct, type_ns, elem_map, type_map, depth) return [] -def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict, - depth: int = 0) -> List[Tuple[str, str]]: - """Extract [(name, hint)] from an xs:complexType.""" - params: List[Tuple[str, str]] = [] +def _parse_complex_type(ct: ET.Element, ns: str, elem_map: Dict, type_map: Dict, + depth: int = 0) -> list: + """Extract list[_Param] from an xs:complexType.""" + params: list = [] # xs:complexContent / xs:extension (inheritance) cc = ct.find(_q(_XS, 'complexContent')) @@ -470,13 +494,14 @@ def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict, ext = cc.find(_q(_XS, 'extension')) if ext is not None: base_local = (ext.get('base') or '').split(':')[-1] - base_ct = type_map.get(base_local) - if base_ct is not None: - params.extend(_parse_complex_type(base_ct, elem_map, type_map, depth + 1)) + entry = type_map.get(base_local) + if entry is not None: + base_ct, base_ns = entry + params.extend(_parse_complex_type(base_ct, base_ns, elem_map, type_map, depth + 1)) for tag in ('sequence', 'all', 'choice'): seq = ext.find(_q(_XS, tag)) if seq is not None: - params.extend(_parse_sequence(seq, elem_map, type_map, depth)) + params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth)) break return params @@ -484,16 +509,21 @@ def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict, for tag in ('sequence', 'all', 'choice'): seq = ct.find(_q(_XS, tag)) if seq is not None: - params.extend(_parse_sequence(seq, elem_map, type_map, depth)) + params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth)) break return params -def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict, - depth: int = 0) -> List[Tuple[str, str]]: - """Extract [(name, hint)] from xs:sequence / xs:all / xs:choice.""" - params: List[Tuple[str, str]] = [] +def _parse_sequence(seq: ET.Element, ns: str, elem_map: Dict, type_map: Dict, + depth: int = 0) -> list: + """Extract list[_Param] from xs:sequence / xs:all / xs:choice. + + Complex child elements are kept as container _Param nodes (preserving the + wrapper element), rather than being flattened into the parent list. + Child elements of a referenced type carry that type's namespace. + """ + params: list = [] choice_optional = _local(seq.tag) == 'choice' for child in seq: @@ -509,26 +539,33 @@ def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict, if not name: continue - optional = choice_optional or child.get('minOccurs', '1') == '0' - type_ref = child.get('type', '') + optional = choice_optional or child.get('minOccurs', '1') == '0' + type_ref = child.get('type', '') type_local = type_ref.split(':')[-1] if type_ref else '' if type_local and type_local in _XS_HINTS: - params.append((name, _hint(type_local, optional))) + params.append(_Param(name=name, ns=ns, hint=_hint(type_local, optional))) else: - # Inline or referenced complex type — mark as [any] at this depth inline_ct = child.find(_q(_XS, 'complexType')) - if inline_ct is not None and depth < 2: - sub = _parse_complex_type(inline_ct, elem_map, type_map, depth + 1) - params.extend(sub) if sub else params.append((name, _hint('anyType', optional))) - elif type_local and type_local in type_map and depth < 2: - sub = _parse_complex_type(type_map[type_local], elem_map, type_map, depth + 1) - params.extend(sub) if sub else params.append((name, _hint('anyType', optional))) + if inline_ct is not None and depth < 3: + sub = _parse_complex_type(inline_ct, ns, elem_map, type_map, depth + 1) + if sub: + params.append(_Param(name=name, ns=ns, children=sub)) + else: + params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional))) + elif type_local and type_local in type_map and depth < 3: + child_ct, child_ns = type_map[type_local] + sub = _parse_complex_type(child_ct, child_ns, elem_map, type_map, depth + 1) + if sub: + # Keep wrapper element; children carry child_ns namespace + params.append(_Param(name=name, ns=ns, children=sub)) + else: + params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional))) else: - params.append((name, _hint('anyType', optional))) + params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional))) - elif loc in ('sequence', 'all', 'choice') and depth < 3: - params.extend(_parse_sequence(child, elem_map, type_map, depth + 1)) + elif loc in ('sequence', 'all', 'choice') and depth < 4: + params.extend(_parse_sequence(child, ns, elem_map, type_map, depth + 1)) return params @@ -537,18 +574,109 @@ def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict, # SOAP envelope builder # --------------------------------------------------------------------------- +def _assign_ns_prefix(ns: str, ns_to_pfx: Dict[str, str], used_pfx: set) -> str: + """Return an existing or newly-assigned XML prefix for *ns*.""" + if ns in ns_to_pfx: + return ns_to_pfx[ns] + + # Check well-known namespaces first (prefix-match) + candidate = '' + for known_ns, known_pfx in _KNOWN_NS_PREFIXES.items(): + if ns.startswith(known_ns): + candidate = known_pfx + break + + if not candidate: + # Derive a short name from the last meaningful URL path segment + last = ns.rstrip('/').rsplit('/', 1)[-1] + base = ''.join(c for c in last.lower() if c.isalpha())[:4] + _generic = {'org', 'com', 'net', 'gov', 'www', 'http', 'wsdl', 'soap', ''} + if base in _generic: + parts = ns.rstrip('/').split('/') + for part in reversed(parts): + seg = ''.join(c for c in part.lower() if c.isalpha())[:4] + if seg and seg not in _generic: + base = seg + break + candidate = base or 'ns' + + # Ensure uniqueness + orig, i = candidate, 1 + while candidate in used_pfx: + candidate = f'{orig}{i}' + i += 1 + + ns_to_pfx[ns] = candidate + used_pfx.add(candidate) + return candidate + + def _build_envelope(op_name: str, target_ns: str, soap_version: str, - params: Optional[List[Tuple[str, str]]] = None) -> str: + params=None) -> str: env_ns = _ENV12 if soap_version == '1.2' else _ENV11 if params: - # Default-namespace style → parameters inherit namespace, no prefix needed - ns_attr = f' xmlns="{target_ns}"' if target_ns else '' - lines = [f' <{op_name}{ns_attr}>'] - for pname, phint in params: - lines.append(f' <{pname}>{phint}') - lines.append(f' ') - body = '\n'.join(lines) + # --- collect all unique namespaces in tree order --- + ns_order: List[str] = [] + ns_seen: set = set() + + def _collect_ns(ps): + for p in ps: + if p.ns and p.ns not in ns_seen: + ns_order.append(p.ns) + ns_seen.add(p.ns) + _collect_ns(p.children) + + if target_ns and target_ns not in ns_seen: + ns_order.append(target_ns) + ns_seen.add(target_ns) + _collect_ns(params) + + # --- assign prefixes --- + ns_to_pfx: Dict[str, str] = {} + used_pfx: set = set() + + # Target namespace always gets 'tns' (consistent with the no-params branch) + if target_ns: + ns_to_pfx[target_ns] = 'tns' + used_pfx.add('tns') + + for ns in ns_order: + if ns not in ns_to_pfx: + _assign_ns_prefix(ns, ns_to_pfx, used_pfx) + + # --- namespace declarations on the operation element --- + ns_decls = ' '.join( + f'xmlns:{ns_to_pfx[ns]}="{ns}"' + for ns in ns_order + if ns in ns_to_pfx + ) + + # --- recursive XML renderer --- + def _render(ps, indent: str) -> List[str]: + lines: List[str] = [] + for p in ps: + pfx = ns_to_pfx.get(p.ns or target_ns, '') + tag = f'{pfx}:{p.name}' if pfx else p.name + if p.hint is not None: + lines.append(f'{indent}<{tag}>{p.hint}') + elif p.children: + lines.append(f'{indent}<{tag}>') + lines.extend(_render(p.children, indent + ' ')) + lines.append(f'{indent}') + else: + lines.append(f'{indent}<{tag}/>') + return lines + + op_pfx = ns_to_pfx.get(target_ns, '') + op_tag = f'{op_pfx}:{op_name}' if op_pfx else op_name + op_open = f'<{op_tag} {ns_decls}>' if ns_decls else f'<{op_tag}>' + + body_lines = [f' {op_open}'] + body_lines.extend(_render(params, ' ')) + body_lines.append(f' ') + body = '\n'.join(body_lines) + return ( f'\n' f'\n'