From 8fb3a1659ee5a9d415696c2adb11904e40b8ed53 Mon Sep 17 00:00:00 2001 From: rcouto Date: Tue, 3 Aug 2021 12:03:03 -0400 Subject: [PATCH 1/4] remove async --- async/async_error.go | 48 --- async/async_error_test.go | 77 ----- async/mailbox.go | 121 ------- async/mailbox_test.go | 96 ------ async/runner.go | 72 ----- async/runner_test.go | 41 --- common/stats/stats_names.go | 5 - scheduler/server/stateful_scheduler.go | 331 +++++++++----------- scheduler/server/stateful_scheduler_test.go | 4 +- 9 files changed, 158 insertions(+), 637 deletions(-) delete mode 100644 async/async_error.go delete mode 100644 async/async_error_test.go delete mode 100644 async/mailbox.go delete mode 100644 async/mailbox_test.go delete mode 100644 async/runner.go delete mode 100644 async/runner_test.go diff --git a/async/async_error.go b/async/async_error.go deleted file mode 100644 index 9c34c8c02..000000000 --- a/async/async_error.go +++ /dev/null @@ -1,48 +0,0 @@ -package async - -// AsyncError is an async value that will eventually return an error -// It is similar to a Promise/Future which returns an error -// The value is supplied by calling SetValue. -// Once the value is supplied AsyncError is considered completed -// The value can be retrieved once AsyncError is completed via TryGetValue -type AsyncError struct { - errCh chan error - val error - completed bool -} - -func newAsyncError() *AsyncError { - return &AsyncError{ - errCh: make(chan error, 1), - } -} - -// Sets the value for the AsyncError. Marks AsyncError as Completed or Fulfilled. -// This method should only ever be called once per AsyncError instance. -// Calling this method more than once will panic -func (e *AsyncError) SetValue(err error) { - e.errCh <- err - close(e.errCh) -} - -// Returns the Status of this AsyncError: -// Completed(true) or Pending(false) -// and the value of the AsyncError if it is Completed. -// -// The returned bool is true if Completed, false if Pending -// If Completed the returned error is the Value of this AsyncError -// If AsyncError is not completed the returned error is nil. 
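For context on the contract spelled out above, here is a minimal sketch of how code inside the async package could drive an AsyncError directly; doWork is a hypothetical func() error standing in for the real work:

	func waitForWork() error {
		ae := newAsyncError()                 // starts out Pending
		go func() { ae.SetValue(doWork()) }() // complete it exactly once, from another goroutine
		for {
			if done, err := ae.TryGetValue(); done { // non-blocking poll
				return err
			}
		}
	}

In this package the Mailbox (removed later in this patch) owns that polling loop and dispatches to a callback instead, so callers normally never call TryGetValue themselves.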
-func (e *AsyncError) TryGetValue() (bool, error) { - if e.completed { - return true, e.val - } else { - select { - case err := <-e.errCh: - e.val = err - e.completed = true - return true, err - default: - return false, nil - } - } -} diff --git a/async/async_error_test.go b/async/async_error_test.go deleted file mode 100644 index e8febdfb1..000000000 --- a/async/async_error_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package async - -import ( - "errors" - "testing" -) - -// Verify that TryGetValue returns false for an uncompleted AsyncError -func TestAsyncError_NotCompleted(t *testing.T) { - err := newAsyncError() - ok, retErr := err.TryGetValue() - - if ok { - t.Error("Expected TryGetValue to return false for uncompleted AsyncError") - } - if retErr != nil { - t.Error("Expected TryGetValue to return nil for uncompleted AsyncError") - } -} - -// Verify that TryGetValue returns true for a completed AsyncError and the -// supplied error it was completed with -func TestAsyncError_Completed(t *testing.T) { - err := newAsyncError() - testErr := errors.New("Test Error!") - err.SetValue(testErr) - ok, retErr := err.TryGetValue() - - if !ok { - t.Error("Expected TryGetValue to return true for completed AsyncError") - } - - if retErr == nil { - t.Error("Expected TryGetValue to return an error for completed AsyncError") - } - - if retErr.Error() != testErr.Error() { - t.Errorf("Expected returned error {%v} to be the same as SetValue error {%v}", - retErr.Error(), testErr.Error()) - } - - // verify it can be called multiple times, and returns the result - ok, retErr = err.TryGetValue() - if !ok { - t.Error("Expected calling TryGetValue to return true") - } - - if retErr == nil { - t.Error("Expected TryGetValue to return an error for completed AsyncError") - } -} - -func TestAsyncError_CompletedNilError(t *testing.T) { - err := newAsyncError() - err.SetValue(nil) - ok, retErr := err.TryGetValue() - - if !ok { - t.Error("Expected TryGetValue to return true for completed AsyncError") - } - - if retErr != nil { - t.Error("Expected TryGetValue to return an error of nil completed AsyncError") - } -} - -func TestAsyncError_CallingSetValueMoreThanOncePanics(t *testing.T) { - err := newAsyncError() - err.SetValue(nil) - - defer func() { - if r := recover(); r != nil { - } - }() - err.SetValue(nil) - t.Errorf("Expected calling SetValue twice to cause a panic") -} diff --git a/async/mailbox.go b/async/mailbox.go deleted file mode 100644 index 0ea6f7cc8..000000000 --- a/async/mailbox.go +++ /dev/null @@ -1,121 +0,0 @@ -package async - -// An AsyncMailbox stores AsyncErrors and their associated callbacks -// and invokes them once the AsyncError is completed -// -// Often times we may spawn go routines in an event loop to do some concurrent work, -// go routines provide no way to return a response, however we may want -// to be notified if the work the go routine was doing completed successfully -// or unsuccessfully, and then take some action based on that result. -// AsyncMailbox provides a construct to do this. -// -// The below example is a storeValue function, which tries to store -// A value durably. A value is considered durably stored if it successfully -// writes to two of three replicas. We want to write to all replicas in parallel -// and return as soon as two writes succeed. 
We return an error if < 2 writes succeed -// -// func storeValue(num int) error { -// successfulWrites := 0 -// returnedWrites := 0 -// mailbox := NewAsyncMailbox() -// -// writeCallback := func (err error) { -// if err != nil { -// successfulWrites++ -// } -// returnedWrites++ -// } -// -// // Send to Replica One -// go func(rsp *AsyncError){ -// rsp.SetValue(write(num, "replicaOne")) -// }(mailbox.NewAsyncError(writeCallback)) -// -// // Send to Replica Two -// go func(rsp *AsyncError){ -// rsp.SetValue(write(num, "replicaTwo")) -// }(mailbox.NewAsyncError(writeCallback)) -// -// // Send to Replica Three -// go func(rsp *AsyncError){ -// rsp.SetValue(write(num, "replicaThree")) -// }(mailbox.NewAsyncError(writeCallback)) -// -// // Value is Considered Durably Stored if at least two write calls succeeded -// for sucessfullWrites < 2 && returnedWrites < 3 { -// mailbox.ProcessMessages() -// } -// -// if successfulWrites >= 2 { -// return nil -// } else { -// return errors.New("Could Not Durably Store Value") -// } -// -// // a function which makes a call to a durable register -// // which is accessed via the network -// func write (num int, address string) error { ... } -// -// A Mailbox is not a concurrent structure and should only -// ever be accessed from a single go routine. This ensures that the callbacks -// are always executed within the same context and only one at a time. -// A Mailbox for keeping track of in progress AsyncMessages. -// This structure is not thread-safe. -type Mailbox struct { - msgs []message -} - -// The function type of the callback invoked when an AsyncError is Completed -type AsyncErrorResponseHandler func(error) - -// async message is a struct composed of an AsyncError -// and its associated callback -type message struct { - Err *AsyncError - callback AsyncErrorResponseHandler -} - -func newMessage(cb AsyncErrorResponseHandler) message { - return message{ - Err: newAsyncError(), - callback: cb, - } -} - -func NewMailbox() *Mailbox { - return &Mailbox{ - msgs: make([]message, 0), - } -} - -func (bx *Mailbox) Count() int { - return len(bx.msgs) -} - -// Creates a NewAsyncError and associates the supplied callback with it. -// Once the AsyncError has been completed, SetValue called, the callback -// will be invoked on the next execution of ProcessMessages -func (bx *Mailbox) NewAsyncError(cb AsyncErrorResponseHandler) *AsyncError { - msg := newMessage(cb) - bx.msgs = append(bx.msgs, msg) - return msg.Err -} - -// Processes the mailbox. 
For all messages with completed AsyncErrors -// the callback function and removes the message from the mailbox -func (bx *Mailbox) ProcessMessages() { - var unCompletedMsgs []message - for _, msg := range bx.msgs { - ok, err := msg.Err.TryGetValue() - - // if a AsyncErr's value has been set, invoke the callback - if ok { - msg.callback(err) - } else { - unCompletedMsgs = append(unCompletedMsgs, msg) - } - } - - // reset inProgress messages to unCompletedMsgs only - bx.msgs = unCompletedMsgs -} diff --git a/async/mailbox_test.go b/async/mailbox_test.go deleted file mode 100644 index ef951a8cf..000000000 --- a/async/mailbox_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package async - -import ( - "errors" - log "github.com/sirupsen/logrus" - "testing" -) - -func Test_Mailbox(t *testing.T) { - mailbox := NewMailbox() - - cbInvoked := false - var retErr error - - asyncErr := mailbox.NewAsyncError(func(err error) { - retErr = err - cbInvoked = true - }) - - // spawn a go function that to do something - // that sets the AsyncError value when - // its completed - go func(rsp *AsyncError) { - sum := 0 - for i := 0; i < 100; i++ { - sum = sum + i - } - rsp.SetValue(errors.New("Test Error!")) - }(asyncErr) - - for !cbInvoked { - mailbox.ProcessMessages() - } - if retErr == nil { - t.Error("Expected Callback to be invoked with an error not nil") - } - if retErr.Error() != "Test Error!" { - t.Error("Expected Callback to be invoked with `Test Error!` not: ", retErr.Error()) - } -} - -// test to verify that example code for mailbox.go docs works! -func Test_MailboxExample(t *testing.T) { - err := storeValue_withMailbox(5) - if err == nil { - t.Error("expected to storeValue to complete successfully") - } -} - -// example code for mailbox.go -func storeValue_withMailbox(num int) error { - successfulWrites := 0 - returnedWrites := 0 - mailbox := NewMailbox() - - writeCallback := func(err error) { - if err != nil { - successfulWrites++ - } - returnedWrites++ - log.Info("completedWrites", returnedWrites) - } - - // Send to Replica One - go func(rsp *AsyncError) { - rsp.SetValue(write(num, "replicaOne")) - }(mailbox.NewAsyncError(writeCallback)) - - // Send to Replica Two - go func(rsp *AsyncError) { - rsp.SetValue(write(num, "replicaTwo")) - }(mailbox.NewAsyncError(writeCallback)) - - // Send to Replica Three - go func(rsp *AsyncError) { - rsp.SetValue(write(num, "replicaThree")) - }(mailbox.NewAsyncError(writeCallback)) - - // Value is Considered Durably Stored if at least two write calls succeeded - for successfulWrites < 2 && returnedWrites < 3 { - mailbox.ProcessMessages() - } - - if successfulWrites >= 2 { - return nil - } else { - return errors.New("Could Not Durably Store Value") - } -} - -// a function which makes a call to a durable register -// which is accessed via the network, fake function that always -// succeeds -func write(num int, address string) error { - return nil -} diff --git a/async/runner.go b/async/runner.go deleted file mode 100644 index 802ba9ee7..000000000 --- a/async/runner.go +++ /dev/null @@ -1,72 +0,0 @@ -// Async provides tools for asynchronous callback processing using Goroutines -package async - -// An AsyncRunner is a helper class to spawn Go Routines to run -// AsyncFunctions and to associate callbacks with them. This builds -// ontop of AsyncMailbox to make simplify the code that needs to be written. -// -// The below example is a storeValue function, which tries to store -// A value durably. 
A value is considered durably stored if it successfully -// writes to two of three replicas. We want to write to all replicas in parallel -// and return as soon as two writes succeed. We return an error if < 2 writes succeed -// -// func storeValue(num int) error { -// successfulWrites := 0 -// returnedWrites := 0 -// -// runner := NewAsyncRunner() -// -// writeCb := func(err error) { -// if err != nil { -// successfulWrites++ -// } -// returnedWrites++ -// } -// -// runner.RunAsync(func() error { return write(num, "replicatOne") }, writeCb) -// runner.RunAsync(func() error { return write(num, "replicatTwo") }, writeCb) -// runner.RunAsync(func() error { return write(num, "replicaThree") }, writeCb) -// -// for successfulWrites < 2 && returnedWrites < 3 { -// runner.ProcessMessages() -// } -// -// if successfulWrites >= 2 { -// return nil -// } else { -// return errors.New("Could Not Durably Store Value") -// } -// } -// // a function which makes a call to a durable register -// // which is accessed via the network -// func write (num int, address string) error { ... } -// -type Runner struct { - bx *Mailbox -} - -func NewRunner() Runner { - return Runner{ - bx: NewMailbox(), - } -} - -func (r *Runner) NumRunning() int { - return r.bx.Count() -} - -// RunAsync creates a go routine to run the specified function f. -// The callback, cb, is invoked once f is completed by calling ProcessMessages. -func (r *Runner) RunAsync(f func() error, cb AsyncErrorResponseHandler) { - asyncErr := r.bx.NewAsyncError(cb) - go func(rsp *AsyncError) { - err := f() - rsp.SetValue(err) - }(asyncErr) -} - -// Invokes all callbacks of completed asyncfunctions. -// Callbacks are ran synchronously and by the calling go routine -func (r *Runner) ProcessMessages() { - r.bx.ProcessMessages() -} diff --git a/async/runner_test.go b/async/runner_test.go deleted file mode 100644 index 1a42947a8..000000000 --- a/async/runner_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package async - -import ( - "errors" - "testing" -) - -func Test_Runner(t *testing.T) { - err := storeValue_withRunner(5) - if err == nil { - t.Error("expected to storeValue to complete successfully") - } -} - -func storeValue_withRunner(num int) error { - successfulWrites := 0 - returnedWrites := 0 - - runner := NewRunner() - - writeCb := func(err error) { - if err != nil { - successfulWrites++ - } - returnedWrites++ - } - - runner.RunAsync(func() error { return write(num, "replicatOne") }, writeCb) - runner.RunAsync(func() error { return write(num, "replicatTwo") }, writeCb) - runner.RunAsync(func() error { return write(num, "replicaThree") }, writeCb) - - for successfulWrites < 2 && returnedWrites < 3 { - runner.ProcessMessages() - } - - if successfulWrites >= 2 { - return nil - } else { - return errors.New("Could Not Durably Store Value") - } -} diff --git a/common/stats/stats_names.go b/common/stats/stats_names.go index 660a55ac8..d3ed165ca 100644 --- a/common/stats/stats_names.go +++ b/common/stats/stats_names.go @@ -199,11 +199,6 @@ const ( */ SchedPreemptedTasksCounter = "preemptedTasksCounter" - /* - the number of times the platform retried sending an end saga message - */ - SchedRetriedEndSagaCounter = "schedRetriedEndSagaCounter" - /* record the start of the scheduler server */ diff --git a/scheduler/server/stateful_scheduler.go b/scheduler/server/stateful_scheduler.go index b9aaf313f..92ee8446e 100644 --- a/scheduler/server/stateful_scheduler.go +++ b/scheduler/server/stateful_scheduler.go @@ -14,7 +14,6 @@ import ( uuid "github.com/nu7hatch/gouuid" 
log "github.com/sirupsen/logrus" - "github.com/twitter/scoot/async" "github.com/twitter/scoot/cloud/cluster" "github.com/twitter/scoot/common/log/hooks" "github.com/twitter/scoot/common/log/tags" @@ -151,8 +150,7 @@ type RunnerFactory func(node cluster.Node) runner.Service // so that it can make smarter scheduling decisions // // Scheduler Concurrency: The Scheduler runs an update loop in its own go routine. -// periodically the scheduler does some async work using async.Runner. The async -// work is executed in its own Go routine, nothing in async functions should read +// The async work is executed in its own Go routine, nothing in async functions should read // or modify scheduler state directly. // // The callbacks are executed as part of the scheduler loop. They therefore can @@ -161,7 +159,6 @@ type statefulScheduler struct { config *SchedulerConfig sagaCoord saga.SagaCoordinator runnerFactory RunnerFactory - asyncRunner async.Runner checkJobCh chan jobCheckMsg addJobCh chan jobAddedMsg killJobCh chan jobKillRequest @@ -321,7 +318,6 @@ func NewStatefulScheduler( config: &config, sagaCoord: sc, runnerFactory: rf, - asyncRunner: async.NewRunner(), checkJobCh: make(chan jobCheckMsg, 1), addJobCh: make(chan jobAddedMsg, 1), killJobCh: make(chan jobKillRequest, 1), // TODO - what should this value be? @@ -535,7 +531,6 @@ func (s *statefulScheduler) step() { s.clusterState.updateCluster() procMessagesLatency := s.stat.Latency(stats.SchedProcessMessagesLatency_ms).Time() - s.asyncRunner.ProcessMessages() procMessagesLatency.Stop() // TODO: make processUpdates on scheduler state wait until an update @@ -645,7 +640,6 @@ func (s *statefulScheduler) updateStats() { s.stat.Gauge(stats.SchedInProgressTasksGauge).Update(int64(remainingTasks)) s.stat.Gauge(stats.SchedNumRunningTasksGauge).Update(int64(runningTasks)) s.stat.Gauge(stats.SchedNumWaitingTasksGauge).Update(int64(waitingTasks)) - s.stat.Gauge(stats.SchedNumAsyncRunnersGauge).Update(int64(s.asyncRunner.NumRunning())) //TODO remove when done debugging // print internal data structure sizes var lbs *LoadBasedAlg = s.config.SchedAlg.(*LoadBasedAlg) @@ -659,7 +653,6 @@ func (s *statefulScheduler) updateStats() { s.stat.Gauge(stats.SchedRequestorHistorySize).Update(int64(s.requestorHistory.Len())) s.stat.Gauge(stats.SchedTaskDurationsSize).Update(int64(s.taskDurations.Len())) s.stat.Gauge(stats.SchedSagasSize).Update(int64(s.sagaCoord.GetNumSagas())) - s.stat.Gauge(stats.SchedRunnersSize).Update(int64(s.asyncRunner.NumRunning())) } // getSchedTaskStartTimeMapSize get the number of running tasks being tracked by tasksByJobClassAndStartTimeSec map @@ -862,40 +855,29 @@ func (s *statefulScheduler) checkForCompletedJobs() { // mark job as being completed jobState.EndingSaga = true - // set up variables for async functions for async function & callbacks - j := jobState - - s.asyncRunner.RunAsync( - func() error { - // FIXME: seeing panic on closed channel here after killjob(). 
- return j.Saga.EndSaga() - }, - func(err error) { - if err == nil { - log.WithFields( - log.Fields{ - "jobID": j.Job.Id, - "requestor": j.Job.Def.Requestor, - "jobType": j.Job.Def.JobType, - "tag": j.Job.Def.Tag, - }).Info("Job completed and logged") - // This job is fully processed remove from InProgressJobs - s.deleteJob(j.Job.Id) - } else { - // set the jobState flag to false, will retry logging - // EndSaga message on next scheduler loop - j.EndingSaga = false - s.stat.Counter(stats.SchedRetriedEndSagaCounter).Inc(1) // TODO errata metric - remove if unused - log.WithFields( - log.Fields{ - "jobID": j.Job.Id, - "err": err, - "requestor": j.Job.Def.Requestor, - "jobType": j.Job.Def.JobType, - "tag": j.Job.Def.Tag, - }).Info("Job completed but failed to log") - } - }) + go func() { + if err := jobState.Saga.EndSaga(); err != nil { + jobState.EndingSaga = false + log.WithFields( + log.Fields{ + "jobID": jobState.Job.Id, + "err": err, + "requestor": jobState.Job.Def.Requestor, + "jobType": jobState.Job.Def.JobType, + "tag": jobState.Job.Def.Tag, + }).Info("Job completed but failed to log") + return + } + log.WithFields( + log.Fields{ + "jobID": jobState.Job.Id, + "requestor": jobState.Job.Def.Requestor, + "jobType": jobState.Job.Def.JobType, + "tag": jobState.Job.Def.Tag, + }).Info("Job completed and logged") + // This job is fully processed remove from InProgressJobs + s.deleteJob(jobState.Job.Id) + }() } } } @@ -978,160 +960,159 @@ func (s *statefulScheduler) scheduleTasks() { // mark the task as started in the jobState and record its taskRunner jobState.taskStarted(taskID, tRunner) - s.asyncRunner.RunAsync( - tRunner.run, - func(err error) { - defer rs.Release() - // Update the average duration for this task so, for new jobs, we can schedule the likely long running tasks first. - if err == nil || err.(*taskError).st.State == runner.TIMEDOUT || - (err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) { - addOrUpdateTaskDuration(s.taskDurations, durationID, time.Now().Sub(tRunner.startTime)) - } + go func() { + defer rs.Release() + err := tRunner.run() + // Update the average duration for this task so, for new jobs, we can schedule the likely long running tasks first. + if err == nil || err.(*taskError).st.State == runner.TIMEDOUT || + (err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) { + addOrUpdateTaskDuration(s.taskDurations, durationID, time.Now().Sub(tRunner.startTime)) + } - // If the node is absent, or was deleted then re-added, then we need to selectively clean up. - // The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup. - // We need the fake value so we don't clobber any new job assignments to that nodeId. - nodeId := nodeSt.node.Id() - nodeStInstance, ok := s.clusterState.getNodeState(nodeId) - nodeAbsent := !ok - nodeReAdded := false - if !nodeAbsent { - nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh) - } - nodeStChanged := nodeAbsent || nodeReAdded - preempted := false + // If the node is absent, or was deleted then re-added, then we need to selectively clean up. + // The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup. + // We need the fake value so we don't clobber any new job assignments to that nodeId. 
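Both rewrites in this file follow the same mechanical shape, sketched here in general form; runWork (a func() error) and afterWork (a func(error)) are hypothetical stand-ins for the task body and its completion handler:

	// Before: the callback was queued and only ran when the scheduler loop
	// called ProcessMessages().
	s.asyncRunner.RunAsync(runWork, afterWork)
	// ... later, on the scheduler loop:
	s.asyncRunner.ProcessMessages()

	// After: the work and its completion handling run together on one new
	// goroutine, off the scheduler loop.
	go func() {
		afterWork(runWork())
	}()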
+ nodeId := nodeSt.node.Id() + nodeStInstance, ok := s.clusterState.getNodeState(nodeId) + nodeAbsent := !ok + nodeReAdded := false + if !nodeAbsent { + nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh) + } + nodeStChanged := nodeAbsent || nodeReAdded + preempted := false - if nodeStChanged { - nodeId = nodeId + ":ERROR" - log.WithFields( - log.Fields{ - "node": nodeSt.node, - "jobID": jobID, - "taskID": taskID, - "runningJob": nodeSt.runningJob, - "runningTask": nodeSt.runningTask, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Task *node* lost, cleaning up.") - } - if nodeReAdded { - preempted = true - } + if nodeStChanged { + nodeId = nodeId + ":ERROR" + log.WithFields( + log.Fields{ + "node": nodeSt.node, + "jobID": jobID, + "taskID": taskID, + "runningJob": nodeSt.runningJob, + "runningTask": nodeSt.runningTask, + "requestor": requestor, + "jobType": jobType, + "tag": tag, + }).Info("Task *node* lost, cleaning up.") + } + if nodeReAdded { + preempted = true + } - flaky := false - aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED) - if err != nil { - // Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes. - // TODO - we no longer set a node as flaky on failed status. - // In practice, we've observed that this results in checkout failures causing - // nodes to drop out of the cluster and reduce capacity to no benefit. - // A more comprehensive solution would be to overhaul this behavior. - taskErr := err.(*taskError) - flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED) - - msg := "Error running job (will be retried):" - if aborted { - msg = "Error running task, but job kill request received, (will not retry):" + flaky := false + aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED) + if err != nil { + // Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes. + // TODO - we no longer set a node as flaky on failed status. + // In practice, we've observed that this results in checkout failures causing + // nodes to drop out of the cluster and reduce capacity to no benefit. + // A more comprehensive solution would be to overhaul this behavior. + taskErr := err.(*taskError) + flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED) + + msg := "Error running job (will be retried):" + if aborted { + msg = "Error running task, but job kill request received, (will not retry):" + err = nil + } else { + if preventRetries { + msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask) err = nil } else { - if preventRetries { - msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask) - err = nil - } else { - jobState.errorRunningTask(taskID, err, preempted) - } + jobState.errorRunningTask(taskID, err, preempted) } - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "err": taskErr, - "cmd": strings.Join(taskDef.Argv, " "), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info(msg) - - // If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds. - if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil { - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - }).Info(msg, " -> starting goroutine to handle failed saga.EndTask. 
") - // TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask - go func() { - for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) { - time.Sleep(time.Second) - } - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ") - }() - } - } - if err == nil || aborted { - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "command": strings.Join(taskDef.Argv, " "), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Ending task.") - jobState.taskCompleted(taskID, true) } - - // update cluster state that this node is now free and if we consider the runner to be flaky. log.WithFields( log.Fields{ "jobId": jobID, "taskId": taskID, - "node": nodeSt.node, - "flaky": flaky, + "err": taskErr, + "cmd": strings.Join(taskDef.Argv, " "), "requestor": requestor, "jobType": jobType, "tag": tag, - }).Info("Freeing node, removed job.") - s.clusterState.taskCompleted(nodeId, flaky) - - total := 0 - completed := 0 - running := 0 - for _, job := range s.inProgressJobs { - total += len(job.Tasks) - completed += job.TasksCompleted - running += job.TasksRunning + }).Info(msg) + + // If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds. + if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil { + log.WithFields( + log.Fields{ + "jobId": jobID, + "taskId": taskID, + }).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ") + // TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask + go func() { + for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) { + time.Sleep(time.Second) + } + log.WithFields( + log.Fields{ + "jobId": jobID, + "taskId": taskID, + "requestor": requestor, + "jobType": jobType, + "tag": tag, + }).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ") + }() } + } + if err == nil || aborted { log.WithFields( log.Fields{ "jobId": jobID, - "running": jobState.TasksRunning, - "completed": jobState.TasksCompleted, - "total": len(jobState.Tasks), - "isdone": jobState.TasksCompleted == len(jobState.Tasks), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info() - log.WithFields( - log.Fields{ - "running": running, - "completed": completed, - "total": total, - "alldone": completed == total, + "taskId": taskID, + "command": strings.Join(taskDef.Argv, " "), "requestor": requestor, "jobType": jobType, "tag": tag, - }).Info("Jobs task summary") - }) + }).Info("Ending task.") + jobState.taskCompleted(taskID, true) + } + + // update cluster state that this node is now free and if we consider the runner to be flaky. 
+ log.WithFields( + log.Fields{ + "jobId": jobID, + "taskId": taskID, + "node": nodeSt.node, + "flaky": flaky, + "requestor": requestor, + "jobType": jobType, + "tag": tag, + }).Info("Freeing node, removed job.") + s.clusterState.taskCompleted(nodeId, flaky) + + total := 0 + completed := 0 + running := 0 + for _, job := range s.inProgressJobs { + total += len(job.Tasks) + completed += job.TasksCompleted + running += job.TasksRunning + } + log.WithFields( + log.Fields{ + "jobId": jobID, + "running": jobState.TasksRunning, + "completed": jobState.TasksCompleted, + "total": len(jobState.Tasks), + "isdone": jobState.TasksCompleted == len(jobState.Tasks), + "requestor": requestor, + "jobType": jobType, + "tag": tag, + }).Info() + log.WithFields( + log.Fields{ + "running": running, + "completed": completed, + "total": total, + "alldone": completed == total, + "requestor": requestor, + "jobType": jobType, + "tag": tag, + }).Info("Jobs task summary") + }() } } diff --git a/scheduler/server/stateful_scheduler_test.go b/scheduler/server/stateful_scheduler_test.go index f79147e2c..3507dcc22 100644 --- a/scheduler/server/stateful_scheduler_test.go +++ b/scheduler/server/stateful_scheduler_test.go @@ -376,7 +376,7 @@ func Test_StatefulScheduler_JobRunsToCompletion(t *testing.T) { // verify state changed appropriately if s.clusterState.nodes["node1"].runningTask != noTask { - t.Errorf("Expected node1 to not have any running tasks") + t.Errorf("Expected node1 to not have any running tasks.\n\ttaskID: %s\n\tclusterState: %+v", s.clusterState.nodes["node1"].runningTask, s.clusterState.nodes) } // advance scheduler until job gets marked completed @@ -454,7 +454,7 @@ func Test_StatefulScheduler_KillFinishedJob(t *testing.T) { // verify state changed appropriately for i := 0; i < 5; i++ { if s.clusterState.nodes[cluster.NodeId(fmt.Sprintf("node%d", i+1))].runningTask != noTask { - t.Errorf("Expected nodes to not have any running tasks") + t.Errorf("Expected nodes to not have any running tasks: %+v", s.clusterState.nodes) } } From 6176c91bca17db94f45c83c5b86118cc814712d6 Mon Sep 17 00:00:00 2001 From: rcouto Date: Tue, 3 Aug 2021 15:39:31 -0400 Subject: [PATCH 2/4] move taskRunner run() & post run actions into named func --- scheduler/server/stateful_scheduler.go | 288 +++++++++----------- scheduler/server/stateful_scheduler_test.go | 2 +- 2 files changed, 129 insertions(+), 161 deletions(-) diff --git a/scheduler/server/stateful_scheduler.go b/scheduler/server/stateful_scheduler.go index 92ee8446e..aa5b33922 100644 --- a/scheduler/server/stateful_scheduler.go +++ b/scheduler/server/stateful_scheduler.go @@ -6,7 +6,6 @@ import ( "os" "reflect" "sort" - "strings" "sync" "time" @@ -262,7 +261,6 @@ func NewStatefulScheduler( "error": s.Error, "jobID": s.JobID, "taskID": s.TaskID, - "tag": s.Tag, }).Info("Aborting existing run on new node") run.Abort(s.RunID) } @@ -399,7 +397,6 @@ func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, er log.Fields{ "requestor": jobDef.Requestor, "jobType": jobDef.JobType, - "tag": jobDef.Tag, "basis": jobDef.Basis, "priority": jobDef.Priority, "numTasks": len(jobDef.Tasks), @@ -417,7 +414,6 @@ func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, er "jobDef": jobDef, "requestor": jobDef.Requestor, "jobType": jobDef.JobType, - "tag": jobDef.Tag, "basis": jobDef.Basis, "priority": jobDef.Priority, "err": err, @@ -440,7 +436,6 @@ func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, er "jobDef": jobDef, 
"requestor": jobDef.Requestor, "jobType": jobDef.JobType, - "tag": jobDef.Tag, "basis": jobDef.Basis, "priority": jobDef.Priority, "err": err, @@ -457,7 +452,6 @@ func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, er "err": err, "requestor": jobDef.Requestor, "jobType": jobDef.JobType, - "tag": jobDef.Tag, }).Error("Failed to create saga for job request") return "", err } @@ -465,7 +459,6 @@ func (s *statefulScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, er log.Fields{ "requestor": jobDef.Requestor, "jobType": jobDef.JobType, - "tag": jobDef.Tag, "basis": jobDef.Basis, "priority": jobDef.Priority, "numTasks": len(jobDef.Tasks), @@ -597,8 +590,7 @@ func (s *statefulScheduler) updateStats() { log.Fields{ "requestor": job.Job.Def.Requestor, "jobType": job.Job.Def.JobType, - "jobId": job.Job.Id, - "tag": job.Job.Def.Tag, + "jobID": job.Job.Id, "basis": job.Job.Def.Basis, "priority": job.Job.Def.Priority, "numTasks": len(job.Tasks), @@ -745,7 +737,6 @@ func (s *statefulScheduler) addJobsLoop() { "jobID": newJobMsg.job.Id, "requestor": newJobMsg.job.Def.Requestor, "jobType": newJobMsg.job.Def.JobType, - "tag": newJobMsg.job.Def.Tag, "basis": newJobMsg.job.Def.Basis, "priority": newJobMsg.job.Def.Priority, "numTasks": len(newJobMsg.job.Def.Tasks), @@ -864,7 +855,6 @@ func (s *statefulScheduler) checkForCompletedJobs() { "err": err, "requestor": jobState.Job.Def.Requestor, "jobType": jobState.Job.Def.JobType, - "tag": jobState.Job.Def.Tag, }).Info("Job completed but failed to log") return } @@ -873,7 +863,6 @@ func (s *statefulScheduler) checkForCompletedJobs() { "jobID": jobState.Job.Id, "requestor": jobState.Job.Def.Requestor, "jobType": jobState.Job.Def.JobType, - "tag": jobState.Job.Def.Tag, }).Info("Job completed and logged") // This job is fully processed remove from InProgressJobs s.deleteJob(jobState.Job.Id) @@ -925,7 +914,6 @@ func (s *statefulScheduler) scheduleTasks() { "node": nodeSt.node, "requestor": requestor, "jobType": jobType, - "tag": tag, "taskDef": taskDef, "durationID": durationID, "avgDuration": fmt.Sprintf("%d (sec)", avgDur), @@ -959,161 +947,142 @@ func (s *statefulScheduler) scheduleTasks() { // mark the task as started in the jobState and record its taskRunner jobState.taskStarted(taskID, tRunner) + go s.runHandler(tRunner, nodeSt, durationID) + } +} - go func() { - defer rs.Release() - err := tRunner.run() - // Update the average duration for this task so, for new jobs, we can schedule the likely long running tasks first. - if err == nil || err.(*taskError).st.State == runner.TIMEDOUT || - (err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) { - addOrUpdateTaskDuration(s.taskDurations, durationID, time.Now().Sub(tRunner.startTime)) - } - - // If the node is absent, or was deleted then re-added, then we need to selectively clean up. - // The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup. - // We need the fake value so we don't clobber any new job assignments to that nodeId. - nodeId := nodeSt.node.Id() - nodeStInstance, ok := s.clusterState.getNodeState(nodeId) - nodeAbsent := !ok - nodeReAdded := false - if !nodeAbsent { - nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh) - } - nodeStChanged := nodeAbsent || nodeReAdded - preempted := false +// runHandler tells the taskRunner to run its assigned task & handles post-run actions, e.g. 
updating the scheduler's +// clusterState based on +func (s *statefulScheduler) runHandler(tRunner *taskRunner, nodeSt *nodeState, durationID string) { + defer tRunner.runner.Release() + err := tRunner.run() + // Update the average duration for this task so, for new jobs, we can schedule the likely long running tasks first. + if err == nil || err.(*taskError).st.State == runner.TIMEDOUT || + (err.(*taskError).st.State == runner.COMPLETE && err.(*taskError).st.ExitCode == 0) { + addOrUpdateTaskDuration(s.taskDurations, durationID, time.Now().Sub(tRunner.startTime)) + } + + // If the node is absent, or was deleted then re-added, then we need to selectively clean up. + // The job update is normal but we update the cluster with a fake value which denotes abnormal cleanup. + // We need the fake value so we don't clobber any new job assignments to that nodeId. + nodeId := nodeSt.node.Id() + nodeStInstance, ok := s.clusterState.getNodeState(nodeId) + nodeAbsent := !ok + nodeReAdded := false + if !nodeAbsent { + nodeReAdded = (&nodeStInstance.readyCh != &nodeSt.readyCh) + } + nodeStChanged := nodeAbsent || nodeReAdded + preempted := false + + if nodeStChanged { + nodeId = nodeId + ":ERROR" + log.WithFields( + log.Fields{ + "node": nodeSt.node, + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + "runningJob": nodeSt.runningJob, + "runningTask": nodeSt.runningTask, + }).Info("Task *node* lost, cleaning up.") + } + if nodeReAdded { + preempted = true + } - if nodeStChanged { - nodeId = nodeId + ":ERROR" - log.WithFields( - log.Fields{ - "node": nodeSt.node, - "jobID": jobID, - "taskID": taskID, - "runningJob": nodeSt.runningJob, - "runningTask": nodeSt.runningTask, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Task *node* lost, cleaning up.") - } - if nodeReAdded { - preempted = true + flaky := false + aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED) + jobState := s.getJob(tRunner.JobID) + if err != nil { + // Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes. + // TODO - we no longer set a node as flaky on failed status. + // In practice, we've observed that this results in checkout failures causing + // nodes to drop out of the cluster and reduce capacity to no benefit. + // A more comprehensive solution would be to overhaul this behavior. + taskErr := err.(*taskError) + flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED) + + msg := "Error running job (will be retried):" + if aborted { + msg = "Error running task, but job kill request received, (will not retry):" + err = nil + } else { + if tRunner.markCompleteOnFailure { + msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask) + err = nil + } else { + jobState.errorRunningTask(tRunner.TaskID, err, preempted) } + } + log.WithFields( + log.Fields{ + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + "err": taskErr, + }).Info(msg) - flaky := false - aborted := (err != nil && err.(*taskError).st.State == runner.ABORTED) - if err != nil { - // Get the type of error. Currently we only care to distinguish runner (ex: thrift) errors to mark flaky nodes. - // TODO - we no longer set a node as flaky on failed status. - // In practice, we've observed that this results in checkout failures causing - // nodes to drop out of the cluster and reduce capacity to no benefit. - // A more comprehensive solution would be to overhaul this behavior. 
- taskErr := err.(*taskError) - flaky = (taskErr.runnerErr != nil && taskErr.st.State != runner.FAILED) - - msg := "Error running job (will be retried):" - if aborted { - msg = "Error running task, but job kill request received, (will not retry):" - err = nil - } else { - if preventRetries { - msg = fmt.Sprintf("Error running task (quitting, hit max retries of %d):", s.config.MaxRetriesPerTask) - err = nil - } else { - jobState.errorRunningTask(taskID, err, preempted) - } - } - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "err": taskErr, - "cmd": strings.Join(taskDef.Argv, " "), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info(msg) - - // If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds. - if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil { - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - }).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ") - // TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask - go func() { - for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) { - time.Sleep(time.Second) - } - log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ") - }() + // If the task completed successfully but sagalog failed, start a goroutine to retry until it succeeds. + if taskErr.sagaErr != nil && taskErr.st.RunID != "" && taskErr.runnerErr == nil && taskErr.resultErr == nil { + log.WithFields( + log.Fields{ + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + }).Info(msg, " -> starting goroutine to handle failed saga.EndTask. ") + // TODO this may result in closed channel panic due to sending endSaga to sagalog (below) before endTask + go func() { + for err := errors.New(""); err != nil; err = tRunner.logTaskStatus(&taskErr.st, saga.EndTask) { + time.Sleep(time.Second) } - } - if err == nil || aborted { log.WithFields( log.Fields{ - "jobId": jobID, - "taskId": taskID, - "command": strings.Join(taskDef.Argv, " "), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Ending task.") - jobState.taskCompleted(taskID, true) - } + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + }).Info(msg, " -> finished goroutine to handle failed saga.EndTask. ") + }() + } + } + if err == nil || aborted { + log.WithFields( + log.Fields{ + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + }).Info("Ending task.") + jobState.taskCompleted(tRunner.TaskID, true) + } - // update cluster state that this node is now free and if we consider the runner to be flaky. 
- log.WithFields( - log.Fields{ - "jobId": jobID, - "taskId": taskID, - "node": nodeSt.node, - "flaky": flaky, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Freeing node, removed job.") - s.clusterState.taskCompleted(nodeId, flaky) - - total := 0 - completed := 0 - running := 0 - for _, job := range s.inProgressJobs { - total += len(job.Tasks) - completed += job.TasksCompleted - running += job.TasksRunning - } - log.WithFields( - log.Fields{ - "jobId": jobID, - "running": jobState.TasksRunning, - "completed": jobState.TasksCompleted, - "total": len(jobState.Tasks), - "isdone": jobState.TasksCompleted == len(jobState.Tasks), - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info() - log.WithFields( - log.Fields{ - "running": running, - "completed": completed, - "total": total, - "alldone": completed == total, - "requestor": requestor, - "jobType": jobType, - "tag": tag, - }).Info("Jobs task summary") - }() + // update cluster state that this node is now free and if we consider the runner to be flaky. + log.WithFields( + log.Fields{ + "jobID": tRunner.JobID, + "taskID": tRunner.TaskID, + "node": nodeSt.node, + "flaky": flaky, + }).Info("Freeing node, removed job.") + s.clusterState.taskCompleted(nodeId, flaky) + + total := 0 + completed := 0 + running := 0 + for _, job := range s.inProgressJobs { + total += len(job.Tasks) + completed += job.TasksCompleted + running += job.TasksRunning + log.WithFields( + log.Fields{ + "jobID": tRunner.JobID, + "running": jobState.TasksRunning, + "completed": jobState.TasksCompleted, + "total": len(jobState.Tasks), + "isdone": jobState.TasksCompleted == len(jobState.Tasks), + }).Infof("Job %s summary", tRunner.JobID) } + log.WithFields( + log.Fields{ + "running": running, + "completed": completed, + "total": total, + "alldone": completed == total, + }).Info("Jobs task summary") } //Put the kill request on channel that is processed by the main @@ -1206,7 +1175,6 @@ func (s *statefulScheduler) processKillJobRequests(reqs []jobKillRequest) { "jobID": req.jobId, "requestor": jobState.Job.Def.Requestor, "jobType": jobState.Job.Def.JobType, - "tag": jobState.Job.Def.Tag, } var updateMessages []saga.SagaMessage for _, task := range s.getJob(req.jobId).Tasks { diff --git a/scheduler/server/stateful_scheduler_test.go b/scheduler/server/stateful_scheduler_test.go index 3507dcc22..fee52402f 100644 --- a/scheduler/server/stateful_scheduler_test.go +++ b/scheduler/server/stateful_scheduler_test.go @@ -454,7 +454,7 @@ func Test_StatefulScheduler_KillFinishedJob(t *testing.T) { // verify state changed appropriately for i := 0; i < 5; i++ { if s.clusterState.nodes[cluster.NodeId(fmt.Sprintf("node%d", i+1))].runningTask != noTask { - t.Errorf("Expected nodes to not have any running tasks: %+v", s.clusterState.nodes) + t.Errorf("Expected nodes to not have any running tasks.\n\tclusterState: %+v", s.clusterState.nodes) } } From 476e255bbf3311ce9cd750bff806f72b3c924564 Mon Sep 17 00:00:00 2001 From: rcouto Date: Wed, 4 Aug 2021 12:46:20 -0400 Subject: [PATCH 3/4] Remove unused legacy runner interfaces --- common/stats/stats_names.go | 2 - .../scheduler_simulator/fake_worker_cli.go | 20 --------- runner/README.md | 6 +-- runner/queries.go | 4 +- runner/runner.go | 2 +- runner/runners/chaos.go | 16 ------- runner/runners/polling.go | 24 +++------- runner/runners/polling_test.go | 2 +- runner/runners/queue.go | 2 +- runner/runners/queue_test.go | 6 +-- runner/runners/service.go | 2 +- runner/runners/single_test.go | 17 ++----- 
runner/runners/status_manager.go | 10 ----- runner/status_rw.go | 44 ++++++++----------- scheduler/server/stateful_scheduler.go | 2 +- scheduler/server/stateful_scheduler_test.go | 2 +- workerapi/client/client.go | 37 ++++------------ workerapi/server/server.go | 6 +-- 18 files changed, 52 insertions(+), 152 deletions(-) diff --git a/common/stats/stats_names.go b/common/stats/stats_names.go index d3ed165ca..7b8dc4c9e 100644 --- a/common/stats/stats_names.go +++ b/common/stats/stats_names.go @@ -402,8 +402,6 @@ const ( /* The number of runs that the worker tried to run an whose state is failed - TODO - understand how/when this gets reset - it's based on the runs in the worker's StatusAll() - response - how/when do old jobs drop out of StatusAll()? */ WorkerFailedCachedRunsGauge = "failedCachedRunsGauge" diff --git a/perftests/scheduler_simulator/fake_worker_cli.go b/perftests/scheduler_simulator/fake_worker_cli.go index cd1b6ada1..c081876c5 100644 --- a/perftests/scheduler_simulator/fake_worker_cli.go +++ b/perftests/scheduler_simulator/fake_worker_cli.go @@ -123,23 +123,3 @@ func (fw *FakeWorker) Query(q runner.Query, w runner.Wait) ([]runner.RunStatus, func (fw *FakeWorker) QueryNow(q runner.Query) ([]runner.RunStatus, runner.ServiceStatus, error) { return nil, runner.ServiceStatus{Initialized: false, Error: nil}, nil } - -func (fw *FakeWorker) Status(run runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) { - rs := runner.RunStatus{ - RunID: runner.RunID(fw.cmdId), - State: fw.state, - LogTags: tags.LogTags{}, - StdoutRef: "", - StderrRef: "", - SnapshotID: "", - ExitCode: 0, - Error: "", - ActionResult: nil, - } - - return rs, runner.ServiceStatus{Initialized: false, Error: nil}, nil -} - -func (fw *FakeWorker) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) { - return nil, runner.ServiceStatus{Initialized: false, Error: nil}, nil -} diff --git a/runner/README.md b/runner/README.md index 71661d895..82181d4d3 100644 --- a/runner/README.md +++ b/runner/README.md @@ -34,11 +34,7 @@ There are several interfaces having to do with reading/writing RunStatus'es. StatusWriter for writing Statuses. -There are more reader interfaces: -* StatusReader includes all the ones below -* LegacyStatusReader offers legacy APIs -* StatusQuerier offers Query, which allows waiting for some period of time -* StatusQueryNower offers QueryNow which is the same as Query, but does not allow waiting +StatusQuerier offers Query & QueryNow, which allows waiting for some period of time or immediately, respectively. ## StatusManager implementations (runner/runners/statuses.go) ## StatusManager holds RunStatus'es and implements the various Status interfaces. diff --git a/runner/queries.go b/runner/queries.go index 328857d5b..329361560 100644 --- a/runner/queries.go +++ b/runner/queries.go @@ -12,7 +12,7 @@ func WaitForever() Wait { } // Status returns the current status of id from q. 
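Together with the README update above, the signature changes below are what let call sites drop the legacy per-runner methods. In sketch form, the migration repeated throughout the rest of this patch looks like this; r stands for any value satisfying runner.StatusQuerier and id for some runner.RunID:

	// Before: legacy convenience methods on the runner itself.
	st, svc, err := r.Status(id)
	all, allSvc, allErr := r.StatusAll()

	// After: the same lookups go through the package-level helpers, which are
	// thin wrappers over QueryNow.
	st, svc, err = runner.StatusNow(r, id)
	all, allSvc, allErr = runner.StatusAll(r)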
-func StatusNow(q StatusQueryNower, id RunID) (RunStatus, ServiceStatus, error) { +func StatusNow(q StatusQuerier, id RunID) (RunStatus, ServiceStatus, error) { statuses, service, err := q.QueryNow(Query{Runs: []RunID{id}, States: ALL_MASK}) if err != nil { return RunStatus{}, ServiceStatus{}, err @@ -38,6 +38,6 @@ func SingleStatus(statuses []RunStatus, service ServiceStatus, err error) (RunSt } // StatusAll returns the Current status of all runs -func StatusAll(q StatusQueryNower) ([]RunStatus, ServiceStatus, error) { +func StatusAll(q StatusQuerier) ([]RunStatus, ServiceStatus, error) { return q.QueryNow(Query{States: ALL_MASK, AllRuns: true}) } diff --git a/runner/runner.go b/runner/runner.go index 28ffe8641..af254d763 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -103,5 +103,5 @@ var EmptyID RunnerID = RunnerID{ID: ""} // Service allows starting/abort'ing runs and checking on their status. type Service interface { Controller - StatusReader + StatusQuerier } diff --git a/runner/runners/chaos.go b/runner/runners/chaos.go index f7fb1ba35..58c071371 100644 --- a/runner/runners/chaos.go +++ b/runner/runners/chaos.go @@ -76,22 +76,6 @@ func (r *ChaosRunner) QueryNow(q runner.Query) ([]runner.RunStatus, runner.Servi return r.del.QueryNow(q) } -func (r *ChaosRunner) Status(run runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) { - err := r.delay() - if err != nil { - return runner.RunStatus{}, runner.ServiceStatus{}, err - } - return r.del.Status(run) -} - -func (r *ChaosRunner) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) { - err := r.delay() - if err != nil { - return nil, runner.ServiceStatus{}, err - } - return r.del.StatusAll() -} - func (r *ChaosRunner) Abort(run runner.RunID) (runner.RunStatus, error) { err := r.delay() if err != nil { diff --git a/runner/runners/polling.go b/runner/runners/polling.go index fd101de94..3c3d9593a 100644 --- a/runner/runners/polling.go +++ b/runner/runners/polling.go @@ -7,24 +7,22 @@ import ( "github.com/twitter/scoot/runner" ) -// polling.go: turns a StatusQueryNower into a StatusQuerier by polling - -// NewPollingStatusQuerier creates a new StatusQuerier by polling a StatusQueryNower that polls every period -func NewPollingStatusQuerier(del runner.StatusQueryNower, period time.Duration) *PollingStatusQuerier { +// NewPollingStatusQuerier creates a new StatusQuerier by polling a StatusQuerier that polls every period +func NewPollingStatusQuerier(del runner.StatusQuerier, period time.Duration) *PollingStatusQuerier { runner := &PollingStatusQuerier{del, period} return runner } -// NewPollingService creates a new Service from a Controller, and a StatusQueryNower. +// NewPollingService creates a new Service from a Controller, and a StatusQuerier. 
// (This is a convenience function over NewPollingStatusQuerier -func NewPollingService(c runner.Controller, nower runner.StatusQueryNower, period time.Duration) runner.Service { +func NewPollingService(c runner.Controller, nower runner.StatusQuerier, period time.Duration) runner.Service { q := NewPollingStatusQuerier(nower, period) return &Service{c, q} } -// PollingStatusQuerier turns a StatusQueryNower into a StatusQuerier +// PollingStatusQuerier turns a StatusQuerier into a StatusQuerier type PollingStatusQuerier struct { - del runner.StatusQueryNower + del runner.StatusQuerier period time.Duration } @@ -51,13 +49,3 @@ func (r *PollingStatusQuerier) Query(q runner.Query, wait runner.Wait) ([]runner } return nil, service, nil } - -// Status returns the current status of id from q. -func (r *PollingStatusQuerier) Status(id runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) { - return runner.StatusNow(r, id) -} - -// StatusAll returns the Current status of all runs -func (r *PollingStatusQuerier) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) { - return runner.StatusAll(r) -} diff --git a/runner/runners/polling_test.go b/runner/runners/polling_test.go index 5afb0117e..363abf497 100644 --- a/runner/runners/polling_test.go +++ b/runner/runners/polling_test.go @@ -18,7 +18,7 @@ func setupPoller() (*execers.SimExecer, *ChaosRunner, runner.Service) { filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} single := NewSingleRunner(ex, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) chaos := NewChaosRunner(single) - var nower runner.StatusQueryNower + var nower runner.StatusQuerier nower = chaos poller := NewPollingService(chaos, nower, 10*time.Microsecond) return ex, chaos, poller diff --git a/runner/runners/queue.go b/runner/runners/queue.go index fef3c5c15..070093f07 100644 --- a/runner/runners/queue.go +++ b/runner/runners/queue.go @@ -235,7 +235,7 @@ func (c *QueueController) Run(cmd *runner.Command) (runner.RunStatus, error) { } func (c *QueueController) enqueue(cmd *runner.Command) (runner.RunStatus, error) { - _, svcStatus, _ := c.statusManager.StatusAll() + _, svcStatus, _ := runner.StatusAll(c.statusManager) log.WithFields( log.Fields{ "ready": svcStatus.Initialized, diff --git a/runner/runners/queue_test.go b/runner/runners/queue_test.go index fe2bd386a..9e6aff70d 100644 --- a/runner/runners/queue_test.go +++ b/runner/runners/queue_test.go @@ -84,7 +84,7 @@ func TestUnknownRunIDInStatusRequest(t *testing.T) { env := setup(4, snapshot.NoDuration, t) defer env.teardown() - st, _, err := env.r.Status(runner.RunID("not a real run id")) + st, _, err := runner.StatusNow(env.r, runner.RunID("not a real run id")) if err == nil || !strings.Contains(err.Error(), fmt.Sprintf(UnknownRunIDMsg, "")) { t.Fatalf("Should not be able to get status: %q %q", err, st) } @@ -130,7 +130,7 @@ func TestStatus(t *testing.T) { run7 := assertRun(t, env.r, pending(), "complete 0") run8 := assertRun(t, env.r, pending(), "complete 0") - all, _, err := env.r.StatusAll() + all, _, err := runner.StatusAll(env.r) if err != nil { t.Fatal(err) } @@ -139,7 +139,7 @@ func TestStatus(t *testing.T) { // We've already waited for each status to be correct, so we trust that. 
// So, for each status in StatusAll, we'll call Status on its ID and error if not equal for _, st := range all { - st2, _, err := env.r.Status(st.RunID) + st2, _, err := runner.StatusNow(env.r, st.RunID) if err != nil { t.Fatal(err) } diff --git a/runner/runners/service.go b/runner/runners/service.go index 419c5054b..6727d65f4 100644 --- a/runner/runners/service.go +++ b/runner/runners/service.go @@ -7,5 +7,5 @@ import ( // Service makes a runner.Service from component parts. type Service struct { runner.Controller - runner.StatusReader + runner.StatusQuerier } diff --git a/runner/runners/single_test.go b/runner/runners/single_test.go index f0dedeec5..b76bf0aed 100644 --- a/runner/runners/single_test.go +++ b/runner/runners/single_test.go @@ -20,23 +20,13 @@ import ( "github.com/twitter/scoot/snapshot/snapshots" ) -func TestRun(t *testing.T) { - defer teardown(t) - r, _ := newRunner() - assertRun(t, r, complete(0), "complete 0") - assertRun(t, r, complete(1), "complete 1") - if status, _, err := r.StatusAll(); len(status) != 1 { - t.Fatalf("Expected history count of 1, got %d, err=%v", len(status), err) - } -} - func TestOutput(t *testing.T) { defer teardown(t) r, _ := newRunner() stdoutExpected, stderrExpected := "hello world\n", "hello err\n" id := assertRun(t, r, complete(0), "stdout "+stdoutExpected, "stderr "+stderrExpected, "complete 0") - st, _, err := r.Status(id) + st, _, err := runner.StatusNow(r, id) if err != nil { t.Fatal(err) } @@ -93,7 +83,7 @@ func TestAbort(t *testing.T) { assertWait(t, r, runID, running(), args...) r.Abort(runID) // use r.Status instead of assertWait so that we make sure it's aborted immediately, not eventually - st, _, err := r.Status(runID) + st, _, err := runner.StatusNow(r, runID) if err != nil { t.Fatal(err) } @@ -129,8 +119,7 @@ func TestMemCap(t *testing.T) { } else if len(runs) != 1 { t.Fatalf("Expected a single COMPLETE run, got %v", len(runs)) } else if runs[0].ExitCode != 1 || !strings.Contains(runs[0].Error, "Cmd exceeded MemoryCap, aborting") { - status, _, err := r.StatusAll() - t.Fatalf("Expected result with error message mentioning MemoryCap & an exit code of 1, got: %v -- status %v err %v -- exitCode %v", runs, status, err, runs[0].ExitCode) + t.Fatalf("Expected result with error message mentioning MemoryCap & an exit code of 1, got: %v -- err %v -- exitCode %v", runs, err, runs[0].ExitCode) } } diff --git a/runner/runners/status_manager.go b/runner/runners/status_manager.go index 5bcdcb8a2..b68cbd930 100644 --- a/runner/runners/status_manager.go +++ b/runner/runners/status_manager.go @@ -160,16 +160,6 @@ func (s *StatusManager) QueryNow(q runner.Query) ([]runner.RunStatus, runner.Ser return s.Query(q, runner.Wait{}) } -// Status returns the current status of id from q. 
-func (s *StatusManager) Status(id runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) { - return runner.StatusNow(s, id) -} - -// StatusAll returns the Current status of all runs -func (s *StatusManager) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) { - return runner.StatusAll(s) -} - // queryAndListen performs a query, returning the current results and optionally a channel for // listening for future results // returns: diff --git a/runner/status_rw.go b/runner/status_rw.go index 3ed492746..f7e9f84bb 100644 --- a/runner/status_rw.go +++ b/runner/status_rw.go @@ -86,37 +86,31 @@ type StatusQuerier interface { // Query returns all RunStatus'es matching q, waiting as described by w Query(q Query, w Wait) ([]RunStatus, ServiceStatus, error) - StatusQueryNower -} - -// StatusQueryNower allows Query'ing Statuses but with no Waiting -// This is separate from StatusQuerier because talking to the Worker, e.g., we will be able to -// QueryNow easily, but because Thrift doesn't like long waits on RPCs, it can't do -// Query with a decent wait. We want our type system to help protect us from this blowing up -// at runtime, so the RPC client will implement StatusQueryNower. -// We will implement a PollingQueuer that wraps a StatusQueryNower and satisfies StatusQuerier. -type StatusQueryNower interface { // QueryNow returns all RunStatus'es matching q in their current state QueryNow(q Query) ([]RunStatus, ServiceStatus, error) -} - -// LegacyStatusReader contains legacy methods to read Status'es. -// Prefer using the convenience methods above. -type LegacyStatusReader interface { - // Status returns the current status of id from q. - Status(run RunID) (RunStatus, ServiceStatus, error) - // StatusAll returns the Current status of all runs - StatusAll() ([]RunStatus, ServiceStatus, error) + // StatusQueryNower } -// StatusReader includes both the preferred and the legacy api. -type StatusReader interface { - StatusQuerier +// // StatusQueryNower allows Query'ing Statuses but with no Waiting +// // This is separate from StatusQuerier because talking to the Worker, e.g., we will be able to +// // QueryNow easily, but because Thrift doesn't like long waits on RPCs, it can't do +// // Query with a decent wait. We want our type system to help protect us from this blowing up +// // at runtime, so the RPC client will implement StatusQueryNower. +// // We will implement a PollingQueuer that wraps a StatusQueryNower and satisfies StatusQuerier. +// type StatusQueryNower interface { - // TODO(dbentley): remove - LegacyStatusReader -} +// } + +// LegacyStatusReader contains legacy methods to read Status'es. +// // Prefer using the convenience methods above. +// type LegacyStatusReader interface { +// // Status returns the current status of id from q. 
diff --git a/scheduler/server/stateful_scheduler.go b/scheduler/server/stateful_scheduler.go
index aa5b33922..d2eabf727 100644
--- a/scheduler/server/stateful_scheduler.go
+++ b/scheduler/server/stateful_scheduler.go
@@ -236,7 +236,7 @@ func NewStatefulScheduler(
 	durationKeyExtractorFn func(string) string) *statefulScheduler {
 	nodeReadyFn := func(node cluster.Node) (bool, time.Duration) {
 		run := rf(node)
-		st, svc, err := run.StatusAll()
+		st, svc, err := runner.StatusAll(run)
 		if err != nil || !svc.Initialized {
 			if svc.Error != nil {
 				log.WithFields(
diff --git a/scheduler/server/stateful_scheduler_test.go b/scheduler/server/stateful_scheduler_test.go
index fee52402f..9361129e8 100644
--- a/scheduler/server/stateful_scheduler_test.go
+++ b/scheduler/server/stateful_scheduler_test.go
@@ -454,7 +454,7 @@ func Test_StatefulScheduler_KillFinishedJob(t *testing.T) {
 	// verify state changed appropriately
 	for i := 0; i < 5; i++ {
 		if s.clusterState.nodes[cluster.NodeId(fmt.Sprintf("node%d", i+1))].runningTask != noTask {
-			t.Errorf("Expected nodes to not have any running tasks.\n\tclusterState: %+v", s.clusterState.nodes)
+			t.Errorf("Expected nodes to not have any running tasks.\n\tclusterState:\n\n%+v", s.clusterState.nodes)
 		}
 	}
 
diff --git a/workerapi/client/client.go b/workerapi/client/client.go
index a051de6f6..3e253dcf5 100644
--- a/workerapi/client/client.go
+++ b/workerapi/client/client.go
@@ -21,8 +21,7 @@ type Client interface {
 	// Worker API Interactions
 	QueryWorker() (workerapi.WorkerStatus, error)
 	runner.Controller
-	runner.StatusQueryNower
-	runner.LegacyStatusReader
+	runner.StatusQuerier
 }
 
 type simpleClient struct {
@@ -114,27 +113,7 @@ func (c *simpleClient) QueryWorker() (workerapi.WorkerStatus, error) {
 	return workerapi.ThriftWorkerStatusToDomain(status), nil
 }
 
-// Implements Scoot Worker API
-func (c *simpleClient) Status(id runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) {
-	ws, err := c.QueryWorker()
-	if err != nil {
-		return runner.RunStatus{}, runner.ServiceStatus{}, err
-	}
-	var svcErr error
-	if ws.Error != "" {
-		svcErr = errors.New(ws.Error)
-	}
-	svc := runner.ServiceStatus{Initialized: ws.Initialized, Error: svcErr}
-	for _, p := range ws.Runs {
-		if p.RunID == id {
-			return p, svc, nil
-		}
-	}
-	return runner.RunStatus{}, svc, fmt.Errorf("no such process %v", id)
-}
-
-// Implements Scoot Worker API
-func (c *simpleClient) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) {
+func (c *simpleClient) QueryNow(q runner.Query) ([]runner.RunStatus, runner.ServiceStatus, error) {
 	ws, err := c.QueryWorker()
 	if err != nil {
 		return nil, runner.ServiceStatus{}, err
@@ -143,14 +122,16 @@ func (c *simpleClient) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, er
 	if ws.Error != "" {
 		svcErr = errors.New(ws.Error)
 	}
-	return ws.Runs, runner.ServiceStatus{Initialized: ws.Initialized, Error: svcErr}, nil
-}
-
-func (c *simpleClient) QueryNow(q runner.Query) ([]runner.RunStatus, runner.ServiceStatus, error) {
-	st, svc, err := c.StatusAll()
+	st := ws.Runs
+	svc := runner.ServiceStatus{Initialized: ws.Initialized, Error: svcErr}
 	if err != nil {
 		return nil, runner.ServiceStatus{}, err
 	}
 	st, err = runners.StatusesRO(st).QueryNow(q)
 	return st, svc, err
 }
+
+func (c *simpleClient) Query(q runner.Query, w runner.Wait) ([]runner.RunStatus, runner.ServiceStatus, error) {
+	// TODO: implement
+	return []runner.RunStatus{}, runner.ServiceStatus{}, nil
+}
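simpleClient.Query above is left as a TODO that returns empty results. The comment block being retired in status_rw.go pointed at the eventual shape: Thrift can't hold a long wait open, so a waiting Query has to be emulated client-side by polling QueryNow. A minimal sketch of that idea follows; pollQuery is a hypothetical helper name, the Timeout field on runner.Wait and the fixed poll interval are assumptions, it treats any non-empty result as satisfying the wait (a simplification of the real Wait semantics), and it would also need the time package imported in client.go.

// pollQuery is a hypothetical polling fallback for Query: it re-runs QueryNow
// until a matching status shows up or the (assumed) Wait.Timeout elapses.
func (c *simpleClient) pollQuery(q runner.Query, w runner.Wait) ([]runner.RunStatus, runner.ServiceStatus, error) {
	deadline := time.Now().Add(w.Timeout)
	for {
		st, svc, err := c.QueryNow(q)
		// Return on error, on any match, when no wait was requested, or once the deadline passes.
		if err != nil || len(st) > 0 || w.Timeout <= 0 || time.Now().After(deadline) {
			return st, svc, err
		}
		time.Sleep(250 * time.Millisecond) // arbitrary poll interval
	}
}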
diff --git a/workerapi/server/server.go b/workerapi/server/server.go
index de42868d3..902ec3c8c 100644
--- a/workerapi/server/server.go
+++ b/workerapi/server/server.go
@@ -66,7 +66,7 @@ func (h *handler) stats() {
 		case <-ticker.C:
 			h.mu.Lock()
 
-			processes, svcStatus, err := h.run.StatusAll()
+			processes, svcStatus, err := runner.StatusAll(h.run)
 			if err != nil {
 				continue
 			}
@@ -123,7 +123,7 @@ func (h *handler) QueryWorker() (*worker.WorkerStatus, error) {
 	h.updateTimeLastRpc()
 	ws := worker.NewWorkerStatus()
 
-	st, svc, err := h.run.StatusAll()
+	st, svc, err := runner.StatusAll(h.run)
 	if err != nil {
 		ws.Error = err.Error()
 	}
@@ -165,7 +165,7 @@ func (h *handler) Run(cmd *worker.RunCommand) (*worker.RunStatus, error) {
 	//TODO(jschiller): accept a cmd.Nonce field so we can be precise about hiccups with dup cmd resends?
 	if err != nil && err.Error() == runners.QueueFullMsg && reflect.DeepEqual(c, h.currentCmd) {
 		log.Infof("Worker received dup request, recovering runID: %v", h.currentRunID)
-		status, _, err = h.run.Status(h.currentRunID)
+		status, _, err = runner.StatusNow(h.run, h.currentRunID)
 	}
 	if err != nil {
 		// Set invalid status and nil err to indicate handleable internal err.

From 73d3941edc63f83be89895fc4581ff1564c4125c Mon Sep 17 00:00:00 2001
From: rcouto
Date: Wed, 4 Aug 2021 13:09:08 -0400
Subject: [PATCH 4/4] cleanup

---
 runner/runners/single_test.go | 13 ++++++++++++-
 runner/status_rw.go           | 22 ----------------------
 2 files changed, 12 insertions(+), 23 deletions(-)

diff --git a/runner/runners/single_test.go b/runner/runners/single_test.go
index b76bf0aed..68483c33a 100644
--- a/runner/runners/single_test.go
+++ b/runner/runners/single_test.go
@@ -20,6 +20,16 @@ import (
 	"github.com/twitter/scoot/snapshot/snapshots"
 )
 
+func TestRun(t *testing.T) {
+	defer teardown(t)
+	r, _ := newRunner()
+	assertRun(t, r, complete(0), "complete 0")
+	assertRun(t, r, complete(1), "complete 1")
+	if status, _, err := runner.StatusAll(r); len(status) != 1 {
+		t.Fatalf("Expected history count of 1, got %d, err=%v", len(status), err)
+	}
+}
+
 func TestOutput(t *testing.T) {
 	defer teardown(t)
 	r, _ := newRunner()
@@ -119,7 +129,8 @@ func TestMemCap(t *testing.T) {
 	} else if len(runs) != 1 {
 		t.Fatalf("Expected a single COMPLETE run, got %v", len(runs))
 	} else if runs[0].ExitCode != 1 || !strings.Contains(runs[0].Error, "Cmd exceeded MemoryCap, aborting") {
-		t.Fatalf("Expected result with error message mentioning MemoryCap & an exit code of 1, got: %v -- err %v -- exitCode %v", runs, err, runs[0].ExitCode)
+		status, _, err := runner.StatusAll(r)
+		t.Fatalf("Expected result with error message mentioning MemoryCap & an exit code of 1, got: %v -- status %v err %v -- exitCode %v", runs, status, err, runs[0].ExitCode)
 	}
 }
 
diff --git a/runner/status_rw.go b/runner/status_rw.go
index f7e9f84bb..d239d588e 100644
--- a/runner/status_rw.go
+++ b/runner/status_rw.go
@@ -88,30 +88,8 @@ type StatusQuerier interface {
 
 	// QueryNow returns all RunStatus'es matching q in their current state
 	QueryNow(q Query) ([]RunStatus, ServiceStatus, error)
-
-	// StatusQueryNower
 }
 
-// // StatusQueryNower allows Query'ing Statuses but with no Waiting
-// // This is separate from StatusQuerier because talking to the Worker, e.g., we will be able to
-// // QueryNow easily, but because Thrift doesn't like long waits on RPCs, it can't do
-// // Query with a decent wait. We want our type system to help protect us from this blowing up
-// // at runtime, so the RPC client will implement StatusQueryNower.
-// // We will implement a PollingQueuer that wraps a StatusQueryNower and satisfies StatusQuerier.
-// type StatusQueryNower interface {
-
-// }
-
-// LegacyStatusReader contains legacy methods to read Status'es.
-// // Prefer using the convenience methods above.
-// type LegacyStatusReader interface {
-// // Status returns the current status of id from q.
-// Status(run RunID) (RunStatus, ServiceStatus, error)
-
-// // StatusAll returns the Current status of all runs
-// StatusAll() ([]RunStatus, ServiceStatus, error)
-// }
-
 // StatusWriter allows writing Statuses
 type StatusWriter interface {
 	// NewRun creates a new RunID in state PENDING