Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auth token propagation for metrics reader #3341

Merged
merged 17 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/apiv3"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/netutils"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
Expand Down Expand Up @@ -158,7 +159,7 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
var handler http.Handler = r
handler = additionalHeadersHandler(handler, queryOpts.AdditionalHeaders)
if queryOpts.BearerTokenPropagation {
handler = bearerTokenPropagationHandler(logger, handler)
handler = bearertoken.PropagationHandler(logger, handler)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -95,7 +95,7 @@ func main() {
opentracing.SetGlobalTracer(tracer)
queryOpts := new(app.QueryOptions).InitFromViper(v, logger)
// TODO: Need to figure out set enable/disable propagation on storage plugins.
v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation)
v.Set(bearertoken.StoragePropagationKey, queryOpts.BearerTokenPropagation)
storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,44 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package app
package bearertoken

import (
"context"
"net/http"
"strings"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

func bearerTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
type contextKey string

// Key is the string literal used internally in the implementation of this context.
const Key = "bearer.token"
const bearerToken = contextKey(Key)

// StoragePropagationKey is a key for viper configuration to pass this option to storage plugins.
const StoragePropagationKey = "storage.propagate.token"

// ContextWithBearerToken set bearer token in context.
func ContextWithBearerToken(ctx context.Context, token string) context.Context {
if token == "" {
return ctx
}
return context.WithValue(ctx, bearerToken, token)
}

// GetBearerToken from context, or empty string if there is no token.
func GetBearerToken(ctx context.Context) (string, bool) {
val, ok := ctx.Value(bearerToken).(string)
return val, ok
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this file:

  • up to this point it's context.go
  • below it's http.go
  • you may also want to add the http transport implementation instead of using private types in different storages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in f0774b2.


// PropagationHandler returns a http.Handler containing the logic to extract
// the Authorization token from the http.Request and inserts it into the http.Request
// context for easier access to the request token via GetBearerToken for bearer token
// propagation use cases.
func PropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed bearerTokenPropagationHandler -> PropagationHandler to reduce stutter.

I considered renaming ContextWithBearerToken and GetBearerToken for similar reasons but on further thought, I felt it reduces readability. Let me know if you disagree.

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authHeaderValue := r.Header.Get("Authorization")
Expand All @@ -40,15 +66,14 @@ func bearerTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Hand
token = headerValue[1]
}
} else if len(headerValue) == 1 {
// Tread all value as a token
// Treat the entire value as a token.
token = authHeaderValue
} else {
logger.Warn("Invalid authorization header value, skipping token propagation")
}
h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, token)))
h.ServeHTTP(w, r.WithContext(ContextWithBearerToken(ctx, token)))
} else {
h.ServeHTTP(w, r.WithContext(ctx))
}
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,54 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package app
package bearertoken

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

func Test_bearTokenPropagationHandler(t *testing.T) {
func Test_GetBearerToken(t *testing.T) {
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this string just be "blah"? Currently it looks like there's special meaning in the value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 8e5f1b9.

ctx := context.Background()
ctx = ContextWithBearerToken(ctx, token)
contextToken, ok := GetBearerToken(ctx)
assert.True(t, ok)
assert.Equal(t, contextToken, token)
}

func Test_bearerTokenPropagationHandler(t *testing.T) {
httpClient := &http.Client{
Timeout: 2 * time.Second,
}

logger := zap.NewNop()
bearerToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"

validTokenHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, ok := spanstore.GetBearerToken(ctx)
token, ok := GetBearerToken(ctx)
assert.Equal(t, token, bearerToken)
assert.True(t, ok)
stop.Done()
})
}
}

emptyHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, _ := spanstore.GetBearerToken(ctx)
token, _ := GetBearerToken(ctx)
assert.Empty(t, token, bearerToken)
stop.Done()
})
}
}

testCases := []struct {
Expand All @@ -68,7 +81,7 @@ func Test_bearTokenPropagationHandler(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
stop := sync.WaitGroup{}
stop.Add(1)
r := bearerTokenPropagationHandler(logger, testCase.handler(&stop))
r := PropagationHandler(logger, testCase.handler(&stop))
server := httptest.NewServer(r)
defer server.Close()
req, err := http.NewRequest("GET", server.URL, nil)
Expand All @@ -81,5 +94,4 @@ func Test_bearTokenPropagationHandler(t *testing.T) {
stop.Wait()
})
}

}
4 changes: 2 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapgrpc"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -538,7 +538,7 @@ type tokenAuthTransport struct {
func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
token := tr.token
if tr.allowOverrideFromCtx {
headerToken, _ := spanstore.GetBearerToken(r.Context())
headerToken, _ := bearertoken.GetBearerToken(r.Context())
if headerToken != "" {
token = headerToken
}
Expand Down
20 changes: 19 additions & 1 deletion plugin/metrics/prometheus/metricsstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/prometheus/config"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore/dbmodel"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
Expand Down Expand Up @@ -253,13 +254,30 @@ func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.R

// KeepAlive and TLSHandshake timeouts are kept to existing Prometheus client's
// DefaultRoundTripper to simplify user configuration and may be made configurable when required.
return &http.Transport{
httpTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: c.ConnectTimeout,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: ctlsConfig,
}
return &tokenAuthTransport{
wrapped: httpTransport,
}, nil
}

