Program Listing for File server.cpp

Return to documentation for file (foxglove/src/server.cpp)

#include <foxglove-c/foxglove-c.h>
#include <foxglove/channel.hpp>
#include <foxglove/context.hpp>
#include <foxglove/error.hpp>
#include <foxglove/server.hpp>

#include <type_traits>

namespace foxglove {

FoxgloveResult<WebSocketServer> WebSocketServer::create(
  WebSocketServerOptions&& options  // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
) {
  foxglove_internal_register_cpp_wrapper();

  bool has_any_callbacks =
    options.callbacks.onSubscribe || options.callbacks.onUnsubscribe ||
    options.callbacks.onClientAdvertise || options.callbacks.onMessageData ||
    options.callbacks.onClientUnadvertise || options.callbacks.onGetParameters ||
    options.callbacks.onSetParameters || options.callbacks.onParametersSubscribe ||
    options.callbacks.onParametersUnsubscribe || options.callbacks.onConnectionGraphSubscribe ||
    options.callbacks.onConnectionGraphUnsubscribe || options.callbacks.onPlaybackControlRequest;

  std::unique_ptr<WebSocketServerCallbacks> callbacks;
  std::unique_ptr<FetchAssetHandler> fetch_asset;
  std::unique_ptr<SinkChannelFilterFn> sink_channel_filter;

  foxglove_server_callbacks c_callbacks = {};

  if (has_any_callbacks) {
    callbacks = std::make_unique<WebSocketServerCallbacks>(std::move(options.callbacks));
    c_callbacks.context = callbacks.get();
    if (callbacks->onSubscribe) {
      c_callbacks.on_subscribe = [](
                                   const void* context,
                                   uint64_t channel_id,
                                   const foxglove_client_metadata c_client_metadata
                                 ) {
        try {
          ClientMetadata client_metadata{
            c_client_metadata.id,
            c_client_metadata.sink_id == 0 ? std::nullopt
                                           : std::make_optional<uint64_t>(c_client_metadata.sink_id)
          };
          (static_cast<const WebSocketServerCallbacks*>(context))
            ->onSubscribe(channel_id, client_metadata);
        } catch (const std::exception& exc) {
          warn() << "onSubscribe callback failed: " << exc.what();
        }
      };
    }
    if (callbacks->onUnsubscribe) {
      c_callbacks.on_unsubscribe =
        [](const void* context, uint64_t channel_id, foxglove_client_metadata c_client_metadata) {
          try {
            ClientMetadata client_metadata{
              c_client_metadata.id,
              c_client_metadata.sink_id == 0
                ? std::nullopt
                : std::make_optional<uint64_t>(c_client_metadata.sink_id)
            };
            (static_cast<const WebSocketServerCallbacks*>(context))
              ->onUnsubscribe(channel_id, client_metadata);
          } catch (const std::exception& exc) {
            warn() << "onUnsubscribe callback failed: " << exc.what();
          }
        };
    }
    if (callbacks->onClientAdvertise) {
      c_callbacks.on_client_advertise =
        [](const void* context, uint32_t client_id, const foxglove_client_channel* channel) {
          ClientChannel cpp_channel = {
            channel->id,
            channel->topic,
            channel->encoding,
            channel->schema_name,
            channel->schema_encoding == nullptr ? std::string_view{} : channel->schema_encoding,
            reinterpret_cast<const std::byte*>(channel->schema),
            channel->schema_len
          };
          try {
            (static_cast<const WebSocketServerCallbacks*>(context))
              ->onClientAdvertise(client_id, cpp_channel);
          } catch (const std::exception& exc) {
            warn() << "onClientAdvertise callback failed: " << exc.what();
          }
        };
    }
    if (callbacks->onMessageData) {
      c_callbacks.on_message_data = [](
                                      const void* context,
                                      // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
                                      uint32_t client_id,
                                      uint32_t client_channel_id,
                                      const uint8_t* payload,
                                      size_t payload_len
                                    ) {
        try {
          (static_cast<const WebSocketServerCallbacks*>(context))
            ->onMessageData(
              client_id, client_channel_id, reinterpret_cast<const std::byte*>(payload), payload_len
            );
        } catch (const std::exception& exc) {
          warn() << "onMessageData callback failed: " << exc.what();
        }
      };
    }
    if (callbacks->onClientUnadvertise) {
      c_callbacks.on_client_unadvertise =
        // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
        [](uint32_t client_id, uint32_t client_channel_id, const void* context) {
          try {
            (static_cast<const WebSocketServerCallbacks*>(context))
              ->onClientUnadvertise(client_id, client_channel_id);
          } catch (const std::exception& exc) {
            warn() << "onClientUnadvertise callback failed: " << exc.what();
          }
        };
    }
    if (callbacks->onGetParameters) {
      c_callbacks.on_get_parameters = [](
                                        const void* context,
                                        uint32_t client_id,
                                        // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
                                        const struct foxglove_string* c_request_id,
                                        const struct foxglove_string* c_param_names,
                                        size_t param_names_len
                                      ) -> foxglove_parameter_array* {
        std::optional<std::string_view> request_id;
        if (c_request_id != nullptr) {
          request_id.emplace(c_request_id->data, c_request_id->len);
        }
        std::vector<std::string_view> param_names;
        if (c_param_names != nullptr) {
          param_names.reserve(param_names_len);
          for (size_t i = 0; i < param_names_len; ++i) {
            param_names.emplace_back(c_param_names[i].data, c_param_names[i].len);
          }
        }
        std::vector<foxglove::Parameter> params;
        try {
          params = (static_cast<const WebSocketServerCallbacks*>(context))
                     ->onGetParameters(client_id, request_id, param_names);
        } catch (const std::exception& exc) {
          warn() << "onGetParameters callback failed: " << exc.what();
        }
        auto array = ParameterArray(std::move(params));
        return array.release();
      };
    }
    if (callbacks->onSetParameters) {
      c_callbacks.on_set_parameters = [](
                                        const void* context,
                                        uint32_t client_id,
                                        const struct foxglove_string* c_request_id,
                                        const foxglove_parameter_array* c_params
                                      ) -> foxglove_parameter_array* {
        std::optional<std::string_view> request_id;
        if (c_request_id != nullptr) {
          request_id.emplace(c_request_id->data, c_request_id->len);
        }
        if (c_params == nullptr) {
          // Should not happen; the C implementation never passes a null pointer.
          return nullptr;
        }
        std::vector<foxglove::Parameter> params;
        try {
          params =
            (static_cast<const WebSocketServerCallbacks*>(context))
              ->onSetParameters(client_id, request_id, ParameterArrayView(c_params).parameters());
        } catch (const std::exception& exc) {
          warn() << "onSetParameters callback failed: " << exc.what();
        }
        auto array = ParameterArray(std::move(params));
        return array.release();
      };
    }
    if (callbacks->onParametersSubscribe) {
      c_callbacks.on_parameters_subscribe =
        [](const void* context, const struct foxglove_string* c_names, size_t names_len) {
          std::vector<std::string_view> names;
          names.reserve(names_len);
          for (size_t i = 0; i < names_len; ++i) {
            names.emplace_back(c_names[i].data, c_names[i].len);
          }
          try {
            (static_cast<const WebSocketServerCallbacks*>(context))->onParametersSubscribe(names);
          } catch (const std::exception& exc) {
            warn() << "onParametersSubscribe callback failed: " << exc.what();
          }
        };
    }
    if (callbacks->onParametersUnsubscribe) {
      c_callbacks.on_parameters_unsubscribe =
        [](const void* context, const struct foxglove_string* c_names, size_t names_len) {
          std::vector<std::string_view> names;
          names.reserve(names_len);
          for (size_t i = 0; i < names_len; ++i) {
            names.emplace_back(c_names[i].data, c_names[i].len);
          }
          try {
            (static_cast<const WebSocketServerCallbacks*>(context))->onParametersUnsubscribe(names);
          } catch (const std::exception& exc) {
            warn() << "onParametersUnsubscribe callback failed: " << exc.what();
          }
        };
    }
    if (callbacks->onConnectionGraphSubscribe) {
      c_callbacks.on_connection_graph_subscribe = [](const void* context) {
        try {
          (static_cast<const WebSocketServerCallbacks*>(context))->onConnectionGraphSubscribe();
        } catch (const std::exception& exc) {
          warn() << "onConnectionGraphSubscribe callback failed: " << exc.what();
        }
      };
    }
    if (callbacks->onConnectionGraphUnsubscribe) {
      c_callbacks.on_connection_graph_unsubscribe = [](const void* context) {
        try {
          (static_cast<const WebSocketServerCallbacks*>(context))->onConnectionGraphUnsubscribe();
        } catch (const std::exception& exc) {
          warn() << "onConnectionGraphUnsubscribe callback failed: " << exc.what();
        }
      };
    }
    if (callbacks->onPlaybackControlRequest) {
      c_callbacks.on_playback_control_request =
        [](
          const void* context,
          const foxglove_playback_control_request* c_playback_control_request,
          foxglove_playback_state* c_playback_state
        ) {
          if (c_playback_control_request == nullptr) {
            return;
          }

          std::optional<PlaybackState> maybe_playback_state = std::nullopt;
          try {
            const auto* ctx = (static_cast<const WebSocketServerCallbacks*>(context));
            maybe_playback_state =
              ctx->onPlaybackControlRequest(PlaybackControlRequest::from(*c_playback_control_request
              ));
          } catch (const std::exception& exc) {
            warn() << "onPlaybackControlRequest callback failed: " << exc.what();
          }

          if (!maybe_playback_state.has_value()) {
            return;
          }

          const PlaybackState& playback_state = maybe_playback_state.value();
          c_playback_state->status = static_cast<uint8_t>(playback_state.status);
          c_playback_state->current_time = playback_state.current_time;
          c_playback_state->playback_speed = playback_state.playback_speed;
        };
    }
  }

  foxglove_server_options c_options = {};
  c_options.context = options.context.getInner();
  c_options.name = {options.name.c_str(), options.name.length()};
  c_options.host = {options.host.c_str(), options.host.length()};
  c_options.port = options.port;
  c_options.callbacks = has_any_callbacks ? &c_callbacks : nullptr;
  c_options.capabilities =
    static_cast<std::underlying_type_t<decltype(options.capabilities)>>(options.capabilities);
  std::vector<foxglove_string> supported_encodings;
  supported_encodings.reserve(options.supported_encodings.size());
  for (const auto& encoding : options.supported_encodings) {
    supported_encodings.push_back({encoding.c_str(), encoding.length()});
  }
  c_options.supported_encodings = supported_encodings.data();
  c_options.supported_encodings_count = supported_encodings.size();
  if (options.fetch_asset) {
    fetch_asset = std::make_unique<FetchAssetHandler>(options.fetch_asset);
    c_options.fetch_asset_context = fetch_asset.get();
    c_options.fetch_asset = [](
                              const void* context,
                              const struct foxglove_string* c_uri,
                              struct foxglove_fetch_asset_responder* c_responder
                            ) {
      try {
        const auto* handler = static_cast<const FetchAssetHandler*>(context);
        std::string_view uri{c_uri->data, c_uri->len};
        FetchAssetResponder responder(c_responder);
        (*handler)(uri, std::move(responder));
      } catch (const std::exception& exc) {
        warn() << "Fetch asset callback failed: " << exc.what();
      }
    };
  }

  if (options.playback_time_range) {
    c_options.playback_start_time = &options.playback_time_range->first;
    c_options.playback_end_time = &options.playback_time_range->second;
  }

  std::vector<foxglove_key_value> server_info;
  if (options.server_info) {
    server_info.reserve(options.server_info->size());
    for (auto const& [key, value] : *options.server_info) {
      server_info.push_back({{key.data(), key.length()}, {value.data(), value.length()}});
    }
    c_options.server_info = server_info.data();
    c_options.server_info_count = server_info.size();
  }

  if (options.tls_identity) {
    c_options.tls_cert = reinterpret_cast<const uint8_t*>(options.tls_identity->cert.data());
    c_options.tls_cert_len = options.tls_identity->cert.size();
    c_options.tls_key = reinterpret_cast<const uint8_t*>(options.tls_identity->key.data());
    c_options.tls_key_len = options.tls_identity->key.size();
  }

  if (options.sink_channel_filter) {
    sink_channel_filter = std::make_unique<SinkChannelFilterFn>(options.sink_channel_filter);

    c_options.sink_channel_filter_context = sink_channel_filter.get();
    c_options.sink_channel_filter =
      [](const void* context, const struct foxglove_channel_descriptor* channel) -> bool {
      try {
        if (!context) {
          return true;  // Default to allowing if no filter
        }
        auto* filter_func = static_cast<const SinkChannelFilterFn*>(context);
        auto cpp_channel = ChannelDescriptor(channel);
        return (*filter_func)(std::move(cpp_channel));
      } catch (const std::exception& exc) {
        warn() << "Sink channel filter failed: " << exc.what();
        return false;
      }
    };
  }
  foxglove_websocket_server* server = nullptr;
  foxglove_error error = foxglove_server_start(&c_options, &server);
  if (error != foxglove_error::FOXGLOVE_ERROR_OK || server == nullptr) {
    return tl::unexpected(static_cast<FoxgloveError>(error));
  }

  return WebSocketServer(
    server, std::move(callbacks), std::move(fetch_asset), std::move(sink_channel_filter)
  );
}

WebSocketServer::WebSocketServer(
  foxglove_websocket_server* server, std::unique_ptr<WebSocketServerCallbacks> callbacks,
  std::unique_ptr<FetchAssetHandler> fetch_asset,
  std::unique_ptr<SinkChannelFilterFn> sink_channel_filter
)
    : callbacks_(std::move(callbacks))
    , fetch_asset_(std::move(fetch_asset))
    , sink_channel_filter_(std::move(sink_channel_filter))
    , impl_(server, foxglove_server_stop) {}

FoxgloveError WebSocketServer::stop() {
  foxglove_error error = foxglove_server_stop(impl_.release());
  return FoxgloveError(error);
}

uint16_t WebSocketServer::port() const {
  return foxglove_server_get_port(impl_.get());
}

void WebSocketServer::broadcastTime(uint64_t timestamp_nanos) const noexcept {
  foxglove_server_broadcast_time(impl_.get(), timestamp_nanos);
}

void WebSocketServer::broadcastPlaybackState(const PlaybackState& playback_state) const noexcept {
  foxglove_playback_state c_playback_state_ptr;
  c_playback_state_ptr.status = static_cast<uint8_t>(playback_state.status);
  c_playback_state_ptr.current_time = playback_state.current_time;
  c_playback_state_ptr.playback_speed = playback_state.playback_speed;
  if (!playback_state.request_id.has_value() || playback_state.request_id->empty()) {
    c_playback_state_ptr.request_id = {nullptr, 0};
  } else {
    c_playback_state_ptr.request_id = {
      playback_state.request_id->data(), playback_state.request_id->length()
    };
  }
  foxglove_server_broadcast_playback_state(impl_.get(), &c_playback_state_ptr);
}

FoxgloveError WebSocketServer::clearSession(std::optional<std::string_view> session_id
) const noexcept {
  auto c_session_id = session_id
                        ? std::optional<foxglove_string>{{session_id->data(), session_id->size()}}
                        : std::nullopt;
  auto error = foxglove_server_clear_session(impl_.get(), c_session_id ? &*c_session_id : nullptr);
  return FoxgloveError(error);
}

// NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
FoxgloveError WebSocketServer::addService(Service&& service) const noexcept {
  auto error = foxglove_server_add_service(impl_.get(), service.release());
  return FoxgloveError(error);
}

FoxgloveError WebSocketServer::removeService(std::string_view name) const noexcept {
  auto error = foxglove_server_remove_service(impl_.get(), {name.data(), name.length()});
  return FoxgloveError(error);
}

void WebSocketServer::publishParameterValues(std::vector<Parameter>&& params) {
  ParameterArray array(std::move(params));
  foxglove_server_publish_parameter_values(impl_.get(), array.release());
}

void WebSocketServer::publishConnectionGraph(ConnectionGraph& graph) {
  foxglove_server_publish_connection_graph(impl_.get(), graph.impl_.get());
}

FoxgloveError WebSocketServer::publishStatus(
  WebSocketServerStatusLevel level, std::string_view message, std::optional<std::string_view> id
) const noexcept {
  auto c_id = id ? std::optional<foxglove_string>{{id->data(), id->size()}} : std::nullopt;
  auto error = foxglove_server_publish_status(
    impl_.get(),
    static_cast<foxglove_server_status_level>(level),
    {message.data(), message.size()},
    c_id ? &*c_id : nullptr
  );
  return FoxgloveError(error);
}

FoxgloveError WebSocketServer::removeStatus(const std::vector<std::string_view>& ids) const {
  std::vector<foxglove_string> c_ids;
  c_ids.reserve(ids.size());
  for (const auto& id : ids) {
    c_ids.push_back({id.data(), id.size()});
  }
  auto error = foxglove_server_remove_status(impl_.get(), c_ids.data(), c_ids.size());
  return FoxgloveError(error);
}

}  // namespace foxglove