rosbags.highlevel

Highlevel interfaces for rosbags.

class rosbags.highlevel.AnyReader(paths, *, default_typestore=None)

Bases: object

Unified rosbag1 and rosbag2 reader.

Parameters:
  • paths (Sequence[Path])

  • default_typestore (Typestore | None)

close()

Close rosbag.

Return type:

None

deserialize(rawdata, typ)

Deserialize message with appropriate helper.

Return type:

object

Parameters:
  • rawdata (bytes)

  • typ (str)

property duration: int

Duration in nanoseconds between earliest and latest messages.

property end_time: int

Timestamp in nanoseconds after the latest message.

property message_count: int

Total message count.

messages(connections=(), start=None, stop=None)

Read messages from bags.

Parameters:
  • connections (Iterable[Connection]) – Iterable with connections to filter for. An empty iterable disables filtering on connections.

  • start (int | None) – Yield only messages at or after this timestamp (ns).

  • stop (int | None) – Yield only messages before this timestamp (ns).

Yields:

Tuples of connection, timestamp (ns), and rawdata.

Return type:

Generator[tuple[Connection, int, bytes], None, None]

open()

Open rosbags.

Return type:

None

property start_time: int

Timestamp in nanoseconds of the earliest message.

property topics: dict[str, TopicInfo]

Topics stored in the rosbags.

exception rosbags.highlevel.AnyReaderError

Bases: Exception

Reader error.