Examples

Live visualization

import datetime
import json
import logging
import struct
import time
from math import cos, sin

import foxglove
import numpy as np
from foxglove import Channel, Schema
from foxglove.channels import RawImageChannel
from foxglove.schemas import (
    Color,
    CubePrimitive,
    Duration,
    FrameTransform,
    FrameTransforms,
    PackedElementField,
    PackedElementFieldNumericType,
    PointCloud,
    Pose,
    Quaternion,
    RawImage,
    SceneEntity,
    SceneUpdate,
    Timestamp,
    Vector3,
)
from foxglove.websocket import (
    Capability,
    ChannelView,
    Client,
    ClientChannel,
    ServerListener,
)

# Permissive JSON schema: any object, with arbitrary additional properties.
any_schema = {
    "type": "object",
    "additionalProperties": True,
}

# JSON schema for the plot messages logged by this example: an object with a
# numeric "timestamp" and a numeric "y" value.
plot_schema = {
    "type": "object",
    "properties": {
        "timestamp": {"type": "number"},
        "y": {"type": "number"},
    },
}


class ExampleListener(ServerListener):
    """
    Server listener that tracks which topics each client is subscribed to and
    logs messages and channel (un)advertisements received from clients.
    """

    def __init__(self) -> None:
        # Map client id -> set of subscribed topics
        self.subscribers: dict[int, set[str]] = {}

    def has_subscribers(self) -> bool:
        """Return True if at least one client currently has a subscription."""
        return bool(self.subscribers)

    def on_subscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client subscribes to a channel.
        We'll use this and on_unsubscribe to simply track if we have any subscribers at all.
        """
        logging.info(f"Client {client} subscribed to channel {channel.topic}")
        self.subscribers.setdefault(client.id, set()).add(channel.topic)

    def on_unsubscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client unsubscribes from a channel.

        Unsubscribing from a topic (or as a client) that is not being tracked
        is ignored rather than raising.
        """
        logging.info(f"Client {client} unsubscribed from channel {channel.topic}")
        topics = self.subscribers.get(client.id)
        if topics is None:
            return
        topics.discard(channel.topic)
        # Drop clients with no remaining subscriptions so has_subscribers()
        # reflects only active subscribers.
        if not topics:
            del self.subscribers[client.id]

    def on_client_advertise(
        self,
        client: Client,
        channel: ClientChannel,
    ) -> None:
        """
        Called when a client advertises a new channel.
        """
        logging.info(f"Client {client.id} advertised channel: {channel.id}")
        logging.info(f"  Topic: {channel.topic}")
        logging.info(f"  Encoding: {channel.encoding}")
        logging.info(f"  Schema name: {channel.schema_name}")
        logging.info(f"  Schema encoding: {channel.schema_encoding}")
        logging.info(f"  Schema: {channel.schema!r}")

    def on_message_data(
        self,
        client: Client,
        client_channel_id: int,
        data: bytes,
    ) -> None:
        """
        This handler demonstrates receiving messages from the client.
        You can send messages from Foxglove app in the publish panel:
        https://docs.foxglove.dev/docs/visualization/panels/publish
        """
        logging.info(f"Message from client {client.id} on channel {client_channel_id}")
        logging.info(f"Data: {data!r}")

    def on_client_unadvertise(
        self,
        client: Client,
        client_channel_id: int,
    ) -> None:
        """
        Called when a client unadvertises a channel.
        """
        logging.info(f"Client {client.id} unadvertised channel: {client_channel_id}")


