"""
Multiaddr Protocol Codes and Registry
This module defines all supported multiaddr protocol codes, their names, and their encodings.
Key features:
- Protocol code constants (e.g., P_IP4, P_TCP, P_DNSADDR)
- Protocol class for protocol metadata
- ProtocolRegistry for fast lookup and aliasing
- Reference to the multicodec table for protocol codes
Common protocol codes:
P_IP4 = 0x04 # IPv4
P_IP6 = 0x29 # IPv6
P_TCP = 0x06 # TCP
P_UDP = 0x0111 # UDP
P_DNS = 0x35 # DNS (any)
P_DNS4 = 0x36 # DNS (IPv4)
P_DNS6 = 0x37 # DNS (IPv6)
P_DNSADDR = 0x38 # DNSADDR (libp2p)
P_P2P = 0x01A5 # Peer ID
P_TLS = 0x01C0 # TLS
P_HTTP = 0x01E0 # HTTP
P_HTTPS = 0x01BB # HTTPS
...
For a full list, see the PROTOCOLS list and the multicodec table:
https://github.com/multiformats/multicodec/blob/master/table.csv
Example usage:
from multiaddr.protocols import P_TCP, protocol_with_code
print(P_TCP) # 6
proto = protocol_with_code(P_TCP)
print(proto.name) # 'tcp'
"""
from typing import Any
import varint
from . import exceptions
from .codecs import codec_by_name
# Names exported by a star-import of this module. The P_* code constants and the
# protocol_with_* helper functions are not listed here, so they have to be
# imported explicitly by name.
__all__ = ("PROTOCOLS", "REGISTRY", "Protocol")
# source of protocols https://github.com/multiformats/multicodec/blob/master/table.csv#L382
# Integer protocol codes, one constant per protocol. The PROTOCOLS list further
# down pairs each code with its textual name and the name of its value codec.
P_IP4 = 0x04
P_IP6 = 0x29
P_IP6ZONE = 0x2A
P_TCP = 0x06
P_UDP = 0x0111
P_DCCP = 0x21
P_SCTP = 0x84
P_UDT = 0x012D
P_UTP = 0x012E
P_P2P = 0x01A5
P_HTTP = 0x01E0
P_HTTPS = 0x01BB
P_TLS = 0x01C0
P_QUIC = 0x01CC
P_QUIC1 = 0x01CD
P_WS = 0x01DD
P_WSS = 0x01DE
P_ONION = 0x01BC
P_ONION3 = 0x01BD
P_P2P_CIRCUIT = 0x0122
P_DNS = 0x35
P_DNS4 = 0x36
P_DNS6 = 0x37
P_DNSADDR = 0x38
P_P2P_WEBSOCKET_STAR = 0x01DF
P_P2P_WEBRTC_STAR = 0x0113
P_P2P_WEBRTC_DIRECT = 0x0114
P_UNIX = 0x0190
# NOTE(review): the next three codes are defined but have no matching entry in
# the PROTOCOLS list, so the default registry cannot resolve them -- confirm
# whether that omission is intentional.
P_HTTP_PATH = 0x01E1
P_SNI = 0x01C1
P_NOISE = 0x01C6
P_WEBTRANSPORT = 0x01D1
# Protocol description class (a stray "[docs]" documentation-viewer link marker stood here).
class Protocol:
__slots__ = [
"code", # int
"codec", # string
"name", # string
]
def __init__(self, code: int, name: str, codec: str | None) -> None:
if not isinstance(code, int):
raise TypeError("code must be an integer")
if not isinstance(name, str):
raise TypeError("name must be a string")
if not isinstance(codec, str) and codec is not None:
raise TypeError("codec must be a string or None")
self.code = code
self.name = name
self.codec = codec
@property
def size(self) -> int:
return codec_by_name(self.codec).SIZE
@property
def path(self) -> bool:
return codec_by_name(self.codec).IS_PATH
@property
def vcode(self) -> bytes:
return varint.encode(self.code)
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Protocol):
return NotImplemented
return all(
(
self.code == other.code,
self.name == other.name,
self.codec == other.codec,
self.path == other.path,
)
)
def __hash__(self) -> int:
return hash((self.code, self.name))
def __repr__(self) -> str:
return "Protocol(code={code!r}, name={name!r}, codec={codec!r})".format(
code=self.code,
name=self.name,
codec=self.codec,
)
# List of multiaddr protocols supported by this module by default
# Each entry is Protocol(code, name, codec-name). A codec of None marks a
# protocol for which protocols_with_string() does not try to parse a value.
PROTOCOLS = [
Protocol(P_IP4, "ip4", "ip4"),
Protocol(P_TCP, "tcp", "uint16be"),
Protocol(P_UDP, "udp", "uint16be"),
Protocol(P_DCCP, "dccp", "uint16be"),
Protocol(P_IP6, "ip6", "ip6"),
Protocol(P_IP6ZONE, "ip6zone", "utf8"),
Protocol(P_DNS, "dns", "domain"),
Protocol(P_DNS4, "dns4", "domain"),
Protocol(P_DNS6, "dns6", "domain"),
Protocol(P_DNSADDR, "dnsaddr", "domain"),
Protocol(P_SCTP, "sctp", "uint16be"),
Protocol(P_UDT, "udt", None),
Protocol(P_UTP, "utp", None),
Protocol(P_P2P, "p2p", "cid"),
Protocol(P_ONION, "onion", "onion"),
Protocol(P_ONION3, "onion3", "onion3"),
Protocol(P_QUIC, "quic", None),
Protocol(P_QUIC1, "quic-v1", None),
Protocol(P_HTTP, "http", None),
Protocol(P_HTTPS, "https", None),
Protocol(P_TLS, "tls", None),
Protocol(P_WS, "ws", None),
Protocol(P_WSS, "wss", None),
Protocol(P_P2P_WEBSOCKET_STAR, "p2p-websocket-star", None),
Protocol(P_P2P_WEBRTC_STAR, "p2p-webrtc-star", None),
Protocol(P_P2P_WEBRTC_DIRECT, "p2p-webrtc-direct", None),
Protocol(P_P2P_CIRCUIT, "p2p-circuit", None),
Protocol(P_WEBTRANSPORT, "webtransport", None),
# "unix" is special-cased by protocols_with_string(): everything after it in
# the address string is taken as a single path value.
Protocol(P_UNIX, "unix", "fspath"),
]
class ProtocolRegistry:
    """A collection of individual Multiaddr protocols indexed for fast lookup

    Two dictionaries are maintained: one keyed by protocol name and one keyed
    by protocol code. An alias is simply an additional key in one of those
    dictionaries that refers to an already registered protocol object.

    Once :meth:`lock` has been called, every mutating method raises
    ``ProtocolRegistryLocked``; use ``copy(unlock=True)`` to obtain a
    modifiable copy.
    """

    __slots__ = ("_codes_to_protocols", "_locked", "_names_to_protocols")

    def __init__(self, protocols: list[Protocol] | tuple[Protocol, ...] = ()) -> None:
        """Build the name and code indexes from *protocols*.

        If several entries share a name or a code, the last one wins in the
        corresponding index.
        """
        self._locked = False
        # The input is walked twice (once per index), so materialise it first.
        # Doing this unconditionally keeps both indexes complete even when a
        # one-shot iterable is passed instead of a list or tuple.
        protocols_tuple = tuple(protocols)
        self._codes_to_protocols: dict[int, Protocol] = {
            proto.code: proto for proto in protocols_tuple
        }
        self._names_to_protocols: dict[str, Protocol] = {
            proto.name: proto for proto in protocols_tuple
        }

    def add(self, proto: Protocol) -> Protocol:
        """Add the given protocol description to this registry

        Returns the protocol that was added.

        Raises
        ------
        ~multiaddr.exceptions.ProtocolRegistryLocked
            Protocol registry is locked and does not accept any new entries.
            You can use `.copy(unlock=True)` to copy an existing locked registry
            and unlock it.
        ~multiaddr.exceptions.ProtocolExistsError
            A protocol with the given name or code already exists.
        """
        if self._locked:
            raise exceptions.ProtocolRegistryLocked()

        # Check both keys before touching either index so a failed add leaves
        # the registry unchanged.
        if proto.name in self._names_to_protocols:
            raise exceptions.ProtocolExistsError(proto, "name")
        if proto.code in self._codes_to_protocols:
            raise exceptions.ProtocolExistsError(proto, "code")

        self._names_to_protocols[proto.name] = proto
        self._codes_to_protocols[proto.code] = proto
        return proto

    def add_alias_name(self, proto: Protocol | str, alias_name: str) -> None:
        """Add an alternate name for an existing protocol description to the registry

        Raises
        ------
        ~multiaddr.exceptions.ProtocolRegistryLocked
            Protocol registry is locked and does not accept any new entries.
            You can use `.copy(unlock=True)` to copy an existing locked registry
            and unlock it.
        ~multiaddr.exceptions.ProtocolExistsError
            A protocol with the given name already exists.
        ~multiaddr.exceptions.ProtocolNotFoundError
            No protocol matching *proto* could be found.
        """
        if self._locked:
            raise exceptions.ProtocolRegistryLocked()

        proto = self.find(proto)
        # find() hands back a Protocol instance untouched, so this is the only
        # check that the instance really belongs to this registry.
        # NOTE(review): an assert is skipped under ``python -O``.
        assert self._names_to_protocols.get(proto.name) is proto, (
            "Protocol to alias must have already been added to the registry"
        )

        if alias_name in self._names_to_protocols:
            raise exceptions.ProtocolExistsError(self._names_to_protocols[alias_name], "name")

        self._names_to_protocols[alias_name] = proto

    def add_alias_code(self, proto: Protocol | int, alias_code: int) -> None:
        """Add an alternate code for an existing protocol description to the registry

        Raises
        ------
        ~multiaddr.exceptions.ProtocolRegistryLocked
            Protocol registry is locked and does not accept any new entries.
            You can use `.copy(unlock=True)` to copy an existing locked registry
            and unlock it.
        ~multiaddr.exceptions.ProtocolExistsError
            A protocol with the given code already exists.
        ~multiaddr.exceptions.ProtocolNotFoundError
            No protocol matching *proto* could be found.
        """
        if self._locked:
            raise exceptions.ProtocolRegistryLocked()

        proto = self.find(proto)
        # find() hands back a Protocol instance untouched, so this is the only
        # check that the instance really belongs to this registry.
        # NOTE(review): an assert is skipped under ``python -O``.
        assert self._codes_to_protocols.get(proto.code) is proto, (
            "Protocol to alias must have already been added to the registry"
        )

        if alias_code in self._codes_to_protocols:
            # The collision is on the code, so report it as such (matching add()).
            raise exceptions.ProtocolExistsError(self._codes_to_protocols[alias_code], "code")

        self._codes_to_protocols[alias_code] = proto

    def lock(self) -> None:
        """Lock this registry instance to deny any further changes"""
        self._locked = True

    @property
    def locked(self) -> bool:
        """Whether this registry currently rejects modifications."""
        return self._locked

    def copy(self, *, unlock: bool = False) -> "ProtocolRegistry":
        """Create a copy of this protocol registry

        The two index dictionaries are shallow-copied; the protocol objects
        themselves are shared between the original and the copy.

        Arguments
        ---------
        unlock
            Create the copied registry unlocked even if the current one is locked?
        """
        registry = ProtocolRegistry()
        registry._locked = self._locked and not unlock
        registry._codes_to_protocols = self._codes_to_protocols.copy()
        registry._names_to_protocols = self._names_to_protocols.copy()
        return registry

    # Lets ``copy.copy(registry)`` use the same logic (the lock state is kept).
    __copy__ = copy

    def find_by_name(self, name: str) -> Protocol:
        """Find a protocol by its name

        Raises
        ------
        ~multiaddr.exceptions.ProtocolNotFoundError
            No protocol matching *name* could be found.
        """
        try:
            return self._names_to_protocols[name]
        except KeyError:
            raise exceptions.ProtocolNotFoundError(name)

    def find_by_code(self, code: int) -> Protocol:
        """Find a protocol by its code

        Raises
        ------
        ~multiaddr.exceptions.ProtocolNotFoundError
            No protocol matching *code* could be found.
        """
        try:
            return self._codes_to_protocols[code]
        except KeyError:
            raise exceptions.ProtocolNotFoundError(code)

    def find(self, proto: Protocol | str | int) -> Protocol:
        """Find a protocol by its name, code, or Protocol instance

        A ``Protocol`` instance is returned as-is without consulting the
        registry; strings are looked up by name and integers by code.

        Raises
        ------
        ~multiaddr.exceptions.ProtocolNotFoundError
            No protocol matching *proto* could be found.
        TypeError
            *proto* is neither a ``Protocol``, a ``str`` nor an ``int``.
        """
        if isinstance(proto, Protocol):
            return proto
        elif isinstance(proto, str):
            return self.find_by_name(proto)
        elif isinstance(proto, int):
            return self.find_by_code(proto)
        else:
            raise TypeError("Protocol must be a string, integer, or Protocol instance")
