Skip to content

Commit

Permalink
Make async_logger::flush() synchronous and wait for the flush to comp…
Browse files Browse the repository at this point in the history
…lete

- require at least c++14 to build
  • Loading branch information
walkerlala committed Mar 22, 2024
1 parent 73e2e02 commit 4e38aa9
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 19 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ if(SPDLOG_USE_STD_FORMAT)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
elseif(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()

Expand Down
20 changes: 12 additions & 8 deletions include/spdlog/async_logger-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ SPDLOG_LOGGER_CATCH(msg.source)

// send flush request to the thread pool
SPDLOG_INLINE void spdlog::async_logger::flush_(){
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){
pool_ptr->post_flush(shared_from_this(), overflow_policy_);
}
else {
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
}
}
SPDLOG_LOGGER_CATCH(source_loc())
SPDLOG_TRY {
auto pool_ptr = thread_pool_.lock();
if (!pool_ptr) {
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
}

std::future<void> future = pool_ptr->post_flush(shared_from_this(), overflow_policy_);
// Wait for the flush operation to complete.
// This might throw exception if the flush message get dropped because of overflow.
future.get();
}
SPDLOG_LOGGER_CATCH(source_loc())
}

//
Expand Down
27 changes: 20 additions & 7 deletions include/spdlog/details/thread_pool-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#endif

#include <cassert>
#include <memory>
#include <spdlog/common.h>

namespace spdlog {
Expand Down Expand Up @@ -62,9 +63,13 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
post_async_msg_(std::move(async_m), overflow_policy);
}

void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy) {
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
std::future<void> SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy) {
std::unique_ptr<std::promise<void>> promise = std::make_unique<std::promise<void>>();
std::future<void> future = promise->get_future();
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)),
overflow_policy);
return future;
}

size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }
Expand Down Expand Up @@ -101,26 +106,34 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() {
async_msg incoming_async_msg;
q_.dequeue(incoming_async_msg);

bool active = true;
switch (incoming_async_msg.msg_type) {
case async_msg_type::log: {
incoming_async_msg.worker_ptr->backend_sink_it_(incoming_async_msg);
return true;
active = true;
break;
}
case async_msg_type::flush: {
incoming_async_msg.worker_ptr->backend_flush_();
return true;
active = true;
break;
}

case async_msg_type::terminate: {
return false;
active = false;
break;
}

default: {
assert(false);
}
}

return true;
if (incoming_async_msg.promise) {
incoming_async_msg.promise->set_value();
}

return active;
}

} // namespace details
Expand Down
17 changes: 14 additions & 3 deletions include/spdlog/details/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <vector>
Expand All @@ -27,6 +28,7 @@ enum class async_msg_type { log, flush, terminate };
struct async_msg : log_msg_buffer {
async_msg_type msg_type{async_msg_type::log};
async_logger_ptr worker_ptr;
std::unique_ptr<std::promise<void>> promise;

async_msg() = default;
~async_msg() = default;
Expand Down Expand Up @@ -56,12 +58,20 @@ struct async_msg : log_msg_buffer {
async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
: log_msg_buffer{m},
msg_type{the_type},
worker_ptr{std::move(worker)} {}
worker_ptr{std::move(worker)},
promise{nullptr} {}

async_msg(async_logger_ptr &&worker, async_msg_type the_type)
: log_msg_buffer{},
msg_type{the_type},
worker_ptr{std::move(worker)} {}
worker_ptr{std::move(worker)},
promise{nullptr} {}

async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::unique_ptr<std::promise<void>> promise)
: log_msg_buffer{},
msg_type{the_type},
worker_ptr{std::move(worker)},
promise{std::move(promise)} {}

explicit async_msg(async_msg_type the_type)
: async_msg{nullptr, the_type} {}
Expand All @@ -88,7 +98,8 @@ class SPDLOG_API thread_pool {
void post_log(async_logger_ptr &&worker_ptr,
const details::log_msg &msg,
async_overflow_policy overflow_policy);
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy);
std::future<void> post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy);
size_t overrun_counter();
void reset_overrun_counter();
size_t discard_counter();
Expand Down
45 changes: 45 additions & 0 deletions tests/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "spdlog/sinks/basic_file_sink.h"
#include "test_sink.h"

#include <iostream>

#define TEST_FILENAME "test_logs/async_test.log"

TEST_CASE("basic async test ", "[async]") {
Expand Down Expand Up @@ -93,6 +95,49 @@ TEST_CASE("flush", "[async]") {
REQUIRE(test_sink->flush_counter() == 1);
}

TEST_CASE("multithread flush", "[async]") {
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
size_t queue_size = 2;
size_t messages = 10;
size_t n_threads = 10;
size_t flush_count = 1024;
std::mutex mtx;
std::vector<std::string> errmsgs;
{
auto tp = std::make_shared<spdlog::details::thread_pool>(queue_size, 1);
auto logger = std::make_shared<spdlog::async_logger>(
"as", test_sink, tp, spdlog::async_overflow_policy::discard_new);

logger->set_error_handler([&](const std::string &err) {
std::unique_lock<std::mutex> lock(mtx);
errmsgs.push_back(err);
});

for (size_t i = 0; i < messages; i++) {
logger->info("Hello message #{}", i);
}

std::vector<std::thread> threads;
for (size_t i = 0; i < n_threads; i++) {
threads.emplace_back([logger, flush_count] {
for (size_t j = 0; j < flush_count; j++) {
// flush does not throw exception even if failed.
// Instead, the error handler is invoked.
logger->flush();
}
});
}

for (auto &t : threads) {
t.join();
}
}
REQUIRE(test_sink->flush_counter() >= 1);
REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count);
REQUIRE(errmsgs.size() >= 1);
REQUIRE(errmsgs[0] == "std::future_error: Broken promise");
}

TEST_CASE("async periodic flush", "[async]") {
auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as");
auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]);
Expand Down

0 comments on commit 4e38aa9

Please sign in to comment.