Examples¶
Live visualization¶
import datetime
import json
import logging
import struct
import time
from math import cos, sin
import foxglove
import numpy as np
from foxglove import Channel, Schema
from foxglove.channels import RawImageChannel
from foxglove.schemas import (
Color,
CubePrimitive,
Duration,
FrameTransform,
FrameTransforms,
PackedElementField,
PackedElementFieldNumericType,
PointCloud,
Pose,
Quaternion,
RawImage,
SceneEntity,
SceneUpdate,
Timestamp,
Vector3,
)
from foxglove.websocket import (
Capability,
ChannelView,
Client,
ClientChannel,
ServerListener,
)
# JSON schema that accepts any object, including properties it does not list.
any_schema = dict(type="object", additionalProperties=True)

# JSON schema for the plotted samples: a numeric `timestamp` and a numeric `y`.
plot_schema = dict(
    type="object",
    properties=dict(
        timestamp=dict(type="number"),
        y=dict(type="number"),
    ),
)
class ExampleListener(ServerListener):
    """
    Server listener that logs client activity and tracks channel subscriptions.

    The subscription bookkeeping exists only so callers can ask, via
    `has_subscribers`, whether any client is currently subscribed to anything.
    """

    def __init__(self) -> None:
        # Map client id -> set of subscribed topics
        self.subscribers: dict[int, set[str]] = {}

    def has_subscribers(self) -> bool:
        """Return True while at least one client has at least one subscription."""
        # Entries are removed as soon as their topic set becomes empty, so a
        # non-empty dict means there is a live subscription.
        return bool(self.subscribers)

    def on_subscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client subscribes to a channel.

        We'll use this and on_unsubscribe to simply track if we have any subscribers at all.
        """
        logging.info(f"Client {client} subscribed to channel {channel.topic}")
        self.subscribers.setdefault(client.id, set()).add(channel.topic)

    def on_unsubscribe(
        self,
        client: Client,
        channel: ChannelView,
    ) -> None:
        """
        Called by the server when a client unsubscribes from a channel.

        Unsubscribing a client or topic that is not being tracked is ignored
        rather than raising, so an unmatched unsubscribe cannot break bookkeeping.
        """
        logging.info(f"Client {client} unsubscribed from channel {channel.topic}")
        topics = self.subscribers.get(client.id)
        if topics is None:
            return
        topics.discard(channel.topic)
        if not topics:
            # Drop clients with no remaining topics so has_subscribers() stays accurate.
            del self.subscribers[client.id]

    def on_client_advertise(
        self,
        client: Client,
        channel: ClientChannel,
    ) -> None:
        """
        Called when a client advertises a new channel.
        """
        logging.info(f"Client {client.id} advertised channel: {channel.id}")
        logging.info(f" Topic: {channel.topic}")
        logging.info(f" Encoding: {channel.encoding}")
        logging.info(f" Schema name: {channel.schema_name}")
        logging.info(f" Schema encoding: {channel.schema_encoding}")
        logging.info(f" Schema: {channel.schema!r}")

    def on_message_data(
        self,
        client: Client,
        client_channel_id: int,
        data: bytes,
    ) -> None:
        """
        This handler demonstrates receiving messages from the client.

        You can send messages from Foxglove app in the publish panel:
        https://docs.foxglove.dev/docs/visualization/panels/publish
        """
        logging.info(f"Message from client {client.id} on channel {client_channel_id}")
        logging.info(f"Data: {data!r}")

    def on_client_unadvertise(
        self,
        client: Client,
        client_channel_id: int,
    ) -> None:
        """
        Called when a client unadvertises a previously advertised channel.
        """
        logging.info(f"Client {client.id} unadvertised channel: {client_channel_id}")
def main() -> None:
    """
    Start a live visualization server and publish example data until Ctrl-C.

    Each round logs a sine sample (as raw JSON bytes and as a dict), two frame
    transforms, a cube scene entity, a point cloud and a blank 100x100 image.
    After each round the loop sleeps briefly and then waits until the listener
    reports at least one subscriber before publishing again.
    """
    foxglove.set_log_level(logging.DEBUG)

    subscription_tracker = ExampleListener()
    ws_server = foxglove.start_server(
        server_listener=subscription_tracker,
        capabilities=[Capability.ClientPublish],
        supported_encodings=["json"],
    )

    # Log messages with a custom schema and any encoding.
    sine_schema = Schema(
        name="sine",
        encoding="jsonschema",
        data=json.dumps(plot_schema).encode("utf-8"),
    )
    sin_chan = Channel(topic="/sine", message_encoding="json", schema=sine_schema)

    # If you want to use JSON encoding, you can also specify the schema and log messages as dicts.
    # Dicts can also be logged without specifying a schema.
    json_chan = Channel(topic="/json", schema=plot_schema)

    img_chan = RawImageChannel(topic="/image")

    try:
        tick = 0
        while True:
            tick += 1
            now = time.time()
            y = sin(now)

            sample = {"timestamp": now, "y": y}
            sin_chan.log(json.dumps(sample).encode("utf-8"))
            json_chan.log(sample)

            # The "box" frame spins around Z; the "points" frame is a fixed offset.
            box_transform = FrameTransform(
                parent_frame_id="world",
                child_frame_id="box",
                rotation=euler_to_quaternion(roll=1, pitch=0, yaw=tick * 0.1),
            )
            points_transform = FrameTransform(
                parent_frame_id="world",
                child_frame_id="points",
                translation=Vector3(x=-10, y=-10, z=0),
            )
            foxglove.log(
                "/tf",
                FrameTransforms(transforms=[box_transform, points_transform]),
            )

            stamp = Timestamp.from_datetime(datetime.datetime.now())
            lifetime = Duration.from_secs(1.2345)
            cube = CubePrimitive(
                pose=Pose(
                    position=Vector3(x=0, y=y, z=3),
                    orientation=euler_to_quaternion(roll=0, pitch=0, yaw=tick * -0.1),
                ),
                size=Vector3(x=1, y=1, z=1),
                color=Color(r=1.0, g=0, b=0, a=1),
            )
            box_entity = SceneEntity(
                frame_id="box",
                id="box_1",
                timestamp=stamp,
                lifetime=lifetime,
                cubes=[cube],
            )
            foxglove.log("/boxes", SceneUpdate(entities=[box_entity]))

            foxglove.log("/pointcloud", make_point_cloud())

            # Or use typed channels directly to get better type checking
            blank_pixels = np.zeros((100, 100, 3), dtype=np.uint8).tobytes()
            img_chan.log(
                RawImage(
                    data=blank_pixels,
                    step=300,
                    width=100,
                    height=100,
                    encoding="rgb8",
                ),
            )

            time.sleep(0.05)

            # Idle (polling once per second) while nobody is subscribed.
            while not subscription_tracker.has_subscribers():
                time.sleep(1)

    except KeyboardInterrupt:
        ws_server.stop()
def make_point_cloud() -> PointCloud:
    """
    Build a 20x20 grid of colored points whose x coordinates shift with time.

    Every point is packed as three little-endian float32 coordinates followed
    by four color bytes, 16 bytes per point in total.

    https://foxglove.dev/blog/visualizing-point-clouds-with-custom-colors
    """
    record = struct.Struct("<fffBBBB")
    float32 = PackedElementFieldNumericType.Float32
    uint32 = PackedElementFieldNumericType.Uint32

    t = time.time()
    # The blue component depends only on time, so it is shared by all points.
    blue = int(255 * (0.5 + 0.5 * sin(t)))

    packed = bytearray()
    for column in range(20):
        for row in range(20):
            px = column + cos(t + row / 5)
            red = int(255 * (0.5 + 0.5 * px / 20))
            green = int(255 * row / 20)
            alpha = int(255 * (0.5 + 0.5 * ((px / 20) * (row / 20))))
            # Color bytes are written in b, g, r, a order.
            packed += record.pack(px, row, 0, blue, green, red, alpha)

    origin_pose = Pose(
        position=Vector3(x=0, y=0, z=0),
        orientation=Quaternion(x=0, y=0, z=0, w=1),
    )
    layout = [
        PackedElementField(name="x", offset=0, type=float32),
        PackedElementField(name="y", offset=4, type=float32),
        PackedElementField(name="z", offset=8, type=float32),
        PackedElementField(name="rgba", offset=12, type=uint32),
    ]
    return PointCloud(
        frame_id="points",
        pose=origin_pose,
        point_stride=record.size,  # 16: 4 fields * 4 bytes
        fields=layout,
        data=bytes(packed),
    )
def euler_to_quaternion(roll: float, pitch: float, yaw: float) -> Quaternion:
    """Convert Euler angles to a rotation quaternion

    See e.g. https://danceswithcode.net/engineeringnotes/quaternions/quaternions.html

    :param roll: rotation around X axis (radians)
    :param pitch: rotation around Y axis (radians)
    :param yaw: rotation around Z axis (radians)
    :returns: the equivalent rotation as a Quaternion
    """
    # The formula works on half angles.
    half_roll = roll * 0.5
    half_pitch = pitch * 0.5
    half_yaw = yaw * 0.5

    sr, cr = sin(half_roll), cos(half_roll)
    sp, cp = sin(half_pitch), cos(half_pitch)
    sy, cy = sin(half_yaw), cos(half_yaw)

    return Quaternion(
        x=sr * cp * cy - cr * sp * sy,
        y=cr * sp * cy + sr * cp * sy,
        z=cr * cp * sy - sr * sp * cy,
        w=cr * cp * cy + sr * sp * sy,
    )


if __name__ == "__main__":
    main()
Write to an MCAP file¶
import argparse
import inspect
import foxglove
from foxglove.channels import LogChannel
from foxglove.schemas import Log, LogLevel
# Command-line options are parsed when the module is loaded; `--path` selects
# the MCAP file to write and defaults to "output.mcap".
parser = argparse.ArgumentParser()
parser.add_argument("--path", type=str, default="output.mcap")
args = parser.parse_args()
# Typed channel for Log messages on the "/log1" topic; main() logs to it.
log_chan = LogChannel(topic="/log1")
def main() -> None:
    """
    Record ten example Log messages on each of "/log2" and "/log1" to an MCAP file.

    The file is opened at ``args.path``. Each message carries the source file
    name and line number of this function's frame when they are available.
    """
    # Create a new mcap file at the given path for recording
    with foxglove.open_mcap(args.path):
        for i in range(10):
            frame = inspect.currentframe()
            if frame:
                info = inspect.getframeinfo(frame)
                source_file, source_line = info.filename, info.lineno
            else:
                # currentframe() may return None; log without a location then.
                source_file = source_line = None

            log_fields = dict(
                level=LogLevel.Info,
                name="SDK example",
                file=source_file,
                line=source_line,
                message=f"message {i}",
            )

            foxglove.log("/log2", Log(**log_fields))

            # Or use a typed channel directly to get better type checking
            log_chan.log(Log(**log_fields))


if __name__ == "__main__":
    main()
Stream an MCAP file to Foxglove¶
import argparse
import logging
import time
from typing import Optional
import foxglove
import mcap
import mcap.reader
import mcap.records
from foxglove import Channel, Schema
from foxglove.websocket import Capability, WebSocketServer
# Channels created so far, keyed by topic name; get_channel() fills and reads this.
channels: dict[str, Channel] = {}
def main():
    """
    Serve an MCAP file over a WebSocket server, replaying it in an endless loop.

    Command-line options: ``--file`` (required), ``--port`` (default 8765) and
    ``--host`` (default 127.0.0.1). After each full pass over the file the
    server session is cleared and playback restarts. Ctrl-C stops the server.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--file", type=str, required=True)
    arg_parser.add_argument("--port", type=int, default=8765)
    arg_parser.add_argument("--host", type=str, default="127.0.0.1")
    cli = arg_parser.parse_args()

    mcap_path = cli.file
    ws_server = foxglove.start_server(
        name=mcap_path,
        port=cli.port,
        host=cli.host,
        capabilities=[Capability.Time],
    )

    try:
        while True:
            stream_until_done(mcap_path, ws_server)
            logging.info("Looping")
            ws_server.clear_session()
    except KeyboardInterrupt:
        ws_server.stop()
