Skip to content

Commit

Permalink
add SubscriptionType enum to distniguish between state, delta, patch …
Browse files Browse the repository at this point in the history
…to replace boolean

Summary: extending this so i could add patch api

Differential Revision: D62675733

fbshipit-source-id: 004d1432b2c22cc63014f8a26b97449aa0353958
  • Loading branch information
Wei-Cheng Lin authored and facebook-github-bot committed Sep 18, 2024
1 parent b608792 commit ccc9823
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 55 deletions.
5 changes: 4 additions & 1 deletion fboss/fsdb/client/FsdbPatchSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,8 @@ using FsdbPatchSubscriber = FsdbPatchSubscriberImpl<
SubscriberMessage,
SubscriberChunk,
std::map<SubscriptionKey, RawOperPath>>;
// TODO: impl extended patch subscribers
using FsdbExtPatchSubscriber = FsdbPatchSubscriberImpl<
SubscriberMessage,
SubscriberChunk,
std::map<SubscriptionKey, ExtendedOperPath>>;
} // namespace facebook::fboss::fsdb
72 changes: 44 additions & 28 deletions fboss/fsdb/client/FsdbPubSubManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

namespace {
using namespace facebook::fboss::fsdb;
auto constexpr kDelta = "delta";
auto constexpr kState = "state";
auto constexpr kStats = "stats";
auto constexpr kPath = "path";
auto constexpr kReconnectThread = "FsdbReconnectThread";
auto constexpr kSubscriberThread = "FsdbSubscriberThread";
auto constexpr kStatsPublisherThread = "FsdbStatsPublisherThread";
Expand All @@ -25,12 +23,12 @@ auto constexpr kStatePublisherThread = "FsdbStatePublisherThread";
std::string toSubscriptionStr(
const std::string& fsdbHost,
const std::vector<std::string>& path,
bool isDelta,
SubscriptionType subscriptionType,
bool subscribeStats) {
return folly::to<std::string>(
fsdbHost,
":/",
(isDelta ? kDelta : kPath),
subscriptionTypeToStr[subscriptionType],
":/",
(subscribeStats ? kStats : kState),
":/",
Expand All @@ -40,12 +38,12 @@ std::string toSubscriptionStr(
std::string toSubscriptionStr(
const std::string& fsdbHost,
const std::vector<ExtendedOperPath>& paths,
bool isDelta,
SubscriptionType subscriptionType,
bool subscribeStats) {
return folly::to<std::string>(
fsdbHost,
":/",
(isDelta ? kDelta : kPath),
subscriptionTypeToStr[subscriptionType],
":/",
(subscribeStats ? kStats : kState),
":/",
Expand Down Expand Up @@ -411,6 +409,8 @@ void FsdbPubSubManager::addStatePathSubscription(
SubscriptionStateChangeCb stateChangeCb,
FsdbExtStateSubscriber::FsdbOperStateUpdateCb operStateCb,
FsdbStreamClient::ServerOptions&& serverOptions) {
XLOG(INFO) << "addStatePathSubscription: "
<< typeid(FsdbExtStateSubscriber).name();
addSubscriptionImpl<FsdbExtStateSubscriber>(
std::move(subscriptionOptions),
PathHelpers::toExtendedOperPath(subscribePaths),
Expand Down Expand Up @@ -479,13 +479,12 @@ void FsdbPubSubManager::addSubscriptionImpl(
bool subscribeStats,
FsdbStreamClient::ServerOptions&& serverOptions,
const std::optional<std::string>& clientIdSuffix) {
auto isDelta = std::disjunction_v<
std::is_same<SubscriberT, FsdbDeltaSubscriber>,
std::is_same<SubscriberT, FsdbExtDeltaSubscriber>>;
auto subscriptionType = SubscriberT::subscriptionType();
XCHECK(subscriptionType != SubscriptionType::UNKNOWN) << "Unknown data type";
auto subsStr = toSubscriptionStr(
serverOptions.dstAddr.getAddressStr(),
subscribePath,
isDelta,
subscriptionType,
subscribeStats);
auto& path2Subscriber =
subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_;
Expand Down Expand Up @@ -521,13 +520,12 @@ void FsdbPubSubManager::addSubscriptionImpl(
SubscriptionStateChangeCb stateChangeCb,
typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb,
FsdbStreamClient::ServerOptions&& serverOptions) {
auto isDelta = std::disjunction_v<
std::is_same<SubscriberT, FsdbDeltaSubscriber>,
std::is_same<SubscriberT, FsdbExtDeltaSubscriber>>;
auto subscriptionType = SubscriberT::subscriptionType();
XCHECK(subscriptionType != SubscriptionType::UNKNOWN) << "Unknown data type";
auto subsStr = toSubscriptionStr(
serverOptions.dstAddr.getAddressStr(),
subscribePath,
isDelta,
subscriptionType,
subscriptionOptions.subscribeStats_);
auto& path2Subscriber = subscriptionOptions.subscribeStats_
? statPath2Subscriber_
Expand Down Expand Up @@ -574,25 +572,34 @@ void FsdbPubSubManager::removeStateDeltaSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePath, fsdbHost, true /*delta*/, false /*subscribeStats*/);
subscribePath,
fsdbHost,
SubscriptionType::DELTA,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatePathSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePath, fsdbHost, false /*delta*/, false /*subscribeStats*/);
subscribePath,
fsdbHost,
SubscriptionType::PATH,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatDeltaSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePath, fsdbHost, true /*delta*/, true /*subscribeStats*/);
subscribePath,
fsdbHost,
SubscriptionType::DELTA,
true /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatPathSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePath, fsdbHost, false /*delta*/, true /*subscribeStats*/);
subscribePath, fsdbHost, SubscriptionType::PATH, true /*subscribeStats*/);
}

void FsdbPubSubManager::removeStateDeltaSubscription(
Expand All @@ -601,7 +608,7 @@ void FsdbPubSubManager::removeStateDeltaSubscription(
removeSubscriptionImpl(
PathHelpers::toExtendedOperPath(subscribePath),
fsdbHost,
true /*delta*/,
SubscriptionType::DELTA,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatePathSubscription(
Expand All @@ -610,7 +617,7 @@ void FsdbPubSubManager::removeStatePathSubscription(
removeSubscriptionImpl(
PathHelpers::toExtendedOperPath(subscribePath),
fsdbHost,
false /*delta*/,
SubscriptionType::PATH,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatDeltaSubscription(
Expand All @@ -619,7 +626,7 @@ void FsdbPubSubManager::removeStatDeltaSubscription(
removeSubscriptionImpl(
PathHelpers::toExtendedOperPath(subscribePath),
fsdbHost,
true /*delta*/,
SubscriptionType::DELTA,
true /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatPathSubscription(
Expand All @@ -628,20 +635,26 @@ void FsdbPubSubManager::removeStatPathSubscription(
removeSubscriptionImpl(
PathHelpers::toExtendedOperPath(subscribePath),
fsdbHost,
false /*delta*/,
SubscriptionType::PATH,
true /*subscribeStats*/);
}
void FsdbPubSubManager::removeStateExtDeltaSubscription(
const std::vector<ExtendedOperPath>& subscribePaths,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePaths, fsdbHost, true /*delta*/, false /*subscribeStats*/);
subscribePaths,
fsdbHost,
SubscriptionType::DELTA,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatExtDeltaSubscription(
const std::vector<ExtendedOperPath>& subscribePaths,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePaths, fsdbHost, true /*delta*/, true /*subscribeStats*/);
subscribePaths,
fsdbHost,
SubscriptionType::DELTA,
true /*subscribeStats*/);
}

void FsdbPubSubManager::clearStateSubscriptions() {
Expand All @@ -655,10 +668,10 @@ template <typename PathElement>
void FsdbPubSubManager::removeSubscriptionImpl(
const std::vector<PathElement>& subscribePath,
const std::string& fsdbHost,
bool isDelta,
SubscriptionType subscriptionType,
bool subscribeStats) {
auto subsStr =
toSubscriptionStr(fsdbHost, subscribePath, isDelta, subscribeStats);
auto subsStr = toSubscriptionStr(
fsdbHost, subscribePath, subscriptionType, subscribeStats);
auto& path2Subscriber =
subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_;
if (path2Subscriber.wlock()->erase(subsStr)) {
Expand All @@ -670,7 +683,10 @@ FsdbStreamClient::State FsdbPubSubManager::getStatePathSubsriptionState(
const MultiPath& subscribePath,
const std::string& fsdbHost) const {
auto subsStr = toSubscriptionStr(
fsdbHost, PathHelpers::toExtendedOperPath(subscribePath), false, false);
fsdbHost,
PathHelpers::toExtendedOperPath(subscribePath),
SubscriptionType::PATH,
false);
auto path2SubscriberR = statePath2Subscriber_.rlock();
if (path2SubscriberR->find(subsStr) == path2SubscriberR->end()) {
return FsdbStreamClient::State::CANCELLED;
Expand Down
3 changes: 2 additions & 1 deletion fboss/fsdb/client/FsdbPubSubManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <folly/io/async/ScopedEventBaseThread.h>
#include <gtest/gtest_prod.h>
#include "fboss/fsdb/client/FsdbDeltaSubscriber.h"
#include "fboss/fsdb/client/FsdbPatchSubscriber.h"
#include "fboss/fsdb/client/FsdbStateSubscriber.h"
#include "fboss/fsdb/client/FsdbStreamClient.h"
#include "fboss/fsdb/common/Flags.h"
Expand Down Expand Up @@ -247,7 +248,7 @@ class FsdbPubSubManager {
void removeSubscriptionImpl(
const std::vector<PathElement>& subscribePath,
const std::string& fsdbHost,
bool isDelta,
SubscriptionType subscribeType,
bool subscribeStats);
template <typename SubscriberT, typename PathElement>
void addSubscriptionImpl(
Expand Down
22 changes: 15 additions & 7 deletions fboss/fsdb/client/FsdbSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@
namespace facebook::fboss::fsdb {

template <typename SubUnit, typename Paths>
std::string FsdbSubscriber<SubUnit, Paths>::typeStr() const {
SubscriptionType FsdbSubscriber<SubUnit, Paths>::subscriptionType() {
if constexpr (
std::is_same_v<SubUnit, OperDelta> ||
std::is_same_v<SubUnit, TaggedOperDelta>) {
return "Delta";
} else if (
std::is_same_v<SubUnit, OperSubDeltaUnit>) {
return SubscriptionType::DELTA;
} else if constexpr (
std::is_same_v<SubUnit, OperState> ||
std::is_same_v<SubUnit, TaggedOperState>) {
return "Path";
std::is_same_v<SubUnit, OperSubPathUnit>) {
return SubscriptionType::PATH;
} else if constexpr (std::is_same_v<SubUnit, SubscriberChunk>) {
return SubscriptionType::PATCH;
} else {
return "Patch";
static_assert(folly::always_false<SubUnit>, "unsupported request type");
}
}

template <typename SubUnit, typename Paths>
std::string FsdbSubscriber<SubUnit, Paths>::typeStr() const {
auto subType = subscriptionType();
return subscriptionTypeToStr[subType];
}
template <typename SubUnit, typename Paths>
std::string FsdbSubscriber<SubUnit, Paths>::pathsStr(const Paths& path) const {
return PathHelpers::toString(path);
Expand Down
20 changes: 18 additions & 2 deletions fboss/fsdb/client/FsdbSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ enum class SubscriptionState : uint16_t {
CONNECTED,
};

enum class SubscriptionType {
UNKNOWN = 0,
PATH = 1,
DELTA = 2,
PATCH = 3,
};

static std::unordered_map<SubscriptionType, std::string> subscriptionTypeToStr =
{
{SubscriptionType::PATH, "Path"},
{SubscriptionType::DELTA, "Delta"},
{SubscriptionType::PATCH, "Patch"},
};

inline bool isConnected(const SubscriptionState& state) {
return state == SubscriptionState::CONNECTED;
}
Expand Down Expand Up @@ -82,7 +96,7 @@ struct SubscriptionOptions {

struct SubscriptionInfo {
std::string server;
bool isDelta;
SubscriptionType subscriptionType;
bool isStats;
std::vector<std::string> paths;
FsdbStreamClient::State state;
Expand Down Expand Up @@ -168,10 +182,12 @@ class FsdbSubscriber : public FsdbSubscriberBase {
cancelStaleStateTimeout();
}

static SubscriptionType subscriptionType();

SubscriptionInfo getInfo() const override {
return SubscriptionInfo{
getServer(),
!std::is_same_v<SubUnit, OperState>,
subscriptionType(),
this->isStats(),
PathHelpers::toStringList(subscribePaths_),
getState(),
Expand Down
6 changes: 6 additions & 0 deletions fboss/fsdb/client/test/FsdbPubSubManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,10 @@ TEST_F(PubSubManagerTest, removeAllSubscriptions) {
addStatDeltaSubscription({"foo"});
}

TEST_F(PubSubManagerTest, TestSubscriptionInfo) {
EXPECT_EQ(FsdbPatchSubscriber::subscriptionType(), SubscriptionType::PATCH);
EXPECT_EQ(FsdbStateSubscriber::subscriptionType(), SubscriptionType::PATH);
EXPECT_EQ(FsdbDeltaSubscriber::subscriptionType(), SubscriptionType::DELTA);
}

} // namespace facebook::fboss::fsdb
33 changes: 17 additions & 16 deletions fboss/fsdb/tests/client/FsdbPubSubManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,24 @@ class FsdbPubSubManagerTest : public ::testing::Test {
protected:
folly::Synchronized<std::map<std::string, FsdbErrorCode>>
subscriptionLastDisconnectReason;
void updateSubscriptionLastDisconnectReason(bool isDelta, bool isStats) {
void updateSubscriptionLastDisconnectReason(
SubscriptionType subscriptionType,
bool isStats) {
auto subscriptionInfoList = this->pubSubManager_->getSubscriptionInfo();
for (const auto& subscriptionInfo : subscriptionInfoList) {
if (isDelta == subscriptionInfo.isDelta &&
if (subscriptionType == subscriptionInfo.subscriptionType &&
isStats == subscriptionInfo.isStats) {
auto reason = subscriptionInfo.disconnectReason;
subscriptionLastDisconnectReason.withWLock([&](auto& map) {
std::string subscriberId = isStats ? "STAT" : "State";
subscriberId += isDelta ? "_Delta" : "_Path";
subscriberId +=
fmt::format("_{}", subscriptionTypeToStr[subscriptionType]);
map[subscriberId] = reason;
});
return;
}
}
}
FsdbErrorCode getSubscriptionLastDisconnectReason(
bool isDelta,
bool isStats) {
std::string subscriberId = isStats ? "STAT" : "State";
subscriberId += isDelta ? "_Delta" : "_Path";
return subscriptionLastDisconnectReason.rlock()->at(subscriberId);
}
void checkDisconnectReason(FsdbErrorCode expected) {
WITH_RETRIES({
auto reasons = subscriptionLastDisconnectReason.rlock();
Expand Down Expand Up @@ -493,21 +489,26 @@ TYPED_TEST(FsdbPubSubManagerGRTest, verifySubscriptionDisconnectOnPublisherGR) {
this->addStatDeltaSubscription(
this->makeOperDeltaCb(statDeltas),
this->subscrStateChangeCb(stateDeltas, [this]() {
this->updateSubscriptionLastDisconnectReason(true, true);
this->updateSubscriptionLastDisconnectReason(
SubscriptionType::DELTA, true);
}));
this->addStatPathSubscription(
this->makeOperStateCb(statPaths),
this->subscrStateChangeCb(stateDeltas, [this]() {
this->updateSubscriptionLastDisconnectReason(false, true);
this->updateSubscriptionLastDisconnectReason(
SubscriptionType::PATH, true);
}));
this->addStateDeltaSubscription(
this->makeOperDeltaCb(stateDeltas),
this->subscrStateChangeCb(stateDeltas, [this]() {
this->updateSubscriptionLastDisconnectReason(true, false);
this->updateSubscriptionLastDisconnectReason(
SubscriptionType::DELTA, false);
}));
SubscriptionStateChangeCb stChangeCb = this->subscrStateChangeCb(
statePaths,
[this]() { this->updateSubscriptionLastDisconnectReason(false, false); });
SubscriptionStateChangeCb stChangeCb =
this->subscrStateChangeCb(statePaths, [this]() {
this->updateSubscriptionLastDisconnectReason(
SubscriptionType::PATH, false);
});
this->addStatePathSubscription(this->makeOperStateCb(statePaths), stChangeCb);
// Publish
this->publish(makePortStats(1));
Expand Down

0 comments on commit ccc9823

Please sign in to comment.