# Create a registry with all the default protocols
# It is locked straight away, so add()/add_alias_name()/add_alias_code() on
# REGISTRY raise ProtocolRegistryLocked; call REGISTRY.copy(unlock=True) to get
# a registry that can be extended.
REGISTRY = ProtocolRegistry(PROTOCOLS)
REGISTRY.lock()
def protocol_with_name(name: str) -> Protocol:
    """Look up *name* in the module-level default ``REGISTRY``

    Raises
    ------
    ~multiaddr.exceptions.ProtocolNotFoundError
        The default registry has no protocol called *name*.
    """
    lookup = REGISTRY.find_by_name
    return lookup(name)
def protocol_with_code(code: int) -> Protocol:
    """Look up *code* in the module-level default ``REGISTRY``

    Raises
    ------
    ~multiaddr.exceptions.ProtocolNotFoundError
        The default registry has no protocol with the code *code*.
    """
    lookup = REGISTRY.find_by_code
    return lookup(code)
def protocol_with_any(proto: Protocol | str | int) -> Protocol:
    """Resolve *proto* through the module-level default ``REGISTRY``

    A string is looked up by name and an integer by code; a ``Protocol``
    instance is handed back unchanged.

    Raises
    ------
    ~multiaddr.exceptions.ProtocolNotFoundError
        The default registry has no protocol matching *proto*.
    """
    resolve = REGISTRY.find
    return resolve(proto)