def stream_until_done(file_name: str, server: WebSocketServer):
    """
    Replay every message of an MCAP file once, paced by the messages' log times.

    A TimeTracker is created from the first message. Before each message is
    logged, the tracker sleeps until that message's log time is due, and the
    server broadcasts the current playback time whenever the tracker reports
    that a notification is due.

    :param file_name: path of the MCAP file to read.
    :param server: server used to broadcast the playback time.
    """
    tracker: Optional[TimeTracker] = None
    with open(file_name, "rb") as f:
        reader = mcap.reader.make_reader(f)
        for mcap_schema, mcap_chan, mcap_msg in reader.iter_messages():
            if tracker is None:
                # Anchor playback time to the first message in the file.
                tracker = new_time_tracker(mcap_msg)

            tracker.sleep_until(mcap_msg.log_time)

            notify_time = tracker.notify()
            if notify_time is not None:
                # Use the value notify() returned rather than reaching into
                # the tracker's private state.
                server.broadcast_time(notify_time)

            channel = get_channel(mcap_schema, mcap_chan)
            channel.log(mcap_msg.data)
def new_time_tracker(message: mcap.records.Message):
    """Create a TimeTracker whose starting offset is the log time of ``message``."""
    first_log_time = message.log_time
    return TimeTracker(offset_ns=first_log_time)