def main() -> None:
    """
    Start a Foxglove server and log example data in a loop until interrupted.

    Each iteration logs a sine sample (as raw JSON bytes and as a dict), two
    frame transforms, a cube scene update, a point cloud and a black image.
    While no client is subscribed, the loop idles in one-second sleeps.
    """
    foxglove.set_log_level(logging.DEBUG)

    listener = ExampleListener()
    server = foxglove.start_server(
        server_listener=listener,
        capabilities=[Capability.ClientPublish],
        supported_encodings=["json"],
    )

    # Log messages with a custom schema and any encoding.
    sine_schema = Schema(
        name="sine",
        encoding="jsonschema",
        data=json.dumps(plot_schema).encode("utf-8"),
    )
    sin_chan = Channel(topic="/sine", message_encoding="json", schema=sine_schema)

    # If you want to use JSON encoding, you can also specify the schema and log messages as dicts.
    # Dicts can also be logged without specifying a schema.
    json_chan = Channel(topic="/json", schema=plot_schema)

    img_chan = RawImageChannel(topic="/image")

    try:
        tick = 0
        while True:
            tick += 1
            now = time.time()
            y = sin(now)

            sample = {"timestamp": now, "y": y}
            sin_chan.log(json.dumps(sample).encode("utf-8"))
            json_chan.log(sample)

            # The "box" frame spins about Z; the "points" frame is a fixed offset.
            box_tf = FrameTransform(
                parent_frame_id="world",
                child_frame_id="box",
                rotation=euler_to_quaternion(roll=1, pitch=0, yaw=tick * 0.1),
            )
            points_tf = FrameTransform(
                parent_frame_id="world",
                child_frame_id="points",
                translation=Vector3(x=-10, y=-10, z=0),
            )
            foxglove.log("/tf", FrameTransforms(transforms=[box_tf, points_tf]))

            stamp = Timestamp.from_datetime(datetime.datetime.now())
            lifetime = Duration.from_secs(1.2345)
            cube = CubePrimitive(
                pose=Pose(
                    position=Vector3(x=0, y=y, z=3),
                    orientation=euler_to_quaternion(roll=0, pitch=0, yaw=tick * -0.1),
                ),
                size=Vector3(x=1, y=1, z=1),
                color=Color(r=1.0, g=0, b=0, a=1),
            )
            box_entity = SceneEntity(
                frame_id="box",
                id="box_1",
                timestamp=stamp,
                lifetime=lifetime,
                cubes=[cube],
            )
            foxglove.log("/boxes", SceneUpdate(entities=[box_entity]))

            foxglove.log("/pointcloud", make_point_cloud())

            # Or use typed channels directly to get better type checking
            black_pixels = np.zeros((100, 100, 3), dtype=np.uint8)
            image = RawImage(
                data=black_pixels.tobytes(),
                step=300,
                width=100,
                height=100,
                encoding="rgb8",
            )
            img_chan.log(image)

            time.sleep(0.05)

            # Idle until at least one client has subscribed to something.
            while not listener.has_subscribers():
                time.sleep(1)

    except KeyboardInterrupt:
        server.stop()


def make_point_cloud() -> PointCloud:
    """
    Build a 20x20 grid of colored points whose x positions vary with the
    current time.

    Each point is packed as three little-endian float32 coordinates followed
    by four color bytes.

    https://foxglove.dev/blog/visualizing-point-clouds-with-custom-colors
    """
    f32 = PackedElementFieldNumericType.Float32
    u32 = PackedElementFieldNumericType.Uint32
    record = struct.Struct("<fffBBBB")
    grid = 20

    t = time.time()
    # This channel depends only on the time, so it is the same for every point.
    b = int(255 * (0.5 + 0.5 * sin(t)))

    buffer = bytearray(record.size * grid * grid)
    offset = 0
    for col in range(grid):
        for row in range(grid):
            x = col + cos(t + row / 5)
            r = int(255 * (0.5 + 0.5 * x / 20))
            g = int(255 * row / 20)
            a = int(255 * (0.5 + 0.5 * ((x / 20) * (row / 20))))
            # Color bytes are written in b, g, r, a order.
            record.pack_into(buffer, offset, x, row, 0, b, g, r, a)
            offset += record.size

    fields = [
        PackedElementField(name="x", offset=0, type=f32),
        PackedElementField(name="y", offset=4, type=f32),
        PackedElementField(name="z", offset=8, type=f32),
        PackedElementField(name="rgba", offset=12, type=u32),
    ]
    origin = Pose(
        position=Vector3(x=0, y=0, z=0),
        orientation=Quaternion(x=0, y=0, z=0, w=1),
    )
    return PointCloud(
        frame_id="points",
        pose=origin,
        point_stride=16,  # 4 fields * 4 bytes
        fields=fields,
        data=bytes(buffer),
    )


