From 49953b84093f8367dae7cdc3245484d3851a5d7b Mon Sep 17 00:00:00 2001 From: Teddy Ding Date: Mon, 9 Sep 2024 11:33:24 -0400 Subject: [PATCH] [CT-1178] Only emit finalized subaccount update (#2181) --- protocol/lib/metrics/metric_keys.go | 26 ++--- protocol/mocks/ClobKeeper.go | 8 +- .../streaming/full_node_streaming_manager.go | 104 ++++++++++++------ protocol/streaming/noop_streaming_manager.go | 10 +- protocol/streaming/types/interface.go | 7 +- protocol/x/clob/abci.go | 16 ++- protocol/x/clob/keeper/keeper.go | 36 ++++-- protocol/x/clob/keeper/process_operations.go | 36 +----- protocol/x/clob/types/clob_keeper.go | 5 +- protocol/x/clob/types/expected_keepers.go | 2 +- protocol/x/subaccounts/keeper/subaccount.go | 16 +-- protocol/x/subaccounts/types/types.go | 2 +- 12 files changed, 160 insertions(+), 108 deletions(-) diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index 088e742386..e38890d8ac 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -66,19 +66,19 @@ const ( GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency" // Full node grpc - FullNodeGrpc = "full_node_grpc" - GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" - GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" - GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" - GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcSendSubaccountUpdatesLatency = "grpc_send_subaccount_updates_latency" - GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" - GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" - GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" - GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" - GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" - GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" - GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" + FullNodeGrpc = "full_node_grpc" + GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" + GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" + GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" + GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" + GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" + GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" + GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" + GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" + GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" + GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index 5eb830f15d..62ab311a6b 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. +// Code generated by mockery v2.44.1. DO NOT EDIT. package mocks @@ -706,9 +706,9 @@ func (_m *ClobKeeper) InitializeEquityTierLimit(ctx types.Context, config clobty return r0 } -// InitializeNewStreams provides a mock function with given fields: ctx -func (_m *ClobKeeper) InitializeNewStreams(ctx types.Context) { - _m.Called(ctx) +// InitializeNewStreams provides a mock function with given fields: ctx, subaccountSnapshots +func (_m *ClobKeeper) InitializeNewStreams(ctx types.Context, subaccountSnapshots map[subaccountstypes.SubaccountId]*subaccountstypes.StreamSubaccountUpdate) { + _m.Called(ctx, subaccountSnapshots) } // IsInitialized provides a mock function with given fields: diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 31a82855ac..a6fe37c512 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -3,6 +3,7 @@ package streaming import ( "fmt" "sync" + "sync/atomic" "time" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" @@ -55,8 +56,8 @@ type FullNodeStreamingManagerImpl struct { type OrderbookSubscription struct { subscriptionId uint32 - // Initialize the subscription with orderbook snapshots. - initialize *sync.Once + // Whether the subscription is initialized with snapshot. + initialized *atomic.Bool // Clob pair ids to subscribe to. clobPairIds []uint32 @@ -75,6 +76,10 @@ type OrderbookSubscription struct { nextSnapshotBlock uint32 } +func (sub *OrderbookSubscription) IsInitialized() bool { + return sub.initialized.Load() +} + func NewFullNodeStreamingManager( logger log.Logger, flushIntervalMs uint32, @@ -159,7 +164,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } subscription := &OrderbookSubscription{ subscriptionId: sm.nextSubscriptionId, - initialize: &sync.Once{}, + initialized: &atomic.Bool{}, // False by default. clobPairIds: clobPairIds, subaccountIds: sIds, messageSender: messageSender, @@ -511,19 +516,23 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( ) } -// SendSubaccountUpdates groups subaccount updates by their subaccount ids and +// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdates( +func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( subaccountUpdates []satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, - metrics.GrpcSendSubaccountUpdatesLatency, + metrics.GrpcSendFinalizedSubaccountUpdatesLatency, time.Now(), ) + if execMode != sdk.ExecModeFinalize { + panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") + } + // Group subaccount updates by subaccount id. streamUpdates := make([]clobtypes.StreamUpdate, 0) subaccountIds := make([]*satypes.SubaccountId, 0) @@ -670,9 +679,33 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { sm.EmitMetrics() } +func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, +) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate { + sm.Lock() + defer sm.Unlock() + + ret := make(map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate) + for _, subscription := range sm.orderbookSubscriptions { + // If the subscription has been initialized, no need to grab the subaccount snapshot. + if alreadyInitialized := subscription.initialized.Load(); alreadyInitialized { + continue + } + + for _, subaccountId := range subscription.subaccountIds { + if _, exists := ret[subaccountId]; exists { + continue + } + + ret[subaccountId] = getSubaccountSnapshot(subaccountId) + } + } + return ret +} + func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -686,31 +719,40 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) for subscriptionId, subscription := range sm.orderbookSubscriptions { - // If the snapshot block interval is enabled, reset the sync.Once in order to - // re-send snapshots out. - if sm.snapshotBlockInterval > 0 && - blockHeight == subscription.nextSnapshotBlock { - subscription.initialize = &sync.Once{} - } - - subscription.initialize.Do( - func() { - allUpdates := clobtypes.NewOffchainUpdates() - for _, clobPairId := range subscription.clobPairIds { - if _, ok := updatesByClobPairId[clobPairId]; !ok { - updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId)) - } - allUpdates.Append(updatesByClobPairId[clobPairId]) - } - saUpdates := []*satypes.StreamSubaccountUpdate{} - for _, subaccountId := range subscription.subaccountIds { - saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId)) + if alreadyInitialized := subscription.initialized.Swap(true); !alreadyInitialized { + allUpdates := clobtypes.NewOffchainUpdates() + for _, clobPairId := range subscription.clobPairIds { + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId)) } - sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) - if sm.snapshotBlockInterval != 0 { - subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + allUpdates.Append(updatesByClobPairId[clobPairId]) + } + + saUpdates := []*satypes.StreamSubaccountUpdate{} + for _, subaccountId := range subscription.subaccountIds { + // The subaccount snapshot may not exist due to the following race condition + // 1. At beginning of PrepareCheckState we get snapshot for all subscribed subaccounts. + // 2. A new subaccount is subscribed to by a new subscription. + // 3. InitializeNewStreams is called. + // Then the new subaccount would not be included in the snapshot. + // We are okay with this behavior. + if saUpdate, ok := subaccountSnapshots[subaccountId]; ok { + saUpdates = append(saUpdates, saUpdate) } - }, - ) + } + + sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) + + if sm.snapshotBlockInterval != 0 { + subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + } + } + + // If the snapshot block interval is enabled and the next block is a snapshot block, + // reset the `atomic.Bool` so snapshots are sent for the next block. + if sm.snapshotBlockInterval > 0 && + blockHeight+1 == subscription.nextSnapshotBlock { + subscription.initialized = &atomic.Bool{} // False by default. + } } } diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 24810fefe2..6e9c00895d 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -51,7 +51,7 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( ) { } -func (sm *NoopGrpcStreamingManager) SendSubaccountUpdates( +func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates( subaccountUpdates []satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, @@ -62,9 +62,15 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) return false } +func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, +) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate { + return nil +} + func (sm *NoopGrpcStreamingManager) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 7930853be6..e3dff9d94b 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -22,10 +22,13 @@ type FullNodeStreamingManager interface { // L3+ Orderbook updates. InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) + GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, @@ -42,7 +45,7 @@ type FullNodeStreamingManager interface { blockHeight uint32, execMode sdk.ExecMode, ) - SendSubaccountUpdates( + SendFinalizedSubaccountUpdates( subaccountUpdates []satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 641e49e897..96bacb2cc5 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/x/clob/keeper" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) // PreBlocker executes all ABCI PreBlock logic respective to the clob module. @@ -123,6 +124,15 @@ func PrepareCheckState( log.BlockHeight, ctx.BlockHeight()+1, ) + // We just committed block `h`, preparing `CheckState` of `h+1` + // Before we modify the `CheckState`, we first take the snapshot of + // the subscribed subaccounts at the end of block `h`. This we send finalized state of + // the subaccounts below in `InitializeNewStreams`. + var subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate + if keeper.GetFullNodeStreamingManager().Enabled() { + subaccountSnapshots = keeper.GetSubaccountSnapshotsForInitStreams(ctx) + } + // Prune any rate limiting information that is no longer relevant. keeper.PruneRateLimits(ctx) @@ -239,7 +249,11 @@ func PrepareCheckState( ) // Initialize new streams with orderbook snapshots, if any. - keeper.InitializeNewStreams(ctx) + keeper.InitializeNewStreams( + ctx, + // Use the subaccount snapshot at the top of function to initialize the streams. + subaccountSnapshots, + ) // Set per-orderbook gauges. keeper.MemClob.SetMemclobGauges(ctx) diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 6817b43469..8e607d6833 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -3,9 +3,10 @@ package keeper import ( "errors" "fmt" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "sync/atomic" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "cosmossdk.io/log" "cosmossdk.io/store/prefix" storetypes "cosmossdk.io/store/types" @@ -251,9 +252,31 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) { k.antehandler = anteHandler } +func (k Keeper) GetSubaccountSnapshotsForInitStreams( + ctx sdk.Context, +) ( + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, +) { + lib.AssertCheckTxMode(ctx) + + return k.GetFullNodeStreamingManager().GetSubaccountSnapshotsForInitStreams( + func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate { + subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate( + ctx, + subaccountId, + true, + ) + return &subaccountUpdate + }, + ) +} + // InitializeNewStreams initializes new streams for all uninitialized clob pairs // by sending the corresponding orderbook snapshots. -func (k Keeper) InitializeNewStreams(ctx sdk.Context) { +func (k Keeper) InitializeNewStreams( + ctx sdk.Context, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, +) { streamingManager := k.GetFullNodeStreamingManager() streamingManager.InitializeNewStreams( @@ -263,14 +286,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) { clobPairId, ) }, - func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate { - subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate( - ctx, - subaccountId, - true, - ) - return &subaccountUpdate - }, + subaccountSnapshots, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) diff --git a/protocol/x/clob/keeper/process_operations.go b/protocol/x/clob/keeper/process_operations.go index 486dd6ab0e..d202c2a270 100644 --- a/protocol/x/clob/keeper/process_operations.go +++ b/protocol/x/clob/keeper/process_operations.go @@ -38,21 +38,6 @@ func fetchOrdersInvolvedInOpQueue( return orderIdSet } -// fetchSubaccountIdsInvolvedInOpQueue fetches all SubaccountIds involved in an operations -// queue's matches and returns them as a set. -func fetchSubaccountIdsInvolvedInOpQueue( - operations []types.InternalOperation, -) (subaccountIdSet map[satypes.SubaccountId]struct{}) { - subaccountIdSet = make(map[satypes.SubaccountId]struct{}) - for _, operation := range operations { - if clobMatch := operation.GetMatch(); clobMatch != nil { - subaccountIdSetForClobMatch := clobMatch.GetAllSubaccountIds() - subaccountIdSet = lib.MergeMaps(subaccountIdSet, subaccountIdSetForClobMatch) - } - } - return subaccountIdSet -} - // ProcessProposerOperations updates on-chain state given an []OperationRaw operations queue // representing matches that occurred in the previous block. It performs validation on an operations // queue. If all validation passes, the operations queue is written to state. @@ -73,11 +58,7 @@ func (k Keeper) ProcessProposerOperations( } // If grpc streams are on, send absolute fill amounts from local + proposed opqueue to the grpc stream. - // Also send subaccount snapshots for impacted subaccounts. - // An impacted subaccount is defined as: - // - A subaccount that was involved in any match in the local opqueue. - // Only matches generate subaccount updates. - // This must be sent out to account for checkState being discarded and deliverState being used. + // This effetively reverts the optimitic orderbook updates during CheckTx. if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { localValidatorOperationsQueue, _ := k.MemClob.GetOperationsToReplay(ctx) orderIdsFromProposed := fetchOrdersInvolvedInOpQueue( @@ -94,21 +75,6 @@ func (k Keeper) ProcessProposerOperations( allUpdates.Append(orderbookUpdate) } k.SendOrderbookUpdates(ctx, allUpdates) - - subaccountIdsFromProposed := fetchSubaccountIdsInvolvedInOpQueue( - operations, - ) - - subaccountIdsFromLocal := fetchSubaccountIdsInvolvedInOpQueue( - localValidatorOperationsQueue, - ) - subaccountIdsToUpdate := lib.MergeMaps(subaccountIdsFromLocal, subaccountIdsFromProposed) - allSubaccountUpdates := make([]satypes.StreamSubaccountUpdate, 0) - for subaccountId := range subaccountIdsToUpdate { - subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(ctx, subaccountId, false) - allSubaccountUpdates = append(allSubaccountUpdates, subaccountUpdate) - } - k.subaccountsKeeper.SendSubaccountUpdates(ctx, allSubaccountUpdates) } log.DebugLog(ctx, "Processing operations queue", diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index a961046f23..9705439dc4 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -137,7 +137,10 @@ type ClobKeeper interface { ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error // full node streaming - InitializeNewStreams(ctx sdk.Context) + InitializeNewStreams( + ctx sdk.Context, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, + ) SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *OffchainUpdates, diff --git a/protocol/x/clob/types/expected_keepers.go b/protocol/x/clob/types/expected_keepers.go index 3ed63c0592..b14c4aeacd 100644 --- a/protocol/x/clob/types/expected_keepers.go +++ b/protocol/x/clob/types/expected_keepers.go @@ -85,7 +85,7 @@ type SubaccountsKeeper interface { quantums *big.Int, perpetualId uint32, ) error - SendSubaccountUpdates( + SendFinalizedSubaccountUpdates( ctx sdk.Context, subaccountUpdates []satypes.StreamSubaccountUpdate, ) diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index 12a5af7ef9..8b40d382b1 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -3,11 +3,12 @@ package keeper import ( "errors" "fmt" - streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types" "math/big" "math/rand" "time" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/cosmos/gogoproto/proto" @@ -440,11 +441,11 @@ func (k Keeper) UpdateSubaccounts( ), ) - // if GRPC streaming is on, emit a generated subaccount update to stream. - if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled() { + // If DeliverTx and GRPC streaming is on, emit a generated subaccount update to stream. + if lib.IsDeliverTxMode(ctx) && k.GetFullNodeStreamingManager().Enabled() { if k.GetFullNodeStreamingManager().TracksSubaccountId(*u.SettledSubaccount.Id) { subaccountUpdate := GenerateStreamSubaccountUpdate(u, fundingPayments) - k.SendSubaccountUpdates( + k.SendFinalizedSubaccountUpdates( ctx, []types.StreamSubaccountUpdate{ subaccountUpdate, @@ -824,15 +825,16 @@ func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingMa return k.streamingManager } -// SendSubaccountUpdates sends the subaccount updates to the gRPC streaming manager. -func (k Keeper) SendSubaccountUpdates( +// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager. +func (k Keeper) SendFinalizedSubaccountUpdates( ctx sdk.Context, subaccountUpdates []types.StreamSubaccountUpdate, ) { + lib.AssertDeliverTxMode(ctx) if len(subaccountUpdates) == 0 { return } - k.GetFullNodeStreamingManager().SendSubaccountUpdates( + k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates( subaccountUpdates, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), diff --git a/protocol/x/subaccounts/types/types.go b/protocol/x/subaccounts/types/types.go index 3effca3878..cbccc9d2b9 100644 --- a/protocol/x/subaccounts/types/types.go +++ b/protocol/x/subaccounts/types/types.go @@ -77,7 +77,7 @@ type SubaccountsKeeper interface { perpetualId uint32, blockHeight uint32, ) error - SendSubaccountUpdates( + SendFinalizedSubaccountUpdates( ctx sdk.Context, subaccountUpdates []StreamSubaccountUpdate, )