def get_channel(
    mcap_schema: Optional[mcap.records.Schema], mcap_channel: mcap.records.Channel
) -> Channel:
    """
    Look up, or create on first use, the Channel for an MCAP channel record.

    Channels are cached in the module-level ``channels`` dict by topic name,
    so a topic seen again reuses the channel created for it earlier.
    """
    topic = mcap_channel.topic
    try:
        return channels[topic]
    except KeyError:
        pass

    # First time this topic is seen: mirror the MCAP schema, if there is one.
    if mcap_schema is None:
        schema = None
    else:
        schema = Schema(
            name=mcap_schema.name,
            encoding=mcap_schema.encoding,
            data=mcap_schema.data,
        )

    created = Channel(
        topic=topic,
        message_encoding=mcap_channel.message_encoding,
        schema=schema,
    )
    channels[topic] = created
    return created
class TimeTracker:
    """
    Helper for keeping track of the relationship between a file timestamp and the wallclock.

    :param offset_ns: The offset from epoch to treat as "now".
    """

    def __init__(self, *, offset_ns: int):
        # Timestamp that corresponds to the moment this tracker was created.
        self._offset_ns = offset_ns
        # Most recent timestamp passed to sleep_until().
        self._now_ns = offset_ns
        # Minimum gap between notifications: one sixtieth of a second, in ns.
        self._notify_interval_ns = 1e9 / 60
        self._notify_last = 0
        # Wallclock reference taken at construction.
        self._start = time.time_ns()

    def sleep_until(self, offset_ns: int):
        """
        Sleep until ``offset_ns`` is due, then record it as the current time.

        The timestamp is due once as much wallclock time has passed since
        construction as lies between the initial offset and ``offset_ns``.
        Timestamps that are already due return without sleeping.
        """
        wall_elapsed_ns = time.time_ns() - self._start
        remaining_ns = offset_ns - self._offset_ns - wall_elapsed_ns
        if remaining_ns > 0:
            time.sleep(remaining_ns / 1e9)
        self._now_ns = offset_ns

    def notify(self) -> Optional[int]:
        """
        Return the current timestamp if a notification is due, else None.

        A notification is due when the current timestamp is more than the
        notify interval past the timestamp of the last notification.
        """
        since_last_ns = self._now_ns - self._notify_last
        if since_last_ns <= self._notify_interval_ns:
            return None
        self._notify_last = self._now_ns
        return self._now_ns


