Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support getting connection info from RedisClient #1770

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions lib/src/RedisClientSkipped.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ namespace drogon
{
namespace nosql
{

std::shared_ptr<RedisClient> RedisClient::newRedisClient(
const trantor::InetAddress& /*serverAddress*/,
size_t /*numberOfConnections*/,
const std::string& /*password*/,
const unsigned int /*db*/,
const std::string& /*username*/)
RedisConnectionInfo,
size_t /*numberOfConnections*/)
{
LOG_FATAL << "Redis is not supported by drogon, please install the "
"hiredis library first.";
abort();
}

} // namespace nosql
} // namespace drogon
34 changes: 31 additions & 3 deletions nosql_lib/redis/inc/drogon/nosql/RedisClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ struct [[nodiscard]] RedisTransactionAwaiter

class RedisTransaction;

struct RedisConnectionInfo
{
trantor::InetAddress addr;
std::string username;
std::string password;
unsigned int db = 0;
};

/**
* @brief This class represents a redis client that contains several connections
* to a redis server.
Expand All @@ -100,11 +108,23 @@ class DROGON_EXPORT RedisClient
* @return std::shared_ptr<RedisClient>
*/
static std::shared_ptr<RedisClient> newRedisClient(
const trantor::InetAddress &serverAddress,
trantor::InetAddress serverAddress,
size_t numberOfConnections = 1,
const std::string &password = "",
std::string password = "",
unsigned int db = 0,
const std::string &username = "");
std::string username = "")
{
return newRedisClient({.addr = std::move(serverAddress),
.username = std::move(username),
.password = std::move(password),
.db = db},
numberOfConnections);
}

static std::shared_ptr<RedisClient> newRedisClient(
RedisConnectionInfo connInfo,
size_t numberOfConnections = 1);

