Skip to content

Commit

Permalink
Merge pull request #136 from SiaFoundation/nate/sector-root-optimizat…
Browse files Browse the repository at this point in the history
…ions

Sector root caching
  • Loading branch information
n8maninger committed Aug 4, 2023
2 parents d3538d1 + 80d8849 commit c59c11a
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 83 deletions.
9 changes: 7 additions & 2 deletions host/contracts/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -150,8 +151,9 @@ type (
store ContractStore
log *zap.Logger

once sync.Once
done func() // done is called when the updater is closed.
rootsCache *lru.TwoQueueCache[types.FileContractID, []types.Hash256] // reference to the cache in the contract manager
once sync.Once
done func() // done is called when the updater is closed.

sectors uint64
contractID types.FileContractID
Expand Down Expand Up @@ -333,11 +335,14 @@ func (cu *ContractUpdater) Commit(revision SignedRevision, usage Usage) error {
}

start := time.Now()
// revise the contract
err := cu.store.ReviseContract(revision, usage, cu.sectors, cu.sectorActions)
if err == nil {
// clear the committed sector actions
cu.sectorActions = cu.sectorActions[:0]
}
// update the roots cache
cu.rootsCache.Add(revision.Revision.ParentID, cu.sectorRoots[:])
cu.log.Debug("contract update committed", zap.String("contractID", revision.Revision.ParentID.String()), zap.Uint64("revision", revision.Revision.RevisionNumber), zap.Duration("elapsed", time.Since(start)))
return err
}
170 changes: 170 additions & 0 deletions host/contracts/contracts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package contracts_test

import (
"path/filepath"
"testing"

rhp2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/hostd/host/alerts"
"go.sia.tech/hostd/host/contracts"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/internal/test"
"go.sia.tech/hostd/persist/sqlite"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)

func TestContractUpdater(t *testing.T) {
const sectors = 256
hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))
renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))
dir := t.TempDir()

log := zaptest.NewLogger(t)
db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
if err != nil {
t.Fatal(err)
}
defer db.Close()

node, err := test.NewWallet(hostKey, t.TempDir(), log.Named("wallet"))
if err != nil {
t.Fatal(err)
}
defer node.Close()

am := alerts.NewManager()
s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize)
if err != nil {
t.Fatal(err)
}
defer s.Close()

// create a fake volume so disk space is not used
id, err := db.AddVolume("test", false)
if err != nil {
t.Fatal(err)
} else if err := db.GrowVolume(id, sectors); err != nil {
t.Fatal(err)
} else if err := db.SetAvailable(id, true); err != nil {
t.Fatal(err)
}

c, err := contracts.NewManager(db, am, s, node.ChainManager(), node.TPool(), node, log.Named("contracts"))
if err != nil {
t.Fatal(err)
}
defer c.Close()

contractUnlockConditions := types.UnlockConditions{
PublicKeys: []types.UnlockKey{
renterKey.PublicKey().UnlockKey(),
hostKey.PublicKey().UnlockKey(),
},
SignaturesRequired: 2,
}
rev := contracts.SignedRevision{
Revision: types.FileContractRevision{
FileContract: types.FileContract{
UnlockHash: types.Hash256(contractUnlockConditions.UnlockHash()),
WindowStart: 100,
WindowEnd: 200,
},
ParentID: frand.Entropy256(),
UnlockConditions: contractUnlockConditions,
},
}

if err := c.AddContract(rev, []types.Transaction{}, types.ZeroCurrency, contracts.Usage{}); err != nil {
t.Fatal(err)
}

var roots []types.Hash256

tests := []struct {
name string
append int
swap [][2]uint64
trim uint64
}{
{
name: "single root",
append: 1,
},
{
name: "multiple roots",
append: 100,
},
{
name: "swap roots",
swap: [][2]uint64{{0, 1}, {2, 3}, {4, 5}, {0, 100}},
},
{
name: "trim roots",
trim: 3,
},
{
name: "append, swap, trim",
append: 1,
swap: [][2]uint64{{50, 68}},
trim: 10,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updater, err := c.ReviseContract(rev.Revision.ParentID)
if err != nil {
t.Fatal(err)
}
defer updater.Close()

for i := 0; i < test.append; i++ {
root := frand.Entropy256()
release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil })
if err != nil {
t.Fatal(err)
}
defer release()
updater.AppendSector(root)
roots = append(roots, root)
}

for _, swap := range test.swap {
if err := updater.SwapSectors(swap[0], swap[1]); err != nil {
t.Fatal(err)
}
roots[swap[0]], roots[swap[1]] = roots[swap[1]], roots[swap[0]]
}

if err := updater.TrimSectors(test.trim); err != nil {
t.Fatal(err)
}
roots = roots[:len(roots)-int(test.trim)]

if updater.MerkleRoot() != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root")
} else if err := updater.Commit(rev, contracts.Usage{}); err != nil {
t.Fatal(err)
} else if err := updater.Close(); err != nil {
t.Fatal(err)
}

