Skip to content

Commit

Permalink
Stage FinalizeBlock events and emit in Precommit
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 13, 2024
1 parent 759498f commit 976ba07
Show file tree
Hide file tree
Showing 14 changed files with 816 additions and 150 deletions.
67 changes: 67 additions & 0 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,18 @@ export interface StreamUpdateSDKType {
taker_order?: StreamTakerOrderSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEvent {
orderFill?: StreamOrderbookFill;
subaccountUpdate?: StreamSubaccountUpdate;
}
/** StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`. */

export interface StagedFinalizeBlockEventSDKType {
order_fill?: StreamOrderbookFillSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
* the full node GRPC stream.
Expand Down Expand Up @@ -1396,6 +1408,61 @@ export const StreamUpdate = {

};

function createBaseStagedFinalizeBlockEvent(): StagedFinalizeBlockEvent {
return {
orderFill: undefined,
subaccountUpdate: undefined
};
}

export const StagedFinalizeBlockEvent = {
encode(message: StagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(10).fork()).ldelim();
}

if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(18).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StagedFinalizeBlockEvent {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStagedFinalizeBlockEvent();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 2:
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StagedFinalizeBlockEvent>): StagedFinalizeBlockEvent {
const message = createBaseStagedFinalizeBlockEvent();
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
return message;
}

};

function createBaseStreamOrderbookUpdate(): StreamOrderbookUpdate {
return {
snapshot: false,
Expand Down
9 changes: 9 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,15 @@ message StreamUpdate {
}
}

// StagedFinalizeBlockEvent is an event staged during `FinalizeBlock`.
message StagedFinalizeBlockEvent {
// Contains one of StreamOrderbookFill, StreamSubaccountUpdate.
oneof event {
StreamOrderbookFill order_fill = 1;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 2;
}
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
// the full node GRPC stream.
message StreamOrderbookUpdate {
Expand Down
4 changes: 4 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ func New(
statsmoduletypes.TransientStoreKey,
rewardsmoduletypes.TransientStoreKey,
indexer_manager.TransientStoreKey,
streaming.StreamingManagerTransientStoreKey,
perpetualsmoduletypes.TransientStoreKey,
)
memKeys := storetypes.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, clobmoduletypes.MemStoreKey)
Expand Down Expand Up @@ -764,6 +765,7 @@ func New(
appFlags,
appCodec,
logger,
tkeys[streaming.StreamingManagerTransientStoreKey],
)

timeProvider := &timelib.TimeProviderImpl{}
Expand Down Expand Up @@ -2059,6 +2061,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
cdc codec.Codec,
logger log.Logger,
streamingManagerTransientStoreKey storetypes.StoreKey,
) (manager streamingtypes.FullNodeStreamingManager, wsServer *ws.WebsocketServer) {
logger = logger.With(log.ModuleKey, "full-node-streaming")
if appFlags.GrpcStreamingEnabled {
Expand All @@ -2072,6 +2075,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
)

// Start websocket server.
Expand Down
1 change: 1 addition & 0 deletions protocol/lib/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ const (
UpdateType = "update_type"
ValidateMatches = "validate_matches"
ValidateOrder = "validate_order"
StreamBatchUpdatesAfterFinalizeBlock = "stream_batch_updates_after_finalize_block"

// MemCLOB.
AddedToOrderBook = "added_to_orderbook"
Expand Down
3 changes: 3 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const (
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdates = "grpc_staged_all_finalize_block_updates"
GrpcStagedFillFinalizeBlockUpdates = "grpc_staged_finalize_block_fill_updates"
GrpcStagedSubaccountFinalizeBlockUpdates = "grpc_staged_finalize_block_subaccount_updates"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
12 changes: 12 additions & 0 deletions protocol/streaming/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package streaming

const (
// Transient store key for storing staged events.
StreamingManagerTransientStoreKey = "tmp_streaming"

// Key for storing the count of staged events.
StagedEventsCountKey = "EvtCnt"

// Key prefix for staged events.
StagedEventsKeyPrefix = "Evt:"
)
170 changes: 170 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
Expand Down Expand Up @@ -50,6 +56,9 @@ type FullNodeStreamingManagerImpl struct {
// Block interval in which snapshot info should be sent out in.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -86,6 +95,7 @@ func NewFullNodeStreamingManager(
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
streamingManagerTransientStoreKey storetypes.StoreKey,
) *FullNodeStreamingManagerImpl {
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
Expand All @@ -102,6 +112,8 @@ func NewFullNodeStreamingManager(
maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -367,6 +379,84 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Stage a subaccount update event in transient store, during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

// Stage a fill event in transient store, during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
)
}

func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
clobtypes.Amino.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEvents(store)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
// Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
Expand Down Expand Up @@ -703,6 +793,86 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
ctx sdk.Context,
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdates()

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

// TODO(CT-1190): Stream below in a single batch.
// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
// It should be called after the consensus agrees on a block (e.g. Precommitter).
func (sm *FullNodeStreamingManagerImpl) getStagedEventsFromFinalizeBlock(
ctx sdk.Context,
) (
finalizedFills []clobtypes.StreamOrderbookFill,
finalizedSubaccountUpdates []satypes.StreamSubaccountUpdate,
) {
// Get onchain stream events stored in transient store.
stagedEvents := sm.GetStagedFinalizeBlockEvents(ctx)

telemetry.SetGauge(
float32(len(stagedEvents)),
types.ModuleName,
metrics.GrpcStagedAllFinalizeBlockUpdates,
metrics.Count,
)

for _, stagedEvent := range stagedEvents {
switch event := stagedEvent.Event.(type) {
case *clobtypes.StagedFinalizeBlockEvent_OrderFill:
finalizedFills = append(finalizedFills, *event.OrderFill)
case *clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate:
finalizedSubaccountUpdates = append(finalizedSubaccountUpdates, *event.SubaccountUpdate)
}
}

telemetry.SetGauge(
float32(len(finalizedSubaccountUpdates)),
types.ModuleName,
metrics.GrpcStagedSubaccountFinalizeBlockUpdates,
metrics.Count,
)
telemetry.SetGauge(
float32(len(finalizedFills)),
types.ModuleName,
metrics.GrpcStagedFillFinalizeBlockUpdates,
metrics.Count,
)

return finalizedFills, finalizedSubaccountUpdates
}

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
Expand Down
Loading

0 comments on commit 976ba07

Please sign in to comment.