Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid polling in worker #4094

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 27 additions & 44 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (m *MessageHandlerRegistry) Add(mh interface {
}
}

func (m *MessageHandlerRegistry) Remove(name string) {
func (m *MessageHandlerRegistry) remove(name string) {
if _, ok := m.Handlers[name]; ok {
delete(m.Handlers, name)
}
Expand All @@ -542,7 +542,7 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str
switch incoming.(type) {
case *events.WorkerStopMessage:
msg, _ := incoming.(*events.WorkerStopMessage)
workers.Remove(msg.Name())
workers.remove(msg.Name())
workerStatusManager.SetWorkerStatus(msg.Name(), STATUS_TERMINATED)
return successMsg, nil
}
Expand All @@ -557,65 +557,48 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str
return successMsg, nil
}

// This function combines all messages (events) from workers into a single global message queue. From this
// global queue, each message will get delivered to each worker by the event handler function.
func mux(workers *MessageHandlerRegistry, muxed chan events.Message) chan events.Message {

// Start a goroutine for each worker to forward worker messages into the given "multiplexed" channel.
// From this global queue, each message will get delivered to each worker by the event handler function.
func mux(workers *MessageHandlerRegistry, muxed chan events.Message) {
for _, w := range workers.Handlers {
select {
case ev := <-(*w).Messages():
muxed <- ev
default: // nothing
}
go func(c <-chan events.Message) {
for v := range c {
muxed <- v
}
}((*w).Messages())
}

return muxed
}

func (workers *MessageHandlerRegistry) ProcessEventMessages() {

// 200 messages should be plenty. We will never get more than 1 message from every worker each time
// we write into this stream.
messageStream := make(chan events.Message, 200)
mux(workers, messageStream)

last := int64(0)

// Process messages on the combined worker message queue as they arrive
for {
// Exit the event processing loop if all workers have deregistered.
if workers.IsEmpty() {
glog.V(3).Infof(mdLogString(fmt.Sprintf("Terminating")))
break
}

// Grab messages that are outbound from the workers.
messageStream = mux(workers, messageStream)

// Process any new messages on the combined worker message queue.
done := false
for !done {
select {
case msg := <-messageStream:
glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString())))
glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg)))

// Push outbound messages into each worker.
if successMsg, err := eventHandler(msg, workers); err != nil {
// error! do some barfing and then continue
glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err)))
} else {
glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg)))
}
default:
now := time.Now().Unix()
if now-last > 30 {
glog.V(5).Infof(mdLogString(fmt.Sprintf("No incoming messages for router to handle")))
last = now
}
done = true
}
msg, ok := <-messageStream
if !ok {
// channel closed: currently won't happen
glog.V(3).Infof(mdLogString("Muxed channel closed: Terminating"))
break
}

time.Sleep(200 * time.Millisecond)
glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString())))
glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg)))

// Push outbound messages into each worker.
if successMsg, err := eventHandler(msg, workers); err != nil {
// error! do some barfing and then continue
glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err)))
} else {
glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg)))
}
}

// Brief delay just in case.
Expand Down
Loading