Skip to content

Commit

Permalink
Purge old instance messages from WAL
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Sztandera <oss@kubuxu.com>
  • Loading branch information
Kubuxu committed Sep 18, 2024
1 parent 8d801e4 commit c08c7c1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (m *F3) resumeInternal(ctx context.Context) error {

if runner, err := newRunner(
ctx, m.cs, m.ps, m.pubsub, m.verifier,
m.outboundMessages, m.manifest,
m.outboundMessages, m.manifest, m.wal,
); err != nil {
return err
} else {
Expand Down
8 changes: 8 additions & 0 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/internal/writeaheadlog"
"github.com/filecoin-project/go-f3/manifest"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -33,6 +34,7 @@ type gpbftRunner struct {
pubsub *pubsub.PubSub
clock clock.Clock
verifier gpbft.Verifier
wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry]
outMessages chan<- *gpbft.MessageBuilder

participant *gpbft.Participant
Expand All @@ -53,6 +55,7 @@ func newRunner(
verifier gpbft.Verifier,
out chan<- *gpbft.MessageBuilder,
m *manifest.Manifest,
wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry],
) (*gpbftRunner, error) {
runningCtx, ctxCancel := context.WithCancel(context.WithoutCancel(ctx))
errgrp, runningCtx := errgroup.WithContext(runningCtx)
Expand Down Expand Up @@ -197,6 +200,11 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
// will not impact the selection of next instance chain.
log.Error(fmt.Errorf("error while finalizing decision at EC: %w", err))
}
const keepInstancesInWAL = 5
if cert.GPBFTInstance > keepInstancesInWAL {
err := h.wal.Purge(cert.GPBFTInstance - keepInstancesInWAL)
log.Error("purging messages from WAL: %+v", err)
}
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/writeaheadlog/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (wal *WriteAheadLog[T, PT]) Purge(keepEpoch uint64) error {
return nil
}

const roatateAt = 16 << 20 // 16MiB
const roatateAt = 1 << 20 // 1MiB

func (wal *WriteAheadLog[T, PT]) maybeRotate() error {
if wal.active.file == nil {
Expand Down

0 comments on commit c08c7c1

Please sign in to comment.