def _consume_protocol_value(string: str, proto: Protocol, segments: list[str], pos: int) -> int:
    """Validate the value that follows *proto* and return the next unread index.

    *segments* holds the slash-separated parts of the address, *pos* is the
    index of the first part after the protocol name, and *string* is the
    normalised address used for error reporting. The returned index is where
    the caller should continue looking for the next protocol name.
    """
    codec = codec_by_name(proto.codec)
    total = len(segments)

    if proto.name == "unix":
        # A unix path swallows everything that is left, slashes included.
        if pos >= total:
            raise exceptions.StringParseError(
                f"Protocol {proto.name} requires a path value",
                string,
                proto.name,
                ValueError("Missing required path value"),
            )
        path_value = "/".join(segments[pos:])
        try:
            codec.to_bytes(proto, path_value)
        except Exception as exc:
            raise exceptions.StringParseError(
                f"Invalid path value for protocol {proto.name}",
                string,
                proto.name,
                exc,
            ) from exc
        return total

    # Empty parts come from repeated slashes; step over them.
    while pos < total and not segments[pos]:
        pos += 1

    if pos >= total:
        # Nothing is left to act as a value. Only ip6zone insists on one.
        if proto.name in ("ip6zone",):
            raise exceptions.StringParseError(
                f"Protocol {proto.name} requires a value",
                string,
                proto.name,
                ValueError("Missing required value"),
            )
        return pos

    candidate = segments[pos]
    try:
        # First try to validate the candidate as a value for this protocol.
        codec.to_bytes(proto, candidate)
    except Exception as exc:
        # Not a valid value. It is tolerated only when it is itself the name
        # of a known protocol, in which case this protocol gets no value.
        if not candidate.isalnum():
            raise exceptions.StringParseError(
                f"Invalid value for protocol {proto.name}",
                string,
                proto.name,
                exc,
            ) from exc
        try:
            protocol_with_name(candidate)
            if proto.name in ("ip6zone",):
                # NOTE(review): this re-runs to_bytes on the remaining parts,
                # starting with the candidate that has just failed above, and
                # any exception it raises is not converted to StringParseError.
                # Kept as-is; confirm the intended behaviour.
                if not any(codec.to_bytes(proto, val) for val in segments[pos:] if val):
                    raise exceptions.StringParseError(
                        f"Protocol {proto.name} requires a value",
                        string,
                        proto.name,
                        ValueError("Missing required value"),
                    )
        except exceptions.ProtocolNotFoundError:
            raise exceptions.StringParseError(
                f"Invalid value for protocol {proto.name}",
                string,
                proto.name,
                exc,
            ) from exc
        # Leave the candidate in place so the caller parses it as a protocol.
        return pos

    # The candidate was accepted as this protocol's value.
    return pos + 1


