Skip to content

Commit

Permalink
feat: support artifacts retention
Browse files Browse the repository at this point in the history
  • Loading branch information
zhujian7 committed Apr 15, 2020
1 parent 4c9b637 commit bfbd2d4
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 8 deletions.
5 changes: 5 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main

import (
"flag"
"time"

"github.com/caicloud/nirvana"
nconfig "github.com/caicloud/nirvana/config"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/caicloud/cyclone/pkg/server/apis"
"github.com/caicloud/cyclone/pkg/server/apis/filters"
"github.com/caicloud/cyclone/pkg/server/apis/modifiers"
"github.com/caicloud/cyclone/pkg/server/biz/artifact"
_ "github.com/caicloud/cyclone/pkg/server/biz/scm/bitbucket"
_ "github.com/caicloud/cyclone/pkg/server/biz/scm/github"
_ "github.com/caicloud/cyclone/pkg/server/biz/scm/gitlab"
Expand Down Expand Up @@ -115,6 +117,9 @@ func main() {

initialize(opts)

artifactManager := artifact.NewManager()
go artifactManager.CleanPeriodically(config.Config.Artifact.RetentionSeconds * time.Second)

// Create nirvana command.
cmd := nconfig.NewNamedNirvanaCommand("cyclone-server", &nconfig.Option{
IP: config.Config.CycloneServerHost,
Expand Down
4 changes: 4 additions & 0 deletions helm/cyclone/templates/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ data:
"client_set": {
"qps": {{ .Values.server.clientSet.qps }},
"burst": {{ .Values.server.clientSet.burst }}
},
"artifact": {
"retention_seconds": {{ .Values.server.artifact.retentionSeconds }},
"available_disk_percentage": {{ .Values.server.artifact.availableDiskPercentage }}
}
}
Expand Down
3 changes: 3 additions & 0 deletions helm/cyclone/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ server:
clientSet:
qps: 50.0
burst: 100
artifact:
retentionSeconds: 604800
availableDiskPercentage: 0.2

# Cyclone web variables
web:
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/apis/v1alpha1/descriptors/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ var workflowrun = []definition.Descriptor{
Source: definition.Body,
Description: "JSON body to describe the new workflowrun",
},
{
Source: definition.Header,
Name: httputil.HeaderDryRun,
Default: false,
Description: "Whether to do a rehearsal of creating workflowRun",
},
},
Results: definition.DataErrorResults("workflowrun"),
},
Expand Down
103 changes: 103 additions & 0 deletions pkg/server/biz/artifact/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package artifact

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"k8s.io/kubernetes/pkg/volume/util/fs"

"github.com/caicloud/cyclone/pkg/server/common"
"github.com/caicloud/nirvana/log"
)

// Manager manages artifacts
type Manager struct {
cleanPeriod time.Duration
artifactHomeDir string
}

// NewManager Initiates a artifacts manager.
// If artifactHomeDir not passed, the default '/var/lib/cyclone' will be used.
func NewManager(artifactHomeDir ...string) *Manager {
var home = common.CycloneHome
if artifactHomeDir != nil && artifactHomeDir[0] != "" {
home = artifactHomeDir[0]
}
m := &Manager{
cleanPeriod: time.Duration(time.Hour),
artifactHomeDir: home,
}

return m
}

// CleanPeriodically will clean up artifacts which exceeded retention time periodically.
// This func will run forever unless panics, you'd better invoke it by a go-routine.
func (m *Manager) CleanPeriodically(retention time.Duration) {
t := time.NewTicker(time.Duration(time.Hour))
defer t.Stop()

for ; true; <-t.C {
log.Info("Start to scan and clean artifacts")
if err := m.scanAndClean(selectArtifact, retention); err != nil {
log.Warningf("Clean artifacts error: ", err)
}
}
}

type artifactSelector func(path string) bool