// tokenAuthTransport wraps an instance of http.Transport for the purpose of
// propagating an Authorization token from inbound to outbound HTTP requests.
type tokenAuthTransport struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could we take this opportunity and refactor/cleanup bearer token functionality into a standalone package? We have ./cmd/query/app/token_propagation_handler.go and ./storage/spanstore/token_propagation.go, which should really be in a single package like ./pkg/bearertoken.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in d1c96ad.

wrapped *http.Transport
}

// RoundTrip implements the http.RoundTripper interface, injecting the outbound
// Authorization header with the token provided in the inbound request.
func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
headerToken, _ := bearertoken.GetBearerToken(r.Context())
r.Header.Set("Authorization", "Bearer "+headerToken)
return tr.wrapped.RoundTrip(r)
}
28 changes: 25 additions & 3 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/prometheus/config"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
Expand Down Expand Up @@ -331,12 +332,33 @@ func TestGetRoundTripper(t *testing.T) {
},
}, logger)
require.NoError(t, err)
assert.IsType(t, &http.Transport{}, rt)
assert.IsType(t, &tokenAuthTransport{}, rt)
if tc.tlsEnabled {
assert.NotNil(t, rt.(*http.Transport).TLSClientConfig)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the common transport into the bearertoken package meant that we can't inspect the wrapped http.Transport.
The original test didn't feel quite right in the first place when it had to make type assertions.

Suggestions also welcome.

assert.NotNil(t, rt.(*tokenAuthTransport).wrapped.TLSClientConfig)
} else {
assert.Nil(t, rt.(*http.Transport).TLSClientConfig)
assert.Nil(t, rt.(*tokenAuthTransport).wrapped.TLSClientConfig)
}

server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "Bearer foo", r.Header.Get("Authorization"))
},
),
)
defer server.Close()

req, err := http.NewRequestWithContext(
bearertoken.ContextWithBearerToken(context.Background(), "foo"),
http.MethodGet,
server.URL,
nil,
)
require.NoError(t, err)

resp, err := rt.RoundTrip(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -328,7 +328,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.UseILM = v.GetBool(cfg.namespace + suffixUseILM)

// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
cfg.AllowTokenFromContext = v.GetBool(bearertoken.StoragePropagationKey)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v)

remoteReadClusters := stripWhiteSpace(v.GetString(cfg.namespace + suffixRemoteReadClusters))
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -67,13 +68,13 @@ func composeContextUpgradeFuncs(funcs ...ContextUpgradeFunc) ContextUpgradeFunc
// in the request metadata, if the original context has bearer token attached.
// Otherwise returns original context.
func upgradeContextWithBearerToken(ctx context.Context) context.Context {
bearerToken, hasToken := spanstore.GetBearerToken(ctx)
bearerToken, hasToken := bearertoken.GetBearerToken(ctx)
if hasToken {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
md.Set(spanstore.BearerTokenKey, bearerToken)
md.Set(bearertoken.Key, bearerToken)
return metadata.NewOutgoingContext(ctx, md)
}
return ctx
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
grpcMocks "github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -108,11 +109,11 @@ func withGRPCClient(fn func(r *grpcClientTest)) {

func TestContextUpgradeWithToken(t *testing.T) {
testBearerToken := "test-bearer-token"
ctx := spanstore.ContextWithBearerToken(context.Background(), testBearerToken)
ctx := bearertoken.ContextWithBearerToken(context.Background(), testBearerToken)
upgradedToken := upgradeContextWithBearerToken(ctx)
md, ok := metadata.FromOutgoingContext(upgradedToken)
assert.Truef(t, ok, "Expected metadata in context")
bearerTokenFromMetadata := md.Get(spanstore.BearerTokenKey)
bearerTokenFromMetadata := md.Get(bearertoken.Key)
assert.Equal(t, []string{testBearerToken}, bearerTokenFromMetadata)
}

Expand Down
Loading