def protocols_with_string(string: str) -> list[Protocol]:
    """Find all protocols that are part of the given string

    The string is split on ``/``. Each non-empty part is looked up as a
    protocol name; for protocols that have a codec, the following part is
    validated as that protocol's value and skipped. A missing leading slash
    is tolerated, and trailing or repeated slashes are ignored. An empty
    string yields an empty list.

    Raises
    ------
    ~multiaddr.exceptions.StringParseError
        The given string is not a valid multiaddr string.
    ~multiaddr.exceptions.ProtocolNotFoundError
        If a protocol in the string is not found.
    """
    if not string:
        return []

    # Normalise to exactly one leading slash and no trailing slashes. This
    # normalised form is also the one reported in StringParseError.
    if not string.startswith("/"):
        string = "/" + string
    string = string.rstrip("/")

    # The part before the leading slash is always empty; drop it.
    segments = string.split("/")[1:]
    total = len(segments)

    protocols: list[Protocol] = []
    pos = 0  # index cursor instead of repeated O(n) list.pop(0)
    while pos < total:
        element = segments[pos]
        pos += 1
        if not element:  # empty part produced by repeated slashes
            continue

        # An unknown name raises ProtocolNotFoundError, which propagates.
        proto = protocol_with_name(element)
        protocols.append(proto)

        if proto.codec is not None:
            pos = _consume_protocol_value(string, proto, segments, pos)

    return protocols