Initial implementation of StatusStream #917
@@ -78,5 +80,22 @@ func (s *deployerService) Deploy(ctx context.Context, db *gorm.DB, opts DeployOp
		return r, err
	}

	if s, ok := opts.Updates.(status.SubscribableStream); ok {
		for update := range s.Subscribe() {
			msg := fmt.Sprintf("Status: %s", update.String())
I think this will already get prefixed with Status: when the jsonmessage is written.
Scratch that, maybe not.
This is looking great so far.
i'm fine moving within
I think everything in
ok cool
	Done(error)
}

type SubscribableStream interface {
Just food for thought. Another possibility here would be to forgo channels entirely and make the interface:
// StatusStream is an interface for publishing status updates while executing
// an Empire action.
type StatusStream interface {
	// Publish publishes an update to the status stream.
	Publish(Status) error
	// Done finalizes the status stream.
	Done(error)
	// Wait returns a channel that receives once Done() is called.
	// Consumers should call the Err() method to determine if an error
	// occurred.
	Wait() <-chan struct{}
	// Err returns the error from calling Done().
	Err() error
}
Then we just pass in an implementation that writes to the logstream:
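type jsonmessageStatusStream struct {
	sync.Mutex
	done chan struct{}
	err  error
	w    io.Writer
}

// Publish writes the status to the underlying writer as a jsonmessage.
func (s *jsonmessageStatusStream) Publish(message scheduler.Status) error {
	select {
	case <-s.done:
		panic("Publish called on finalized stream")
	default:
	}
	return json.NewEncoder(s.w).Encode(jsonmessage.JSONMessage{Status: message.Message})
}

// Done records the error, then closes the done channel to release waiters.
func (s *jsonmessageStatusStream) Done(err error) {
	s.Lock()
	s.err = err
	s.Unlock()
	close(s.done)
}

// Err returns the error passed to Done.
func (s *jsonmessageStatusStream) Err() error {
	s.Lock()
	defer s.Unlock()
	return s.err
}

// Wait returns a channel that is closed once Done is called.
func (s *jsonmessageStatusStream) Wait() <-chan struct{} {
	return s.done
}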
The main advantage would be that we don't need to worry about buffering the channel: calls to Publish will immediately write to the stream, and it's easy to wrap with middleware (e.g. writing to the app's Kinesis stream, etc).
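To illustrate the middleware point, here is a minimal sketch of a wrapper that fans each update out to several streams. The names `multiStream` and `memoryStream` are hypothetical, `Status` is assumed to carry a `Message` field, and the interface is reduced to `Publish` to keep the example short. It is not the implementation in this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a stand-in for the status type discussed above (assumed shape).
type Status struct {
	Message string
}

// StatusStream is reduced to the Publish method for this sketch.
type StatusStream interface {
	Publish(Status) error
}

// memoryStream is a hypothetical stream that records messages in memory.
type memoryStream struct {
	messages []string
}

func (m *memoryStream) Publish(s Status) error {
	m.messages = append(m.messages, s.Message)
	return nil
}

// multiStream is a hypothetical middleware that forwards every update to
// each wrapped stream, in order.
type multiStream []StatusStream

// Publish forwards the status to every stream and joins any errors, so one
// failing destination does not stop the others from receiving the update.
func (m multiStream) Publish(s Status) error {
	var errs []error
	for _, stream := range m {
		if err := stream.Publish(s); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

func main() {
	client, audit := &memoryStream{}, &memoryStream{}
	stream := multiStream{client, audit}
	if err := stream.Publish(Status{Message: "Creating release"}); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println(client.messages, audit.messages)
}
```

Because the wrapper satisfies the same interface, callers such as Deploy would not need to know how many destinations sit behind the stream.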
i like that.
another thing i'm coming across is having to do: https://github.com/remind101/empire/pull/917/files#diff-48d76aef8c283046a79db32824933290R523.
not the end of the world, but definitely annoying.
i was thinking of adding an active flag within the stream. if it never gets activated, then the subscriber never blocks. there could either be a specific activate method, or the stream would get activated as soon as you publish. but that defeats the purpose of being able to write to the stream at any point within a goroutine and having the client subscribe.
also, i'm not a fan of throwing: panic("Publish called on finalized stream"). given these are just status updates, it feels aggressive to throw a panic. what would be the proper way to warn instead? i couldn't find any examples in the code.
also, i'm not a fan of throwing: panic("Publish called on finalized stream"). given these are just status updates, it feels aggressive to throw a panic. what would be the proper way to warn instead? i couldn't find any examples in the code.
Agreed. If we change the interface to Publish(context.Context, Status), then you can do a logger.Warn(ctx, "message").
ah ok cool
This allows us to subscribe to the StatusStream without blocking concurrent deploys.
ok, i implemented the the only part i'm not happy with is having to do: https://github.com/remind101/empire/pull/917/files#diff-48d76aef8c283046a79db32824933290R523 |
	var msg jsonmessage.JSONMessage

	r, err := s.deploy(ctx, db, opts)
	tx := s.db.Begin()
I think we'd also wanna rollback here. Might be easier to wrap the transaction logic in a separate method:
func (s *deployerService) deployInTransaction(ctx context.Context, stream scheduler.StatusStream, opts De
	tx := s.db.Begin()
	r, err := s.deploy(ctx, tx, stream, opts)
	if err != nil {
		tx.Rollback()
		return r, err
	}
	if err := tx.Commit().Error; err != nil {
		return r, err
	}
	return r, nil
}
i was rolling back here: https://github.com/remind101/empire/pull/917/files#diff-ff47559c0383dd1959cdbe50cd2c036dR76 which is roughly equivalent, but +1 on the separation, i was thinking the same
I just hacked together a couple changes in the CloudFormation backend to use this. So awesome seeing more context in the deployment stream: https://asciinema.org/a/b6vnccpg2u2uykf9wyyb1ijej This looks gtm 👍. We can make changes to |

// StatusStream is an interface for publishing status updates while a scheduler
// is executing.
type StatusStream interface {
I was thinking some more about this last night and I think there's one more small change we might wanna do that will make this much simpler. We can give semantic meaning to when the StatusStream is nil, and change the signature of scheduler.Submit to:
// When StatusStream is nil, Submit should return as quickly as possible,
// usually when the new version has been received, and validated. If
// StatusStream is not nil, it's recommended that the method not return until
// the deployment has fully completed.
scheduler.Submit(context.Context, *App, StatusStream) error
This would mean that we can remove the Done, Wait, and Err methods on the StatusStream interface so it's just Publish. For the CloudFormation backend, we'd just change this line to be if ss != nil.
The primary advantage will be in cases like the migration scheduler, where we'd call Submit on two different schedulers that might both call Done.
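As a sketch of the nil semantics described above: a nil-safe Publish helper plus a Submit-like function that returns early when no stream is given and waits for completion otherwise. The `submit` function, its `waitForStable` callback, and `recordingStream` are hypothetical stand-ins, not the scheduler's actual code.

```go
package main

import "fmt"

// Status is a stand-in for the status type (assumed shape).
type Status struct {
	Message string
}

// StatusStream only needs Publish once nil carries the "don't wait" meaning.
type StatusStream interface {
	Publish(Status) error
}

// Publish is a nil-safe helper: publishing to a nil stream is a no-op.
func Publish(ss StatusStream, msg string) error {
	if ss == nil {
		return nil
	}
	return ss.Publish(Status{Message: msg})
}

// recordingStream is a hypothetical stream that keeps updates in memory.
type recordingStream struct {
	lines []string
}

func (r *recordingStream) Publish(s Status) error {
	r.lines = append(r.lines, s.Message)
	return nil
}

// submit is a hypothetical scheduler entry point. With a nil stream it
// returns as soon as the new version is accepted; with a stream it blocks
// until waitForStable reports that the deployment has finished.
func submit(ss StatusStream, waitForStable func() error) error {
	if err := Publish(ss, "Submitted new version"); err != nil {
		return err
	}
	if ss == nil {
		return nil
	}
	if err := waitForStable(); err != nil {
		return err
	}
	return Publish(ss, "Deployment complete")
}

func main() {
	waited := false
	wait := func() error { waited = true; return nil }

	_ = submit(nil, wait)
	fmt.Println("waited without stream:", waited)

	ss := &recordingStream{}
	_ = submit(ss, wait)
	fmt.Println("waited with stream:", waited, ss.lines)
}
```

One caveat with this design in Go: a nil pointer of a concrete type stored in the interface is not equal to nil, so callers should pass a literal nil when they do not want updates.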
i like this. i think it helps differentiate EventStream as async empire notifications and StatusStream as synchronous notifications during the execution of an action.
Awesome 👍 (sorry for the churn :)).
This will enable the status stream for the command.
@ejholmes ok i think this is good to go. i added an |
Deploy now looks like:
When a deploy has been superseded:
While waiting for another deploy to finish:
Create looks like:
I wanted to get the actual cloudformation url, but i didn't see that we had easy access to the stack arn.
@@ -208,6 +208,10 @@ func (s *Scheduler) submit(ctx context.Context, tx *sql.Tx, app *scheduler.App,
		return err
	}

	if err := scheduler.Publish(ss, fmt.Sprintf("Created cloudformation template: %v", *t.URL)); err != nil {
Part of me feels like we should just ignore errors from Publish. If, for example, the emp client disconnected from the network, this would result in err == io.EOF and would cause the deployment to fail.
yea i was thinking the same thing initially but also didn't want to just swallow errors. maybe we just log these?
sounds good
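One possible shape for the "log, don't fail" approach agreed on here is a small helper that reports Publish errors as warnings and lets the deploy carry on. `publishOrWarn`, `disconnectedStream`, and `deploy` are hypothetical names, and the standard `log` package stands in for the project's logger.

```go
package main

import (
	"fmt"
	"io"
	"log"
)

// StatusStream is reduced to a string-based Publish for this sketch.
type StatusStream interface {
	Publish(msg string) error
}

// disconnectedStream simulates a client that has dropped off the network.
type disconnectedStream struct{}

func (disconnectedStream) Publish(string) error { return io.EOF }

// publishOrWarn publishes the update and logs a warning on failure instead
// of returning the error to the caller.
func publishOrWarn(ss StatusStream, msg string) {
	if ss == nil {
		return
	}
	if err := ss.Publish(msg); err != nil {
		log.Printf("warn: status update dropped: %v", err)
	}
}

// deploy is a hypothetical action that keeps going when updates cannot be
// delivered.
func deploy(ss StatusStream) error {
	publishOrWarn(ss, "Created cloudformation template")
	// ... the rest of the deployment would run here ...
	publishOrWarn(ss, "Deployment complete")
	return nil
}

func main() {
	err := deploy(disconnectedStream{})
	fmt.Println("deploy error:", err)
}
```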
We should also add empire/server/github/deployer.go Lines 59 to 63 in 0f2cc1a
	// Schedule the new release onto the cluster.
	return r, s.Release(ctx, r)
	return r, err
Minor, you can just return releasesCreate(db, r).
Couple of minor things, which we can follow up with later. This is awesome 👍.
Oh, can you update the changelog too?
We'll log these as warnings but we don't want it to disrupt a deploy
Split out setting up events from: #916
Calling these "Scheduler Events" was too confusing given that we also have "Empire Events" that publish to various sources. I went with calling this a "StatusStream" that can be created and passed to an action (like Deploy) and can be published and read from throughout the lifecycle of the action.
I ended up creating a separate package, status, because otherwise we would have had to import the scheduler within deployments.go, which felt off to me.