Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP K8s ring (aka r2g) #92

Merged
merged 8 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ require (
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.7
k8s.io/apimachinery v0.21.7
k8s.io/client-go v0.21.7
)

replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.4
replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.7

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet.
replace github.com/hashicorp/memberlist v0.2.3 => github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b
259 changes: 254 additions & 5 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/etcd"
"github.com/grafana/dskit/kv/kubernetes"
"github.com/grafana/dskit/kv/memberlist"
)

Expand Down Expand Up @@ -142,6 +143,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
})
client = inmemoryStore

case "kubernetes":
client, err = kubernetes.NewClient(&kubernetes.Config{}, codec, logger, reg)

case "memberlist":
kv, err := cfg.MemberlistKV()
if err != nil {
Expand Down
107 changes: 107 additions & 0 deletions kv/kubernetes/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package kubernetes

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

func (c *Client) startController() error {
// create the pod watcher
configMapWatcher := cache.NewFilteredListWatchFromClient(c.clientset.CoreV1().RESTClient(), "configMaps", c.namespace, func(options *metav1.ListOptions) {
options.FieldSelector = "metadata.name=" + c.name
})

// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

indexer, informer := cache.NewIndexerInformer(configMapWatcher, &v1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})

c.queue = queue
c.indexer = indexer
c.informer = informer

go c.runController()

return nil
}

func (c *Client) runController() {
defer runtime.HandleCrash()

// Let the workers stop when we are done
defer c.queue.ShutDown()
c.logger.Log("msg", "starting configmap controller")

go c.informer.Run(c.stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(c.stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}

go wait.Until(func() {
for c.process() {
}
}, time.Second, c.stopCh)

<-c.stopCh
c.logger.Log("msg", "stopping configmap controller")
}

func (c *Client) process() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer c.queue.Done(key)

obj, exists, err := c.indexer.GetByKey(key.(string))
if err != nil {
c.logger.Log("Fetching object with key %s from store failed with %v", key.(string), err)
return false
}

if !exists {
// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
fmt.Printf("configMap %s does not exist anymore\n", key)
} else {
// Note that you also have to check the uid if you have a local controlled resource, which
// is dependent on the actual instance, to detect that a Pod was recreated with the same name
fmt.Printf("Sync/Add/Update for configMap %s\n", obj.(*v1.ConfigMap).GetName())
}
return true

}
70 changes: 70 additions & 0 deletions kv/kubernetes/jsonpatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kubernetes

import (
"encoding/base64"
"encoding/json"
)

type operation struct {
Op string `json:"op"`
Path string `json:"path"`
// Value should be a base64-encoded value or nil.
Value *string `json:"value"`
}

// preparePatch prepares a JSON patch (RFC 6902) to update a configmap key. If oldHash is empty or nil
// this is equivalent to adding a key.
func preparePatch(key string, oldHash, newVal, newHash []byte) ([]byte, error) {
hashKey := "/binaryData/" + convertKeyToStoreHash(key)

b64 := func(b []byte) *string {
str := base64.StdEncoding.EncodeToString(b)
return &str
}

// testing with nil is equivalent to testing that the key doesn't exist
var expectedHash *string
if len(oldHash) > 0 {
expectedHash = b64(oldHash)
}

testHashOp := operation{
Op: "test",
Path: hashKey,
Value: expectedHash,
}

setHashOp := operation{
Op: "replace",
Path: hashKey,
Value: b64(newHash),
}

setDataOp := operation{
Op: "replace",
Path: "/binaryData/" + convertKeyToStore(key),
Value: b64(newVal),
}

patch := []operation{testHashOp, setHashOp, setDataOp}

return json.Marshal(patch)
}

func prepareDeletePatch(key string) ([]byte, error) {
removeHashOp := operation{
Op: "remove",
Path: "/binaryData/" + convertKeyToStoreHash(key),
Value: nil,
}

removeObjectOp := operation{
Op: "remove",
Path: "/binaryData/" + convertKeyToStore(key),
Value: nil,
}

patch := []operation{removeHashOp, removeObjectOp}

return json.Marshal(patch)
}
Loading