Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add support for HTTP filters (gRFC A39) #4206

Merged
merged 7 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/resolver/config_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ type RPCConfig struct {
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}

// ClientStream will ultimately be a superset of grpc.ClientStream as
// operations become necessary to support.
type ClientStream interface {
// Done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations.
Done()
}

// NOPClientStream is a ClientStream that does nothing
type NOPClientStream struct{}

// Done is a nop.
func (NOPClientStream) Done() {}

var _ ClientStream = NOPClientStream{}

// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream can intercept ClientStream calls. The provided ClientStream
// should not be used during NewStream. RPCInfo.Context should not be used
// (will be nil).
NewStream(context.Context, RPCInfo, ClientStream) (context.Context, ClientStream, error)
}

// ServerInterceptor is unimplementable; do not use.
type ServerInterceptor interface {
notDefined()
}

type csKeyType string
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(),
headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype,
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
Expand Down Expand Up @@ -832,6 +833,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
if s.doneFunc != nil {
s.doneFunc()
}
}

// Close kicks off the shutdown process of the transport. This should be called
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
Expand Down Expand Up @@ -611,6 +612,8 @@ type CallHdr struct {
ContentSubtype string

PreviousAttempts int // value of grpc-previous-rpc-attempts header to set

DoneFunc func() // called when the stream is finished
}

// ClientTransport is the common interface for all gRPC client-side transport
Expand Down
14 changes: 13 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,27 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth

var mc serviceconfig.MethodConfig
var onCommit func()
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, status.Convert(err).Err()
}
var doneFunc func()
if rpcConfig != nil {
if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
mc = rpcConfig.MethodConfig
onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
newCtx, cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, iresolver.NOPClientStream{})
if err != nil {
return nil, status.Convert(err).Err()
}
ctx = newCtx
doneFunc = cs.Done
}
}

if mc.WaitForReady != nil {
Expand Down Expand Up @@ -223,6 +234,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}

// Set our outgoing compression according to the UseCompressor CallOption, if
Expand Down
42 changes: 39 additions & 3 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*
*/

// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
// Package client implements a full fledged gRPC client for the xDS API used by
// the xds resolver and balancer implementations.
package client

import (
Expand All @@ -33,6 +33,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/httpfilter"

"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -199,11 +200,27 @@ type ListenerUpdate struct {
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
MaxStreamDuration time.Duration
// HTTPFilters is a list of HTTP filters (name, config) from the LDS
// response.
HTTPFilters []HTTPFilter

// Raw is the resource from the xds response.
Raw *anypb.Any
}

// HTTPFilter represents one HTTP filter from an LDS response's HTTP connection
// manager field.
type HTTPFilter struct {
// Name is an arbitrary name of the filter. Used for applying override
// settings in virtual host / route / weighted cluster configuration (not
// yet supported).
Name string
// Filter is the HTTP filter found in the registry for the config type.
Filter httpfilter.Filter
// Config contains the filter's configuration
Config httpfilter.FilterConfig
}

func (lu *ListenerUpdate) String() string {
return fmt.Sprintf("{RouteConfigName: %q, SecurityConfig: %+v", lu.RouteConfigName, lu.SecurityCfg)
}
Expand All @@ -226,6 +243,11 @@ type VirtualHost struct {
// Routes contains a list of routes, each containing matchers and
// corresponding action.
Routes []*Route
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the virtual host which may be present. An individual filter's override
// may be unused if the matching Route contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// Route is both a specification of how to match a request as well as an
Expand All @@ -239,13 +261,27 @@ type Route struct {
Fraction *uint32

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
// max_stream_duration fields (grpc_timeout_header_max nor
// max_stream_duration) were set. In this case, the ListenerUpdate's
// MaxStreamDuration field should be used. If MaxStreamDuration is set to
// an explicit zero duration, the application's deadline should be used.
MaxStreamDuration *time.Duration
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the route which may be present. An individual filter's override may be
// unused if the matching WeightedCluster contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// WeightedCluster contains settings for an xds RouteAction.WeightedCluster.
type WeightedCluster struct {
// Weight is the relative weight of the cluster. It will never be zero.
Weight uint32
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the weighted cluster which may be present.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HeaderMatcher represents header matchers.
Expand Down
Loading