Skip to content

Commit

Permalink
[CT-1178] Only emit finalized subaccount update (#2181)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding authored and jonfung-dydx committed Sep 17, 2024
1 parent 5ccf033 commit 49953b8
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 108 deletions.
26 changes: 13 additions & 13 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,19 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendSubaccountUpdatesLatency = "grpc_send_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
8 changes: 4 additions & 4 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 73 additions & 31 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streaming
import (
"fmt"
"sync"
"sync/atomic"
"time"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
Expand Down Expand Up @@ -55,8 +56,8 @@ type FullNodeStreamingManagerImpl struct {
type OrderbookSubscription struct {
subscriptionId uint32

// Initialize the subscription with orderbook snapshots.
initialize *sync.Once
// Whether the subscription is initialized with snapshot.
initialized *atomic.Bool

// Clob pair ids to subscribe to.
clobPairIds []uint32
Expand All @@ -75,6 +76,10 @@ type OrderbookSubscription struct {
nextSnapshotBlock uint32
}

func (sub *OrderbookSubscription) IsInitialized() bool {
return sub.initialized.Load()
}

func NewFullNodeStreamingManager(
logger log.Logger,
flushIntervalMs uint32,
Expand Down Expand Up @@ -159,7 +164,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
initialize: &sync.Once{},
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
messageSender: messageSender,
Expand Down Expand Up @@ -511,19 +516,23 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
)
}

// SendSubaccountUpdates groups subaccount updates by their subaccount ids and
// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdates(
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendSubaccountUpdatesLatency,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

// Group subaccount updates by subaccount id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
subaccountIds := make([]*satypes.SubaccountId, 0)
Expand Down Expand Up @@ -670,9 +679,33 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
sm.EmitMetrics()
}

func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
sm.Lock()
defer sm.Unlock()

ret := make(map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate)
for _, subscription := range sm.orderbookSubscriptions {
// If the subscription has been initialized, no need to grab the subaccount snapshot.
if alreadyInitialized := subscription.initialized.Load(); alreadyInitialized {
continue
}

for _, subaccountId := range subscription.subaccountIds {
if _, exists := ret[subaccountId]; exists {
continue
}

ret[subaccountId] = getSubaccountSnapshot(subaccountId)
}
}
return ret
}

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand All @@ -686,31 +719,40 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)

for subscriptionId, subscription := range sm.orderbookSubscriptions {
// If the snapshot block interval is enabled, reset the sync.Once in order to
// re-send snapshots out.
if sm.snapshotBlockInterval > 0 &&
blockHeight == subscription.nextSnapshotBlock {
subscription.initialize = &sync.Once{}
}

subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
for _, clobPairId := range subscription.clobPairIds {
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId))
}
allUpdates.Append(updatesByClobPairId[clobPairId])
}
saUpdates := []*satypes.StreamSubaccountUpdate{}
for _, subaccountId := range subscription.subaccountIds {
saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId))
if alreadyInitialized := subscription.initialized.Swap(true); !alreadyInitialized {
allUpdates := clobtypes.NewOffchainUpdates()
for _, clobPairId := range subscription.clobPairIds {
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId))
}
sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)
if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
allUpdates.Append(updatesByClobPairId[clobPairId])
}

saUpdates := []*satypes.StreamSubaccountUpdate{}
for _, subaccountId := range subscription.subaccountIds {
// The subaccount snapshot may not exist due to the following race condition
// 1. At beginning of PrepareCheckState we get snapshot for all subscribed subaccounts.
// 2. A new subaccount is subscribed to by a new subscription.
// 3. InitializeNewStreams is called.
// Then the new subaccount would not be included in the snapshot.
// We are okay with this behavior.
if saUpdate, ok := subaccountSnapshots[subaccountId]; ok {
saUpdates = append(saUpdates, saUpdate)
}
},
)
}

sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)

if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
}
}

// If the snapshot block interval is enabled and the next block is a snapshot block,
// reset the `atomic.Bool` so snapshots are sent for the next block.
if sm.snapshotBlockInterval > 0 &&
blockHeight+1 == subscription.nextSnapshotBlock {
subscription.initialized = &atomic.Bool{} // False by default.
}
}
}
10 changes: 8 additions & 2 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
) {
}

func (sm *NoopGrpcStreamingManager) SendSubaccountUpdates(
func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
Expand All @@ -62,9 +62,15 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId)
return false
}

func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
return nil
}

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand Down
7 changes: 5 additions & 2 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ type FullNodeStreamingManager interface {
// L3+ Orderbook updates.
InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
Expand All @@ -42,7 +45,7 @@ type FullNodeStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdates(
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
Expand Down
16 changes: 15 additions & 1 deletion protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/keeper"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// PreBlocker executes all ABCI PreBlock logic respective to the clob module.
Expand Down Expand Up @@ -123,6 +124,15 @@ func PrepareCheckState(
log.BlockHeight, ctx.BlockHeight()+1,
)

// We just committed block `h`, preparing `CheckState` of `h+1`
// Before we modify the `CheckState`, we first take the snapshot of
// the subscribed subaccounts at the end of block `h`. This we send finalized state of
// the subaccounts below in `InitializeNewStreams`.
var subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
if keeper.GetFullNodeStreamingManager().Enabled() {
subaccountSnapshots = keeper.GetSubaccountSnapshotsForInitStreams(ctx)
}

// Prune any rate limiting information that is no longer relevant.
keeper.PruneRateLimits(ctx)

Expand Down Expand Up @@ -239,7 +249,11 @@ func PrepareCheckState(
)

// Initialize new streams with orderbook snapshots, if any.
keeper.InitializeNewStreams(ctx)
keeper.InitializeNewStreams(
ctx,
// Use the subaccount snapshot at the top of function to initialize the streams.
subaccountSnapshots,
)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
Expand Down
36 changes: 26 additions & 10 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package keeper
import (
"errors"
"fmt"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"sync/atomic"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
Expand Down Expand Up @@ -251,9 +252,31 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
k.antehandler = anteHandler
}

func (k Keeper) GetSubaccountSnapshotsForInitStreams(
ctx sdk.Context,
) (
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
) {
lib.AssertCheckTxMode(ctx)

return k.GetFullNodeStreamingManager().GetSubaccountSnapshotsForInitStreams(
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate {
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(
ctx,
subaccountId,
true,
)
return &subaccountUpdate
},
)
}

// InitializeNewStreams initializes new streams for all uninitialized clob pairs
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
func (k Keeper) InitializeNewStreams(
ctx sdk.Context,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
) {
streamingManager := k.GetFullNodeStreamingManager()

streamingManager.InitializeNewStreams(
Expand All @@ -263,14 +286,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
clobPairId,
)
},
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate {
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(
ctx,
subaccountId,
true,
)
return &subaccountUpdate
},
subaccountSnapshots,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
Expand Down
Loading

0 comments on commit 49953b8

Please sign in to comment.