Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the ECS RunTask API to run attached processes #1043

Merged
merged 1 commit into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## HEAD

**Features**

* Empire now supports a new (experimental) feature to enable attached processes to be ran with ECS. [#1043](https://github.com/remind101/empire/pull/1043)

## 0.12.0 (2017-03-10)

**Features**
Expand Down
28 changes: 26 additions & 2 deletions cmd/empire/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
cf "github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecr"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/inconshreveable/log15"
Expand Down Expand Up @@ -106,6 +107,12 @@ func newScheduler(db *empire.DB, c *Context) (scheduler.Scheduler, error) {
return nil, fmt.Errorf("failed to initialize %s scheduler: %v", c.String(FlagScheduler), err)
}

// If ECS tasks support being attached to with a TTY + stdin, let the
// CloudFormation backend run attached processes.
if c.Bool(FlagECSAttachedEnabled) {
return s, nil
}

d, err := newDockerClient(c)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,6 +165,23 @@ func newCloudFormationScheduler(db *empire.DB, c *Context) (*cloudformation.Sche
s.StackNameTemplate = prefixedStackName(c.String(FlagEnvironment))
s.Bucket = c.String(FlagS3TemplateBucket)
s.Tags = tags
s.NewDockerClient = func(ec2Instance *ec2.Instance) (cloudformation.DockerClient, error) {
certPath := c.String(FlagECSDockerCert)
host := ec2Instance.PrivateIpAddress
if host == nil {
return nil, fmt.Errorf("instance %s does not have a private ip address", aws.StringValue(ec2Instance.InstanceId))
}
port := "2376"
if certPath == "" {
port = "2375"
}
c, err := dockerutil.NewDockerClient(fmt.Sprintf("tcp://%s:%s", *host, port), certPath)
if err != nil {
return c, err
}
// Ping the host, just to make sure we can connect.
return c, c.Ping()
}

log.Println("Using CloudFormation backend with the following configuration:")
log.Println(fmt.Sprintf(" Cluster: %v", s.Cluster))
Expand Down Expand Up @@ -202,14 +226,14 @@ func prefixedStackName(prefix string) *template.Template {
// DockerClient ========================

func newDockerClient(c *Context) (*dockerutil.Client, error) {
socket := c.String(FlagDockerSocket)
host := c.String(FlagDockerHost)
certPath := c.String(FlagDockerCert)
authProvider, err := newAuthProvider(c)
if err != nil {
return nil, err
}

return dockerutil.NewClient(authProvider, socket, certPath)
return dockerutil.NewClient(authProvider, host, certPath)
}

// LogStreamer =========================
Expand Down
21 changes: 17 additions & 4 deletions cmd/empire/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ const (

FlagDB = "db"

FlagDockerSocket = "docker.socket"
FlagDockerCert = "docker.cert"
FlagDockerAuth = "docker.auth"
FlagDockerHost = "docker.socket"
FlagDockerCert = "docker.cert"
FlagDockerAuth = "docker.auth"

FlagAWSDebug = "aws.debug"
FlagS3TemplateBucket = "s3.templatebucket"
Expand All @@ -57,6 +57,8 @@ const (
FlagECSServiceRole = "ecs.service.role"
FlagECSLogDriver = "ecs.logdriver"
FlagECSLogOpts = "ecs.logopt"
FlagECSAttachedEnabled = "ecs.attached.enabled"
FlagECSDockerCert = "ecs.docker.cert"

FlagELBSGPrivate = "elb.sg.private"
FlagELBSGPublic = "elb.sg.public"
Expand Down Expand Up @@ -249,7 +251,7 @@ var DBFlags = []cli.Flag{

var EmpireFlags = []cli.Flag{
cli.StringFlag{
Name: FlagDockerSocket,
Name: FlagDockerHost,
Value: "unix:///var/run/docker.sock",
Usage: "The location of the docker api",
EnvVar: "DOCKER_HOST",
Expand Down Expand Up @@ -310,6 +312,17 @@ var EmpireFlags = []cli.Flag{
Usage: "Log driver to options. Maps to the --log-opt docker cli arg",
EnvVar: "EMPIRE_ECS_LOG_OPT",
},
cli.BoolFlag{
Name: FlagECSAttachedEnabled,
Usage: "When enabled, indicates that ECS tasks can be attached to, using `docker attach`. When provided, this will also use ECS to run attached processes. At the moment, this flag should only be set if you're running a patched ECS agent. See http://empire.readthedocs.io/en/latest/configuration/ for more information.",
EnvVar: "EMPIRE_ECS_ATTACHED_ENABLED",
},
cli.StringFlag{
Name: FlagECSDockerCert,
Value: "",
Usage: "A path to the certificates to use when connecting to Docker daemon's on container instances.",
EnvVar: "EMPIRE_ECS_DOCKER_CERT_PATH",
},
cli.StringFlag{
Name: FlagELBSGPrivate,
Value: "",
Expand Down
74 changes: 74 additions & 0 deletions contrib/amazon-ecs-agent/tty
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go
index 9a9f3a3..cd5171d 100644
--- a/agent/engine/docker_task_engine.go
+++ b/agent/engine/docker_task_engine.go
@@ -17,6 +17,7 @@ package engine
import (
"errors"
"fmt"
+ "strconv"
"sync"
"time"

@@ -548,6 +549,16 @@ func (engine *DockerTaskEngine) createContainer(task *api.Task, container *api.C
return DockerContainerMetadata{Error: api.NamedError(err)}
}

+ if v, ok := config.Labels["docker.config.Tty"]; ok {
+ config.Tty, _ = strconv.ParseBool(v)
+ delete(config.Labels, "docker.config.Tty")
+ }
+
+ if v, ok := config.Labels["docker.config.OpenStdin"]; ok {
+ config.OpenStdin, _ = strconv.ParseBool(v)
+ delete(config.Labels, "docker.config.OpenStdin")
+ }
+
// Augment labels with some metadata from the agent. Explicitly do this last
// such that it will always override duplicates in the provided raw config
// data.
diff --git a/agent/engine/docker_task_engine_test.go b/agent/engine/docker_task_engine_test.go
index cc6523c..3818f25 100644
--- a/agent/engine/docker_task_engine_test.go
+++ b/agent/engine/docker_task_engine_test.go
@@ -675,6 +675,40 @@ func TestCreateContainerMergesLabels(t *testing.T) {
taskEngine.(*DockerTaskEngine).createContainer(testTask, testTask.Containers[0])
}

+func TestCreateContainerAllowsExtraDockerConfigInLabels(t *testing.T) {
+ ctrl, client, _, taskEngine, _, _ := mocks(t, &defaultConfig)
+ defer ctrl.Finish()
+
+ testTask := &api.Task{
+ Arn: "arn:aws:ecs:us-east-1:012345678910:task/c09f0188-7f87-4b0f-bfc3-16296622b6fe",
+ Family: "myFamily",
+ Version: "1",
+ Containers: []*api.Container{
+ &api.Container{
+ Name: "c1",
+ DockerConfig: api.DockerConfig{
+ Config: aws.String(`{"Labels":{"docker.config.Tty":"true","docker.config.OpenStdin":"true"}}`),
+ },
+ },
+ },
+ }
+ expectedConfig, err := testTask.DockerConfig(testTask.Containers[0])
+ if err != nil {
+ t.Fatal(err)
+ }
+ expectedConfig.Tty = true
+ expectedConfig.OpenStdin = true
+ expectedConfig.Labels = map[string]string{
+ "com.amazonaws.ecs.task-arn": "arn:aws:ecs:us-east-1:012345678910:task/c09f0188-7f87-4b0f-bfc3-16296622b6fe",
+ "com.amazonaws.ecs.container-name": "c1",
+ "com.amazonaws.ecs.task-definition-family": "myFamily",
+ "com.amazonaws.ecs.task-definition-version": "1",
+ "com.amazonaws.ecs.cluster": "",
+ }
+ client.EXPECT().CreateContainer(expectedConfig, gomock.Any(), gomock.Any(), gomock.Any())
+ taskEngine.(*DockerTaskEngine).createContainer(testTask, testTask.Containers[0])
+}
+
// TestTaskTransitionWhenStopContainerTimesout tests that task transitions to stopped
// only when terminal events are recieved from docker event stream when
// StopContainer times out
7 changes: 7 additions & 0 deletions docs/cloudformation.json
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,13 @@
],
"Resource": { "Fn::Join": ["", ["arn:aws:route53:::hostedzone/", { "Ref": "InternalDomain" }]] }
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeInstances"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,12 @@ In this configuration, you would create a dedicated Docker host, exposing the Do
#### Use Docker Swarm

Theoretically, you could point Empire at multiple Docker daemons that are connected via Docker swarm.

#### Use ECS

By default, Empire will run attached processes entirely through the Docker daemon that you point Empire at. You can specify the `--ecs.attached.enabled` (`EMPIRE_ECS_ATTACHED_ENABLED`) to run attached processes via ECS. This method is not yet suitable for production, and there's some important caveats and tradeoff's to be aware of:

1. It currently requires a [patch](https://github.com/remind101/empire/tree/master/contrib/amazon-ecs-agent/tty) to the Amazon ECS agent, to allow Empire to pass additional flags down to Docker when creating the container.
2. Empire needs to be able to connect to the Docker daemon of container instances in the ECS cluster. If you do this, it's _highly_ encouraged that you only expose the Docker socket over TLS (https://docs.docker.com/engine/security/https/) and restrict your security groups to only allow Empire access to port 2376 on container instances.

The primary benefit of this approach is that, by using ECS, attached runs can be easily scaled out to a group of hosts, and it also allows attached processes to benefit from AWS Roles for ECS tasks.
29 changes: 6 additions & 23 deletions pkg/dockerutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dockerutil

import (
"fmt"
"os"

"golang.org/x/net/context"

Expand All @@ -14,22 +13,16 @@ import (
// Docker API.
var dockerAPI124, _ = docker.NewAPIVersion("1.24")

// NewDockerClient returns a new docker.Client using the given socket and certificate path.
func NewDockerClient(socket, certPath string) (*docker.Client, error) {
// NewDockerClient returns a new docker.Client using the given host and certificate path.
func NewDockerClient(host, certPath string) (*docker.Client, error) {
if certPath != "" {
cert := certPath + "/cert.pem"
key := certPath + "/key.pem"
ca := certPath + "/ca.pem"
return docker.NewTLSClient(socket, cert, key, ca)
return docker.NewTLSClient(host, cert, key, ca)
}

return docker.NewClient(socket)
}

// NewDockerClientFromEnv returns a new docker client configured by the DOCKER_*
// environment variables.
func NewDockerClientFromEnv() (*docker.Client, error) {
return NewDockerClient(os.Getenv("DOCKER_HOST"), os.Getenv("DOCKER_CERT_PATH"))
return docker.NewClient(host)
}

// Client wraps a docker.Client to authenticate pulls.
Expand All @@ -44,18 +37,8 @@ type Client struct {
}

// NewClient returns a new Client instance.
func NewClient(authProvider dockerauth.AuthProvider, socket, certPath string) (*Client, error) {
c, err := NewDockerClient(socket, certPath)
if err != nil {
return nil, err
}
return newClient(authProvider, c)
}

// NewClientFromEnv returns a new Client instance configured by the DOCKER_*
// environment variables.
func NewClientFromEnv(authProvider dockerauth.AuthProvider) (*Client, error) {
c, err := NewDockerClientFromEnv()
func NewClient(authProvider dockerauth.AuthProvider, host, certPath string) (*Client, error) {
c, err := NewDockerClient(host, certPath)
if err != nil {
return nil, err
}
Expand Down
45 changes: 44 additions & 1 deletion scheduler/cloudformation/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudformation
import (
"time"

awswaiter "github.com/aws/aws-sdk-go/private/waiter"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/pmylund/go-cache"
"github.com/remind101/empire/pkg/arn"
Expand Down Expand Up @@ -30,7 +31,7 @@ type cachingECSClient struct {
}

// ecsWithCaching wraps an ecs.ECS client with caching.
func ecsWithCaching(ecs *ecs.ECS) *cachingECSClient {
func ecsWithCaching(ecs *ECS) *cachingECSClient {
return &cachingECSClient{
ecsClient: ecs,
taskDefinitions: cache.New(defaultExpiration, defaultPurge),
Expand Down Expand Up @@ -59,3 +60,45 @@ func (c *cachingECSClient) DescribeTaskDefinition(input *ecs.DescribeTaskDefinit

return resp, err
}

// ECS augments the ecs.ECS client with extra waiters.
type ECS struct {
*ecs.ECS
}

// WaitUntilTasksNotPending waits until all the given tasks are either RUNNING
// or STOPPED.
func (c *ECS) WaitUntilTasksNotPending(input *ecs.DescribeTasksInput) error {
waiterCfg := awswaiter.Config{
Operation: "DescribeTasks",
Delay: 6,
MaxAttempts: 100,
Acceptors: []awswaiter.WaitAcceptor{
{
State: "failure",
Matcher: "pathAny",
Argument: "failures[].reason",
Expected: "MISSING",
},
{
State: "success",
Matcher: "pathAll",
Argument: "tasks[].lastStatus",
Expected: "RUNNING",
},
{
State: "success",
Matcher: "pathAll",
Argument: "tasks[].lastStatus",
Expected: "STOPPED",
},
},
}

w := awswaiter.Waiter{
Client: c.ECS,
Input: input,
Config: waiterCfg,
}
return w.Wait()
}
Loading