Skip to content

Commit

Permalink
Add websocket support to full node streaming (#1908)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 committed Aug 14, 2024
1 parent 580b29c commit 2bf2d49
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/protocol-build-and-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,4 @@ jobs:
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-staging/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags
41 changes: 34 additions & 7 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ import (
// Full Node Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
"github.com/dydxprotocol/v4-chain/protocol/streaming/ws"
)

var (
Expand Down Expand Up @@ -344,7 +345,9 @@ type App struct {

IndexerEventManager indexer_manager.IndexerEventManager
FullNodeStreamingManager streamingtypes.FullNodeStreamingManager
Server *daemonserver.Server
WebsocketStreamingServer *ws.WebsocketServer

Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -489,6 +492,9 @@ func New(
if app.FullNodeStreamingManager != nil {
app.FullNodeStreamingManager.Stop()
}
if app.WebsocketStreamingServer != nil {
app.WebsocketStreamingServer.Shutdown()
}
return nil
},
)
Expand Down Expand Up @@ -749,7 +755,11 @@ func New(
indexerFlags.SendOffchainData,
)

app.FullNodeStreamingManager = getFullNodeStreamingManagerFromOptions(appFlags, logger)
app.FullNodeStreamingManager, app.WebsocketStreamingServer = getFullNodeStreamingManagerFromOptions(
appFlags,
appCodec,
logger,
)

timeProvider := &timelib.TimeProviderImpl{}

Expand Down Expand Up @@ -2021,20 +2031,37 @@ func getIndexerFromOptions(
// from the specified options. This function will default to returning a no-op instance.
func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
) (manager streamingtypes.FullNodeStreamingManager) {
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled", log.ModuleKey, "full-node-streaming")
logger.Info("Full node streaming is enabled")
if appFlags.FullNodeStreamingSnapshotInterval > 0 {
logger.Info("Interval snapshots enabled", log.ModuleKey, "full-node-streaming")
logger.Info("Interval snapshots enabled")
}
return streaming.NewFullNodeStreamingManager(
manager := streaming.NewFullNodeStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
)

// Start websocket server.
if appFlags.WebsocketStreamingEnabled {
port := appFlags.WebsocketStreamingPort
logger.Info("Websocket full node streaming is enabled")
wsServer = ws.NewWebsocketServer(
manager,
cdc,
logger,
port,
)
wsServer.Start()
}

return manager, wsServer
}
return streaming.NewNoopGrpcStreamingManager()
return streaming.NewNoopGrpcStreamingManager(), wsServer
}
1 change: 1 addition & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestAppIsFullyInitialized(t *testing.T) {
"BridgeClient",
"SlinkyClient",
"oraclePrometheusServer",
"WebsocketStreamingServer",

// Any default constructed type can be considered initialized if the default is what is
// expected. getUninitializedStructFields relies on fields being the non-default and
Expand Down
45 changes: 41 additions & 4 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ type Flags struct {
GrpcAddress string
GrpcEnable bool

// Grpc Streaming
// Full Node Streaming
GrpcStreamingEnabled bool
GrpcStreamingFlushIntervalMs uint32
GrpcStreamingMaxBatchSize uint32
GrpcStreamingMaxChannelBufferSize uint32
WebsocketStreamingEnabled bool
WebsocketStreamingPort uint16
FullNodeStreamingSnapshotInterval uint32

VEOracleEnabled bool // Slinky Vote Extensions
Expand All @@ -48,6 +50,8 @@ const (
GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms"
GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size"
GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size"
WebsocketStreamingEnabled = "websocket-streaming-enabled"
WebsocketStreamingPort = "websocket-streaming-port"
FullNodeStreamingSnapshotInterval = "fns-snapshot-interval"

// Slinky VEs enabled
Expand All @@ -68,6 +72,8 @@ const (
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000
DefaultWebsocketStreamingEnabled = false
DefaultWebsocketStreamingPort = 9091
DefaultFullNodeStreamingSnapshotInterval = 0

DefaultVEOracleEnabled = true
Expand Down Expand Up @@ -126,6 +132,16 @@ func AddFlagsToCmd(cmd *cobra.Command) {
"If set to positive number, number of blocks between each periodic snapshot will be sent out. "+
"Defaults to zero for regular behavior of one initial snapshot.",
)
cmd.Flags().Bool(
WebsocketStreamingEnabled,
DefaultWebsocketStreamingEnabled,
"Whether to enable websocket full node streaming for full nodes",
)
cmd.Flags().Uint16(
WebsocketStreamingPort,
DefaultWebsocketStreamingPort,
"Port for websocket full node streaming connections",
)
cmd.Flags().Bool(
VEOracleEnabled,
DefaultVEOracleEnabled,
Expand Down Expand Up @@ -155,15 +171,22 @@ func (f *Flags) Validate() error {
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
}
if f.GrpcStreamingMaxBatchSize == 0 {
return fmt.Errorf("grpc streaming batch size must be positive number")
return fmt.Errorf("full node streaming batch size must be positive number")
}
if f.GrpcStreamingFlushIntervalMs == 0 {
return fmt.Errorf("grpc streaming flush interval must be positive number")
return fmt.Errorf("full node streaming flush interval must be positive number")
}
if f.GrpcStreamingMaxChannelBufferSize == 0 {
return fmt.Errorf("grpc streaming channel size must be positive number")
return fmt.Errorf("full node streaming channel size must be positive number")
}
}

if f.WebsocketStreamingEnabled {
if !f.GrpcStreamingEnabled {
return fmt.Errorf("websocket full node streaming requires grpc streaming to be enabled")
}
}

if f.FullNodeStreamingSnapshotInterval > 0 && f.FullNodeStreamingSnapshotInterval < 50 {
return fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero")
}
Expand Down Expand Up @@ -191,6 +214,8 @@ func GetFlagValuesFromOptions(
GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs,
GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize,
GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize,
WebsocketStreamingEnabled: DefaultWebsocketStreamingEnabled,
WebsocketStreamingPort: DefaultWebsocketStreamingPort,
FullNodeStreamingSnapshotInterval: DefaultFullNodeStreamingSnapshotInterval,

VEOracleEnabled: true,
Expand Down Expand Up @@ -258,6 +283,18 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(WebsocketStreamingEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.WebsocketStreamingEnabled = v
}
}

if option := appOpts.Get(WebsocketStreamingPort); option != nil {
if v, err := cast.ToUint16E(option); err == nil {
result.WebsocketStreamingPort = v
}
}

if option := appOpts.Get(FullNodeStreamingSnapshotInterval); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.FullNodeStreamingSnapshotInterval = v
Expand Down
83 changes: 77 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/cosmos/cosmos-sdk/server/config"

"github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -44,6 +43,12 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): {
flagName: flags.GrpcStreamingMaxChannelBufferSize,
},
fmt.Sprintf("Has %s flag", flags.WebsocketStreamingEnabled): {
flagName: flags.WebsocketStreamingEnabled,
},
fmt.Sprintf("Has %s flag", flags.WebsocketStreamingPort): {
flagName: flags.WebsocketStreamingPort,
},
fmt.Sprintf("Has %s flag", flags.OptimisticExecutionEnabled): {
flagName: flags.OptimisticExecutionEnabled,
},
Expand Down Expand Up @@ -86,6 +91,19 @@ func TestValidate(t *testing.T) {
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
WebsocketStreamingEnabled: false,
},
},
"success - both grpc and websocket streaming enabled for validating nodes": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
WebsocketStreamingEnabled: true,
WebsocketStreamingPort: 8989,
},
},
"success - optimistic execution": {
Expand Down Expand Up @@ -118,6 +136,30 @@ func TestValidate(t *testing.T) {
},
expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"),
},
"failure - websocket streaming enabled with gRPC streaming disabled": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: false,
WebsocketStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingMaxChannelBufferSize: 10000,
},
expectedErr: fmt.Errorf("websocket full node streaming requires grpc streaming to be enabled"),
},
"success - websocket streaming enabled with gRPC enabled for validating node": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
WebsocketStreamingEnabled: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingPort: 8989,
},
},
"failure - gRPC streaming enabled with zero batch size": {
flags: flags.Flags{
NonValidatingFullNode: true,
Expand All @@ -126,7 +168,7 @@ func TestValidate(t *testing.T) {
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 0,
},
expectedErr: fmt.Errorf("grpc streaming batch size must be positive number"),
expectedErr: fmt.Errorf("full node streaming batch size must be positive number"),
},
"failure - gRPC streaming enabled with zero flush interval ms": {
flags: flags.Flags{
Expand All @@ -136,7 +178,7 @@ func TestValidate(t *testing.T) {
GrpcStreamingFlushIntervalMs: 0,
GrpcStreamingMaxBatchSize: 2000,
},
expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"),
expectedErr: fmt.Errorf("full node streaming flush interval must be positive number"),
},
"failure - gRPC streaming enabled with zero channel size ms": {
flags: flags.Flags{
Expand All @@ -147,7 +189,18 @@ func TestValidate(t *testing.T) {
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 0,
},
expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"),
expectedErr: fmt.Errorf("full node streaming channel size must be positive number"),
},
"failure - websocket streaming enabled with zero batch size": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 0,
WebsocketStreamingEnabled: true,
},
expectedErr: fmt.Errorf("full node streaming batch size must be positive number"),
},
"failure - full node streaming enabled with <= 49 snapshot interval": {
flags: flags.Flags{
Expand Down Expand Up @@ -200,6 +253,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs uint32
expectedGrpcStreamingBatchSize uint32
expectedGrpcStreamingMaxChannelBufferSize uint32
expectedWebsocketEnabled bool
expectedWebsocketPort uint16
expectedFullNodeStreamingSnapshotInterval uint32
expectedOptimisticExecutionEnabled bool
}{
Expand All @@ -213,6 +268,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
expectedWebsocketEnabled: false,
expectedWebsocketPort: 9091,
expectedFullNodeStreamingSnapshotInterval: 0,
expectedOptimisticExecutionEnabled: false,
},
Expand All @@ -222,23 +279,27 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags.DdAgentHost: "agentHostTest",
flags.DdTraceAgentPort: uint16(777),
flags.GrpcEnable: false,
flags.GrpcAddress: "localhost:9091",
flags.GrpcAddress: "localhost:1234",
flags.GrpcStreamingEnabled: "true",
flags.GrpcStreamingFlushIntervalMs: uint32(408),
flags.GrpcStreamingMaxBatchSize: uint32(650),
flags.GrpcStreamingMaxChannelBufferSize: uint32(972),
flags.WebsocketStreamingEnabled: "true",
flags.WebsocketStreamingPort: 8989,
flags.FullNodeStreamingSnapshotInterval: uint32(123),
flags.OptimisticExecutionEnabled: "true",
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
expectedDdTraceAgentPort: 777,
expectedGrpcEnable: false,
expectedGrpcAddress: "localhost:9091",
expectedGrpcAddress: "localhost:1234",
expectedGrpcStreamingEnable: true,
expectedGrpcStreamingFlushMs: 408,
expectedGrpcStreamingBatchSize: 650,
expectedGrpcStreamingMaxChannelBufferSize: 972,
expectedWebsocketEnabled: true,
expectedWebsocketPort: 8989,
expectedFullNodeStreamingSnapshotInterval: 123,
expectedOptimisticExecutionEnabled: true,
},
Expand Down Expand Up @@ -303,6 +364,16 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcStreamingMaxChannelBufferSize,
flags.GrpcStreamingMaxChannelBufferSize,
)
require.Equal(
t,
tc.expectedWebsocketEnabled,
flags.WebsocketStreamingEnabled,
)
require.Equal(
t,
tc.expectedWebsocketPort,
flags.WebsocketStreamingPort,
)
})
}
}
3 changes: 3 additions & 0 deletions protocol/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ services:
- "true"
- --max-daemon-unhealthy-seconds
- "4294967295" # Effectively disable the daemon monitor because bridge daemon is flaky in localnet.
- --grpc-streaming-enabled
- "true"
environment:
# See https://docs.datadoghq.com/profiler/enabling/go/ for DD_ specific environment variables
- DD_ENV=localnet_${USER}
Expand All @@ -31,6 +33,7 @@ services:
ports:
- "26657:26657"
- "9090:9090"
- "9091:9091"
- "1317:1317"

dydxprotocold1:
Expand Down
Loading

0 comments on commit 2bf2d49

Please sign in to comment.