Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgraded Azure Pipelines to support demands #2795

Merged
merged 9 commits into from
Aug 8, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **Azure Pipelines Scaler:** Add support for Azure Pipelines to support demands (capabilities) ([#2795](https://github.com/kedacore/keda/pull/2795))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
Expand Down
69 changes: 67 additions & 2 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type azurePipelinesMetadata struct {
organizationURL string
organizationName string
personalAccessToken string
parent string
demands string
poolID int
targetPipelinesQueueLength int64
scalerIndex int
Expand Down Expand Up @@ -106,6 +108,18 @@ func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, http
return nil, fmt.Errorf("no personalAccessToken given")
}

if val, ok := config.TriggerMetadata["parent"]; ok && val != "" {
meta.parent = config.TriggerMetadata["parent"]
} else {
meta.parent = ""
}

if val, ok := config.TriggerMetadata["demands"]; ok && val != "" {
meta.demands = config.TriggerMetadata["demands"]
} else {
meta.demands = ""
}

if val, ok := config.TriggerMetadata["poolName"]; ok && val != "" {
var err error
meta.poolID, err = getPoolIDFromName(ctx, val, &meta, httpClient)
Expand Down Expand Up @@ -231,16 +245,67 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data despite successful code. url: %s", url)
}

// for each job check if it parent fulfilled, then demand fulfilled, then finally pool fulfilled
for _, value := range jobs {
v := value.(map[string]interface{})
if v["result"] == nil {
count++
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(v, s.metadata) {
count++
}
} else {
// does use parent
if getCanAgentParentFulfilJob(v, s.metadata) {
count++
}
}
}
}
}

return count, err
}

// Determine if the scaledjob has the right demands to spin up
func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
var demandsReq = v["demands"].([]interface{})
var demandsAvail = strings.Split(metadata.demands, ",")
var countDemands = 0
for _, dr := range demandsReq {
for _, da := range demandsAvail {
strDr := fmt.Sprintf("%v", dr)
if !strings.HasPrefix(strDr, "Agent.Version") {
if strDr == da {
countDemands++
}
}
}
}

return countDemands == len(demandsReq)-1
}

// Determine if the Job and Parent Agent Template have matching capabilities
func getCanAgentParentFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
matchedAgents, ok := v["matchedAgents"].([]interface{})
if !ok {
// ADO is already processing
return false
}

for _, m := range matchedAgents {
n := m.(map[string]interface{})
if metadata.parent == n["name"].(string) {
return true
}
}
return false
}

func (s *azurePipelinesScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Expand Down
106 changes: 106 additions & 0 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,109 @@ func TestAzurePipelinesGetMetricSpecForScaling(t *testing.T) {
}
}
}

func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
meta := azurePipelinesMetadata{}
meta.organizationName = "testOrg"
meta.organizationURL = url
meta.parent = "test-keda-template"
meta.personalAccessToken = "testPAT"
meta.poolID = 1
meta.targetPipelinesQueueLength = 1

return &meta
}

func TestAzurePipelinesMatchedAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
}))

meta := getMatchedAgentMetaData(apiStub.URL)

mockAzurePipelinesScaler := azurePipelinesScaler{
metadata: meta,
httpClient: http.DefaultClient,
}

queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())

if err != nil {
t.Fail()
}

if queuelen < 1 {
t.Fail()
}
}

func getDemandJobMetaData(url string) *azurePipelinesMetadata {
meta := getMatchedAgentMetaData(url)
meta.parent = ""
meta.demands = "testDemand,kubernetes"

return meta
}

func getMismatchDemandJobMetaData(url string) *azurePipelinesMetadata {
meta := getMatchedAgentMetaData(url)
meta.parent = ""
meta.demands = "testDemand,iamnotademand"

return meta
}

func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
}))

meta := getDemandJobMetaData(apiStub.URL)

mockAzurePipelinesScaler := azurePipelinesScaler{
metadata: meta,
httpClient: http.DefaultClient,
}

queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())

if err != nil {
t.Fail()
}

if queuelen < 1 {
t.Fail()
}
}

func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
}))

meta := getMismatchDemandJobMetaData(apiStub.URL)

mockAzurePipelinesScaler := azurePipelinesScaler{
metadata: meta,
httpClient: http.DefaultClient,
}

queuelen, err := mockAzurePipelinesScaler.GetAzurePipelinesQueueLength(context.TODO())

if err != nil {
t.Fail()
}

if queuelen > 0 {
t.Fail()
}
}