Skip to content

Commit

Permalink
wip recurring snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Aug 12, 2024
1 parent 6e5673e commit 6a39c22
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/protocol-build-and-push-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: Protocol Build & Push Image to AWS ECR
on: # yamllint disable-line rule:truthy
push:
branches:
- jonfung/**
- main
- 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/protocol-build-and-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: Protocol Build & Push Image to AWS ECR
on: # yamllint disable-line rule:truthy
push:
branches:
- jonfung/**
- main
- 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x
Expand Down
2 changes: 2 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,8 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
// TODO appflag this.
50,
)
}
return streaming.NewNoopGrpcStreamingManager()
Expand Down
30 changes: 27 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package streaming

import (
"fmt"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"sync"
"time"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand Down Expand Up @@ -44,14 +45,18 @@ type FullNodeStreamingManagerImpl struct {

maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32

// Block interval in which snapshot info should be sent out in.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
subscriptionId uint32

// Initialize the subscription with orderbook snapshots.
initialize sync.Once
initialize *sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
Expand All @@ -64,13 +69,18 @@ type OrderbookSubscription struct {

// Channel to buffer writes before the stream
updatesChannel chan []clobtypes.StreamUpdate

// Block height initial snapshot was emitted at modulo snapshotBlockInterval.
// Used to keep position of subsequent snapshots.
snapshotDelayBlock uint32
}

func NewFullNodeStreamingManager(
logger log.Logger,
flushIntervalMs uint32,
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
) *FullNodeStreamingManagerImpl {
logger = logger.With(log.ModuleKey, "full-node-streaming")
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
Expand All @@ -87,6 +97,7 @@ func NewFullNodeStreamingManager(

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -149,6 +160,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
initialize: &sync.Once{},
clobPairIds: clobPairIds,
subaccountIds: sIds,
messageSender: messageSender,
Expand Down Expand Up @@ -674,7 +686,16 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
sm.FlushStreamUpdatesWithLock()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)

for subscriptionId, subscription := range sm.orderbookSubscriptions {
// If the snapshot block interval is enabled, reset the sync.Once in order to
// re-send snapshots out.
if sm.snapshotBlockInterval > 0 &&
(blockHeight+subscription.snapshotDelayBlock)%uint32(sm.snapshotBlockInterval) == 0 {
subscription.initialize = &sync.Once{}
sm.logger.Info(fmt.Sprintf("resetting sync once for sub id %+v", subscriptionId))
}

subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
Expand All @@ -688,8 +709,11 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
for _, subaccountId := range subscription.subaccountIds {
saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId))
}

sm.logger.Info(fmt.Sprintf("Sending out snapshot for %+v", subscriptionId))
sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)
if sm.snapshotBlockInterval != 0 {
subscription.snapshotDelayBlock = blockHeight % uint32(sm.snapshotBlockInterval)
}
},
)
}
Expand Down

0 comments on commit 6a39c22

Please sign in to comment.