Skip to content

Commit

Permalink
Revert "Make gRPC requests go through tendermint Query (cosmos#8549)"
Browse files Browse the repository at this point in the history
This reverts commit e306e5b.
  • Loading branch information
tac0turtle authored and michaelfig committed Sep 7, 2021
1 parent 694e94d commit 277f928
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 140 deletions.
34 changes: 2 additions & 32 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package baseapp

import (
"fmt"
"reflect"

"github.com/cosmos/cosmos-sdk/client/grpc/reflection"

Expand All @@ -14,19 +13,13 @@ import (

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

var protoCodec = encoding.GetCodec(proto.Name)

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
// returnTypes is a map of FQ method name => its return type. It is used
// for cache purposes: the first time a method handler is run, we save its
// return type in this map. Then, on subsequent method handler calls, we
// decode the ABCI response bytes using the cached return type.
returnTypes map[string]reflect.Type
routes map[string]GRPCQueryHandler
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}
Expand All @@ -42,8 +35,7 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
returnTypes: map[string]reflect.Type{},
routes: map[string]GRPCQueryHandler{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -98,17 +90,8 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
if qrt.interfaceRegistry != nil {
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
}

return nil
}, nil)

// If it's the first time we call this handler, then we save
// the return type of the handler in the `returnTypes` map.
// The return type will be used for decoding subsequent requests.
if _, found := qrt.returnTypes[fqName]; !found {
qrt.returnTypes[fqName] = reflect.TypeOf(res)
}

if err != nil {
return abci.ResponseQuery{}, err
}
Expand Down Expand Up @@ -144,16 +127,3 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
reflection.NewReflectionServiceServer(interfaceRegistry),
)
}

// returnTypeOf returns the return type of a gRPC method handler. With the way the
// `returnTypes` cache map is set up, the return type of a method handler is
// guaranteed to be found if it's retrieved **after** the method handler ran at
// least once. If not, then a logic error is return.
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
returnType, found := qrt.returnTypes[method]
if !found {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
}

return returnType, nil
}
83 changes: 36 additions & 47 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,67 @@ package baseapp

import (
"context"
"reflect"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/tx"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will route
// the query through the `clientCtx`, which itself queries Tendermint.
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}

return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}

// Case 2. Querying state.
inMd, _ := metadata.FromIncomingContext(grpcCtx)
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
if err != nil {
return nil, err
// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
if err := checkNegativeHeight(height); err != nil {
return nil, err
}
}

// We need to know the return type of the grpc method for
// unmarshalling abciRes.Value.
//
// When we call each method handler for the first time, we save its
// return type in the `returnTypes` map (see the method handler in
// `grpcrouter.go`). By this time, the method handler has already run
// at least once (in the RunGRPCQuery call), so we're sure the
// returnType maps is populated for this method. We're retrieving it
// for decoding.
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
if err != nil {
return nil, err
}

// returnType is a pointer to a struct. Here, we're creating res which
// is a new pointer to the underlying struct.
res := reflect.New(returnType.Elem()).Interface()
// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

err = protoCodec.Unmarshal(abciRes.Value, res)
if err != nil {
return nil, err
}

// Send the metadata header back. The metadata currently includes:
// - block height.
err = grpc.SendHeader(grpcCtx, outMd)
if err != nil {
return nil, err
// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return res, nil
return handler(grpcCtx, req)
}

// Loop through all services and methods, add the interceptor, and register
Expand Down
104 changes: 48 additions & 56 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,109 +24,101 @@ var _ gogogrpc.ClientConn = Context{}
var protoCodec = encoding.GetCodec(proto.Name)

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// In both cases, we don't allow empty request req (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(args).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if isBroadcast(method) {
req, ok := args.(*tx.BroadcastTxRequest)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
}
resProto, ok := reply.(*tx.BroadcastTxResponse)
res, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
if err != nil {
return err
}
*resProto = *broadcastRes
*res = *broadcastRes

return err
}

// Case 2. Querying state.
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
reqBz, err := protoCodec.Marshal(args)
if err != nil {
return err
}

err = protoCodec.Unmarshal(abciRes.Value, reply)
if err != nil {
return err
}

for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = outMd
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
// arguments for the gRPC method, and returns the ABCI response. It is used
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
// gRPC handlers.
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
reqBz, err := protoCodec.Marshal(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return abci.ResponseQuery{}, nil, err
return err
}
if height < 0 {
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
req := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
}

abciRes, err := ctx.QueryABCI(abciReq)
res, err := ctx.QueryABCI(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
return err
}

err = protoCodec.Unmarshal(res.Value, reply)
if err != nil {
return err
}

// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = md
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

return abciRes, md, nil
func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
}
2 changes: 1 addition & 1 deletion server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(clientCtx, grpcSrv)
app.RegisterGRPCServer(grpcSrv)
// reflection allows consumers to build dynamic clients that can write
// to any cosmos-sdk application without relying on application packages at compile time
err := reflection.Register(grpcSrv, reflection.Config{
Expand Down
8 changes: 5 additions & 3 deletions server/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
)
s.Require().NoError(err)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height.
s.Require().Equal([]string{"1"}, blockHeight)
}

func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
Expand Down Expand Up @@ -163,6 +163,8 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
Events: []string{"message.action='send'"},
},
)
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
// should not error anymore.
s.Require().NoError(err)
}

Expand Down Expand Up @@ -199,9 +201,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() {
value string
wantErr string
}{
{"-1", "\"x-cosmos-block-height\" must be >= 0"},
{"-1", "height < 0"},
{"9223372036854775808", "value out of range"}, // > max(int64) by 1
{"-10", "\"x-cosmos-block-height\" must be >= 0"},
{"-10", "height < 0"},
{"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64)
{"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64
}
Expand Down
2 changes: 1 addition & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type (

// RegisterGRPCServer registers gRPC services directly with the gRPC
// server.
RegisterGRPCServer(client.Context, grpc.Server)
RegisterGRPCServer(grpc.Server)

// RegisterTxService registers the gRPC Query service for tx (such as tx
// simulation, fetching txs by hash...).
Expand Down

0 comments on commit 277f928

Please sign in to comment.