Skip to content

Commit

Permalink
Fix monitoring CRD issues (antrea-io#620)
Browse files Browse the repository at this point in the history
1. Support continuing to update the CRD when an error
occurs.

2. Fix Agent monitoring CRD creation issue by setting
agent CRD name.

3. Refactor parts of the monitoring CRD code.

Issue: antrea-io#619
  • Loading branch information
mengdie-song committed Apr 27, 2020
1 parent ddf6b10 commit 7ebacb8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 85 deletions.
1 change: 1 addition & 0 deletions pkg/agent/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (aq agentQuerier) getNetworkPolicyControllerInfo() v1beta1.NetworkPolicyCon
func (aq agentQuerier) GetAgentInfo(agentInfo *v1beta1.AntreaAgentInfo, partial bool) {
// LocalPodNum, FlowTable, NetworkPolicyControllerInfo, OVSVersion and AgentConditions can be changed, so reset these fields.
// Only these fields are updated when partial is true.
agentInfo.Name = aq.nodeName
agentInfo.LocalPodNum = int32(aq.interfaceStore.GetContainerInterfaceNum())
agentInfo.OVSInfo.FlowTable = aq.getOVSFlowTable()
agentInfo.NetworkPolicyControllerInfo = aq.getNetworkPolicyControllerInfo()
Expand Down
81 changes: 42 additions & 39 deletions pkg/monitor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package monitor
import (
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
Expand All @@ -29,55 +30,64 @@ import (
// agentMonitor keeps the agent monitoring CRD (AntreaAgentInfo) in sync with
// the information reported by the agent querier.
type agentMonitor struct {
// client is the clientset used to get, create and update AntreaAgentInfo objects.
client clientset.Interface
// querier supplies the node name (used as the CRD name) and fills in the agent information.
querier agentquerier.AgentQuerier
// agentCRD is the desired state of agent monitoring CRD which agentMonitor expects.
// syncAgentCRD resets it to nil whenever a get, create or update call fails.
agentCRD *v1beta1.AntreaAgentInfo
}

// NewAgentMonitor creates a new agent monitor.
// It stores the given client and querier; the monitor starts with a nil agentCRD.
// NOTE(review): the two return statements below look like the before/after
// lines of the diff this text was captured from. Both build an equivalent
// value (a nil agentCRD is the zero value), so the second one is unreachable
// as written — confirm only one exists in the real file.
func NewAgentMonitor(client clientset.Interface, querier agentquerier.AgentQuerier) *agentMonitor {
return &agentMonitor{client: client, querier: querier}
return &agentMonitor{client: client, querier: querier, agentCRD: nil}
}

// Run creates AntreaAgentInfo CRD first after controller is running.
// Then updates AntreaAgentInfo CRD every 60 seconds.
func (monitor *agentMonitor) Run(stopCh <-chan struct{}) {
klog.Info("Starting Antrea Agent Monitor")
agentCRD := monitor.getAgentCRD()
var err error = nil

// Initialize agent monitoring CRD.
if agentCRD == nil {
agentCRD, err = monitor.createAgentCRD()
if err != nil {
klog.Errorf("Failed to create agent monitoring CRD %+v: %v", agentCRD, err)
return
}
} else {
agentCRD, err = monitor.updateAgentCRD(agentCRD)
if err != nil {
klog.Errorf("Failed to update agent monitoring CRD %+v: %v", agentCRD, err)
// Sync agent monitoring CRD every minute until stopCh is closed.
wait.Until(monitor.syncAgentCRD, time.Minute, stopCh)
}

func (monitor *agentMonitor) syncAgentCRD() {
var err error = nil
if monitor.agentCRD != nil {
if monitor.agentCRD, err = monitor.updateAgentCRD(true); err == nil {
return
}
klog.Errorf("Failed to partially update agent monitoring CRD: %v", err)
monitor.agentCRD = nil
}

// Update agent monitoring CRD variables every 60 seconds until stopCh is closed.
wait.PollUntil(60*time.Second, func() (done bool, err error) {
agentCRD, err = monitor.partialUpdateAgentCRD(agentCRD)
monitor.agentCRD, err = monitor.getAgentCRD()

if errors.IsNotFound(err) {
monitor.agentCRD, err = monitor.createAgentCRD()
if err != nil {
klog.Errorf("Failed to partially update agent monitoring CRD %+v: %v", agentCRD, err)
klog.Errorf("Failed to create agent monitoring CRD: %v", err)
monitor.agentCRD = nil
}
return false, nil
}, stopCh)
return
}

if err != nil {
klog.Errorf("Failed to get agent monitoring CRD: %v", err)
monitor.agentCRD = nil
return
}

monitor.agentCRD, err = monitor.updateAgentCRD(false)
if err != nil {
klog.Errorf("Failed to entirely update agent monitoring CRD: %v", err)
monitor.agentCRD = nil
}
}

// getAgentCRD is used to check the existence of agent monitoring CRD.
// So when the pod restarts, it will update this monitoring CRD instead of creating a new one.
func (monitor *agentMonitor) getAgentCRD() *v1beta1.AntreaAgentInfo {
func (monitor *agentMonitor) getAgentCRD() (*v1beta1.AntreaAgentInfo, error) {
crdName := monitor.querier.GetNodeName()
agentCRD, err := monitor.client.ClusterinformationV1beta1().AntreaAgentInfos().Get(crdName, metav1.GetOptions{})
if err != nil {
klog.V(2).Infof("Agent monitoring CRD named %s doesn't exist, will create one", crdName)
return nil
}
return agentCRD
klog.V(2).Infof("Getting agent monitoring CRD %+v", crdName)
return monitor.client.ClusterinformationV1beta1().AntreaAgentInfos().Get(crdName, metav1.GetOptions{})
}

// createAgentCRD creates a new agent CRD.
Expand All @@ -88,16 +98,9 @@ func (monitor *agentMonitor) createAgentCRD() (*v1beta1.AntreaAgentInfo, error)
return monitor.client.ClusterinformationV1beta1().AntreaAgentInfos().Create(agentCRD)
}

// updateAgentCRD performs a full (non-partial) refresh of the given monitoring
// CRD through the querier and then submits it with the client's Update call.
// It returns whatever Update returns.
func (monitor *agentMonitor) updateAgentCRD(agentCRD *v1beta1.AntreaAgentInfo) (*v1beta1.AntreaAgentInfo, error) {
	// partial=false asks the querier to fill in every field.
	monitor.querier.GetAgentInfo(agentCRD, false)
	klog.V(2).Infof("Updating agent monitoring CRD %+v", agentCRD)
	agentInfos := monitor.client.ClusterinformationV1beta1().AntreaAgentInfos()
	return agentInfos.Update(agentCRD)
}

// partialUpdateAgentCRD only updates some variables.
func (monitor *agentMonitor) partialUpdateAgentCRD(agentCRD *v1beta1.AntreaAgentInfo) (*v1beta1.AntreaAgentInfo, error) {
monitor.querier.GetAgentInfo(agentCRD, true)
klog.V(2).Infof("Partially updating agent monitoring CRD %+v", agentCRD)
return monitor.client.ClusterinformationV1beta1().AntreaAgentInfos().Update(agentCRD)
// updateAgentCRD refreshes monitor.agentCRD through the querier and submits it
// with the client's Update call. The partial flag is forwarded to GetAgentInfo
// unchanged. It returns whatever Update returns.
func (monitor *agentMonitor) updateAgentCRD(partial bool) (*v1beta1.AntreaAgentInfo, error) {
	crd := monitor.agentCRD
	monitor.querier.GetAgentInfo(crd, partial)
	klog.V(2).Infof("Updating agent monitoring CRD %+v, partial: %t", crd, partial)
	agentInfos := monitor.client.ClusterinformationV1beta1().AntreaAgentInfos()
	return agentInfos.Update(crd)
}
93 changes: 47 additions & 46 deletions pkg/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,48 +30,33 @@ import (
controllerquerier "github.com/vmware-tanzu/antrea/pkg/controller/querier"
)

const crdName = "antrea-controller"

// controllerMonitor keeps the controller monitoring CRD (AntreaControllerInfo)
// in sync with the information reported by the controller querier.
type controllerMonitor struct {
// client is the clientset used to get, create and update AntreaControllerInfo objects.
client clientset.Interface
// nodeInformer is the node informer on which NewControllerMonitor registers its delete handler.
nodeInformer coreinformers.NodeInformer
// nodeListerSynced is a function which returns true if the node shared informer has been synced at least once.
nodeListerSynced cache.InformerSynced
// querier fills in the controller information stored in the CRD.
querier controllerquerier.ControllerQuerier
// controllerCRD is the desired state of controller monitoring CRD which controllerMonitor expects.
// syncControllerCRD resets it to nil whenever a get, create or update call fails.
controllerCRD *v1beta1.AntreaControllerInfo
}

// NewControllerMonitor creates a new controller monitor.
// It stores the client, node informer and querier, takes the informer's
// HasSynced function as nodeListerSynced, and registers deleteStaleAgentCRD as
// the node delete handler (no add or update handlers are installed).
// NOTE(review): the two "m :=" lines below look like the before/after lines of
// the diff this text was captured from; they differ only in the explicit
// "controllerCRD: nil" — confirm only one exists in the real file.
func NewControllerMonitor(client clientset.Interface, nodeInformer coreinformers.NodeInformer, querier controllerquerier.ControllerQuerier) *controllerMonitor {
m := &controllerMonitor{client: client, nodeInformer: nodeInformer, nodeListerSynced: nodeInformer.Informer().HasSynced, querier: querier}
m := &controllerMonitor{client: client, nodeInformer: nodeInformer, nodeListerSynced: nodeInformer.Informer().HasSynced, querier: querier, controllerCRD: nil}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: nil,
// Only node deletions are handled.
DeleteFunc: m.deleteStaleAgentCRD,
})

return m
}

// Run creates AntreaControllerInfo CRD first after controller is running.
// Then updates AntreaControllerInfo CRD every 60 seconds if there is any change.
func (monitor *controllerMonitor) Run(stopCh <-chan struct{}) {
klog.Info("Starting Antrea Controller Monitor")
crdName := "antrea-controller"

// Initialize controller monitoring CRD.
controllerCRD := monitor.getControllerCRD(crdName)
var err error = nil
if controllerCRD == nil {
controllerCRD, err = monitor.createControllerCRD(crdName)
if err != nil {
klog.Errorf("Failed to create controller monitoring CRD %+v: %v", controllerCRD, err)
return
}
} else {
controllerCRD, err = monitor.updateControllerCRD(controllerCRD)
if err != nil {
klog.Errorf("Failed to update controller monitoring CRD %+v: %v", controllerCRD, err)
return
}
}

klog.Info("Waiting for node synced for Controller Monitor")
if !cache.WaitForCacheSync(stopCh, monitor.nodeListerSynced) {
Expand All @@ -81,47 +66,63 @@ func (monitor *controllerMonitor) Run(stopCh <-chan struct{}) {
klog.Info("Caches are synced for Controller Monitor")
monitor.deleteStaleAgentCRDs()

// Update controller monitoring CRD variables every 60 seconds until stopCh is closed.
wait.PollUntil(60*time.Second, func() (done bool, err error) {
controllerCRD, err = monitor.partialUpdateControllerCRD(controllerCRD)
// Sync controller monitoring CRD every minute until stopCh is closed.
wait.Until(monitor.syncControllerCRD, time.Minute, stopCh)
}

func (monitor *controllerMonitor) syncControllerCRD() {
var err error = nil
if monitor.controllerCRD != nil {
if monitor.controllerCRD, err = monitor.updateControllerCRD(true); err == nil {
return
}
klog.Errorf("Failed to partially update controller monitoring CRD: %v", err)
monitor.controllerCRD = nil
}

monitor.controllerCRD, err = monitor.getControllerCRD(crdName)

if errors.IsNotFound(err) {
monitor.controllerCRD, err = monitor.createControllerCRD(crdName)
if err != nil {
klog.Errorf("Failed to partially update controller monitoring CRD %+v: %v", controllerCRD, err)
klog.Errorf("Failed to create controller monitoring CRD: %v", err)
monitor.controllerCRD = nil
}
return false, nil
}, stopCh)
return
}

if err != nil {
klog.Errorf("Failed to get controller monitoring CRD: %v", err)
monitor.controllerCRD = nil
return
}

monitor.controllerCRD, err = monitor.updateControllerCRD(false)
if err != nil {
klog.Errorf("Failed to entirely update controller monitoring CRD: %v", err)
monitor.controllerCRD = nil
}
}

// getControllerCRD is used to check the existence of controller monitoring CRD.
// So when the pod restarts, it will update this monitoring CRD instead of creating a new one.
func (monitor *controllerMonitor) getControllerCRD(crdName string) *v1beta1.AntreaControllerInfo {
controllerCRD, err := monitor.client.ClusterinformationV1beta1().AntreaControllerInfos().Get(crdName, metav1.GetOptions{})
if err != nil {
klog.V(2).Infof("Controller monitoring CRD named %s doesn't exist, will create one", crdName)
return nil
}
return controllerCRD
func (monitor *controllerMonitor) getControllerCRD(crdName string) (*v1beta1.AntreaControllerInfo, error) {
return monitor.client.ClusterinformationV1beta1().AntreaControllerInfos().Get(crdName, metav1.GetOptions{})
}

// createControllerCRD builds a new AntreaControllerInfo named crdName, fills it
// through the querier with partial=false, and creates it with the client's
// Create call. It returns whatever Create returns.
// NOTE(review): the name is assigned twice below (via .Name before the querier
// call and via .ObjectMeta.Name after it). Presumably both address the same
// embedded field and one of them is the removed line of the diff — confirm.
func (monitor *controllerMonitor) createControllerCRD(crdName string) (*v1beta1.AntreaControllerInfo, error) {
controllerCRD := new(v1beta1.AntreaControllerInfo)
controllerCRD.Name = crdName
monitor.querier.GetControllerInfo(controllerCRD, false)
controllerCRD.ObjectMeta.Name = crdName
klog.V(2).Infof("Creating controller monitoring CRD %+v", controllerCRD)
return monitor.client.ClusterinformationV1beta1().AntreaControllerInfos().Create(controllerCRD)
}

// updateControllerCRD performs a full (non-partial) refresh of the given
// monitoring CRD through the querier and then submits it with the client's
// Update call. It returns whatever Update returns.
func (monitor *controllerMonitor) updateControllerCRD(controllerCRD *v1beta1.AntreaControllerInfo) (*v1beta1.AntreaControllerInfo, error) {
	// partial=false asks the querier to fill in every field.
	monitor.querier.GetControllerInfo(controllerCRD, false)
	klog.V(2).Infof("Updating controller monitoring CRD %+v", controllerCRD)
	controllerInfos := monitor.client.ClusterinformationV1beta1().AntreaControllerInfos()
	return controllerInfos.Update(controllerCRD)
}

// partialUpdateControllerCRD only updates the variables.
func (monitor *controllerMonitor) partialUpdateControllerCRD(controllerCRD *v1beta1.AntreaControllerInfo) (*v1beta1.AntreaControllerInfo, error) {
monitor.querier.GetControllerInfo(controllerCRD, true)
klog.V(2).Infof("Partially updating controller monitoring CRD %+v", controllerCRD)
return monitor.client.ClusterinformationV1beta1().AntreaControllerInfos().Update(controllerCRD)
// updateControllerCRD refreshes monitor.controllerCRD through the querier and
// submits it with the client's Update call. The partial flag is forwarded to
// GetControllerInfo unchanged. It returns whatever Update returns.
func (monitor *controllerMonitor) updateControllerCRD(partial bool) (*v1beta1.AntreaControllerInfo, error) {
	crd := monitor.controllerCRD
	monitor.querier.GetControllerInfo(crd, partial)
	klog.V(2).Infof("Updating controller monitoring CRD %+v, partial: %t", crd, partial)
	controllerInfos := monitor.client.ClusterinformationV1beta1().AntreaControllerInfos()
	return controllerInfos.Update(crd)
}

func (monitor *controllerMonitor) deleteStaleAgentCRDs() {
Expand Down

0 comments on commit 7ebacb8

Please sign in to comment.