func selectArtifact(path string) bool {
path = strings.TrimPrefix(path, "/")
// artifact path must be in format of {tenant}/{project}/{workflow}/{workflowrun}/artifacts/{stage}/xxx
splitPaths := strings.SplitN(path, "/", 7)

if len(splitPaths) < 7 {
return false
}

if splitPaths[4] != "artifacts" {
return false
}
return true
}

// scanAndClean scans the artifacts folders and finds out exceeded retention time artifacts, and
// then delete them.
func (m *Manager) scanAndClean(selectArtifact artifactSelector, retention time.Duration) error {
return filepath.Walk(m.artifactHomeDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("walk in %s error: %v", path, err)
}

if !selectArtifact(strings.TrimPrefix(path, m.artifactHomeDir)) {
return nil
}

if time.Now().Before(info.ModTime().Add(retention)) {
return nil
}

log.Infof("Start to remove artifact: %s", path)
return os.RemoveAll(path)
})
}

// GetDiskAvailablePercentage returns available space in percentage format of the artifact home folder.
func (m *Manager) GetDiskAvailablePercentage() (float64, error) {
s := time.Now()

available, capacity, _, _, _, _, err := fs.FsInfo(m.artifactHomeDir)
if err != nil {
return 0, err
}

e := time.Now()
log.Infof("fsInfo cost time: %s, available: %d, capacity: %d, percentage: %v",
e.Sub(s), available, capacity, float64(available)/float64(capacity))

return float64(available) / float64(capacity), nil
}
19 changes: 19 additions & 0 deletions pkg/server/biz/artifact/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package artifact

import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestScanAndClean(t *testing.T) {
manager := NewManager("testdata")
err := manager.scanAndClean(selectArtifact, time.Duration(0))
assert.Nil(t, err)

_, err = os.Stat("testdata/tenant1/project1/wf1/wfr1/artifacts/stage1/artifacts.tar")
assert.NotNil(t, err)
assert.True(t, os.IsNotExist(err))
}
Empty file.
Empty file.
3 changes: 3 additions & 0 deletions pkg/server/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ const (

// CacheCleanupContainerName is container name of the pod used to cleanup acceleration cache.
CacheCleanupContainerName = "cache-cleaner"

// CycloneHome is the home folder for Cyclone.
CycloneHome = "/var/lib/cyclone"
)
25 changes: 25 additions & 0 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/caicloud/nirvana/log"
core_v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -71,6 +72,20 @@ type CycloneServerConfig struct {
// ClientSet holds the common attributes that can be passed to a Kubernetes client on cyclone server handlers
// initialization.
ClientSet ClientSetConfig `json:"client_set"`

// Artifact config for artifacts which are managed by cyclone server
Artifact ArtifactConfig `json:"artifact"`
}

// ArtifactConfig configures artifacts which are managed by cyclone server
type ArtifactConfig struct {
// RetentionSeconds describes the retention time for artifacts, cyclone will delete artifacts exceeded retention
// time periodically.
RetentionSeconds time.Duration `json:"retention_seconds"`

// AvailableDiskPercentage is a threshold, if disk available space is less than this value, artifacts can not
// be stored.
AvailableDiskPercentage float64 `json:"available_disk_percentage"`
}

// ClientSetConfig defines rate limit config for a Kubernetes client
Expand Down Expand Up @@ -202,6 +217,16 @@ func modifier(config *CycloneServerConfig) {
core_v1.ResourceRequestsMemory: common.QuotaMemoryRequest,
}
}

if config.Artifact.RetentionSeconds == 0 {
log.Warning("artifact RetentionSeconds not configured, will use default value '604800'")
config.Artifact.RetentionSeconds = 60 * 60 * 24 * 7
}

if config.Artifact.AvailableDiskPercentage == 0 {
log.Warning("artifact AvailableDiskPercentage not configured, will use default value '0.2'")
config.Artifact.AvailableDiskPercentage = 0.2
}
}