def euler_to_quaternion(roll: float, pitch: float, yaw: float) -> Quaternion:
    """Build a rotation quaternion from roll/pitch/yaw Euler angles.

    See e.g. https://danceswithcode.net/engineeringnotes/quaternions/quaternions.html

    :param roll: rotation around X axis (radians)
    :param pitch: rotation around Y axis (radians)
    :param yaw: rotation around Z axis (radians)
    :returns: a Quaternion with x, y, z and w components
    """
    # The quaternion formulas operate on half angles.
    half_roll = roll * 0.5
    half_pitch = pitch * 0.5
    half_yaw = yaw * 0.5

    sr, cr = sin(half_roll), cos(half_roll)
    sp, cp = sin(half_pitch), cos(half_pitch)
    sy, cy = sin(half_yaw), cos(half_yaw)

    return Quaternion(
        x=sr * cp * cy - cr * sp * sy,
        y=cr * sp * cy + sr * cp * sy,
        z=cr * cp * sy - sr * sp * cy,
        w=cr * cp * cy + sr * sp * sy,
    )


# Run the live visualization example only when executed as a script.
if __name__ == "__main__":
    main()

Write to an MCAP file

import argparse
import inspect

import foxglove
from foxglove.channels import LogChannel
from foxglove.schemas import Log, LogLevel

# Command-line arguments are parsed at module level; --path selects the MCAP
# file to write (defaults to "output.mcap").
parser = argparse.ArgumentParser()
parser.add_argument("--path", type=str, default="output.mcap")
args = parser.parse_args()


# Typed channel for Log messages on the "/log1" topic.
log_chan = LogChannel(topic="/log1")


def main() -> None:
    """
    Write ten Log messages to each of "/log2" and "/log1" while an MCAP file
    is open at the path given on the command line.
    """
    # Create a new mcap file at the given path for recording
    with foxglove.open_mcap(args.path):
        for i in range(10):
            frame = inspect.currentframe()
            frameinfo = inspect.getframeinfo(frame) if frame else None

            # Source location attached to each message, when a frame is available.
            if frameinfo:
                source_file, source_line = frameinfo.filename, frameinfo.lineno
            else:
                source_file = source_line = None
            text = f"message {i}"

            untyped_msg = Log(
                level=LogLevel.Info,
                name="SDK example",
                file=source_file,
                line=source_line,
                message=text,
            )
            foxglove.log("/log2", untyped_msg)

            # Or use a typed channel directly to get better type checking
            typed_msg = Log(
                level=LogLevel.Info,
                name="SDK example",
                file=source_file,
                line=source_line,
                message=text,
            )
            log_chan.log(typed_msg)


# Run the MCAP writing example only when executed as a script.
if __name__ == "__main__":
    main()

Stream an MCAP file to Foxglove

import argparse
import logging
import time
from typing import Optional

import foxglove
import mcap
import mcap.reader
import mcap.records
from foxglove import Channel, Schema
from foxglove.websocket import Capability, WebSocketServer

# Cache of channels created so far, keyed by topic name (filled by get_channel).
channels: dict[str, Channel] = {}


