Source code for multiaddr.multiaddr

import collections.abc
from collections.abc import Iterator, Sequence
from typing import Any, TypeVar, Union, overload

import varint

from . import exceptions, protocols
from .codecs import codec_by_name
from .protocols import protocol_with_name
from .transforms import bytes_iter, bytes_to_string

__all__ = ("Multiaddr",)


T = TypeVar("T")


class MultiAddrKeys(collections.abc.KeysView[Any], collections.abc.Sequence[Any]):
    def __init__(self, mapping: "Multiaddr") -> None:
        self._mapping = mapping
        super().__init__(mapping)

    def __contains__(self, proto: object) -> bool:
        proto = self._mapping.registry.find(proto)
        return collections.abc.Sequence.__contains__(self, proto)

    def __getitem__(self, idx: int | slice) -> Any | Sequence[Any]:
        if isinstance(idx, slice):
            return list(self)[idx]
        if idx < 0:
            idx = len(self) + idx
        for idx2, proto in enumerate(self):
            if idx2 == idx:
                return proto
        raise IndexError("Protocol list index out of range")

    def __hash__(self) -> int:
        return hash(tuple(self))

    def __iter__(self) -> Iterator[Any]:
        for _, proto, _, _ in bytes_iter(self._mapping.to_bytes()):
            yield proto


class MultiAddrItems(
    collections.abc.ItemsView[Any, Any], collections.abc.Sequence[tuple[Any, Any]]
):
    def __init__(self, mapping: "Multiaddr") -> None:
        self._mapping = mapping
        super().__init__(mapping)

    def __contains__(self, item: object) -> bool:
        if not isinstance(item, tuple) or len(item) != 2:
            return False
        proto, value = item
        proto = self._mapping.registry.find(proto)
        return collections.abc.Sequence.__contains__(self, (proto, value))

    @overload
    def __getitem__(self, idx: int) -> tuple[Any, Any]: ...

    @overload
    def __getitem__(self, idx: slice) -> Sequence[tuple[Any, Any]]: ...

    def __getitem__(self, idx: int | slice) -> tuple[Any, Any] | Sequence[tuple[Any, Any]]:
        if isinstance(idx, slice):
            return list(self)[idx]
        if idx < 0:
            idx = len(self) + idx
        for idx2, item in enumerate(self):
            if idx2 == idx:
                return item
        raise IndexError("Protocol item list index out of range")

    def __iter__(self) -> Iterator[tuple[Any, Any]]:
        for _, proto, codec, part in bytes_iter(self._mapping.to_bytes()):
            if codec.SIZE != 0:
                try:
                    # If we have an address, return it
                    yield proto, codec.to_string(proto, part)
                except Exception as exc:
                    raise exceptions.BinaryParseError(
                        str(exc),
                        self._mapping.to_bytes(),
                        proto.name,
                        exc,
                    ) from exc
            else:
                # We were given something like '/utp', which doesn't have
                # an address, so return None
                yield proto, None


class MultiAddrValues(collections.abc.ValuesView[Any], collections.abc.Sequence[Any]):
    def __init__(self, mapping: "Multiaddr") -> None:
        self._mapping = mapping
        super().__init__(mapping)

    def __contains__(self, value: object) -> bool:
        return collections.abc.Sequence.__contains__(self, value)

    def __getitem__(self, idx: int | slice) -> Any | Sequence[Any]:
        if isinstance(idx, slice):
            return list(self)[idx]
        if idx < 0:
            idx = len(self) + idx
        for idx2, value in enumerate(self):
            if idx2 == idx:
                return value
        raise IndexError("Protocol value list index out of range")

    def __iter__(self) -> Iterator[Any]:
        for _, value in MultiAddrItems(self._mapping):
            yield value


