import io
import logging
from collections.abc import Generator
from io import BytesIO
import varint
from . import exceptions
from .codecs import CodecBase, codec_by_name
from .protocols import Protocol, protocol_with_code, protocol_with_name
logger = logging.getLogger(__name__)
[docs]
def string_to_bytes(string: str) -> bytes:
    """Convert a string multiaddr to its binary representation.

    Every component yielded by ``string_iter`` is written as its
    varint-encoded protocol code. When the component has a codec, the
    codec-encoded value follows; for codecs whose ``SIZE`` is zero or
    negative the value is preceded by its varint-encoded byte length.

    Raises
    ------
    ~multiaddr.exceptions.StringParseError
        The string could not be split into components, or a codec rejected
        a component's value.
    ValueError
        A component that has a codec was yielded without a value.
    """
    chunks: list[bytes] = []
    for proto, codec, value in string_iter(string):
        # Debug output uses lazy %-style arguments so that nothing is
        # formatted unless DEBUG logging is actually enabled.
        logger.debug(
            "[DEBUG string_to_bytes] LOOP: proto=%s, codec=%s, value=%s",
            proto.name,
            codec,
            value,
        )
        logger.debug(
            "[DEBUG string_to_bytes] Processing: proto=%s, codec.SIZE=%s, value=%s",
            proto.name,
            getattr(codec, "SIZE", None) if codec else None,
            value,
        )
        logger.debug("[DEBUG string_to_bytes] Protocol code: %s", proto.code)
        encoded_code = varint.encode(proto.code)
        logger.debug("[DEBUG string_to_bytes] Encoded protocol code: %s", encoded_code)
        chunks.append(encoded_code)

        # Special case: protocols with codec=None are flag protocols
        # (no value, no length prefix, no buffer)
        if codec is None:
            logger.debug(
                "[DEBUG string_to_bytes] Protocol %s has no data, skipping value encoding",
                proto.name,
            )
            continue
        if value is None:
            raise ValueError("Value cannot be None")

        logger.debug("[DEBUG string_to_bytes] Raw CID value before encoding: %s", value)
        try:
            buf = codec.to_bytes(proto, value)
        except Exception as exc:
            # Any codec failure is reported as a parse error of the whole string.
            logger.debug("[DEBUG string_to_bytes] Error: %s", exc)
            raise exceptions.StringParseError(str(exc), string) from exc
        logger.debug(
            "[DEBUG string_to_bytes] Generated buf: proto=%s, buf=%s", proto.name, buf
        )

        logger.debug(
            "[DEBUG string_to_bytes] Appending: proto=%s, codec.SIZE=%s",
            proto.name,
            getattr(codec, "SIZE", None),
        )
        # Only add length prefix for variable-sized codecs (SIZE <= 0)
        if codec.SIZE <= 0:
            length_prefix = varint.encode(len(buf))
            chunks.append(length_prefix)
            logger.debug(
                "[DEBUG string_to_bytes] Appending varint length: %s", length_prefix
            )
        # Only append the buffer if it's not empty
        if buf:
            chunks.append(buf)

    logger.debug("[DEBUG string_to_bytes] Final bs: %s", chunks)
    return b"".join(chunks)
[docs]
def bytes_to_string(buf: bytes) -> str:
    """Convert a binary multiaddr to its string representation.

    The buffer is read component by component: a varint protocol code,
    then - for protocols that have a codec - the value bytes. Codecs with a
    positive ``SIZE`` occupy ``SIZE // 8`` bytes; all other codecs are
    preceded by a varint byte length. An empty buffer yields ``""``.

    Raises
    ------
    ~multiaddr.exceptions.BinaryParseError
        The given bytes are not a valid multiaddr. This includes buffers
        that end before a component's value is complete.
    """
    if not buf:
        return ""

    stream = BytesIO(buf)
    total = len(buf)
    strings: list[str] = []
    while stream.tell() < total:
        # Reset for every component so that a failure is never attributed
        # to the protocol of the previously decoded component.
        code = None
        proto = None
        try:
            code = varint.decode_stream(stream)
            logger.debug("[DEBUG bytes_to_string] Decoded protocol code: %s", code)
            proto = protocol_with_code(code)
            logger.debug("[DEBUG bytes_to_string] Protocol name: %s", proto.name)

            if proto.codec is None:
                # Protocol without a codec: the name alone is the component.
                strings.append(f"/{proto.name}")
                continue

            codec = codec_by_name(proto.codec)
            if codec.SIZE > 0:
                size = codec.SIZE // 8
            else:
                # For variable-sized codecs,
                # read the length prefix but don't pass it to the codec
                size = varint.decode_stream(stream)

            raw = stream.read(size)
            if len(raw) != size:
                # BytesIO.read() silently returns fewer bytes at end of
                # buffer; a short value means the multiaddr is truncated.
                raise ValueError(
                    f"truncated value for protocol {proto.name}: "
                    f"expected {size} bytes, got {len(raw)}"
                )

            value = codec.to_string(proto, raw)
            logger.debug("[DEBUG] bytes_to_string: proto=%s, value='%s'", proto.name, value)
            if codec.IS_PATH and value.startswith("/"):
                # Path values already carry their leading separator.
                strings.append(f"/{proto.name}{value}")
            else:
                strings.append(f"/{proto.name}/{value}")
        except Exception as exc:
            # Use the code as the protocol identifier if proto is not available
            # Ensure we always have either a string or an integer
            protocol_id = proto.name if proto is not None else (code if code is not None else 0)
            raise exceptions.BinaryParseError(str(exc), buf, protocol_id, exc) from exc
    return "".join(strings)
