Skip to content

Commit

Permalink
fix(dot/state): change map of tries implementation to have working ga…
Browse files Browse the repository at this point in the history
…rbage collection (#2206)

* Remove `syncing` boolean field from `Storage`
* Replace tries `sync.Map` with mutex locked map
  • Loading branch information
qdm12 committed Jan 28, 2022
1 parent 6fb057e commit fada46b
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 69 deletions.
4 changes: 2 additions & 2 deletions dot/state/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func TestService_PruneStorage(t *testing.T) {
time.Sleep(1 * time.Second)

for _, v := range prunedArr {
_, has := serv.Storage.tries.Load(v.hash)
require.Equal(t, false, has)
tr := serv.Storage.tries.get(v.hash)
require.Nil(t, tr)
}
}

Expand Down
50 changes: 16 additions & 34 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func errTrieDoesNotExist(hash common.Hash) error {
// StorageState is the struct that holds the trie, db and lock
type StorageState struct {
blockState *BlockState
tries *sync.Map // map[common.Hash]*trie.Trie // map of root -> trie
tries *tries

db chaindb.Database
sync.RWMutex
Expand All @@ -39,7 +39,6 @@ type StorageState struct {
changedLock sync.RWMutex
observerList []Observer
pruner pruner.Pruner
syncing bool
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
Expand All @@ -53,8 +52,7 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,
return nil, fmt.Errorf("cannot have nil trie")
}

tries := new(sync.Map)
tries.Store(t.MustHash(), t)
tries := newTries(t)

storageTable := chaindb.NewTable(db, storagePrefix)

Expand All @@ -78,30 +76,16 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,
}, nil
}

// SetSyncing sets whether the node is currently syncing or not
func (s *StorageState) SetSyncing(syncing bool) {
s.syncing = syncing
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.Delete(keyHeader.StateRoot)
s.tries.delete(keyHeader.StateRoot)
}

// StoreTrie stores the given trie in the StorageState and writes it to the database
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header) error {
root := ts.MustRoot()

if s.syncing {
// keep only the trie at the head of the chain when syncing
// TODO: probably remove this when memory usage improves (#1494)
s.tries.Range(func(k, _ interface{}) bool {
s.tries.Delete(k)
return true
})
}

_, _ = s.tries.LoadOrStore(root, ts.Trie())
s.tries.softSet(root, ts.Trie())

if _, ok := s.pruner.(*pruner.FullNode); header == nil && ok {
return fmt.Errorf("block cannot be empty for Full node pruner")
Expand Down Expand Up @@ -142,20 +126,16 @@ func (s *StorageState) TrieState(root *common.Hash) (*rtstorage.TrieState, error
root = &sr
}

st, has := s.tries.Load(*root)
if !has {
t := s.tries.get(*root)
if t == nil {
var err error
st, err = s.LoadFromDB(*root)
t, err = s.LoadFromDB(*root)
if err != nil {
return nil, err
}

_, _ = s.tries.LoadOrStore(*root, st)
}

t := st.(*trie.Trie)

if has && t.MustHash() != *root {
s.tries.softSet(*root, t)
} else if t.MustHash() != *root {
panic("trie does not have expected root")
}

Expand All @@ -177,7 +157,7 @@ func (s *StorageState) LoadFromDB(root common.Hash) (*trie.Trie, error) {
return nil, err
}

_, _ = s.tries.LoadOrStore(t.MustHash(), t)
s.tries.softSet(t.MustHash(), t)
return t, nil
}

Expand All @@ -190,8 +170,9 @@ func (s *StorageState) loadTrie(root *common.Hash) (*trie.Trie, error) {
root = &sr
}

if t, has := s.tries.Load(*root); has && t != nil {
return t.(*trie.Trie), nil
t := s.tries.get(*root)
if t != nil {
return t, nil
}

tr, err := s.LoadFromDB(*root)
Expand Down Expand Up @@ -220,8 +201,9 @@ func (s *StorageState) GetStorage(root *common.Hash, key []byte) ([]byte, error)
root = &sr
}

if t, has := s.tries.Load(*root); has {
val := t.(*trie.Trie).Get(key)
t := s.tries.get(*root)
if t != nil {
val := t.Get(key)
return val, nil
}

Expand Down
38 changes: 6 additions & 32 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package state

import (
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -99,7 +98,7 @@ func TestStorage_TrieState(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// get trie from db
storage.tries.Delete(root)
storage.tries.delete(root)
ts3, err := storage.TrieState(&root)
require.NoError(t, err)
require.Equal(t, ts.Trie().MustHash(), ts3.Trie().MustHash())
Expand Down Expand Up @@ -131,49 +130,25 @@ func TestStorage_LoadFromDB(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.Delete(root)
storage.tries.delete(root)

data, err := storage.GetStorage(&root, trieKV[0].key)
require.NoError(t, err)
require.Equal(t, trieKV[0].value, data)

storage.tries.Delete(root)
storage.tries.delete(root)

prefixKeys, err := storage.GetKeysWithPrefix(&root, []byte("ke"))
require.NoError(t, err)
require.Equal(t, 2, len(prefixKeys))

storage.tries.Delete(root)
storage.tries.delete(root)

entries, err := storage.Entries(&root)
require.NoError(t, err)
require.Equal(t, 3, len(entries))
}

func syncMapLen(m *sync.Map) int {
l := 0
m.Range(func(_, _ interface{}) bool {
l++
return true
})
return l
}

func TestStorage_StoreTrie_Syncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(true)
err = storage.StoreTrie(ts, nil)
require.NoError(t, err)
require.Equal(t, 1, syncMapLen(storage.tries))
}

func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
Expand All @@ -183,10 +158,9 @@ func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(false)
err = storage.StoreTrie(ts, nil)
require.NoError(t, err)
require.Equal(t, 2, syncMapLen(storage.tries))
require.Equal(t, 2, storage.tries.len())
}

func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
Expand Down Expand Up @@ -233,7 +207,7 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.Delete(rootHash)
storage.tries.delete(rootHash)

_, err = storage.GetStorageChild(&rootHash, []byte("keyToChild"))
require.NoError(t, err)
Expand Down
60 changes: 60 additions & 0 deletions dot/state/tries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package state

import (
"sync"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/trie"
)

type tries struct {
rootToTrie map[common.Hash]*trie.Trie
mapMutex sync.RWMutex
}

func newTries(t *trie.Trie) *tries {
return &tries{
rootToTrie: map[common.Hash]*trie.Trie{
t.MustHash(): t,
},
}
}

// softSet sets the given trie at the given root hash
// in the memory map only if it is not already set.
func (t *tries) softSet(root common.Hash, trie *trie.Trie) {
t.mapMutex.Lock()
defer t.mapMutex.Unlock()

_, has := t.rootToTrie[root]
if has {
return
}

t.rootToTrie[root] = trie
}

func (t *tries) delete(root common.Hash) {
t.mapMutex.Lock()
defer t.mapMutex.Unlock()
delete(t.rootToTrie, root)
}

// get retrieves the trie corresponding to the root hash given
// from the in-memory thread safe map.
func (t *tries) get(root common.Hash) (tr *trie.Trie) {
t.mapMutex.RLock()
defer t.mapMutex.RUnlock()
return t.rootToTrie[root]
}

// len returns the current numbers of tries
// stored in the in-memory map.
func (t *tries) len() int {
t.mapMutex.RLock()
defer t.mapMutex.RUnlock()
return len(t.rootToTrie)
}
Loading

0 comments on commit fada46b

Please sign in to comment.