class Multiaddr(collections.abc.Mapping[Any, Any]):
    """Multiaddr is a representation of multiple nested internet addresses.

    Multiaddr is a cross-protocol, cross-platform format for representing
    internet addresses. It emphasizes explicitness and self-description.

    Learn more here: https://multiformats.io/multiaddr/

    Multiaddrs have both a binary and string representation.

        >>> from multiaddr import Multiaddr
        >>> addr = Multiaddr("/ip4/1.2.3.4/tcp/80")

    Multiaddr objects are immutable, so `encapsulate` and `decapsulate`
    return new objects rather than modify internal state.
    """

    __slots__ = ("_bytes", "registry")

    def __init__(
        self, addr: Union[str, bytes, "Multiaddr"], *, registry: Any = protocols.REGISTRY
    ) -> None:
        """Instantiate a new Multiaddr.

        Args:
            addr: A string-encoded or a byte-encoded Multiaddr, or another
                Multiaddr whose bytes are reused.
            registry: Protocol registry used for protocol lookups on this
                instance.

        Raises:
            TypeError: If *addr* is not ``str``, ``bytes`` or ``Multiaddr``.
        """
        self.registry = registry
        if isinstance(addr, str):
            self._from_string(addr)
        elif isinstance(addr, bytes):
            self._from_bytes(addr)
        elif isinstance(addr, Multiaddr):
            self._bytes = addr.to_bytes()
        else:
            raise TypeError("MultiAddr must be bytes, str or another MultiAddr instance")

    @classmethod
    def join(cls, *addrs: Union[str, bytes, "Multiaddr"]) -> "Multiaddr":
        """Concatenate the values of the given MultiAddr strings or objects,
        encapsulating each successive MultiAddr value with the previous ones."""
        return cls(b"".join(cls(a).to_bytes() for a in addrs))

    def __eq__(self, other: Any) -> bool:
        """Checks if two Multiaddr objects are exactly equal."""
        if not isinstance(other, Multiaddr):
            return NotImplemented
        return self._bytes == other._bytes

    def __str__(self) -> str:
        """Return the string representation of this Multiaddr.

        May raise a :class:`~multiaddr.exceptions.BinaryParseError` if the
        stored MultiAddr binary representation is invalid."""
        return bytes_to_string(self._bytes)

    def __contains__(self, proto: object) -> bool:
        return proto in MultiAddrKeys(self)

    def __iter__(self) -> Iterator[Any]:
        return iter(MultiAddrKeys(self))

    def __len__(self) -> int:
        return sum(1 for _ in bytes_iter(self.to_bytes()))

    def __repr__(self) -> str:
        return f"<Multiaddr {self!s}>"

    def __hash__(self) -> int:
        return hash(self._bytes)

    def to_bytes(self) -> bytes:
        """Returns the byte array representation of this Multiaddr."""
        return self._bytes

    __bytes__ = to_bytes

    def protocols(self) -> MultiAddrKeys:
        """Returns a list of Protocols this Multiaddr includes."""
        return MultiAddrKeys(self)

    def split(self, maxsplit: int = -1) -> list["Multiaddr"]:
        """Returns the list of individual path components this MultiAddr is
        made up of.

        Args:
            maxsplit: Maximum number of single-component splits to perform;
                whatever remains is returned as one final Multiaddr. The
                default ``-1`` never matches a component index, so every
                component is split out.
        """
        final_split_offset = -1
        results = []
        for idx, (offset, proto, codec, part_value) in enumerate(bytes_iter(self._bytes)):
            # Split at most `maxsplit` times
            if idx == maxsplit:
                final_split_offset = offset
                break

            # Re-assemble binary MultiAddr representation; only codecs with a
            # negative SIZE get a varint length prefix here.
            part_size = varint.encode(len(part_value)) if codec.SIZE < 0 else b""
            part = b"".join((proto.vcode, part_size, part_value))

            # Add MultiAddr with the given value
            results.append(self.__class__(part))

        # Add final item with remainder of MultiAddr if there is anything left
        if final_split_offset >= 0:
            results.append(self.__class__(self._bytes[final_split_offset:]))
        return results

    keys = protocols

    def items(self) -> MultiAddrItems:
        """Return an indexable view of ``(protocol, value)`` pairs."""
        return MultiAddrItems(self)

    def values(self) -> MultiAddrValues:
        """Return an indexable view of the component values."""
        return MultiAddrValues(self)

    def encapsulate(self, other: Union[str, bytes, "Multiaddr"]) -> "Multiaddr":
        """Wrap this Multiaddr around another.

        For example:
            /ip4/1.2.3.4 encapsulate /tcp/80 = /ip4/1.2.3.4/tcp/80
        """
        return self.__class__.join(self, other)

    def decapsulate(self, addr: Union["Multiaddr", str]) -> "Multiaddr":
        """Remove a Multiaddr wrapping.

        Everything from the last occurrence of *addr* (compared on the string
        representations) to the end of this Multiaddr is dropped.

        For example:
            /ip4/1.2.3.4/tcp/80 decapsulate /tcp/80 = /ip4/1.2.3.4

        Raises:
            ValueError: If *addr* does not occur in this Multiaddr.
        """
        addr_str = str(addr)
        s = str(self)
        # rfind (unlike rindex) returns -1 when the substring is absent, so
        # the descriptive error below is actually reachable.
        i = s.rfind(addr_str)
        if i < 0:
            raise ValueError(f"Address {s} does not contain subaddress: {addr_str}")
        return Multiaddr(s[:i])

    def decapsulate_code(self, code: int) -> "Multiaddr":
        """Remove the last occurrence of the protocol with the given code and
        everything after it.

        If the protocol code is not present, return the original multiaddr.
        """
        # Remember the byte offset of the last component carrying `code`.
        cut_offset = None
        for offset, proto, _, _ in bytes_iter(self._bytes):
            if proto.code == code:
                cut_offset = offset

        if cut_offset is None:
            # Protocol code not found, return original
            return self

        # A cut at offset 0 yields empty bytes, i.e. the empty Multiaddr.
        return self.__class__(self._bytes[:cut_offset])

    def value_for_protocol(self, proto: Any) -> Any | None:
        """Return the value (if any) following the specified protocol

        Returns
        -------
        Union[object, NoneType]
            The parsed protocol value for the given protocol code or ``None``
            if the given protocol does not require any value

        Raises
        ------
        ~multiaddr.exceptions.BinaryParseError
            The stored MultiAddr binary representation is invalid
        ~multiaddr.exceptions.ProtocolLookupError
            MultiAddr does not contain any instance of this protocol
        """
        proto = self.registry.find(proto)
        for proto2, value in self.items():
            if proto2 is proto or proto2 == proto:
                return value
        raise exceptions.ProtocolLookupError(proto, str(self))

    def __getitem__(self, proto: Any) -> Any:
        """Returns the value for the given protocol.

        Raises
        ------
        ~multiaddr.exceptions.ProtocolLookupError
            If the protocol is not found in this Multiaddr.
        ~multiaddr.exceptions.BinaryParseError
            If the protocol value is invalid.
        """
        proto = self.registry.find(proto)
        for _, p, codec, part in bytes_iter(self._bytes):
            if p != proto:
                continue
            if codec.SIZE == 0:
                return None
            try:
                return codec.to_string(proto, part)
            except Exception as exc:
                raise exceptions.BinaryParseError(
                    str(exc),
                    self._bytes,
                    proto.name,
                    exc,
                ) from exc
        raise exceptions.ProtocolLookupError(proto, str(self))

    async def resolve(self) -> list["Multiaddr"]:
        """Resolve this multiaddr if it contains a resolvable protocol.

        Returns:
            A list of resolved multiaddrs
        """
        # Imported here rather than at module import time.
        from .resolvers.dns import DNSResolver

        resolver = DNSResolver()
        return await resolver.resolve(self)

    @staticmethod
    def _encode_component(proto: Any, value: str | None, addr: str) -> bytes:
        """Return the binary encoding of one ``/proto[/value]`` component.

        Args:
            proto: The protocol object of the component.
            value: The textual value, or ``None`` when none was supplied.
            addr: The full multiaddr string, used only for error reporting.

        Raises:
            StringParseError: If no codec is available or encoding fails.
        """
        codec = codec_by_name(proto.codec)
        if not codec:
            raise exceptions.StringParseError(f"unknown codec: {proto.codec}", addr)

        try:
            encoded = varint.encode(proto.code)

            # Special case: protocols with codec=None are flag protocols
            # (no value, no length prefix, no buffer)
            if proto.codec is None:
                return encoded

            buf = codec.to_bytes(proto, value or "")
            # Add length prefix for variable-sized or zero-sized codecs
            if codec.SIZE <= 0:
                encoded += varint.encode(len(buf))
            if buf:  # Only append buffer if it's not empty
                encoded += buf
            return encoded
        except Exception as e:
            raise exceptions.StringParseError(str(e), addr) from e

    def _from_string(self, addr: str) -> None:
        """Parse a string multiaddr.

        Args:
            addr: The multiaddr string to parse.

        Raises:
            StringParseError: If the string multiaddr is invalid.
        """
        if not addr:
            # Allow empty multiaddrs (like JavaScript implementation)
            self._bytes = b""
            return

        # An iterator lets a protocol pull its value (or, for unix, the whole
        # remaining path) from the same stream the loop is consuming.
        parts = iter(addr.strip("/").split("/"))
        chunks: list[bytes] = []

        for part in parts:
            if not part:
                continue

            # Special handling for unix paths
            if part == "unix":
                # Get the next part as the path value
                value = next(parts, None)
                if value is None:
                    raise exceptions.StringParseError("missing value for unix protocol", addr)
                if not value:
                    raise exceptions.StringParseError("empty unix path", addr)

                # Join any remaining (non-empty) parts as part of the path
                remaining_parts = [piece for piece in parts if piece]
                if remaining_parts:
                    value = value + "/" + "/".join(remaining_parts)

                chunks.append(self._encode_component(protocol_with_name("unix"), value, addr))
                continue

            # Handle other protocols
            # Split protocol name and value if present
            if "=" in part:
                proto_name, value = part.split("=", 1)
            else:
                proto_name, value = part, None

            try:
                proto = protocol_with_name(proto_name)
            except Exception as exc:
                raise exceptions.StringParseError(f"unknown protocol: {proto_name}", addr) from exc

            # If the protocol expects a value, get it
            if proto.codec is not None:
                if value is None:
                    try:
                        value = next(parts)
                    except StopIteration:
                        raise exceptions.StringParseError(
                            f"missing value for protocol: {proto_name}", addr
                        ) from None

                # If value looks like a protocol name, that's an error
                try:
                    protocol_with_name(value)
                except exceptions.ProtocolNotFoundError:
                    pass  # value is not a protocol name, so it's valid as a value
                else:
                    raise exceptions.StringParseError(
                        f"expected value for protocol {proto_name}, got protocol name {value}", addr
                    )

            chunks.append(self._encode_component(proto, value, addr))

        self._bytes = b"".join(chunks)

    def _from_bytes(self, addr: bytes) -> None:
        """Store a binary multiaddr.

        The bytes are stored as given; no validation happens here, so an
        invalid encoding only surfaces when the address is later iterated or
        converted to a string.

        Args:
            addr: The multiaddr bytes to store. Empty input yields the empty
                multiaddr (like the JavaScript implementation).
        """
        self._bytes = addr or b""

    def get_peer_id(self) -> str | None:
        """Get the peer ID from the multiaddr.

        For circuit addresses, returns the target peer ID, not the relay peer ID.

        Returns:
            The peer ID if found, None otherwise. This lookup is best-effort:
            any error raised while parsing or decoding is swallowed and
            reported as None.
        """
        try:
            # Track only the most recent p2p component; a p2p-circuit component
            # discards it so that the relay's peer id is never returned.
            target = None
            for _, proto, _, part in bytes_iter(self._bytes):
                if proto.name == "p2p":
                    target = (proto, part)
                elif proto.name == "p2p-circuit":
                    target = None

            if target is None:
                return None

            proto, part = target
            # Get the codec for this specific protocol
            codec = codec_by_name(proto.codec)
            # Handle both fixed-size and variable-sized codecs
            if codec is not None and codec.SIZE != 0:
                return codec.to_string(proto, part)
            return None
        except Exception:
            # Deliberate best-effort: callers get None instead of an exception.
            return None