From 976ba07eb72327a072e3c023f8657d32cafabd7c Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Fri, 13 Sep 2024 11:56:17 -0400 Subject: [PATCH] Stage FinalizeBlock events and emit in Precommit --- .../src/codegen/dydxprotocol/clob/query.ts | 67 +++ proto/dydxprotocol/clob/query.proto | 9 + protocol/app/app.go | 4 + protocol/lib/metrics/constants.go | 1 + protocol/lib/metrics/metric_keys.go | 3 + protocol/streaming/constants.go | 12 + .../streaming/full_node_streaming_manager.go | 170 ++++++ protocol/streaming/noop_streaming_manager.go | 25 + protocol/streaming/types/interface.go | 16 + protocol/x/clob/abci.go | 11 + .../clob/keeper/grpc_stream_finalize_block.go | 49 ++ protocol/x/clob/keeper/process_operations.go | 34 +- protocol/x/clob/types/query.pb.go | 559 ++++++++++++++---- protocol/x/subaccounts/keeper/subaccount.go | 6 +- 14 files changed, 816 insertions(+), 150 deletions(-) create mode 100644 protocol/streaming/constants.go create mode 100644 protocol/x/clob/keeper/grpc_stream_finalize_block.go diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts index 41ca0c872a..31cae2a8a3 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts @@ -319,6 +319,18 @@ export interface StreamUpdateSDKType { taker_order?: StreamTakerOrderSDKType; subaccount_update?: StreamSubaccountUpdateSDKType; } +/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ + +export interface StagedFinalizeBlockEvent { + orderFill?: StreamOrderbookFill; + subaccountUpdate?: StreamSubaccountUpdate; +} +/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */ + +export interface StagedFinalizeBlockEventSDKType { + order_fill?: StreamOrderbookFillSDKType; + subaccount_update?: StreamSubaccountUpdateSDKType; +} /** * StreamOrderbookUpdate provides information on an orderbook update. Used in * the full node GRPC stream. @@ -1396,6 +1408,61 @@ export const StreamUpdate = { }; +function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent { + return { + orderFill: undefined, + subaccountUpdate: undefined + }; +} + +export const StagedFinalizeBlockEvent = { + encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.orderFill !== undefined) { + StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim(); + } + + if (message.subaccountUpdate !== undefined) { + StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim(); + } + + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseStagedFinalizeBlockEvent(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32()); + break; + + case 2: + message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32()); + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial): StagedFinalizeBlockEvent { + const message = createBaseStagedFinalizeBlockEvent(); + message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined; + message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined; + return message; + } + +}; + function createBaseStreamOrderbookUpdate(): StreamOrderbookUpdate { return { snapshot: false, diff --git a/proto/dydxprotocol/clob/query.proto b/proto/dydxprotocol/clob/query.proto index 9d47ea4641..fb4757e875 100644 --- a/proto/dydxprotocol/clob/query.proto +++ b/proto/dydxprotocol/clob/query.proto @@ -198,6 +198,15 @@ message StreamUpdate { } } +// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. +message StagedFinalizeBlockEvent { + // Contains one of StreamOrderbookFill, StreamSubaccountUpdate. + oneof event { + StreamOrderbookFill order_fill = 1; + dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2; + } +} + // StreamOrderbookUpdate provides information on an orderbook update. Used in // the full node GRPC stream. message StreamOrderbookUpdate { diff --git a/protocol/app/app.go b/protocol/app/app.go index fbb8db6bd0..a8f2cec91e 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -469,6 +469,7 @@ func New( statsmoduletypes.TransientStoreKey, rewardsmoduletypes.TransientStoreKey, indexer_manager.TransientStoreKey, + streaming.StreamingManagerTransientStoreKey, perpetualsmoduletypes.TransientStoreKey, ) memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey) @@ -764,6 +765,7 @@ func New( appFlags, appCodec, logger, + tkeys[streaming.StreamingManagerTransientStoreKey], ) timeProvider := &timelib.TimeProviderImpl{} @@ -2059,6 +2061,7 @@ func getFullNodeStreamingManagerFromOptions( appFlags flags.Flags, cdc codec.Codec, logger log.Logger, + streamingManagerTransientStoreKey storetypes.StoreKey, ) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) { logger = logger.With(log.ModuleKey, "full-node-streaming") if appFlags.GrpcStreamingEnabled { @@ -2072,6 +2075,7 @@ func getFullNodeStreamingManagerFromOptions( appFlags.GrpcStreamingMaxBatchSize, appFlags.GrpcStreamingMaxChannelBufferSize, appFlags.FullNodeStreamingSnapshotInterval, + streamingManagerTransientStoreKey, ) // Start websocket server. diff --git a/protocol/lib/metrics/constants.go b/protocol/lib/metrics/constants.go index 0ac766bf5b..a0f851eb58 100644 --- a/protocol/lib/metrics/constants.go +++ b/protocol/lib/metrics/constants.go @@ -202,6 +202,7 @@ const ( UpdateType = "update_type" ValidateMatches = "validate_matches" ValidateOrder = "validate_order" + StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block" // MemCLOB. AddedToOrderBook = "added_to_orderbook" diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index f980275d7f..2a44834332 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -83,6 +83,9 @@ const ( GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" + GrpcStagedAllFinalizeBlockUpdates = "grpc_staged_all_finalize_block_updates" + GrpcStagedFillFinalizeBlockUpdates = "grpc_staged_finalize_block_fill_updates" + GrpcStagedSubaccountFinalizeBlockUpdates = "grpc_staged_finalize_block_subaccount_updates" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/constants.go b/protocol/streaming/constants.go new file mode 100644 index 0000000000..c3c3c255a7 --- /dev/null +++ b/protocol/streaming/constants.go @@ -0,0 +1,12 @@ +package streaming + +const ( + // Transient store key for storing staged events. + StreamingManagerTransientStoreKey = "tmp_streaming" + + // Key for storing the count of staged events. + StagedEventsCountKey = "EvtCnt" + + // Key prefix for staged events. + StagedEventsKeyPrefix = "Evt:" +) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index a6fe37c512..f357908f28 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -1,15 +1,21 @@ package streaming import ( + "encoding/binary" "fmt" "sync" "sync/atomic" "time" + "github.com/dydxprotocol/v4-chain/protocol/lib" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "cosmossdk.io/log" + "cosmossdk.io/store/prefix" + storetypes "cosmossdk.io/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" sdk "github.com/cosmos/cosmos-sdk/types" + ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/streaming/types" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" @@ -50,6 +56,9 @@ type FullNodeStreamingManagerImpl struct { // Block interval in which snapshot info should be sent out in. // Defaults to 0, which means only one snapshot will be sent out. snapshotBlockInterval uint32 + + // stores the staged FinalizeBlock events for full node streaming. + streamingManagerTransientStoreKey storetypes.StoreKey } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -86,6 +95,7 @@ func NewFullNodeStreamingManager( maxUpdatesInCache uint32, maxSubscriptionChannelSize uint32, snapshotBlockInterval uint32, + streamingManagerTransientStoreKey storetypes.StoreKey, ) *FullNodeStreamingManagerImpl { fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ logger: logger, @@ -102,6 +112,8 @@ func NewFullNodeStreamingManager( maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, snapshotBlockInterval: snapshotBlockInterval, + + streamingManagerTransientStoreKey: streamingManagerTransientStoreKey, } // Start the goroutine for pushing order updates through. @@ -367,6 +379,84 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( } } +func getStagedEventsCount(store storetypes.KVStore) uint32 { + countsBytes := store.Get([]byte(StagedEventsCountKey)) + if countsBytes == nil { + return 0 + } + return binary.BigEndian.Uint32(countsBytes) +} + +// Stage a subaccount update event in transient store, during `FinalizeBlock`. +func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, +) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + clobtypes.Amino.MustMarshal(stagedEvent), + ) +} + +// Stage a fill event in transient store, during `FinalizeBlock`. +func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, +) { + stagedEvent := clobtypes.StagedFinalizeBlockEvent{ + Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{ + OrderFill: &fill, + }, + } + sm.stageFinalizeBlockEvent( + ctx, + clobtypes.Amino.MustMarshal(stagedEvent), + ) +} + +func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent { + count := getStagedEventsCount(store) + events := make([]clobtypes.StagedFinalizeBlockEvent, count) + store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) + for i := uint32(0); i < count; i++ { + var event clobtypes.StagedFinalizeBlockEvent + bytes := store.Get(lib.Uint32ToKey(i)) + clobtypes.Amino.MustUnmarshal(bytes, &event) + events[i] = event + } + return events +} + +// Retrieve all events staged during `FinalizeBlock`. +func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( + ctx sdk.Context, +) []clobtypes.StagedFinalizeBlockEvent { + noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) + return getStagedFinalizeBlockEvents(store) +} + +func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent( + ctx sdk.Context, + eventBytes []byte, +) { + noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter()) + store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey) + + // Increment events count. + count := getStagedEventsCount(store) + store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1)) + + // Store events keyed by index. + store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix)) + store.Set(lib.Uint32ToKey(count), eventBytes) +} + // SendCombinedSnapshot sends messages to a particular subscriber without buffering. // Note this method requires the lock and assumes that the lock has already been // acquired by the caller. @@ -703,6 +793,86 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } +// Grpc Streaming logic after consensus agrees on a block. +// - Stream all events staged during `FinalizeBlock`. +// - Stream orderbook updates to sync fills in local ops queue. +func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) { + // Flush all pending updates, since we want the onchain updates to arrive in a batch. + sm.FlushStreamUpdates() + + finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) + + // TODO(CT-1190): Stream below in a single batch. + // Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock. + sm.SendOrderbookUpdates( + orderBookUpdatesToSyncLocalOpsQueue, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) + + // Send finalized fills from FinalizeBlock. + sm.SendOrderbookFillUpdates( + finalizedFills, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + perpetualIdToClobPairId, + ) + + // Send finalized subaccount updates from FinalizeBlock. + sm.SendFinalizedSubaccountUpdates( + finalizedSubaccountUpdates, + uint32(ctx.BlockHeight()), + ctx.ExecMode(), + ) +} + +// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`. +// It should be called after the consensus agrees on a block (e.g. Precommitter). +func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock( + ctx sdk.Context, +) ( + finalizedFills []clobtypes.StreamOrderbookFill, + finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate, +) { + // Get onchain stream events stored in transient store. + stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx) + + telemetry.SetGauge( + float32(len(stagedEvents)), + types.ModuleName, + metrics.GrpcStagedAllFinalizeBlockUpdates, + metrics.Count, + ) + + for _, stagedEvent := range stagedEvents { + switch event := stagedEvent.Event.(type) { + case *clobtypes.StagedFinalizeBlockEvent_OrderFill: + finalizedFills = append(finalizedFills, *event.OrderFill) + case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate: + finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate) + } + } + + telemetry.SetGauge( + float32(len(finalizedSubaccountUpdates)), + types.ModuleName, + metrics.GrpcStagedSubaccountFinalizeBlockUpdates, + metrics.Count, + ) + telemetry.SetGauge( + float32(len(finalizedFills)), + types.ModuleName, + metrics.GrpcStagedFillFinalizeBlockUpdates, + metrics.Count, + ) + + return finalizedFills, finalizedSubaccountUpdates +} + func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 6e9c00895d..4df60bc427 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -78,3 +78,28 @@ func (sm *NoopGrpcStreamingManager) InitializeNewStreams( func (sm *NoopGrpcStreamingManager) Stop() { } + +func (sm *NoopGrpcStreamingManager) StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, +) { +} + +func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( + ctx sdk.Context, +) []clobtypes.StagedFinalizeBlockEvent { + return nil +} + +func (sm *NoopGrpcStreamingManager) StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, +) { +} + +func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) { +} diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index e3dff9d94b..0f097d3e75 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -50,7 +50,23 @@ type FullNodeStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) + StageFinalizeBlockFill( + ctx sdk.Context, + fill clobtypes.StreamOrderbookFill, + ) + StageFinalizeBlockSubaccountUpdate( + ctx sdk.Context, + subaccountUpdate satypes.StreamSubaccountUpdate, + ) + GetStagedFinalizeBlockEvents( + ctx sdk.Context, + ) []clobtypes.StagedFinalizeBlockEvent TracksSubaccountId(id satypes.SubaccountId) bool + StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, + orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, + ) } type OutgoingMessageSender interface { diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 96bacb2cc5..9fd5f4a0b5 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -46,6 +46,17 @@ func BeginBlocker( keeper.ResetAllDeliveredOrderIds(ctx) } +// Precommit executes all ABCI Precommit logic respective to the clob module. +func Precommit( + ctx sdk.Context, + keeper keeper.Keeper, +) { + if streamingManager := keeper.GetFullNodeStreamingManager(); !streamingManager.Enabled() { + return + } + keeper.StreamBatchUpdatesAfterFinalizeBlock(ctx) +} + // EndBlocker executes all ABCI EndBlock logic respective to the clob module. func EndBlocker( ctx sdk.Context, diff --git a/protocol/x/clob/keeper/grpc_stream_finalize_block.go b/protocol/x/clob/keeper/grpc_stream_finalize_block.go new file mode 100644 index 0000000000..6216eba461 --- /dev/null +++ b/protocol/x/clob/keeper/grpc_stream_finalize_block.go @@ -0,0 +1,49 @@ +package keeper + +import ( + "time" + + "github.com/cosmos/cosmos-sdk/telemetry" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +// Returns the order updates needed to update the fill amount for the orders +// from local ops queue, according to the latest onchain state (after FinalizeBlock). +func (k Keeper) getUpdatesToSyncLocalOpsQueue( + ctx sdk.Context, +) *types.OffchainUpdates { + localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) + fetchOrdersInvolvedInOpQueue(localValidatorOperationsQueue) + orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( + localValidatorOperationsQueue, + ) + allUpdates := types.NewOffchainUpdates() + for orderId := range orderIdsFromLocal { + orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + return allUpdates +} + +// Grpc Streaming logic after consensus agrees on a block. +// - Stream all events staged during `FinalizeBlock`. +// - Stream orderbook updates to sync fills in local ops queue. +func (k Keeper) StreamBatchUpdatesAfterFinalizeBlock( + ctx sdk.Context, +) { + defer telemetry.MeasureSince( + time.Now(), + types.ModuleName, + metrics.StreamBatchUpdatesAfterFinalizeBlock, + metrics.Latency, + ) + orderBookUpdatesToSyncLocalOpsQueue := k.getUpdatesToSyncLocalOpsQueue(ctx) + + k.GetFullNodeStreamingManager().StreamBatchUpdatesAfterFinalizeBlock( + ctx, + orderBookUpdatesToSyncLocalOpsQueue, + k.PerpetualIdToClobPairId, + ) +} diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index d202c2a270..cddb5fb68b 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -57,26 +57,6 @@ func (k Keeper) ProcessProposerOperations( return errorsmod.Wrapf(types.ErrInvalidMsgProposedOperations, "Error: %+v", err) } - // If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream. - // This effetively reverts the optimitic orderbook updates during CheckTx. - if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { - localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) - orderIdsFromProposed := fetchOrdersInvolvedInOpQueue( - operations, - ) - orderIdsFromLocal := fetchOrdersInvolvedInOpQueue( - localValidatorOperationsQueue, - ) - orderIdSetToUpdate := lib.MergeMaps(orderIdsFromLocal, orderIdsFromProposed) - - allUpdates := types.NewOffchainUpdates() - for orderId := range orderIdSetToUpdate { - orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) - } - k.SendOrderbookUpdates(ctx, allUpdates) - } - log.DebugLog(ctx, "Processing operations queue", log.OperationsQueue, types.GetInternalOperationsQueueTextString(operations)) @@ -550,6 +530,7 @@ func (k Keeper) PersistMatchOrdersToState( // if GRPC streaming is on, emit a generated clob match to stream. if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { + // Note: GenerateStreamOrderbookFill doesn't rely on MemClob state. streamOrderbookFill := k.MemClob.GenerateStreamOrderbookFill( ctx, types.ClobMatch{ @@ -560,11 +541,10 @@ func (k Keeper) PersistMatchOrdersToState( &takerOrder, makerOrders, ) - k.SendOrderbookFillUpdates( + + k.GetFullNodeStreamingManager().StageFinalizeBlockFill( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } @@ -669,11 +649,9 @@ func (k Keeper) PersistMatchLiquidationToState( takerOrder, makerOrders, ) - k.SendOrderbookFillUpdates( + k.GetFullNodeStreamingManager().StageFinalizeBlockFill( ctx, - []types.StreamOrderbookFill{ - streamOrderbookFill, - }, + streamOrderbookFill, ) } return nil diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index 4f4d01af14..279280a0ae 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -1004,6 +1004,95 @@ func (*StreamUpdate) XXX_OneofWrappers() []interface{} { } } +// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. +type StagedFinalizeBlockEvent struct { + // Contains one of StreamOrderbookFill, StreamSubaccountUpdate. + // + // Types that are valid to be assigned to Event: + // + // *StagedFinalizeBlockEvent_OrderFill + // *StagedFinalizeBlockEvent_SubaccountUpdate + Event isStagedFinalizeBlockEvent_Event `protobuf_oneof:"event"` +} + +func (m *StagedFinalizeBlockEvent) Reset() { *m = StagedFinalizeBlockEvent{} } +func (m *StagedFinalizeBlockEvent) String() string { return proto.CompactTextString(m) } +func (*StagedFinalizeBlockEvent) ProtoMessage() {} +func (*StagedFinalizeBlockEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_3365c195b25c5bc0, []int{17} +} +func (m *StagedFinalizeBlockEvent) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StagedFinalizeBlockEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StagedFinalizeBlockEvent.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StagedFinalizeBlockEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_StagedFinalizeBlockEvent.Merge(m, src) +} +func (m *StagedFinalizeBlockEvent) XXX_Size() int { + return m.Size() +} +func (m *StagedFinalizeBlockEvent) XXX_DiscardUnknown() { + xxx_messageInfo_StagedFinalizeBlockEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_StagedFinalizeBlockEvent proto.InternalMessageInfo + +type isStagedFinalizeBlockEvent_Event interface { + isStagedFinalizeBlockEvent_Event() + MarshalTo([]byte) (int, error) + Size() int +} + +type StagedFinalizeBlockEvent_OrderFill struct { + OrderFill *StreamOrderbookFill `protobuf:"bytes,1,opt,name=order_fill,json=orderFill,proto3,oneof" json:"order_fill,omitempty"` +} +type StagedFinalizeBlockEvent_SubaccountUpdate struct { + SubaccountUpdate *types.StreamSubaccountUpdate `protobuf:"bytes,2,opt,name=subaccount_update,json=subaccountUpdate,proto3,oneof" json:"subaccount_update,omitempty"` +} + +func (*StagedFinalizeBlockEvent_OrderFill) isStagedFinalizeBlockEvent_Event() {} +func (*StagedFinalizeBlockEvent_SubaccountUpdate) isStagedFinalizeBlockEvent_Event() {} + +func (m *StagedFinalizeBlockEvent) GetEvent() isStagedFinalizeBlockEvent_Event { + if m != nil { + return m.Event + } + return nil +} + +func (m *StagedFinalizeBlockEvent) GetOrderFill() *StreamOrderbookFill { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_OrderFill); ok { + return x.OrderFill + } + return nil +} + +func (m *StagedFinalizeBlockEvent) GetSubaccountUpdate() *types.StreamSubaccountUpdate { + if x, ok := m.GetEvent().(*StagedFinalizeBlockEvent_SubaccountUpdate); ok { + return x.SubaccountUpdate + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*StagedFinalizeBlockEvent) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*StagedFinalizeBlockEvent_OrderFill)(nil), + (*StagedFinalizeBlockEvent_SubaccountUpdate)(nil), + } +} + // StreamOrderbookUpdate provides information on an orderbook update. Used in // the full node GRPC stream. type StreamOrderbookUpdate struct { @@ -1021,7 +1110,7 @@ func (m *StreamOrderbookUpdate) Reset() { *m = StreamOrderbookUpdate{} } func (m *StreamOrderbookUpdate) String() string { return proto.CompactTextString(m) } func (*StreamOrderbookUpdate) ProtoMessage() {} func (*StreamOrderbookUpdate) Descriptor() ([]byte, []int) { - return fileDescriptor_3365c195b25c5bc0, []int{17} + return fileDescriptor_3365c195b25c5bc0, []int{18} } func (m *StreamOrderbookUpdate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1081,7 +1170,7 @@ func (m *StreamOrderbookFill) Reset() { *m = StreamOrderbookFill{} } func (m *StreamOrderbookFill) String() string { return proto.CompactTextString(m) } func (*StreamOrderbookFill) ProtoMessage() {} func (*StreamOrderbookFill) Descriptor() ([]byte, []int) { - return fileDescriptor_3365c195b25c5bc0, []int{18} + return fileDescriptor_3365c195b25c5bc0, []int{19} } func (m *StreamOrderbookFill) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1152,7 +1241,7 @@ func (m *StreamTakerOrder) Reset() { *m = StreamTakerOrder{} } func (m *StreamTakerOrder) String() string { return proto.CompactTextString(m) } func (*StreamTakerOrder) ProtoMessage() {} func (*StreamTakerOrder) Descriptor() ([]byte, []int) { - return fileDescriptor_3365c195b25c5bc0, []int{19} + return fileDescriptor_3365c195b25c5bc0, []int{20} } func (m *StreamTakerOrder) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1255,7 +1344,7 @@ func (m *StreamTakerOrderStatus) Reset() { *m = StreamTakerOrderStatus{} func (m *StreamTakerOrderStatus) String() string { return proto.CompactTextString(m) } func (*StreamTakerOrderStatus) ProtoMessage() {} func (*StreamTakerOrderStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_3365c195b25c5bc0, []int{20} + return fileDescriptor_3365c195b25c5bc0, []int{21} } func (m *StreamTakerOrderStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1324,6 +1413,7 @@ func init() { proto.RegisterType((*StreamOrderbookUpdatesRequest)(nil), "dydxprotocol.clob.StreamOrderbookUpdatesRequest") proto.RegisterType((*StreamOrderbookUpdatesResponse)(nil), "dydxprotocol.clob.StreamOrderbookUpdatesResponse") proto.RegisterType((*StreamUpdate)(nil), "dydxprotocol.clob.StreamUpdate") + proto.RegisterType((*StagedFinalizeBlockEvent)(nil), "dydxprotocol.clob.StagedFinalizeBlockEvent") proto.RegisterType((*StreamOrderbookUpdate)(nil), "dydxprotocol.clob.StreamOrderbookUpdate") proto.RegisterType((*StreamOrderbookFill)(nil), "dydxprotocol.clob.StreamOrderbookFill") proto.RegisterType((*StreamTakerOrder)(nil), "dydxprotocol.clob.StreamTakerOrder") @@ -1333,111 +1423,114 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/query.proto", fileDescriptor_3365c195b25c5bc0) } var fileDescriptor_3365c195b25c5bc0 = []byte{ - // 1661 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x58, 0xc1, 0x4f, 0xdc, 0x46, - 0x17, 0x5f, 0xb3, 0x84, 0xc0, 0xdb, 0x40, 0x60, 0x08, 0xc9, 0x66, 0x21, 0x0b, 0x71, 0xbe, 0x90, - 0x85, 0x7c, 0x59, 0x03, 0x89, 0xa2, 0x7c, 0xe1, 0x53, 0x3e, 0x01, 0xdf, 0x47, 0x88, 0x14, 0xbe, - 0x10, 0x43, 0x12, 0xd4, 0x46, 0xb2, 0xbc, 0xf6, 0xb0, 0x58, 0xd8, 0x9e, 0xc5, 0x1e, 0xaf, 0x40, - 0x55, 0x55, 0xa9, 0x87, 0x5c, 0xda, 0x4a, 0x91, 0x7a, 0xe8, 0xa1, 0x52, 0x2f, 0x3d, 0xf5, 0x50, - 0xa9, 0x97, 0x1e, 0xab, 0xb6, 0xb7, 0x1c, 0x23, 0xf5, 0xd2, 0x43, 0x55, 0x55, 0x49, 0xcf, 0xfd, - 0x1b, 0x2a, 0xcf, 0x8c, 0x77, 0xbd, 0xbb, 0xf6, 0x42, 0xb8, 0x80, 0xfd, 0xe6, 0xbd, 0x37, 0xbf, - 0xf7, 0xde, 0x6f, 0xde, 0x3c, 0x2f, 0x5c, 0x32, 0x0f, 0xcd, 0x83, 0x9a, 0x47, 0x28, 0x31, 0x88, - 0xad, 0x18, 0x36, 0xa9, 0x28, 0xfb, 0x01, 0xf6, 0x0e, 0xcb, 0x4c, 0x86, 0x46, 0xe2, 0xcb, 0xe5, - 0x70, 0xb9, 0x70, 0xae, 0x4a, 0xaa, 0x84, 0x89, 0x94, 0xf0, 0x89, 0x2b, 0x16, 0x26, 0xaa, 0x84, - 0x54, 0x6d, 0xac, 0xe8, 0x35, 0x4b, 0xd1, 0x5d, 0x97, 0x50, 0x9d, 0x5a, 0xc4, 0xf5, 0xc5, 0xea, - 0xac, 0x41, 0x7c, 0x87, 0xf8, 0x4a, 0x45, 0xf7, 0x31, 0xf7, 0xaf, 0xd4, 0xe7, 0x2b, 0x98, 0xea, - 0xf3, 0x4a, 0x4d, 0xaf, 0x5a, 0x2e, 0x53, 0x16, 0xba, 0x4a, 0x27, 0xa2, 0x8a, 0x4d, 0x8c, 0x3d, - 0xcd, 0xd3, 0x29, 0xd6, 0x6c, 0xcb, 0xb1, 0xa8, 0x66, 0x10, 0x77, 0xc7, 0xaa, 0x0a, 0x83, 0xcb, - 0x9d, 0x06, 0xe1, 0x1f, 0xad, 0xa6, 0x5b, 0x9e, 0x50, 0x99, 0xeb, 0x54, 0xc1, 0xfb, 0x81, 0x45, - 0x0f, 0x35, 0x6a, 0x61, 0x2f, 0xc9, 0x69, 0x42, 0x5e, 0x88, 0x67, 0xe2, 0xc8, 0xe1, 0x64, 0xe7, - 0xb2, 0xa3, 0x53, 0x63, 0x17, 0x47, 0x11, 0x5f, 0xef, 0x54, 0xb0, 0xad, 0xfd, 0xc0, 0x32, 0x79, - 0x5e, 0x5a, 0x37, 0x1b, 0x4f, 0xf0, 0x86, 0xeb, 0x62, 0xf1, 0x5e, 0xcb, 0xa2, 0xe5, 0x9a, 0xf8, - 0x00, 0x7b, 0x0a, 0xd9, 0xd9, 0xd1, 0x8c, 0x5d, 0xdd, 0x72, 0xb5, 0xa0, 0x66, 0xea, 0x14, 0xfb, - 0x9d, 0x12, 0x61, 0x5f, 0x6a, 0xb1, 0xf7, 0x83, 0x8a, 0x6e, 0x18, 0x24, 0x70, 0xa9, 0xaf, 0xf8, - 0xd4, 0xc3, 0xba, 0x63, 0xb9, 0x11, 0x8c, 0x99, 0x74, 0xcd, 0xc6, 0x33, 0x57, 0x95, 0x67, 0xe0, - 0xc2, 0xe3, 0xb0, 0x8c, 0xf7, 0x31, 0x5d, 0xb1, 0x49, 0x65, 0x43, 0xb7, 0x3c, 0x15, 0xef, 0x07, - 0xd8, 0xa7, 0x68, 0x08, 0x7a, 0x2c, 0x33, 0x2f, 0x4d, 0x49, 0xa5, 0x41, 0xb5, 0xc7, 0x32, 0xe5, - 0x67, 0x30, 0xc6, 0x54, 0x9b, 0x7a, 0x7e, 0x8d, 0xb8, 0x3e, 0x46, 0xf7, 0x60, 0xa0, 0x51, 0x27, - 0xa6, 0x9f, 0x5b, 0x18, 0x2f, 0x77, 0xf0, 0xad, 0x1c, 0xd9, 0x2d, 0xf7, 0xbe, 0xfa, 0x7d, 0x32, - 0xa3, 0xf6, 0x1b, 0xe2, 0x5d, 0xd6, 0x05, 0x86, 0x25, 0xdb, 0x6e, 0xc7, 0xb0, 0x0a, 0xd0, 0xe4, - 0x95, 0xf0, 0x3d, 0x5d, 0xe6, 0x24, 0x2c, 0x87, 0x24, 0x2c, 0x73, 0x92, 0x0b, 0x12, 0x96, 0x37, - 0xf4, 0x2a, 0x16, 0xb6, 0x6a, 0xcc, 0x52, 0xfe, 0x5a, 0x82, 0x7c, 0x0b, 0xf8, 0x25, 0xdb, 0x4e, - 0xc3, 0x9f, 0x7d, 0x47, 0xfc, 0xe8, 0x7e, 0x0b, 0xc8, 0x1e, 0x06, 0xf2, 0xda, 0x91, 0x20, 0xf9, - 0xe6, 0x2d, 0x28, 0x7f, 0x93, 0x60, 0x72, 0x1d, 0xd7, 0xff, 0x4f, 0x4c, 0xbc, 0x45, 0xc2, 0xbf, - 0x2b, 0xba, 0x6d, 0x04, 0x36, 0x5b, 0x8c, 0x32, 0xf2, 0x1c, 0xce, 0xf3, 0x53, 0x54, 0xf3, 0x48, - 0x8d, 0xf8, 0xd8, 0xd3, 0x04, 0x5f, 0x1b, 0xd9, 0xe9, 0x44, 0xfe, 0x54, 0xb7, 0x43, 0xbe, 0x12, - 0x6f, 0x1d, 0xd7, 0xd7, 0xb9, 0xb6, 0x7a, 0x8e, 0x79, 0xd9, 0x10, 0x4e, 0x84, 0x14, 0xbd, 0x0f, - 0x63, 0xf5, 0x48, 0x59, 0x73, 0x70, 0x5d, 0x73, 0x30, 0xf5, 0x2c, 0xc3, 0x6f, 0x44, 0xd5, 0xe9, - 0xbc, 0x05, 0xf0, 0x3a, 0x57, 0x57, 0x47, 0xeb, 0xf1, 0x2d, 0xb9, 0x50, 0xfe, 0x4b, 0x82, 0xa9, - 0xf4, 0xf0, 0x44, 0x31, 0xaa, 0x70, 0xda, 0xc3, 0x7e, 0x60, 0x53, 0x5f, 0x94, 0xe2, 0xfe, 0x51, - 0x7b, 0x26, 0x78, 0x09, 0x15, 0x96, 0x5c, 0xf3, 0x29, 0xb1, 0x03, 0x07, 0x6f, 0x60, 0x2f, 0x2c, - 0x9d, 0x28, 0x5b, 0xe4, 0xbd, 0xa0, 0xc3, 0x68, 0x82, 0x16, 0x9a, 0x82, 0x33, 0x0d, 0x32, 0x68, - 0x0d, 0xfe, 0x43, 0x54, 0xec, 0x07, 0x26, 0x1a, 0x86, 0xac, 0x83, 0xeb, 0x2c, 0x23, 0x3d, 0x6a, - 0xf8, 0x88, 0xce, 0x43, 0x5f, 0x9d, 0x39, 0xc9, 0x67, 0xa7, 0xa4, 0x52, 0xaf, 0x2a, 0xde, 0xe4, - 0x59, 0x28, 0x31, 0xd2, 0xfd, 0x8f, 0xb5, 0xa8, 0x2d, 0x0b, 0x7b, 0x0f, 0xc3, 0x06, 0xb5, 0xc2, - 0x5a, 0x46, 0xe0, 0xc5, 0xeb, 0x2a, 0x7f, 0x29, 0xc1, 0xcc, 0x31, 0x94, 0x45, 0x96, 0x5c, 0xc8, - 0xa7, 0xf5, 0x3d, 0xc1, 0x03, 0x25, 0x21, 0x6d, 0xdd, 0x5c, 0x8b, 0xf4, 0x8c, 0xe1, 0x24, 0x1d, - 0x79, 0x06, 0xae, 0x31, 0x70, 0xcb, 0x21, 0x69, 0x54, 0x9d, 0xe2, 0xf4, 0x40, 0xbe, 0x90, 0x44, - 0xd4, 0x5d, 0x75, 0x45, 0x1c, 0x7b, 0x70, 0x21, 0xe5, 0x4e, 0x10, 0x61, 0x94, 0x13, 0xc2, 0xe8, - 0xe2, 0x58, 0x44, 0xc1, 0xc9, 0xdd, 0xa6, 0x22, 0x6f, 0xc3, 0x45, 0x06, 0x6c, 0x93, 0xea, 0x14, - 0xef, 0x04, 0xf6, 0xa3, 0xf0, 0x1e, 0x88, 0xce, 0xd5, 0x22, 0xf4, 0xb3, 0x7b, 0x21, 0xaa, 0x79, - 0x6e, 0xa1, 0x90, 0xb0, 0x35, 0x33, 0x79, 0x60, 0x46, 0x5c, 0x22, 0xfc, 0x55, 0xfe, 0x5e, 0x82, - 0x42, 0x92, 0x6b, 0x11, 0xe5, 0x36, 0x9c, 0xe5, 0xbe, 0x6b, 0xb6, 0x6e, 0x60, 0x07, 0xbb, 0x54, - 0x6c, 0x31, 0x93, 0xb0, 0xc5, 0x43, 0xe2, 0x56, 0xb7, 0xb0, 0xe7, 0x30, 0x17, 0x1b, 0x91, 0x81, - 0xd8, 0x71, 0x88, 0xb4, 0x48, 0xd1, 0x24, 0xe4, 0x76, 0x2c, 0xdb, 0xd6, 0x74, 0x27, 0xec, 0xe9, - 0x8c, 0x93, 0xbd, 0x2a, 0x84, 0xa2, 0x25, 0x26, 0x41, 0x13, 0x30, 0x40, 0x3d, 0xab, 0x5a, 0xc5, - 0x1e, 0x36, 0x19, 0x3b, 0xfb, 0xd5, 0xa6, 0x40, 0xbe, 0x06, 0x57, 0x19, 0xec, 0x87, 0xb1, 0x1b, - 0x2d, 0xb1, 0xa8, 0x2f, 0x24, 0x98, 0x3e, 0x4a, 0x53, 0x04, 0xfb, 0x1c, 0x46, 0x13, 0x2e, 0x48, - 0x11, 0xf0, 0xd5, 0xa4, 0x80, 0x3b, 0x5c, 0x8a, 0x60, 0x91, 0xdd, 0xb1, 0x22, 0xbf, 0x94, 0xe0, - 0xd2, 0x26, 0xbb, 0xee, 0x58, 0x7e, 0x2a, 0x84, 0xec, 0x3d, 0xe1, 0xb7, 0x64, 0x54, 0xc8, 0xce, - 0x03, 0x9c, 0x6d, 0x3b, 0xc0, 0xeb, 0x30, 0xd4, 0xbc, 0x07, 0x35, 0xcb, 0x0c, 0xbb, 0x5b, 0xb6, - 0xb3, 0x75, 0xc6, 0xee, 0xcd, 0xf2, 0x66, 0xe3, 0xf9, 0x81, 0xa9, 0x0e, 0xfa, 0xb1, 0x37, 0x5f, - 0xd6, 0xa1, 0x98, 0x86, 0x48, 0xa4, 0xe4, 0x3f, 0x70, 0x5a, 0x5c, 0xe5, 0xa2, 0xa7, 0x4d, 0x26, - 0xa4, 0x81, 0xfb, 0xe0, 0xa6, 0x11, 0xbf, 0x84, 0x95, 0xfc, 0x4d, 0x16, 0xce, 0xc4, 0xd7, 0xd1, - 0x65, 0x38, 0xc3, 0xcf, 0xcd, 0x2e, 0xb6, 0xaa, 0xbb, 0x54, 0x74, 0xa9, 0x1c, 0x93, 0xad, 0x31, - 0x11, 0x1a, 0x87, 0x01, 0x7c, 0x80, 0x0d, 0xcd, 0x21, 0x26, 0x66, 0xc4, 0x18, 0x54, 0xfb, 0x43, - 0xc1, 0x3a, 0x31, 0x31, 0x7a, 0x02, 0xc3, 0x24, 0x42, 0x2b, 0xc6, 0x0c, 0xc6, 0x8e, 0xdc, 0x42, - 0x29, 0x15, 0x5a, 0x5b, 0x78, 0x6b, 0x19, 0xf5, 0x2c, 0x69, 0x15, 0x85, 0x37, 0x21, 0x27, 0x7a, - 0xc8, 0xc0, 0x7c, 0x6f, 0xea, 0x85, 0xd4, 0xe6, 0x70, 0xd5, 0xb2, 0xed, 0xb5, 0x8c, 0x3a, 0xc0, - 0x6c, 0xc3, 0x17, 0xb4, 0x0a, 0x39, 0xaa, 0xef, 0x61, 0x4f, 0x63, 0xa2, 0xfc, 0x29, 0xe6, 0xe9, - 0x4a, 0xaa, 0xa7, 0xad, 0x50, 0x97, 0xb9, 0x5b, 0xcb, 0xa8, 0x40, 0x1b, 0x6f, 0x48, 0x83, 0x91, - 0x58, 0xa9, 0x45, 0xa0, 0x7d, 0xcc, 0xdb, 0x5c, 0x97, 0x6a, 0x33, 0xa7, 0xcd, 0x9a, 0x37, 0x02, - 0x1e, 0xf6, 0xdb, 0x64, 0xcb, 0xc3, 0x30, 0xc4, 0xbd, 0x6a, 0x0e, 0xf6, 0x7d, 0xbd, 0x8a, 0xe5, - 0xcf, 0x24, 0x18, 0x4b, 0x4c, 0x18, 0x2a, 0x40, 0xbf, 0xef, 0xea, 0x35, 0x7f, 0x97, 0xf0, 0x82, - 0xf5, 0xab, 0x8d, 0x77, 0xb4, 0xdd, 0xa4, 0x08, 0x27, 0xe3, 0x9d, 0x56, 0x78, 0x62, 0x5c, 0x2c, - 0x77, 0x0e, 0x87, 0x8f, 0x76, 0x76, 0x56, 0x42, 0x01, 0xdf, 0xe4, 0xe9, 0x7c, 0x3b, 0x77, 0xbe, - 0x95, 0x60, 0x34, 0x21, 0xdf, 0x68, 0x11, 0xd8, 0x99, 0xe0, 0xe3, 0x83, 0x38, 0x9e, 0x13, 0x29, - 0x63, 0x0f, 0x1b, 0x0f, 0x54, 0x36, 0x25, 0xb1, 0x47, 0x74, 0x1b, 0xfa, 0x58, 0x65, 0x22, 0xb4, - 0xf9, 0xb4, 0x5e, 0x29, 0xd0, 0x08, 0xed, 0x90, 0xb7, 0xb1, 0x7e, 0xe5, 0xe7, 0xb3, 0x53, 0xd9, - 0x52, 0xaf, 0x9a, 0x6b, 0x36, 0x2c, 0x5f, 0x7e, 0xd1, 0x03, 0xc3, 0xed, 0x55, 0x45, 0x73, 0x70, - 0x8a, 0x33, 0x81, 0xe3, 0x4c, 0xdd, 0x6e, 0x2d, 0xa3, 0x72, 0x45, 0xb4, 0x0d, 0x23, 0xb1, 0xf6, - 0x21, 0x78, 0xd4, 0x93, 0xda, 0x75, 0xf9, 0x8e, 0xb1, 0x56, 0x14, 0xb9, 0x1b, 0xb6, 0xdb, 0x64, - 0xe8, 0x19, 0xa0, 0x18, 0x37, 0x35, 0x9f, 0xea, 0x34, 0xf0, 0xc5, 0xe9, 0x99, 0x39, 0x06, 0x45, - 0x37, 0x99, 0x81, 0x3a, 0x4c, 0xdb, 0x24, 0xcb, 0x83, 0x2d, 0xa4, 0x97, 0xbf, 0x93, 0xe0, 0x7c, - 0xb2, 0x6d, 0x98, 0xc6, 0x96, 0xcd, 0xc5, 0xf1, 0x27, 0x31, 0x95, 0x1b, 0x80, 0x3c, 0xec, 0xe8, - 0x96, 0x6b, 0xb9, 0x55, 0x6d, 0x3f, 0xd0, 0x5d, 0x1a, 0x38, 0xbe, 0xb8, 0x20, 0x46, 0x1a, 0x2b, - 0x8f, 0xc5, 0x02, 0xfa, 0x2f, 0x14, 0x49, 0x8d, 0x5a, 0x8e, 0xe5, 0x53, 0xcb, 0xd0, 0x6d, 0xfb, - 0x90, 0x1d, 0x61, 0x6c, 0x36, 0x4d, 0xf9, 0x68, 0x33, 0xd1, 0xaa, 0xb5, 0xca, 0x94, 0x22, 0x2f, - 0x0b, 0x5f, 0x01, 0x9c, 0x62, 0xd7, 0x04, 0xfa, 0x44, 0x82, 0xfe, 0x68, 0x60, 0x46, 0xb3, 0x09, - 0x59, 0x49, 0xf9, 0xea, 0x28, 0x94, 0xd2, 0x74, 0xdb, 0x3f, 0x3b, 0xe4, 0x99, 0x8f, 0x7f, 0xf9, - 0xf3, 0xf3, 0x9e, 0x2b, 0xe8, 0xb2, 0xd2, 0xe5, 0xbb, 0x51, 0xf9, 0xc0, 0x32, 0x3f, 0x44, 0x9f, - 0x4a, 0x90, 0x8b, 0x4d, 0xfe, 0xe9, 0x80, 0x3a, 0x3f, 0x41, 0x0a, 0xd7, 0x8f, 0x02, 0x14, 0xfb, - 0x94, 0x90, 0xff, 0xc1, 0x30, 0x15, 0xd1, 0x44, 0x37, 0x4c, 0xe8, 0x47, 0x09, 0xf2, 0x69, 0x23, - 0x2c, 0x5a, 0x78, 0xa7, 0x79, 0x97, 0x63, 0xbc, 0x79, 0x82, 0x19, 0x59, 0xbe, 0xcb, 0xb0, 0xde, - 0xba, 0x2b, 0xcd, 0xca, 0x8a, 0x92, 0xf8, 0xe1, 0xaa, 0xb9, 0xc4, 0xc4, 0x1a, 0x25, 0xfc, 0xbf, - 0x11, 0x03, 0xf9, 0xb3, 0x04, 0x13, 0xdd, 0xa6, 0x49, 0xb4, 0x98, 0x96, 0xb5, 0x63, 0xcc, 0xc2, - 0x85, 0x7f, 0x9f, 0xcc, 0x58, 0xc4, 0x35, 0xcd, 0xe2, 0x9a, 0x42, 0x45, 0xa5, 0xeb, 0x8f, 0x05, - 0xe8, 0x07, 0x09, 0xc6, 0xbb, 0x8c, 0x92, 0xe8, 0x6e, 0x1a, 0x8a, 0xa3, 0x87, 0xe0, 0xc2, 0xe2, - 0x89, 0x6c, 0x45, 0x00, 0x57, 0x59, 0x00, 0x93, 0xe8, 0x52, 0xd7, 0x5f, 0x50, 0xd0, 0x4f, 0x12, - 0x5c, 0x4c, 0x1d, 0xc7, 0xd0, 0x9d, 0x34, 0x04, 0x47, 0xcd, 0x7a, 0x85, 0x7f, 0x9d, 0xc0, 0x52, - 0x20, 0x2f, 0x33, 0xe4, 0x25, 0x34, 0xad, 0x1c, 0xeb, 0x57, 0x13, 0xe4, 0xc2, 0x60, 0xcb, 0xc4, - 0x8c, 0xfe, 0x99, 0xb6, 0x77, 0xd2, 0xcc, 0x5e, 0xb8, 0x71, 0x4c, 0x6d, 0x81, 0x2e, 0x83, 0x3e, - 0x8a, 0x3a, 0x6a, 0xfb, 0xa8, 0x86, 0xe6, 0x8e, 0x3b, 0xf6, 0x44, 0x73, 0x66, 0x61, 0xfe, 0x1d, - 0x2c, 0x38, 0x80, 0x39, 0x69, 0x79, 0xe3, 0xd5, 0x9b, 0xa2, 0xf4, 0xfa, 0x4d, 0x51, 0xfa, 0xe3, - 0x4d, 0x51, 0x7a, 0xf9, 0xb6, 0x98, 0x79, 0xfd, 0xb6, 0x98, 0xf9, 0xf5, 0x6d, 0x31, 0xf3, 0xde, - 0xed, 0xaa, 0x45, 0x77, 0x83, 0x4a, 0xd9, 0x20, 0x4e, 0x6b, 0xf2, 0xea, 0xb7, 0x6e, 0xb0, 0x0b, - 0x5f, 0x69, 0x48, 0x0e, 0x78, 0x42, 0xe9, 0x61, 0x0d, 0xfb, 0x95, 0x3e, 0x26, 0xbe, 0xf9, 0x77, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xa1, 0x46, 0x6b, 0x53, 0x00, 0x14, 0x00, 0x00, + // 1705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x58, 0x4f, 0x6f, 0xdc, 0xc6, + 0x15, 0x5f, 0xee, 0xca, 0xb6, 0xfc, 0xd6, 0x72, 0xe4, 0x71, 0xec, 0x6c, 0xd6, 0xf2, 0x4a, 0x66, + 0x6a, 0x7b, 0xe5, 0xd4, 0x4b, 0x59, 0x09, 0x82, 0xd4, 0x2a, 0x52, 0x48, 0x6a, 0x64, 0x19, 0xb0, + 0x1a, 0x85, 0x52, 0x1c, 0xa1, 0x0d, 0x40, 0xcc, 0x92, 0x23, 0x6a, 0x20, 0x92, 0xb3, 0x22, 0x87, + 0x0b, 0xa9, 0x45, 0x51, 0xa0, 0x87, 0x5c, 0xda, 0x02, 0x01, 0x7a, 0xe8, 0xa1, 0x40, 0x2f, 0x3d, + 0xf5, 0x50, 0xa0, 0x97, 0x1e, 0x8b, 0xb6, 0xb7, 0x5c, 0x0a, 0x18, 0xe8, 0xa5, 0x87, 0xa2, 0x28, + 0xec, 0x9e, 0xfb, 0x19, 0x0a, 0xce, 0x0c, 0x77, 0xc9, 0x5d, 0x72, 0x25, 0x0b, 0xbd, 0x48, 0x9c, + 0x37, 0xef, 0xbd, 0xf9, 0xbd, 0x37, 0xef, 0xdf, 0x2c, 0xdc, 0x76, 0x4e, 0x9c, 0xe3, 0x5e, 0xc8, + 0x38, 0xb3, 0x99, 0x67, 0xd8, 0x1e, 0xeb, 0x1a, 0x47, 0x31, 0x09, 0x4f, 0x3a, 0x82, 0x86, 0xae, + 0x65, 0xb7, 0x3b, 0xc9, 0x76, 0xf3, 0x4d, 0x97, 0xb9, 0x4c, 0x90, 0x8c, 0xe4, 0x4b, 0x32, 0x36, + 0xe7, 0x5c, 0xc6, 0x5c, 0x8f, 0x18, 0xb8, 0x47, 0x0d, 0x1c, 0x04, 0x8c, 0x63, 0x4e, 0x59, 0x10, + 0xa9, 0xdd, 0x07, 0x36, 0x8b, 0x7c, 0x16, 0x19, 0x5d, 0x1c, 0x11, 0xa9, 0xdf, 0xe8, 0x3f, 0xea, + 0x12, 0x8e, 0x1f, 0x19, 0x3d, 0xec, 0xd2, 0x40, 0x30, 0x2b, 0x5e, 0x63, 0x1c, 0x51, 0xd7, 0x63, + 0xf6, 0xa1, 0x15, 0x62, 0x4e, 0x2c, 0x8f, 0xfa, 0x94, 0x5b, 0x36, 0x0b, 0xf6, 0xa9, 0xab, 0x04, + 0xee, 0x8c, 0x0b, 0x24, 0x7f, 0xac, 0x1e, 0xa6, 0xa1, 0x62, 0x59, 0x1a, 0x67, 0x21, 0x47, 0x31, + 0xe5, 0x27, 0x16, 0xa7, 0x24, 0x2c, 0x52, 0x5a, 0xe0, 0x17, 0x16, 0x3a, 0x24, 0x55, 0x38, 0x3f, + 0xbe, 0xed, 0x63, 0x6e, 0x1f, 0x90, 0xd4, 0xe2, 0x77, 0xc7, 0x19, 0x3c, 0x7a, 0x14, 0x53, 0x47, + 0xfa, 0x25, 0x7f, 0xd8, 0xad, 0x02, 0x6d, 0xa4, 0xaf, 0x36, 0x3f, 0xca, 0x6d, 0xd2, 0xc0, 0x21, + 0xc7, 0x24, 0x34, 0xd8, 0xfe, 0xbe, 0x65, 0x1f, 0x60, 0x1a, 0x58, 0x71, 0xcf, 0xc1, 0x9c, 0x44, + 0xe3, 0x14, 0x25, 0xdf, 0xce, 0xc9, 0x47, 0x71, 0x17, 0xdb, 0x36, 0x8b, 0x03, 0x1e, 0x19, 0x11, + 0x0f, 0x09, 0xf6, 0x69, 0x90, 0xc2, 0x58, 0x2c, 0xe7, 0x1c, 0x7c, 0x4b, 0x56, 0x7d, 0x11, 0xde, + 0xfa, 0x34, 0xb9, 0xc6, 0x27, 0x84, 0xaf, 0x7b, 0xac, 0xbb, 0x8d, 0x69, 0x68, 0x92, 0xa3, 0x98, + 0x44, 0x1c, 0x5d, 0x85, 0x2a, 0x75, 0x1a, 0xda, 0x82, 0xd6, 0x9e, 0x31, 0xab, 0xd4, 0xd1, 0x3f, + 0x87, 0x1b, 0x82, 0x75, 0xc8, 0x17, 0xf5, 0x58, 0x10, 0x11, 0xf4, 0x11, 0x5c, 0x1e, 0xdc, 0x93, + 0xe0, 0xaf, 0x2f, 0xdf, 0xea, 0x8c, 0xc5, 0x5b, 0x27, 0x95, 0x5b, 0x9b, 0xfa, 0xfa, 0x5f, 0xf3, + 0x15, 0x73, 0xda, 0x56, 0x6b, 0x1d, 0x2b, 0x0c, 0xab, 0x9e, 0x37, 0x8a, 0x61, 0x03, 0x60, 0x18, + 0x57, 0x4a, 0xf7, 0xbd, 0x8e, 0x0c, 0xc2, 0x4e, 0x12, 0x84, 0x1d, 0x19, 0xe4, 0x2a, 0x08, 0x3b, + 0xdb, 0xd8, 0x25, 0x4a, 0xd6, 0xcc, 0x48, 0xea, 0xbf, 0xd5, 0xa0, 0x91, 0x03, 0xbf, 0xea, 0x79, + 0x65, 0xf8, 0x6b, 0xaf, 0x89, 0x1f, 0x3d, 0xc9, 0x81, 0xac, 0x0a, 0x90, 0xf7, 0x4f, 0x05, 0x29, + 0x0f, 0xcf, 0xa1, 0xfc, 0xa7, 0x06, 0xf3, 0x5b, 0xa4, 0xff, 0x3d, 0xe6, 0x90, 0x5d, 0x96, 0xfc, + 0x5d, 0xc7, 0x9e, 0x1d, 0x7b, 0x62, 0x33, 0xf5, 0xc8, 0x17, 0x70, 0x53, 0x66, 0x51, 0x2f, 0x64, + 0x3d, 0x16, 0x91, 0xd0, 0x52, 0xf1, 0x3a, 0xf0, 0xce, 0x38, 0xf2, 0xe7, 0xd8, 0x4b, 0xe2, 0x95, + 0x85, 0x5b, 0xa4, 0xbf, 0x25, 0xb9, 0xcd, 0x37, 0x85, 0x96, 0x6d, 0xa5, 0x44, 0x51, 0xd1, 0x0f, + 0xe0, 0x46, 0x3f, 0x65, 0xb6, 0x7c, 0xd2, 0xb7, 0x7c, 0xc2, 0x43, 0x6a, 0x47, 0x03, 0xab, 0xc6, + 0x95, 0xe7, 0x00, 0x6f, 0x49, 0x76, 0xf3, 0x7a, 0x3f, 0x7b, 0xa4, 0x24, 0xea, 0xff, 0xd5, 0x60, + 0xa1, 0xdc, 0x3c, 0x75, 0x19, 0x2e, 0x5c, 0x0a, 0x49, 0x14, 0x7b, 0x3c, 0x52, 0x57, 0xf1, 0xe4, + 0xb4, 0x33, 0x0b, 0xb4, 0x24, 0x0c, 0xab, 0x81, 0xf3, 0x9c, 0x79, 0xb1, 0x4f, 0xb6, 0x49, 0x98, + 0x5c, 0x9d, 0xba, 0xb6, 0x54, 0x7b, 0x13, 0xc3, 0xf5, 0x02, 0x2e, 0xb4, 0x00, 0x57, 0x06, 0xc1, + 0x60, 0x0d, 0xe2, 0x1f, 0xd2, 0xcb, 0x7e, 0xea, 0xa0, 0x59, 0xa8, 0xf9, 0xa4, 0x2f, 0x3c, 0x52, + 0x35, 0x93, 0x4f, 0x74, 0x13, 0x2e, 0xf6, 0x85, 0x92, 0x46, 0x6d, 0x41, 0x6b, 0x4f, 0x99, 0x6a, + 0xa5, 0x3f, 0x80, 0xb6, 0x08, 0xba, 0x8f, 0x45, 0x89, 0xda, 0xa5, 0x24, 0x7c, 0x96, 0x14, 0xa8, + 0x75, 0x51, 0x32, 0xe2, 0x30, 0x7b, 0xaf, 0xfa, 0xaf, 0x35, 0x58, 0x3c, 0x03, 0xb3, 0xf2, 0x52, + 0x00, 0x8d, 0xb2, 0xba, 0xa7, 0xe2, 0xc0, 0x28, 0x70, 0xdb, 0x24, 0xd5, 0xca, 0x3d, 0x37, 0x48, + 0x11, 0x8f, 0xbe, 0x08, 0xf7, 0x05, 0xb8, 0xb5, 0x24, 0x68, 0x4c, 0xcc, 0x49, 0xb9, 0x21, 0xbf, + 0xd2, 0x94, 0xd5, 0x13, 0x79, 0x95, 0x1d, 0x87, 0xf0, 0x56, 0x49, 0x4f, 0x50, 0x66, 0x74, 0x0a, + 0xcc, 0x98, 0xa0, 0x58, 0x59, 0x21, 0x83, 0x7b, 0x84, 0x45, 0xdf, 0x83, 0xb7, 0x05, 0xb0, 0x1d, + 0x8e, 0x39, 0xd9, 0x8f, 0xbd, 0x4f, 0x92, 0x3e, 0x90, 0xe6, 0xd5, 0x0a, 0x4c, 0x8b, 0xbe, 0x90, + 0xde, 0x79, 0x7d, 0xb9, 0x59, 0x70, 0xb4, 0x10, 0x79, 0xea, 0xa4, 0xb1, 0xc4, 0xe4, 0x52, 0xff, + 0xa3, 0x06, 0xcd, 0x22, 0xd5, 0xca, 0xca, 0x3d, 0x78, 0x43, 0xea, 0xee, 0x79, 0xd8, 0x26, 0x3e, + 0x09, 0xb8, 0x3a, 0x62, 0xb1, 0xe0, 0x88, 0x67, 0x2c, 0x70, 0x77, 0x49, 0xe8, 0x0b, 0x15, 0xdb, + 0xa9, 0x80, 0x3a, 0xf1, 0x2a, 0xcb, 0x51, 0xd1, 0x3c, 0xd4, 0xf7, 0xa9, 0xe7, 0x59, 0xd8, 0x4f, + 0x6a, 0xba, 0x88, 0xc9, 0x29, 0x13, 0x12, 0xd2, 0xaa, 0xa0, 0xa0, 0x39, 0xb8, 0xcc, 0x43, 0xea, + 0xba, 0x24, 0x24, 0x8e, 0x88, 0xce, 0x69, 0x73, 0x48, 0xd0, 0xef, 0xc3, 0x5d, 0x01, 0xfb, 0x59, + 0xa6, 0xa3, 0x15, 0x5e, 0xea, 0x97, 0x1a, 0xdc, 0x3b, 0x8d, 0x53, 0x19, 0xfb, 0x05, 0x5c, 0x2f, + 0x68, 0x90, 0xca, 0xe0, 0xbb, 0x45, 0x06, 0x8f, 0xa9, 0x54, 0xc6, 0x22, 0x6f, 0x6c, 0x47, 0xff, + 0x4a, 0x83, 0xdb, 0x3b, 0xa2, 0xdd, 0x09, 0xff, 0x74, 0x19, 0x3b, 0xfc, 0x4c, 0x76, 0xc9, 0xf4, + 0x22, 0xc7, 0x13, 0xb8, 0x36, 0x92, 0xc0, 0x5b, 0x70, 0x75, 0xd8, 0x07, 0x2d, 0xea, 0x24, 0xd5, + 0xad, 0x36, 0x5e, 0x3a, 0x33, 0x7d, 0xb3, 0xb3, 0x33, 0xf8, 0x7e, 0xea, 0x98, 0x33, 0x51, 0x66, + 0x15, 0xe9, 0x18, 0x5a, 0x65, 0x88, 0x94, 0x4b, 0xbe, 0x03, 0x97, 0x54, 0x2b, 0x57, 0x35, 0x6d, + 0xbe, 0xc0, 0x0d, 0x52, 0x87, 0x14, 0x4d, 0xe3, 0x4b, 0x49, 0xe9, 0xbf, 0xab, 0xc1, 0x95, 0xec, + 0x3e, 0xba, 0x03, 0x57, 0x64, 0xde, 0x1c, 0x10, 0xea, 0x1e, 0x70, 0x55, 0xa5, 0xea, 0x82, 0xb6, + 0x29, 0x48, 0xe8, 0x16, 0x5c, 0x26, 0xc7, 0xc4, 0xb6, 0x7c, 0xe6, 0x10, 0x11, 0x18, 0x33, 0xe6, + 0x74, 0x42, 0xd8, 0x62, 0x0e, 0x41, 0x9f, 0xc1, 0x2c, 0x4b, 0xd1, 0xaa, 0x31, 0x43, 0x44, 0x47, + 0x7d, 0xb9, 0x5d, 0x0a, 0x6d, 0xc4, 0xbc, 0xcd, 0x8a, 0xf9, 0x06, 0xcb, 0x93, 0x92, 0x4e, 0x28, + 0x03, 0x3d, 0x89, 0xc0, 0xc6, 0x54, 0x69, 0x43, 0x1a, 0x51, 0xb8, 0x41, 0x3d, 0x6f, 0xb3, 0x62, + 0x5e, 0x16, 0xb2, 0xc9, 0x02, 0x6d, 0x40, 0x9d, 0xe3, 0x43, 0x12, 0x5a, 0x82, 0xd4, 0xb8, 0x20, + 0x34, 0xbd, 0x53, 0xaa, 0x69, 0x37, 0xe1, 0x15, 0xea, 0x36, 0x2b, 0x26, 0xf0, 0xc1, 0x0a, 0x59, + 0x70, 0x2d, 0x73, 0xd5, 0xca, 0xd0, 0x8b, 0x42, 0xdb, 0xd2, 0x84, 0xdb, 0x16, 0x4a, 0x87, 0x77, + 0x3e, 0x30, 0x78, 0x36, 0x1a, 0xa1, 0xad, 0xcd, 0xc2, 0x55, 0xa9, 0xd5, 0xf2, 0x49, 0x14, 0x61, + 0x97, 0xe8, 0x7f, 0xd3, 0xa0, 0xb1, 0xc3, 0xb1, 0x4b, 0x9c, 0x0d, 0x1a, 0x60, 0x8f, 0xfe, 0x90, + 0x88, 0x7a, 0xf5, 0x71, 0x3f, 0xc9, 0xd7, 0xbc, 0x83, 0xb4, 0xf3, 0x3b, 0xa8, 0xd0, 0xb0, 0xea, + 0xff, 0xd1, 0xb0, 0x4b, 0x70, 0x81, 0x24, 0x90, 0xf5, 0x5f, 0x68, 0x70, 0xa3, 0x30, 0x00, 0x50, + 0x13, 0xa6, 0xa3, 0x00, 0xf7, 0xa2, 0x03, 0x26, 0x03, 0x70, 0xda, 0x1c, 0xac, 0xd1, 0xde, 0x30, + 0xe4, 0x65, 0x72, 0x7d, 0x98, 0x47, 0xa5, 0xc6, 0xdf, 0xce, 0xf8, 0xb0, 0xfb, 0xc9, 0xfe, 0xfe, + 0x7a, 0x42, 0x90, 0x87, 0x3c, 0x7f, 0x34, 0x9a, 0x0b, 0xbf, 0xd7, 0xe0, 0x7a, 0x81, 0x7b, 0xd0, + 0x0a, 0x88, 0x1c, 0x97, 0xe3, 0x90, 0x72, 0xed, 0x5c, 0xc9, 0x18, 0x27, 0xc6, 0x1d, 0x53, 0x4c, + 0x7d, 0xe2, 0x13, 0x7d, 0x00, 0x17, 0x85, 0x6f, 0x53, 0xb4, 0x8d, 0xb2, 0xda, 0xaf, 0xd0, 0x28, + 0xee, 0x24, 0x0f, 0x33, 0xf5, 0x37, 0x6a, 0xd4, 0x16, 0x6a, 0xed, 0x29, 0xb3, 0x3e, 0x2c, 0xc0, + 0x91, 0xfe, 0x65, 0x15, 0x66, 0x47, 0xa3, 0x14, 0x2d, 0xc1, 0x05, 0x19, 0xd9, 0x12, 0x67, 0xe9, + 0x71, 0x9b, 0x15, 0x53, 0x32, 0xa2, 0x3d, 0xb8, 0x96, 0x29, 0x87, 0x2a, 0x2f, 0xaa, 0xa5, 0x5d, + 0x44, 0x9e, 0x98, 0x29, 0xad, 0xa9, 0xba, 0x59, 0x6f, 0x84, 0x86, 0x3e, 0x07, 0x94, 0xc9, 0x35, + 0x2b, 0xe2, 0x98, 0xc7, 0x91, 0xaa, 0x06, 0x8b, 0x67, 0x48, 0xb9, 0x1d, 0x21, 0x60, 0xce, 0xf2, + 0x11, 0xca, 0xda, 0x4c, 0x2e, 0x89, 0xf5, 0x3f, 0x68, 0x70, 0xb3, 0x58, 0x36, 0x71, 0x63, 0xee, + 0x70, 0x55, 0xce, 0x58, 0x86, 0xe5, 0x21, 0xa0, 0x90, 0xf8, 0x98, 0x06, 0x34, 0x70, 0xad, 0xa3, + 0x18, 0x07, 0x3c, 0xf6, 0x23, 0xd5, 0xf0, 0xae, 0x0d, 0x76, 0x3e, 0x55, 0x1b, 0xe8, 0xbb, 0xd0, + 0x62, 0x3d, 0x4e, 0x7d, 0x1a, 0x71, 0x6a, 0x63, 0xcf, 0x3b, 0x11, 0x19, 0x47, 0x9c, 0xa1, 0xa8, + 0x1c, 0xd5, 0xe6, 0xf2, 0x5c, 0x1b, 0x82, 0x29, 0xd5, 0xb2, 0xfc, 0x1b, 0x80, 0x0b, 0xa2, 0xed, + 0xa1, 0x9f, 0x69, 0x30, 0x9d, 0x3e, 0x00, 0xd0, 0x83, 0x02, 0xaf, 0x94, 0xbc, 0xa2, 0x9a, 0xed, + 0x32, 0xde, 0xd1, 0x67, 0x94, 0xbe, 0xf8, 0xd3, 0xbf, 0xff, 0xe7, 0x97, 0xd5, 0x77, 0xd0, 0x1d, + 0x63, 0xc2, 0x3b, 0xd8, 0xf8, 0x11, 0x75, 0x7e, 0x8c, 0x7e, 0xae, 0x41, 0x3d, 0xf3, 0x92, 0x29, + 0x07, 0x34, 0xfe, 0xa4, 0x6a, 0xbe, 0x7b, 0x1a, 0xa0, 0xcc, 0xd3, 0x48, 0xff, 0x86, 0xc0, 0xd4, + 0x42, 0x73, 0x93, 0x30, 0xa1, 0x3f, 0x6b, 0xd0, 0x28, 0x1b, 0xc9, 0xd1, 0xf2, 0x6b, 0xcd, 0xef, + 0x12, 0xe3, 0x7b, 0xe7, 0x98, 0xf9, 0xf5, 0xc7, 0x02, 0xeb, 0xfb, 0x8f, 0xb5, 0x07, 0xba, 0x61, + 0x14, 0x3e, 0xc4, 0xad, 0x80, 0x39, 0xc4, 0xe2, 0x4c, 0xfe, 0xb7, 0x33, 0x20, 0xff, 0xaa, 0xc1, + 0xdc, 0xa4, 0xe9, 0x18, 0xad, 0x94, 0x79, 0xed, 0x0c, 0xb3, 0x7d, 0xf3, 0xdb, 0xe7, 0x13, 0x56, + 0x76, 0xdd, 0x13, 0x76, 0x2d, 0xa0, 0x96, 0x31, 0xf1, 0xc7, 0x0f, 0xf4, 0x27, 0x0d, 0x6e, 0x4d, + 0x18, 0x8d, 0xd1, 0xe3, 0x32, 0x14, 0xa7, 0x0f, 0xf5, 0xcd, 0x95, 0x73, 0xc9, 0x2a, 0x03, 0xee, + 0x0a, 0x03, 0xe6, 0xd1, 0xed, 0x89, 0xbf, 0x08, 0xa1, 0xbf, 0x68, 0xf0, 0x76, 0xe9, 0x78, 0x89, + 0x3e, 0x2c, 0x43, 0x70, 0xda, 0xec, 0xda, 0xfc, 0xd6, 0x39, 0x24, 0x15, 0xf2, 0x8e, 0x40, 0xde, + 0x46, 0xf7, 0x8c, 0x33, 0xfd, 0x0a, 0x84, 0x02, 0x98, 0xc9, 0xbd, 0x00, 0xd0, 0x37, 0xcb, 0xce, + 0x2e, 0x7a, 0x83, 0x34, 0x1f, 0x9e, 0x91, 0x5b, 0xa1, 0xab, 0xa0, 0x9f, 0xa4, 0x15, 0x75, 0x74, + 0xf4, 0x44, 0x4b, 0x67, 0x1d, 0xe3, 0xd2, 0xb9, 0xb9, 0xf9, 0xe8, 0x35, 0x24, 0x24, 0x80, 0x25, + 0x6d, 0x6d, 0xfb, 0xeb, 0x97, 0x2d, 0xed, 0xc5, 0xcb, 0x96, 0xf6, 0xef, 0x97, 0x2d, 0xed, 0xab, + 0x57, 0xad, 0xca, 0x8b, 0x57, 0xad, 0xca, 0x3f, 0x5e, 0xb5, 0x2a, 0xdf, 0xff, 0xc0, 0xa5, 0xfc, + 0x20, 0xee, 0x76, 0x6c, 0xe6, 0xe7, 0x9d, 0xd7, 0x7f, 0xff, 0xa1, 0x68, 0xf8, 0xc6, 0x80, 0x72, + 0x2c, 0x1d, 0xca, 0x4f, 0x7a, 0x24, 0xea, 0x5e, 0x14, 0xe4, 0xf7, 0xfe, 0x17, 0x00, 0x00, 0xff, + 0xff, 0x8b, 0xcc, 0x03, 0x3a, 0xd0, 0x14, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -2553,6 +2646,80 @@ func (m *StreamUpdate_SubaccountUpdate) MarshalToSizedBuffer(dAtA []byte) (int, } return len(dAtA) - i, nil } +func (m *StagedFinalizeBlockEvent) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StagedFinalizeBlockEvent) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Event != nil { + { + size := m.Event.Size() + i -= size + if _, err := m.Event.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *StagedFinalizeBlockEvent_OrderFill) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_OrderFill) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.OrderFill != nil { + { + size, err := m.OrderFill.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.SubaccountUpdate != nil { + { + size, err := m.SubaccountUpdate.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func (m *StreamOrderbookUpdate) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2621,20 +2788,20 @@ func (m *StreamOrderbookFill) MarshalToSizedBuffer(dAtA []byte) (int, error) { var l int _ = l if len(m.FillAmounts) > 0 { - dAtA18 := make([]byte, len(m.FillAmounts)*10) - var j17 int + dAtA20 := make([]byte, len(m.FillAmounts)*10) + var j19 int for _, num := range m.FillAmounts { for num >= 1<<7 { - dAtA18[j17] = uint8(uint64(num)&0x7f | 0x80) + dAtA20[j19] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j17++ + j19++ } - dAtA18[j17] = uint8(num) - j17++ + dAtA20[j19] = uint8(num) + j19++ } - i -= j17 - copy(dAtA[i:], dAtA18[:j17]) - i = encodeVarintQuery(dAtA, i, uint64(j17)) + i -= j19 + copy(dAtA[i:], dAtA20[:j19]) + i = encodeVarintQuery(dAtA, i, uint64(j19)) i-- dAtA[i] = 0x1a } @@ -3098,6 +3265,42 @@ func (m *StreamUpdate_SubaccountUpdate) Size() (n int) { } return n } +func (m *StagedFinalizeBlockEvent) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Event != nil { + n += m.Event.Size() + } + return n +} + +func (m *StagedFinalizeBlockEvent_OrderFill) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.OrderFill != nil { + l = m.OrderFill.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} +func (m *StagedFinalizeBlockEvent_SubaccountUpdate) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SubaccountUpdate != nil { + l = m.SubaccountUpdate.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} func (m *StreamOrderbookUpdate) Size() (n int) { if m == nil { return 0 @@ -4945,6 +5148,126 @@ func (m *StreamUpdate) Unmarshal(dAtA []byte) error { } return nil } +func (m *StagedFinalizeBlockEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StagedFinalizeBlockEvent: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StagedFinalizeBlockEvent: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field OrderFill", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &StreamOrderbookFill{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_OrderFill{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SubaccountUpdate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &types.StreamSubaccountUpdate{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Event = &StagedFinalizeBlockEvent_SubaccountUpdate{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *StreamOrderbookUpdate) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index 8b40d382b1..8ee77a9d13 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -445,11 +445,9 @@ func (k Keeper) UpdateSubaccounts( if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.SendFinalizedSubaccountUpdates( + k.GetFullNodeStreamingManager().StageFinalizeBlockSubaccountUpdate( ctx, - []types.StreamSubaccountUpdate{ - subaccountUpdate, - }, + subaccountUpdate, ) } }