diff --git a/foxglove_bridge_base/include/foxglove_bridge/common.hpp b/foxglove_bridge_base/include/foxglove_bridge/common.hpp index f53f01c..087b5d0 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/common.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/common.hpp @@ -7,6 +7,8 @@ namespace foxglove { constexpr char SUPPORTED_SUBPROTOCOL[] = "foxglove.websocket.v1"; +constexpr char CAPABILITY_CLIENT_PUBLISH[] = "clientPublish"; +constexpr char CAPABILITY_TIME[] = "time"; using ChannelId = uint32_t; using ClientChannelId = uint32_t; @@ -14,6 +16,7 @@ using SubscriptionId = uint32_t; enum class BinaryOpcode : uint8_t { MESSAGE_DATA = 1, + TIME_DATA = 2, }; enum class ClientBinaryOpcode : uint8_t { diff --git a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp index b328986..66d99f9 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp @@ -142,6 +142,7 @@ class ServerInterface { virtual void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, std::string_view data) = 0; + virtual void broadcastTime(uint64_t timestamp) = 0; virtual std::optional localEndpoint() = 0; virtual std::string remoteEndpointString(ConnHandle clientHandle) = 0; @@ -164,7 +165,8 @@ class Server final : public ServerInterface { static bool USES_TLS; - explicit Server(std::string name, LogCallback logger, const std::string& certfile = "", + explicit Server(std::string name, LogCallback logger, + const std::vector& capabilities, const std::string& certfile = "", const std::string& keyfile = ""); virtual ~Server(); @@ -188,6 +190,7 @@ class Server final : public ServerInterface { void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, std::string_view data) override; + void broadcastTime(uint64_t timestamp) override; std::optional localEndpoint() override; std::string remoteEndpointString(ConnHandle clientHandle) override; @@ -208,6 +211,7 @@ class Server final : public ServerInterface { std::string _name; LogCallback _logger; + std::vector _capabilities; std::string _certfile; std::string _keyfile; ServerType _server; @@ -242,9 +246,11 @@ class Server final : public ServerInterface { template inline Server::Server(std::string name, LogCallback logger, + const std::vector& capabilities, const std::string& certfile, const std::string& keyfile) : _name(std::move(name)) , _logger(logger) + , _capabilities(capabilities) , _certfile(certfile) , _keyfile(keyfile) { // Redirect logging @@ -310,7 +316,7 @@ inline void Server::handleConnectionOpened(ConnHandle hdl) con->send(json({ {"op", "serverInfo"}, {"name", _name}, - {"capabilities", json::array({"clientPublish"})}, + {"capabilities", _capabilities}, }) .dump()); @@ -814,6 +820,19 @@ inline void Server::sendMessage(ConnHandle clientHandle, Ch sendBinary(clientHandle, message); } +template +inline void Server::broadcastTime(uint64_t timestamp) { + std::vector message(1 + 8); + message[0] = uint8_t(BinaryOpcode::TIME_DATA); + foxglove::WriteUint64LE(message.data() + 1, timestamp); + + std::shared_lock lock(_clientsChannelMutex); + for (const auto& [hdl, clientInfo] : _clients) { + (void)clientInfo; + sendBinary(hdl, message); + } +} + template inline std::optional Server::localEndpoint() { std::error_code ec; diff --git a/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp b/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp index cadf74d..ec66ee8 100644 --- a/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp +++ b/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp @@ -1,6 +1,5 @@ #define ASIO_STANDALONE -#include #include #include #include @@ -46,6 +45,7 @@ class FoxgloveBridge : public nodelet::Nodelet { const auto certfile = nhp.param("certfile", ""); const auto keyfile = nhp.param("keyfile", ""); _maxUpdateMs = static_cast(nhp.param("max_update_ms", DEFAULT_MAX_UPDATE_MS)); + _useSimTime = nhp.param("/use_sim_time", false); const auto regexPatterns = nhp.param>("topic_whitelist", {".*"}); _topicWhitelistPatterns.reserve(regexPatterns.size()); @@ -62,14 +62,21 @@ class FoxgloveBridge : public nodelet::Nodelet { foxglove::WebSocketUserAgent()); try { + std::vector serverCapabilities = { + foxglove::CAPABILITY_CLIENT_PUBLISH, + }; + if (_useSimTime) { + serverCapabilities.push_back(foxglove::CAPABILITY_TIME); + } + const auto logHandler = std::bind(&FoxgloveBridge::logHandler, this, std::placeholders::_1, std::placeholders::_2); if (useTLS) { _server = std::make_unique>( - "foxglove_bridge", std::move(logHandler), certfile, keyfile); + "foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile); } else { _server = std::make_unique>( - "foxglove_bridge", std::move(logHandler)); + "foxglove_bridge", std::move(logHandler), serverCapabilities); } _server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this, @@ -86,6 +93,13 @@ class FoxgloveBridge : public nodelet::Nodelet { _server->start(address, static_cast(port)); updateAdvertisedTopics(ros::TimerEvent()); + + if (_useSimTime) { + _clockSubscription = getMTNodeHandle().subscribe( + "/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) { + _server->broadcastTime(msg->clock.toNSec()); + }); + } } catch (const std::exception& err) { ROS_ERROR("Failed to start websocket server: %s", err.what()); // Rethrow exception such that the nodelet is unloaded. @@ -329,14 +343,10 @@ class FoxgloveBridge : public nodelet::Nodelet { std::unordered_set latestTopics; latestTopics.reserve(topicNamesAndTypes.size()); - bool hasClockTopic = false; for (const auto& topicNameAndType : topicNamesAndTypes) { const auto& topicName = topicNameAndType.name; const auto& datatype = topicNameAndType.datatype; - // Check if a /clock topic is published - hasClockTopic = hasClockTopic || (topicName == "/clock" && datatype == "rosgraph_msgs/Clock"); - // Ignore the topic if it is not on the topic whitelist if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(), [&topicName](const auto& regex) { @@ -352,20 +362,6 @@ class FoxgloveBridge : public nodelet::Nodelet { numIgnoredTopics); } - // Enable or disable simulated time based on the presence of a /clock topic - if (!_useSimTime && hasClockTopic) { - ROS_INFO("/clock topic found, using simulated time"); - _useSimTime = true; - _clockSubscription = getMTNodeHandle().subscribe( - "/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) { - _simTimeNs = msg->clock.toNSec(); - }); - } else if (_useSimTime && !hasClockTopic) { - ROS_WARN("/clock topic disappeared"); - _useSimTime = false; - _clockSubscription.shutdown(); - } - // Create a list of topics that are new to us std::vector newTopics; for (const auto& topic : latestTopics) { @@ -475,7 +471,7 @@ class FoxgloveBridge : public nodelet::Nodelet { const foxglove::Channel& channel, foxglove::ConnHandle clientHandle, const ros::MessageEvent& msgEvent) { const auto& msg = msgEvent.getConstMessage(); - const auto receiptTimeNs = _useSimTime ? _simTimeNs.load() : msgEvent.getReceiptTime().toNSec(); + const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec(); _server->sendMessage( clientHandle, channel.id, receiptTimeNs, std::string_view(reinterpret_cast(msg->buffer()), msg->size())); @@ -494,8 +490,7 @@ class FoxgloveBridge : public nodelet::Nodelet { size_t _maxUpdateMs = size_t(DEFAULT_MAX_UPDATE_MS); size_t _updateCount = 0; ros::Subscriber _clockSubscription; - std::atomic _simTimeNs = 0; - std::atomic _useSimTime = false; + bool _useSimTime = false; }; } // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index 4990514..c59b819 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -116,14 +115,22 @@ class FoxgloveBridge : public rclcpp::Node { const auto useTLS = this->get_parameter("tls").as_bool(); const auto certfile = this->get_parameter("certfile").as_string(); const auto keyfile = this->get_parameter("keyfile").as_string(); + _useSimTime = this->get_parameter("use_sim_time").as_bool(); const auto logHandler = std::bind(&FoxgloveBridge::logHandler, this, _1, _2); + std::vector serverCapabilities = { + foxglove::CAPABILITY_CLIENT_PUBLISH, + }; + if (_useSimTime) { + serverCapabilities.push_back(foxglove::CAPABILITY_TIME); + } + if (useTLS) { _server = std::make_unique>( - "foxglove_bridge", std::move(logHandler), certfile, keyfile); + "foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile); } else { - _server = std::make_unique>("foxglove_bridge", - std::move(logHandler)); + _server = std::make_unique>( + "foxglove_bridge", std::move(logHandler), serverCapabilities); } _server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this, _1, _2)); _server->setUnsubscribeHandler(std::bind(&FoxgloveBridge::unsubscribeHandler, this, _1, _2)); @@ -154,6 +161,16 @@ class FoxgloveBridge : public rclcpp::Node { _clientPublishCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + + if (_useSimTime) { + _clockSubscription = this->create_subscription( + "/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(), + [&](std::shared_ptr msg) { + const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds(); + assert(timestamp >= 0 && "Timestamp is negative"); + _server->broadcastTime(static_cast(timestamp)); + }); + } } ~FoxgloveBridge() { @@ -192,16 +209,10 @@ class FoxgloveBridge : public rclcpp::Node { auto topicNamesAndTypes = this->get_node_graph_interface()->get_topic_names_and_types(); std::unordered_set latestTopics; latestTopics.reserve(topicNamesAndTypes.size()); - bool hasClockTopic = false; for (const auto& topicNamesAndType : topicNamesAndTypes) { const auto& topicName = topicNamesAndType.first; const auto& datatypes = topicNamesAndType.second; - // Check if a /clock topic is published - hasClockTopic = hasClockTopic || (topicName == "/clock" && - std::find(datatypes.begin(), datatypes.end(), - "rosgraph_msgs/msg/Clock") != datatypes.end()); - // Ignore the topic if it is not on the topic whitelist if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(), [&topicName](const auto& regex) { @@ -220,21 +231,6 @@ class FoxgloveBridge : public rclcpp::Node { numIgnoredTopics); } - // Enable or disable simulated time based on the presence of a /clock topic - if (!_useSimTime && hasClockTopic) { - RCLCPP_INFO(this->get_logger(), "/clock topic found, using simulated time"); - _useSimTime = true; - _clockSubscription = this->create_subscription( - "/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(), - [&](std::shared_ptr msg) { - _simTimeNs = uint64_t(rclcpp::Time{msg->clock}.nanoseconds()); - }); - } else if (_useSimTime && !hasClockTopic) { - RCLCPP_WARN(this->get_logger(), "/clock topic disappeared"); - _useSimTime = false; - _clockSubscription.reset(); - } - // Create a list of topics that are new to us std::vector newTopics; for (const auto& topic : latestTopics) { @@ -343,8 +339,7 @@ class FoxgloveBridge : public rclcpp::Node { std::unique_ptr _rosgraphPollThread; size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH; std::shared_ptr> _clockSubscription; - std::atomic _simTimeNs = 0; - std::atomic _useSimTime = false; + bool _useSimTime = false; void subscribeHandler(foxglove::ChannelId channelId, foxglove::ConnHandle clientHandle) { std::lock_guard lock(_subscriptionsMutex); @@ -638,15 +633,12 @@ class FoxgloveBridge : public rclcpp::Node { std::shared_ptr msg) { // NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing // to `/rosout` will cause a feedback loop - const auto timestamp = currentTimeNs(); + const auto timestamp = this->now().nanoseconds(); + assert(timestamp >= 0 && "Timestamp is negative"); const auto payload = std::string_view{reinterpret_cast(msg->get_rcl_serialized_message().buffer), msg->get_rcl_serialized_message().buffer_length}; - _server->sendMessage(clientHandle, channel.id, timestamp, payload); - } - - uint64_t currentTimeNs() { - return _useSimTime ? _simTimeNs.load() : uint64_t(this->now().nanoseconds()); + _server->sendMessage(clientHandle, channel.id, static_cast(timestamp), payload); } };