// GetWebhookURLPrefix returns webhook callback url prefix. It tries to get the url from "WEBHOOK_URL_PREFIX"
Expand Down
8 changes: 3 additions & 5 deletions pkg/server/handler/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
)

const (
// cycloneHome is the home folder for Cyclone.
cycloneHome = "/var/lib/cyclone"

// logsFolderName is the folder name for logs files.
logsFolderName = "logs"
Expand Down Expand Up @@ -55,14 +53,14 @@ func getLogFolder(tenant, project, workflow, workflowrun string) (string, error)
if tenant == "" || project == "" || workflow == "" || workflowrun == "" {
return "", fmt.Errorf("tenant/project/workflow/workflowrun can not be empty")
}
return strings.Join([]string{cycloneHome, tenant, project, workflow, workflowrun, logsFolderName}, string(os.PathSeparator)), nil
return strings.Join([]string{common.CycloneHome, tenant, project, workflow, workflowrun, logsFolderName}, string(os.PathSeparator)), nil
}

func getArtifactFolder(tenant, project, workflow, workflowrun, stage string) (string, error) {
if tenant == "" || project == "" || workflow == "" || workflowrun == "" || stage == "" {
return "", fmt.Errorf("tenant/project/workflow/workflowrun/stage can not be empty")
}
return strings.Join([]string{cycloneHome, tenant, project, workflow, workflowrun, artifactFolderName, stage}, string(os.PathSeparator)), nil
return strings.Join([]string{common.CycloneHome, tenant, project, workflow, workflowrun, artifactFolderName, stage}, string(os.PathSeparator)), nil
}

// deleteCollections deletes collections in the sub paths of a tenant in the pvc, collections including:
Expand Down Expand Up @@ -90,7 +88,7 @@ func deleteCollections(tenant string, subpaths ...string) error {
}

func getCollectionFolder(tenant string, subpaths ...string) string {
paths := []string{cycloneHome, tenant}
paths := []string{common.CycloneHome, tenant}
paths = append(paths, subpaths...)
return strings.Join(paths, string(os.PathSeparator))
}
Expand Down
60 changes: 59 additions & 1 deletion pkg/server/handler/v1alpha1/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"github.com/caicloud/cyclone/pkg/meta"
api "github.com/caicloud/cyclone/pkg/server/apis/v1alpha1"
"github.com/caicloud/cyclone/pkg/server/biz/accelerator"
"github.com/caicloud/cyclone/pkg/server/biz/artifact"
"github.com/caicloud/cyclone/pkg/server/biz/stream"
"github.com/caicloud/cyclone/pkg/server/biz/utils"
"github.com/caicloud/cyclone/pkg/server/common"
"github.com/caicloud/cyclone/pkg/server/config"
"github.com/caicloud/cyclone/pkg/server/handler"
"github.com/caicloud/cyclone/pkg/server/handler/v1alpha1/sorter"
"github.com/caicloud/cyclone/pkg/server/types"
Expand All @@ -42,7 +44,11 @@ const (
)

