Merge pull request #2073 from tnozicka/nc-limit
Ensure CRI calls are not retried in NodeConfig's sync loop and have a timeout
scylla-operator-bot[bot] committed Aug 14, 2024
2 parents 5cfb93e + 4cb1c8b commit 433476b
Showing 3 changed files with 46 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/nodetune/controller.go
@@ -162,7 +162,7 @@ func (ncdc *Controller) processNextItem(ctx context.Context) bool {
 	}
 	defer ncdc.queue.Done(key)

-	ctx, cancel := context.WithTimeout(ctx, maxSyncDuration)
+	ctx, cancel := context.WithTimeoutCause(ctx, maxSyncDuration, fmt.Errorf("exceeded max sync duration (%v)", maxSyncDuration))
 	defer cancel()
 	err := ncdc.sync(ctx)
 	// TODO: Do smarter filtering then just Reduce to handle cases like 2 conflict errors.
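A note on the API used above: context.WithTimeoutCause (added in Go 1.21) behaves like context.WithTimeout, but the supplied error is what context.Cause returns once the deadline fires, so timeout errors become self-describing in logs. A minimal standalone sketch, not part of this commit (the duration is made up for the demo):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	func main() {
		maxSyncDuration := 50 * time.Millisecond
		ctx, cancel := context.WithTimeoutCause(context.Background(), maxSyncDuration,
			fmt.Errorf("exceeded max sync duration (%v)", maxSyncDuration))
		defer cancel()

		<-ctx.Done()
		fmt.Println(ctx.Err())          // context deadline exceeded
		fmt.Println(context.Cause(ctx)) // exceeded max sync duration (50ms)
	}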
52 changes: 39 additions & 13 deletions pkg/controller/nodetune/tune.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"regexp"
 	"sort"
+	"time"

 	"github.com/scylladb/go-set/strset"
 	"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
@@ -20,6 +21,10 @@ import (
 	"k8s.io/kubelet/pkg/apis/podresources/v1"
 )

+const (
+	criCallTimeout = 5 * time.Second
+)
+
 func getIRQCPUs(ctx context.Context, kubeletPodResourcesClient kubelet.PodResourcesClient, scyllaPods []*corev1.Pod, hostFullCpuset cpuset.CPUSet) (cpuset.CPUSet, error) {
 	scyllaCPUs := cpuset.CPUSet{}
 	for _, scyllaPod := range scyllaPods {
@@ -44,28 +49,49 @@ func getIRQCPUs(ctx context.Context, kubeletPodResourcesClient kubelet.PodResourcesClient, scyllaPods []*corev1.Pod, hostFullCpuset cpuset.CPUSet) (cpuset.CPUSet, error) {
 	return hostFullCpuset.Difference(scyllaCPUs), nil
 }

+func scyllaDataDirMountHostPathsForPod(ctx context.Context, criClient cri.Client, scyllaPod *corev1.Pod) ([]string, error) {
+	dataDirs := strset.New()
+
+	cid, err := getScyllaContainerIDInCRIFormat(scyllaPod)
+	if err != nil {
+		return nil, fmt.Errorf("get Scylla container ID: %w", err)
+	}
+
+	klog.V(4).InfoS("Inspecting container", "ContainerID", cid, "Pod", naming.ObjRef(scyllaPod))
+	criCtx, criCtxCancel := context.WithTimeoutCause(ctx, criCallTimeout, fmt.Errorf("exceeded cri inspect container timeout (%v)", criCallTimeout))
+	defer criCtxCancel()
+	cs, err := criClient.Inspect(criCtx, cid)
+	klog.V(4).InfoS("Finished inspecting container", "ContainerID", cid)
+	if err != nil {
+		return nil, fmt.Errorf("can't inspect container %q: %w", cid, err)
+	}
+
+	if cs != nil {
+		for _, mount := range cs.Status.GetMounts() {
+			if mount.ContainerPath != naming.DataDir {
+				continue
+			}
+			dataDirs.Add(mount.HostPath)
+		}
+	}
+
+	return dataDirs.List(), nil
+}
+
 func scyllaDataDirMountHostPaths(ctx context.Context, criClient cri.Client, scyllaPods []*corev1.Pod) ([]string, error) {
 	dataDirs := strset.New()

 	for _, pod := range scyllaPods {
-		cid, err := getScyllaContainerIDInCRIFormat(pod)
-		if err != nil {
-			return nil, fmt.Errorf("get Scylla container ID: %w", err)
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
 		}

-		cs, err := criClient.Inspect(ctx, cid)
+		podDataDirs, err := scyllaDataDirMountHostPathsForPod(ctx, criClient, pod)
 		if err != nil {
-			return nil, fmt.Errorf("can't inspect container %q: %w", cid, err)
+			return nil, fmt.Errorf("can't get data dirs for pod %q: %w", naming.ObjRef(pod), err)
 		}

-		if cs != nil {
-			for _, mount := range cs.Status.GetMounts() {
-				if mount.ContainerPath != naming.DataDir {
-					continue
-				}
-				dataDirs.Add(mount.HostPath)
-			}
-		}
+		dataDirs.Add(podDataDirs...)
 	}

 	return dataDirs.List(), nil
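Two things are worth noting about the tune.go changes. First, criCtx is derived from the sync loop's context, so each CRI call is bounded by whichever deadline expires first: the 5s per-call budget or the overall sync deadline, and the losing deadline's cause propagates. A minimal sketch of that interplay (durations invented for the demo, not from this commit):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	func main() {
		// Parent: the sync loop's deadline (deliberately short for the demo).
		syncCtx, cancelSync := context.WithTimeoutCause(context.Background(), 30*time.Millisecond,
			fmt.Errorf("exceeded max sync duration"))
		defer cancelSync()

		// Child: the per-call CRI budget, derived from the sync context.
		criCtx, cancelCall := context.WithTimeoutCause(syncCtx, 5*time.Second,
			fmt.Errorf("exceeded cri inspect container timeout"))
		defer cancelCall()

		<-criCtx.Done()
		// The parent's earlier deadline wins, and its cause propagates to the child.
		fmt.Println(context.Cause(criCtx)) // exceeded max sync duration
	}

Second, extracting the per-pod logic into scyllaDataDirMountHostPathsForPod is what lets defer criCtxCancel() run once per pod: a defer written directly inside the loop body would only run when the whole function returned, keeping every per-pod timeout alive until the loop finished.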
7 changes: 6 additions & 1 deletion pkg/cri/client.go
@@ -80,8 +80,13 @@ func connectToEndpoint(ctx context.Context, endpoint string) *endpointState {
 		grpc.WithContextDialer(dialer),
 		grpc.WithConnectParams(grpc.ConnectParams{
 			MinConnectTimeout: dialTimeout,
-			Backoff:           backoffConfig,
+			// Backoff configures the exponential backoff when establishing a new connection with DialContext.
+			Backoff: backoffConfig,
 		}),
+		// WithDisableRetry disables retries (we have our own work queue with backoff) and retryThrottler.
+		// We should not retry calls within the same sync loop and this is essential to avoid an infinite loop
+		// between the sync loop context timing out and retryThrottler reaching a max delay higher than the timeout.
+		grpc.WithDisableRetry(),
 	)
 	return &endpointState{
 		endpoint: endpoint,
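For readers unfamiliar with the gRPC option: WithDisableRetry turns off the client's transparent retry machinery, leaving retry policy entirely to the caller, which here is the controller's work queue. A hedged sketch of a dial with this option (the socket path is illustrative, not taken from this repository):

	package main

	import (
		"context"
		"log"
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"
	)

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// With retries disabled, a failing RPC returns its error immediately
		// instead of being replayed by the client's retryThrottler; the caller's
		// own queue backoff decides whether to try again on the next sync.
		conn, err := grpc.DialContext(ctx, "unix:///run/containerd/containerd.sock",
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDisableRetry(),
		)
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()
	}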
