/
controller.go
92 lines (74 loc) · 2.42 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package cluster
import (
"context"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
crv1 "github.com/grtl/mysql-operator/pkg/apis/cr/v1"
"github.com/grtl/mysql-operator/pkg/client/clientset/versioned"
"github.com/grtl/mysql-operator/pkg/client/informers/externalversions"
"github.com/grtl/mysql-operator/pkg/controller"
"github.com/grtl/mysql-operator/pkg/logging"
operator "github.com/grtl/mysql-operator/pkg/operator/cluster"
)
// NewClusterController returns new cluster controller.
func NewClusterController(clientset versioned.Interface, kubeClientset kubernetes.Interface) controller.Controller {
return &clusterController{
Base: controller.NewControllerBase(),
clientset: clientset,
clusterOperator: operator.NewClusterOperator(clientset, kubeClientset),
}
}
type clusterController struct {
controller.Base
clientset versioned.Interface
clusterOperator operator.Operator
}
func (c *clusterController) Run(ctx context.Context) error {
factory := externalversions.NewSharedInformerFactory(c.clientset, 0)
informer := factory.Cr().V1().MySQLClusters().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
})
informer.Run(ctx.Done())
<-ctx.Done()
return ctx.Err()
}
func (c *clusterController) onAdd(obj interface{}) {
cluster := obj.(*crv1.MySQLCluster)
logClusterEventBegin(cluster, ClusterAdded)
err := c.clusterOperator.AddCluster(cluster)
if err != nil {
logging.LogCluster(cluster).WithField("event", ClusterAdded).Error(err)
} else {
logClusterEventSuccess(cluster, ClusterAdded)
}
// Run hooks
for _, hook := range c.GetHooks() {
hook.OnAdd(cluster)
}
}
func (c *clusterController) onUpdate(oldObj, newObj interface{}) {
newCluster := newObj.(*crv1.MySQLCluster)
logClusterEventBegin(newCluster, ClusterUpdated)
err := c.clusterOperator.UpdateCluster(newCluster)
if err != nil {
logging.LogCluster(newCluster).WithField("event", ClusterUpdated).Error(err)
} else {
logClusterEventSuccess(newCluster, ClusterUpdated)
}
// Run hooks
for _, hook := range c.GetHooks() {
hook.OnUpdate(newCluster)
}
}
func (c *clusterController) onDelete(obj interface{}) {
cluster := obj.(*crv1.MySQLCluster)
logClusterEventBegin(cluster, ClusterDeleted)
logClusterEventSuccess(cluster, ClusterDeleted)
// Run hooks
for _, hook := range c.GetHooks() {
hook.OnDelete(cluster)
}
}