-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
controller.go
209 lines (196 loc) · 6.88 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"fmt"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/dao/models"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/operation/execution"
"github.com/goharbor/harbor/src/replication/operation/flow"
"github.com/goharbor/harbor/src/replication/operation/scheduler"
)
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
// StartReplication starts a replication based on the given policy and
// returns the ID of the execution record created for it.
// trigger is used to specify what this replication is triggered by
StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error)
// StopReplication stops the execution identified by the given execution ID.
StopReplication(int64) error
// ListExecutions lists the execution records selected by the optional queries.
// NOTE(review): the int64 result is presumably the total count of matching
// records - confirm against execution.Manager.
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
// GetExecution returns the execution record with the given ID.
GetExecution(int64) (*models.Execution, error)
// ListTasks lists the task records selected by the optional queries.
// NOTE(review): the int64 result is presumably the total count of matching
// records - confirm against execution.Manager.
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
// GetTask returns the task record with the given ID.
GetTask(int64) (*models.Task, error)
// UpdateTaskStatus sets the status of the task with the given ID.
// NOTE(review): statusCondition presumably restricts the update to tasks
// whose current status is one of the given values - confirm against
// execution.Manager.
UpdateTaskStatus(id int64, status string, statusCondition ...string) error
// GetTaskLog returns the log of the task with the given ID as raw bytes.
GetTaskLog(int64) ([]byte, error)
}
// maxReplicators is the capacity of the controller's replicator token
// channel, i.e. the upper bound on the number of replication flows that
// can be started concurrently.
const maxReplicators = 1024
// NewController returns a controller implementation whose scheduler is
// built on top of the given job client. The replicator token channel is
// filled to capacity before the controller is handed out, so that
// maxReplicators replications can be started before callers begin to block.
func NewController(js job.Client) Controller {
	tokens := make(chan struct{}, maxReplicators)
	// every token in the channel stands for one free replicator slot
	for len(tokens) < cap(tokens) {
		tokens <- struct{}{}
	}
	return &controller{
		replicators:  tokens,
		executionMgr: execution.NewDefaultManager(),
		scheduler:    scheduler.NewScheduler(js),
		flowCtl:      flow.NewController(),
	}
}
// controller is the Controller implementation returned by NewController.
type controller struct {
// replicators holds the tokens that bound the number of concurrent
// replications: StartReplication takes one token before launching a flow
// and puts it back when the flow start call returns.
replicators chan struct{}
// flowCtl starts the replication flows.
flowCtl flow.Controller
// executionMgr creates, reads and updates the execution and task records.
executionMgr execution.Manager
// scheduler is handed to the created flows and is used to send stop
// requests for running jobs.
scheduler scheduler.Scheduler
}
// StartReplication creates an execution record for the given policy and
// starts the corresponding replication flow in a background goroutine.
//
// An error is returned when the policy is nil or disabled, or when the
// execution record cannot be created. An empty trigger defaults to
// model.TriggerTypeManual. The call blocks until a replicator token is
// available, which bounds the number of concurrently started flows.
//
// On success the ID of the created execution record is returned; a failure
// of the flow itself is not reported to the caller but recorded on the
// execution record and logged.
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
	// a nil policy would otherwise panic on the field accesses below
	if policy == nil {
		return 0, fmt.Errorf("the policy is nil")
	}
	if !policy.Enabled {
		return 0, fmt.Errorf("the policy %d is disabled", policy.ID)
	}
	if len(trigger) == 0 {
		trigger = model.TriggerTypeManual
	}
	id, err := createExecution(c.executionMgr, policy.ID, trigger)
	if err != nil {
		return 0, err
	}
	// control the count of concurrent replication requests
	log.Debugf("waiting for the available replicator ...")
	<-c.replicators
	log.Debugf("got an available replicator, starting the replication ...")
	go func() {
		// hand the replicator token back no matter how the flow ends
		defer func() {
			c.replicators <- struct{}{}
		}()
		// named f rather than flow so the imported flow package isn't shadowed
		f := c.createFlow(id, policy, resource)
		n, err := c.flowCtl.Start(f)
		if err == nil {
			// nothing to record here: on success the execution is updated
			// automatically when listing the execution records
			return
		}
		// only update the execution when got error
		failed := &models.Execution{
			ID:         id,
			Status:     models.ExecutionStatusFailed,
			StatusText: err.Error(),
			Total:      n,
			Failed:     n,
		}
		if e := c.executionMgr.Update(failed, "Status", "StatusText", "Total", "Failed"); e != nil {
			log.Errorf("failed to update the execution %d: %v", id, e)
		}
		log.Errorf("the execution %d failed: %v", id, err)
	}()
	return id, nil
}
// createFlow picks the replication flow matching the input parameters:
//   - no resource: a copy flow with an empty resource list
//   - a resource flagged as deleted: a deletion flow, so that the deletion
//     operation is replicated
//   - any other resource: a copy flow for exactly that resource
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
	switch {
	case resource == nil:
		// pass an empty, non-nil resource list
		return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{}...)
	case resource.Deleted:
		return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, resource)
	default:
		return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy, resource)
	}
}
// StopReplication stops the execution identified by executionID.
//
// When the execution has no tasks, the execution record itself is marked as
// stopped (with the current time as its end time), provided that it is
// still in progress; an error is returned if the record cannot be found.
// Otherwise a stop request is sent through the scheduler for every task
// that is still running. The first error aborts the operation and is
// returned as is.
func (c *controller) StopReplication(executionID int64) error {
	_, tasks, err := c.ListTasks(&models.TaskQuery{
		ExecutionID: executionID,
	})
	if err != nil {
		return err
	}

	// no tasks, just set its status to "stopped"
	if len(tasks) == 0 {
		// named exec rather than execution so the imported execution
		// package isn't shadowed
		exec, err := c.executionMgr.Get(executionID)
		if err != nil {
			return err
		}
		if exec == nil {
			return fmt.Errorf("the execution %d not found", executionID)
		}
		if exec.Status != models.ExecutionStatusInProgress {
			log.Debugf("the execution %d isn't in progress, no need to stop", executionID)
			return nil
		}
		stopped := &models.Execution{
			ID:      executionID,
			Status:  models.ExecutionStatusStopped,
			EndTime: time.Now(),
		}
		if err = c.executionMgr.Update(stopped, models.ExecutionPropsName.Status, models.ExecutionPropsName.EndTime); err != nil {
			return err
		}
		log.Debugf("the status of execution %d is set to stopped", executionID)
		return nil
	}

	// got tasks, stopping the tasks one by one
	for _, task := range tasks {
		// isTaskRunning reports false for a nil task; skip it up front so
		// the debug log below never dereferences a nil pointer
		if task == nil {
			continue
		}
		if !isTaskRunning(task) {
			log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status)
			continue
		}
		if err = c.scheduler.Stop(task.JobID); err != nil {
			return err
		}
		log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID)
	}
	return nil
}
// isTaskRunning reports whether the task still counts as running: it must
// be non-nil and its status must be none of succeed, stopped or failed.
func isTaskRunning(task *models.Task) bool {
	if task == nil {
		return false
	}
	status := task.Status
	finished := status == models.TaskStatusSucceed ||
		status == models.TaskStatusStopped ||
		status == models.TaskStatusFailed
	return !finished
}
// ListExecutions forwards the optional queries to the execution manager's
// List and returns its results unchanged.
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return c.executionMgr.List(query...)
}
// GetExecution fetches the execution record with the given ID from the
// execution manager and returns its result unchanged.
func (c *controller) GetExecution(executionID int64) (*models.Execution, error) {
return c.executionMgr.Get(executionID)
}
// ListTasks forwards the optional queries to the execution manager's
// ListTasks and returns its results unchanged.
func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
return c.executionMgr.ListTasks(query...)
}
// GetTask fetches the task record with the given ID from the execution
// manager and returns its result unchanged.
func (c *controller) GetTask(id int64) (*models.Task, error) {
return c.executionMgr.GetTask(id)
}
// UpdateTaskStatus forwards the task ID, the new status and the optional
// status conditions to the execution manager's UpdateTaskStatus and returns
// its error unchanged.
func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
}
// GetTaskLog fetches the log of the task with the given ID from the
// execution manager and returns its result unchanged.
func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
return c.executionMgr.GetTaskLog(taskID)
}
// createExecution stores a new execution record for the given policy and
// trigger through the manager. The record starts in the in-progress status
// with the current time as its start time. It returns the ID of the created
// record, or 0 and a descriptive error when the creation fails.
func createExecution(mgr execution.Manager, policyID int64, trigger model.TriggerType) (int64, error) {
	record := &models.Execution{
		PolicyID:  policyID,
		Trigger:   trigger,
		Status:    models.ExecutionStatusInProgress,
		StartTime: time.Now(),
	}
	id, err := mgr.Create(record)
	if err == nil {
		log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
		return id, nil
	}
	return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policyID, err)
}