Skip to content

Commit

Permalink
Use EtcdIndex as reference for watch
Browse files Browse the repository at this point in the history
  • Loading branch information
jojimt committed Feb 1, 2016
1 parent 2afee8d commit d36b6be
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions etcdService.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,31 @@ func (self *etcdPlugin) RegisterService(serviceInfo ServiceInfo) error {
return nil
}

// List all end points for a service
// GetService lists all end points for a service
func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
keyName := "/contiv.io/service/" + name + "/"

_, srvcList, err := self.getServiceState(keyName)
return srvcList, err
}

func (self *etcdPlugin) getServiceState(key string) (uint64, []ServiceInfo, error) {

// Get the object from etcd client
resp, err := self.client.Get(keyName, true, true)
resp, err := self.client.Get(key, true, true)
if err != nil {
if strings.Contains(err.Error(), "Key not found") {
return nil, nil
return 0, nil, nil
} else {
log.Errorf("Error getting key %s. Err: %v", keyName, err)
return nil, err
log.Errorf("Error getting key %s. Err: %v", key, err)
return 0, nil, err
}

}

if !resp.Node.Dir {
log.Errorf("Err. Response is not a directory: %+v", resp.Node)
return nil, errors.New("Invalid Response from etcd")
return 0, nil, errors.New("Invalid Response from etcd")
}

srvcList := make([]ServiceInfo, 0)
Expand All @@ -91,23 +97,35 @@ func (self *etcdPlugin) GetService(name string) ([]ServiceInfo, error) {
err = json.Unmarshal([]byte(node.Value), &respSrvc)
if err != nil {
log.Errorf("Error parsing object %s, Err %v", node.Value, err)
return nil, err
return 0, nil, err
}

srvcList = append(srvcList, respSrvc)
}

return srvcList, nil
watchIndex := resp.EtcdIndex + 1
return watchIndex, srvcList, nil
}

func (self *etcdPlugin) getCurrentIndex(key string) (uint64, error) {
// Get the object from etcd client
resp, err := self.client.Get(key, true, false)
// initServiceState reads the current state and injects it to the channel
// additionally, it returns the next index to watch
func (self *etcdPlugin) initServiceState(key string, eventCh chan WatchServiceEvent) (uint64, error) {
mIndex, srvcList, err := self.getServiceState(key)
if err != nil {
return 0, err
return mIndex, err
}

return resp.Node.ModifiedIndex, nil
// walk each service and inject it as an add event
for _, srvInfo := range srvcList {
log.Infof("Sending service add event: %+v", srvInfo)
// Send Add event
eventCh <- WatchServiceEvent{
EventType: WatchServiceEventAdd,
ServiceInfo: srvInfo,
}
}

return mIndex, nil
}

// Watch for a service
Expand All @@ -121,9 +139,9 @@ func (self *etcdPlugin) WatchService(name string,

// Start the watch thread
go func() {
// Watch from current index to force a read of the initial state
watchIndex, err := self.getCurrentIndex(keyName)
if (err != nil) {
// Get current state and etcd index to watch
watchIndex, err := self.initServiceState(keyName, eventCh)
if err != nil {
log.Fatalf("Unable to watch service key: %s - %v", keyName,
err)
}
Expand Down

0 comments on commit d36b6be

Please sign in to comment.