Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor interceptor main package #1467

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 11 additions & 70 deletions cmd/interceptors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"log"
"net"
"net/http"
"os"
"time"

triggersclientset "github.com/tektoncd/triggers/pkg/client/clientset/versioned"
Expand All @@ -32,12 +31,9 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
kubeclient "knative.dev/pkg/client/injection/kube/client"
secretInformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
certresources "knative.dev/pkg/webhook/certificates/resources"
)

const (
Expand Down Expand Up @@ -84,72 +80,12 @@ func main() {
return
}

serverCert, caCert, err := server.CreateCerts(ctx, kubeclient.Get(ctx).CoreV1(), time.Now().Add(server.Decade), logger)
if err != nil {
return
}
server.CreateAndValidateCerts(ctx, kubeclient.Get(ctx).CoreV1(), logger, service, tc.TriggersV1alpha1())

if err := service.ListAndUpdateClusterInterceptorCRD(ctx, tc.TriggersV1alpha1(), caCert); err != nil {
return
}
// watch for caCert existence in clusterInterceptor, update with new caCert if its missing in clusterInterceptor
server.UpdateCACertToClusterInterceptorCRD(ctx, service, tc.TriggersV1alpha1(), logger, time.Minute)

// After creating certificates using CreateCerts lets validate validity of created certificates
service.CheckCertValidity(ctx, serverCert, caCert, kubeclient.Get(ctx).CoreV1(), logger, tc.TriggersV1alpha1(), time.Minute)

interceptorSecretName := os.Getenv(server.InterceptorTLSSecretKey)
ticker := time.NewTicker(time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
secret, err := secretInformer.Get(ctx).Lister().Secrets(system.Namespace()).Get(interceptorSecretName)
if err != nil {
logger.Errorf("failed to fetch secret %v", err)
return
}
caCert, ok := secret.Data[certresources.CACert]
if !ok {
logger.Warn("CACert key missing")
return
}
if err := service.ListAndUpdateClusterInterceptorCRD(ctx, tc.TriggersV1alpha1(), caCert); err != nil {
return
}
case <-quit:
ticker.Stop()
return
}
}
}()

tlsData := &tls.Config{
MinVersion: tls.VersionTLS13,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
secret, err := secretInformer.Get(ctx).Lister().Secrets(system.Namespace()).Get(interceptorSecretName)
if err != nil {
logger.Errorf("failed to fetch secret %v", err)
return nil, nil
}

serverKey, ok := secret.Data[certresources.ServerKey]
if !ok {
logger.Warn("server key missing")
return nil, nil
}
serverCert, ok := secret.Data[certresources.ServerCert]
if !ok {
logger.Warn("server cert missing")
return nil, nil
}
cert, err := tls.X509KeyPair(serverCert, serverKey)
if err != nil {
return nil, err
}
return &cert, nil
},
}
if err := startServer(ctx, ctx.Done(), mux, tlsData, logger); err != nil {
if err := startServer(ctx, ctx.Done(), mux, logger); err != nil {
logger.Fatal(err)
}
}
Expand All @@ -158,7 +94,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func startServer(ctx context.Context, stop <-chan struct{}, mux *http.ServeMux, tlsData *tls.Config, logger *zap.SugaredLogger) error {
func startServer(ctx context.Context, stop <-chan struct{}, mux *http.ServeMux, logger *zap.SugaredLogger) error {

srv := &http.Server{
Addr: fmt.Sprintf(":%d", HTTPSPort),
Expand All @@ -170,7 +106,12 @@ func startServer(ctx context.Context, stop <-chan struct{}, mux *http.ServeMux,
WriteTimeout: writeTimeout,
IdleTimeout: idleTimeout,
Handler: mux,
TLSConfig: tlsData,
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS13,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
return server.GetTLSData(ctx, logger)
},
},
}

eg, ctx := errgroup.WithContext(ctx)
Expand Down
142 changes: 97 additions & 45 deletions pkg/interceptors/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
Expand All @@ -26,14 +27,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
secretInformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/system"
certresources "knative.dev/pkg/webhook/certificates/resources"
)

const (
Decade = 100 * 365 * 24 * time.Hour
InterceptorTLSSecretKey = "INTERCEPTOR_TLS_SECRET_NAME"
InterceptorTLSSvcKey = "INTERCEPTOR_TLS_SVC_NAME"
interceptorTLSSecretKey = "INTERCEPTOR_TLS_SECRET_NAME"
interceptorTLSSvcKey = "INTERCEPTOR_TLS_SVC_NAME"
)