if __name__ == "__main__":
    main()
Parameter server¶
"""
This implements a parameter server for live visualization.
View and edit parameters from a Parameters panel in Foxglove:
https://docs.foxglove.dev/docs/visualization/panels/parameters
"""
import logging
import time
from typing import List, Optional
import foxglove
from foxglove.websocket import Capability, Client, Parameter
class ParameterStore(foxglove.websocket.ServerListener):
    """
    In-memory parameter store that answers the server's parameter callbacks.

    Parameters are kept in a dict keyed by name. The set of parameter names
    that clients have subscribed to is tracked for information only.
    """

    def __init__(self, parameters: list[Parameter]) -> None:
        # In this example our parameters are unique by name
        self.parameters = {param.name: param for param in parameters}
        # We can keep track of any parameters that are subscribed to
        self.subscribed_param_names: set[str] = set()

    # Foxglove server callback
    def on_get_parameters(
        self,
        client: Client,
        param_names: list[str],
        request_id: Optional[str] = None,
    ) -> list[Parameter]:
        """
        Return the requested parameters.

        An empty ``param_names`` returns every stored parameter; names that
        are not in the store are skipped.
        """
        logging.debug(f"on_get_parameters: {param_names}, {client.id}, {request_id}")
        if not param_names:
            return list(self.parameters.values())
        return [
            self.parameters[name] for name in param_names if name in self.parameters
        ]

    def on_set_parameters(
        self,
        client: Client,
        parameters: list[Parameter],
        request_id: Optional[str] = None,
    ) -> list[Parameter]:
        """
        Apply parameter changes requested by a client and return them.

        A parameter whose value is None is removed from the store; any other
        parameter is added or replaced.
        """
        logging.debug(f"on_set_parameters: {parameters}, {client.id}, {request_id}")
        for changed_param in parameters:
            if changed_param.value is None:
                # pop() instead of del: unsetting a parameter that is not in
                # the store is a no-op rather than a KeyError.
                self.parameters.pop(changed_param.name, None)
            else:
                # Add or update
                self.parameters[changed_param.name] = changed_param
        return parameters

    def on_parameters_subscribe(self, param_names: list[str]) -> None:
        """Record newly subscribed parameter names."""
        # The SDK takes care of notifying the client of the current parameters;
        # this is informational only.
        logging.debug(f"New subscriptions for: {param_names}")
        self.subscribed_param_names.update(param_names)

    def on_parameters_unsubscribe(self, param_names: list[str]) -> None:
        """Forget parameter names that are no longer subscribed."""
        # The SDK takes care of notifying the client of the current parameters;
        # this is informational only.
        logging.debug(f"Remove subscriptions for: {param_names}")
        self.subscribed_param_names.difference_update(param_names)
