diff --git a/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf b/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf
index 5a6c324b30..31f8f1a687 100644
--- a/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf
+++ b/installer/helm/chart/volcano/config/volcano-scheduler-ci.conf
@@ -9,3 +9,4 @@ tiers:
   - name: predicates
   - name: proportion
   - name: nodeorder
+  - name: binpack
diff --git a/installer/helm/chart/volcano/config/volcano-scheduler.conf b/installer/helm/chart/volcano/config/volcano-scheduler.conf
index 14fa3072a2..cac99d7117 100644
--- a/installer/helm/chart/volcano/config/volcano-scheduler.conf
+++ b/installer/helm/chart/volcano/config/volcano-scheduler.conf
@@ -9,3 +9,4 @@ tiers:
   - name: predicates
   - name: proportion
   - name: nodeorder
+  - name: binpack
diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml
index 69946c8658..c36003b6dd 100644
--- a/installer/volcano-development.yaml
+++ b/installer/volcano-development.yaml
@@ -22,6 +22,7 @@ data:
       - name: predicates
       - name: proportion
       - name: nodeorder
+      - name: binpack
   volcano-scheduler.conf: |
     actions: "enqueue, allocate, backfill"
     tiers:
@@ -34,6 +35,7 @@ data:
       - name: predicates
       - name: proportion
      - name: nodeorder
+      - name: binpack
 ---
 
 apiVersion: v1
diff --git a/pkg/scheduler/plugins/binpack/binpack.go b/pkg/scheduler/plugins/binpack/binpack.go
new file mode 100644
index 0000000000..108c4d06f4
--- /dev/null
+++ b/pkg/scheduler/plugins/binpack/binpack.go
@@ -0,0 +1,261 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package binpack
+
+import (
+    "fmt"
+    "strings"
+
+    "github.com/golang/glog"
+
+    "k8s.io/api/core/v1"
+    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
+
+    "volcano.sh/volcano/pkg/scheduler/api"
+    "volcano.sh/volcano/pkg/scheduler/framework"
+)
+
+const (
+    // PluginName indicates name of volcano scheduler plugin.
+    PluginName = "binpack"
+)
+
+const (
+    // BinpackWeight is the key for providing Binpack Priority Weight in YAML
+    BinpackWeight = "binpack.weight"
+    // BinpackCPU is the key for the weight of cpu
+    BinpackCPU = "binpack.cpu"
+    // BinpackMemory is the key for the weight of memory
+    BinpackMemory = "binpack.memory"
+
+    // BinpackResources is the key for additional resource key names
+    BinpackResources = "binpack.resources"
+    // BinpackResourcesPrefix is the key prefix for additional resource keys
+    BinpackResourcesPrefix = BinpackResources + "."
+
+    resourceFmt = "%s[%d]"
+)
+
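+// priorityWeight holds the weights parsed from the plugin arguments; each
+// weight defaults to 1, and negative cpu, memory, or extended-resource weights
+// are reset to 1 by calculateWeight.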
+type priorityWeight struct {
+    BinPackingWeight    int
+    BinPackingCPU       int
+    BinPackingMemory    int
+    BinPackingResources map[v1.ResourceName]int
+}
+
+func (w *priorityWeight) String() string {
+    length := 3
+    if extendLength := len(w.BinPackingResources); extendLength == 0 {
+        length++
+    } else {
+        length += extendLength
+    }
+    msg := make([]string, 0, length)
+    msg = append(msg,
+        fmt.Sprintf(resourceFmt, BinpackWeight, w.BinPackingWeight),
+        fmt.Sprintf(resourceFmt, BinpackCPU, w.BinPackingCPU),
+        fmt.Sprintf(resourceFmt, BinpackMemory, w.BinPackingMemory),
+    )
+
+    if len(w.BinPackingResources) == 0 {
+        msg = append(msg, "no extended resources.")
+    } else {
+        for name, weight := range w.BinPackingResources {
+            msg = append(msg, fmt.Sprintf(resourceFmt, name, weight))
+        }
+    }
+    return strings.Join(msg, ", ")
+}
+
+type binpackPlugin struct {
+    // Arguments given for the plugin
+    weight priorityWeight
+}
+
+// New returns a binpack plugin object.
+func New(arguments framework.Arguments) framework.Plugin {
+    weight := calculateWeight(arguments)
+    return &binpackPlugin{weight: weight}
+}
+
+func calculateWeight(args framework.Arguments) priorityWeight {
+    /*
+       Users should give the priorityWeight in this format (binpack.weight, binpack.cpu, binpack.memory).
+       The weights of cpu, memory and additional resources can be changed through arguments.
+
+       actions: "enqueue, reclaim, allocate, backfill, preempt"
+       tiers:
+       - plugins:
+         - name: binpack
+           arguments:
+             binpack.weight: 10
+             binpack.cpu: 5
+             binpack.memory: 1
+             binpack.resources: nvidia.com/gpu, example.com/foo
+             binpack.resources.nvidia.com/gpu: 2
+             binpack.resources.example.com/foo: 3
+    */
+    // Values are initialized to 1.
+    weight := priorityWeight{
+        BinPackingWeight:    1,
+        BinPackingCPU:       1,
+        BinPackingMemory:    1,
+        BinPackingResources: make(map[v1.ResourceName]int),
+    }
+
+    // Checks whether binpack.weight is provided or not, if given, modifies the value in weight struct.
+    args.GetInt(&weight.BinPackingWeight, BinpackWeight)
+    // Checks whether binpack.cpu is provided or not, if given, modifies the value in weight struct.
+    args.GetInt(&weight.BinPackingCPU, BinpackCPU)
+    if weight.BinPackingCPU < 0 {
+        weight.BinPackingCPU = 1
+    }
+    // Checks whether binpack.memory is provided or not, if given, modifies the value in weight struct.
+    args.GetInt(&weight.BinPackingMemory, BinpackMemory)
+    if weight.BinPackingMemory < 0 {
+        weight.BinPackingMemory = 1
+    }
+
+    resourcesStr := args[BinpackResources]
+    resources := strings.Split(resourcesStr, ",")
+    for _, resource := range resources {
+        resource = strings.TrimSpace(resource)
+        if resource == "" {
+            continue
+        }
+
+        // binpack.resources.[ResourceName]
+        resourceKey := BinpackResourcesPrefix + resource
+        resourceWeight := 1
+        args.GetInt(&resourceWeight, resourceKey)
+        if resourceWeight < 0 {
+            resourceWeight = 1
+        }
+        weight.BinPackingResources[v1.ResourceName(resource)] = resourceWeight
+    }
+
+    return weight
+}
+
+func (bp *binpackPlugin) Name() string {
+    return PluginName
+}
+
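+// OnSessionOpen registers the binpack node-order function for this session and,
+// at verbosity 4, reports any configured extended resources that are not
+// allocatable on any node.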
+func (bp *binpackPlugin) OnSessionOpen(ssn *framework.Session) {
+    glog.V(4).Infof("Enter binpack plugin ...")
+    if glog.V(4) {
+        defer func() {
+            glog.V(4).Infof("Leaving binpack plugin. %s ...", bp.weight.String())
+        }()
+
+        notFoundResource := []string{}
+        for resource := range bp.weight.BinPackingResources {
+            found := false
+            for _, nodeInfo := range ssn.Nodes {
+                if nodeInfo.Allocatable.Get(resource) > 0 {
+                    found = true
+                    break
+                }
+            }
+            if !found {
+                notFoundResource = append(notFoundResource, string(resource))
+            }
+        }
+        glog.V(4).Infof("resources [%s] are recorded in weight but not found on any node", strings.Join(notFoundResource, ", "))
+    }
+
+    nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
+        binPackingScore := BinPackingScore(task, node, bp.weight)
+
+        glog.V(4).Infof("Binpack score for Task %s/%s on node %s is: %v", task.Namespace, task.Name, node.Name, binPackingScore)
+        return binPackingScore, nil
+    }
+    if bp.weight.BinPackingWeight != 0 {
+        ssn.AddNodeOrderFn(bp.Name(), nodeOrderFn)
+    } else {
+        glog.Infof("binpack weight is zero, skip node order function")
+    }
+}
+
+func (bp *binpackPlugin) OnSessionClose(ssn *framework.Session) {
+}
+
+// BinPackingScore uses the best-fit policy during scheduling.
+// Goals:
+// - Schedule jobs using a best-fit policy with the resource bin-packing priority function
+// - Reduce fragmentation of scarce resources on the cluster
+func BinPackingScore(task *api.TaskInfo, node *api.NodeInfo, weight priorityWeight) float64 {
+    score := 0.0
+    weightSum := 0
+    requested := task.Resreq
+    allocatable := node.Allocatable
+    used := node.Used
+
+    for _, resource := range requested.ResourceNames() {
+        request := requested.Get(resource)
+        if request == 0 {
+            continue
+        }
+        allocate := allocatable.Get(resource)
+        nodeUsed := used.Get(resource)
+
+        resourceWeight := 0
+        found := false
+        switch resource {
+        case v1.ResourceCPU:
+            resourceWeight = weight.BinPackingCPU
+            found = true
+        case v1.ResourceMemory:
+            resourceWeight = weight.BinPackingMemory
+            found = true
+        default:
+            resourceWeight, found = weight.BinPackingResources[resource]
+        }
+        if !found {
+            continue
+        }
+
+        resourceScore := ResourceBinPackingScore(request, allocate, nodeUsed, resourceWeight)
+        glog.V(5).Infof("task %s/%s on node %s resource %s, need %f, used %f, allocatable %f, weight %d, score %f", task.Namespace, task.Name, node.Name, resource, request, nodeUsed, allocate, resourceWeight, resourceScore)
+
+        score += resourceScore
+        weightSum += resourceWeight
+    }
+
+    // map the result from [0, weightSum] to [0, 10 (MaxPriority)]
+    if weightSum > 0 {
+        score = score / float64(weightSum)
+    }
+    score *= schedulerapi.MaxPriority * float64(weight.BinPackingWeight)
+
+    return score
+}
+
+// ResourceBinPackingScore calculates the binpack score of a single resource from
+// the task's request, the node's capacity and current usage, and the resource weight.
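+// The score grows with (request+used)/capacity, so nodes that are already more
+// utilized score higher; for example, a request of 1 CPU on a node with 4 CPUs
+// allocatable and 1 CPU already used scores (1+1)*weight/4. If the request would
+// exceed capacity, or capacity or weight is zero, the score is 0.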
+func ResourceBinPackingScore(requested, capacity, used float64, weight int) float64 {
+    if capacity == 0 || weight == 0 {
+        return 0
+    }
+
+    usedFinally := requested + used
+    if usedFinally > capacity {
+        return 0
+    }
+
+    score := usedFinally * float64(weight) / capacity
+    return score
+}
diff --git a/pkg/scheduler/plugins/binpack/binpack_test.go b/pkg/scheduler/plugins/binpack/binpack_test.go
new file mode 100644
index 0000000000..644a47255a
--- /dev/null
+++ b/pkg/scheduler/plugins/binpack/binpack_test.go
@@ -0,0 +1,291 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package binpack
+
+import (
+    "fmt"
+    "math"
+    "testing"
+
+    "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/record"
+
+    kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
+    "volcano.sh/volcano/pkg/scheduler/api"
+    "volcano.sh/volcano/pkg/scheduler/cache"
+    "volcano.sh/volcano/pkg/scheduler/conf"
+    "volcano.sh/volcano/pkg/scheduler/framework"
+    "volcano.sh/volcano/pkg/scheduler/util"
+)
+
+const (
+    eps = 1e-8
+)
+
+func TestArguments(t *testing.T) {
+    framework.RegisterPluginBuilder(PluginName, New)
+    defer framework.CleanupPluginBuilders()
+
+    arguments := framework.Arguments{
+        "binpack.weight":                    "10",
+        "binpack.cpu":                       "5",
+        "binpack.memory":                    "2",
+        "binpack.resources":                 "nvidia.com/gpu, example.com/foo",
+        "binpack.resources.nvidia.com/gpu":  "7",
+        "binpack.resources.example.com/foo": "-3",
+    }
+
+    builder, ok := framework.GetPluginBuilder(PluginName)
+    if !ok {
+        t.Fatalf("should have plugin named %s", PluginName)
+    }
+
+    plugin := builder(arguments)
+    binpack, ok := plugin.(*binpackPlugin)
+    if !ok {
+        t.Fatalf("plugin should be %T, but not %T", binpack, plugin)
+    }
+
+    weight := binpack.weight
+    if weight.BinPackingWeight != 10 {
+        t.Errorf("weight should be 10, but not %v", weight.BinPackingWeight)
+    }
+    if weight.BinPackingCPU != 5 {
+        t.Errorf("cpu should be 5, but not %v", weight.BinPackingCPU)
+    }
+    if weight.BinPackingMemory != 2 {
+        t.Errorf("memory should be 2, but not %v", weight.BinPackingMemory)
+    }
+    for name, weight := range weight.BinPackingResources {
+        switch name {
+        case "nvidia.com/gpu":
+            if weight != 7 {
+                t.Errorf("gpu should be 7, but not %v", weight)
+            }
+        case "example.com/foo":
+            if weight != 1 {
+                t.Errorf("example.com/foo should be 1, but not %v", weight)
+            }
+        default:
+            t.Errorf("resource %s with weight %d should not appear", name, weight)
+        }
+    }
+}
+
+func addResource(resourceList v1.ResourceList, name v1.ResourceName, need string) {
+    resourceList[name] = resource.MustParse(need)
+}
+
+func TestNode(t *testing.T) {
+    framework.RegisterPluginBuilder(PluginName, New)
+    defer framework.CleanupPluginBuilders()
+
+    GPU := v1.ResourceName("nvidia.com/gpu")
+    FOO := v1.ResourceName("example.com/foo")
+
+    p1 := util.BuildPod("c1", "p1", "n1", v1.PodPending, util.BuildResourceList("1", "1Gi"), "pg1", make(map[string]string), make(map[string]string))
+    p2 := util.BuildPod("c1", "p2", "n3", v1.PodPending, util.BuildResourceList("1.5", "0Gi"), "pg1", make(map[string]string), make(map[string]string))
+    p3 := util.BuildPod("c1", "p3", "", v1.PodPending, util.BuildResourceList("2", "10Gi"), "pg1", make(map[string]string), make(map[string]string))
+    addResource(p3.Spec.Containers[0].Resources.Requests, GPU, "2")
+    p4 := util.BuildPod("c1", "p4", "", v1.PodPending, util.BuildResourceList("3", "4Gi"), "pg1", make(map[string]string), make(map[string]string))
+    addResource(p4.Spec.Containers[0].Resources.Requests, FOO, "3")
+
+    n1 := util.BuildNode("n1", util.BuildResourceList("2", "4Gi"), make(map[string]string))
+    n2 := util.BuildNode("n2", util.BuildResourceList("4", "16Gi"), make(map[string]string))
+    addResource(n2.Status.Allocatable, GPU, "4")
+    n3 := util.BuildNode("n3", util.BuildResourceList("2", "4Gi"), make(map[string]string))
+    addResource(n3.Status.Allocatable, FOO, "16")
+
+    pg1 := &kbv1.PodGroup{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "pg1",
+            Namespace: "c1",
+        },
+        Spec: kbv1.PodGroupSpec{
+            Queue: "c1",
+        },
+    }
+    queue1 := &kbv1.Queue{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "c1",
+        },
+        Spec: kbv1.QueueSpec{
+            Weight: 1,
+        },
+    }
+
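+    // Expected scores follow BinPackingScore: for every requested resource the
+    // plugin sums (request+used)*resourceWeight/allocatable, divides by the sum
+    // of the weights, then multiplies by MaxPriority (10) and binpack.weight.
+    // For example, p1 (1 CPU, 1Gi) on n1 (2 CPU, 4Gi, with p1 itself counted as
+    // used) gives ((1+1)*2/2 + (1+1)*3/4) / (2+3) * 10 * 10 = 70.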
+    tests := []struct {
+        name      string
+        podGroups []*kbv1.PodGroup
+        pods      []*v1.Pod
+        nodes     []*v1.Node
+        queues    []*kbv1.Queue
+        arguments framework.Arguments
+        expected  map[string]map[string]float64
+    }{
+        {
+            name: "single job",
+            podGroups: []*kbv1.PodGroup{
+                pg1,
+            },
+            queues: []*kbv1.Queue{
+                queue1,
+            },
+            pods: []*v1.Pod{
+                p1, p2, p3, p4,
+            },
+            nodes: []*v1.Node{
+                n1, n2, n3,
+            },
+            arguments: framework.Arguments{
+                "binpack.weight":                    "10",
+                "binpack.cpu":                       "2",
+                "binpack.memory":                    "3",
+                "binpack.resources":                 "nvidia.com/gpu, example.com/foo",
+                "binpack.resources.nvidia.com/gpu":  "7",
+                "binpack.resources.example.com/foo": "8",
+            },
+            expected: map[string]map[string]float64{
+                "c1/p1": {
+                    "n1": 70,
+                    "n2": 13.75,
+                    "n3": 15,
+                },
+                "c1/p2": {
+                    "n1": 0,
+                    "n2": 37.5,
+                    "n3": 0,
+                },
+                "c1/p3": {
+                    "n1": 0,
+                    "n2": 53.125,
+                    "n3": 0,
+                },
+                "c1/p4": {
+                    "n1": 0,
+                    "n2": 17.3076923076,
+                    "n3": 34.6153846153,
+                },
+            },
+        },
+        {
+            name: "single job",
+            podGroups: []*kbv1.PodGroup{
+                pg1,
+            },
+            queues: []*kbv1.Queue{
+                queue1,
+            },
+            pods: []*v1.Pod{
+                p1, p2, p3, p4,
+            },
+            nodes: []*v1.Node{
+                n1, n2, n3,
+            },
+            arguments: framework.Arguments{
+                "binpack.weight":                   "1",
+                "binpack.cpu":                      "1",
+                "binpack.memory":                   "1",
+                "binpack.resources":                "nvidia.com/gpu",
+                "binpack.resources.nvidia.com/gpu": "23",
+            },
+            expected: map[string]map[string]float64{
+                "c1/p1": {
+                    "n1": 7.5,
+                    "n2": 1.5625,
+                    "n3": 1.25,
+                },
+                "c1/p2": {
+                    "n1": 0,
+                    "n2": 3.75,
+                    "n3": 0,
+                },
+                "c1/p3": {
+                    "n1": 0,
+                    "n2": 5.05,
+                    "n3": 0,
+                },
+                "c1/p4": {
+                    "n1": 0,
+                    "n2": 5,
+                    "n3": 5,
+                },
+            },
+        },
+    }
+
+    for i, test := range tests {
+        binder := &util.FakeBinder{
+            Binds:   map[string]string{},
+            Channel: make(chan string),
+        }
+        schedulerCache := &cache.SchedulerCache{
+            Nodes:         make(map[string]*api.NodeInfo),
+            Jobs:          make(map[api.JobID]*api.JobInfo),
+            Queues:        make(map[api.QueueID]*api.QueueInfo),
+            Binder:        binder,
+            StatusUpdater: &util.FakeStatusUpdater{},
+            VolumeBinder:  &util.FakeVolumeBinder{},
+
+            Recorder: record.NewFakeRecorder(100),
+        }
+        for _, node := range test.nodes {
+            schedulerCache.AddNode(node)
+        }
+        for _, pod := range test.pods {
+            schedulerCache.AddPod(pod)
+        }
+        for _, ss := range test.podGroups {
+            schedulerCache.AddPodGroupV1alpha1(ss)
+        }
+        for _, q := range test.queues {
+            schedulerCache.AddQueueV1alpha1(q)
+        }
+
+        trueValue := true
+        ssn := framework.OpenSession(schedulerCache, []conf.Tier{
+            {
+                Plugins: []conf.PluginOption{
+                    {
+                        Name:             PluginName,
+                        EnabledNodeOrder: &trueValue,
+                        Arguments:        test.arguments,
+                    },
+                },
+            },
+        })
+        defer framework.CloseSession(ssn)
+
+        for _, job := range ssn.Jobs {
+            for _, task := range job.Tasks {
+                taskID := fmt.Sprintf("%s/%s", task.Namespace, task.Name)
+                for _, node := range ssn.Nodes {
+                    score, err := ssn.NodeOrderFn(task, node)
+                    if err != nil {
+                        t.Errorf("case%d: task %s on node %s has err %v", i, taskID, node.Name, err)
+                        continue
+                    }
+                    if expectScore := test.expected[taskID][node.Name]; math.Abs(expectScore-score) > eps {
+                        t.Errorf("case%d: task %s on node %s expected score %v, but got %v", i, taskID, node.Name, expectScore, score)
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go
index a061ac38d4..197042e374 100644
--- a/pkg/scheduler/plugins/factory.go
+++ b/pkg/scheduler/plugins/factory.go
@@ -19,6 +19,7 @@ package plugins
 import (
     "volcano.sh/volcano/pkg/scheduler/framework"
 
+    "volcano.sh/volcano/pkg/scheduler/plugins/binpack"
     "volcano.sh/volcano/pkg/scheduler/plugins/conformance"
     "volcano.sh/volcano/pkg/scheduler/plugins/drf"
     "volcano.sh/volcano/pkg/scheduler/plugins/gang"
@@ -36,6 +37,7 @@ func init() {
     framework.RegisterPluginBuilder(priority.PluginName, priority.New)
     framework.RegisterPluginBuilder(nodeorder.PluginName, nodeorder.New)
     framework.RegisterPluginBuilder(conformance.PluginName, conformance.New)
+    framework.RegisterPluginBuilder(binpack.PluginName, binpack.New)
 
     // Plugins for Queues
     framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)
diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go
index afd5030f21..9a77506252 100644
--- a/test/e2e/job_scheduling.go
+++ b/test/e2e/job_scheduling.go
@@ -17,6 +17,7 @@ limitations under the License.
 package e2e
 
 import (
+    "fmt"
     "time"
 
     . "github.com/onsi/ginkgo"
@@ -24,7 +25,10 @@ import (
 
     batchv1 "k8s.io/api/batch/v1"
     "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    kbapi "volcano.sh/volcano/pkg/scheduler/api"
 )
 
 var _ = Describe("Job E2E Test", func() {
@@ -291,6 +295,89 @@ var _ = Describe("Job E2E Test", func() {
         Expect(evicted).NotTo(BeTrue())
     })
 
+    It("support binpack policy", func() {
+        context := initTestContext()
+        defer cleanupTestContext(context)
+
+        slot := oneCPU
+
+        By("create base job")
+        spec := &jobSpec{
+            name:      "binpack-base-1",
+            namespace: "test",
+            tasks: []taskSpec{
+                {
+                    img: defaultNginxImage,
+                    req: slot,
+                    min: 1,
+                    rep: 1,
+                },
+            },
+        }
+
+        baseJob := createJob(context, spec)
+        err := waitJobReady(context, baseJob)
+        Expect(err).NotTo(HaveOccurred())
+
+        basePods := getTasksOfJob(context, baseJob)
+        basePod := basePods[0]
+        baseNodeName := basePod.Spec.NodeName
+
+        node, err := context.kubeclient.CoreV1().Nodes().Get(baseNodeName, metav1.GetOptions{})
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterPods, err := context.kubeclient.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{})
+        Expect(err).NotTo(HaveOccurred())
+
+        alloc := kbapi.NewResource(node.Status.Allocatable)
+        for _, pod := range clusterPods.Items {
+            nodeName := pod.Spec.NodeName
+            if nodeName != baseNodeName || len(nodeName) == 0 || pod.DeletionTimestamp != nil {
+                continue
+            }
+
+            if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+                continue
+            }
+
+            for _, c := range pod.Spec.Containers {
+                req := kbapi.NewResource(c.Resources.Requests)
+                alloc.Sub(req)
+            }
+        }
+
+        need := kbapi.NewResource(v1.ResourceList{"cpu": resource.MustParse("500m")})
+        var count int32
+        for need.LessEqual(alloc) {
+            count++
+            alloc.Sub(need)
+        }
+
+        By(fmt.Sprintf("create test job with %d pods", count))
+        spec = &jobSpec{
+            name:      "binpack-test-1",
+            namespace: "test",
+            tasks: []taskSpec{
+                {
+                    img: defaultNginxImage,
+                    req: slot,
+                    min: count,
+                    rep: count,
+                },
+            },
+        }
+        job := createJob(context, spec)
+        err = waitJobReady(context, job)
+        Expect(err).NotTo(HaveOccurred())
+
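+        // With binpack enabled, the new pods should be packed onto the base pod's
+        // node rather than spread across the cluster, so every task of the test
+        // job is expected to land on baseNodeName.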
+        pods := getTasksOfJob(context, job)
+        for _, pod := range pods {
+            nodeName := pod.Spec.NodeName
+            Expect(nodeName).Should(Equal(baseNodeName),
+                fmt.Sprintf("Pod %s/%s should be assigned to node %s, but is on %s", pod.Namespace, pod.Name, baseNodeName, nodeName))
+        }
+    })
+
     It("Schedule v1.Job type using Volcano scheduler", func() {
         context := initTestContext()
         defer cleanupTestContext(context)