Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nfd-worker: Add a readiness probe #1838

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deployment/helm/node-feature-discovery/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,14 @@ worker:
livenessProbe:
grpc:
port: 8082
service: "liveness"
initialDelaySeconds: 10
# failureThreshold: 3
# periodSeconds: 10
readinessProbe:
grpc:
port: 8082
service: "readiness"
initialDelaySeconds: 5
failureThreshold: 10
# periodSeconds: 10
Expand Down Expand Up @@ -581,4 +583,4 @@ tls:
prometheus:
enable: false
scrapeInterval: 10s
labels: {}
labels: {}
6 changes: 3 additions & 3 deletions docs/deployment/helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ API's you need to install the prometheus operator in your cluster.
| `worker.annotations` | dict | {} | NFD worker pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
| `worker.daemonsetAnnotations` | dict | {} | NFD worker daemonset [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
| `worker.args` | array | [] | Additional [command line arguments](../reference/worker-commandline-reference.md) to pass to nfd-worker |
| `worker.revisionHistoryLimit` | integer | | Specify how many old ControllerRevisions for this DaemonSet you want to retain. [revisionHistoryLimit](https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/daemon-set-v1/ #DaemonSetSpec) |
| `worker.livenessProbe` | dict | {"grpc":{"port":8082},"initialDelaySeconds":10} | NFD worker pod [liveness probe](https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/#liveness-probe) |
| `worker.readinessProbe` | dict | {"grpc":{"port":8082},"initialDelaySeconds":5,"failureThreshold": 10} | NFD worker pod [readiness probe](https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/#readiness-probe)|
| `worker.revisionHistoryLimit` | integer | | Specify how many old ControllerRevisions for this DaemonSet you want to retain. [revisionHistoryLimit](https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/daemon-set-v1/#DaemonSetSpec)|
| `worker.livenessProbe` | dict | {"grpc":{"port":8082,"service":"liveness"},"initialDelaySeconds":10} | NFD worker pod [liveness probe](https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/#liveness-probe)|
| `worker.readinessProbe` | dict | {"grpc":{"port":8082,"service":"readiness"},"initialDelaySeconds":5,"failureThreshold": 10} | NFD worker pod [readiness probe](https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/#readiness-probe)|

### Topology updater parameters

Expand Down
44 changes: 42 additions & 2 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -121,6 +120,41 @@ type ConfigOverrideArgs struct {
LabelSources *utils.StringSliceVal
}

// CustomHealthServer implements grpc_health_v1.HealthServer. Its Check method
// answers the "liveness" and "readiness" services based on the flags below.
// NOTE(review): the flags are plain bools that are assigned by the worker and
// read by Check; no synchronization is visible here — confirm whether the
// reads and writes can happen on different goroutines.
type CustomHealthServer struct {
// NOTE(review): presumably embedded to supply default implementations of
// HealthServer methods that are not overridden — confirm.
grpc_health_v1.UnimplementedHealthServer
// featureDiscoveryStatus is consulted by Check for the "readiness" service;
// readiness is reported as NOT_SERVING while it is false.
featureDiscoveryStatus bool
// nodeFeatureObject is consulted by Check for the "readiness" service, but
// only when the NodeFeatureAPI feature gate is enabled.
nodeFeatureObject bool
}

// Check implements the unary health check of grpc_health_v1.HealthServer.
//
// The requested service name selects what is reported:
//   - "" or "liveness": always SERVING. The empty name denotes the overall
//     server health in the gRPC health checking protocol and is what a probe
//     sends when no service is configured, so it is treated as liveness.
//   - "readiness": SERVING only when feature discovery has succeeded and, if
//     the NodeFeatureAPI feature gate is enabled, the NodeFeature object has
//     been updated; NOT_SERVING otherwise.
//   - anything else: SERVICE_UNKNOWN.
//
// The returned error is always nil; the health state is carried solely in the
// response status.
func (s *CustomHealthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	// GetService is the generated nil-safe accessor, so a nil request is
	// handled like an empty service name instead of panicking.
	switch service := req.GetService(); service {
	case "", "liveness":
		return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil

	case "readiness":
		if !s.featureDiscoveryStatus {
			klog.InfoS("Feature discovery status is false", "featureDiscoveryStatus", s.featureDiscoveryStatus)
			return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
		}
		// The NodeFeature object only matters when the worker advertises
		// features through the NodeFeature API.
		if features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) && !s.nodeFeatureObject {
			klog.InfoS("NodeFeature object status is false", "nodeFeatureObject", s.nodeFeatureObject)
			return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
		}
		return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil

	default:
		klog.InfoS("Unknown service", "service", service)
		return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVICE_UNKNOWN}, nil
	}
}

// Watch is the streaming health check of grpc_health_v1.HealthServer.
// Streaming health updates are not provided by this server, so the call is
// delegated to the embedded UnimplementedHealthServer rather than returning
// nil: a nil return would end the stream successfully without ever sending a
// status, which a watching client cannot distinguish from a real answer.
func (s *CustomHealthServer) Watch(req *grpc_health_v1.HealthCheckRequest, srv grpc_health_v1.Health_WatchServer) error {
	return s.UnimplementedHealthServer.Watch(req, srv)
}

type nfdWorker struct {
args Args
certWatch *utils.FsWatcher
Expand All @@ -136,6 +170,7 @@ type nfdWorker struct {
featureSources []source.FeatureSource
labelSources []source.LabelSource
ownerReference []metav1.OwnerReference
customHealthServer *CustomHealthServer
}

// This ticker can represent infinite and normal intervals.
Expand Down Expand Up @@ -179,6 +214,7 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) {
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}),
customHealthServer: &CustomHealthServer{featureDiscoveryStatus: false, nodeFeatureObject: false},
}

for _, o := range opts {
Expand Down Expand Up @@ -248,7 +284,7 @@ func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
}

s := grpc.NewServer()
grpc_health_v1.RegisterHealthServer(s, health.NewServer())
grpc_health_v1.RegisterHealthServer(s, w.customHealthServer)
klog.InfoS("gRPC health server serving", "port", w.args.GrpcHealthPort)

go func() {
Expand All @@ -271,8 +307,10 @@ func (w *nfdWorker) runFeatureDiscovery() error {
currentSourceStart := time.Now()
if err := s.Discover(); err != nil {
klog.ErrorS(err, "feature discovery failed", "source", s.Name())
w.customHealthServer.featureDiscoveryStatus = false
}
klog.V(3).InfoS("feature discovery completed", "featureSource", s.Name(), "duration", time.Since(currentSourceStart))
w.customHealthServer.featureDiscoveryStatus = true
}

discoveryDuration := time.Since(discoveryStart)
Expand Down Expand Up @@ -743,8 +781,10 @@ func (w *nfdWorker) advertiseFeatures(labels Labels) error {
if features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) {
// Create/update NodeFeature CR object
if err := w.updateNodeFeatureObject(labels); err != nil {
w.customHealthServer.nodeFeatureObject = false
return fmt.Errorf("failed to advertise features (via CRD API): %w", err)
}
w.customHealthServer.nodeFeatureObject = true
} else {
// Create/update feature labels through gRPC connection to nfd-master
if err := w.advertiseFeatureLabels(labels); err != nil {
Expand Down