def main() -> None:
    """
    Run a parameter server seeded with example values until Ctrl-C.

    The stored parameter values are re-published to clients every ten seconds.
    """
    foxglove.set_log_level(logging.DEBUG)

    nested_value = {
        "a": 1,
        "b": True,
        "c": "hello",
        "arr": [1, True],
    }
    initial_values: list[Parameter] = [
        Parameter("p0"),
        Parameter("p1", value=nested_value),
        Parameter("p2", value=True),
        Parameter("p3", value=0.124),
        Parameter("p4", value=[1, 1, 2, 3, 5]),
        Parameter("p5", value=b"data"),
        Parameter("p6", value="hello"),
    ]
    store = ParameterStore(initial_values)

    # 'Parameters' is required for get/set callbacks
    required_capabilities = [Capability.Parameters]
    websocket_server = foxglove.start_server(
        server_listener=store,
        capabilities=required_capabilities,
    )

    try:
        while True:
            current_values = list(store.parameters.values())
            websocket_server.publish_parameter_values(current_values)
            time.sleep(10)
    except KeyboardInterrupt:
        websocket_server.stop()


if __name__ == "__main__":
    main()
Advertising Services¶
"""
This example demonstrates how to use the Foxglove WebSocket API to implement services which can be
called from the Service Call panel in the Foxglove app.
https://docs.foxglove.dev/docs/visualization/panels/service-call
"""
import argparse
import logging
import foxglove
from foxglove.websocket import (
Capability,
Service,
ServiceRequest,
ServiceSchema,
)
# A handler can also be a bare function.
def logging_handler(
    request: ServiceRequest,
) -> bytes:
    """
    Service handler (see the `ServiceHandler` type) that only logs the request.

    The returned bytes are sent back to the client; here that is always an
    empty JSON object.
    """
    log_request(request)
    empty_json_object = b"{}"
    return empty_json_object