// check that the sector roots are correct in the database
dbRoots, err := db.SectorRoots(rev.Revision.ParentID)
if err != nil {
t.Fatal(err)
} else if rhp2.MetaRoot(dbRoots) != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root in database")
}
// check that the cache sector roots are correct
cachedRoots, err := c.SectorRoots(rev.Revision.ParentID, 0, 0)
if err != nil {
t.Fatal(err)
} else if rhp2.MetaRoot(cachedRoots) != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root in cache")
}
})
}
}
2 changes: 1 addition & 1 deletion host/contracts/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (cm *ContractManager) CheckIntegrity(ctx context.Context, contractID types.

expectedRoots := contract.Revision.Filesize / rhpv2.SectorSize

roots, err := cm.store.SectorRoots(contractID, 0, 0)
roots, err := cm.getSectorRoots(contractID, 0, 0)
if err != nil {
return nil, 0, fmt.Errorf("failed to get sector roots: %w", err)
} else if uint64(len(roots)) != expectedRoots {
Expand Down
54 changes: 51 additions & 3 deletions host/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"gitlab.com/NebulousLabs/encoding"
"go.sia.tech/core/consensus"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand All @@ -21,6 +22,12 @@ import (
"go.uber.org/zap"
)

// sectorRootCacheSize is the number of contracts' sector roots to cache.
// Caching prevents frequently updated contracts from continuously hitting the
// DB. This is left as a hard-coded small value to limit memory usage since
// contracts can contain any number of sector roots
const sectorRootCacheSize = 30

type (
contractChange struct {
id types.FileContractID
Expand Down Expand Up @@ -82,11 +89,45 @@ type (

processQueue chan uint64 // signals that the contract manager should process actions for a given block height

// caches the sector roots of contracts to avoid hitting the DB
// for frequently accessed contracts. The cache is limited to a
// small number of contracts to limit memory usage.
rootsCache *lru.TwoQueueCache[types.FileContractID, []types.Hash256]

mu sync.Mutex // guards the following fields
locks map[types.FileContractID]*locker // contracts must be locked while they are being modified
}
)

func (cm *ContractManager) getSectorRoots(id types.FileContractID, limit, offset int) ([]types.Hash256, error) {
// check the cache first
if roots, ok := cm.rootsCache.Get(id); ok {
if limit == 0 {
limit = len(roots)
}

if offset+limit > len(roots) {
return nil, errors.New("offset + limit exceeds length of roots")
}

// copy the roots into a new slice to avoid returning a slice of the
// cache's internal array
r := make([]types.Hash256, limit)
copy(r, roots[offset:offset+limit])
return r, nil
}

// if the cache doesn't have the roots, read them from the store
roots, err := cm.store.SectorRoots(id)
if err != nil {
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}
// add the roots to the cache
cm.rootsCache.Add(id, roots)
// return the requested roots
return roots[offset : offset+limit], nil
}

// Lock locks a contract for modification.
func (cm *ContractManager) Lock(ctx context.Context, id types.FileContractID) (SignedRevision, error) {
ctx, cancel, err := cm.tg.AddContext(ctx)
Expand Down Expand Up @@ -201,14 +242,14 @@ func (cm *ContractManager) RenewContract(renewal SignedRevision, existing Signed
}

// SectorRoots returns the roots of all sectors stored by the contract.
func (cm *ContractManager) SectorRoots(id types.FileContractID, limit, offset uint64) ([]types.Hash256, error) {
func (cm *ContractManager) SectorRoots(id types.FileContractID, limit, offset int) ([]types.Hash256, error) {
done, err := cm.tg.Add()
if err != nil {
return nil, err
}
defer done()

return cm.store.SectorRoots(id, limit, offset)
return cm.getSectorRoots(id, limit, offset)
}

// ProcessConsensusChange applies a block update to the contract manager.
Expand Down Expand Up @@ -413,14 +454,15 @@ func (cm *ContractManager) ReviseContract(contractID types.FileContractID) (*Con
return nil, err
}

roots, err := cm.store.SectorRoots(contractID, 0, 0)
roots, err := cm.getSectorRoots(contractID, 0, 0)
if err != nil {
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}
return &ContractUpdater{
store: cm.store,
log: cm.log.Named("contractUpdater"),

rootsCache: cm.rootsCache,
contractID: contractID,
sectors: uint64(len(roots)),
sectorRoots: roots,
Expand Down Expand Up @@ -460,6 +502,10 @@ func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) {

// NewManager creates a new contract manager.
func NewManager(store ContractStore, alerts Alerts, storage StorageManager, c ChainManager, tpool TransactionPool, wallet Wallet, log *zap.Logger) (*ContractManager, error) {
cache, err := lru.New2Q[types.FileContractID, []types.Hash256](sectorRootCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create cache: %w", err)
}
cm := &ContractManager{
store: store,
tg: threadgroup.New(),
Expand All @@ -470,6 +516,8 @@ func NewManager(store ContractStore, alerts Alerts, storage StorageManager, c Ch
tpool: tpool,
wallet: wallet,

rootsCache: cache,

processQueue: make(chan uint64, 100),
locks: make(map[types.FileContractID]*locker),
}
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type (
RenewContract(renewal SignedRevision, existing SignedRevision, formationSet []types.Transaction, lockedCollateral types.Currency, clearingUsage, initialUsage Usage, negotationHeight uint64) error
// SectorRoots returns the sector roots for a contract. If limit is 0, all roots
// are returned.
SectorRoots(id types.FileContractID, limit, offset uint64) ([]types.Hash256, error)
SectorRoots(id types.FileContractID) ([]types.Hash256, error)
// ContractAction calls contractFn on every contract in the store that
// needs a lifecycle action performed.
ContractAction(height uint64, contractFn func(types.FileContractID, uint64, string)) error
Expand Down
4 changes: 4 additions & 0 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,10 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhpv2.SectorSize]byte)
// AddTemporarySectors adds sectors to the temporary store. The sectors are not
// referenced by a contract and will be removed at the expiration height.
func (vm *VolumeManager) AddTemporarySectors(sectors []TempSector) error {
if len(sectors) == 0 {
return nil
}

done, err := vm.tg.Add()
if err != nil {
return err
Expand Down
Loading

0 comments on commit c59c11a

Please sign in to comment.