Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MON-2807: Use bearer token file for remote write authentication with telemeter #1733

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions assets/prometheus-k8s/telemetry-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
data: {}
kind: Secret
metadata:
labels:
app.kubernetes.io/name: prometheus-k8s
name: telemetry-server
namespace: openshift-monitoring
type: Opaque
13 changes: 13 additions & 0 deletions jsonnet/components/prometheus.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ function(params)

kubeRbacProxySecret: generateSecret.staticAuthSecret(cfg.namespace, cfg.commonLabels, 'kube-rbac-proxy'),

// Secret holding the token to authenticate against the Telemetry server when using native remote-write.
// The data is intentionally left empty in this manifest: the operator fills in
// the token key when it builds the secret (see PrometheusK8sTelemetrySecret in
// pkg/manifests/manifests.go).
telemetrySecret: {
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: 'telemetry-server',
namespace: cfg.namespace,
labels: { 'app.kubernetes.io/name': 'prometheus-k8s' },
},
type: 'Opaque',
data: {},
},

// This changes the Prometheuses to be scraped with TLS, authN and
// authZ, which are not present in kube-prometheus.

Expand Down
47 changes: 33 additions & 14 deletions pkg/manifests/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (

platformAlertmanagerService = "alertmanager-main"
userWorkloadAlertmanagerService = "alertmanager-user-workload"

telemetryTokenSecretKey = "token"
)