# A handler can also be defined as any callable.
class EchoService:
    """Callable service handler that logs a request and echoes its payload back."""

    def __call__(self, request: ServiceRequest) -> bytes:
        log_request(request)
        response = request.payload
        return response
def log_request(r: ServiceRequest):
    """Log a service request at DEBUG level: service, client, call id, encoding, payload."""
    service_tag = f"[{r.service_name}] "
    caller = f"client {r.client_id} call {r.call_id}: "
    content = f"({r.encoding}): {r.payload!r}"
    logging.debug(service_tag + caller + content)
def main():
    """
    This example demonstrates how to use the Foxglove WebSocket API to implement services which can
    be called from the Foxglove app.

    Two services are advertised: "logging", handled by `logging_handler`, and
    "echo", handled by an `EchoService` instance. Command-line options are
    ``--port`` (default 8765) and ``--host`` (default 127.0.0.1). The function
    blocks until interrupted with Ctrl-C, then stops the server.
    """
    # Imported locally: this script's top-level imports do not include `time`.
    import time

    foxglove.set_log_level("DEBUG")

    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--host", type=str, default="127.0.0.1")
    args = parser.parse_args()

    logging_service = Service(
        name="logging",
        schema=ServiceSchema(
            name="logging-schema",
        ),
        handler=logging_handler,
    )

    echo_service = Service(
        name="echo",
        schema=ServiceSchema(
            name="echo-schema",
        ),
        handler=EchoService(),
    )

    server = foxglove.start_server(
        name="ws-services-example",
        port=args.port,
        host=args.host,
        capabilities=[Capability.Services],
        # If publishing from Foxglove, add at least one supported encoding (json, ros1, or cdr).
        # These examples use json.
        supported_encodings=["json"],
        # The services to publish
        services=[echo_service, logging_service],
    )

    try:
        while True:
            # Sleep instead of spinning so the idle main thread does not burn
            # a CPU core while waiting for Ctrl-C.
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop()


if __name__ == "__main__":
    main()
Asset server¶
import logging
import time
from typing import Optional
import foxglove
def asset_handler(uri: str) -> Optional[bytes]:
    """
    This will respond to "package://" asset requests from Foxglove by reading files from disk.

    This example doesn't do any path validation or upward traversal prevention.

    :param uri: the requested asset URI.
    :returns: the file contents (possibly empty) when the URI starts with
        "package://" and the remainder names a readable file; otherwise None.
        Missing files, directories, permission errors and malformed paths all
        yield None instead of raising.
    """
    scheme = "package://"
    asset: Optional[bytes] = None
    if uri.startswith(scheme):
        filepath = uri[len(scheme):]
        try:
            with open(filepath, "rb") as file:
                asset = file.read()
        except (OSError, ValueError) as err:
            # OSError covers missing files, directories and permission
            # problems; ValueError covers paths containing NUL bytes. The URI
            # comes from a client, so none of these should raise.
            logging.debug(f"asset_handler could not read {filepath!r}: {err}")

    # Compare against None so that an empty file still counts as found.
    status = "OK" if asset is not None else "Not Found"
    logging.debug(f"asset_handler {status}: {uri}")
    return asset
def main() -> None:
    """Run a server that answers asset requests with `asset_handler` until Ctrl-C."""
    foxglove.set_log_level(logging.DEBUG)

    ws_server = foxglove.start_server(asset_handler=asset_handler)

    try:
        while True:
            # Send transforms for the model as needed, on a `FrameTransformsChannel`
            time.sleep(1)
    except KeyboardInterrupt:
        ws_server.stop()


if __name__ == "__main__":
    main()