def main():
    """
    Parse command-line options, start a server and replay the given MCAP file
    in a loop until interrupted.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--file", type=str, required=True)
    arg_parser.add_argument("--port", type=int, default=8765)
    arg_parser.add_argument("--host", type=str, default="127.0.0.1")
    options = arg_parser.parse_args()

    mcap_path = options.file

    server = foxglove.start_server(
        name=mcap_path,
        port=options.port,
        host=options.host,
        capabilities=[Capability.Time],
    )

    try:
        while True:
            stream_until_done(mcap_path, server)

            # The file has been fully played back; clear the session and replay.
            logging.info("Looping")
            server.clear_session()

    except KeyboardInterrupt:
        server.stop()


def stream_until_done(file_name: str, server: WebSocketServer):
    """
    Play back every message in an MCAP file once, paced by the messages' log times.

    :param file_name: path of the MCAP file to read
    :param server: server on which playback time is broadcast
    """
    tracker: Optional[TimeTracker] = None
    with open(file_name, "rb") as f:
        reader = mcap.reader.make_reader(f)
        for mcap_schema, mcap_chan, mcap_msg in reader.iter_messages():
            # The first message's log time anchors playback to the wallclock.
            if tracker is None:
                tracker = new_time_tracker(mcap_msg)

            tracker.sleep_until(mcap_msg.log_time)

            # notify() returns the current playback time when a broadcast is
            # due, and None otherwise; use that value rather than reaching
            # into the tracker's private state.
            notify_time = tracker.notify()
            if notify_time is not None:
                server.broadcast_time(notify_time)

            channel = get_channel(mcap_schema, mcap_chan)
            channel.log(mcap_msg.data)


def new_time_tracker(message: mcap.records.Message):
    """Create a TimeTracker whose starting offset is the message's log time."""
    start_offset_ns = message.log_time
    return TimeTracker(offset_ns=start_offset_ns)


def get_channel(
    mcap_schema: Optional[mcap.records.Schema], mcap_channel: mcap.records.Channel
) -> Channel:
    """
    Look up, or create on first use, the Channel for an MCAP channel record.

    Channels are cached by topic name, so a topic seen again reuses the
    Channel created the first time.
    """
    topic = mcap_channel.topic

    cached = channels.get(topic)
    if cached is not None:
        return cached

    # A channel record may have no schema; carry that through as None.
    schema = None
    if mcap_schema is not None:
        schema = Schema(
            name=mcap_schema.name,
            encoding=mcap_schema.encoding,
            data=mcap_schema.data,
        )

    created = Channel(
        topic=topic,
        message_encoding=mcap_channel.message_encoding,
        schema=schema,
    )
    channels[topic] = created
    return created


class TimeTracker:
    """
    Helper that keeps track of the relationship between file timestamps and the wallclock.

    :param offset_ns: The offset from epoch to treat as "now".
    """

    def __init__(self, *, offset_ns: int):
        # File time that corresponds to the wallclock moment of construction.
        self._offset_ns = offset_ns
        # Most recent file time reached by sleep_until().
        self._now_ns = offset_ns
        # Minimum file-time gap between notifications (one sixtieth of a second).
        self._notify_interval_ns = 1e9 / 60
        self._notify_last = 0
        self._start = time.time_ns()

    def sleep_until(self, offset_ns: int):
        """Sleep until the wallclock has caught up with the given file time."""
        wall_elapsed_ns = time.time_ns() - self._start
        remaining_ns = offset_ns - self._offset_ns - wall_elapsed_ns
        # Only wait when playback is ahead of the wallclock.
        if remaining_ns > 0:
            time.sleep(remaining_ns / 1e9)
        self._now_ns = offset_ns

    def notify(self) -> Optional[int]:
        """
        Return the current file time if more than the notify interval has
        passed since the last notification, otherwise None.
        """
        since_last_ns = self._now_ns - self._notify_last
        if since_last_ns <= self._notify_interval_ns:
            return None
        self._notify_last = self._now_ns
        return self._now_ns


# Run the MCAP streaming example only when executed as a script.
if __name__ == "__main__":
    main()

Parameter server

