// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
package resources
import (
"context"
"errors"
"fmt"
"strconv"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
)
const (
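// PodAnnotationNodeIDKey is the pod annotation from which the operator reads the Redpanda node ID assigned to that pod.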
PodAnnotationNodeIDKey = "operator.redpanda.com/node-id"
decommissionWaitJitterFactor = 0.2
)
// handleScaling is responsible for managing the current number of replicas running for a cluster.
//
// Replicas are controlled via the `status.currentReplicas` field, which is set in this method and should be
// respected by all other external reconcile functions, including the one that sets the actual number of replicas in the StatefulSet.
// External functions should use `cluster.getCurrentReplicas()` to get the number of expected replicas for a cluster,
// since it deals with cases where the status is not initialized yet.
//
// When users change the value of `spec.replicas` for a cluster, this function is responsible for progressively changing the `status.currentReplicas` to match that value.
// In the case of a Cluster downscaled by 1 replica (i.e. `spec.replicas` lower than the current value of `status.currentReplicas` by `1`), before lowering the
// value of `status.currentReplicas`, this handler will first decommission the last node, using the admin API of the cluster.
//
// There are cases where the cluster will not decommission a node (e.g. user reduces `spec.replicas` to `2`, but there are topics with `3` partition replicas),
// in which case the draining phase will hang indefinitely in Redpanda. When this happens, the controller will not downscale the cluster and
// users will find in `status.decommissioningNode` that a node is constantly being decommissioned.
//
// In cases where the decommissioning process hangs, users can increase the value of `spec.replicas` again and the handler will contact the admin API
// to recommission the last node, ensuring that the number of replicas matches the expected value and that the node joins the cluster again.
//
// Users can change the value of `spec.replicas` freely (except it cannot be 0). In case of downscaling by multiple replicas, the handler will
// decommission one node at a time, until the desired number of replicas is met.
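//
// For example, if `status.currentReplicas` is `5` and the user sets `spec.replicas` to `3`, the handler first decommissions
// node `4` and downscales to `4` replicas, then decommissions node `3` and downscales to `3` replicas.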
//
// When a new cluster is created, the value of `status.currentReplicas` will be initially set to `1`, no matter what the user sets in `spec.replicas`.
// This handler will first ensure that the initial node forms a cluster, by retrieving the list of brokers using the admin API.
// After the cluster is formed, the handler will increase the `status.currentReplicas` as requested.
//
// This is due to the fact that Redpanda is currently unable to initialize a cluster if each node is given the full list of seed servers: https://github.com/redpanda-data/redpanda/issues/333.
// Previous versions of the operator used to hack the list of seed servers (in the configurator pod) of the node with ordinal 0, always setting it to an empty set,
// allowing it to create an initial cluster. That strategy worked because the list of seed servers is not read again from the `redpanda.yaml` file once the
// cluster is initialized.
// But the drawback was that, on an existing running cluster, when node 0 loses its data directory (e.g. because it's using local storage and it undergoes a k8s node upgrade),
// node 0 (having no data and an empty seed server list in `redpanda.yaml`) creates a brand new cluster, ignoring the other nodes (split-brain).
// The strategy implemented here (initializing the cluster at 1 replica, then upscaling to the desired number, without hacks on the seed server list)
// should fix this problem, since the list of seed servers will be the same on all nodes once the cluster is created.
//
//nolint:nestif // for clarity
func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
if r.pandaCluster.Status.DecommissioningNode != nil {
decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode
if *r.pandaCluster.Spec.Replicas > decommissionTargetReplicas {
// Decommissioning can also be canceled and we need to recommission
return r.handleRecommission(ctx)
}
return r.handleDecommission(ctx)
}
if r.pandaCluster.Status.CurrentReplicas == 0 {
// Initialize the currentReplicas field, so that it can be controlled later
r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField()
return r.Status().Update(ctx, r.pandaCluster)
}
if *r.pandaCluster.Spec.Replicas == r.pandaCluster.Status.CurrentReplicas {
// No changes to replicas, we do nothing here
return nil
}
if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas {
r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas)
// We care about upscaling only when the cluster is moving off 1 replica, which happens e.g. at cluster startup
if r.pandaCluster.Status.CurrentReplicas == 1 {
r.logger.Info("Waiting for first node to form a cluster before upscaling")
formed, err := r.isClusterFormed(ctx)
if err != nil {
return err
}
if !formed {
return &RequeueAfterError{
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas),
}
}
r.logger.Info("Initial cluster has been formed")
}
// Upscaling request: this is already handled by Redpanda, so we just increase status.currentReplicas
return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger)
}
// The user-requested replicas are lower than the current replicas (currentReplicas): start the decommissioning process
targetOrdinal := r.pandaCluster.Status.CurrentReplicas - 1 // Always decommission last node
r.logger.Info("Start decommission of last broker pod", "ordinal", targetOrdinal)
r.pandaCluster.Status.DecommissioningNode = &targetOrdinal
return r.Status().Update(ctx, r.pandaCluster)
}
// handleDecommission manages the case of decommissioning of the last node of a cluster.
//
// When this handler is called, the `status.decommissioningNode` is populated with the pod ordinal of the
// node that needs to be decommissioned.
//
// The handler verifies that the node_id for that pod is not present in the list of brokers registered in the cluster
// via the admin API, then downscales the StatefulSet by decreasing `status.currentReplicas`.
//
// Before completing the process, it double-checks that the node_id is still not registered, to handle cases where the node was
// about to start when the decommissioning process began. If the broker is found, the process is restarted.
func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {
targetReplicas := *r.pandaCluster.Status.DecommissioningNode
r.logger.Info("Handling cluster in decommissioning phase", "target replicas", targetReplicas)
adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
return err
}
broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI)
if err != nil {
return err
}
if broker != nil {
r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID, "status", broker.MembershipStatus)
if broker.MembershipStatus != admin.MembershipStatusDraining {
// We request decommissioning, since it does not appear to be done yet
err = adminAPI.DecommissionBroker(ctx, broker.NodeID)
if err != nil {
return fmt.Errorf("error while trying to decommission node %d in cluster %s: %w", broker.NodeID, r.pandaCluster.Name, err)
}
r.logger.Info("Node marked for decommissioning in cluster", "node_id", broker.NodeID)
}
// The draining phase must always be completed with all nodes running, to let single-replica partitions be transferred.
// The value may diverge in case we restarted the process after a complete scale down.
drainingReplicas := targetReplicas + 1
if r.pandaCluster.Status.CurrentReplicas != drainingReplicas {
return setCurrentReplicas(ctx, r, r.pandaCluster, drainingReplicas, r.logger)
}
// Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node)
return &RequeueAfterError{
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID),
}
}
// Broker is now missing from cluster API
r.logger.Info("Node is not registered in the cluster: initializing downscale", "node_id", *r.pandaCluster.Status.DecommissioningNode)
// We set status.currentReplicas accordingly to trigger scaling down of the statefulset
if err = setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil {
return err
}
scaledDown, err := r.verifyRunningCount(ctx, targetReplicas)
if err != nil {
return err
}
if !scaledDown {
return &RequeueAfterError{
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas),
}
}
// There's a chance that the node was initially not present in the broker list, but appeared after we started to scale down.
// Since the node may hold data that needs to be propagated to other nodes, we need to restart it to let the decommission process finish.
broker, err = getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI)
if err != nil {
return err
}
if broker != nil {
// Node reappeared in the cluster, we restart the process to handle it
return &NodeReappearingError{NodeID: broker.NodeID}
}
r.logger.Info("Decommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode)
r.pandaCluster.Status.DecommissioningNode = nil
return r.Status().Update(ctx, r.pandaCluster)
}
// handleRecommission manages the case of a node being recommissioned after a failed/wrong decommission.
//
// Recommission can only work for nodes that are still in the "draining" phase according to Redpanda.
//
// When this handler is triggered, `status.decommissioningNode` is populated with the node that was being decommissioned and
// `spec.replicas` reports a value that includes that node, indicating the user's intention to recommission it.
//
// The handler ensures that the node is running and also calls the admin API to recommission it.
// The process finishes when the node is registered among brokers and the StatefulSet is correctly scaled.
func (r *StatefulSetResource) handleRecommission(ctx context.Context) error {
r.logger.Info("Handling cluster in recommissioning phase")
targetOrdinal := *r.pandaCluster.Status.DecommissioningNode
// First we ensure we have enough replicas to let the recommissioning node run
targetReplicas := targetOrdinal + 1
err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger)
if err != nil {
return err
}
adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
return err
}
broker, err := getNodeInfoFromCluster(ctx, targetOrdinal, adminAPI)
if err != nil {
return err
}
brokerID, err := getPodNodeIDFromAnnotation(ctx, r.Client, r.pandaCluster.GetNamespace(), r.pandaCluster.GetName(), targetOrdinal)
if err != nil {
return err
}
if broker == nil || broker.MembershipStatus != admin.MembershipStatusActive {
err = adminAPI.RecommissionBroker(ctx, brokerID)
if err != nil {
return fmt.Errorf("error while trying to recommission node %d in cluster %s: %w", brokerID, r.pandaCluster.Name, err)
}
r.logger.Info("Node marked for being recommissioned in cluster", "node_id", brokerID)
return &RequeueAfterError{
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", brokerID, r.pandaCluster.Name),
}
}
r.logger.Info("Recommissioning process successfully completed", "node_id", brokerID)
r.pandaCluster.Status.DecommissioningNode = nil
return r.Status().Update(ctx, r.pandaCluster)
}
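// getAdminAPIClient returns an admin API client for the cluster, optionally targeting only the given pod ordinals.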
func (r *StatefulSetResource) getAdminAPIClient(
ctx context.Context, ordinals ...int32,
) (adminutils.AdminAPIClient, error) {
return r.adminAPIClientFactory(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...)
}
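// isClusterFormed checks whether the initial node has formed a cluster, by listing brokers through the admin API of node 0.
// Admin API errors are swallowed and reported as "cluster not formed yet".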
func (r *StatefulSetResource) isClusterFormed(
ctx context.Context,
) (bool, error) {
rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0)
if err != nil {
return false, err
}
brokers, err := rootNodeAdminAPI.Brokers(ctx)
if err != nil {
// Eat the error and return that the cluster is not formed
return false, nil
}
return len(brokers) > 0, nil
}
// disableMaintenanceModeOnDecommissionedNodes can be used to put a cluster in a consistent state, disabling maintenance mode on
// nodes that have been decommissioned.
//
// A decommissioned node may activate maintenance mode via shutdown hooks and the cluster may enter an inconsistent state,
// preventing other pods from shutting down cleanly.
//
// See: https://github.com/redpanda-data/redpanda/issues/4999
func (r *StatefulSetResource) disableMaintenanceModeOnDecommissionedNodes(
ctx context.Context,
) error {
if !featuregates.MaintenanceMode(r.pandaCluster.Status.Version) {
return nil
}
if r.pandaCluster.Status.DecommissioningNode == nil || r.pandaCluster.Status.CurrentReplicas > *r.pandaCluster.Status.DecommissioningNode {
// Only if actually in a decommissioning phase
return nil
}
ordinal := *r.pandaCluster.Status.DecommissioningNode
targetReplicas := ordinal
scaledDown, err := r.verifyRunningCount(ctx, targetReplicas)
if err != nil || !scaledDown {
// This should be done only when the pod disappears from the cluster
return err
}
adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
return err
}
r.logger.Info("Forcing deletion of maintenance mode for the decommissioned node", "node_id", ordinal)
err = adminAPI.DisableMaintenanceMode(ctx, int(ordinal))
if err != nil {
var httpErr *admin.HTTPResponseError
if errors.As(err, &httpErr) {
if httpErr.Response != nil && httpErr.Response.StatusCode/100 == 4 {
// Cluster says we don't need to do it
r.logger.Info("No need to disable maintenance mode on the decommissioned node", "node_id", ordinal, "status_code", httpErr.Response.StatusCode)
return nil
}
}
return fmt.Errorf("could not disable maintenance mode on decommissioning node %d: %w", ordinal, err)
}
r.logger.Info("Maintenance mode disabled for the decommissioned node", "node_id", ordinal)
return nil
}
// verifyRunningCount checks whether the StatefulSet is configured to run the given number of replicas and that the pods match expectations
func (r *StatefulSetResource) verifyRunningCount(
ctx context.Context, replicas int32,
) (bool, error) {
var sts appsv1.StatefulSet
if err := r.Get(ctx, r.Key(), &sts); err != nil {
return false, fmt.Errorf("could not get statefulset for checking replicas: %w", err)
}
if sts.Spec.Replicas == nil || *sts.Spec.Replicas != replicas || sts.Status.Replicas != replicas {
return false, nil
}
var podList corev1.PodList
err := r.List(ctx, &podList, &k8sclient.ListOptions{
Namespace: r.pandaCluster.Namespace,
LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(),
})
if err != nil {
return false, fmt.Errorf("could not list pods for checking replicas: %w", err)
}
return len(podList.Items) == int(replicas), nil
}
// getNodeInfoFromCluster retrieves broker information using the admin API
func getNodeInfoFromCluster(
ctx context.Context, ordinal int32, adminAPI adminutils.AdminAPIClient,
) (*admin.Broker, error) {
brokers, err := adminAPI.Brokers(ctx)
if err != nil {
return nil, fmt.Errorf("could not get the list of brokers for checking decommission: %w", err)
}
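// Brokers are looked up by node ID, which is expected to match the pod ordinal.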
for i := range brokers {
if brokers[i].NodeID == int(ordinal) {
return &brokers[i], nil
}
}
return nil, nil
}
// setCurrentReplicas sets status.currentReplicas in the CR, which in turn controls the number of replicas
// assigned to the StatefulSet
func setCurrentReplicas(
ctx context.Context,
c k8sclient.Client,
pandaCluster *redpandav1alpha1.Cluster,
replicas int32,
logger logr.Logger,
) error {
if pandaCluster.Status.CurrentReplicas == replicas {
// Skip if already done
return nil
}
logger.Info("Scaling StatefulSet", "replicas", replicas)
pandaCluster.Status.CurrentReplicas = replicas
if err := c.Status().Update(ctx, pandaCluster); err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err)
}
logger.Info("StatefulSet scaled", "replicas", replicas)
return nil
}
// NodeReappearingError indicates that a node has appeared in the cluster before completion of a direct downscale
type NodeReappearingError struct {
NodeID int
}
// Error makes the NodeReappearingError a proper error
func (e *NodeReappearingError) Error() string {
return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID)
}
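// getPodNodeIDFromAnnotation reads the Redpanda node ID of the pod with the given ordinal from the
// "operator.redpanda.com/node-id" annotation.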
func getPodNodeIDFromAnnotation(ctx context.Context, c k8sclient.Client, clusterNamespace, clusterName string, ordinal int32) (int, error) {
podName := fmt.Sprintf("%s-%d", clusterName, ordinal)
pod := &corev1.Pod{}
pod.SetName(podName)
pod.SetNamespace(clusterNamespace)
err := c.Get(ctx, k8sclient.ObjectKeyFromObject(pod), pod)
if err != nil {
return 0, err
}
idString, ok := pod.GetAnnotations()[PodAnnotationNodeIDKey]
if !ok {
//nolint:goerr113 // this is sufficient
return 0, fmt.Errorf("cannot retrieve broker id from annotation for pod %s/%s", clusterNamespace, podName)
}
return strconv.Atoi(idString)
}