From ad3b06db9f9a5027cb2a9de24f5807918776a8ca Mon Sep 17 00:00:00 2001 From: "He, Wanchen" Date: Tue, 23 May 2023 18:24:20 +0800 Subject: [PATCH] Add ControlBlock. --- nosql_lib/redis/src/RedisConnection.cc | 123 ++++++++++++++++++------- nosql_lib/redis/src/RedisConnection.h | 9 +- 2 files changed, 93 insertions(+), 39 deletions(-) diff --git a/nosql_lib/redis/src/RedisConnection.cc b/nosql_lib/redis/src/RedisConnection.cc index 0a680e387e..eb2dd4e675 100644 --- a/nosql_lib/redis/src/RedisConnection.cc +++ b/nosql_lib/redis/src/RedisConnection.cc @@ -37,6 +37,24 @@ RedisConnection::RedisConnection(const trantor::InetAddress &serverAddress, loop_->queueInLoop([this]() { startConnectionInLoop(); }); } +struct ControlBlock +{ + std::weak_ptr weakConn; + std::shared_ptr channel; + trantor::InetAddress addr; +}; + +RedisConnection::~RedisConnection() +{ + LOG_TRACE << "status_: " << (int)status_; + if (redisContext_ && status_ != ConnectStatus::kEnd) + { + auto context = redisContext_; + redisContext_ = nullptr; + loop_->queueInLoop([context]() { redisAsyncDisconnect(context); }); + } +} + void RedisConnection::startConnectionInLoop() { loop_->assertInLoopThread(); @@ -62,19 +80,32 @@ void RedisConnection::startConnectionInLoop() return; } + channel_ = std::make_shared(loop_, redisContext_->c.fd); + channel_->setReadCallback([this]() { handleRedisRead(); }); + channel_->setWriteCallback([this]() { handleRedisWrite(); }); + + auto *cb = new ControlBlock(); + cb->weakConn = weak_from_this(); + cb->channel = channel_; + cb->addr = serverAddr_; + redisContext_->ev.addWrite = addWrite; redisContext_->ev.delWrite = delWrite; redisContext_->ev.addRead = addRead; redisContext_->ev.delRead = delRead; redisContext_->ev.cleanup = cleanup; - redisContext_->ev.data = this; + redisContext_->ev.data = cb; - channel_ = std::make_unique(loop_, redisContext_->c.fd); - channel_->setReadCallback([this]() { handleRedisRead(); }); - channel_->setWriteCallback([this]() { handleRedisWrite(); }); redisAsyncSetConnectCallback( redisContext_, [](const redisAsyncContext *context, int status) { - auto thisPtr = static_cast(context->ev.data); + auto *cb = static_cast(context->ev.data); + auto thisPtr = cb->weakConn.lock(); + if (!thisPtr) + { + // TODO? + LOG_ERROR << "RedisConnection destruct unexpectedly!"; + return; + } if (status != REDIS_OK) { LOG_ERROR << "Failed to connect to " @@ -224,14 +255,21 @@ void RedisConnection::startConnectionInLoop() }); redisAsyncSetDisconnectCallback( redisContext_, [](const redisAsyncContext *context, int /*status*/) { - auto thisPtr = static_cast(context->ev.data); - + auto *cb = static_cast(context->ev.data); + auto thisPtr = cb->weakConn.lock(); + if (!thisPtr) + { + LOG_TRACE << "Disconnected from " << cb->addr.toIpPort() + << ", no more reconnect because RedisConnection has " + "been destructed"; + delete cb; + return; + } thisPtr->handleDisconnect(); if (thisPtr->disconnectCallback_) { thisPtr->disconnectCallback_(thisPtr->shared_from_this()); } - LOG_TRACE << "Disconnected from " << thisPtr->serverAddr_.toIpPort(); }); @@ -260,35 +298,44 @@ void RedisConnection::handleDisconnect() redisContext_->ev.addRead = nullptr; redisContext_->ev.delRead = nullptr; redisContext_->ev.cleanup = nullptr; + delete (ControlBlock *)redisContext_->ev.data; redisContext_->ev.data = nullptr; } void RedisConnection::addWrite(void *userData) { - auto thisPtr = static_cast(userData); - assert(thisPtr->channel_); - thisPtr->channel_->enableWriting(); + auto *cb = static_cast(userData); + assert(cb->channel); + cb->channel->enableWriting(); } void RedisConnection::delWrite(void *userData) { - auto thisPtr = static_cast(userData); - assert(thisPtr->channel_); - thisPtr->channel_->disableWriting(); + auto *cb = static_cast(userData); + assert(cb->channel); + cb->channel->disableWriting(); } void RedisConnection::addRead(void *userData) { - auto thisPtr = static_cast(userData); - assert(thisPtr->channel_); - thisPtr->channel_->enableReading(); + auto *cb = static_cast(userData); + assert(cb->channel); + cb->channel->enableReading(); } void RedisConnection::delRead(void *userData) { - auto thisPtr = static_cast(userData); - assert(thisPtr->channel_); - thisPtr->channel_->disableReading(); + auto *cb = static_cast(userData); + assert(cb->channel); + cb->channel->disableReading(); } -void RedisConnection::cleanup(void * /*userData*/) +void RedisConnection::cleanup(void *userData) { - LOG_TRACE << "cleanup"; + LOG_TRACE << "RedisConnection::cleanup"; + auto *cb = static_cast(userData); + if (cb) + { + assert(cb->channel); + cb->channel->remove(); + // cleanup() should only update socket status + // should not delete cb here + } } void RedisConnection::handleRedisRead() @@ -318,8 +365,12 @@ void RedisConnection::sendCommandInLoop( redisAsyncFormattedCommand( redisContext_, [](redisAsyncContext *context, void *r, void * /*userData*/) { - auto thisPtr = static_cast(context->ev.data); - thisPtr->handleResult(static_cast(r)); + auto *cb = static_cast(context->ev.data); + auto thisPtr = cb->weakConn.lock(); + if (thisPtr) + { + thisPtr->handleResult(static_cast(r)); + } }, nullptr, command.c_str(), @@ -405,10 +456,14 @@ void RedisConnection::sendSubscribeInLoop( redisAsyncFormattedCommand( redisContext_, [](redisAsyncContext *context, void *r, void *subCtx) { - auto thisPtr = static_cast(context->ev.data); - thisPtr->handleSubscribeResult(static_cast(r), - static_cast( - subCtx)); + auto *cb = static_cast(context->ev.data); + auto thisPtr = cb->weakConn.lock(); + if (thisPtr) + { + thisPtr->handleSubscribeResult(static_cast(r), + static_cast( + subCtx)); + } }, subCtx.get(), subCtx->subscribeCommand().c_str(), @@ -427,10 +482,14 @@ void RedisConnection::sendUnsubscribeInLoop( redisAsyncFormattedCommand( redisContext_, [](redisAsyncContext *context, void *r, void *subCtx) { - auto thisPtr = static_cast(context->ev.data); - thisPtr->handleSubscribeResult(static_cast(r), - static_cast( - subCtx)); + auto *cb = static_cast(context->ev.data); + auto thisPtr = cb->weakConn.lock(); + if (thisPtr) + { + thisPtr->handleSubscribeResult(static_cast(r), + static_cast( + subCtx)); + } }, subCtx.get(), subCtx->unsubscribeCommand().c_str(), diff --git a/nosql_lib/redis/src/RedisConnection.h b/nosql_lib/redis/src/RedisConnection.h index d37136fae4..712bd83396 100644 --- a/nosql_lib/redis/src/RedisConnection.h +++ b/nosql_lib/redis/src/RedisConnection.h @@ -47,6 +47,7 @@ class RedisConnection : public trantor::NonCopyable, const std::string &password, unsigned int db, trantor::EventLoop *loop); + ~RedisConnection(); void setConnectCallback( const std::function &&)> &callback) @@ -149,12 +150,6 @@ class RedisConnection : public trantor::NonCopyable, void sendSubscribe(const std::shared_ptr &subCtx); void sendUnsubscribe(const std::shared_ptr &subCtx); - ~RedisConnection() - { - LOG_TRACE << (int)status_; - if (redisContext_ && status_ != ConnectStatus::kEnd) - redisAsyncDisconnect(redisContext_); - } void disconnect(); void sendCommand(RedisResultCallback &&resultCallback, RedisExceptionCallback &&exceptionCallback, @@ -181,7 +176,7 @@ class RedisConnection : public trantor::NonCopyable, const std::string password_; const unsigned int db_; trantor::EventLoop *loop_{nullptr}; - std::unique_ptr channel_{nullptr}; + std::shared_ptr channel_{nullptr}; std::function &&)> connectCallback_; std::function &&)> disconnectCallback_;