Skip to content

Commit

Permalink
refactor: dragonfly implements in p2p provider
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Sep 12, 2024
1 parent 93b0f11 commit ba0b332
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 83 deletions.
290 changes: 207 additions & 83 deletions src/pkg/p2p/preheat/provider/dragonfly.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,138 @@
package provider

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/client"
"github.com/olekukonko/tablewriter"
)

const (
healthCheckEndpoint = "/_ping"
preheatEndpoint = "/preheats"
preheatTaskEndpoint = "/preheats/{task_id}"
dragonflyPending = "WAITING"
dragonflyFailed = "FAILED"
// dragonflyHealthPath is the health check path for dragonfly openapi.
dragonflyHealthPath = "/healthy"

// dragonflyJobPath is the job path for dragonfly openapi.
dragonflyJobPath = "/oapi/v1/jobs"
)

type dragonflyPreheatCreateResp struct {
ID string `json:"ID"`
// Job states reported in the "state" field of a Dragonfly job response.
const (
// dragonflyJobPendingState is the pending state of the job, which means
// the job has not finished yet: it is waiting to be processed or is still running.
dragonflyJobPendingState = "PENDING"

// dragonflyJobSuccessState is the success state of the job, which means
// the job was processed successfully.
dragonflyJobSuccessState = "SUCCESS"

// dragonflyJobFailureState is the failure state of the job, which means
// the job failed to be processed.
dragonflyJobFailureState = "FAILURE"
)

// dragonflyCreateJobRequest is the JSON body sent to the Dragonfly job API
// (dragonflyJobPath) to create a new job.
//
// The `binding` tags are ignored by encoding/json; presumably they mirror the
// validation tags of the Dragonfly server-side model — TODO confirm.
type dragonflyCreateJobRequest struct {
// Type is the job type. This driver only creates "preheat" jobs.
Type string `json:"type" binding:"required"`

// Args holds the preheating arguments.
Args dragonflyCreateJobRequestArgs `json:"args" binding:"omitempty"`

// SchedulerClusterIDs lists the scheduler cluster ids to preheat on.
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
}

type dragonflyPreheatInfo struct {
ID string `json:"ID"`
StartTime string `json:"startTime,omitempty"`
FinishTime string `json:"finishTime,omitempty"`
ErrorMsg string `json:"errorMsg"`
Status string
// dragonflyCreateJobRequestArgs carries the preheat-specific arguments of a
// dragonflyCreateJobRequest.
type dragonflyCreateJobRequestArgs struct {
// Type is the preheating type, support image and file.
Type string `json:"type"`

// URL is the image url for preheating.
URL string `json:"url"`

// Tag is the tag for preheating.
Tag string `json:"tag"`

// FilteredQueryParams is the filtered query params for preheating.
FilteredQueryParams string `json:"filtered_query_params"`

// Headers is the http headers for authentication.
Headers map[string]string `json:"headers"`

// Scope is the scope for preheating, default is single_peer.
Scope string `json:"scope"`

// ConcurrentCount is the concurrent count for preheating all peers, default is 50.
ConcurrentCount int64 `json:"concurrent_count"`

// Timeout is the timeout for preheating, default is 30 minutes.
// NOTE(review): encoding/json marshals time.Duration as an integer number of
// nanoseconds — confirm this is the unit the Dragonfly API expects.
Timeout time.Duration `json:"timeout"`
}

// dragonflyJobResponse is the JSON document returned by the Dragonfly job API,
// both when a job is created and when an existing job is queried.
type dragonflyJobResponse struct {
// ID is the job id.
ID string `json:"id"`

// CreatedAt is the job created time.
CreatedAt time.Time `json:"created_at"`

// UpdatedAt is the job updated time.
UpdatedAt time.Time `json:"updated_at"`

// State is the job state, support PENDING, SUCCESS, FAILURE.
State string `json:"state"`

// Result is the job result.
Result struct {

// JobStates is the list of job states; it may be empty, so callers
// must check its length before indexing.
JobStates []struct {

// Error is the job error message.
Error string `json:"error"`

// Results is the list of per-scheduler-cluster results of this job state.
Results []struct {

// SuccessTasks is the list of tasks that succeeded.
SuccessTasks []*struct {

// TaskID is the task id.
TaskID string `json:"task_id"`

// Hostname is the hostname of the task.
Hostname string `json:"hostname"`

// IP is the ip of the task.
IP string `json:"ip"`
} `json:"success_tasks"`

// FailureTasks is the list of tasks that failed.
FailureTasks []*struct {

// TaskID is the task id.
TaskID string `json:"task_id"`

// Hostname is the hostname of the task.
Hostname string `json:"hostname"`

// IP is the ip of the task.
IP string `json:"ip"`

// Description is the failure description.
Description string `json:"description"`
} `json:"failure_tasks"`

// SchedulerClusterID is the scheduler cluster id.
SchedulerClusterID uint `json:"scheduler_cluster_id"`
} `json:"results"`
} `json:"job_states"`
} `json:"result"`
}

// DragonflyDriver implements the provider driver interface for Alibaba dragonfly.
Expand All @@ -60,7 +161,7 @@ func (dd *DragonflyDriver) Self() *Metadata {
ID: "dragonfly",
Name: "Dragonfly",
Icon: "https://raw.githubusercontent.com/dragonflyoss/Dragonfly2/master/docs/images/logo/dragonfly-linear.png",
Version: "2.1.56",
Version: "2.1.57",
Source: "https://github.com/dragonflyoss/Dragonfly2",
Maintainers: []string{"chlins.zhang@gmail.com", "gaius.qi@gmail.com"},
}
Expand All @@ -72,13 +173,13 @@ func (dd *DragonflyDriver) GetHealth() (*DriverStatus, error) {
return nil, errors.New("missing instance metadata")
}

url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), healthCheckEndpoint)
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyHealthPath)
url, err := lib.ValidateHTTPURL(url)
if err != nil {
return nil, err
}
_, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
if err != nil {

if _, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil); err != nil {
// Unhealthy
return nil, err
}
Expand All @@ -99,97 +200,109 @@ func (dd *DragonflyDriver) Preheat(preheatingImage *PreheatImage) (*PreheatingSt
return nil, errors.New("no image specified")
}