[docs]
def size_for_addr(codec: CodecBase, buf_io: io.BytesIO) -> int:
    """Return the byte length of the value that follows in *buf_io*.

    A negative ``codec.SIZE`` means the length is stored in the stream
    itself: it is decoded as a varint, which advances *buf_io*. Any other
    ``SIZE`` (zero included) is divided by eight and returned without
    touching the stream.
    """
    declared = codec.SIZE
    if declared < 0:
        return varint.decode_stream(buf_io)
    return declared // 8
[docs]
def string_iter(
    string: str,
) -> Generator[tuple[Protocol, CodecBase | None, str | None], None, None]:
    """Walk a string multiaddr one protocol component at a time.

    Leading and trailing slashes are stripped and the remainder is split on
    ``/``. Each protocol name is looked up; when the protocol declares a
    codec, the following segment is consumed as its value.

    Args:
        string: The string multiaddr to iterate over. An empty string
            yields nothing.

    Yields:
        A tuple of (protocol, codec, value) for each component. ``value``
        is ``None`` for protocols that declare no codec.

    Raises:
        exceptions.StringParseError: A protocol name is unknown, or a
            protocol that needs a value is the last segment.
    """
    if not string:
        return

    segments = string.strip("/").split("/")
    total = len(segments)
    cursor = 0
    while cursor < total:
        name = segments[cursor]
        cursor += 1  # cursor now points at the candidate value segment

        try:
            proto = protocol_with_name(name)
        except exceptions.ProtocolNotFoundError as exc:
            raise exceptions.StringParseError(str(exc), string) from exc

        codec = codec_by_name(proto.codec)

        if proto.codec is None:
            logger.debug(f"[DEBUG string_iter] No value found for protocol {proto.name}")
            yield proto, codec, None
            continue

        if cursor >= total:
            raise exceptions.StringParseError(f"missing value for protocol: {name}", string)

        value = segments[cursor]
        cursor += 1  # the value segment has been consumed
        logger.debug(f"[DEBUG string_iter] Using next part as value: {value}")
        yield proto, codec, value
[docs]
def bytes_iter(buf: bytes) -> Generator[tuple[int, Protocol, CodecBase, bytes], None, None]:
    """Walk a binary multiaddr one protocol component at a time.

    For each component the varint protocol code is decoded, the protocol
    and its codec are looked up, and ``size_for_addr`` determines how many
    value bytes to read.

    Yields:
        A tuple of (offset, protocol, codec, value bytes), where ``offset``
        is the position in *buf* at which the component's protocol code
        starts.

    Raises:
        exceptions.BinaryParseError: The protocol or its codec could not be
            resolved.
    """
    stream = io.BytesIO(buf)
    end = len(buf)
    while (start := stream.tell()) < end:
        code = varint.decode_stream(stream)

        proto = None
        try:
            proto = protocol_with_code(code)
            codec = codec_by_name(proto.codec)
        except (ImportError, exceptions.ProtocolNotFoundError) as exc:
            # Report the protocol name when the lookup got that far,
            # otherwise fall back to the raw numeric code.
            protocol_id = proto.name if proto else code
            raise exceptions.BinaryParseError("Unknown Protocol", buf, protocol_id) from exc

        value_size = size_for_addr(codec, stream)
        yield start, proto, codec, stream.read(value_size)