Skip to content

Commit

Permalink
Publish binary time data when use_sim_time parameter is true (#114)
Browse files Browse the repository at this point in the history
**Public-Facing Changes**
- Publish binary time data when `use_sim_time` is `true`

**Description**
Depends on foxglove/ws-protocol#299

- When the `use_sim_time` parameter is set
  - The server advertises the `time` capability
- It subscribes to the `/clock` topic and broadcasts binary time
messages
- I have removed the code that detects presence of the `/clock` topic,
since now it has to be known at startup whether `use_sim_time` is true
or false

- Includes #115 (will rebase after it got merged)
  • Loading branch information
achim-k committed Dec 14, 2022
1 parent 8ab2fc6 commit 8fccb98
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 59 deletions.
3 changes: 3 additions & 0 deletions foxglove_bridge_base/include/foxglove_bridge/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
namespace foxglove {

constexpr char SUPPORTED_SUBPROTOCOL[] = "foxglove.websocket.v1";
constexpr char CAPABILITY_CLIENT_PUBLISH[] = "clientPublish";
constexpr char CAPABILITY_TIME[] = "time";

using ChannelId = uint32_t;
using ClientChannelId = uint32_t;
using SubscriptionId = uint32_t;

enum class BinaryOpcode : uint8_t {
MESSAGE_DATA = 1,
TIME_DATA = 2,
};

enum class ClientBinaryOpcode : uint8_t {
Expand Down
23 changes: 21 additions & 2 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class ServerInterface {

virtual void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
std::string_view data) = 0;
virtual void broadcastTime(uint64_t timestamp) = 0;

virtual std::optional<Tcp::endpoint> localEndpoint() = 0;
virtual std::string remoteEndpointString(ConnHandle clientHandle) = 0;
Expand All @@ -164,7 +165,8 @@ class Server final : public ServerInterface {

static bool USES_TLS;

explicit Server(std::string name, LogCallback logger, const std::string& certfile = "",
explicit Server(std::string name, LogCallback logger,
const std::vector<std::string>& capabilities, const std::string& certfile = "",
const std::string& keyfile = "");
virtual ~Server();

Expand All @@ -188,6 +190,7 @@ class Server final : public ServerInterface {

void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
std::string_view data) override;
void broadcastTime(uint64_t timestamp) override;

std::optional<Tcp::endpoint> localEndpoint() override;
std::string remoteEndpointString(ConnHandle clientHandle) override;
Expand All @@ -208,6 +211,7 @@ class Server final : public ServerInterface {

std::string _name;
LogCallback _logger;
std::vector<std::string> _capabilities;
std::string _certfile;
std::string _keyfile;
ServerType _server;
Expand Down Expand Up @@ -242,9 +246,11 @@ class Server final : public ServerInterface {

template <typename ServerConfiguration>
inline Server<ServerConfiguration>::Server(std::string name, LogCallback logger,
const std::vector<std::string>& capabilities,
const std::string& certfile, const std::string& keyfile)
: _name(std::move(name))
, _logger(logger)
, _capabilities(capabilities)
, _certfile(certfile)
, _keyfile(keyfile) {
// Redirect logging
Expand Down Expand Up @@ -310,7 +316,7 @@ inline void Server<ServerConfiguration>::handleConnectionOpened(ConnHandle hdl)
con->send(json({
{"op", "serverInfo"},
{"name", _name},
{"capabilities", json::array({"clientPublish"})},
{"capabilities", _capabilities},
})
.dump());

Expand Down Expand Up @@ -814,6 +820,19 @@ inline void Server<ServerConfiguration>::sendMessage(ConnHandle clientHandle, Ch
sendBinary(clientHandle, message);
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::broadcastTime(uint64_t timestamp) {
std::vector<uint8_t> message(1 + 8);
message[0] = uint8_t(BinaryOpcode::TIME_DATA);
foxglove::WriteUint64LE(message.data() + 1, timestamp);

std::shared_lock<std::shared_mutex> lock(_clientsChannelMutex);
for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
sendBinary(hdl, message);
}
}

template <typename ServerConfiguration>
inline std::optional<asio::ip::tcp::endpoint> Server<ServerConfiguration>::localEndpoint() {
std::error_code ec;
Expand Down
43 changes: 19 additions & 24 deletions ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#define ASIO_STANDALONE

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -46,6 +45,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
const auto certfile = nhp.param<std::string>("certfile", "");
const auto keyfile = nhp.param<std::string>("keyfile", "");
_maxUpdateMs = static_cast<size_t>(nhp.param<int>("max_update_ms", DEFAULT_MAX_UPDATE_MS));
_useSimTime = nhp.param<bool>("/use_sim_time", false);

const auto regexPatterns = nhp.param<std::vector<std::string>>("topic_whitelist", {".*"});
_topicWhitelistPatterns.reserve(regexPatterns.size());
Expand All @@ -62,14 +62,21 @@ class FoxgloveBridge : public nodelet::Nodelet {
foxglove::WebSocketUserAgent());

try {
std::vector<std::string> serverCapabilities = {
foxglove::CAPABILITY_CLIENT_PUBLISH,
};
if (_useSimTime) {
serverCapabilities.push_back(foxglove::CAPABILITY_TIME);
}

const auto logHandler =
std::bind(&FoxgloveBridge::logHandler, this, std::placeholders::_1, std::placeholders::_2);
if (useTLS) {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketTls>>(
"foxglove_bridge", std::move(logHandler), certfile, keyfile);
"foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile);
} else {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>(
"foxglove_bridge", std::move(logHandler));
"foxglove_bridge", std::move(logHandler), serverCapabilities);
}

_server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this,
Expand All @@ -86,6 +93,13 @@ class FoxgloveBridge : public nodelet::Nodelet {
_server->start(address, static_cast<uint16_t>(port));

updateAdvertisedTopics(ros::TimerEvent());

if (_useSimTime) {
_clockSubscription = getMTNodeHandle().subscribe<rosgraph_msgs::Clock>(
"/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) {
_server->broadcastTime(msg->clock.toNSec());
});
}
} catch (const std::exception& err) {
ROS_ERROR("Failed to start websocket server: %s", err.what());
// Rethrow exception such that the nodelet is unloaded.
Expand Down Expand Up @@ -329,14 +343,10 @@ class FoxgloveBridge : public nodelet::Nodelet {

std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
latestTopics.reserve(topicNamesAndTypes.size());
bool hasClockTopic = false;
for (const auto& topicNameAndType : topicNamesAndTypes) {
const auto& topicName = topicNameAndType.name;
const auto& datatype = topicNameAndType.datatype;

// Check if a /clock topic is published
hasClockTopic = hasClockTopic || (topicName == "/clock" && datatype == "rosgraph_msgs/Clock");

// Ignore the topic if it is not on the topic whitelist
if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(),
[&topicName](const auto& regex) {
Expand All @@ -352,20 +362,6 @@ class FoxgloveBridge : public nodelet::Nodelet {
numIgnoredTopics);
}

// Enable or disable simulated time based on the presence of a /clock topic
if (!_useSimTime && hasClockTopic) {
ROS_INFO("/clock topic found, using simulated time");
_useSimTime = true;
_clockSubscription = getMTNodeHandle().subscribe<rosgraph_msgs::Clock>(
"/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) {
_simTimeNs = msg->clock.toNSec();
});
} else if (_useSimTime && !hasClockTopic) {
ROS_WARN("/clock topic disappeared");
_useSimTime = false;
_clockSubscription.shutdown();
}

// Create a list of topics that are new to us
std::vector<TopicAndDatatype> newTopics;
for (const auto& topic : latestTopics) {
Expand Down Expand Up @@ -475,7 +471,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
const foxglove::Channel& channel, foxglove::ConnHandle clientHandle,
const ros::MessageEvent<ros_babel_fish::BabelFishMessage const>& msgEvent) {
const auto& msg = msgEvent.getConstMessage();
const auto receiptTimeNs = _useSimTime ? _simTimeNs.load() : msgEvent.getReceiptTime().toNSec();
const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec();
_server->sendMessage(
clientHandle, channel.id, receiptTimeNs,
std::string_view(reinterpret_cast<const char*>(msg->buffer()), msg->size()));
Expand All @@ -494,8 +490,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
size_t _maxUpdateMs = size_t(DEFAULT_MAX_UPDATE_MS);
size_t _updateCount = 0;
ros::Subscriber _clockSubscription;
std::atomic<uint64_t> _simTimeNs = 0;
std::atomic<bool> _useSimTime = false;
bool _useSimTime = false;
};

} // namespace foxglove_bridge
Expand Down
58 changes: 25 additions & 33 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#include <atomic>
#include <chrono>
#include <memory>
#include <regex>
Expand Down Expand Up @@ -116,14 +115,22 @@ class FoxgloveBridge : public rclcpp::Node {
const auto useTLS = this->get_parameter("tls").as_bool();
const auto certfile = this->get_parameter("certfile").as_string();
const auto keyfile = this->get_parameter("keyfile").as_string();
_useSimTime = this->get_parameter("use_sim_time").as_bool();
const auto logHandler = std::bind(&FoxgloveBridge::logHandler, this, _1, _2);

std::vector<std::string> serverCapabilities = {
foxglove::CAPABILITY_CLIENT_PUBLISH,
};
if (_useSimTime) {
serverCapabilities.push_back(foxglove::CAPABILITY_TIME);
}

if (useTLS) {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketTls>>(
"foxglove_bridge", std::move(logHandler), certfile, keyfile);
"foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile);
} else {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>("foxglove_bridge",
std::move(logHandler));
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>(
"foxglove_bridge", std::move(logHandler), serverCapabilities);
}
_server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this, _1, _2));
_server->setUnsubscribeHandler(std::bind(&FoxgloveBridge::unsubscribeHandler, this, _1, _2));
Expand Down Expand Up @@ -154,6 +161,16 @@ class FoxgloveBridge : public rclcpp::Node {

_clientPublishCallbackGroup =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

if (_useSimTime) {
_clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
[&](std::shared_ptr<rosgraph_msgs::msg::Clock> msg) {
const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds();
assert(timestamp >= 0 && "Timestamp is negative");
_server->broadcastTime(static_cast<uint64_t>(timestamp));
});
}
}

~FoxgloveBridge() {
Expand Down Expand Up @@ -192,16 +209,10 @@ class FoxgloveBridge : public rclcpp::Node {
auto topicNamesAndTypes = this->get_node_graph_interface()->get_topic_names_and_types();
std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
latestTopics.reserve(topicNamesAndTypes.size());
bool hasClockTopic = false;
for (const auto& topicNamesAndType : topicNamesAndTypes) {
const auto& topicName = topicNamesAndType.first;
const auto& datatypes = topicNamesAndType.second;

// Check if a /clock topic is published
hasClockTopic = hasClockTopic || (topicName == "/clock" &&
std::find(datatypes.begin(), datatypes.end(),
"rosgraph_msgs/msg/Clock") != datatypes.end());

// Ignore the topic if it is not on the topic whitelist
if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(),
[&topicName](const auto& regex) {
Expand All @@ -220,21 +231,6 @@ class FoxgloveBridge : public rclcpp::Node {
numIgnoredTopics);
}

// Enable or disable simulated time based on the presence of a /clock topic
if (!_useSimTime && hasClockTopic) {
RCLCPP_INFO(this->get_logger(), "/clock topic found, using simulated time");
_useSimTime = true;
_clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
[&](std::shared_ptr<rosgraph_msgs::msg::Clock> msg) {
_simTimeNs = uint64_t(rclcpp::Time{msg->clock}.nanoseconds());
});
} else if (_useSimTime && !hasClockTopic) {
RCLCPP_WARN(this->get_logger(), "/clock topic disappeared");
_useSimTime = false;
_clockSubscription.reset();
}

// Create a list of topics that are new to us
std::vector<TopicAndDatatype> newTopics;
for (const auto& topic : latestTopics) {
Expand Down Expand Up @@ -343,8 +339,7 @@ class FoxgloveBridge : public rclcpp::Node {
std::unique_ptr<std::thread> _rosgraphPollThread;
size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH;
std::shared_ptr<rclcpp::Subscription<rosgraph_msgs::msg::Clock>> _clockSubscription;
std::atomic<uint64_t> _simTimeNs = 0;
std::atomic<bool> _useSimTime = false;
bool _useSimTime = false;

void subscribeHandler(foxglove::ChannelId channelId, foxglove::ConnHandle clientHandle) {
std::lock_guard<std::mutex> lock(_subscriptionsMutex);
Expand Down Expand Up @@ -638,15 +633,12 @@ class FoxgloveBridge : public rclcpp::Node {
std::shared_ptr<rclcpp::SerializedMessage> msg) {
// NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing
// to `/rosout` will cause a feedback loop
const auto timestamp = currentTimeNs();
const auto timestamp = this->now().nanoseconds();
assert(timestamp >= 0 && "Timestamp is negative");
const auto payload =
std::string_view{reinterpret_cast<const char*>(msg->get_rcl_serialized_message().buffer),
msg->get_rcl_serialized_message().buffer_length};
_server->sendMessage(clientHandle, channel.id, timestamp, payload);
}

uint64_t currentTimeNs() {
return _useSimTime ? _simTimeNs.load() : uint64_t(this->now().nanoseconds());
_server->sendMessage(clientHandle, channel.id, static_cast<uint64_t>(timestamp), payload);
}
};

Expand Down

0 comments on commit 8fccb98

Please sign in to comment.