/**
* @brief Execute a redis command
*
Expand Down Expand Up @@ -310,6 +330,14 @@ class DROGON_EXPORT RedisClient
return internal::RedisTransactionAwaiter(this);
}
#endif

const RedisConnectionInfo &connectionInfo() const
{
return connInfo_;
}

protected:
RedisConnectionInfo connInfo_;
};

class DROGON_EXPORT RedisTransaction : public RedisClient
Expand Down
30 changes: 10 additions & 20 deletions nosql_lib/redis/src/RedisClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,29 @@
#include "RedisSubscriberImpl.h"
#include "RedisTransactionImpl.h"
#include "../../lib/src/TaskTimeoutFlag.h"
#include "drogon/nosql/RedisClient.h"

using namespace drogon::nosql;

std::shared_ptr<RedisClient> RedisClient::newRedisClient(
const trantor::InetAddress &serverAddress,
size_t connectionNumber,
const std::string &password,
unsigned int db,
const std::string &username)
RedisConnectionInfo connInfo,
size_t numberOfConnections)
{
auto client = std::make_shared<RedisClientImpl>(
serverAddress, connectionNumber, username, password, db);
auto client = std::make_shared<RedisClientImpl>(std::move(connInfo),
numberOfConnections);
client->init();
return client;
}

RedisClientImpl::RedisClientImpl(const trantor::InetAddress &serverAddress,
size_t numberOfConnections,
std::string username,
std::string password,
unsigned int db)
RedisClientImpl::RedisClientImpl(RedisConnectionInfo connInfo,
size_t numberOfConnections)
: loops_(numberOfConnections < std::thread::hardware_concurrency()
? numberOfConnections
: std::thread::hardware_concurrency(),
"RedisLoop"),
serverAddr_(serverAddress),
username_(std::move(username)),
password_(std::move(password)),
db_(db),
numberOfConnections_(numberOfConnections)
{
connInfo_ = std::move(connInfo);
}

void RedisClientImpl::init()
Expand All @@ -65,8 +57,7 @@ void RedisClientImpl::init()

RedisConnectionPtr RedisClientImpl::newConnection(trantor::EventLoop *loop)
{
auto conn = std::make_shared<RedisConnection>(
serverAddr_, username_, password_, db_, loop);
auto conn = std::make_shared<RedisConnection>(connInfo_, loop);
std::weak_ptr<RedisClientImpl> thisWeakPtr = shared_from_this();
conn->setConnectCallback([thisWeakPtr](RedisConnectionPtr &&conn) {
auto thisPtr = thisWeakPtr.lock();
Expand Down Expand Up @@ -118,8 +109,7 @@ RedisConnectionPtr RedisClientImpl::newSubscribeConnection(
trantor::EventLoop *loop,
const std::shared_ptr<RedisSubscriberImpl> &subscriber)
{
auto conn = std::make_shared<RedisConnection>(
serverAddr_, username_, password_, db_, loop);
auto conn = std::make_shared<RedisConnection>(connInfo_, loop);
std::weak_ptr<RedisClientImpl> weakThis = shared_from_this();
std::weak_ptr<RedisSubscriberImpl> weakSub(subscriber);
conn->setConnectCallback([weakThis, weakSub](RedisConnectionPtr &&conn) {
Expand Down
10 changes: 1 addition & 9 deletions nosql_lib/redis/src/RedisClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ class RedisClientImpl final
public std::enable_shared_from_this<RedisClientImpl>
{
public:
RedisClientImpl(const trantor::InetAddress &serverAddress,
size_t numberOfConnections,
std::string username = "",
std::string password = "",
unsigned int db = 0);
RedisClientImpl(RedisConnectionInfo connInfo, size_t numberOfConnections);
void execCommandAsync(RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
std::string_view command,
Expand Down Expand Up @@ -84,10 +80,6 @@ class RedisClientImpl final
std::unordered_set<RedisConnectionPtr> connections_;
std::vector<RedisConnectionPtr> readyConnections_;
size_t connectionPos_{0};
const trantor::InetAddress serverAddr_;
const std::string username_;
const std::string password_;
const unsigned int db_;
const size_t numberOfConnections_;
double timeout_{-1.0};
std::list<std::shared_ptr<std::function<void(const RedisConnectionPtr &)>>>
Expand Down
24 changes: 7 additions & 17 deletions nosql_lib/redis/src/RedisClientLockFree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,13 @@
#include "../../lib/src/TaskTimeoutFlag.h"
using namespace drogon::nosql;

RedisClientLockFree::RedisClientLockFree(
const trantor::InetAddress &serverAddress,
size_t numberOfConnections,
trantor::EventLoop *loop,
std::string username,
std::string password,
unsigned int db)
: loop_(loop),
serverAddr_(serverAddress),
username_(std::move(username)),
password_(std::move(password)),
db_(db),
numberOfConnections_(numberOfConnections)
RedisClientLockFree::RedisClientLockFree(RedisConnectionInfo connInfo,
size_t numberOfConnections,
trantor::EventLoop *loop)
: loop_(loop), numberOfConnections_(numberOfConnections)
{
assert(loop_);
connInfo_ = std::move(connInfo);
for (size_t i = 0; i < numberOfConnections_; ++i)
{
loop_->queueInLoop([this]() { connections_.insert(newConnection()); });
Expand All @@ -43,8 +35,7 @@ RedisClientLockFree::RedisClientLockFree(
RedisConnectionPtr RedisClientLockFree::newConnection()
{
loop_->assertInLoopThread();
auto conn = std::make_shared<RedisConnection>(
serverAddr_, username_, password_, db_, loop_);
auto conn = std::make_shared<RedisConnection>(connInfo_, loop_);
std::weak_ptr<RedisClientLockFree> thisWeakPtr = shared_from_this();
conn->setConnectCallback([thisWeakPtr](RedisConnectionPtr &&conn) {
auto thisPtr = thisWeakPtr.lock();
Expand Down Expand Up @@ -89,8 +80,7 @@ RedisConnectionPtr RedisClientLockFree::newSubscribeConnection(
const std::shared_ptr<RedisSubscriberImpl> &subscriber)
{
loop_->assertInLoopThread();
auto conn = std::make_shared<RedisConnection>(
serverAddr_, username_, password_, db_, loop_);
auto conn = std::make_shared<RedisConnection>(connInfo_, loop_);
std::weak_ptr<RedisClientLockFree> weakThis = shared_from_this();
std::weak_ptr<RedisSubscriberImpl> weakSub(subscriber);
conn->setConnectCallback([weakThis, weakSub](RedisConnectionPtr &&conn) {
Expand Down
11 changes: 2 additions & 9 deletions nosql_lib/redis/src/RedisClientLockFree.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ class RedisClientLockFree final
public std::enable_shared_from_this<RedisClientLockFree>
{
public:
RedisClientLockFree(const trantor::InetAddress &serverAddress,
RedisClientLockFree(RedisConnectionInfo conInfo,
size_t numberOfConnections,
trantor::EventLoop *loop,
std::string username = "",
std::string password = "",
unsigned int db = 0);
trantor::EventLoop *loop);
void execCommandAsync(RedisResultCallback &&resultCallback,
RedisExceptionCallback &&exceptionCallback,
std::string_view command,
Expand Down Expand Up @@ -75,10 +72,6 @@ class RedisClientLockFree final
std::unordered_set<RedisConnectionPtr> connections_;
std::vector<RedisConnectionPtr> readyConnections_;
size_t connectionPos_{0};
const trantor::InetAddress serverAddr_;
const std::string username_;
const std::string password_;
const unsigned int db_;
const size_t numberOfConnections_;
std::list<std::shared_ptr<std::function<void(const RedisConnectionPtr &)>>>
tasks_;
Expand Down
20 changes: 8 additions & 12 deletions nosql_lib/redis/src/RedisClientManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ void RedisClientManager::createRedisClients(
assert(redisFastClientsMap_.empty());
for (auto &redisInfo : redisInfos_)
{
RedisConnectionInfo connInfo{.addr = {redisInfo.addr_, redisInfo.port_},
.username = redisInfo.username_,
.password = redisInfo.password_,
.db = redisInfo.db_};
if (redisInfo.isFast_)
{
redisFastClientsMap_[redisInfo.name_] =
Expand All @@ -37,12 +41,7 @@ void RedisClientManager::createRedisClients(
assert(idx == ioLoops[idx]->index());
LOG_TRACE << "create fast redis client for the thread " << idx;
c = std::make_shared<RedisClientLockFree>(
trantor::InetAddress(redisInfo.addr_, redisInfo.port_),
redisInfo.connectionNumber_,
ioLoops[idx],
redisInfo.username_,
redisInfo.password_,
redisInfo.db_);
connInfo, redisInfo.connectionNumber_, ioLoops[idx]);
if (redisInfo.timeout_ > 0.0)
{
c->setTimeout(redisInfo.timeout_);
Expand All @@ -51,12 +50,9 @@ void RedisClientManager::createRedisClients(
}
else
{
auto clientPtr = std::make_shared<RedisClientImpl>(
trantor::InetAddress(redisInfo.addr_, redisInfo.port_),
redisInfo.connectionNumber_,
redisInfo.username_,
redisInfo.password_,
redisInfo.db_);
auto clientPtr =
std::make_shared<RedisClientImpl>(connInfo,
redisInfo.connectionNumber_);
if (redisInfo.timeout_ > 0.0)
{
clientPtr->setTimeout(redisInfo.timeout_);
Expand Down
10 changes: 10 additions & 0 deletions nosql_lib/redis/src/RedisConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ class RedisConnection : public trantor::NonCopyable,
unsigned int db,
trantor::EventLoop *loop);

RedisConnection(const RedisConnectionInfo &connInfo,
trantor::EventLoop *loop)
: RedisConnection(connInfo.addr,
connInfo.username,
connInfo.password,
connInfo.db,
loop)
{
}

void setConnectCallback(
const std::function<void(std::shared_ptr<RedisConnection> &&)>
&callback)
Expand Down
Loading