taskStatus := provider.PreheatingStatusPending // default
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), preheatEndpoint)
bytes, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), preheatingImage, nil)
if err != nil {
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusAlreadyReported {
// If the resource was preheated already with empty task ID, we should set preheat status to success.
// Otherwise later querying for the task
taskStatus = provider.PreheatingStatusSuccess
} else {
return nil, err
}
// Construct the preheat job request from the given parameters of the preheating image.
req := &dragonflyCreateJobRequest{
Type: "preheat",
// TODO: Support setting SchedulerClusterIDs, FilteredQueryParams, ConcurrentCount and Timeout.
Args: dragonflyCreateJobRequestArgs{
Type: preheatingImage.Type,
URL: preheatingImage.URL,
Headers: headerToMapString(preheatingImage.Headers),
Scope: preheatingImage.Scope,
},
}

result := &dragonflyPreheatCreateResp{}
if err := json.Unmarshal(bytes, result); err != nil {
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath)
data, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), req, nil)
if err != nil {
return nil, err
}

return &PreheatingStatus{
TaskID: result.ID,
Status: taskStatus,
}, nil
}

// CheckProgress implements @Driver.CheckProgress.
func (dd *DragonflyDriver) CheckProgress(taskID string) (*PreheatingStatus, error) {
status, err := dd.getProgressStatus(taskID)
if err != nil {
resp := &dragonflyJobResponse{}
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}

// If preheat job already exists
if strings.Contains(status.ErrorMsg, "preheat task already exists, id:") {
if taskID, err = getTaskExistedFromErrMsg(status.ErrorMsg); err != nil {
return nil, err
}
if status, err = dd.getProgressStatus(taskID); err != nil {
return nil, err
}
}
var (
successMessage string
errorMessage string
)
state := provider.PreheatingStatusPending
switch resp.State {
case dragonflyJobPendingState:
state = provider.PreheatingStatusRunning
case dragonflyJobSuccessState:
state = provider.PreheatingStatusSuccess

if status.Status == dragonflyPending {
status.Status = provider.PreheatingStatusPending
} else if status.Status == dragonflyFailed {
status.Status = provider.PreheatingStatusFail
}
var buffer bytes.Buffer
table := tablewriter.NewWriter(&buffer)
table.SetHeader([]string{"Hostname", "IP", "Cluster ID", "State"})
for _, jobState := range resp.Result.JobStates {
for _, result := range jobState.Results {
// Write the success tasks records to the table.
for _, successTask := range result.SuccessTasks {
table.Append([]string{successTask.Hostname, successTask.IP, string(result.SchedulerClusterID), dragonflyJobSuccessState})
}

res := &PreheatingStatus{
Status: status.Status,
TaskID: taskID,
}
if status.StartTime != "" {
res.StartTime = status.StartTime
}
if status.FinishTime != "" {
res.FinishTime = status.FinishTime
}
// Write the failure tasks records to the table.
for _, failureTask := range result.FailureTasks {
table.Append([]string{failureTask.Hostname, failureTask.IP, string(result.SchedulerClusterID), dragonflyJobFailureState})
}
}
}

return res, nil
}
successMessage = buffer.String()
case dragonflyJobFailureState:
var errs error
state = provider.PreheatingStatusFail
for _, jobState := range resp.Result.JobStates {
errs = errors.Join(errs, errors.New(jobState.Error))
}

func getTaskExistedFromErrMsg(msg string) (string, error) {
begin := strings.Index(msg, "preheat task already exists, id:") + 32
end := strings.LastIndex(msg, "\"}")
if end-begin <= 0 {
return "", errors.Errorf("can't find existed task id by error msg:%s", msg)
errorMessage = errs.Error()
default:
state = provider.PreheatingStatusFail
errorMessage = fmt.Sprintf("unknown state: %s", resp.State)
}
return msg[begin:end], nil

return &PreheatingStatus{
TaskID: resp.ID,
Status: state,
Message: successMessage,
Error: errorMessage,
StartTime: resp.CreatedAt.Format(time.RFC3339),
FinishTime: resp.UpdatedAt.Format(time.RFC3339),
}, nil
}

func (dd *DragonflyDriver) getProgressStatus(taskID string) (*dragonflyPreheatInfo, error) {
// CheckProgress implements @Driver.CheckProgress.
func (dd *DragonflyDriver) CheckProgress(taskID string) (*PreheatingStatus, error) {
if dd.instance == nil {
return nil, errors.New("missing instance metadata")
}

if len(taskID) == 0 {
if taskID == "" {
return nil, errors.New("no task ID")
}

path := strings.Replace(preheatTaskEndpoint, "{task_id}", taskID, 1)
url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), path)
bytes, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
url := fmt.Sprintf("%s%s/%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath, taskID)
data, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil)
if err != nil {
return nil, err
}

status := &dragonflyPreheatInfo{}
if err := json.Unmarshal(bytes, status); err != nil {
resp := &dragonflyJobResponse{}
if err := json.Unmarshal(data, resp); err != nil {
return nil, err
}
return status, nil

return &PreheatingStatus{
TaskID: resp.ID,
Status: resp.State,
Error: resp.Result.JobStates[0].Error,
StartTime: resp.CreatedAt.Format(time.RFC3339),
FinishTime: resp.UpdatedAt.Format(time.RFC3339),
}, nil
}

func (dd *DragonflyDriver) getCred() *auth.Credential {
Expand All @@ -198,3 +311,14 @@ func (dd *DragonflyDriver) getCred() *auth.Credential {
Data: dd.instance.AuthInfo,
}
}

// headerToMapString converts a loosely typed header map into a
// map[string]string. Entries whose value is not a string are left out.
// The returned map is never nil, even when header is nil or empty.
func headerToMapString(header map[string]interface{}) map[string]string {
	converted := map[string]string{}
	for name, raw := range header {
		value, isString := raw.(string)
		if !isString {
			// Only string values can be carried over; skip everything else.
			continue
		}

		converted[name] = value
	}

	return converted
}
1 change: 1 addition & 0 deletions src/pkg/p2p/preheat/provider/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type DriverStatus struct {
type PreheatingStatus struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
StartTime string `json:"start_time"`
FinishTime string `json:"finish_time"`
Expand Down

0 comments on commit ba0b332

Please sign in to comment.