From 9d679620fddca38336c4f85273384abbf8733688 Mon Sep 17 00:00:00 2001 From: Michael Pratt Date: Fri, 9 Aug 2024 23:59:03 -0400 Subject: [PATCH] Implement GNU jobserver posix client support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The core principle of a jobserver is simple: before starting a new job (edge in ninja-speak), a token must be acquired from an external entity as approval. Once a job is finished, the token is returned to represent a free job slot. In the case of GNU Make, this external entity is the parent process which has executed Ninja and is managing the load capacity for all subprocesses which it has spawned. Introducing client support for this model allows Ninja to give load capacity management to it's parent process, allowing it to control the number of subprocesses that Ninja spawns at any given time. This functionality is desirable when Ninja is part of a bigger build, such as Yocto/OpenEmbedded, Openwrt/Linux, Buildroot, and Android. Here, multiple compile jobs are executed in parallel in order to maximize cpu utilization, but if each compile job in Ninja uses all available cores, the system is overloaded. This implementation instantiates the client in real_main() and passes pointers to the Jobserver class into other classes. All tokens are returned whenever the CommandRunner aborts, and the current number of tokens compared to the current number of running subprocesses controls the available load capacity, used to determine how many new tokens to attempt to acquire in order to try to start another job for each loop to find work. Jobserver related functions are defined as no-op for Windows pending Windows-specific support for the jobserver. Co-authored-by: Martin Hundebøll Co-developed-by: Martin Hundebøll Signed-off-by: Martin Hundebøll Signed-off-by: Michael Pratt --- CMakeLists.txt | 5 +- configure.py | 1 + src/build.cc | 44 ++++++++++-- src/build.h | 9 ++- src/build_test.cc | 63 ++++++++-------- src/graph.h | 1 + src/jobserver-posix.cc | 160 +++++++++++++++++++++++++++++++++++++++++ src/jobserver.h | 112 +++++++++++++++++++++++++++++ src/ninja.cc | 25 +++++-- 9 files changed, 376 insertions(+), 44 deletions(-) create mode 100644 src/jobserver-posix.cc create mode 100644 src/jobserver.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b8fdee7d3a..967aa245f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -169,7 +169,10 @@ if(WIN32) # errors by telling windows.h to not define those two. add_compile_definitions(NOMINMAX) else() - target_sources(libninja PRIVATE src/subprocess-posix.cc) + target_sources(libninja PRIVATE + src/jobserver-posix.cc + src/subprocess-posix.cc + ) if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX") target_sources(libninja PRIVATE src/getopt.c) # Build getopt.c, which can be compiled as either C or C++, as C++ diff --git a/configure.py b/configure.py index c88daad508..b39cfa6b46 100755 --- a/configure.py +++ b/configure.py @@ -564,6 +564,7 @@ def has_re2c() -> bool: objs += cxx('minidump-win32', variables=cxxvariables) objs += cc('getopt') else: + objs += cxx('jobserver-posix') objs += cxx('subprocess-posix') if platform.is_aix(): objs += cc('getopt') diff --git a/src/build.cc b/src/build.cc index 7e5f790eb9..31a609c5ab 100644 --- a/src/build.cc +++ b/src/build.cc @@ -163,6 +163,16 @@ Edge* Plan::FindWork() { return NULL; Edge* work = ready_.top(); + + // Only initiate work if the jobserver client can acquire a token. + if (builder_ && builder_->jobserver_ && + builder_->jobserver_->Enabled()) { + int job_tokens = builder_->jobserver_->Tokens(); + work->job_token_ = builder_->jobserver_->Acquire(); + if (job_tokens == builder_->jobserver_->Tokens()) + return NULL; + } + ready_.pop(); return work; } @@ -199,6 +209,10 @@ bool Plan::EdgeFinished(Edge* edge, EdgeResult result, string* err) { edge->pool()->EdgeFinished(*edge); edge->pool()->RetrieveReadyEdges(&ready_); + // If jobserver is used, return the token for this job. + if (builder_ && builder_->jobserver_) + builder_->jobserver_->Release(&edge->job_token_); + // The rest of this function only applies to successful commands. if (result != kEdgeSucceeded) return true; @@ -592,14 +606,18 @@ void Plan::Dump() const { } struct RealCommandRunner : public CommandRunner { - explicit RealCommandRunner(const BuildConfig& config) : config_(config) {} + explicit RealCommandRunner(const BuildConfig& config, Jobserver* jobserver) : + config_(config), jobserver_(jobserver) {} + size_t CanRunMore() const override; bool StartCommand(Edge* edge) override; bool WaitForCommand(Result* result) override; vector GetActiveEdges() override; + void ClearJobTokens(const std::vector&) override; void Abort() override; const BuildConfig& config_; + Jobserver* jobserver_; SubprocessSet subprocs_; map subproc_to_edge_; }; @@ -612,7 +630,13 @@ vector RealCommandRunner::GetActiveEdges() { return edges; } +void RealCommandRunner::ClearJobTokens(const std::vector &edges) { + for (Edge* edge : edges) + jobserver_->Release(&edge->job_token_); +} + void RealCommandRunner::Abort() { + ClearJobTokens(GetActiveEdges()); subprocs_.Clear(); } @@ -628,6 +652,14 @@ size_t RealCommandRunner::CanRunMore() const { capacity = load_capacity; } + // When initialized, behave as if the implicit token is acquired already. + // Otherwise, this happens after a token is released but before it is replaced, + // so the base capacity is represented by job_tokens + 1 when positive. + // Add an extra loop on capacity for each job in order to get an extra token. + int job_tokens = jobserver_->Tokens(); + if (job_tokens) + capacity = abs(job_tokens) - subproc_number + 2; + if (capacity < 0) capacity = 0; @@ -667,10 +699,10 @@ bool RealCommandRunner::WaitForCommand(Result* result) { return true; } -Builder::Builder(State* state, const BuildConfig& config, BuildLog* build_log, - DepsLog* deps_log, DiskInterface* disk_interface, - Status* status, int64_t start_time_millis) - : state_(state), config_(config), plan_(this), status_(status), +Builder::Builder(State* state, const BuildConfig& config, Jobserver* jobserver, + BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface, + Status* status, int64_t start_time_millis) : state_(state), + config_(config), jobserver_(jobserver), plan_(this), status_(status), start_time_millis_(start_time_millis), disk_interface_(disk_interface), explanations_(g_explaining ? new Explanations() : nullptr), scan_(state, build_log, deps_log, disk_interface, @@ -775,7 +807,7 @@ bool Builder::Build(string* err) { if (config_.dry_run) command_runner_.reset(new DryRunCommandRunner); else - command_runner_.reset(new RealCommandRunner(config_)); + command_runner_.reset(new RealCommandRunner(config_, jobserver_)); } // We are about to start the build process. diff --git a/src/build.h b/src/build.h index 9bb0c70b5c..586d7c534d 100644 --- a/src/build.h +++ b/src/build.h @@ -24,6 +24,7 @@ #include "depfile_parser.h" #include "exit_status.h" #include "graph.h" +#include "jobserver.h" #include "util.h" // int64_t struct BuildLog; @@ -161,6 +162,7 @@ struct CommandRunner { virtual bool WaitForCommand(Result* result) = 0; virtual std::vector GetActiveEdges() { return std::vector(); } + virtual void ClearJobTokens(const std::vector&) {} virtual void Abort() {} }; @@ -187,9 +189,9 @@ struct BuildConfig { /// Builder wraps the build process: starting commands, updating status. struct Builder { - Builder(State* state, const BuildConfig& config, BuildLog* build_log, - DepsLog* deps_log, DiskInterface* disk_interface, Status* status, - int64_t start_time_millis); + Builder(State* state, const BuildConfig& config, Jobserver* jobserver, + BuildLog* build_log, DepsLog* deps_log, DiskInterface* disk_interface, + Status* status, int64_t start_time_millis); ~Builder(); /// Clean up after interrupted commands by deleting output files. @@ -224,6 +226,7 @@ struct Builder { State* state_; const BuildConfig& config_; + Jobserver* jobserver_; Plan plan_; std::unique_ptr command_runner_; Status* status_; diff --git a/src/build_test.cc b/src/build_test.cc index c84190a040..fb0167e5ae 100644 --- a/src/build_test.cc +++ b/src/build_test.cc @@ -525,6 +525,7 @@ struct FakeCommandRunner : public CommandRunner { virtual bool StartCommand(Edge* edge); virtual bool WaitForCommand(Result* result); virtual vector GetActiveEdges(); + virtual void ClearJobTokens(const std::vector&); virtual void Abort(); vector commands_ran_; @@ -535,12 +536,12 @@ struct FakeCommandRunner : public CommandRunner { struct BuildTest : public StateTestWithBuiltinRules, public BuildLogUser { BuildTest() : config_(MakeConfig()), command_runner_(&fs_), status_(config_), - builder_(&state_, config_, NULL, NULL, &fs_, &status_, 0) { + builder_(&state_, config_, NULL, NULL, NULL, &fs_, &status_, 0) { } explicit BuildTest(DepsLog* log) : config_(MakeConfig()), command_runner_(&fs_), status_(config_), - builder_(&state_, config_, NULL, log, &fs_, &status_, 0) {} + builder_(&state_, config_, NULL, NULL, log, &fs_, &status_, 0) {} virtual void SetUp() { StateTestWithBuiltinRules::SetUp(); @@ -610,7 +611,7 @@ void BuildTest::RebuildTarget(const string& target, const char* manifest, pdeps_log = &deps_log; } - Builder builder(pstate, config_, pbuild_log, pdeps_log, &fs_, &status_, 0); + Builder builder(pstate, config_, NULL, pbuild_log, pdeps_log, &fs_, &status_, 0); EXPECT_TRUE(builder.AddTarget(target, &err)); command_runner_.commands_ran_.clear(); @@ -797,7 +798,13 @@ vector FakeCommandRunner::GetActiveEdges() { return active_edges_; } +void FakeCommandRunner::ClearJobTokens(const std::vector &edges) { + for (Edge* edge : edges) + edge->job_token_ = '\0'; +} + void FakeCommandRunner::Abort() { + ClearJobTokens(GetActiveEdges()); active_edges_.clear(); } @@ -2559,7 +2566,7 @@ TEST_F(BuildWithDepsLogTest, Straightforward) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2589,7 +2596,7 @@ TEST_F(BuildWithDepsLogTest, Straightforward) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -2630,7 +2637,7 @@ TEST_F(BuildWithDepsLogTest, ObsoleteDeps) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2659,7 +2666,7 @@ TEST_F(BuildWithDepsLogTest, ObsoleteDeps) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -2695,7 +2702,7 @@ TEST_F(BuildWithDepsLogTest, DepsIgnoredInDryRun) { // The deps log is NULL in dry runs. config_.dry_run = true; - Builder builder(&state, config_, NULL, NULL, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, NULL, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2730,7 +2737,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { BuildLog::LogEntry* log_entry = NULL; { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2750,7 +2757,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { } { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2772,7 +2779,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceCondition) { } { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2811,7 +2818,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); { - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); // Run the build, out gets built, dep file is created @@ -2832,7 +2839,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Trigger the build again - "out" will rebuild since its newest input mtime (header.h) // is newer than the recorded mtime of out in the build log - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2848,7 +2855,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Trigger the build again - "out" won't rebuild since the file wasn't updated during // the previous build - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2867,7 +2874,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Rebuild. This time, long-cc will cause header.h to be updated while the build is // in progress - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2883,7 +2890,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // Rebuild. Because header.h is now in the deplog for out, it should be detectable as // a change-while-in-progress and should cause a rebuild of out. - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2899,7 +2906,7 @@ TEST_F(BuildWithDepsLogTest, TestInputMtimeRaceConditionWithDepFile) { { // This time, the header.h file was not updated during the build, so the target should // not be considered dirty. - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); @@ -2957,7 +2964,7 @@ TEST_F(BuildWithDepsLogTest, RestatDepfileDependencyDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out", &err)); ASSERT_EQ("", err); @@ -2983,7 +2990,7 @@ TEST_F(BuildWithDepsLogTest, RestatDepfileDependencyDepsLog) { ASSERT_TRUE(deps_log.Load(deps_log_file_.path(), &state, &err)); ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); command_runner_.commands_ran_.clear(); EXPECT_TRUE(builder.AddTarget("out", &err)); @@ -3016,7 +3023,7 @@ TEST_F(BuildWithDepsLogTest, DepFileOKDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("fo o.o", &err)); ASSERT_EQ("", err); @@ -3037,7 +3044,7 @@ TEST_F(BuildWithDepsLogTest, DepFileOKDepsLog) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); Edge* edge = state.edges_.back(); @@ -3087,7 +3094,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_FALSE(builder.AlreadyUpToDate()); @@ -3111,7 +3118,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_FALSE(builder.AlreadyUpToDate()); @@ -3134,7 +3141,7 @@ TEST_F(BuildWithDepsLogTest, DiscoveredDepDuringBuildChanged) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, &build_log, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, &build_log, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); EXPECT_TRUE(builder.AlreadyUpToDate()); @@ -3162,7 +3169,7 @@ TEST_F(BuildWithDepsLogTest, DepFileDepsLogCanonicalize) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("a/b/c/d/e/fo o.o", &err)); ASSERT_EQ("", err); @@ -3185,7 +3192,7 @@ TEST_F(BuildWithDepsLogTest, DepFileDepsLogCanonicalize) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); state.GetNode("bar.h", 0)->MarkDirty(); // Mark bar.h as missing. @@ -4264,7 +4271,7 @@ TEST_F(BuildWithDepsLogTest, ValidationThroughDepfile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); @@ -4300,7 +4307,7 @@ TEST_F(BuildWithDepsLogTest, ValidationThroughDepfile) { ASSERT_TRUE(deps_log.OpenForWrite(deps_log_file_.path(), &err)); ASSERT_EQ("", err); - Builder builder(&state, config_, NULL, &deps_log, &fs_, &status_, 0); + Builder builder(&state, config_, NULL, NULL, &deps_log, &fs_, &status_, 0); builder.command_runner_.reset(&command_runner_); EXPECT_TRUE(builder.AddTarget("out2", &err)); diff --git a/src/graph.h b/src/graph.h index 314c44296a..3668b22280 100644 --- a/src/graph.h +++ b/src/graph.h @@ -227,6 +227,7 @@ struct Edge { bool deps_loaded_ = false; bool deps_missing_ = false; bool generated_by_dep_loader_ = false; + unsigned char job_token_ = '\0'; TimeStamp command_start_time_ = 0; const Rule& rule() const { return *rule_; } diff --git a/src/jobserver-posix.cc b/src/jobserver-posix.cc new file mode 100644 index 0000000000..f983248825 --- /dev/null +++ b/src/jobserver-posix.cc @@ -0,0 +1,160 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "jobserver.h" + +#include +#include + +#include +#include + +#include "util.h" + +// Declare complete type for static constants in class. +constexpr char const Jobserver::kAuthKey[]; +constexpr char const Jobserver::kFdsKey[]; +constexpr char const Jobserver::kFifoKey[]; + +PosixJobserverClient::PosixJobserverClient() { + assert(!Enabled()); + + // Set name, type of pipe, and if non-parallel from MAKEFLAGS. + Parse(); + + const char* jobserver = jobserver_name_.c_str(); + + // Warn if jobserver type is unknown (neither fifo nor pipe). + if (!jobserver_fifo_ && sscanf(jobserver, "%d,%d", &rfd_, &wfd_) != 2) + if (!jobserver_name_.empty()) + Warning("invalid jobserver auth: '%s'", jobserver); + + // Open FDs to the pipe if needed, read must be non-blocking. + // If passed FDs are blocking on read, force non-parallel build. + if (jobserver_fifo_) { + rfd_ = open(jobserver + strlen(kFifoKey), O_RDONLY | O_NONBLOCK); + wfd_ = open(jobserver + strlen(kFifoKey), O_WRONLY); + } else if (Enabled() && (fcntl(rfd_, F_GETFL) & O_NONBLOCK) == 0) { + jobserver_closed_ = true; + } + + // Exit on failure to open FDs, build non-parallel for invalid passed FDs. + if (Enabled()) + Info("using jobserver: %s", jobserver); + else if (jobserver_fifo_ && (rfd_ == -1 || wfd_ == -1)) + Fatal("failed to open jobserver: %s: %s", jobserver, strerror(errno)); + else if (!jobserver_name_.empty()) + jobserver_closed_ = true; + + // Signal that we have initialized but do not have a token yet. + if (Enabled()) + token_count_ = -1; +} + +void PosixJobserverClient::Parse() { + // Return early if no makeflags are passed in the environment. + const char* makeflags = std::getenv("MAKEFLAGS"); + if (makeflags == nullptr || strlen(makeflags) == 0) + return; + + std::string::size_type flag_char = 0; + std::string flag; + std::vector flags; + + // Tokenize string to characters in flag, then words in flags. + while (flag_char < strlen(makeflags)) { + while (flag_char < strlen(makeflags) && + !isblank(static_cast(makeflags[flag_char]))) { + flag.push_back(static_cast(makeflags[flag_char])); + flag_char++; + } + + if (!flag.empty()) + flags.push_back(flag); + + flag.clear(); + flag_char++; + } + + // --jobserver-auth= + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find(kAuthKey) == 0) + flag = flags[n].substr(strlen(kAuthKey)); + + // --jobserver-fds= + if (flag.empty()) + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find(kFdsKey) == 0) + flag = flags[n].substr(strlen(kFdsKey)); + + // -j 1 + if (flag.empty()) + for (size_t n = 0; n < flags.size(); n++) + if (flags[n].find("-j") == 0) + jobserver_closed_ = true; + + // Check for fifo pipe. + if (flag.find(kFifoKey) == 0) + jobserver_fifo_ = true; + + jobserver_name_.assign(flag); +} + +bool PosixJobserverClient::Enabled() const { + return (rfd_ >= 0 && wfd_ >= 0) || jobserver_closed_; +} + +unsigned char PosixJobserverClient::Acquire() { + unsigned char token = '\0'; + + // The first token is implicitly handed to a process. + // Fallback to non-parallel if jobserver-capable parent has no pipe. + if (token_count_ <= 0 || jobserver_closed_) { + token_count_ = 1; + return token; + } + + int ret = read(rfd_, &token, 1); + if (ret < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + jobserver_closed_ = true; + if (!jobserver_fifo_) + Warning("pipe closed: %d (mark the command as recursive)", rfd_); + else + Fatal("failed to read from jobserver: %d: %s", rfd_, strerror(errno)); + } + + if (ret > 0) + token_count_++; + + return token; +} + +void PosixJobserverClient::Release(unsigned char* token) { + if (token_count_ < 0) + token_count_ = 0; + if (token_count_ > 0) + token_count_--; + + // The first token is implicitly handed to a process. + // Writing is not possible if the pipe is closed. + if (*token == '\0' || jobserver_closed_) + return; + + int ret = write(wfd_, token, 1); + if (ret != 1) { + Fatal("failed to write to jobserver: %d: %s", wfd_, strerror(errno)); + } + + *token = '\0'; +} diff --git a/src/jobserver.h b/src/jobserver.h new file mode 100644 index 0000000000..c60751f365 --- /dev/null +++ b/src/jobserver.h @@ -0,0 +1,112 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +/// The GNU jobserver limits parallelism by assigning a token from an external +/// pool for each command. On posix systems, the pool is a fifo or simple pipe +/// with N characters. On windows systems, the pool is a semaphore initialized +/// to N. When a command is finished, the acquired token is released by writing +/// it back to the fifo or pipe or by increasing the semaphore count. +/// +/// The jobserver functionality is enabled by passing --jobserver-auth= +/// (previously --jobserver-fds= in older versions of Make) in the MAKEFLAGS +/// environment variable and creating the respective file descriptors or objects. +/// On posix systems, is 'fifo:' or ',' for pipes. +/// On windows systems, is the name of the semaphore. +/// +/// The classes parse the MAKEFLAGS variable and opens an object handle if needed. +/// Once enabled, Acquire() must be called to acquire a token from the pool. +/// If a token is acquired, a new command can be started. +/// Once the command is completed, Release() must be called to return a token. +/// The token server does not care in which order a token is received. +struct Jobserver { + /// Return current token count or initialization signal if negative. + int Tokens() const { return token_count_; } + + /// Read MAKEFLAGS environment variable and process the jobserver flag value. + virtual void Parse() {}; + + /// Return true if jobserver functionality is enabled and initialized. + virtual bool Enabled() const { return false; } + + /// Implementation-specific method to acquire a token from the external pool + /// which is called for all tokens but returns early for the first token. + /// This method is called every time Ninja needs to start a command process. + /// Return a non-NUL char value on success (token acquired), and '\0' on failure. + /// First call always succeeds. Ninja is aborted on read errors from a fifo pipe. + /// The return is the token character to be saved for release after work is done. + virtual unsigned char Acquire() { return '\0'; } + + /// Implementation-specific method to release a token to the external pool + /// which is called for all tokens but returns early for the last token. + /// The parameter is the token returned by Acquire() to be sent to the token server. + /// A token with the default value of '\0' will not be sent to the token server. + /// After sent, the token that the pointer parameter points to is cleared to '\0'. + /// It must be called for each successful call to Acquire() after the command exits, + /// even if subprocesses fail or in the case of errors causing Ninja to exit. + /// Ninja is aborted on write errors, and calls always decrement token count. + virtual void Release(unsigned char*) {}; + + protected: + /// The number of currently acquired tokens, or a status signal if negative. + /// This is used to estimate the load capacity for attempting to start a new job, + /// and when the implicit (first) token has been acquired (initialization). + /// -1: initialized without a token + /// 0: uninitialized or disabled + /// +n: number of tokens in use + int token_count_ = 0; + + /// Whether a pipe to the jobserver token pool is closed + /// when it is expected to be open based on MAKEFLAGS + /// (e.g. subcommands not marked recursive, environment passed), + /// or the pipe is closed when expected to be closed + /// but when the parent process is jobserver-capable + /// (e.g. the parent jobserver process build is non-parallel). + bool jobserver_closed_ = false; + + /// String of the parsed value of the jobserver flag passed to environment. + std::string jobserver_name_; + + /// Substrings for parsing MAKEFLAGS environment variable. + static constexpr char const kAuthKey[] = "--jobserver-auth="; + static constexpr char const kFdsKey[] = "--jobserver-fds="; + static constexpr char const kFifoKey[] = "fifo:"; +}; + +struct PosixJobserverClient : public Jobserver { + /// Parse the MAKEFLAGS environment variable to receive the path / FDs + /// of the token pool, and open the handle to the pool if it is an object. + /// If a jobserver argument is found in the MAKEFLAGS environment variable, + /// and the handle is successfully opened, later calls to Enable() return true. + /// If a jobserver argument is found, but the handle fails to be opened, + /// the ninja process is aborted with an error, or, when the FDs provided are bad + /// the build falls back to non-parallel building and the client does not read. + explicit PosixJobserverClient(); + + void Parse() override; + bool Enabled() const override; + unsigned char Acquire() override; + void Release(unsigned char*) override; + + private: + /// Whether the type of jobserver pipe supplied to ninja is named. + bool jobserver_fifo_ = false; + + /// File descriptors to communicate with upstream jobserver token pool. + int rfd_ = -1; + int wfd_ = -1; +}; diff --git a/src/ninja.cc b/src/ninja.cc index 2902359f15..975780e3f0 100644 --- a/src/ninja.cc +++ b/src/ninja.cc @@ -84,8 +84,8 @@ struct Options { /// The Ninja main() loads up a series of data structures; various tools need /// to poke into these, so store them as fields on an object. struct NinjaMain : public BuildLogUser { - NinjaMain(const char* ninja_command, const BuildConfig& config) : - ninja_command_(ninja_command), config_(config), + NinjaMain(const char* ninja_command, const BuildConfig& config, Jobserver* jobserver) : + ninja_command_(ninja_command), config_(config), jobserver_(jobserver), start_time_millis_(GetTimeMillis()) {} /// Command line used to run Ninja. @@ -94,6 +94,9 @@ struct NinjaMain : public BuildLogUser { /// Build configuration set from flags (e.g. parallelism). const BuildConfig& config_; + /// Client for jobserver to allow a parent process to control parallelism. + Jobserver* jobserver_; + /// Loaded state (rules, nodes). State state_; @@ -267,7 +270,8 @@ bool NinjaMain::RebuildManifest(const char* input_file, string* err, if (!node) return false; - Builder builder(&state_, config_, &build_log_, &deps_log_, &disk_interface_, + Builder builder(&state_, config_, jobserver_, + &build_log_, &deps_log_, &disk_interface_, status, start_time_millis_); if (!builder.AddTarget(node, err)) return false; @@ -1355,7 +1359,8 @@ int NinjaMain::RunBuild(int argc, char** argv, Status* status) { disk_interface_.AllowStatCache(g_experimental_statcache); - Builder builder(&state_, config_, &build_log_, &deps_log_, &disk_interface_, + Builder builder(&state_, config_, jobserver_, + &build_log_, &deps_log_, &disk_interface_, status, start_time_millis_); for (size_t i = 0; i < targets.size(); ++i) { if (!builder.AddTarget(targets[i], &err)) { @@ -1542,6 +1547,14 @@ NORETURN void real_main(int argc, char** argv) { Status* status = Status::factory(config); + // Client of jobserver to manage job slots assigned to ninja. +// TODO: jobserver client support for Windows +#ifdef _WIN32 + Jobserver jobserver; +#else + PosixJobserverClient jobserver; +#endif + if (options.working_dir) { // The formatting of this string, complete with funny quotes, is // so Emacs can properly identify that the cwd has changed for @@ -1558,14 +1571,14 @@ NORETURN void real_main(int argc, char** argv) { if (options.tool && options.tool->when == Tool::RUN_AFTER_FLAGS) { // None of the RUN_AFTER_FLAGS actually use a NinjaMain, but it's needed // by other tools. - NinjaMain ninja(ninja_command, config); + NinjaMain ninja(ninja_command, config, &jobserver); exit((ninja.*options.tool->func)(&options, argc, argv)); } // Limit number of rebuilds, to prevent infinite loops. const int kCycleLimit = 100; for (int cycle = 1; cycle <= kCycleLimit; ++cycle) { - NinjaMain ninja(ninja_command, config); + NinjaMain ninja(ninja_command, config, &jobserver); ManifestParserOptions parser_opts; if (options.phony_cycle_should_err) {