var (
Expand Down Expand Up @@ -145,6 +147,7 @@ var (
PrometheusK8sThanosSidecarServiceMonitor = "prometheus-k8s/service-monitor-thanos-sidecar.yaml"
PrometheusK8sTAlertmanagerRoleBinding = "prometheus-k8s/alertmanager-role-binding.yaml"
PrometheusK8sPodDisruptionBudget = "prometheus-k8s/pod-disruption-budget.yaml"
PrometheusK8sTelemetry = "prometheus-k8s/telemetry-secret.yaml"

PrometheusUserWorkloadServingCertsCABundle = "prometheus-user-workload/serving-certs-ca-bundle.yaml"
PrometheusUserWorkloadServiceAccount = "prometheus-user-workload/service-account.yaml"
Expand Down Expand Up @@ -1567,7 +1570,29 @@ func (f *Factory) NewPrometheusK8s() (*monv1.Prometheus, error) {
return f.NewPrometheus(f.assets.MustNewAssetReader(PrometheusK8s))
}

func (f *Factory) PrometheusK8s(grpcTLS *v1.Secret, trustedCABundleCM *v1.ConfigMap) (*monv1.Prometheus, error) {
// PrometheusK8sTelemetrySecret builds the secret that carries the bearer token
// used for remote-write authentication against the Telemetry server.
//
// The secret skeleton is read from the PrometheusK8sTelemetry asset. Its data
// is then replaced by a single entry, keyed by telemetryTokenSecretKey, whose
// value is the base64 (standard encoding) form of a JSON document containing
// the cluster ID and the authorization token from the telemeter client
// configuration.
//
// It returns an error if the asset cannot be decoded into a secret or if the
// JSON document cannot be marshaled.
func (f *Factory) PrometheusK8sTelemetrySecret() (*v1.Secret, error) {
	secret, err := f.NewSecret(f.assets.MustNewAssetReader(PrometheusK8sTelemetry))
	if err != nil {
		return nil, err
	}

	telemeterCfg := f.config.ClusterMonitoringConfiguration.TelemeterClientConfig
	payload, err := json.Marshal(map[string]string{
		"cluster_id":          telemeterCfg.ClusterID,
		"authorization_token": telemeterCfg.Token,
	})
	if err != nil {
		return nil, err
	}

	// The token is stored already base64-encoded because the encoded string
	// itself is the credential; it is not an artifact of secret serialization.
	encoded := base64.StdEncoding.EncodeToString(payload)
	secret.Data = map[string][]byte{
		telemetryTokenSecretKey: []byte(encoded),
	}

	return secret, nil
}

func (f *Factory) PrometheusK8s(grpcTLS *v1.Secret, trustedCABundleCM *v1.ConfigMap, telemetrySecret *v1.Secret) (*monv1.Prometheus, error) {
p, err := f.NewPrometheusK8s()
if err != nil {
return nil, err
Expand Down Expand Up @@ -1625,23 +1650,18 @@ func (f *Factory) PrometheusK8s(grpcTLS *v1.Secret, trustedCABundleCM *v1.Config
return nil, err
}

telemetryEnabled := f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.IsEnabled()
clusterID := f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID
if telemetryEnabled && f.config.RemoteWrite {

if f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.IsEnabled() && f.config.RemoteWrite {
selectorRelabelConfig, err := promqlgen.LabelSelectorsToRelabelConfig(f.config.ClusterMonitoringConfiguration.PrometheusK8sConfig.TelemetryMatches)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed separately: future readers of this code would benefit from this check using the actual config condition (`if telemetryEnabled && f.config.RemoteWrite`) and assuming the secret has been created, instead of coupling it to the secret creation.

if err != nil {
return nil, errors.Wrap(err, "generate label selector relabel config")
}

compositeToken, err := json.Marshal(map[string]string{
"cluster_id": clusterID,
"authorization_token": f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.Token,
})
p.Spec.Secrets = append(p.Spec.Secrets, telemetrySecret.GetName())

spec := monv1.RemoteWriteSpec{
URL: f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.TelemeterServerURL,
BearerToken: base64.StdEncoding.EncodeToString(compositeToken),
URL: f.config.ClusterMonitoringConfiguration.TelemeterClientConfig.TelemeterServerURL,
BearerTokenFile: fmt.Sprintf("/etc/prometheus/secrets/%s/%s", telemetrySecret.GetName(), telemetryTokenSecretKey),
QueueConfig: &monv1.QueueConfig{
// Amount of samples to load from the WAL into the in-memory
// buffer before waiting for samples to be sent successfully
Expand Down Expand Up @@ -1678,13 +1698,12 @@ func (f *Factory) PrometheusK8s(grpcTLS *v1.Secret, trustedCABundleCM *v1.Config
Replacement: "alerts",
},
},
MetadataConfig: &monv1.MetadataConfig{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems to not be related, was it intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it should be commented. Basically we don't want/need to forward metadata information to the telemeter server.

It comes from #1416 which was the previous PR that exercised the receive path of telemeter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Telemeter forwards rw requests to Thanos, and Thanos doesn't have metadata ingestion yet. We can still send it, but it would just be dropped. 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you still want to add a comment for this, @simonpasquier?

Send: false,
},
}

p.Spec.RemoteWrite = []monv1.RemoteWriteSpec{spec}

}
if !telemetryEnabled {
p.Spec.RemoteWrite = nil
}

if len(f.config.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite) > 0 {
Expand Down
38 changes: 24 additions & 14 deletions pkg/manifests/manifests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestUnconfiguredManifests(t *testing.T) {
t.Fatal(err)
}

_, err = f.PrometheusK8s(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil)
_, err = f.PrometheusK8s(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1051,6 +1051,7 @@ func TestPrometheusK8sRemoteWriteClusterIDRelabel(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand All @@ -1069,24 +1070,21 @@ func TestPrometheusK8sRemoteWriteClusterIDRelabel(t *testing.T) {
}

func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {
telemetrySecret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "telemetry",
Namespace: "openshift-monitoring",
},
}
for _, tc := range []struct {
name string
config func() *Config
telemetrySecret *v1.Secret
expectedRemoteWriteURLs []string
}{
{
name: "default config",

config: func() *Config {
c := NewDefaultConfig()
return c
},

expectedRemoteWriteURLs: nil,
},
{
name: "legacy telemetry",

config: func() *Config {
c := NewDefaultConfig()
c.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID = "123"
Expand All @@ -1102,9 +1100,9 @@ func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {

config: func() *Config {
c := NewDefaultConfig()
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom"}}
c.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID = "123"
c.ClusterMonitoringConfiguration.TelemeterClientConfig.Token = "secret"
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom"}}

return c
},
Expand All @@ -1124,6 +1122,7 @@ func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {

return c
},
telemetrySecret: telemetrySecret,

expectedRemoteWriteURLs: []string{
"https://infogw.api.openshift.com/metrics/v1/receive",
Expand All @@ -1135,12 +1134,13 @@ func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {
config: func() *Config {
c := NewDefaultConfig()
c.SetRemoteWrite(true)
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom"}}
c.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID = "123"
c.ClusterMonitoringConfiguration.TelemeterClientConfig.Token = "secret"
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom"}}

return c
},
telemetrySecret: telemetrySecret,

expectedRemoteWriteURLs: []string{
"http://custom",
Expand All @@ -1154,12 +1154,13 @@ func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {
c := NewDefaultConfig()
c.SetRemoteWrite(true)
c.ClusterMonitoringConfiguration.TelemeterClientConfig.TelemeterServerURL = "http://custom-telemeter"
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom-remote-write"}}
c.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID = "123"
c.ClusterMonitoringConfiguration.TelemeterClientConfig.Token = "secret"
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.RemoteWrite = []RemoteWriteSpec{{URL: "http://custom-remote-write"}}

return c
},
telemetrySecret: telemetrySecret,

expectedRemoteWriteURLs: []string{
"http://custom-remote-write",
Expand All @@ -1174,6 +1175,7 @@ func TestPrometheusK8sRemoteWriteURLs(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
tc.telemetrySecret,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1244,6 +1246,7 @@ func TestPrometheusK8sRemoteWriteOauth2(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1386,6 +1389,7 @@ func TestRemoteWriteAuthorizationConfig(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1452,6 +1456,7 @@ ingress:
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1655,6 +1660,7 @@ func TestPrometheusQueryLogFileConfig(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
if !tc.errExpected {
Expand Down Expand Up @@ -1734,6 +1740,7 @@ func TestPrometheusRetentionConfigs(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)

if err != nil {
Expand Down Expand Up @@ -1785,6 +1792,7 @@ prometheusK8s:
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1991,6 +1999,7 @@ func TestPrometheusK8sAdditionalAlertManagerConfigsSecret(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3342,6 +3351,7 @@ func TestNonHighlyAvailableInfrastructure(t *testing.T) {
p, err := f.PrometheusK8s(
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
nil,
)
if err != nil {
return spec{}, err
Expand Down
19 changes: 18 additions & 1 deletion pkg/tasks/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,23 @@ func (t *PrometheusTask) create(ctx context.Context) error {
}
}

telemetrySecret, err := t.factory.PrometheusK8sTelemetrySecret()
if err != nil {
return errors.Wrap(err, "initializing Prometheus telemetry secret failed")
}

if t.config.ClusterMonitoringConfiguration.TelemeterClientConfig.IsEnabled() && t.config.RemoteWrite {
klog.V(4).Info("updating Prometheus telemetry secret")
if err = t.client.CreateOrUpdateSecret(ctx, telemetrySecret); err != nil {
return errors.Wrap(err, "reconciling Prometheus telemetry secret failed")
}
} else {
klog.V(4).Info("deleting Prometheus telemetry secret")
if err = t.client.DeleteSecret(ctx, telemetrySecret); err != nil {
return errors.Wrap(err, "deleting Prometheus telemetry secret failed")
}
}

{
// Create trusted CA bundle ConfigMap.
trustedCA, err := t.factory.PrometheusK8sTrustedCABundle()
Expand Down Expand Up @@ -355,7 +372,7 @@ func (t *PrometheusTask) create(ctx context.Context) error {
}

klog.V(4).Info("initializing Prometheus object")
p, err := t.factory.PrometheusK8s(s, trustedCA)
p, err := t.factory.PrometheusK8s(s, trustedCA, telemetrySecret)
if err != nil {
return errors.Wrap(err, "initializing Prometheus object failed")
}
Expand Down
10 changes: 5 additions & 5 deletions test/e2e/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *PrometheusClient) getAlertmanager(path string, kvs ...string) ([]byte,
// GetFirstValueFromPromQuery takes a query api response body and returns the
// value of the first timeseries. If body contains multiple timeseries
// GetFirstValueFromPromQuery errors.
func GetFirstValueFromPromQuery(body []byte) (int, error) {
func GetFirstValueFromPromQuery(body []byte) (float64, error) {
res, err := gabs.ParseJSON(body)
if err != nil {
return 0, err
Expand All @@ -305,7 +305,7 @@ func GetFirstValueFromPromQuery(body []byte) (int, error) {
return 0, err
}

v, err := strconv.Atoi(value.Data().(string))
v, err := strconv.ParseFloat(value.Data().(string), 64)
if err != nil {
return 0, fmt.Errorf("failed to parse query value: %v", err)
}
Expand Down Expand Up @@ -333,7 +333,7 @@ func GetResultSizeFromPromQuery(body []byte) (int, error) {
func (c *PrometheusClient) WaitForQueryReturnGreaterEqualOne(t *testing.T, timeout time.Duration, query string) {
t.Helper()

c.WaitForQueryReturn(t, timeout, query, func(v int) error {
c.WaitForQueryReturn(t, timeout, query, func(v float64) error {
if v >= 1 {
return nil
}
Expand All @@ -346,7 +346,7 @@ func (c *PrometheusClient) WaitForQueryReturnGreaterEqualOne(t *testing.T, timeo
func (c *PrometheusClient) WaitForQueryReturnOne(t *testing.T, timeout time.Duration, query string) {
t.Helper()

c.WaitForQueryReturn(t, timeout, query, func(v int) error {
c.WaitForQueryReturn(t, timeout, query, func(v float64) error {
if v == 1 {
return nil
}
Expand All @@ -357,7 +357,7 @@ func (c *PrometheusClient) WaitForQueryReturnOne(t *testing.T, timeout time.Dura

// WaitForQueryReturn waits for a given PromQL query for a given time interval
// and validates the **first and only** result with the given validate function.
func (c *PrometheusClient) WaitForQueryReturn(t *testing.T, timeout time.Duration, query string, validate func(int) error) {
func (c *PrometheusClient) WaitForQueryReturn(t *testing.T, timeout time.Duration, query string, validate func(float64) error) {
t.Helper()

err := Poll(5*time.Second, timeout, func() error {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func testMain(m *testing.M) error {
err = wait.Poll(5*time.Second, 1*time.Minute, func() (bool, error) {
var (
body []byte
v int
v float64
)
body, loopErr = f.ThanosQuerierClient.PrometheusQuery("count(last_over_time(up{job=\"prometheus-k8s\"}[2m]))")
if loopErr != nil {
Expand Down
Loading