"""
This implements a parameter server for live visualization.

View and edit parameters from a Parameters panel in Foxglove:
https://docs.foxglove.dev/docs/visualization/panels/parameters
"""

import logging
import time
from typing import List, Optional

import foxglove
from foxglove.websocket import Capability, Client, Parameter


class ParameterStore(foxglove.websocket.ServerListener):
    """
    In-memory parameter store that answers get/set requests from clients and
    records which parameter names are currently subscribed to.
    """

    def __init__(self, parameters: list[Parameter]) -> None:
        # In this example our parameters are unique by name
        self.parameters = {param.name: param for param in parameters}
        # We can keep track of any parameters that are subscribed to
        self.subscribed_param_names: set[str] = set()

    # Foxglove server callback
    def on_get_parameters(
        self,
        client: Client,
        param_names: list[str],
        request_id: Optional[str] = None,
    ) -> list[Parameter]:
        """
        Return the requested parameters.

        An empty ``param_names`` returns every stored parameter; names that
        are not in the store are skipped.
        """
        logging.debug(f"on_get_parameters: {param_names}, {client.id}, {request_id}")
        if not param_names:
            return list(self.parameters.values())
        return [
            self.parameters[name] for name in param_names if name in self.parameters
        ]

    def on_set_parameters(
        self,
        client: Client,
        parameters: list[Parameter],
        request_id: Optional[str] = None,
    ) -> list[Parameter]:
        """
        Apply parameter changes and return the list that was passed in.

        A parameter whose value is None is removed from the store; any other
        parameter is added or replaced.
        """
        logging.debug(f"on_set_parameters: {parameters}, {client.id}, {request_id}")
        for changed_param in parameters:
            if changed_param.value is None:
                # Unsetting a parameter that is not in the store is a no-op
                # rather than a KeyError.
                self.parameters.pop(changed_param.name, None)
            else:
                # Add or update
                self.parameters[changed_param.name] = changed_param
        return parameters

    def on_parameters_subscribe(self, param_names: list[str]) -> None:
        # The SDK takes care of notifying the client of the current parameters;
        # this is informational only.
        logging.debug(f"New subscriptions for: {param_names}")
        self.subscribed_param_names.update(param_names)

    def on_parameters_unsubscribe(self, param_names: list[str]) -> None:
        # This is informational only: forget the names that are no longer
        # subscribed to.
        logging.debug(f"Remove subscriptions for: {param_names}")
        self.subscribed_param_names.difference_update(param_names)


def main() -> None:
    """
    Start a server backed by a ParameterStore and publish the stored
    parameter values every ten seconds until interrupted.
    """
    foxglove.set_log_level(logging.DEBUG)

    # Example parameters: one with no value, plus dict, bool, float, list,
    # bytes and string values.
    nested_value = {
        "a": 1,
        "b": True,
        "c": "hello",
        "arr": [1, True],
    }
    initial_values: list[Parameter] = [
        Parameter("p0"),
        Parameter("p1", value=nested_value),
        Parameter("p2", value=True),
        Parameter("p3", value=0.124),
        Parameter("p4", value=[1, 1, 2, 3, 5]),
        Parameter("p5", value=b"data"),
        Parameter("p6", value="hello"),
    ]

    store = ParameterStore(initial_values)

    # 'Parameters' is required for get/set callbacks
    server = foxglove.start_server(
        server_listener=store,
        capabilities=[Capability.Parameters],
    )

    try:
        while True:
            current_values = list(store.parameters.values())
            server.publish_parameter_values(current_values)
            time.sleep(10)
    except KeyboardInterrupt:
        server.stop()


# Run the parameter server example only when executed as a script.
if __name__ == "__main__":
    main()

Advertising Services

"""
This example demonstrates how to use the Foxglove WebSocket API to implement services which can be
called from the Service Call panel in the Foxglove app.

https://docs.foxglove.dev/docs/visualization/panels/service-call
"""