type keypairReloader struct {
Expand Down Expand Up @@ -155,9 +157,23 @@ func (is *Server) ExecuteInterceptor(r *http.Request) ([]byte, error) {
return respBytes, nil
}

func CreateCerts(ctx context.Context, coreV1Interface corev1.CoreV1Interface, noAfter time.Time, logger *zap.SugaredLogger) ([]byte, []byte, error) {
interceptorSvcName := os.Getenv(InterceptorTLSSvcKey)
interceptorSecretName := os.Getenv(InterceptorTLSSecretKey)
func CreateAndValidateCerts(ctx context.Context, coreV1Interface corev1.CoreV1Interface, logger *zap.SugaredLogger, service *Server, tc triggersv1alpha1.TriggersV1alpha1Interface) {
serverCert, caCert, err := createCerts(ctx, coreV1Interface, time.Now().Add(Decade), logger)
if err != nil {
return
}

if err := service.listAndUpdateClusterInterceptorCRD(ctx, tc, caCert); err != nil {
return
}

// After creating certificates using CreateCerts lets validate validity of created certificates
service.checkCertValidity(ctx, serverCert, caCert, coreV1Interface, logger, tc, time.Minute)
}

func createCerts(ctx context.Context, coreV1Interface corev1.CoreV1Interface, noAfter time.Time, logger *zap.SugaredLogger) ([]byte, []byte, error) {
interceptorSvcName := os.Getenv(interceptorTLSSvcKey)
interceptorSecretName := os.Getenv(interceptorTLSSecretKey)
namespace := system.Namespace()

secret, err := coreV1Interface.Secrets(namespace).Get(ctx, interceptorSecretName, metav1.GetOptions{})
Expand Down Expand Up @@ -208,70 +224,63 @@ func (is *Server) updateCRDWithCaCert(ctx context.Context, triggersV1Alpha1 trig
return nil
}

func (is *Server) CheckCertValidity(ctx context.Context, serverCert, caCert []byte, coreV1Interface corev1.CoreV1Interface,
func (is *Server) checkCertValidity(ctx context.Context, serverCert, caCert []byte, coreV1Interface corev1.CoreV1Interface,
logger *zap.SugaredLogger, tc triggersv1alpha1.TriggersV1alpha1Interface, tickerTime time.Duration) {

result := &keypairReloader{
caCertData: caCert,
serverCertData: serverCert,
}

ticker := time.NewTicker(tickerTime)
quit := make(chan struct{})
var (
cert *x509.Certificate
err error
)

go func() {
for {
select {
case <-ticker.C:
// Check the expiration date of the certificate to see if it needs to be updated
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(result.caCertData)
if !ok {
logger.Error("failed to parse root certificate")
}
block, _ := pem.Decode(result.serverCertData)
if block == nil {
logger.Error("failed to parse certificate PEM")
} else {
cert, err = x509.ParseCertificate(block.Bytes)
if err != nil {
logger.Errorf("failed to parse certificate: %v", err.Error())
}
<-ticker.C
// Check the expiration date of the certificate to see if it needs to be updated
roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(result.caCertData)
if !ok {
logger.Error("failed to parse root certificate")
}
block, _ := pem.Decode(result.serverCertData)
if block == nil {
logger.Error("failed to parse certificate PEM")
} else {
cert, err = x509.ParseCertificate(block.Bytes)
if err != nil {
logger.Errorf("failed to parse certificate: %v", err.Error())
}
}

opts := x509.VerifyOptions{
Roots: roots,
}

if _, err := cert.Verify(opts); err != nil {
logger.Errorf("failed to verify certificate: %v", err.Error())

opts := x509.VerifyOptions{
Roots: roots,
serverCertNew, caCertNew, err := createCerts(ctx, coreV1Interface, time.Now().Add(Decade), logger)
if err != nil {
logger.Errorf("failed to create certs %v", err)
}

if _, err := cert.Verify(opts); err != nil {
logger.Errorf("failed to verify certificate: %v", err.Error())

serverCertNew, caCertNew, err := CreateCerts(ctx, coreV1Interface, time.Now().Add(Decade), logger)
if err != nil {
logger.Errorf("failed to create certs %v", err)
}

result = &keypairReloader{
caCertData: caCertNew,
serverCertData: serverCertNew,
}
if err := is.ListAndUpdateClusterInterceptorCRD(ctx, tc, caCertNew); err != nil {
logger.Error(err.Error())
}
result = &keypairReloader{
caCertData: caCertNew,
serverCertData: serverCertNew,
}
if err := is.listAndUpdateClusterInterceptorCRD(ctx, tc, caCertNew); err != nil {
logger.Error(err.Error())
}
case <-quit:
ticker.Stop()
return
}
}
}()
}

func (is *Server) ListAndUpdateClusterInterceptorCRD(ctx context.Context, tc triggersv1alpha1.TriggersV1alpha1Interface, caCert []byte) error {
func (is *Server) listAndUpdateClusterInterceptorCRD(ctx context.Context, tc triggersv1alpha1.TriggersV1alpha1Interface, caCert []byte) error {
clusterInterceptorList, err := tc.ClusterInterceptors().List(ctx, metav1.ListOptions{})
if err != nil {
return err
Expand All @@ -282,3 +291,46 @@ func (is *Server) ListAndUpdateClusterInterceptorCRD(ctx context.Context, tc tri
}
return nil
}

func GetTLSData(ctx context.Context, logger *zap.SugaredLogger) (*tls.Certificate, error) {
secret, err := secretInformer.Get(ctx).Lister().Secrets(system.Namespace()).Get(os.Getenv(interceptorTLSSecretKey))
if err != nil {
logger.Errorf("failed to fetch secret %v", err)
return nil, err
}
serverKey, ok := secret.Data[certresources.ServerKey]
if !ok {
logger.Warn("server key missing")
return nil, fmt.Errorf("server key missing")
}
serverCert, ok := secret.Data[certresources.ServerCert]
if !ok {
logger.Warn("server cert missing")
return nil, fmt.Errorf("server cert missing")
}
cert, err := tls.X509KeyPair(serverCert, serverKey)
return &cert, err
}

func UpdateCACertToClusterInterceptorCRD(ctx context.Context, service *Server, tc triggersv1alpha1.TriggersV1alpha1Interface, logger *zap.SugaredLogger, timer time.Duration) {
interceptorSecretName := os.Getenv(interceptorTLSSecretKey)
ticker := time.NewTicker(timer)
go func() {
for {
<-ticker.C
secret, err := secretInformer.Get(ctx).Lister().Secrets(system.Namespace()).Get(interceptorSecretName)
if err != nil {
logger.Errorf("failed to fetch secret %v", err)
return
}
caCert, ok := secret.Data[certresources.CACert]
if !ok {
logger.Warn("CACert key missing")
return
}
if err := service.listAndUpdateClusterInterceptorCRD(ctx, tc, caCert); err != nil {
return
}
}
}()
}
Loading