Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Preemption (policy) enabled overwritten by task manager. #722

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 74 additions & 42 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,49 +175,63 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) {

// Update update task.
func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) {
task := &Task{}
err = db.First(task, requested.ID).Error
found := &Task{}
err = db.First(found, requested.ID).Error
if err != nil {
return
}
switch task.State {
case Created,
Ready:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Kind = requested.Kind
task.Addon = requested.Addon
task.Extensions = requested.Extensions
task.State = requested.State
task.Locator = requested.Locator
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
task.Data = requested.Data
task.ApplicationID = requested.ApplicationID
case Pending,
switch found.State {
case Created:
db = db.Select(
"UpdateUser",
"Name",
"Kind",
"Addon",
"Extensions",
"State",
"Locator",
"Priority",
"Policy",
"TTL",
"Data",
"ApplicationID")
err = m.findRefs(requested)
if err != nil {
return
}
db = db.Where("State", Created)
err = db.Save(requested).Error
if err != nil {
err = liberr.Wrap(err)
return
}
case Ready,
Pending,
QuotaBlocked,
Postponed:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Locator = requested.Locator
task.Data = requested.Data
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
db = db.Select(
"UpdateUser",
"Name",
"Locator",
"Policy",
"TTL")
db = db.Where(
"state IN (?)",
[]string{
Ready,
Pending,
QuotaBlocked,
Postponed,
})
err = db.Save(requested).Error
if err != nil {
err = liberr.Wrap(err)
return
}
default:
// discarded.
return
}
err = m.findRefs(task)
if err != nil {
return
}
err = db.Save(task).Error
if err != nil {
err = liberr.Wrap(err)
return
}
return
}

Expand Down Expand Up @@ -268,7 +282,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) {
if err != nil {
return
}
err = m.DB.Save(task).Error
err = task.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -384,7 +398,7 @@ func (m *Manager) disconnected(list []*Task) (kept []*Task, err error) {
task.State = Failed
task.Terminated = &mark
task.Error("Error", "Hub is disconnected.")
err = m.DB.Save(task).Error
err = task.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -455,7 +469,7 @@ func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) {
task.Error("Error", err.Error())
task.Terminated = &mark
task.State = Failed
err = m.DB.Save(task).Error
err = task.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -608,7 +622,7 @@ func (m *Manager) postpone(list []*Task) (err error) {
updated = true
}
if updated {
err = m.DB.Save(task).Error
err = task.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -640,7 +654,7 @@ func (m *Manager) adjustPriority(list []*Task) (err error) {
return
}
task.State = Ready
err = m.DB.Save(task).Error
err = task.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -671,7 +685,7 @@ func (m *Manager) createPod(list []*Task) (err error) {
Log.Error(err, "")
return
}
err = m.DB.Save(ready).Error
err = ready.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -788,7 +802,7 @@ func (m *Manager) preempt(list []*Task) (err error) {
p.Errors = nil
p.Event(Preempted, reason)
Log.Info(reason)
err = m.DB.Save(p).Error
err = p.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -848,7 +862,7 @@ func (m *Manager) updateRunning() {
}
}
}
err = m.DB.Save(&running).Error
err = running.update(m.DB)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -1627,6 +1641,24 @@ func (r *Task) containsAny(str string, substr ...string) (matched bool) {
return
}

// update manager controlled fields.
func (r *Task) update(db *gorm.DB) (err error) {
db = db.Select(
"Addon",
"Extensions",
"State",
"Priority",
"Started",
"Terminated",
"Events",
"Error",
"Retries",
"Attached",
"Pod")
err = db.Save(r).Error
return
}

// Event represents a pod event.
type Event struct {
Type string
Expand Down
Loading