From c08c7c17ecb29a02c8726aa81f8763af10bc897a Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 18 Sep 2024 15:23:40 +0200 Subject: [PATCH] Purge old instance messages from WAL Signed-off-by: Jakub Sztandera --- f3.go | 2 +- host.go | 8 ++++++++ internal/writeaheadlog/wal.go | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/f3.go b/f3.go index 15e95fea..86aa3009 100644 --- a/f3.go +++ b/f3.go @@ -412,7 +412,7 @@ func (m *F3) resumeInternal(ctx context.Context) error { if runner, err := newRunner( ctx, m.cs, m.ps, m.pubsub, m.verifier, - m.outboundMessages, m.manifest, + m.outboundMessages, m.manifest, m.wal, ); err != nil { return err } else { diff --git a/host.go b/host.go index 1e31610c..e722436a 100644 --- a/host.go +++ b/host.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/internal/clock" "github.com/filecoin-project/go-f3/internal/psutil" + "github.com/filecoin-project/go-f3/internal/writeaheadlog" "github.com/filecoin-project/go-f3/manifest" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -33,6 +34,7 @@ type gpbftRunner struct { pubsub *pubsub.PubSub clock clock.Clock verifier gpbft.Verifier + wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry] outMessages chan<- *gpbft.MessageBuilder participant *gpbft.Participant @@ -53,6 +55,7 @@ func newRunner( verifier gpbft.Verifier, out chan<- *gpbft.MessageBuilder, m *manifest.Manifest, + wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry], ) (*gpbftRunner, error) { runningCtx, ctxCancel := context.WithCancel(context.WithoutCancel(ctx)) errgrp, runningCtx := errgroup.WithContext(runningCtx) @@ -197,6 +200,11 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) { // will not impact the selection of next instance chain. log.Error(fmt.Errorf("error while finalizing decision at EC: %w", err)) } + const keepInstancesInWAL = 5 + if cert.GPBFTInstance > keepInstancesInWAL { + err := h.wal.Purge(cert.GPBFTInstance - keepInstancesInWAL) + log.Error("purging messages from WAL: %+v", err) + } } } return nil diff --git a/internal/writeaheadlog/wal.go b/internal/writeaheadlog/wal.go index 30d5a421..e17c7562 100644 --- a/internal/writeaheadlog/wal.go +++ b/internal/writeaheadlog/wal.go @@ -137,7 +137,7 @@ func (wal *WriteAheadLog[T, PT]) Purge(keepEpoch uint64) error { return nil } -const roatateAt = 16 << 20 // 16MiB +const roatateAt = 1 << 20 // 1MiB func (wal *WriteAheadLog[T, PT]) maybeRotate() error { if wal.active.file == nil {