Skip to content

Commit

Permalink
client manager return one client when db backed
Browse files Browse the repository at this point in the history
  • Loading branch information
randmonkey committed Oct 24, 2023
1 parent 72e1313 commit a8b9162
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 42 deletions.
68 changes: 67 additions & 1 deletion internal/clients/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@ package clients
import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/samber/lo"
"golang.org/x/exp/maps"
k8stypes "k8s.io/apimachinery/pkg/types"

"github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
dataplaneutil "github.com/kong/kubernetes-ingress-controller/v2/internal/util/dataplane"
)

// DefaultReadinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop.
// It's the same as the default interval of a Kubernetes container's readiness probe.
const DefaultReadinessReconciliationInterval = 10 * time.Second

const DefaultAdminAPIServicePort = 8444

// ClientFactory is responsible for creating Admin API clients.
type ClientFactory interface {
CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error)
Expand Down Expand Up @@ -47,6 +52,12 @@ type AdminAPIClientsManager struct {
// endpoints list that should be used for configuring the dataplane.
discoveredAdminAPIsNotifyChan chan []adminapi.DiscoveredAdminAPI
gatewayClientsChangesSubscribers []chan struct{}
// adminAPIServiceName is the name of admin API service.
// When Kong gateway is running with DB, the manager will return the address of admin API service
// since KIC need only to send the requests to the admin API once with DB backed Kong gateway.
adminAPIServiceName k8stypes.NamespacedName
adminAPIServicePort int
dbMode string

ctx context.Context
onceNotifyLoopRunning sync.Once
Expand All @@ -60,6 +71,9 @@ type AdminAPIClientsManager struct {
// configured.
pendingGatewayClients map[string]adminapi.DiscoveredAdminAPI

// clientFactory is used to create admin API client pointing to the admin API service.
clientFactory ClientFactory

// readinessChecker is used to check readiness of the clients.
readinessChecker ReadinessChecker

Expand All @@ -85,10 +99,30 @@ func WithReadinessReconciliationTicker(ticker Ticker) AdminAPIClientsManagerOpti
}
}

func WithAdminAPIServiceName(serviceNN k8stypes.NamespacedName) AdminAPIClientsManagerOption {
return func(m *AdminAPIClientsManager) {
m.adminAPIServiceName = serviceNN
}
}

func WithAdminAPIServicePort(port int) AdminAPIClientsManagerOption {
return func(m *AdminAPIClientsManager) {
m.adminAPIServicePort = port
}
}

// WithDBMode allows to set the DBMode of the Kong gateway instances behind the admin API service.
func WithDBMode(dbMode string) AdminAPIClientsManagerOption {
return func(m *AdminAPIClientsManager) {
m.dbMode = dbMode
}
}

func NewAdminAPIClientsManager(
ctx context.Context,
logger logr.Logger,
initialClients []*adminapi.Client,
clientFactory ClientFactory,
readinessChecker ReadinessChecker,
opts ...AdminAPIClientsManagerOption,
) (*AdminAPIClientsManager, error) {
Expand All @@ -102,6 +136,8 @@ func NewAdminAPIClientsManager(
c := &AdminAPIClientsManager{
readyGatewayClients: readyClients,
pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI),
clientFactory: clientFactory,
adminAPIServicePort: DefaultAdminAPIServicePort,
readinessChecker: readinessChecker,
readinessReconciliationTicker: clock.NewTicker(),
discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI),
Expand Down Expand Up @@ -170,7 +206,37 @@ func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient {
func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client {
c.lock.RLock()
defer c.lock.RUnlock()
return lo.Values(c.readyGatewayClients)
readyGatewayClients := lo.Values(c.readyGatewayClients)
// With dbless mode, we should send configuration to ALL Kong gateway instances.
if dataplaneutil.IsDBLessMode(c.dbMode) {
return readyGatewayClients
}
// if there are no ready gateway clients, we should return an empty list.
if len(readyGatewayClients) == 0 {
return []*adminapi.Client{}
}
// return ONE client if Kong gateway is DB backed.
// if the name of admin API service is given, return the address of the service.
if c.adminAPIServiceName.Name != "" && c.adminAPIServiceName.Namespace != "" {
cl, err := c.clientFactory.CreateAdminAPIClient(c.ctx, adminapi.DiscoveredAdminAPI{
Address: fmt.Sprintf("https://%s.%s:%d",
c.adminAPIServiceName.Name,
c.adminAPIServiceName.Namespace,
c.adminAPIServicePort,
),
})
if err != nil {
// Fallback to choose a single instance if we failed to create a client from admin API service.
c.logger.Error(err, "failed to create admin API client from admin API service address",
"service_name", c.adminAPIServiceName.String(),
"service_port", c.adminAPIServicePort,
)
} else {
return []*adminapi.Client{cl}
}
}
// if no admin API service is given, choose one discovered client to send configurations.
return readyGatewayClients[:1]
}

func (c *AdminAPIClientsManager) GatewayClientsCount() int {
Expand Down
24 changes: 20 additions & 4 deletions internal/clients/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.
ctx,
logger,
[]*adminapi.Client{initialClient},
mocks.NewAdminAPIClientFactory(map[string]error{}),
readinessChecker,
)
require.NoError(t, err)
Expand Down Expand Up @@ -158,7 +159,13 @@ func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.
}

func TestNewAdminAPIClientsManager_NoInitialClientsDisallowed(t *testing.T) {
_, err := clients.NewAdminAPIClientsManager(context.Background(), zapr.NewLogger(zap.NewNop()), nil, &mockReadinessChecker{})
_, err := clients.NewAdminAPIClientsManager(
context.Background(),
zapr.NewLogger(zap.NewNop()),
nil,
mocks.NewAdminAPIClientFactory(map[string]error{}),
&mockReadinessChecker{},
)
require.ErrorContains(t, err, "at least one initial client must be provided")
}

Expand All @@ -171,6 +178,7 @@ func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) {
context.Background(),
zapr.NewLogger(zap.NewNop()),
[]*adminapi.Client{testClient},
mocks.NewAdminAPIClientFactory(map[string]error{}),
&mockReadinessChecker{},
)
require.NoError(t, err)
Expand All @@ -191,6 +199,7 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) {
context.Background(),
zapr.NewLogger(zap.NewNop()),
[]*adminapi.Client{testClient},
mocks.NewAdminAPIClientFactory(map[string]error{}),
&mockReadinessChecker{},
)
require.NoError(t, err)
Expand All @@ -212,7 +221,13 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
m, err := clients.NewAdminAPIClientsManager(ctx, zapr.NewLogger(zap.NewNop()), []*adminapi.Client{testClient}, readinessChecker)
m, err := clients.NewAdminAPIClientsManager(
ctx,
zapr.NewLogger(zap.NewNop()),
[]*adminapi.Client{testClient},
mocks.NewAdminAPIClientFactory(map[string]error{}),
readinessChecker)

