From 2b12f5a06da61f9f80e8e8db15dd65cbda367d99 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 15 Jul 2024 07:33:02 -0700 Subject: [PATCH 1/3] :bug: preemption (policy) enabled stomped by task manager. Race condition. Signed-off-by: Jeff Ortel --- task/manager.go | 75 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/task/manager.go b/task/manager.go index 15a2e515..e04338ae 100644 --- a/task/manager.go +++ b/task/manager.go @@ -181,8 +181,7 @@ func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { return } switch task.State { - case Created, - Ready: + case Created: task.UpdateUser = requested.UpdateUser task.Name = requested.Name task.Kind = requested.Kind @@ -195,29 +194,42 @@ func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { task.TTL = requested.TTL task.Data = requested.Data task.ApplicationID = requested.ApplicationID - case Pending, + err = m.findRefs(task) + if err != nil { + return + } + db = db.Where("state", Created) + err = db.Save(task).Error + if err != nil { + err = liberr.Wrap(err) + return + } + case Ready, + Pending, QuotaBlocked, Postponed: task.UpdateUser = requested.UpdateUser task.Name = requested.Name task.Locator = requested.Locator - task.Data = requested.Data - task.Priority = requested.Priority task.Policy = requested.Policy task.TTL = requested.TTL + db = db.Where( + "state IN (?)", + []string{ + Ready, + Pending, + QuotaBlocked, + Postponed, + }) + err = db.Save(task).Error + if err != nil { + err = liberr.Wrap(err) + return + } default: // discarded. return } - err = m.findRefs(task) - if err != nil { - return - } - err = db.Save(task).Error - if err != nil { - err = liberr.Wrap(err) - return - } return } @@ -268,7 +280,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { if err != nil { return } - err = m.DB.Save(task).Error + err = task.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -384,7 +396,7 @@ func (m *Manager) disconnected(list []*Task) (kept []*Task, err error) { task.State = Failed task.Terminated = &mark task.Error("Error", "Hub is disconnected.") - err = m.DB.Save(task).Error + err = task.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -455,7 +467,7 @@ func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) { task.Error("Error", err.Error()) task.Terminated = &mark task.State = Failed - err = m.DB.Save(task).Error + err = task.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -608,7 +620,7 @@ func (m *Manager) postpone(list []*Task) (err error) { updated = true } if updated { - err = m.DB.Save(task).Error + err = task.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -640,7 +652,7 @@ func (m *Manager) adjustPriority(list []*Task) (err error) { return } task.State = Ready - err = m.DB.Save(task).Error + err = task.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -671,7 +683,7 @@ func (m *Manager) createPod(list []*Task) (err error) { Log.Error(err, "") return } - err = m.DB.Save(ready).Error + err = ready.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -788,7 +800,7 @@ func (m *Manager) preempt(list []*Task) (err error) { p.Errors = nil p.Event(Preempted, reason) Log.Info(reason) - err = m.DB.Save(p).Error + err = p.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -848,7 +860,7 @@ func (m *Manager) updateRunning() { } } } - err = m.DB.Save(&running).Error + err = running.update(m.DB) if err != nil { err = liberr.Wrap(err) return @@ -1627,6 +1639,25 @@ func (r *Task) containsAny(str string, substr ...string) (matched bool) { return } +// update manager controlled fields. +func (r *Task) update(db *gorm.DB) (err error) { + db = db.Debug() // REMOVE THIS + db = db.Select( + "Addon", + "Extensions", + "State", + "Priority", + "Started", + "Terminated", + "Events", + "Error", + "Retries", + "Attached", + "Pod") + err = db.Save(r).Error + return +} + // Event represents a pod event. type Event struct { Type string From ff3aedda0848b0bc2c6d7b1fe275988f57929687 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 15 Jul 2024 07:59:37 -0700 Subject: [PATCH 2/3] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/task/manager.go b/task/manager.go index e04338ae..5af56982 100644 --- a/task/manager.go +++ b/task/manager.go @@ -174,31 +174,32 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { } // Update update task. -func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { - task := &Task{} - err = db.First(task, requested.ID).Error +func (m *Manager) Update(db *gorm.DB, task *Task) (err error) { + found := &Task{} + err = db.First(found, task.ID).Error if err != nil { return } switch task.State { case Created: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Kind = requested.Kind - task.Addon = requested.Addon - task.Extensions = requested.Extensions - task.State = requested.State - task.Locator = requested.Locator - task.Priority = requested.Priority - task.Policy = requested.Policy - task.TTL = requested.TTL - task.Data = requested.Data - task.ApplicationID = requested.ApplicationID + db = db.Select( + "UpdateUser", + "Name", + "Kind", + "Addon", + "Extensions", + "State", + "Locator", + "Priority", + "Policy", + "TTL", + "Data", + "ApplicationID") err = m.findRefs(task) if err != nil { return } - db = db.Where("state", Created) + db = db.Where("State", Created) err = db.Save(task).Error if err != nil { err = liberr.Wrap(err) @@ -208,11 +209,12 @@ func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { Pending, QuotaBlocked, Postponed: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Locator = requested.Locator - task.Policy = requested.Policy - task.TTL = requested.TTL + db = db.Select( + "UpdateUser", + "Name", + "Locator", + "Policy", + "TTL") db = db.Where( "state IN (?)", []string{ @@ -1641,7 +1643,6 @@ func (r *Task) containsAny(str string, substr ...string) (matched bool) { // update manager controlled fields. func (r *Task) update(db *gorm.DB) (err error) { - db = db.Debug() // REMOVE THIS db = db.Select( "Addon", "Extensions", From a17675ac352b484721f9213e66bee585ad9b0e94 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 15 Jul 2024 08:48:04 -0700 Subject: [PATCH 3/3] checkpoint Signed-off-by: Jeff Ortel --- task/manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/task/manager.go b/task/manager.go index 5af56982..6d2f8a4c 100644 --- a/task/manager.go +++ b/task/manager.go @@ -174,13 +174,13 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { } // Update update task. -func (m *Manager) Update(db *gorm.DB, task *Task) (err error) { +func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { found := &Task{} - err = db.First(found, task.ID).Error + err = db.First(found, requested.ID).Error if err != nil { return } - switch task.State { + switch found.State { case Created: db = db.Select( "UpdateUser", @@ -195,12 +195,12 @@ func (m *Manager) Update(db *gorm.DB, task *Task) (err error) { "TTL", "Data", "ApplicationID") - err = m.findRefs(task) + err = m.findRefs(requested) if err != nil { return } db = db.Where("State", Created) - err = db.Save(task).Error + err = db.Save(requested).Error if err != nil { err = liberr.Wrap(err) return @@ -223,7 +223,7 @@ func (m *Manager) Update(db *gorm.DB, task *Task) (err error) { QuotaBlocked, Postponed, }) - err = db.Save(task).Error + err = db.Save(requested).Error if err != nil { err = liberr.Wrap(err) return