import argparse
import logging

import foxglove
from foxglove.websocket import (
    Capability,
    Service,
    ServiceRequest,
    ServiceSchema,
)


# A handler can be a bare function.
def logging_handler(request: ServiceRequest) -> bytes:
    """
    Service handler (see the `ServiceHandler` type) that logs each request.

    The bytes returned by a handler are sent back to the client; this one
    always replies with an empty JSON object.
    """
    log_request(request)
    empty_json_object = b"{}"
    return empty_json_object


# A handler can also be defined as any callable.
class EchoService:
    """Callable service handler that logs the request and echoes its payload."""

    def __call__(self, request: ServiceRequest) -> bytes:
        log_request(request)
        response = request.payload
        return response


def log_request(r: ServiceRequest):
    """Log a service request's name, client id, call id, encoding and payload at debug level."""
    description = (
        f"[{r.service_name}] "
        f"client {r.client_id} call {r.call_id}: "
        f"({r.encoding}): {r.payload!r}"
    )
    logging.debug(description)


def main():
    """
    This example demonstrates how to use the Foxglove WebSocket API to implement services which can
    be called from the Foxglove app.

    Starts a server advertising the "echo" and "logging" services and then
    idles until interrupted.
    """
    # Imported locally because this script does not otherwise use `time`.
    import time

    foxglove.set_log_level("DEBUG")

    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--host", type=str, default="127.0.0.1")
    args = parser.parse_args()

    logging_service = Service(
        name="logging",
        schema=ServiceSchema(
            name="logging-schema",
        ),
        handler=logging_handler,
    )

    echo_service = Service(
        name="echo",
        schema=ServiceSchema(
            name="echo-schema",
        ),
        handler=EchoService(),
    )

    server = foxglove.start_server(
        name="ws-services-example",
        port=args.port,
        host=args.host,
        capabilities=[Capability.Services],
        # If publishing from Foxglove, add at least one supported encoding (json, ros1, or cdr).
        # These examples use json.
        supported_encodings=["json"],
        # The services to publish
        services=[echo_service, logging_service],
    )

    try:
        # Keep the process alive without spinning a CPU core; Ctrl-C still
        # interrupts the sleep.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop()


# Run the services example only when executed as a script.
if __name__ == "__main__":
    main()

Asset server

import logging
import time
from typing import Optional

import foxglove


def asset_handler(uri: str) -> Optional[bytes]:
    """
    This will respond to "package://" asset requests from Foxglove by reading files from disk.
    This example doesn't do any path validation or upward traversal prevention.

    :param uri: the requested asset URI
    :returns: the file contents, or None if the URI does not use the
        "package://" scheme or the file cannot be read
    """
    prefix = "package://"
    asset: Optional[bytes] = None
    if uri.startswith(prefix):
        # NOTE: the remainder of the URI is used as a filesystem path as-is;
        # validate it before using this handler with untrusted clients.
        filepath = uri[len(prefix):]
        try:
            with open(filepath, "rb") as file:
                asset = file.read()
        except OSError as err:
            # Missing files, directories and permission problems are all
            # reported to the client as "no such asset" instead of raising.
            logging.debug(f"asset_handler could not read {filepath!r}: {err}")

    # Compare against None so that an empty file still counts as found.
    status = "OK" if asset is not None else "Not Found"
    logging.debug(f"asset_handler {status}: {uri}")
    return asset


def main() -> None:
    """Start a server that serves assets through asset_handler and idle until interrupted."""
    foxglove.set_log_level(logging.DEBUG)

    server = foxglove.start_server(asset_handler=asset_handler)

    try:
        keep_running = True
        while keep_running:
            # Send transforms for the model as needed, on a `FrameTransformsChannel`
            time.sleep(1)

    except KeyboardInterrupt:
        server.stop()


# Run the asset server example only when executed as a script.
if __name__ == "__main__":
    main()