diff --git a/.dockerignore b/.dockerignore index 9bf1dfecd..f78de61c1 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,3 +3,4 @@ Godeps/_workspace/pkg tests .git build +.env* diff --git a/cmd/empire/factories.go b/cmd/empire/factories.go index d39dd5bf8..198db6485 100644 --- a/cmd/empire/factories.go +++ b/cmd/empire/factories.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "html/template" "log" @@ -27,9 +28,11 @@ import ( "github.com/remind101/empire/pkg/dockerauth" "github.com/remind101/empire/pkg/dockerutil" "github.com/remind101/empire/pkg/troposphere" + "github.com/remind101/empire/procfile" "github.com/remind101/empire/scheduler/cloudformation" "github.com/remind101/empire/scheduler/docker" "github.com/remind101/empire/stats" + "github.com/remind101/empire/twelvefactor" "github.com/remind101/pkg/reporter" "github.com/remind101/pkg/reporter/config" ) @@ -136,7 +139,7 @@ func newScheduler(db *empire.DB, c *Context) (empire.Scheduler, error) { return a, nil } -func newCloudFormationScheduler(db *empire.DB, c *Context) (*cloudformation.Scheduler, error) { +func newCloudFormationScheduler(db *empire.DB, c *Context) (twelvefactor.Scheduler, error) { logDriver := c.String(FlagECSLogDriver) logOpts := c.StringSlice(FlagECSLogOpts) logConfiguration := newLogConfiguration(logDriver, logOpts) @@ -209,9 +212,33 @@ func newCloudFormationScheduler(db *empire.DB, c *Context) (*cloudformation.Sche log.Println(fmt.Sprintf(" ZoneID: %v", zoneID)) log.Println(fmt.Sprintf(" LogConfiguration: %v", t.LogConfiguration)) + if v := c.String(FlagECSPlacementConstraintsDefault); v != "" { + var placementConstraints []*ecs.PlacementConstraint + if err := json.Unmarshal([]byte(v), &placementConstraints); err != nil { + return nil, fmt.Errorf("unable to unmarshal placement constraints: %v", err) + } + log.Println(fmt.Sprintf(" DefaultPlacementConstraints: %v", placementConstraints)) + return twelvefactor.Transform(s, 
setDefaultPlacementConstraints(placementConstraints)), nil + } + return s, nil } +func setDefaultPlacementConstraints(placementConstraints []*ecs.PlacementConstraint) func(*twelvefactor.Manifest) *twelvefactor.Manifest { + return func(m *twelvefactor.Manifest) *twelvefactor.Manifest { + for _, p := range m.Processes { + if p.ECS == nil { + p.ECS = &procfile.ECS{} + } + + if p.ECS.PlacementConstraints == nil { + p.ECS.PlacementConstraints = placementConstraints + } + } + return m + } +} + func newLogConfiguration(logDriver string, logOpts []string) *ecs.LogConfiguration { if logDriver == "" { // Default to the docker daemon default logging driver. diff --git a/cmd/empire/main.go b/cmd/empire/main.go index 609d4bc26..e59c33b6d 100644 --- a/cmd/empire/main.go +++ b/cmd/empire/main.go @@ -53,16 +53,17 @@ const ( FlagDockerCert = "docker.cert" FlagDockerAuth = "docker.auth" - FlagAWSDebug = "aws.debug" - FlagS3TemplateBucket = "s3.templatebucket" - FlagCustomResourcesTopic = "customresources.topic" - FlagCustomResourcesQueue = "customresources.queue" - FlagECSCluster = "ecs.cluster" - FlagECSServiceRole = "ecs.service.role" - FlagECSLogDriver = "ecs.logdriver" - FlagECSLogOpts = "ecs.logopt" - FlagECSAttachedEnabled = "ecs.attached.enabled" - FlagECSDockerCert = "ecs.docker.cert" + FlagAWSDebug = "aws.debug" + FlagS3TemplateBucket = "s3.templatebucket" + FlagCustomResourcesTopic = "customresources.topic" + FlagCustomResourcesQueue = "customresources.queue" + FlagECSCluster = "ecs.cluster" + FlagECSServiceRole = "ecs.service.role" + FlagECSLogDriver = "ecs.logdriver" + FlagECSLogOpts = "ecs.logopt" + FlagECSAttachedEnabled = "ecs.attached.enabled" + FlagECSDockerCert = "ecs.docker.cert" + FlagECSPlacementConstraintsDefault = "ecs.placement-constraints.default" FlagELBSGPrivate = "elb.sg.private" FlagELBSGPublic = "elb.sg.public" @@ -333,6 +334,12 @@ var EmpireFlags = []cli.Flag{ Usage: "A path to the certificates to use when connecting to Docker daemon's on container 
instances.", EnvVar: "EMPIRE_ECS_DOCKER_CERT_PATH", }, + cli.StringFlag{ + Name: FlagECSPlacementConstraintsDefault, + Value: "", + Usage: "ECS placement constraints to set when a process does not set any.", + EnvVar: "EMPIRE_ECS_PLACEMENT_CONSTRAINTS_DEFAULT", + }, cli.StringFlag{ Name: FlagELBSGPrivate, Value: "", diff --git a/extractor.go b/extractor.go index 8bf421680..60fdcd198 100644 --- a/extractor.go +++ b/extractor.go @@ -107,6 +107,7 @@ func formationFromExtendedProcfile(p procfile.ExtendedProcfile) (Formation, erro NoService: process.NoService, Ports: ports, Environment: process.Environment, + ECS: process.ECS, } } diff --git a/processes.go b/processes.go index a8471ce4b..96d3fca66 100644 --- a/processes.go +++ b/processes.go @@ -9,6 +9,7 @@ import ( "github.com/remind101/empire/internal/shellwords" "github.com/remind101/empire/pkg/constraints" + "github.com/remind101/empire/procfile" ) // DefaultQuantities maps a process type to the default number of instances to @@ -96,6 +97,9 @@ type Process struct { // An process specific environment variables. Environment map[string]string `json:"Environment,omitempty"` + + // ECS specific parameters. + ECS *procfile.ECS `json:"ECS,omitempty"` } type Port struct { diff --git a/procfile/README.md b/procfile/README.md index 5eb6e3445..37a7adefe 100644 --- a/procfile/README.md +++ b/procfile/README.md @@ -81,3 +81,19 @@ environment: See [documentation about deploying an application](../docs/deploying_an_application.md#environment-variables) for a list of other supported environment variables. + +**ECS** + +This allows you to specify any ECS specific properties, like placement strategies and constraints: + +```yaml +ecs: + placement_constraints: + - type: memberOf + expression: "attribute:ecs.instance-type =~ t2.*" + placement_strategy: + - type: spread + field: "attribute:ecs.availability-zone" +``` + +See http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html for details. 
diff --git a/procfile/procfile.go b/procfile/procfile.go index 09ee8ec29..e160ed7e9 100644 --- a/procfile/procfile.go +++ b/procfile/procfile.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/aws/aws-sdk-go/service/ecs" "github.com/remind101/empire/procfile/internal/yaml" ) @@ -30,6 +31,13 @@ type Process struct { NoService bool `yaml:"noservice,omitempty"` Ports []Port `yaml:"ports,omitempty"` Environment map[string]string `yaml:"environment,omitempty"` + ECS *ECS `yaml:"ecs,omitempty"` +} + +// ECS specific options. +type ECS struct { + PlacementConstraints []*ecs.PlacementConstraint `yaml:"placement_constraints"` + PlacementStrategy []*ecs.PlacementStrategy `yaml:"placement_strategy"` } // Port represents a port mapping. diff --git a/procfile/procfile_test.go b/procfile/procfile_test.go index 1541f976c..5ccbf066a 100644 --- a/procfile/procfile_test.go +++ b/procfile/procfile_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" "github.com/stretchr/testify/assert" ) @@ -123,6 +125,33 @@ web: }, }, }, + + // ECS placement constraints + { + strings.NewReader(`--- +web: + command: nginx + ecs: + placement_constraints: + - type: memberOf + expression: "attribute:ecs.instance-type =~ t2.*" + placement_strategy: + - type: spread + field: "attribute:ecs.availability-zone"`), + ExtendedProcfile{ + "web": Process{ + Command: "nginx", + ECS: &ECS{ + PlacementConstraints: []*ecs.PlacementConstraint{ + {Type: aws.String("memberOf"), Expression: aws.String("attribute:ecs.instance-type =~ t2.*")}, + }, + PlacementStrategy: []*ecs.PlacementStrategy{ + {Type: aws.String("spread"), Field: aws.String("attribute:ecs.availability-zone")}, + }, + }, + }, + }, + }, } func TestParse(t *testing.T) { diff --git a/releases.go b/releases.go index c4251226a..a4b4d993f 100644 --- a/releases.go +++ b/releases.go @@ -188,24 +188,7 @@ func (s *releasesService) ReleaseApp(ctx context.Context, db *gorm.DB, app 
*App, // Restart will find the last release for an app and submit it to the scheduler // to restart the app. func (s *releasesService) Restart(ctx context.Context, db *gorm.DB, app *App) error { - release, err := releasesFind(db, ReleasesQuery{App: app}) - if err != nil { - if err == gorm.RecordNotFound { - return ErrNoReleases - } - - return err - } - - if release == nil { - return nil - } - - a, err := newSchedulerApp(release) - if err != nil { - return err - } - return s.Scheduler.Restart(ctx, a, nil) + return s.Scheduler.Restart(ctx, app.ID, nil) } // These associations are always available on a Release. diff --git a/runner.go b/runner.go index ba44a1073..0fa811ebd 100644 --- a/runner.go +++ b/runner.go @@ -6,6 +6,11 @@ import ( "golang.org/x/net/context" ) +const ( + // GenericProcessName is the process name for `emp run` processes not defined in the procfile. + GenericProcessName = "run" +) + // RunRecorder is a function that returns an io.Writer that will be written to // to record Stdout and Stdin of interactive runs. type RunRecorder func() (io.Writer, error) @@ -21,43 +26,57 @@ func (r *runnerService) Run(ctx context.Context, opts RunOpts) error { } procName := opts.Command[0] - proc := Process{ - Quantity: 1, - } + var proc Process + // First, let's check if the command we're running matches a defined + // process in the Procfile/Formation. If it does, we'll replace the + // command, with the one in the procfile and expand it's arguments. + // + // For example, given a procfile like this: + // + // psql: + // command: ./bin/psql + // + // Calling `emp run psql DATABASE_URL` will expand the command to + // `./bin/psql DATABASE_URL`. if cmd, ok := release.Formation[procName]; ok { + proc = cmd proc.Command = append(cmd.Command, opts.Command[1:]...) + proc.NoService = false } else { + // If we've set the flag to only allow `emp run` on commands + // defined in the procfile, return an error since the command is + // not defined in the procfile. 
if r.AllowedCommands == AllowCommandProcfile { return commandNotInFormation(Command{procName}, release.Formation) } // This is an unnamed command, fallback to a generic proc name. - procName = "run" + procName = GenericProcessName proc.Command = opts.Command + proc.SetConstraints(DefaultConstraints) } + proc.Quantity = 1 + // Set the size of the process. - constraints := DefaultConstraints if opts.Constraints != nil { - constraints = *opts.Constraints + proc.SetConstraints(*opts.Constraints) } - proc.SetConstraints(constraints) + release.Formation = Formation{procName: proc} a, err := newSchedulerApp(release) if err != nil { return err } - p, err := newSchedulerProcess(release, procName, proc) - if err != nil { - return err - } - p.Labels["empire.user"] = opts.User.Name + for _, p := range a.Processes { + p.Labels["empire.user"] = opts.User.Name - // Add additional environment variables to the process. - for k, v := range opts.Env { - p.Env[k] = v + // Add additional environment variables to the process. 
+ for k, v := range opts.Env { + p.Env[k] = v + } } - return r.Scheduler.Run(ctx, a, p, opts.Input, opts.Output) + return r.Scheduler.Run(ctx, a, opts.Input, opts.Output) } diff --git a/scheduler.go b/scheduler.go index d9f28cae3..18fe6b73f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -30,8 +30,8 @@ func (m *FakeScheduler) Submit(ctx context.Context, app *twelvefactor.Manifest, return nil } -func (m *FakeScheduler) Restart(ctx context.Context, app *twelvefactor.Manifest, ss twelvefactor.StatusStream) error { - return m.Submit(ctx, app, ss) +func (m *FakeScheduler) Restart(ctx context.Context, appID string, ss twelvefactor.StatusStream) error { + return nil } func (m *FakeScheduler) Remove(ctx context.Context, appID string) error { @@ -63,9 +63,11 @@ func (m *FakeScheduler) Stop(ctx context.Context, instanceID string) error { return nil } -func (m *FakeScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, p *twelvefactor.Process, in io.Reader, out io.Writer) error { +func (m *FakeScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error { if out != nil { - fmt.Fprintf(out, "Fake output for `%s` on %s\n", p.Command, app.Name) + for _, p := range app.Processes { + fmt.Fprintf(out, "Fake output for `%s` on %s\n", p.Command, app.Name) + } } return nil } diff --git a/scheduler/cloudformation/cloudformation.go b/scheduler/cloudformation/cloudformation.go index 4db15ee60..88f02b33b 100644 --- a/scheduler/cloudformation/cloudformation.go +++ b/scheduler/cloudformation/cloudformation.go @@ -225,8 +225,8 @@ func (s *Scheduler) SubmitWithOptions(ctx context.Context, app *twelvefactor.Man return tx.Commit() } -func (s *Scheduler) Restart(ctx context.Context, app *twelvefactor.Manifest, ss twelvefactor.StatusStream) error { - stackName, err := s.stackName(app.AppID) +func (s *Scheduler) Restart(ctx context.Context, appID string, ss twelvefactor.StatusStream) error { + stackName, err := s.stackName(appID) if err != nil { 
return err } @@ -922,70 +922,79 @@ func (s *Scheduler) Stop(ctx context.Context, taskID string) error { } // Run registers a TaskDefinition for the process, and calls RunTask. -func (m *Scheduler) Run(ctx context.Context, app *twelvefactor.Manifest, process *twelvefactor.Process, in io.Reader, out io.Writer) error { +func (m *Scheduler) Run(ctx context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error { var attached bool if out != nil { attached = true } - t, ok := m.Template.(interface { - ContainerDefinition(*twelvefactor.Manifest, *twelvefactor.Process) *ecs.ContainerDefinition - }) - if !ok { - return errors.New("provided template can't generate a container definition for this process") - } + for _, process := range app.Processes { + t, ok := m.Template.(interface { + ContainerDefinition(*twelvefactor.Manifest, *twelvefactor.Process) *ecs.ContainerDefinition + }) + if !ok { + return errors.New("provided template can't generate a container definition for this process") + } - containerDefinition := t.ContainerDefinition(app, process) - if attached { - if containerDefinition.DockerLabels == nil { - containerDefinition.DockerLabels = make(map[string]*string) + containerDefinition := t.ContainerDefinition(app, process) + if attached { + if containerDefinition.DockerLabels == nil { + containerDefinition.DockerLabels = make(map[string]*string) + } + // NOTE: Currently, this depends on a patched version of the + // Amazon ECS Container Agent, since the official agent doesn't + // provide a method to pass these down to the `CreateContainer` + // call. + containerDefinition.DockerLabels["docker.config.Tty"] = aws.String("true") + containerDefinition.DockerLabels["docker.config.OpenStdin"] = aws.String("true") } - // NOTE: Currently, this depends on a patched version of the - // Amazon ECS Container Agent, since the official agent doesn't - // provide a method to pass these down to the `CreateContainer` - // call. 
- containerDefinition.DockerLabels["docker.config.Tty"] = aws.String("true") - containerDefinition.DockerLabels["docker.config.OpenStdin"] = aws.String("true") - } - - resp, err := m.ecs.RegisterTaskDefinition(&ecs.RegisterTaskDefinitionInput{ - Family: aws.String(fmt.Sprintf("%s--%s", app.AppID, process.Type)), - TaskRoleArn: taskRoleArn(app), - ContainerDefinitions: []*ecs.ContainerDefinition{ - containerDefinition, - }, - }) - if err != nil { - return fmt.Errorf("error registering TaskDefinition: %v", err) - } - runResp, err := m.ecs.RunTask(&ecs.RunTaskInput{ - TaskDefinition: resp.TaskDefinition.TaskDefinitionArn, - Cluster: aws.String(m.Cluster), - Count: aws.Int64(1), - StartedBy: aws.String(app.AppID), - }) - if err != nil { - return fmt.Errorf("error calling RunTask: %v", err) - } + resp, err := m.ecs.RegisterTaskDefinition(&ecs.RegisterTaskDefinitionInput{ + Family: aws.String(fmt.Sprintf("%s--%s", app.AppID, process.Type)), + TaskRoleArn: taskRoleArn(app), + ContainerDefinitions: []*ecs.ContainerDefinition{ + containerDefinition, + }, + }) + if err != nil { + return fmt.Errorf("error registering TaskDefinition: %v", err) + } - for _, f := range runResp.Failures { - return fmt.Errorf("error running task %s: %s", aws.StringValue(f.Arn), aws.StringValue(f.Reason)) - } + input := &ecs.RunTaskInput{ + TaskDefinition: resp.TaskDefinition.TaskDefinitionArn, + Cluster: aws.String(m.Cluster), + Count: aws.Int64(1), + StartedBy: aws.String(app.AppID), + } - task := runResp.Tasks[0] + if v := process.ECS; v != nil { + input.PlacementConstraints = v.PlacementConstraints + input.PlacementStrategy = v.PlacementStrategy + } - if attached { - // Ensure that we atleast try to stop the task, after we detach - // from the process. This ensures that we don't have zombie - // one-off processes lying around. 
- defer m.ecs.StopTask(&ecs.StopTaskInput{ - Cluster: task.ClusterArn, - Task: task.TaskArn, - }) + runResp, err := m.ecs.RunTask(input) + if err != nil { + return fmt.Errorf("error calling RunTask: %v", err) + } - if err := m.attach(ctx, task, in, out); err != nil { - return err + for _, f := range runResp.Failures { + return fmt.Errorf("error running task %s: %s", aws.StringValue(f.Arn), aws.StringValue(f.Reason)) + } + + task := runResp.Tasks[0] + + if attached { + // Ensure that we atleast try to stop the task, after we detach + // from the process. This ensures that we don't have zombie + // one-off processes lying around. + defer m.ecs.StopTask(&ecs.StopTaskInput{ + Cluster: task.ClusterArn, + Task: task.TaskArn, + }) + + if err := m.attach(ctx, task, in, out); err != nil { + return err + } } } diff --git a/scheduler/cloudformation/cloudformation_test.go b/scheduler/cloudformation/cloudformation_test.go index 87fa63c91..c6833dc85 100644 --- a/scheduler/cloudformation/cloudformation_test.go +++ b/scheduler/cloudformation/cloudformation_test.go @@ -1480,10 +1480,7 @@ func TestScheduler_Restart(t *testing.T) { StackName: aws.String("acme-inc"), }).Return(nil) - err = s.Restart(context.Background(), &twelvefactor.Manifest{ - AppID: "c9366591-ab68-4d49-a333-95ce5a23df68", - Name: "acme-inc", - }, twelvefactor.NullStatusStream) + err = s.Restart(context.Background(), "c9366591-ab68-4d49-a333-95ce5a23df68", twelvefactor.NullStatusStream) assert.NoError(t, err) c.AssertExpectations(t) @@ -1543,9 +1540,12 @@ func TestScheduler_Run_Detached(t *testing.T) { Env: map[string]string{ "EMPIRE_X_TASK_ROLE_ARN": "arn:aws:iam::897883143566:role/app", }, - }, &twelvefactor.Process{ - Type: "run", - Command: []string{"bundle exec rake db:migrate"}, + Processes: []*twelvefactor.Process{ + &twelvefactor.Process{ + Type: "run", + Command: []string{"bundle exec rake db:migrate"}, + }, + }, }, nil, nil) assert.NoError(t, err) @@ -1679,9 +1679,12 @@ func TestScheduler_Run_Attached(t 
*testing.T) { err = s.Run(context.Background(), &twelvefactor.Manifest{ AppID: "c9366591-ab68-4d49-a333-95ce5a23df68", Name: "acme-inc", - }, &twelvefactor.Process{ - Type: "run", - Command: []string{"bundle", "exec", "rake", "db:migrate"}, + Processes: []*twelvefactor.Process{ + &twelvefactor.Process{ + Type: "run", + Command: []string{"bundle", "exec", "rake", "db:migrate"}, + }, + }, }, stdin, stdout) assert.NoError(t, err) diff --git a/scheduler/cloudformation/template.go b/scheduler/cloudformation/template.go index 484d1f078..9b9045973 100644 --- a/scheduler/cloudformation/template.go +++ b/scheduler/cloudformation/template.go @@ -690,6 +690,28 @@ func (t *EmpireTemplate) addService(tmpl *troposphere.Template, app *twelvefacto "ServiceName": fmt.Sprintf("%s-%s", app.Name, p.Type), "ServiceToken": t.CustomResourcesTopic, } + if v := p.ECS; v != nil { + if len(v.PlacementConstraints) > 0 { + var placementConstraints []interface{} + for _, c := range v.PlacementConstraints { + placementConstraints = append(placementConstraints, map[string]interface{}{ + "Type": c.Type, + "Expression": c.Expression, + }) + } + serviceProperties["PlacementConstraints"] = placementConstraints + } + if len(v.PlacementStrategy) > 0 { + var placementStrategy []interface{} + for _, c := range v.PlacementStrategy { + placementStrategy = append(placementStrategy, map[string]interface{}{ + "Type": c.Type, + "Field": c.Field, + }) + } + serviceProperties["PlacementStrategy"] = placementStrategy + } + } if len(loadBalancers) > 0 { serviceProperties["Role"] = t.ServiceRole } diff --git a/scheduler/cloudformation/template_test.go b/scheduler/cloudformation/template_test.go index 1434e8d6b..ca578671f 100644 --- a/scheduler/cloudformation/template_test.go +++ b/scheduler/cloudformation/template_test.go @@ -10,10 +10,12 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudformation" + "github.com/aws/aws-sdk-go/service/ecs" 
"github.com/aws/aws-sdk-go/service/route53" "github.com/remind101/empire/pkg/bytesize" "github.com/remind101/empire/pkg/image" "github.com/remind101/empire/pkg/troposphere" + "github.com/remind101/empire/procfile" "github.com/remind101/empire/twelvefactor" "github.com/stretchr/testify/assert" ) @@ -398,6 +400,64 @@ func TestEmpireTemplate(t *testing.T) { }, }, }, + + { + "ecs-extra.json", + &twelvefactor.Manifest{ + AppID: "1234", + Release: "v1", + Name: "acme-inc", + Env: map[string]string{ + // These should get re-sorted in + // alphabetical order. + "C": "foo", + "A": "foobar", + "B": "bar", + }, + Processes: []*twelvefactor.Process{ + { + Type: "web", + Image: image.Image{Repository: "remind101/acme-inc", Tag: "latest"}, + Command: []string{"./bin/web"}, + Env: map[string]string{ + "PORT": "8080", + }, + Exposure: &twelvefactor.Exposure{ + Ports: []twelvefactor.Port{ + { + Host: 80, + Container: 8080, + Protocol: &twelvefactor.HTTP{}, + }, + }, + }, + Labels: map[string]string{ + "empire.app.process": "web", + }, + Memory: 128 * bytesize.MB, + CPUShares: 256, + Quantity: 1, + Nproc: 256, + ECS: &procfile.ECS{ + PlacementConstraints: []*ecs.PlacementConstraint{ + {Type: aws.String("memberOf"), Expression: aws.String("attribute:ecs.instance-type =~ t2.*")}, + }, + }, + }, + { + Type: "worker", + Image: image.Image{Repository: "remind101/acme-inc", Tag: "latest"}, + Command: []string{"./bin/worker"}, + Labels: map[string]string{ + "empire.app.process": "worker", + }, + Env: map[string]string{ + "FOO": "BAR", + }, + }, + }, + }, + }, } stackTags := []*cloudformation.Tag{ diff --git a/scheduler/cloudformation/templates/ecs-extra.json b/scheduler/cloudformation/templates/ecs-extra.json new file mode 100644 index 000000000..312c56436 --- /dev/null +++ b/scheduler/cloudformation/templates/ecs-extra.json @@ -0,0 +1,342 @@ +{ + "Conditions": { + "DNSCondition": { + "Fn::Equals": [ + { + "Ref": "DNS" + }, + "true" + ] + } + }, + "Outputs": { + "Deployments": { + "Value": 
{ + "Fn::Join": [ + ",", + [ + { + "Fn::Join": [ + "=", + [ + "web", + { + "Fn::GetAtt": [ + "webService", + "DeploymentId" + ] + } + ] + ] + }, + { + "Fn::Join": [ + "=", + [ + "worker", + { + "Fn::GetAtt": [ + "workerService", + "DeploymentId" + ] + } + ] + ] + } + ] + ] + } + }, + "EmpireVersion": { + "Value": "x.x.x" + }, + "Release": { + "Value": "v1" + }, + "Services": { + "Value": { + "Fn::Join": [ + ",", + [ + { + "Fn::Join": [ + "=", + [ + "web", + { + "Ref": "webService" + } + ] + ] + }, + { + "Fn::Join": [ + "=", + [ + "worker", + { + "Ref": "workerService" + } + ] + ] + } + ] + ] + } + } + }, + "Parameters": { + "DNS": { + "Type": "String", + "Description": "When set to `true`, CNAME's will be altered", + "Default": "true" + }, + "RestartKey": { + "Type": "String", + "Description": "Key used to trigger a restart of an app", + "Default": "default" + }, + "webScale": { + "Type": "String" + }, + "workerScale": { + "Type": "String" + } + }, + "Resources": { + "CNAME": { + "Condition": "DNSCondition", + "Properties": { + "HostedZoneId": "Z3DG6IL3SJCGPX", + "Name": "acme-inc.empire", + "ResourceRecords": [ + { + "Fn::GetAtt": [ + "webLoadBalancer", + "DNSName" + ] + } + ], + "TTL": 60, + "Type": "CNAME" + }, + "Type": "AWS::Route53::RecordSet" + }, + "web8080InstancePort": { + "Properties": { + "ServiceToken": "sns topic arn" + }, + "Type": "Custom::InstancePort", + "Version": "1.0" + }, + "webAlias": { + "Condition": "DNSCondition", + "Properties": { + "AliasTarget": { + "DNSName": { + "Fn::GetAtt": [ + "webLoadBalancer", + "DNSName" + ] + }, + "EvaluateTargetHealth": "true", + "HostedZoneId": { + "Fn::GetAtt": [ + "webLoadBalancer", + "CanonicalHostedZoneNameID" + ] + } + }, + "HostedZoneId": "Z3DG6IL3SJCGPX", + "Name": "web.acme-inc.empire", + "Type": "A" + }, + "Type": "AWS::Route53::RecordSet" + }, + "webLoadBalancer": { + "Properties": { + "ConnectionDrainingPolicy": { + "Enabled": true, + "Timeout": 30 + }, + "CrossZone": true, + "Listeners": [ + { + 
"InstancePort": { + "Fn::GetAtt": [ + "web8080InstancePort", + "InstancePort" + ] + }, + "InstanceProtocol": "http", + "LoadBalancerPort": 80, + "Protocol": "http" + } + ], + "Scheme": "internal", + "SecurityGroups": [ + "sg-e7387381" + ], + "Subnets": [ + "subnet-bb01c4cd", + "subnet-c85f4091" + ], + "Tags": [ + { + "Key": "empire.app.process", + "Value": "web" + } + ] + }, + "Type": "AWS::ElasticLoadBalancing::LoadBalancer" + }, + "webService": { + "Properties": { + "Cluster": "cluster", + "DesiredCount": { + "Ref": "webScale" + }, + "LoadBalancers": [ + { + "ContainerName": "web", + "ContainerPort": 8080, + "LoadBalancerName": { + "Ref": "webLoadBalancer" + } + } + ], + "PlacementConstraints": [ + { + "Expression": "attribute:ecs.instance-type =~ t2.*", + "Type": "memberOf" + } + ], + "Role": "ecsServiceRole", + "ServiceName": "acme-inc-web", + "ServiceToken": "sns topic arn", + "TaskDefinition": { + "Ref": "webTaskDefinition" + } + }, + "Type": "Custom::ECSService" + }, + "webTaskDefinition": { + "Properties": { + "ContainerDefinitions": [ + { + "Command": [ + "./bin/web" + ], + "Cpu": 256, + "DockerLabels": { + "cloudformation.restart-key": { + "Ref": "RestartKey" + }, + "empire.app.process": "web" + }, + "Environment": [ + { + "Name": "A", + "Value": "foobar" + }, + { + "Name": "B", + "Value": "bar" + }, + { + "Name": "C", + "Value": "foo" + }, + { + "Name": "PORT", + "Value": "8080" + } + ], + "Essential": true, + "Image": "remind101/acme-inc:latest", + "Memory": 128, + "Name": "web", + "PortMappings": [ + { + "ContainerPort": 8080, + "HostPort": { + "Fn::GetAtt": [ + "web8080InstancePort", + "InstancePort" + ] + } + } + ], + "Ulimits": [ + { + "HardLimit": 256, + "Name": "nproc", + "SoftLimit": 256 + } + ] + } + ], + "Volumes": [] + }, + "Type": "AWS::ECS::TaskDefinition" + }, + "workerService": { + "Properties": { + "Cluster": "cluster", + "DesiredCount": { + "Ref": "workerScale" + }, + "LoadBalancers": [], + "ServiceName": "acme-inc-worker", + 
"ServiceToken": "sns topic arn", + "TaskDefinition": { + "Ref": "workerTaskDefinition" + } + }, + "Type": "Custom::ECSService" + }, + "workerTaskDefinition": { + "Properties": { + "ContainerDefinitions": [ + { + "Command": [ + "./bin/worker" + ], + "Cpu": 0, + "DockerLabels": { + "cloudformation.restart-key": { + "Ref": "RestartKey" + }, + "empire.app.process": "worker" + }, + "Environment": [ + { + "Name": "A", + "Value": "foobar" + }, + { + "Name": "B", + "Value": "bar" + }, + { + "Name": "C", + "Value": "foo" + }, + { + "Name": "FOO", + "Value": "BAR" + } + ], + "Essential": true, + "Image": "remind101/acme-inc:latest", + "Memory": 0, + "Name": "worker", + "Ulimits": [] + } + ], + "Volumes": [] + }, + "Type": "AWS::ECS::TaskDefinition" + } + } +} \ No newline at end of file diff --git a/scheduler/docker/docker.go b/scheduler/docker/docker.go index 5338b3108..c6a45b18a 100644 --- a/scheduler/docker/docker.go +++ b/scheduler/docker/docker.go @@ -72,14 +72,14 @@ func RunAttachedWithDocker(s twelvefactor.Scheduler, client *dockerutil.Client) // Run runs attached processes using the docker scheduler, and detached // processes using the wrapped scheduler. -func (s *AttachedScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, process *twelvefactor.Process, in io.Reader, out io.Writer) error { +func (s *AttachedScheduler) Run(ctx context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error { // Attached means stdout, stdin is attached. 
attached := out != nil || in != nil if attached { - return s.dockerScheduler.Run(ctx, app, process, in, out) + return s.dockerScheduler.Run(ctx, app, in, out) } else { - return s.Scheduler.Run(ctx, app, process, in, out) + return s.Scheduler.Run(ctx, app, in, out) } } @@ -146,73 +146,75 @@ func NewScheduler(client *dockerutil.Client) *Scheduler { } } -func (s *Scheduler) Run(ctx context.Context, app *twelvefactor.Manifest, p *twelvefactor.Process, in io.Reader, out io.Writer) error { +func (s *Scheduler) Run(ctx context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error { attached := out != nil || in != nil if !attached { return errors.New("cannot run detached processes with Docker scheduler") } - labels := twelvefactor.Labels(app, p) - labels[runLabel] = Attached - - if err := s.docker.PullImage(ctx, docker.PullImageOptions{ - Registry: p.Image.Registry, - Repository: p.Image.Repository, - Tag: p.Image.Tag, - OutputStream: replaceNL(out), - }); err != nil { - return fmt.Errorf("error pulling image: %v", err) - } + for _, p := range app.Processes { + labels := twelvefactor.Labels(app, p) + labels[runLabel] = Attached + + if err := s.docker.PullImage(ctx, docker.PullImageOptions{ + Registry: p.Image.Registry, + Repository: p.Image.Repository, + Tag: p.Image.Tag, + OutputStream: replaceNL(out), + }); err != nil { + return fmt.Errorf("error pulling image: %v", err) + } - container, err := s.docker.CreateContainer(ctx, docker.CreateContainerOptions{ - Name: uuid.New(), - Config: &docker.Config{ - Tty: true, - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - OpenStdin: true, - Memory: int64(p.Memory), - CPUShares: int64(p.CPUShares), - Image: p.Image.String(), - Cmd: p.Command, - Env: envKeys(twelvefactor.Env(app, p)), - Labels: labels, - }, - HostConfig: &docker.HostConfig{ - LogConfig: docker.LogConfig{ - Type: "json-file", + container, err := s.docker.CreateContainer(ctx, docker.CreateContainerOptions{ + Name: uuid.New(), + 
Config: &docker.Config{ + Tty: true, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + OpenStdin: true, + Memory: int64(p.Memory), + CPUShares: int64(p.CPUShares), + Image: p.Image.String(), + Cmd: p.Command, + Env: envKeys(twelvefactor.Env(app, p)), + Labels: labels, }, - }, - }) - if err != nil { - return fmt.Errorf("error creating container: %v", err) - } - defer s.docker.RemoveContainer(ctx, docker.RemoveContainerOptions{ - ID: container.ID, - RemoveVolumes: true, - Force: true, - }) + HostConfig: &docker.HostConfig{ + LogConfig: docker.LogConfig{ + Type: "json-file", + }, + }, + }) + if err != nil { + return fmt.Errorf("error creating container: %v", err) + } + defer s.docker.RemoveContainer(ctx, docker.RemoveContainerOptions{ + ID: container.ID, + RemoveVolumes: true, + Force: true, + }) - if err := s.docker.StartContainer(ctx, container.ID, nil); err != nil { - return fmt.Errorf("error starting container: %v", err) - } - defer tryClose(out) - - if err := s.docker.AttachToContainer(ctx, docker.AttachToContainerOptions{ - Container: container.ID, - InputStream: in, - OutputStream: out, - ErrorStream: out, - Logs: true, - Stream: true, - Stdin: true, - Stdout: true, - Stderr: true, - RawTerminal: true, - }); err != nil { - return fmt.Errorf("error attaching to container: %v", err) + if err := s.docker.StartContainer(ctx, container.ID, nil); err != nil { + return fmt.Errorf("error starting container: %v", err) + } + defer tryClose(out) + + if err := s.docker.AttachToContainer(ctx, docker.AttachToContainerOptions{ + Container: container.ID, + InputStream: in, + OutputStream: out, + ErrorStream: out, + Logs: true, + Stream: true, + Stdin: true, + Stdout: true, + Stderr: true, + RawTerminal: true, + }); err != nil { + return fmt.Errorf("error attaching to container: %v", err) + } } return nil diff --git a/server/cloudformation/cloudformation.go b/server/cloudformation/cloudformation.go index 44346d5c5..dd0cfab05 100644 --- 
a/server/cloudformation/cloudformation.go +++ b/server/cloudformation/cloudformation.go @@ -4,6 +4,7 @@ package cloudformation import ( "encoding/json" + "errors" "fmt" "time" @@ -55,14 +56,14 @@ func NewCustomResourceProvisioner(empire *empire.Empire, config client.ConfigPro sendResponse: customresources.SendResponse, } - p.add("Custom::InstancePort", &InstancePortsProvisioner{ + p.add("Custom::InstancePort", newInstancePortsProvisioner(&InstancePortsResource{ ports: lb.NewDBPortAllocator(db), - }) + })) ecs := newECSClient(config) - p.add("Custom::ECSService", &ECSServiceResource{ + p.add("Custom::ECSService", newECSServiceProvisioner(&ECSServiceResource{ ecs: ecs, - }) + })) store := &dbEnvironmentStore{db} p.add("Custom::ECSEnvironment", newECSEnvironmentProvisioner(&ECSEnvironmentResource{ @@ -200,6 +201,9 @@ type provisioner struct { } func (p *provisioner) Properties() interface{} { + if p.properties == nil { + return nil + } return p.properties() } @@ -224,6 +228,9 @@ func (p *provisioner) Provision(ctx context.Context, req customresources.Request } id := req.PhysicalResourceId + if p.Update == nil { + return id, nil, errors.New("resource does not support updates") + } data, err := p.Update(ctx, req) return id, data, err case customresources.Delete: diff --git a/server/cloudformation/ecs.go b/server/cloudformation/ecs.go index aeb236925..45ce9edd5 100644 --- a/server/cloudformation/ecs.go +++ b/server/cloudformation/ecs.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strings" "golang.org/x/net/context" @@ -53,12 +52,28 @@ type LoadBalancer struct { // ECSServiceProperties represents the properties for the Custom::ECSService // resource. 
type ECSServiceProperties struct { - ServiceName *string - Cluster *string - DesiredCount *customresources.IntValue - LoadBalancers []LoadBalancer - Role *string - TaskDefinition *string + ServiceName *string + Cluster *string + DesiredCount *customresources.IntValue `hash:"ignore"` + LoadBalancers []LoadBalancer + Role *string + TaskDefinition *string `hash:"ignore"` + PlacementConstraints []ECSPlacementConstraint + PlacementStrategy []ECSPlacementStrategy +} + +func (p *ECSServiceProperties) ReplacementHash() (uint64, error) { + return hashstructure.Hash(p, nil) +} + +type ECSPlacementConstraint struct { + Type *string + Expression *string +} + +type ECSPlacementStrategy struct { + Type *string + Field *string } // ECSServiceResource is a Provisioner that creates and updates ECS services. @@ -66,68 +81,22 @@ type ECSServiceResource struct { ecs ecsClient } -func (p *ECSServiceResource) Properties() interface{} { - return &ECSServiceProperties{} +func newECSServiceProvisioner(resource *ECSServiceResource) *provisioner { + return &provisioner{ + properties: func() properties { + return &ECSServiceProperties{} + }, + Create: resource.Create, + Update: resource.Update, + Delete: resource.Delete, + } } -func (p *ECSServiceResource) Provision(ctx context.Context, req customresources.Request) (string, interface{}, error) { +func (p *ECSServiceResource) Create(ctx context.Context, req customresources.Request) (string, interface{}, error) { properties := req.ResourceProperties.(*ECSServiceProperties) - oldProperties := req.OldResourceProperties.(*ECSServiceProperties) + clientToken := req.Hash() data := make(map[string]string) - switch req.RequestType { - case customresources.Create: - id, deploymentId, err := p.create(ctx, req.Hash(), properties) - if err == nil { - data["DeploymentId"] = deploymentId - } - return id, data, err - case customresources.Delete: - id := req.PhysicalResourceId - err := p.delete(ctx, aws.String(id), properties.Cluster) - return id, nil, err - 
case customresources.Update: - id := req.PhysicalResourceId - - // TODO: Update this to use hashstructure. - if serviceRequiresReplacement(properties, oldProperties) { - // If we can't update the service, we'll need to create a new - // one, and destroy the old one. - oldId := id - id, deploymentId, err := p.create(ctx, req.Hash(), properties) - if err != nil { - return oldId, nil, err - } - data["DeploymentId"] = deploymentId - - // There's no need to delete the old service here, since - // CloudFormation will send us a DELETE request for the old - // service. - - return id, data, err - } - - resp, err := p.ecs.UpdateService(&ecs.UpdateServiceInput{ - Service: aws.String(id), - Cluster: properties.Cluster, - DesiredCount: properties.DesiredCount.Value(), - TaskDefinition: properties.TaskDefinition, - }) - if err == nil { - d := primaryDeployment(resp.Service) - if d != nil { - data["DeploymentId"] = *d.Id - } else { - err = fmt.Errorf("no primary deployment found") - } - } - return id, data, err - default: - return "", nil, fmt.Errorf("%s is not supported", req.RequestType) - } -} - -func (p *ECSServiceResource) create(ctx context.Context, clientToken string, properties *ECSServiceProperties) (string, string, error) { var loadBalancers []*ecs.LoadBalancer for _, v := range properties.LoadBalancers { loadBalancers = append(loadBalancers, &ecs.LoadBalancer{ @@ -138,28 +107,47 @@ func (p *ECSServiceResource) create(ctx context.Context, clientToken string, pro }) } + var placementConstraints []*ecs.PlacementConstraint + for _, v := range properties.PlacementConstraints { + placementConstraints = append(placementConstraints, &ecs.PlacementConstraint{ + Type: v.Type, + Expression: v.Expression, + }) + } + + var placementStrategy []*ecs.PlacementStrategy + for _, v := range properties.PlacementStrategy { + placementStrategy = append(placementStrategy, &ecs.PlacementStrategy{ + Type: v.Type, + Field: v.Field, + }) + } + var serviceName *string if properties.ServiceName != 
nil { serviceName = aws.String(fmt.Sprintf("%s-%s", *properties.ServiceName, clientToken)) } resp, err := p.ecs.CreateService(&ecs.CreateServiceInput{ - ClientToken: aws.String(clientToken), - ServiceName: serviceName, - Cluster: properties.Cluster, - DesiredCount: properties.DesiredCount.Value(), - Role: properties.Role, - TaskDefinition: properties.TaskDefinition, - LoadBalancers: loadBalancers, + ClientToken: aws.String(clientToken), + ServiceName: serviceName, + Cluster: properties.Cluster, + DesiredCount: properties.DesiredCount.Value(), + Role: properties.Role, + TaskDefinition: properties.TaskDefinition, + LoadBalancers: loadBalancers, + PlacementConstraints: placementConstraints, + PlacementStrategy: placementStrategy, }) if err != nil { - return "", "", fmt.Errorf("error creating service: %v", err) + return "", nil, fmt.Errorf("error creating service: %v", err) } d := primaryDeployment(resp.Service) if d == nil { - return "", "", fmt.Errorf("no primary deployment found") + return "", data, fmt.Errorf("no primary deployment found") } + data["DeploymentId"] = *d.Id arn := resp.Service.ServiceArn @@ -180,13 +168,40 @@ func (p *ECSServiceResource) create(ctx context.Context, clientToken string, pro select { case <-stabilized: case <-ctx.Done(): - return *arn, *d.Id, ctx.Err() + return *arn, data, ctx.Err() + } + + return *arn, data, nil +} + +func (p *ECSServiceResource) Update(ctx context.Context, req customresources.Request) (interface{}, error) { + properties := req.ResourceProperties.(*ECSServiceProperties) + data := make(map[string]string) + + resp, err := p.ecs.UpdateService(&ecs.UpdateServiceInput{ + Service: aws.String(req.PhysicalResourceId), + Cluster: properties.Cluster, + DesiredCount: properties.DesiredCount.Value(), + TaskDefinition: properties.TaskDefinition, + }) + if err != nil { + return nil, err + } + + d := primaryDeployment(resp.Service) + if d == nil { + return nil, fmt.Errorf("no primary deployment found") } - return *arn, *d.Id, nil + 
data["DeploymentId"] = *d.Id + return data, nil } -func (p *ECSServiceResource) delete(ctx context.Context, service, cluster *string) error { +func (p *ECSServiceResource) Delete(ctx context.Context, req customresources.Request) error { + properties := req.ResourceProperties.(*ECSServiceProperties) + service := aws.String(req.PhysicalResourceId) + cluster := properties.Cluster + // We have to scale the service down to 0, before we're able to // destroy it. if _, err := p.ecs.UpdateService(&ecs.UpdateServiceInput{ @@ -267,7 +282,6 @@ func newECSTaskDefinitionProvisioner(resource *ECSTaskDefinitionResource) *provi return &ECSTaskDefinitionProperties{} }, Create: resource.Create, - Update: resource.Update, Delete: resource.Delete, } } @@ -278,13 +292,6 @@ func (p *ECSTaskDefinitionResource) Create(ctx context.Context, req customresour return id, nil, err } -func (p *ECSTaskDefinitionResource) Update(ctx context.Context, req customresources.Request) (interface{}, error) { - // Updates of ECSTaskDefinition will generate a replacement resource, so - // if we've reached this point, it means that the environment is the - // same as it was before. - return nil, nil -} - func (p *ECSTaskDefinitionResource) Delete(ctx context.Context, req customresources.Request) error { return p.delete(req.PhysicalResourceId) } @@ -376,7 +383,7 @@ func (p *ECSTaskDefinitionResource) delete(arn string) error { // ECSEnvironmentProperties are the properties provided to the // Custom::ECSEnvironment custom resource. 
type ECSEnvironmentProperties struct { - Environment []*ecs.KeyValuePair `hash:"set"` + Environment []*ecs.KeyValuePair } func (p *ECSEnvironmentProperties) ReplacementHash() (uint64, error) { @@ -396,7 +403,6 @@ func newECSEnvironmentProvisioner(resource *ECSEnvironmentResource) *provisioner return &ECSEnvironmentProperties{} }, Create: resource.Create, - Update: resource.Update, Delete: resource.Delete, } } @@ -407,13 +413,6 @@ func (p *ECSEnvironmentResource) Create(ctx context.Context, req customresources return id, nil, err } -func (p *ECSEnvironmentResource) Update(ctx context.Context, req customresources.Request) (interface{}, error) { - // Updates of ECSEnvironment will generate a replacement resource, so if - // we've reached this point, it means that the environment is the same - // as it was before. - return nil, nil -} - func (p *ECSEnvironmentResource) Delete(ctx context.Context, req customresources.Request) error { return nil } @@ -465,30 +464,6 @@ func (s *dbEnvironmentStore) fetch(id string) ([]*ecs.KeyValuePair, error) { return env, err } -// Certain parameters cannot be updated on existing services, so we need to -// create a new physical resource. 
-func serviceRequiresReplacement(new, old *ECSServiceProperties) bool { - eq := reflect.DeepEqual - - if !eq(new.Cluster, old.Cluster) { - return true - } - - if !eq(new.Role, old.Role) { - return true - } - - if !eq(new.ServiceName, old.ServiceName) { - return true - } - - if !eq(new.LoadBalancers, old.LoadBalancers) { - return true - } - - return false -} - func primaryDeployment(service *ecs.Service) *ecs.Deployment { for _, d := range service.Deployments { if d.Status != nil && *d.Status == "PRIMARY" { diff --git a/server/cloudformation/ecs_test.go b/server/cloudformation/ecs_test.go index 6115e5929..900bbd998 100644 --- a/server/cloudformation/ecs_test.go +++ b/server/cloudformation/ecs_test.go @@ -50,9 +50,9 @@ func TestRetryer(t *testing.T) { func TestECSServiceResource_Create(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("CreateService", &ecs.CreateServiceInput{ ClientToken: aws.String("dxRU5tYsnzt"), @@ -91,9 +91,9 @@ func TestECSServiceResource_Create(t *testing.T) { func TestECSServiceResource_Create_Canceled(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("CreateService", &ecs.CreateServiceInput{ ClientToken: aws.String("dxRU5tYsnzt"), @@ -128,16 +128,16 @@ func TestECSServiceResource_Create_Canceled(t *testing.T) { OldResourceProperties: &ECSServiceProperties{}, }) assert.Equal(t, context.Canceled, err) - assert.Equal(t, data, map[string]string{}) + assert.Equal(t, map[string]string{"DeploymentId": "New"}, data) e.AssertExpectations(t) } func TestECSServiceResource_Update(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("UpdateService", &ecs.UpdateServiceInput{ Service: aws.String("arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web"), @@ -183,9 +183,9 @@ func 
TestECSServiceResource_Update(t *testing.T) { func TestECSServiceResource_Update_RequiresReplacement(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("CreateService", &ecs.CreateServiceInput{ ClientToken: aws.String("dxRU5tYsnzt"), @@ -230,11 +230,66 @@ func TestECSServiceResource_Update_RequiresReplacement(t *testing.T) { e.AssertExpectations(t) } +func TestECSServiceResource_Update_Placement(t *testing.T) { + e := new(mockECS) + p := newECSServiceProvisioner(&ECSServiceResource{ + ecs: e, + }) + + e.On("CreateService", &ecs.CreateServiceInput{ + ClientToken: aws.String("dxRU5tYsnzt"), + ServiceName: aws.String("acme-inc-web-dxRU5tYsnzt"), + Cluster: aws.String("clusterA"), + DesiredCount: aws.Int64(2), + TaskDefinition: aws.String("arn:aws:ecs:us-east-1:012345678910:task-definition/acme-inc:2"), + PlacementConstraints: []*ecs.PlacementConstraint{ + {Type: aws.String("memberOf"), Expression: aws.String("attribute:ecs.instance-type =~ t2.*")}, + }, + }).Return(&ecs.CreateServiceOutput{ + Service: &ecs.Service{ + ServiceArn: aws.String("arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web-dxRU5tYsnzt"), + Deployments: []*ecs.Deployment{&ecs.Deployment{Id: aws.String("New"), Status: aws.String("PRIMARY")}}, + }, + }, nil) + + e.On("WaitUntilServicesStable", &ecs.DescribeServicesInput{ + Cluster: aws.String("clusterA"), + Services: []*string{aws.String("arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web-dxRU5tYsnzt")}, + }).Return(nil) + + id, data, err := p.Provision(ctx, customresources.Request{ + StackId: "arn:aws:cloudformation:us-east-1:012345678901:stack/acme-inc/bc66fd60-32be-11e6-902b-50d501eb4c17", + RequestId: "411f3f38-565f-4216-a711-aeafd5ba635e", + RequestType: customresources.Update, + PhysicalResourceId: "arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web-dxRU5tYsnzt", + ResourceProperties: &ECSServiceProperties{ + Cluster: aws.String("clusterA"), + 
ServiceName: aws.String("acme-inc-web"), + DesiredCount: customresources.Int(2), + TaskDefinition: aws.String("arn:aws:ecs:us-east-1:012345678910:task-definition/acme-inc:2"), + PlacementConstraints: []ECSPlacementConstraint{ + {Type: aws.String("memberOf"), Expression: aws.String("attribute:ecs.instance-type =~ t2.*")}, + }, + }, + OldResourceProperties: &ECSServiceProperties{ + Cluster: aws.String("clusterA"), + ServiceName: aws.String("acme-inc-web"), + DesiredCount: customresources.Int(1), + TaskDefinition: aws.String("arn:aws:ecs:us-east-1:012345678910:task-definition/acme-inc:1"), + }, + }) + assert.NoError(t, err) + assert.Equal(t, "arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web-dxRU5tYsnzt", id) + assert.Equal(t, data, map[string]string{"DeploymentId": "New"}) + + e.AssertExpectations(t) +} + func TestECSServiceResource_Delete(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("UpdateService", &ecs.UpdateServiceInput{ Service: aws.String("arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web"), @@ -280,9 +335,9 @@ func TestECSServiceResource_Delete(t *testing.T) { func TestECSServiceResource_Delete_NotActive(t *testing.T) { e := new(mockECS) - p := &ECSServiceResource{ + p := newECSServiceProvisioner(&ECSServiceResource{ ecs: e, - } + }) e.On("UpdateService", &ecs.UpdateServiceInput{ Service: aws.String("arn:aws:ecs:us-east-1:012345678901:service/acme-inc-web"), @@ -485,12 +540,23 @@ func TestECSEnvironment_Create(t *testing.T) { s.AssertExpectations(t) } -func TestECSEnvironment_Update(t *testing.T) { +func TestECSEnvironment_Update_RequiresReplacement(t *testing.T) { s := new(mockEnvironmentStore) p := newECSEnvironmentProvisioner(&ECSEnvironmentResource{ environmentStore: s, }) + s.On("store", []*ecs.KeyValuePair{ + { + Name: aws.String("FOO"), + Value: aws.String("bar"), + }, + { + Name: aws.String("BAR"), + Value: aws.String("foo"), + }, + 
}).Return("56152438-5fef-4c96-bbe1-9cf92022ae75", nil) + id, data, err := p.Provision(ctx, customresources.Request{ RequestType: customresources.Update, PhysicalResourceId: "56152438-5fef-4c96-bbe1-9cf92022ae75", @@ -528,52 +594,53 @@ func TestECSEnvironment_Update(t *testing.T) { func TestServiceRequiresReplacement(t *testing.T) { tests := []struct { - new, old ECSServiceProperties + new, old properties out bool }{ { - ECSServiceProperties{Cluster: aws.String("cluster"), TaskDefinition: aws.String("td:2"), DesiredCount: customresources.Int(1)}, - ECSServiceProperties{Cluster: aws.String("cluster"), TaskDefinition: aws.String("td:1"), DesiredCount: customresources.Int(0)}, + &ECSServiceProperties{Cluster: aws.String("cluster"), TaskDefinition: aws.String("td:2"), DesiredCount: customresources.Int(1)}, + &ECSServiceProperties{Cluster: aws.String("cluster"), TaskDefinition: aws.String("td:1"), DesiredCount: customresources.Int(0)}, false, }, { - ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elb")}}}, - ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elb")}}}, + &ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elb")}}}, + &ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elb")}}}, false, }, // Can't change clusters. { - ECSServiceProperties{Cluster: aws.String("clusterB")}, - ECSServiceProperties{Cluster: aws.String("clusterA")}, + &ECSServiceProperties{Cluster: aws.String("clusterB")}, + &ECSServiceProperties{Cluster: aws.String("clusterA")}, true, }, // Can't change name. 
{ - ECSServiceProperties{ServiceName: aws.String("acme-inc-B")}, - ECSServiceProperties{ServiceName: aws.String("acme-inc-A")}, + &ECSServiceProperties{ServiceName: aws.String("acme-inc-B")}, + &ECSServiceProperties{ServiceName: aws.String("acme-inc-A")}, true, }, // Can't change role. { - ECSServiceProperties{Role: aws.String("roleB")}, - ECSServiceProperties{Role: aws.String("roleA")}, + &ECSServiceProperties{Role: aws.String("roleB")}, + &ECSServiceProperties{Role: aws.String("roleA")}, true, }, // Can't change load balancers { - ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elbB")}}}, - ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elbA")}}}, + &ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elbB")}}}, + &ECSServiceProperties{LoadBalancers: []LoadBalancer{{ContainerName: aws.String("web"), ContainerPort: customresources.Int(8080), LoadBalancerName: aws.String("elbA")}}}, true, }, } for _, tt := range tests { - out := serviceRequiresReplacement(&tt.new, &tt.old) + out, err := requiresReplacement(tt.new, tt.old) + assert.NoError(t, err) assert.Equal(t, tt.out, out) } } diff --git a/server/cloudformation/ports.go b/server/cloudformation/ports.go index d189e3d7c..dfad1d952 100644 --- a/server/cloudformation/ports.go +++ b/server/cloudformation/ports.go @@ -14,36 +14,34 @@ type portAllocator interface { Put(port int64) error } -// InstancePortsProvisioner is a Provisioner that allocates instance ports. -type InstancePortsProvisioner struct { +// InstancePortsResource is a Provisioner that allocates instance ports. 
+type InstancePortsResource struct { ports portAllocator } -func (p *InstancePortsProvisioner) Properties() interface{} { +func newInstancePortsProvisioner(resource *InstancePortsResource) *provisioner { + return &provisioner{ + Create: resource.Create, + Delete: resource.Delete, + } +} + +func (p *InstancePortsResource) Properties() interface{} { return nil } -func (p *InstancePortsProvisioner) Provision(_ context.Context, req customresources.Request) (id string, data interface{}, err error) { - switch req.RequestType { - case customresources.Create: - var port int64 - port, err = p.ports.Get() - if err != nil { - return - } - id = fmt.Sprintf("%d", port) - data = map[string]int64{"InstancePort": port} - case customresources.Delete: - port, err2 := strconv.Atoi(req.PhysicalResourceId) - if err2 != nil { - err = fmt.Errorf("physical resource id should have been a port number: %v", err2) - return - } - id = req.PhysicalResourceId - err = p.ports.Put(int64(port)) - default: - err = fmt.Errorf("%s is not supported", req.RequestType) - } +func (p *InstancePortsResource) Create(_ context.Context, req customresources.Request) (string, interface{}, error) { + var port int64 + port, err := p.ports.Get() + data := map[string]int64{"InstancePort": port} + id := fmt.Sprintf("%d", port) + return id, data, err +} - return +func (p *InstancePortsResource) Delete(_ context.Context, req customresources.Request) error { + port, err := strconv.Atoi(req.PhysicalResourceId) + if err != nil { + return fmt.Errorf("physical resource id should have been a port number: %v", err) + } + return p.ports.Put(int64(port)) } diff --git a/tests/empire/empire_test.go b/tests/empire/empire_test.go index 3fe045521..a0abfaa79 100644 --- a/tests/empire/empire_test.go +++ b/tests/empire/empire_test.go @@ -308,26 +308,28 @@ func TestEmpire_Run(t *testing.T) { "empire.app.id": app.ID, "empire.app.release": "v1", }, - }, - &twelvefactor.Process{ - Type: "run", - Image: img, - Command: []string{"bundle", 
"exec", "rake", "db:migrate"}, - Quantity: 1, - Memory: 536870912, - CPUShares: 256, - Nproc: 256, - Env: map[string]string{ - "EMPIRE_PROCESS": "run", - "EMPIRE_PROCESS_SCALE": "1", - "SOURCE": "acme-inc.run.v1", - "TERM": "xterm", - }, - Labels: map[string]string{ - "empire.app.process": "run", - "empire.user": "ejholmes", + Processes: []*twelvefactor.Process{ + &twelvefactor.Process{ + Type: "run", + Image: img, + Command: []string{"bundle", "exec", "rake", "db:migrate"}, + Quantity: 1, + Memory: 536870912, + CPUShares: 256, + Nproc: 256, + Env: map[string]string{ + "EMPIRE_PROCESS": "run", + "EMPIRE_PROCESS_SCALE": "1", + "SOURCE": "acme-inc.run.v1", + "TERM": "xterm", + }, + Labels: map[string]string{ + "empire.app.process": "run", + "empire.user": "ejholmes", + }, }, - }, nil, nil).Return(nil) + }, + }, nil, nil).Return(nil) err = e.Run(context.Background(), empire.RunOpts{ User: user, @@ -384,26 +386,28 @@ func TestEmpire_Run_WithConstraints(t *testing.T) { "empire.app.id": app.ID, "empire.app.release": "v1", }, - }, - &twelvefactor.Process{ - Type: "run", - Image: img, - Command: []string{"bundle", "exec", "rake", "db:migrate"}, - Quantity: 1, - Memory: 1073741824, - CPUShares: 512, - Nproc: 512, - Env: map[string]string{ - "EMPIRE_PROCESS": "run", - "EMPIRE_PROCESS_SCALE": "1", - "SOURCE": "acme-inc.run.v1", - "TERM": "xterm", - }, - Labels: map[string]string{ - "empire.app.process": "run", - "empire.user": "ejholmes", + Processes: []*twelvefactor.Process{ + &twelvefactor.Process{ + Type: "run", + Image: img, + Command: []string{"bundle", "exec", "rake", "db:migrate"}, + Quantity: 1, + Memory: 1073741824, + CPUShares: 512, + Nproc: 512, + Env: map[string]string{ + "EMPIRE_PROCESS": "run", + "EMPIRE_PROCESS_SCALE": "1", + "SOURCE": "acme-inc.run.v1", + "TERM": "xterm", + }, + Labels: map[string]string{ + "empire.app.process": "run", + "empire.user": "ejholmes", + }, }, - }, nil, nil).Return(nil) + }, + }, nil, nil).Return(nil) constraints := 
empire.NamedConstraints["2X"] err = e.Run(context.Background(), empire.RunOpts{ @@ -479,26 +483,28 @@ func TestEmpire_Run_WithAllowCommandProcfile(t *testing.T) { "empire.app.name": "acme-inc", "empire.app.release": "v1", }, - }, - &twelvefactor.Process{ - Type: "rake", - Image: img, - Command: []string{"bundle", "exec", "rake", "db:migrate"}, - Quantity: 1, - Memory: 536870912, - CPUShares: 256, - Nproc: 256, - Env: map[string]string{ - "EMPIRE_PROCESS": "rake", - "EMPIRE_PROCESS_SCALE": "1", - "SOURCE": "acme-inc.rake.v1", - "TERM": "xterm", - }, - Labels: map[string]string{ - "empire.app.process": "rake", - "empire.user": "ejholmes", + Processes: []*twelvefactor.Process{ + &twelvefactor.Process{ + Type: "rake", + Image: img, + Command: []string{"bundle", "exec", "rake", "db:migrate"}, + Quantity: 1, + Memory: 536870912, + CPUShares: 256, + Nproc: 256, + Env: map[string]string{ + "EMPIRE_PROCESS": "rake", + "EMPIRE_PROCESS_SCALE": "1", + "SOURCE": "acme-inc.rake.v1", + "TERM": "xterm", + }, + Labels: map[string]string{ + "empire.app.process": "rake", + "empire.user": "ejholmes", + }, }, - }, nil, nil).Return(nil) + }, + }, nil, nil).Return(nil) err = e.Run(context.Background(), empire.RunOpts{ User: user, @@ -683,8 +689,7 @@ func (m *mockScheduler) Submit(_ context.Context, app *twelvefactor.Manifest, ss return args.Error(0) } -func (m *mockScheduler) Run(_ context.Context, app *twelvefactor.Manifest, process *twelvefactor.Process, in io.Reader, out io.Writer) error { - app.Processes = nil // This is bogus and doesn't actually matter for Runs. 
- args := m.Called(app, process, in, out) +func (m *mockScheduler) Run(_ context.Context, app *twelvefactor.Manifest, in io.Reader, out io.Writer) error { + args := m.Called(app, in, out) return args.Error(0) } diff --git a/twelvefactor/twelvefactor.go b/twelvefactor/twelvefactor.go index 19a76d14d..dc7ae40f4 100644 --- a/twelvefactor/twelvefactor.go +++ b/twelvefactor/twelvefactor.go @@ -9,6 +9,7 @@ import ( "golang.org/x/net/context" "github.com/remind101/empire/pkg/image" + "github.com/remind101/empire/procfile" ) type Manifest struct { @@ -65,6 +66,9 @@ type Process struct { // Can be used to setup a CRON schedule to run this task periodically. Schedule Schedule + + // Any ECS specific configuration. + ECS *procfile.ECS } // Schedule represents a Schedule for scheduled tasks that run periodically. @@ -154,14 +158,10 @@ type Task struct { UpdatedAt time.Time } -type Runner interface { - // Run runs a process. - Run(ctx context.Context, app *Manifest, process *Process, in io.Reader, out io.Writer) error -} - // Scheduler is an interface for interfacing with Services. type Scheduler interface { - Runner + // Run runs a process. + Run(ctx context.Context, app *Manifest, in io.Reader, out io.Writer) error // Submit submits an app, creating it or updating it as necessary. // When StatusStream is nil, Submit should return as quickly as possible, @@ -181,7 +181,32 @@ type Scheduler interface { Stop(ctx context.Context, instanceID string) error // Restart restarts the processes within the App. - Restart(context.Context, *Manifest, StatusStream) error + Restart(context.Context, string, StatusStream) error +} + +// Transform wraps a Scheduler to perform transformations on the Manifest. This +// can be used to, for example, add default placement constraints before +// providing it to the backend scheduler.
+func Transform(s Scheduler, fn func(*Manifest) *Manifest) Scheduler { + return &transformer{s, fn} +} + +// transformer wraps a Scheduler to perform transformations on the Manifest. This +// can be used to, for example, add default placement constraints before +// providing it to the backend scheduler. +type transformer struct { + Scheduler + + // Transform will be called on Submit and Run. + Transform func(*Manifest) *Manifest +} + +func (t *transformer) Submit(ctx context.Context, app *Manifest, ss StatusStream) error { + return t.Scheduler.Submit(ctx, t.Transform(app), ss) +} + +func (t *transformer) Run(ctx context.Context, app *Manifest, in io.Reader, out io.Writer) error { + return t.Scheduler.Run(ctx, t.Transform(app), in, out) } // Env merges the App environment with any environment variables provided