// CreateWorkflowRun ...
func CreateWorkflowRun(ctx context.Context, project, workflow, tenant string, wfr *v1alpha1.WorkflowRun) (*v1alpha1.WorkflowRun, error) {
func CreateWorkflowRun(ctx context.Context, project, workflow, tenant string, wfr *v1alpha1.WorkflowRun, dryrun bool) (*v1alpha1.WorkflowRun, error) {
if dryrun {
return workflowRunCreationDryRun(project, workflow, tenant, wfr)
}

modifiers := []CreationModifier{GenerateNameModifier, InjectProjectLabelModifier, InjectWorkflowLabelModifier, InjectWorkflowOwnerRefModifier}
for _, modifier := range modifiers {
err := modifier(tenant, project, workflow, wfr)
Expand All @@ -59,6 +65,45 @@ func CreateWorkflowRun(ctx context.Context, project, workflow, tenant string, wf
return handler.K8sClient.CycloneV1alpha1().WorkflowRuns(common.TenantNamespace(tenant)).Create(wfr)
}

func workflowRunCreationDryRun(project, workflow, tenant string, wfr *v1alpha1.WorkflowRun) (*v1alpha1.WorkflowRun, error) {
wf, err := handler.K8sClient.CycloneV1alpha1().Workflows(common.TenantNamespace(tenant)).Get(workflow, metav1.GetOptions{})
if err != nil {
return nil, err
}

artifactManager := artifact.NewManager()
go artifactManager.CleanPeriodically(config.Config.Artifact.RetentionSeconds * time.Second)

for _, stage := range wf.Spec.Stages {
stg, err := handler.K8sClient.CycloneV1alpha1().Stages(common.TenantNamespace(tenant)).Get(stage.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

if stg.Spec.Pod == nil {
return nil, nil
}

// Check disk space for output http resources
for _, resource := range stg.Spec.Pod.Outputs.Resources {
if resource.Type == v1alpha1.HTTPResourceType {
percentage, err := artifactManager.GetDiskAvailablePercentage()
if err != nil {
log.Errorf("Get disk available space error: %v", err)
return nil, err
}
if percentage < config.Config.Artifact.AvailableDiskPercentage {
err = cerr.ErrorNoSpaceLeftForHTTPResource.Error(common.TenantNamespace(tenant), wfr.Name, stage.Name, percentage)
log.Error(err)
return nil, err
}
return nil, nil
}
}
}
return nil, nil
}

// workflowReference returns a workflowRef
func workflowReference(tenant, workflow string) *core_v1.ObjectReference {
return &core_v1.ObjectReference{
Expand Down Expand Up @@ -469,6 +514,19 @@ func GetContainerLogs(ctx context.Context, project, workflow, workflowrun, tenan

// ReceiveArtifacts receives artifacts produced by workflowrun stage.
func ReceiveArtifacts(ctx context.Context, workflowrun, namespace, stage string) error {
artifactManager := artifact.NewManager()
percentage, err := artifactManager.GetDiskAvailablePercentage()
if err != nil {
log.Errorf("Get disk available space error: %v", err)
return err
}

if percentage < config.Config.Artifact.AvailableDiskPercentage {
err = cerr.ErrorNoSpaceLeftForHTTPResource.Error(namespace, workflowrun, stage, percentage)
log.Error(err)
return nil
}

// get workflowrun
wfr, err := handler.K8sClient.CycloneV1alpha1().WorkflowRuns(namespace).Get(workflowrun, metav1.GetOptions{})
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/cerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
ReasonExistRunningWorkflowRuns = "ReasonExistRunningWorkflows"
// ReasonCreateIntegrationFailed represents creating integration failed error
ReasonCreateIntegrationFailed = "ReasonCreateIntegrationFailed"
// ReasonNoSpaceLeftForHTTPResource represents no space left to store http resources error
ReasonNoSpaceLeftForHTTPResource = "ReasonNoSpaceLeftForHTTPResource"
)

var (
Expand Down Expand Up @@ -137,6 +139,10 @@ var (
// ErrorExistRunningWorkflowRuns defines error that can not update persisten volume while there are workflowRuns running.
ErrorExistRunningWorkflowRuns = nerror.InternalServerError.Build(ReasonExistRunningWorkflowRuns,
"can not update persistent volume, since there are WorkflowRuns running, need to stop following WorkflowRuns firstly: ${workflows}")

// ErrorNoSpaceLeftForHTTPResource defines error that can not update persisten volume while there are workflowRuns running.
ErrorNoSpaceLeftForHTTPResource = nerror.InternalServerError.Build(ReasonNoSpaceLeftForHTTPResource,
"no enough space left to store artifacts for ${namespace}/${workflowRun}/${stage}, available disk percentage ${percentage}")
)

// ConvertK8sError converts k8s error to Cyclone errors.
Expand Down
Loading

0 comments on commit bfbd2d4

Please sign in to comment.