// File: main.go
package main
import (
"os"
"os/signal"
"syscall"
"time"
log "github.com/Sirupsen/logrus"
handlers "github.com/intuit/katlas/controller/handlers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
// GetKubernetesClient builds a Kubernetes clientset.
//
// It first tries the kubeconfig file at $HOME/.kube/config (the out-of-cluster
// case). If that cannot be loaded, the failure is logged at info level and the
// in-cluster configuration is tried instead.
//
// The function never returns an error: it panics when neither configuration
// source is usable, and terminates the process via log.Fatalf when the
// clientset cannot be created from the resolved configuration.
func GetKubernetesClient() kubernetes.Interface {
	// construct the path to resolve to `~/.kube/config`
	kubeConfigPath := os.Getenv("HOME") + "/.kube/config"

	// create the config from the path
	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		log.Infof("getClusterConfig: %v", err)

		// if the container is running inside the cluster, use the in-cluster config
		inClusterConfig, inClusterErr := rest.InClusterConfig()
		if inClusterErr != nil {
			// Report both failures: the in-cluster error is the one that
			// actually made us give up, and the kubeconfig error explains
			// why the fallback was attempted in the first place.
			panic("getClusterConfig: kubeconfig: " + err.Error() +
				"; in-cluster: " + inClusterErr.Error())
		}
		config = inClusterConfig
	}

	// generate the client based off of the config
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("getClusterConfig: %v", err)
	}

	log.Info("Successfully constructed k8s client")
	return client
}
// CreateController builds the Controller for a single Kubernetes resource kind.
//
// objType selects the informer/handler pair and must be one of "Pod",
// "Service", "Namespace", "Deployment", "ReplicaSet", "Ingress" or
// "StatefulSet". Any other value is a programming error and causes a panic
// with a message naming the offending value.
//
// The returned Controller owns its own client (from GetKubernetesClient), a
// rate-limited work queue, the informer for the kind, and the matching
// handler. Add, update and delete events observed by the informer are turned
// into keys and pushed onto the queue.
func CreateController(objType string) *Controller {
	client := GetKubernetesClient()

	var informer cache.SharedIndexInformer
	var handlerc handlers.Handler

	// create a new queue so that when the informer gets a resource that is either
	// a result of listing or watching, we can add an identifying key to the queue
	// so that it can be handled in the handler
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// pick the informer (list + watch) and the handler for the requested kind
	switch objType {
	case "Pod":
		informer = handlers.GetPodInformer(client)
		handlerc = &handlers.PodHandler{}
	case "Service":
		informer = handlers.GetServiceInformer(client)
		handlerc = &handlers.ServiceHandler{}
	case "Namespace":
		informer = handlers.GetNamespaceInformer(client)
		handlerc = &handlers.NamespaceHandler{}
	case "Deployment":
		informer = handlers.GetDeploymentInformer(client)
		handlerc = &handlers.DeploymentHandler{}
	case "ReplicaSet":
		informer = handlers.GetReplicaSetInformer(client)
		handlerc = &handlers.ReplicaSetHandler{}
	case "Ingress":
		informer = handlers.GetIngressInformer(client)
		handlerc = &handlers.IngressHandler{}
	case "StatefulSet":
		informer = handlers.GetStatefulSetInformer(client)
		handlerc = &handlers.StatefulSetHandler{}
	default:
		// Without this, informer stays nil and the AddEventHandler call
		// below fails with an unexplained nil dereference.
		log.Panicf("CreateController: unsupported object type %q", objType)
	}

	// enqueue converts obj into a queue key using keyFunc and adds it to the
	// queue. A key that cannot be computed is logged and skipped rather than
	// being reported as an event with an empty key.
	enqueue := func(event string, obj interface{}, keyFunc func(obj interface{}) (string, error)) {
		key, err := keyFunc(obj)
		if err != nil {
			log.Errorf("%s %s: cannot compute queue key: %v", event, objType, err)
			return
		}
		log.Infof("%s %s: %s", event, objType, key)
		// add the key to the queue for the handler to get
		queue.Add(key)
	}

	// add event handlers to handle the three types of events for resources:
	//  - adding new resources
	//  - updating existing resources
	//  - deleting resources
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueue("Add", obj, cache.MetaNamespaceKeyFunc)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// only the new state is used to derive the key
			enqueue("Update", newObj, cache.MetaNamespaceKeyFunc)
		},
		DeleteFunc: func(obj interface{}) {
			// DeletionHandlingMetaNamespaceKeyFunc additionally copes with the
			// DeletedFinalStateUnknown wrapper that is delivered when a
			// resource was deleted but is still contained in the index; it
			// then in turn calls MetaNamespaceKeyFunc
			enqueue("Delete", obj, cache.DeletionHandlingMetaNamespaceKeyFunc)
		},
	})

	// construct the Controller object which has all of the necessary components to
	// handle logging, connections, informing (listing and watching), the queue,
	// and the handler
	return &Controller{
		logger:    log.NewEntry(log.New()),
		clientset: client,
		informer:  informer,
		queue:     queue,
		handler:   handlerc,
		name:      objType,
	}
}
// Synchronizer runs forever, invoking every handlers.*Synchronize function
// once per hour with a client obtained from GetKubernetesClient.
//
// The first pass happens only after the initial one-hour wait, and each
// subsequent wait starts after the previous pass has finished. The function
// never returns, so callers are expected to run it in its own goroutine.
//
// NOTE(review): presumably each pass reconciles cluster state with the
// backing store — confirm against the handlers package.
func Synchronizer() {
	const interval = time.Hour

	client := GetKubernetesClient()

	// syncAll performs one full pass over every resource kind, in fixed order.
	syncAll := func() {
		handlers.NamespaceSynchronize(client)
		handlers.StatefulSetSynchronize(client)
		handlers.DeploymentSynchronize(client)
		handlers.ReplicaSetSynchronize(client)
		handlers.PodSynchronize(client)
		handlers.ServiceSynchronize(client)
		handlers.IngressSynchronize(client)
	}

	for {
		time.Sleep(interval)
		syncAll()
	}
}
// main creates one controller per watched resource kind, starts the periodic
// Synchronizer and every controller loop in their own goroutines, and then
// blocks until the process receives SIGTERM or SIGINT.
func main() {
	log.Info("Current namespace: ", os.Getenv("AppNamespace"))

	// Build every controller up front, before anything is started.
	kinds := []string{
		"Pod",
		"Service",
		"Namespace",
		"Deployment",
		"ReplicaSet",
		"Ingress",
		"StatefulSet",
	}
	controllers := make([]*Controller, 0, len(kinds))
	for _, kind := range kinds {
		controllers = append(controllers, CreateController(kind))
	}

	// stopCh is shared by all controllers; closing it signals them to stop.
	// NOTE(review): the deferred close runs as main returns, i.e. right
	// before the process exits — confirm the controllers get a chance to
	// observe it if a graceful shutdown is required.
	stopCh := make(chan struct{})
	defer close(stopCh)

	// start sync task
	go Synchronizer()

	// run the controller loops to process items
	for _, ctl := range controllers {
		go ctl.Run(stopCh)
	}

	// block until the OS asks us to terminate
	sigTerm := make(chan os.Signal, 1)
	signal.Notify(sigTerm, syscall.SIGTERM, syscall.SIGINT)
	<-sigTerm
}