"""
DNS resolver implementation for multiaddr.
This module provides DNS resolution for multiaddrs, supporting the following protocols:
- /dns, /dns4, /dns6: Standard DNS resolution for IPv4 and IPv6
- /dnsaddr: Recursive TXT record resolution for libp2p bootstrap nodes
Features:
- Recursive resolution for /dnsaddr, /dns4, and /dns6
- Peer ID preservation during resolution
- Timeout and cancellation support (Trio)
- Error handling for recursion limits and DNS failures
Example usage:
from multiaddr import Multiaddr
ma = Multiaddr("/dns4/example.com/tcp/443")
resolved = await ma.resolve()
print(resolved)
# [Multiaddr("/ip4/93.184.216.34/tcp/443")]
"""
import logging
import re
from typing import cast
import dns.asyncresolver
import dns.rdataclass
import dns.rdtypes.ANY.TXT # type: ignore
import dns.rdtypes.IN.A
import dns.rdtypes.IN.AAAA
import dns.resolver
import trio
from ..exceptions import RecursionLimitError, ResolutionError
from ..multiaddr import Multiaddr
from ..protocols import P_DNS, P_DNS4, P_DNS6, P_DNSADDR, Protocol
from .base import Resolver
[docs]
class DNSResolver(Resolver):
"""
DNS resolver for multiaddr.
Resolves /dns, /dns4, /dns6, and /dnsaddr multiaddrs to their underlying IP addresses.
Supports recursive resolution for DNSADDR records and protocol-specific resolution for
DNS4/DNS6.
"""
MAX_RECURSIVE_DEPTH = 32
DEFAULT_TIMEOUT = 5.0 # 5 seconds timeout
def __init__(self):
"""Initialize the DNS resolver."""
self._resolver = dns.asyncresolver.Resolver()
[docs]
async def resolve(self, maddr: "Multiaddr", options: dict | None = None) -> list["Multiaddr"]:
"""
Resolve a DNS multiaddr to its actual addresses.
Args:
maddr: The multiaddr to resolve
options: Optional configuration options (e.g., max_recursive_depth, signal)
Returns:
A list of resolved multiaddrs
Raises:
ResolutionError: If resolution fails
RecursionLimitError: If maximum recursive depth is reached
trio.Cancelled: If the operation is cancelled
"""
protocols: list[Protocol] = list(maddr.protocols())
if not protocols:
raise ResolutionError("empty multiaddr")
first_protocol = protocols[0]
if first_protocol.code not in (P_DNS, P_DNS4, P_DNS6, P_DNSADDR):
return [maddr]
# Get the hostname and clean it of quotes
hostname = maddr.value_for_protocol(first_protocol.code)
if not hostname:
return [maddr]
# Remove quotes from hostname
hostname = self._clean_quotes(hostname)
# Get max recursive depth from options or use default
max_depth = (
options.get("max_recursive_depth", self.MAX_RECURSIVE_DEPTH)
if options
else self.MAX_RECURSIVE_DEPTH
)
# Get signal from options if provided
signal = options.get("signal") if options else None
try:
if first_protocol.code == P_DNSADDR:
resolved = await self._resolve_dnsaddr(
hostname,
maddr,
max_depth,
signal,
)
return resolved # Do not fallback to [maddr]
else:
resolved = await self._resolve_dns_with_stack(maddr, signal)
return resolved if resolved else [maddr] # Classic DNS fallback remains
except RecursionLimitError:
# Do not wrap RecursionLimitError so tests can catch it
raise
except Exception as e:
raise ResolutionError(f"Failed to resolve {hostname}: {e!s}")
def _clean_quotes(self, text: str) -> str:
"""Remove quotes from a string.
Args:
text: The text to clean
Returns:
The cleaned text without quotes
"""
# Remove all types of quotes (single, double, mixed)
return re.sub(r'[\'"\s]+', "", text)
async def _resolve_dnsaddr(
self,
hostname: str,
original_ma: "Multiaddr",
max_depth: int,
signal: trio.CancelScope | None = None,
) -> list["Multiaddr"]:
"""
Resolve a DNSADDR record according to libp2p specification.
Queries TXT records from _dnsaddr.<hostname> and parses dnsaddr=<multiaddr> entries.
Recursively resolves /dns4 and /dns6 entries to /ip4 and /ip6 addresses.
Args:
hostname: The hostname to resolve
original_ma: The original multiaddr being resolved
max_depth: Maximum depth for recursive resolution
signal: Optional signal for cancellation
Returns:
A list of resolved multiaddrs
Raises:
ResolutionError: If resolution fails
RecursionLimitError: If maximum recursive depth is reached
trio.Cancelled: If the operation is cancelled
"""
if max_depth <= 0:
raise RecursionLimitError(f"Maximum recursive depth exceeded for {hostname}")
# Get the peer ID if present
peer_id = None
try:
peer_id = original_ma.get_peer_id()
except Exception:
# If there's no peer ID, that's fine - we'll just resolve the address
pass
# Query TXT records from _dnsaddr.<hostname> according to libp2p spec
dnsaddr_hostname = f"_dnsaddr.{hostname}"
try:
if signal:
# Use the provided signal for cancellation
with signal:
return await self._query_dnsaddr_txt_records(
dnsaddr_hostname, peer_id, max_depth, signal
)
else:
# Use default timeout-based cancellation
with trio.CancelScope() as cancel_scope: # type: ignore[call-arg]
# Set a timeout for DNS resolution
cancel_scope.deadline = trio.current_time() + self.DEFAULT_TIMEOUT
cancel_scope.cancelled_caught = True
return await self._query_dnsaddr_txt_records(
dnsaddr_hostname, peer_id, max_depth, cancel_scope
)
except Exception as e:
raise ResolutionError(f"Failed to resolve DNSADDR {hostname}: {e!s}")
async def _query_dnsaddr_txt_records(
self,
dnsaddr_hostname: str,
peer_id: str | None,
max_depth: int,
signal: trio.CancelScope | None = None,
_debug_level: int = 0,
) -> list["Multiaddr"]:
"""
Query TXT records and parse dnsaddr=<multiaddr> entries.
Args:
dnsaddr_hostname: The _dnsaddr.<hostname> to query
peer_id: Optional peer ID to filter results
max_depth: Maximum depth for recursive resolution
signal: Optional signal for cancellation
_debug_level: Internal, for debug output indentation
Returns:
A list of resolved multiaddrs
"""
results = []
indent = " " * _debug_level
try:
answer = await self._resolver.resolve(dnsaddr_hostname, "TXT")
logging.debug(
f"{indent}Queried TXT for {dnsaddr_hostname}, "
f"found {len(answer)} records (depth {max_depth})"
)
for rdata in answer:
# Cast to TXT record type for proper attribute access
txt_rdata = cast(dns.rdtypes.ANY.TXT.TXT, rdata)
txt_data_raw = txt_rdata.strings[0] if txt_rdata.strings else ""
if isinstance(txt_data_raw, bytes):
txt_data = txt_data_raw.decode("utf-8")
else:
txt_data = str(txt_data_raw)
logging.debug(f"{indent} TXT: {txt_data}")
if txt_data.startswith("dnsaddr="):
multiaddr_str = txt_data[8:]
multiaddr_str = self._clean_quotes(multiaddr_str).strip()
logging.debug(f"{indent} Parsed multiaddr: {multiaddr_str}")
if not multiaddr_str:
continue
try:
parsed_ma = Multiaddr(multiaddr_str)
try:
parsed_peer_id = parsed_ma.get_peer_id()
logging.debug(f"{indent} Peer ID: {parsed_peer_id}")
except Exception:
logging.debug(f"{indent} No peer ID")
if peer_id:
try:
parsed_peer_id = parsed_ma.get_peer_id()
if parsed_peer_id != peer_id:
logging.debug(f"{indent} Skipping (peer ID mismatch)")
continue
except Exception:
logging.debug(f"{indent} Skipping (no peer ID in multiaddr)")
continue
if (
multiaddr_str.startswith("/dnsaddr")
or multiaddr_str.startswith("/dns4")
or multiaddr_str.startswith("/dns6")
):
try:
logging.debug(f"{indent} Recursing into {multiaddr_str}")
recursive_options = {"max_recursive_depth": max_depth - 1}
resolved = await self.resolve(parsed_ma, recursive_options)
for r in resolved:
# Only append if not a dnsaddr/dns4/dns6 (i.e., only final IPs)
if not any(
p.name in ("dnsaddr", "dns4", "dns6") for p in r.protocols()
):
logging.debug(f"{indent} Final resolved: {r}")
results.append(r)
except RecursionLimitError:
logging.debug(f"{indent} Recursion limit hit!")
continue
else:
logging.debug(f"{indent} Final resolved: {parsed_ma}")
results.append(parsed_ma)
except Exception as e:
logging.debug(f"{indent} Error parsing multiaddr: {e}")
continue
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
logging.debug(f"{indent}No TXT records found for {dnsaddr_hostname}")
pass
except Exception as e:
logging.debug(f"{indent}Error querying TXT records for {dnsaddr_hostname}: {e}")
raise ResolutionError(f"Failed to query TXT records for {dnsaddr_hostname}: {e!s}")
return results
async def _resolve_dns(
self, hostname: str, protocol_code: int, signal: trio.CancelScope | None = None
) -> list["Multiaddr"]:
"""Resolve a DNS record.
Args:
hostname: The hostname to resolve
protocol_code: The protocol code (DNS, DNS4, or DNS6)
signal: Optional signal for cancellation
Returns:
A list of resolved multiaddrs
Raises:
ResolutionError: If resolution fails
trio.Cancelled: If the operation is cancelled
"""
try:
if signal:
# Use the provided signal for cancellation
with signal:
results = []
if protocol_code in (P_DNS, P_DNS4):
try:
answer = await self._resolver.resolve(hostname, "A")
for rdata in answer:
address = str(cast(dns.rdtypes.IN.A.A, rdata).address)
results.append(Multiaddr(f"/ip4/{address}"))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
pass
if protocol_code in (P_DNS, P_DNS6):
try:
answer = await self._resolver.resolve(hostname, "AAAA")
for rdata in answer:
address = str(cast(dns.rdtypes.IN.AAAA.AAAA, rdata).address)
results.append(Multiaddr(f"/ip6/{address}"))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
pass
return results
else:
# No signal provided, proceed without cancellation
results = []
if protocol_code in (P_DNS, P_DNS4):
try:
answer = await self._resolver.resolve(hostname, "A")
for rdata in answer:
address = str(cast(dns.rdtypes.IN.A.A, rdata).address)
results.append(Multiaddr(f"/ip4/{address}"))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
pass
if protocol_code in (P_DNS, P_DNS6):
try:
answer = await self._resolver.resolve(hostname, "AAAA")
for rdata in answer:
address = str(cast(dns.rdtypes.IN.AAAA.AAAA, rdata).address)
results.append(Multiaddr(f"/ip6/{address}"))
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
pass
return results
except Exception as e:
raise ResolutionError(f"Failed to resolve DNS {hostname}: {e!s}")
async def _resolve_dns_with_stack(
self, maddr: "Multiaddr", signal: trio.CancelScope | None = None
) -> list["Multiaddr"]:
"""Resolve a DNS record while preserving the rest of the multiaddr stack.
This method handles cases like /dns4/host/tcp/port by resolving the DNS part
and keeping the rest of the multiaddr intact.
Args:
maddr: The multiaddr to resolve
signal: Optional signal for cancellation
Returns:
A list of resolved multiaddrs with preserved stack
Raises:
ResolutionError: If resolution fails
trio.Cancelled: If the operation is cancelled
"""
protocols = list(maddr.protocols())
if not protocols:
return [maddr]
first_protocol = protocols[0]
if first_protocol.code not in (P_DNS, P_DNS4, P_DNS6):
return [maddr]
# Get the hostname
hostname = maddr.value_for_protocol(first_protocol.code)
if not hostname:
return [maddr]
# Remove quotes from hostname
hostname = self._clean_quotes(hostname)
# Get the resolved IP addresses
resolved_ips = await self._resolve_dns(hostname, first_protocol.code, signal)
if not resolved_ips:
return [maddr]
# Split the multiaddr to get the remaining stack (everything after the DNS part)
parts = maddr.split(1) # Split after the first protocol
if len(parts) < 2:
# No remaining stack, just return the resolved IPs
return resolved_ips
remaining_stack = parts[1] # Everything after the DNS part
results = []
for ip_maddr in resolved_ips:
# Combine the resolved IP with the remaining stack
if remaining_stack.protocols():
# There's a remaining stack, encapsulate it
combined = ip_maddr.encapsulate(remaining_stack)
else:
# No remaining stack, just use the IP
combined = ip_maddr
results.append(combined)
return results
__all__ = ["DNSResolver"]