Skip to content

Commit

Permalink
Support for ECS Placement Constraints/Strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
ejholmes committed Aug 4, 2017
1 parent b2bcf82 commit bbf2b3b
Show file tree
Hide file tree
Showing 23 changed files with 1,027 additions and 415 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ Godeps/_workspace/pkg
tests
.git
build
.env*
29 changes: 28 additions & 1 deletion cmd/empire/factories.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"encoding/json"
"fmt"
"html/template"
"log"
Expand All @@ -27,9 +28,11 @@ import (
"github.com/remind101/empire/pkg/dockerauth"
"github.com/remind101/empire/pkg/dockerutil"
"github.com/remind101/empire/pkg/troposphere"
"github.com/remind101/empire/procfile"
"github.com/remind101/empire/scheduler/cloudformation"
"github.com/remind101/empire/scheduler/docker"
"github.com/remind101/empire/stats"
"github.com/remind101/empire/twelvefactor"
"github.com/remind101/pkg/reporter"
"github.com/remind101/pkg/reporter/config"
)
Expand Down Expand Up @@ -136,7 +139,7 @@ func newScheduler(db *empire.DB, c *Context) (empire.Scheduler, error) {
return a, nil
}

func newCloudFormationScheduler(db *empire.DB, c *Context) (*cloudformation.Scheduler, error) {
func newCloudFormationScheduler(db *empire.DB, c *Context) (twelvefactor.Scheduler, error) {
logDriver := c.String(FlagECSLogDriver)
logOpts := c.StringSlice(FlagECSLogOpts)
logConfiguration := newLogConfiguration(logDriver, logOpts)
Expand Down Expand Up @@ -209,9 +212,33 @@ func newCloudFormationScheduler(db *empire.DB, c *Context) (*cloudformation.Sche
log.Println(fmt.Sprintf(" ZoneID: %v", zoneID))
log.Println(fmt.Sprintf(" LogConfiguration: %v", t.LogConfiguration))

if v := c.String(FlagECSPlacementConstraintsDefault); v != "" {
var placementConstraints []*ecs.PlacementConstraint
if err := json.Unmarshal([]byte(v), &placementConstraints); err != nil {
return nil, fmt.Errorf("unable to unmarshal placement constraints: %v", err)
}
log.Println(fmt.Sprintf(" DefaultPlacementConstraints: %v", placementConstraints))
return twelvefactor.Transform(s, setDefaultPlacementConstraints(placementConstraints)), nil
}

return s, nil
}

func setDefaultPlacementConstraints(placementConstraints []*ecs.PlacementConstraint) func(*twelvefactor.Manifest) *twelvefactor.Manifest {
return func(m *twelvefactor.Manifest) *twelvefactor.Manifest {
for _, p := range m.Processes {
if p.ECS == nil {
p.ECS = &procfile.ECS{}
}

if p.ECS.PlacementConstraints == nil {
p.ECS.PlacementConstraints = placementConstraints
}
}
return m
}
}

func newLogConfiguration(logDriver string, logOpts []string) *ecs.LogConfiguration {
if logDriver == "" {
// Default to the docker daemon default logging driver.
Expand Down
27 changes: 17 additions & 10 deletions cmd/empire/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,17 @@ const (
FlagDockerCert = "docker.cert"
FlagDockerAuth = "docker.auth"

FlagAWSDebug = "aws.debug"
FlagS3TemplateBucket = "s3.templatebucket"
FlagCustomResourcesTopic = "customresources.topic"
FlagCustomResourcesQueue = "customresources.queue"
FlagECSCluster = "ecs.cluster"
FlagECSServiceRole = "ecs.service.role"
FlagECSLogDriver = "ecs.logdriver"
FlagECSLogOpts = "ecs.logopt"
FlagECSAttachedEnabled = "ecs.attached.enabled"
FlagECSDockerCert = "ecs.docker.cert"
FlagAWSDebug = "aws.debug"
FlagS3TemplateBucket = "s3.templatebucket"
FlagCustomResourcesTopic = "customresources.topic"
FlagCustomResourcesQueue = "customresources.queue"
FlagECSCluster = "ecs.cluster"
FlagECSServiceRole = "ecs.service.role"
FlagECSLogDriver = "ecs.logdriver"
FlagECSLogOpts = "ecs.logopt"
FlagECSAttachedEnabled = "ecs.attached.enabled"
FlagECSDockerCert = "ecs.docker.cert"
FlagECSPlacementConstraintsDefault = "ecs.placement-constraints.default"

FlagELBSGPrivate = "elb.sg.private"
FlagELBSGPublic = "elb.sg.public"
Expand Down Expand Up @@ -333,6 +334,12 @@ var EmpireFlags = []cli.Flag{
Usage: "A path to the certificates to use when connecting to Docker daemon's on container instances.",
EnvVar: "EMPIRE_ECS_DOCKER_CERT_PATH",
},
cli.StringFlag{
Name: FlagECSPlacementConstraintsDefault,
Value: "",
Usage: "ECS placement constraints to set when a process does not set any.",
EnvVar: "EMPIRE_ECS_PLACEMENT_CONSTRAINTS_DEFAULT",
},
cli.StringFlag{
Name: FlagELBSGPrivate,
Value: "",
Expand Down
1 change: 1 addition & 0 deletions extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func formationFromExtendedProcfile(p procfile.ExtendedProcfile) (Formation, erro
NoService: process.NoService,
Ports: ports,
Environment: process.Environment,
ECS: process.ECS,
}
}

Expand Down
4 changes: 4 additions & 0 deletions processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/remind101/empire/internal/shellwords"
"github.com/remind101/empire/pkg/constraints"
"github.com/remind101/empire/procfile"
)

// DefaultQuantities maps a process type to the default number of instances to
Expand Down Expand Up @@ -96,6 +97,9 @@ type Process struct {

// An process specific environment variables.
Environment map[string]string `json:"Environment,omitempty"`

// ECS specific parameters.
ECS *procfile.ECS `json:"ECS,omitempty"`
}

type Port struct {
Expand Down
16 changes: 16 additions & 0 deletions procfile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,19 @@ environment:

See [documentation about deploying an application](../docs/deploying_an_application.md#environment-variables)
for a list of other supported environment variables.

**ECS**

This allows you to specify any ECS specific properties, like placement strategies and constraints:

```yaml
ecs:
placement_constraints:
- type: memberOf
expression: "attribute:ecs.instance-type =~ t2.*"
placement_strategy:
- type: spread
field: "attribute:ecs.availability-zone"
```

See http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html for details.
8 changes: 8 additions & 0 deletions procfile/procfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/service/ecs"
"github.com/remind101/empire/procfile/internal/yaml"
)

Expand All @@ -30,6 +31,13 @@ type Process struct {
NoService bool `yaml:"noservice,omitempty"`
Ports []Port `yaml:"ports,omitempty"`
Environment map[string]string `yaml:"environment,omitempty"`
ECS *ECS `yaml:"ecs,omitempty"`
}

// ECS specific options.
type ECS struct {
PlacementConstraints []*ecs.PlacementConstraint `yaml:"placement_constraints"`
PlacementStrategy []*ecs.PlacementStrategy `yaml:"placement_strategy"`
}

// Port represents a port mapping.
Expand Down
29 changes: 29 additions & 0 deletions procfile/procfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -123,6 +125,33 @@ web:
},
},
},

// ECS placement constraints
{
strings.NewReader(`---
web:
command: nginx
ecs:
placement_constraints:
- type: memberOf
expression: "attribute:ecs.instance-type =~ t2.*"
placement_strategy:
- type: spread
field: "attribute:ecs.availability-zone"`),
ExtendedProcfile{
"web": Process{
Command: "nginx",
ECS: &ECS{
PlacementConstraints: []*ecs.PlacementConstraint{
{Type: aws.String("memberOf"), Expression: aws.String("attribute:ecs.instance-type =~ t2.*")},
},
PlacementStrategy: []*ecs.PlacementStrategy{
{Type: aws.String("spread"), Field: aws.String("attribute:ecs.availability-zone")},
},
},
},
},
},
}

func TestParse(t *testing.T) {
Expand Down
19 changes: 1 addition & 18 deletions releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,7 @@ func (s *releasesService) ReleaseApp(ctx context.Context, db *gorm.DB, app *App,
// Restart will find the last release for an app and submit it to the scheduler
// to restart the app.
func (s *releasesService) Restart(ctx context.Context, db *gorm.DB, app *App) error {
release, err := releasesFind(db, ReleasesQuery{App: app})
if err != nil {
if err == gorm.RecordNotFound {
return ErrNoReleases
}

return err
}

if release == nil {
return nil
}

a, err := newSchedulerApp(release)
if err != nil {
return err
}
return s.Scheduler.Restart(ctx, a, nil)
return s.Scheduler.Restart(ctx, app.ID, nil)
}

// These associations are always available on a Release.
Expand Down
51 changes: 35 additions & 16 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"golang.org/x/net/context"
)

const (
// GenericProcessName is the process name for `emp run` processes not defined in the procfile.
GenericProcessName = "run"
)

// RunRecorder is a function that returns an io.Writer that will be written to
// to record Stdout and Stdin of interactive runs.
type RunRecorder func() (io.Writer, error)
Expand All @@ -21,43 +26,57 @@ func (r *runnerService) Run(ctx context.Context, opts RunOpts) error {
}

procName := opts.Command[0]
proc := Process{
Quantity: 1,
}
var proc Process

// First, let's check if the command we're running matches a defined
// process in the Procfile/Formation. If it does, we'll replace the
// command, with the one in the procfile and expand it's arguments.
//
// For example, given a procfile like this:
//
// psql:
// command: ./bin/psql
//
// Calling `emp run psql DATABASE_URL` will expand the command to
// `./bin/psql DATABASE_URL`.
if cmd, ok := release.Formation[procName]; ok {
proc = cmd
proc.Command = append(cmd.Command, opts.Command[1:]...)
proc.NoService = false
} else {
// If we've set the flag to only allow `emp run` on commands
// defined in the procfile, return an error since the command is
// not defined in the procfile.
if r.AllowedCommands == AllowCommandProcfile {
return commandNotInFormation(Command{procName}, release.Formation)
}

// This is an unnamed command, fallback to a generic proc name.
procName = "run"
procName = GenericProcessName
proc.Command = opts.Command
proc.SetConstraints(DefaultConstraints)
}

proc.Quantity = 1

// Set the size of the process.
constraints := DefaultConstraints
if opts.Constraints != nil {
constraints = *opts.Constraints
proc.SetConstraints(*opts.Constraints)
}
proc.SetConstraints(constraints)

release.Formation = Formation{procName: proc}
a, err := newSchedulerApp(release)
if err != nil {
return err
}
p, err := newSchedulerProcess(release, procName, proc)
if err != nil {
return err
}
p.Labels["empire.user"] = opts.User.Name
for _, p := range a.Processes {
p.Labels["empire.user"] = opts.User.Name

// Add additional environment variables to the process.
for k, v := range opts.Env {
p.Env[k] = v
// Add additional environment variables to the process.
for k, v := range opts.Env {
p.Env[k] = v
}
}

return r.Scheduler.Run(ctx, a, p, opts.Input, opts.Output)
return r.Scheduler.Run(ctx, a, opts.Input, opts.Output)
}
10 changes: 6 additions & 4 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (m *FakeScheduler) Submit(ctx context.Context, app *twelvefactor.Manifest,
return nil
}

func (m *FakeScheduler) Restart(ctx context.Context, app *twelvefactor.Manifest, ss twelvefactor.StatusStream) error {
return m.Submit(ctx, app, ss)
func (m *FakeScheduler) Restart(ctx context.Context, appID string, ss twelvefactor.StatusStream) error {
return nil
}

func (m *FakeScheduler) Remove(ctx context.Context, appID string) error {
Expand Down Expand Up @@ -63,9 +63,11 @@ func (m *FakeScheduler) Stop(ctx context.Context, instanceID string) error {
return nil
}

func (m *FakeScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, p *twelvefactor.Process, in io.Reader, out io.Writer) error {
func (m *FakeScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error {
if out != nil {
fmt.Fprintf(out, "Fake output for `%s` on %s\n", p.Command, app.Name)
for _, p := range app.Processes {
fmt.Fprintf(out, "Fake output for `%s` on %s\n", p.Command, app.Name)
}
}
return nil
}
Loading

0 comments on commit bbf2b3b

Please sign in to comment.