require.NoError(t, err)

t.Run("no notify loop running should return false when subscribing", func(t *testing.T) {
Expand Down Expand Up @@ -297,7 +312,7 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m, err := clients.NewAdminAPIClientsManager(ctx, zapr.NewLogger(zap.NewNop()), []*adminapi.Client{testClient}, readinessChecker)
m, err := clients.NewAdminAPIClientsManager(ctx, zapr.NewLogger(zap.NewNop()), []*adminapi.Client{testClient}, mocks.NewAdminAPIClientFactory(map[string]error{}), readinessChecker)
require.NoError(t, err)
m.Run()

Expand Down Expand Up @@ -331,7 +346,7 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
readinessChecker := &mockReadinessChecker{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m, err := clients.NewAdminAPIClientsManager(ctx, zapr.NewLogger(zap.NewNop()), []*adminapi.Client{testClient}, readinessChecker)
m, err := clients.NewAdminAPIClientsManager(ctx, zapr.NewLogger(zap.NewNop()), []*adminapi.Client{testClient}, mocks.NewAdminAPIClientFactory(map[string]error{}), readinessChecker)
require.NoError(t, err)

m.Run()
Expand Down Expand Up @@ -430,6 +445,7 @@ func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) {
ctx,
zapr.NewLogger(zap.NewNop()),
[]*adminapi.Client{testClient},
mocks.NewAdminAPIClientFactory(map[string]error{}),
readinessChecker,
clients.WithReadinessReconciliationTicker(readinessTicker),
)
Expand Down
47 changes: 10 additions & 37 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
Expand Down Expand Up @@ -463,45 +462,19 @@ func (c *KongClient) sendOutToGatewayClients(
) ([]string, error) {
gatewayClients := c.clientsProvider.GatewayClients()
previousSHAs := c.SHAs
gatewayClientURLs := []string{}
for _, cl := range gatewayClients {
gatewayClientURLs = append(gatewayClientURLs, cl.BaseRootURL())
}

if dataplaneutil.IsDBLessMode(c.dbmode) {
c.logger.V(util.DebugLevel).Info("sending configuration to gateway clients", "urls", gatewayClientURLs)
shas, err := iter.MapErr(gatewayClients, func(client **adminapi.Client) (string, error) {
return c.sendToClient(ctx, *client, s, config)
})
if err != nil {
return nil, err
}
gatewayClientURLs := lo.Map(gatewayClients, func(cl *adminapi.Client, _ int) string { return cl.BaseRootURL() })
c.logger.V(util.DebugLevel).Info("sending configuration to gateway clients", "urls", gatewayClientURLs)

sort.Strings(shas)
c.SHAs = shas
} else {
// directly return if no gateway clients found.
if len(gatewayClients) == 0 {
c.logger.Info("no active gateway clients found, skip sending")
return previousSHAs, nil
}
// randomly choose one client.
// REVIEW: The order of GatewayClients() is random, could we just use the "first" client?
// TODO: if multiple gateway instances uses different DBs, choose one client for each DB:
// https://github.com/Kong/kubernetes-ingress-controller/issues/4845
clientIndex := rand.Intn(len(gatewayClients)) //nolint:gosec
c.logger.V(util.DebugLevel).Info(
fmt.Sprintf("sending configuration to gateway client %d/%d", clientIndex+1, len(gatewayClients)),
"chosen_url", gatewayClientURLs[clientIndex],
"urls", gatewayClientURLs,
)
sha, err := c.sendToClient(ctx, gatewayClients[clientIndex], s, config)
if err != nil {
return nil, err
}
c.SHAs = []string{sha}
shas, err := iter.MapErr(gatewayClients, func(client **adminapi.Client) (string, error) {
return c.sendToClient(ctx, *client, s, config)
})
if err != nil {
return nil, err
}

sort.Strings(shas)
c.SHAs = shas

c.kongConfigFetcher.StoreLastValidConfig(s)

return previousSHAs, nil
Expand Down
2 changes: 2 additions & 0 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ func Run(
ctx,
logger,
initialKongClients,
adminAPIClientsFactory,
readinessChecker,
)
if err != nil {
return fmt.Errorf("failed to create AdminAPIClientsManager: %w", err)
}
if c.KongAdminSvc.IsPresent() {
clients.WithAdminAPIServiceName(c.KongAdminSvc.MustGet())(clientsManager)
setupLog.Info("Running AdminAPIClientsManager loop")
clientsManager.Run()
}
Expand Down

0 comments on commit a8b9162

Please sign in to comment.