Skip to content

Commit

Permalink
Refactor customHealthServer and updaterPool
Browse files Browse the repository at this point in the history
  • Loading branch information
omerap12 committed Aug 3, 2024
1 parent e8d7217 commit bb435df
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 43 deletions.
47 changes: 9 additions & 38 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,21 @@ type customHealthServer struct {
func (s *customHealthServer) SetMetricServerStatus(status bool) {
s.metricServerStatus = status
}
func (s *customHealthServer) SetNodeUpdaterStatus(status bool) {
s.nodeUpdaterStatus = status
}

// Check method for customHealthServer
func (s *customHealthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
klog.InfoS("Check request received")

klog.InfoS("Checking nfd's metric server")
// nfd metrics server status
metricServerStatus := s.metricServerStatus
klog.InfoS("nfd master's metric server status", "status", metricServerStatus)
if !metricServerStatus {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
}

klog.InfoS("Checking nfd's node updater pool")
// ndf node updater pool status
nodeUpdaterStatus := s.nodeUpdaterStatus
klog.InfoS("nfd master's node updater pool status", "status", nodeUpdaterStatus)
if !nodeUpdaterStatus {
Expand All @@ -202,32 +204,6 @@ func (s *customHealthServer) Watch(req *grpc_health_v1.HealthCheckRequest, srv g
return nil
}

func (s *customHealthServer) CheckPods(namespace string, labelSelector string) (bool, error) {
pods, err := getPods(s.k8sclient, namespace, labelSelector)
if err != nil {
return false, err
}
// TODO: print if items is empty

status := false
for _, pod := range pods.Items {
klog.InfoS("Found pod name", "name", pod.Name)
if pod.Status.Phase != "Running" {
klog.InfoS("Pod is not in running state", "name", pod.Name)
} else {
klog.InfoS("Pod is in running state", "name", pod.Name)
status = true
// can stop here : return status, nil
// not stopping for logs.
}
}
return status, nil
}

// func (s *customHealthServer) CheckNfdApiController() (bool, error) {
// TODO
// }

// NewNfdMaster creates a new NfdMaster server instance.
func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
nfd := &nfdMaster{
Expand Down Expand Up @@ -293,8 +269,11 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
}
nfd.nfdClient = nfdClient
}

nfd.updaterPool = newUpdaterPool(nfd)

// customHealthServer
nfd.customHealthServer = &customHealthServer{k8sclient: nfd.k8sClient, metricServerStatus: false, nodeUpdaterStatus: true}

return nfd, nil
}
Expand Down Expand Up @@ -381,10 +360,6 @@ func (m *nfdMaster) Run() error {
}
}

// Create a custom health server
klog.InfoS("Creating custom health server for gRPC health server")
m.createCustomHealthServer()

// Register to metrics server
if m.args.MetricsPort > 0 {
server := utils.CreateMetricsServer(m.args.MetricsPort,
Expand Down Expand Up @@ -502,10 +477,6 @@ func (m *nfdMaster) startGrpcHealthServer(errChan chan<- error) error {
return nil
}

func (m *nfdMaster) createCustomHealthServer() {
m.customHealthServer = &customHealthServer{k8sclient: m.k8sClient, metricServerStatus: false, nodeUpdaterStatus: true}
}

func (m *nfdMaster) runGrpcServer(errChan chan<- error) {
// Create server listening for TCP connections
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", m.args.Port))
Expand Down
9 changes: 4 additions & 5 deletions pkg/nfd-master/updater-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,10 @@ func (u *updaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue wo
}

func (u *updaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
defer u.handlePanic()
defer u.handlePanic() // For nfd-master health check
var cli k8sclient.Interface
if u.nfdMaster.kubeconfig != nil {
// For normal execution, initialize a separate api client for each updater
// catch the panic if the kubeconfig is invalid
cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
} else {
// For tests, re-use the api client from nfd-master
Expand Down Expand Up @@ -121,7 +120,7 @@ func (u *updaterPool) processNodeFeatureGroupUpdateRequest(cli nfdclientset.Inte
}

func (u *updaterPool) runNodeFeatureGroupUpdater(ngfQueue workqueue.RateLimitingInterface) {
defer u.handlePanic()
defer u.handlePanic() // For nfd-master health check
cli := nfdclientset.NewForConfigOrDie(u.nfdMaster.kubeconfig)
for u.processNodeFeatureGroupUpdateRequest(cli, ngfQueue) {
}
Expand Down Expand Up @@ -198,7 +197,7 @@ func (u *updaterPool) addNodeFeatureGroup(nodeFeatureGroupName string) {

func (u *updaterPool) handlePanic() {
if err := recover(); err != nil {
klog.ErrorS(nil, "panic in node updater", "error", err)
u.nfdMaster.customHealthServer.nodeUpdaterStatus = false
klog.ErrorS(err.(error), "updater pool panic")
u.nfdMaster.customHealthServer.SetNodeUpdaterStatus(false)
}
}

0 comments on commit bb435df

Please sign in to comment.