Register custom message types

Out of the box, rosbags only supports the message types that ship with a default ROS2 distribution. If you want to (de)serialize custom messages, you need to add them to the type system manually.

From rosbag1

"""Example: Register rosbag1 types."""

from __future__ import annotations

from typing import TYPE_CHECKING

from rosbags.rosbag1 import Reader
from rosbags.typesys import Stores, get_types_from_msg, get_typestore

if TYPE_CHECKING:
    from pathlib import Path


def process_bag(src: Path) -> None:
    """Register the message types contained in a bag before processing it.

    The message definition stored with each connection of the bag is parsed
    and the resulting types are registered in an initially empty typestore.

    Args:
        src: Bag to process.

    """
    store = get_typestore(Stores.EMPTY)
    with Reader(src) as bag:
        found = {}
        for connection in bag.connections:
            parsed = get_types_from_msg(connection.msgdef, connection.msgtype)
            found.update(parsed)
        store.register(found)

        # Every message type used in the bag is registered at this point,
        # so the messages can now be iterated and handled, e.g.:
        # for conn, timestamp, data in bag.messages():
        #     ...

From definition string

"""Example: Register type from definition string."""

from rosbags.serde import serialize_cdr
from rosbags.typesys import Stores, get_types_from_msg, get_typestore

# Your custom message definition
STRIDX_MSG = """
string string
uint32 index
"""

# Start from the types of the Foxy store and add the custom type on top.
typestore = get_typestore(Stores.ROS2_FOXY)
typestore.register(get_types_from_msg(STRIDX_MSG, 'custom_msgs/msg/StrIdx'))

# Message classes are looked up by their full type name.
StrIdx = typestore.types['custom_msgs/msg/StrIdx']

message = StrIdx(string='foo', index=42)

# Serialize through the typestore the custom type was registered in; the free
# function `rosbags.serde.serialize_cdr` is never handed this store.
# Rawdata that can be passed to rosbag2.Writer.write
rawdata = typestore.serialize_cdr(message, message.__msgtype__)

From multiple files

"""Example: Register types from msg files."""

from pathlib import Path

from rosbags.typesys import Stores, get_types_from_msg, get_typestore


def guess_msgtype(path: Path) -> str:
    """Guess message type name from path.

    The name is built from the last three path components with the file
    suffix removed. If none of those components is ``msg``, a ``msg``
    component is inserted in front of the final one.

    Args:
        path: Path to a message definition file.

    Returns:
        Message type name with ``/`` as separator on every platform.

    Raises:
        IndexError: If ``path`` has fewer than three ancestors.

    """
    name = path.relative_to(path.parents[2]).with_suffix('')
    if 'msg' not in name.parts:
        name = name.parent / 'msg' / name.name
    # str() would use the platform separator (backslashes on Windows), which
    # does not form a valid message type name.
    return name.as_posix()


# Start from the types of the Foxy store; the custom types are collected
# first and registered in a single call.
typestore = get_typestore(Stores.ROS2_FOXY)
add_types = {}

for msgpath in map(
    Path,
    ('/path/to/custom_msgs/msg/Speed.msg', '/path/to/custom_msgs/msg/Accel.msg'),
):
    msgdef = msgpath.read_text(encoding='utf-8')
    msgtype = guess_msgtype(msgpath)
    add_types.update(get_types_from_msg(msgdef, msgtype))

typestore.register(add_types)

# The registered message classes are available by their full type name.
Accel = typestore.types['custom_msgs/msg/Accel']
Speed = typestore.types['custom_msgs/msg/Speed']

# Instantiate the freshly registered message classes.
Accel(42)
Speed(42)