Use with native stack

Messages read with rosbags are simple dataclasses that mimic the native ROS2 interface. If you want to pass those messages to the native ROS2 stack, you need to convert them into native objects first.

Message instance conversion

"""Example: Message instance conversion."""

from __future__ import annotations

import importlib
from typing import TypeVar

import numpy as np

T = TypeVar('T')

NATIVE_CLASSES: dict[str, type] = {}


def to_native(msg: object) -> object:
    """Convert rosbags message to native message.

    Args:
        msg: Rosbags message.

    Returns:
        Native message.

    """
    msgtype: str = msg.__msgtype__  # type: ignore[attr-defined]
    if msgtype not in NATIVE_CLASSES:
        pkg, name = msgtype.rsplit('/', 1)
        NATIVE_CLASSES[msgtype] = getattr(importlib.import_module(pkg.replace('/', '.')), name)

    fields = {}
    for name, field in msg.__dataclass_fields__.items():  # type: ignore[attr-defined]
        if 'ClassVar' in field.type:
            continue
        value = getattr(msg, name)
        if '__msg__' in field.type:
            value = to_native(value)
        elif isinstance(value, list):
            value = [to_native(x) for x in value]
        elif isinstance(value, np.ndarray):
            value = value.tolist()
        fields[name] = value

    return NATIVE_CLASSES[msgtype](**fields)


if __name__ == '__main__':
    from rosbags.typesys.stores.ros2_foxy import (
        builtin_interfaces__msg__Time,
        sensor_msgs__msg__Image,
        std_msgs__msg__Header,
    )

    image = sensor_msgs__msg__Image(
        std_msgs__msg__Header(builtin_interfaces__msg__Time(42, 666), '/frame'),
        4,
        4,
        'rgb8',
        False,  # noqa: FBT003
        4 * 3,
        np.zeros(4 * 4 * 3, dtype=np.uint8),
    )

    native_image = to_native(image)
    # native_image can now be passed to the ROS stack