Fix WSDL parser: keep wrapper elements and per-schema namespaces

This commit is contained in:
Pavel Baksy 2026-05-19 13:40:26 +02:00
parent b8338be283
commit a92eff7a68

View File

@ -51,6 +51,14 @@ _XS_HINTS: Dict[str, str] = {
'duration': 'duration', 'guid': 'guid', 'duration': 'duration', 'guid': 'guid',
} }
# Well-known namespace → preferred short prefix.
# Keys are matched against a namespace with str.startswith() in insertion
# order and the first hit wins, so a key that is itself a prefix of another
# key must come after it (XMLSchema-instance is listed before XMLSchema).
_KNOWN_NS_PREFIXES: Dict[str, str] = {
'http://schemas.datacontract.org': 'dc',
'http://schemas.microsoft.com/2003/10/Serialization/': 'ser',
'http://www.w3.org/2001/XMLSchema-instance': 'xsi',
'http://www.w3.org/2001/XMLSchema': 'xs',
}
def _q(ns: str, tag: str) -> str: def _q(ns: str, tag: str) -> str:
return f'{{{ns}}}{tag}' return f'{{{ns}}}{tag}'
@ -65,6 +73,19 @@ def _hint(xs_local: str, optional: bool) -> str:
return f'[{base}{"?" if optional else ""}]' return f'[{base}{"?" if optional else ""}]'
# ---------------------------------------------------------------------------
# Internal parameter tree
# ---------------------------------------------------------------------------
@dataclass
class _Param:
"""Tree node for building a typed SOAP body element."""
name: str
ns: str = '' # element namespace; '' = inherit op namespace
hint: Optional[str] = None # leaf text like '[string]'; None = container node
children: list = field(default_factory=list) # list[_Param]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Public data classes # Public data classes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -204,7 +225,7 @@ def _parse_wsdl11(root: ET.Element) -> WsdlParseResult:
error='No SOAP operations found in this WSDL document' error='No SOAP operations found in this WSDL document'
) )
# Build schema maps for parameter extraction # Build schema maps: name → (element, namespace)
elem_map, type_map = _build_schema_maps(root, _WSDL) elem_map, type_map = _build_schema_maps(root, _WSDL)
operations = [] operations = []
@ -223,9 +244,8 @@ def _parse_wsdl11(root: ET.Element) -> WsdlParseResult:
) )
def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> list:
"""Return [(param_name, hint), …] for the input of a WSDL 1.1 operation.""" """Return list[_Param] for the input of a WSDL 1.1 operation."""
# Walk portType → message → part element
input_elem_name = _find_input_elem_wsdl11(root, op_name) input_elem_name = _find_input_elem_wsdl11(root, op_name)
# Naming-convention fallback # Naming-convention fallback
@ -238,7 +258,8 @@ def _extract_params_wsdl11(root, op_name: str, elem_map, type_map) -> List[Tuple
if not input_elem_name or input_elem_name not in elem_map: if not input_elem_name or input_elem_name not in elem_map:
return [] return []
return _parse_element(elem_map[input_elem_name], elem_map, type_map) elem, elem_ns = elem_map[input_elem_name]
return _parse_element(elem, elem_ns, elem_map, type_map)
def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]: def _find_input_elem_wsdl11(root, op_name: str) -> Optional[str]:
@ -303,14 +324,12 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
break break
# Collect (soap_action) per operation from SOAP bindings # Collect (soap_action) per operation from SOAP bindings
# binding_local_name → {op_local → soap_action}
binding_ops: Dict[str, Dict[str, str]] = {} binding_ops: Dict[str, Dict[str, str]] = {}
for binding in root.iter(): for binding in root.iter():
if _local(binding.tag) != 'binding': if _local(binding.tag) != 'binding':
continue continue
b_name = binding.get('name', '') b_name = binding.get('name', '')
b_type = binding.get('type', '') b_type = binding.get('type', '')
# Check for SOAP binding type or SOAP child elements
is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower()) is_soap = (_WSDL2_SOAP in b_type or 'soap' in b_type.lower())
if not is_soap: if not is_soap:
is_soap = any( is_soap = any(
@ -325,7 +344,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
if _local(child.tag) != 'operation': if _local(child.tag) != 'operation':
continue continue
ref = (child.get('ref') or '').split(':')[-1] ref = (child.get('ref') or '').split(':')[-1]
# SOAPAction may be a namespaced attribute
action = ( action = (
child.get(_q(_WSDL2_SOAP, 'action')) child.get(_q(_WSDL2_SOAP, 'action'))
or child.get('action') or child.get('action')
@ -340,7 +358,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
if service_binding_local and service_binding_local in binding_ops: if service_binding_local and service_binding_local in binding_ops:
op_info = binding_ops[service_binding_local] op_info = binding_ops[service_binding_local]
else: else:
# Merge all SOAP binding operations
for ops in binding_ops.values(): for ops in binding_ops.values():
for op_name, action in ops.items(): for op_name, action in ops.items():
if op_name not in op_info: if op_name not in op_info:
@ -363,7 +380,6 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
error='No SOAP operations found in WSDL 2.0 document' error='No SOAP operations found in WSDL 2.0 document'
) )
# Build schema maps
elem_map, type_map = _build_schema_maps(root, _WSDL2) elem_map, type_map = _build_schema_maps(root, _WSDL2)
operations = [] operations = []
@ -382,8 +398,8 @@ def _parse_wsdl20(root: ET.Element) -> WsdlParseResult:
) )
def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple[str, str]]: def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> list:
"""Return [(param_name, hint), …] for the input of a WSDL 2.0 operation.""" """Return list[_Param] for the input of a WSDL 2.0 operation."""
for iface in root.iter(): for iface in root.iter():
if _local(iface.tag) != 'interface': if _local(iface.tag) != 'interface':
continue continue
@ -394,12 +410,14 @@ def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple
if _local(child.tag) == 'input': if _local(child.tag) == 'input':
elem_ref = (child.get('element') or '').split(':')[-1] elem_ref = (child.get('element') or '').split(':')[-1]
if elem_ref in elem_map: if elem_ref in elem_map:
return _parse_element(elem_map[elem_ref], elem_map, type_map) elem, elem_ns = elem_map[elem_ref]
return _parse_element(elem, elem_ns, elem_map, type_map)
# Naming-convention fallback # Naming-convention fallback
for candidate in [op_name, op_name + 'Request', op_name + 'Input']: for candidate in [op_name, op_name + 'Request', op_name + 'Input']:
if candidate in elem_map: if candidate in elem_map:
return _parse_element(elem_map[candidate], elem_map, type_map) elem, elem_ns = elem_map[candidate]
return _parse_element(elem, elem_ns, elem_map, type_map)
return [] return []
@ -409,9 +427,13 @@ def _extract_params_wsdl20(root, op_name: str, elem_map, type_map) -> List[Tuple
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]: def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]:
"""Build {local_name: element} and {local_name: complexType} maps from <types>.""" """Build {name: (element, ns)} and {name: (complexType, ns)} maps from <types>.
elem_map: Dict[str, ET.Element] = {}
type_map: Dict[str, ET.Element] = {} Each map value is a (ET.Element, targetNamespace) tuple so callers can
track which schema namespace every element / type belongs to.
"""
elem_map: Dict[str, Tuple[ET.Element, str]] = {}
type_map: Dict[str, Tuple[ET.Element, str]] = {}
types_el = root.find(_q(wsdl_ns, 'types')) types_el = root.find(_q(wsdl_ns, 'types'))
if types_el is None: if types_el is None:
@ -422,29 +444,30 @@ def _build_schema_maps(root: ET.Element, wsdl_ns: str) -> Tuple[Dict, Dict]:
for node in types_el.iter(): for node in types_el.iter():
if _local(node.tag) != 'schema': if _local(node.tag) != 'schema':
continue continue
schema_ns = node.get('targetNamespace', '')
for child in node: for child in node:
name = child.get('name', '') name = child.get('name', '')
if not name: if not name:
continue continue
loc = _local(child.tag) loc = _local(child.tag)
if loc == 'element': if loc == 'element':
elem_map[name] = child elem_map[name] = (child, schema_ns)
elif loc == 'complexType': elif loc == 'complexType':
type_map[name] = child type_map[name] = (child, schema_ns)
return elem_map, type_map return elem_map, type_map
def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict, def _parse_element(elem: ET.Element, elem_ns: str, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]: depth: int = 0) -> list:
"""Extract [(name, hint)] from an xs:element (inline complexType or type=ref).""" """Extract list[_Param] children from an xs:element."""
if depth > 4: if depth > 4:
return [] return []
# Inline complexType # Inline complexType
ct = elem.find(_q(_XS, 'complexType')) ct = elem.find(_q(_XS, 'complexType'))
if ct is not None: if ct is not None:
return _parse_complex_type(ct, elem_map, type_map, depth) return _parse_complex_type(ct, elem_ns, elem_map, type_map, depth)
# Named type reference # Named type reference
type_ref = elem.get('type', '') type_ref = elem.get('type', '')
@ -452,17 +475,18 @@ def _parse_element(elem: ET.Element, elem_map: Dict, type_map: Dict,
if type_local: if type_local:
if type_local in _XS_HINTS: if type_local in _XS_HINTS:
return [] # simple scalar — not a parameter container return [] # simple scalar — not a parameter container
ct = type_map.get(type_local) entry = type_map.get(type_local)
if ct is not None: if entry is not None:
return _parse_complex_type(ct, elem_map, type_map, depth) ct, type_ns = entry
return _parse_complex_type(ct, type_ns, elem_map, type_map, depth)
return [] return []
def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict, def _parse_complex_type(ct: ET.Element, ns: str, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]: depth: int = 0) -> list:
"""Extract [(name, hint)] from an xs:complexType.""" """Extract list[_Param] from an xs:complexType."""
params: List[Tuple[str, str]] = [] params: list = []
# xs:complexContent / xs:extension (inheritance) # xs:complexContent / xs:extension (inheritance)
cc = ct.find(_q(_XS, 'complexContent')) cc = ct.find(_q(_XS, 'complexContent'))
@ -470,13 +494,14 @@ def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict,
ext = cc.find(_q(_XS, 'extension')) ext = cc.find(_q(_XS, 'extension'))
if ext is not None: if ext is not None:
base_local = (ext.get('base') or '').split(':')[-1] base_local = (ext.get('base') or '').split(':')[-1]
base_ct = type_map.get(base_local) entry = type_map.get(base_local)
if base_ct is not None: if entry is not None:
params.extend(_parse_complex_type(base_ct, elem_map, type_map, depth + 1)) base_ct, base_ns = entry
params.extend(_parse_complex_type(base_ct, base_ns, elem_map, type_map, depth + 1))
for tag in ('sequence', 'all', 'choice'): for tag in ('sequence', 'all', 'choice'):
seq = ext.find(_q(_XS, tag)) seq = ext.find(_q(_XS, tag))
if seq is not None: if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth)) params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth))
break break
return params return params
@ -484,16 +509,21 @@ def _parse_complex_type(ct: ET.Element, elem_map: Dict, type_map: Dict,
for tag in ('sequence', 'all', 'choice'): for tag in ('sequence', 'all', 'choice'):
seq = ct.find(_q(_XS, tag)) seq = ct.find(_q(_XS, tag))
if seq is not None: if seq is not None:
params.extend(_parse_sequence(seq, elem_map, type_map, depth)) params.extend(_parse_sequence(seq, ns, elem_map, type_map, depth))
break break
return params return params
def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict, def _parse_sequence(seq: ET.Element, ns: str, elem_map: Dict, type_map: Dict,
depth: int = 0) -> List[Tuple[str, str]]: depth: int = 0) -> list:
"""Extract [(name, hint)] from xs:sequence / xs:all / xs:choice.""" """Extract list[_Param] from xs:sequence / xs:all / xs:choice.
params: List[Tuple[str, str]] = []
Complex child elements are kept as container _Param nodes (preserving the
wrapper element), rather than being flattened into the parent list.
Child elements of a referenced type carry that type's namespace.
"""
params: list = []
choice_optional = _local(seq.tag) == 'choice' choice_optional = _local(seq.tag) == 'choice'
for child in seq: for child in seq:
@ -514,21 +544,28 @@ def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict,
type_local = type_ref.split(':')[-1] if type_ref else '' type_local = type_ref.split(':')[-1] if type_ref else ''
if type_local and type_local in _XS_HINTS: if type_local and type_local in _XS_HINTS:
params.append((name, _hint(type_local, optional))) params.append(_Param(name=name, ns=ns, hint=_hint(type_local, optional)))
else: else:
# Inline or referenced complex type — mark as [any] at this depth
inline_ct = child.find(_q(_XS, 'complexType')) inline_ct = child.find(_q(_XS, 'complexType'))
if inline_ct is not None and depth < 2: if inline_ct is not None and depth < 3:
sub = _parse_complex_type(inline_ct, elem_map, type_map, depth + 1) sub = _parse_complex_type(inline_ct, ns, elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional))) if sub:
elif type_local and type_local in type_map and depth < 2: params.append(_Param(name=name, ns=ns, children=sub))
sub = _parse_complex_type(type_map[type_local], elem_map, type_map, depth + 1)
params.extend(sub) if sub else params.append((name, _hint('anyType', optional)))
else: else:
params.append((name, _hint('anyType', optional))) params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))
elif type_local and type_local in type_map and depth < 3:
child_ct, child_ns = type_map[type_local]
sub = _parse_complex_type(child_ct, child_ns, elem_map, type_map, depth + 1)
if sub:
# Keep wrapper element; children carry child_ns namespace
params.append(_Param(name=name, ns=ns, children=sub))
else:
params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))
else:
params.append(_Param(name=name, ns=ns, hint=_hint('anyType', optional)))
elif loc in ('sequence', 'all', 'choice') and depth < 3: elif loc in ('sequence', 'all', 'choice') and depth < 4:
params.extend(_parse_sequence(child, elem_map, type_map, depth + 1)) params.extend(_parse_sequence(child, ns, elem_map, type_map, depth + 1))
return params return params
@ -537,18 +574,109 @@ def _parse_sequence(seq: ET.Element, elem_map: Dict, type_map: Dict,
# SOAP envelope builder # SOAP envelope builder
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _assign_ns_prefix(ns: str, ns_to_pfx: Dict[str, str], used_pfx: set) -> str:
    """Return an existing or newly-assigned XML prefix for *ns*.

    Resolution order:
      1. the prefix already recorded for *ns* in *ns_to_pfx*;
      2. the value of the first ``_KNOWN_NS_PREFIXES`` key that *ns* starts with;
      3. up to four letters taken from the last path segment of *ns*, or from
         the nearest earlier segment when that one is empty, generic
         (``org``, ``com``, ``soap`` …) or begins with ``xml``;
      4. the literal ``'ns'`` when no segment is usable.

    A numeric suffix (``1``, ``2`` …) is appended until the prefix is not in
    *used_pfx*.  Both *ns_to_pfx* and *used_pfx* are updated in place.
    """
    if ns in ns_to_pfx:
        return ns_to_pfx[ns]

    # Check well-known namespaces first (prefix-match, first hit wins).
    candidate = next(
        (pfx for known, pfx in _KNOWN_NS_PREFIXES.items() if ns.startswith(known)),
        '',
    )

    if not candidate:
        generic = {'org', 'com', 'net', 'gov', 'www', 'http', 'wsdl', 'soap'}
        # Walk the URL segments from the end; the first meaningful one wins.
        for part in reversed(ns.rstrip('/').split('/')):
            seg = ''.join(c for c in part.lower() if c.isalpha())[:4]
            # Prefixes beginning with "xml" are reserved by Namespaces in XML,
            # and binding the prefix "xml" to another namespace is an error.
            if seg and seg not in generic and not seg.startswith('xml'):
                candidate = seg
                break
        else:
            # Nothing usable: never fall back to a generic word such as
            # 'soap' or 'wsdl', use a neutral prefix instead.
            candidate = 'ns'

    # Ensure uniqueness among the prefixes handed out so far.
    orig, i = candidate, 1
    while candidate in used_pfx:
        candidate = f'{orig}{i}'
        i += 1

    ns_to_pfx[ns] = candidate
    used_pfx.add(candidate)
    return candidate
def _build_envelope(op_name: str, target_ns: str, soap_version: str, def _build_envelope(op_name: str, target_ns: str, soap_version: str,
params: Optional[List[Tuple[str, str]]] = None) -> str: params=None) -> str:
env_ns = _ENV12 if soap_version == '1.2' else _ENV11 env_ns = _ENV12 if soap_version == '1.2' else _ENV11
if params: if params:
# Default-namespace style → parameters inherit namespace, no prefix needed # --- collect all unique namespaces in tree order ---
ns_attr = f' xmlns="{target_ns}"' if target_ns else '' ns_order: List[str] = []
lines = [f' <{op_name}{ns_attr}>'] ns_seen: set = set()
for pname, phint in params:
lines.append(f' <{pname}>{phint}</{pname}>') def _collect_ns(ps):
lines.append(f' </{op_name}>') for p in ps:
body = '\n'.join(lines) if p.ns and p.ns not in ns_seen:
ns_order.append(p.ns)
ns_seen.add(p.ns)
_collect_ns(p.children)
if target_ns and target_ns not in ns_seen:
ns_order.append(target_ns)
ns_seen.add(target_ns)
_collect_ns(params)
# --- assign prefixes ---
ns_to_pfx: Dict[str, str] = {}
used_pfx: set = set()
# Target namespace always gets 'tns' (consistent with the no-params branch)
if target_ns:
ns_to_pfx[target_ns] = 'tns'
used_pfx.add('tns')
for ns in ns_order:
if ns not in ns_to_pfx:
_assign_ns_prefix(ns, ns_to_pfx, used_pfx)
# --- namespace declarations on the operation element ---
ns_decls = ' '.join(
f'xmlns:{ns_to_pfx[ns]}="{ns}"'
for ns in ns_order
if ns in ns_to_pfx
)
# --- recursive XML renderer ---
def _render(ps, indent: str) -> List[str]:
lines: List[str] = []
for p in ps:
pfx = ns_to_pfx.get(p.ns or target_ns, '')
tag = f'{pfx}:{p.name}' if pfx else p.name
if p.hint is not None:
lines.append(f'{indent}<{tag}>{p.hint}</{tag}>')
elif p.children:
lines.append(f'{indent}<{tag}>')
lines.extend(_render(p.children, indent + ' '))
lines.append(f'{indent}</{tag}>')
else:
lines.append(f'{indent}<{tag}/>')
return lines
op_pfx = ns_to_pfx.get(target_ns, '')
op_tag = f'{op_pfx}:{op_name}' if op_pfx else op_name
op_open = f'<{op_tag} {ns_decls}>' if ns_decls else f'<{op_tag}>'
body_lines = [f' {op_open}']
body_lines.extend(_render(params, ' '))
body_lines.append(f' </{op_tag}>')
body = '\n'.join(body_lines)
return ( return (
f'<?xml version="1.0" encoding="utf-8"?>\n' f'<?xml version="1.0" encoding="utf-8"?>\n'
f'<soap:Envelope xmlns:soap="{env_ns}">\n' f'<soap:Envelope xmlns:soap="{env_ns}">\n'