diff --git a/Makefile b/Makefile index adbf75dc6c..87b378b839 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,12 @@ it-polkadotjs: build test-state-race: @echo " > \033[32mRunning race tests...\033[0m " go test ./dot/state/... -race -timeout=5m + go test -run ^Test_hashToBlockMap_threadSafety$ -race -timeout=30s ./dot/state + go test -run ^Test_lockQueue_threadSafety$ -race -timeout=30s ./dot/sync + go test -run ^Test_Database_threadSafety$ -race -timeout=30s ./internal/database/badger + go test -run ^Test_Database_threadSafety$ -race -timeout=30s ./internal/database/memory + go test -run ^Test_hashToRuntime_threadSafety$ -race -timeout=30s ./lib/blocktree + go test -run ^Test_commitsTracker_threadSafety$ -race -timeout=30s ./lib/grandpa ## deps: Install missing dependencies. Runs `go mod download` internally. deps: diff --git a/go.mod b/go.mod index 7691b7857a..45d0f59c60 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/decred/base58 v1.0.3 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/dgraph-io/badger/v3 v3.2103.4 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -78,8 +79,10 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect github.com/gtank/ristretto255 v0.1.2 // indirect diff --git a/go.sum b/go.sum index 32661ca3e2..3bd230bfa8 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,8 @@ github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32y github.com/dgraph-io/badger/v2 v2.2007.3/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= +github.com/dgraph-io/badger/v3 v3.2103.4 h1:WE1B07YNTTJTtG9xjBcSW2wn0RJLyiV99h959RKZqM4= +github.com/dgraph-io/badger/v3 v3.2103.4/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= @@ -190,8 +192,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -218,6 +222,8 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -225,6 +231,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -630,6 +637,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -735,6 +743,7 @@ golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -811,12 +820,14 @@ google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20181029155118-b69ba1387ce2/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= google.golang.org/genproto v0.0.0-20190306203927-b5d61aea6440/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= diff --git a/internal/database/badger/database.go b/internal/database/badger/database.go new file mode 100644 index 0000000000..2395e6deac --- /dev/null +++ b/internal/database/badger/database.go @@ -0,0 +1,154 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +// Package badger provides a database implementation using badger v3. +package badger + +import ( + "context" + "errors" + "fmt" + + "github.com/ChainSafe/gossamer/internal/database" + badger "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/ristretto/z" +) + +// Database is database implementation using a badger/v3 database. +type Database struct { + badgerDatabase *badger.DB +} + +// New returns a new database based on a badger v3 database. +func New(settings Settings) (database *Database, err error) { + settings.SetDefaults() + err = settings.Validate() + if err != nil { + return nil, fmt.Errorf("validating settings: %w", err) + } + + badgerOptions := badger.DefaultOptions(*settings.Path) + badgerOptions = badgerOptions.WithLogger(nil) + badgerOptions = badgerOptions.WithInMemory(*settings.InMemory) + // TODO enable once we share the same instance + // See https://github.com/ChainSafe/gossamer/issues/2981 + // badgerOptions = badgerOptions.WithBypassLockGuard(true) + badgerDatabase, err := badger.Open(badgerOptions) + if err != nil { + return nil, fmt.Errorf("opening badger database: %w", err) + } + + return &Database{ + badgerDatabase: badgerDatabase, + }, nil +} + +// Get retrieves a value from the database using the given key. +// It returns the wrapped error `database.ErrKeyNotFound` if the +// key is not found. +func (db *Database) Get(key []byte) (value []byte, err error) { + err = db.badgerDatabase.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("getting item from transaction: %w", err) + } + + value, err = item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("copying value: %w", err) + } + + return nil + }) + + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, fmt.Errorf("%w: 0x%x", database.ErrKeyNotFound, key) + } + + return value, transformError(err) +} + +// Set sets a value at the given key in the database. +func (db *Database) Set(key, value []byte) (err error) { + err = db.badgerDatabase.Update(func(txn *badger.Txn) error { + return txn.Set(key, value) + }) + return transformError(err) +} + +// Delete deletes the given key from the database. +// If the key is not found, no error is returned. +func (db *Database) Delete(key []byte) (err error) { + err = db.badgerDatabase.Update(func(txn *badger.Txn) error { + return txn.Delete(key) + }) + return transformError(err) +} + +// NewWriteBatch returns a new write batch for the database. +func (db *Database) NewWriteBatch() (writeBatch database.WriteBatch) { + prefix := []byte(nil) + badgerWriteBatch := db.badgerDatabase.NewWriteBatch() + return newWriteBatch(prefix, badgerWriteBatch) +} + +// NewTable returns a new table using the database. +// All keys on the table will be prefixed with the given prefix. +func (db *Database) NewTable(prefix string) (dbTable database.Table) { + return &table{ + prefix: []byte(prefix), + database: db, + } +} + +// Stream streams data from the database to the `handle` +// function given. The `prefix` is used to filter the keys +// as well as the `chooseKey` function. Note the `prefix` +// argument is more performant than checking the prefix within +// the `chooseKey` function. +func (db *Database) Stream(ctx context.Context, + prefix []byte, + chooseKey func(key []byte) bool, + handle func(key, value []byte) error, +) error { + stream := db.badgerDatabase.NewStream() + + if prefix != nil { + stream.Prefix = make([]byte, len(prefix)) + copy(stream.Prefix, prefix) + } + + stream.ChooseKey = func(item *badger.Item) bool { + key := item.Key() + return chooseKey(key) + } + + stream.Send = func(buf *z.Buffer) (err error) { + kvList, err := badger.BufferToKVList(buf) + if err != nil { + return fmt.Errorf("decoding badger proto key value: %w", err) + } + + for _, keyValue := range kvList.Kv { + err = handle(keyValue.Key, keyValue.Value) + if err != nil { + return fmt.Errorf("handling key value: %w", err) + } + } + return nil + } + + return stream.Orchestrate(ctx) +} + +// Close closes the database. +func (db *Database) Close() (err error) { + err = db.badgerDatabase.Close() + return transformError(err) +} + +// DropAll drops all data from the database. +func (db *Database) DropAll() (err error) { + err = db.badgerDatabase.DropAll() + return transformError(err) +} diff --git a/internal/database/badger/database_test.go b/internal/database/badger/database_test.go new file mode 100644 index 0000000000..0aa23fab5b --- /dev/null +++ b/internal/database/badger/database_test.go @@ -0,0 +1,277 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "testing" + + "github.com/ChainSafe/gossamer/internal/database" + badger "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_New(t *testing.T) { + t.Parallel() + + settings := Settings{}.WithPath(t.TempDir()) + database, err := New(settings) + require.NoError(t, err) + + err = database.Close() + require.NoError(t, err) +} + +func Test_Database_Get(t *testing.T) { + t.Parallel() + + t.Run("get error", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + _, err = db.Get([]byte{}) + assert.ErrorIs(t, err, badger.ErrEmptyKey) + assert.EqualError(t, err, "getting item from transaction: Key cannot be empty") + }) + + t.Run("key not found", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + _, err = db.Get([]byte{1}) + assert.ErrorIs(t, err, database.ErrKeyNotFound) + assert.EqualError(t, err, "key not found: 0x01") + }) + + t.Run("key found", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + key := []byte{1} + value := []byte{2} + + err = db.badgerDatabase.Update(func(txn *badger.Txn) error { + return txn.Set(key, value) + }) + require.NoError(t, err) + + valueRetrieved, err := db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, value, valueRetrieved) + + // Check for mutation safety + value[0]++ + assert.NotEqual(t, value, valueRetrieved) + + valueRetrieved[0]++ + valueRetrievedAgain, err := db.Get([]byte{1}) + require.NoError(t, err) + assert.NotEqual(t, valueRetrieved, valueRetrievedAgain) + }) +} + +func Test_Database_Set(t *testing.T) { + t.Parallel() + + t.Run("set error", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.Set([]byte{}, []byte{2}) + assert.ErrorIs(t, err, badger.ErrEmptyKey) + assert.EqualError(t, err, "Key cannot be empty") + }) + + t.Run("set new key", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + + value, err := db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{2}, value) + }) + + t.Run("override at existing key", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.badgerDatabase.Update(func(txn *badger.Txn) error { + return txn.Set([]byte{1}, []byte{2}) + }) + require.NoError(t, err) + + value := []byte{3} + err = db.Set([]byte{1}, value) + require.NoError(t, err) + + valueRetrieved, err := db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{3}, valueRetrieved) + + // Check for mutation safety + value[0]++ + assert.NotEqual(t, value, valueRetrieved) + valueRetrieved, err = db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{3}, valueRetrieved) + }) +} + +func Test_Database_Delete(t *testing.T) { + t.Parallel() + + t.Run("delete error", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.Delete([]byte{}) + assert.ErrorIs(t, err, badger.ErrEmptyKey) + assert.EqualError(t, err, "Key cannot be empty") + }) + + t.Run("key not found", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.Delete([]byte{1}) + require.NoError(t, err) + }) + + t.Run("delete existing key", func(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + err = db.badgerDatabase.Update(func(txn *badger.Txn) error { + return txn.Set([]byte{1}, []byte{2}) + }) + require.NoError(t, err) + + err = db.Delete([]byte{1}) + require.NoError(t, err) + + _, err = db.Get([]byte{1}) + require.ErrorIs(t, err, database.ErrKeyNotFound) + }) +} + +func Test_Database_NewWriteBatch(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + writeBatch := db.NewWriteBatch() + + err = writeBatch.Set([]byte{1}, []byte{1}) + require.NoError(t, err) + writeBatch.Cancel() + + writeBatch = db.NewWriteBatch() + err = writeBatch.Set([]byte{2}, []byte{2}) + require.NoError(t, err) + err = writeBatch.Set([]byte{3}, []byte{3}) + require.NoError(t, err) + err = writeBatch.Delete([]byte{2}) + require.NoError(t, err) + err = writeBatch.Flush() + require.NoError(t, err) + + _, err = db.Get([]byte{1}) + require.ErrorIs(t, err, database.ErrKeyNotFound) + _, err = db.Get([]byte{2}) + require.ErrorIs(t, err, database.ErrKeyNotFound) + value, err := db.Get([]byte{3}) + require.NoError(t, err) + assert.Equal(t, []byte{3}, value) +} + +func Test_Database_NewTable(t *testing.T) { + t.Parallel() + + db, err := New(Settings{}.WithPath(t.TempDir())) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + const prefix = "prefix" + prefixBytes := []byte(prefix) + table := db.NewTable(prefix) + + writeBatch := table.NewWriteBatch() + err = writeBatch.Set([]byte{1}, []byte{1}) + require.NoError(t, err) + err = writeBatch.Set([]byte{2}, []byte{2}) + require.NoError(t, err) + err = writeBatch.Set([]byte{3}, []byte{3}) + require.NoError(t, err) + err = writeBatch.Delete([]byte{2}) + require.NoError(t, err) + err = writeBatch.Flush() + require.NoError(t, err) + + assertDBValue(t, db, append(prefixBytes, []byte{3}...), []byte{3}) +} diff --git a/internal/database/badger/helpers.go b/internal/database/badger/helpers.go new file mode 100644 index 0000000000..9689a51c48 --- /dev/null +++ b/internal/database/badger/helpers.go @@ -0,0 +1,35 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "errors" + "fmt" + + "github.com/ChainSafe/gossamer/internal/database" + badger "github.com/dgraph-io/badger/v3" +) + +func ptrTo[T any](value T) *T { return &value } + +func newPrefixedKey(prefix, key []byte) (prefixedKey []byte) { + // WARNING: Do not use: + // return append(prefix, key...) + // since the prefix might have a capacity larger than its length, + // and that would produce data corruption on prefixed keys pointing + // to the prefix underlying memory array. + prefixedKey = make([]byte, 0, len(prefix)+len(key)) + prefixedKey = append(prefixedKey, prefix...) + prefixedKey = append(prefixedKey, key...) + return prefixedKey +} + +// transformError transforms a badger error into a database error +// eventually, for errors defined in the parent database package. +func transformError(badgerErr error) (err error) { + if errors.Is(badgerErr, badger.ErrDBClosed) { + return fmt.Errorf("%w", database.ErrClosed) + } + return badgerErr +} diff --git a/internal/database/badger/helpers_test.go b/internal/database/badger/helpers_test.go new file mode 100644 index 0000000000..68473b5d35 --- /dev/null +++ b/internal/database/badger/helpers_test.go @@ -0,0 +1,61 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "fmt" + "testing" + + "github.com/ChainSafe/gossamer/internal/database" + badger "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func assertDBValue(t *testing.T, db *Database, key, expectedValue []byte) { + t.Helper() + + value, err := db.Get(key) + require.NoError(t, err) + + assert.Equal(t, expectedValue, value) +} + +func assertDBKeyNotFound(t *testing.T, db *Database, key []byte) { + t.Helper() + + _, err := db.Get(key) + assert.ErrorIs(t, err, database.ErrKeyNotFound) +} + +func logAllKeyValues(t *testing.T, badgerDB *badger.DB) { //nolint:unused,deadcode + t.Helper() + + keyValues := make(map[string][]byte) + err := badgerDB.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + iterator := txn.NewIterator(opts) + defer iterator.Close() + for iterator.Rewind(); iterator.Valid(); iterator.Next() { + item := iterator.Item() + key := item.Key() + err := item.Value(func(v []byte) error { + keyValues[string(key)] = v + return nil + }) + if err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + message := "Database contains the following key value pairs:\n" + for key, value := range keyValues { + keyBytes := []byte(key) + message += fmt.Sprintf(" 0x%x <-> 0x%x\n", keyBytes, value) + } + t.Log(message) +} diff --git a/internal/database/badger/race_test.go b/internal/database/badger/race_test.go new file mode 100644 index 0000000000..882ebf2370 --- /dev/null +++ b/internal/database/badger/race_test.go @@ -0,0 +1,78 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_Database_threadSafety(t *testing.T) { + // This test consists in checking for concurrent access + // using the -race detector. + t.Parallel() + + var startWg, endWg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + + const operations = 4 + const parallelism = 3 + const goroutines = parallelism * operations + startWg.Add(goroutines) + endWg.Add(goroutines) + + const testDuration = 50 * time.Millisecond + go func() { + timer := time.NewTimer(time.Hour) + startWg.Wait() + _ = timer.Reset(testDuration) + <-timer.C + cancel() + }() + + runInLoop := func(f func()) { + defer endWg.Done() + startWg.Done() + startWg.Wait() + for ctx.Err() == nil { + f() + } + } + + settings := Settings{ + InMemory: ptrTo(true), + } + database, err := New(settings) + require.NoError(t, err) + key := []byte{1} + value := []byte{2} + + for i := 0; i < parallelism; i++ { + go runInLoop(func() { + _, _ = database.Get(key) + }) + + go runInLoop(func() { + _ = database.Set(key, value) + }) + + go runInLoop(func() { + _ = database.Delete(key) + }) + + go runInLoop(func() { + batch := database.NewWriteBatch() + _ = batch.Set(key, value) + _ = batch.Delete(key) + _ = batch.Set(key, value) + _ = batch.Flush() + }) + } + + endWg.Wait() +} diff --git a/internal/database/badger/settings.go b/internal/database/badger/settings.go new file mode 100644 index 0000000000..781a80dc1f --- /dev/null +++ b/internal/database/badger/settings.go @@ -0,0 +1,66 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "errors" + "fmt" + "path/filepath" +) + +// Settings is the database settings. +type Settings struct { + // Path is the database directory path to use. + // Note it should be the empty string if InMemory is true. + // It defaults to the empty string if left unset. + Path *string + // InMemory is whether to use an in-memory database. + InMemory *bool +} + +// WithPath returns the settings with the path set. +// Note it does not modify the receiver settings. +func (s Settings) WithPath(path string) Settings { + s.Path = ptrTo(path) + return s +} + +// WithInMemory returns the settings with the in-memory flag set. +// Note it does not modify the receiver settings. +func (s Settings) WithInMemory(inMemory bool) Settings { + s.InMemory = ptrTo(inMemory) + return s +} + +// SetDefaults sets the default values on the settings. +func (s *Settings) SetDefaults() { + if s.Path == nil { + s.Path = ptrTo("") + } + + if s.InMemory == nil { + s.InMemory = ptrTo(false) + } +} + +var ( + ErrPathSetInMemory = errors.New("path set with database in-memory") +) + +// Validate validates the settings. +func (s Settings) Validate() (err error) { //skipcq: GO-W1029 + if *s.InMemory { + if *s.Path != "" { + // Note badger v3 enforces the path is not set in this case. + return fmt.Errorf("%w: %q", ErrPathSetInMemory, *s.Path) + } + } else { + _, err = filepath.Abs(*s.Path) + if err != nil { + return fmt.Errorf("changing path to absolute path: %w", err) + } + } + + return nil +} diff --git a/internal/database/badger/settings_test.go b/internal/database/badger/settings_test.go new file mode 100644 index 0000000000..a12d631ad3 --- /dev/null +++ b/internal/database/badger/settings_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_Settings_SetDefaults(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + originalSettings Settings + expectedSettings Settings + }{ + "empty_settings": { + expectedSettings: Settings{ + Path: ptrTo(""), + InMemory: ptrTo(false), + }, + }, + "non-empty_settings": { + originalSettings: Settings{ + Path: ptrTo("x"), + InMemory: ptrTo(true), + }, + expectedSettings: Settings{ + Path: ptrTo("x"), + InMemory: ptrTo(true), + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + + t.Run(name, func(t *testing.T) { + t.Parallel() + + testCase.originalSettings.SetDefaults() + assert.Equal(t, testCase.expectedSettings, testCase.originalSettings) + }) + } +} + +func Test_Settings_Validate(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + settings Settings + errWrapped error + errMessage string + }{ + "path_set_in-memory": { + settings: Settings{ + Path: ptrTo("."), + InMemory: ptrTo(true), + }, + errWrapped: ErrPathSetInMemory, + errMessage: "path set with database in-memory: \".\"", + }, + // Note we cannot test for a bad path since we would + // need os.Getcwd() to fail. + "valid_settings": { + settings: Settings{ + Path: ptrTo("."), + InMemory: ptrTo(false), + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + + t.Run(name, func(t *testing.T) { + t.Parallel() + + err := testCase.settings.Validate() + + if testCase.errMessage == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, testCase.errMessage) + } + }) + } +} diff --git a/internal/database/badger/table.go b/internal/database/badger/table.go new file mode 100644 index 0000000000..c93fd50e3d --- /dev/null +++ b/internal/database/badger/table.go @@ -0,0 +1,43 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "github.com/ChainSafe/gossamer/internal/database" +) + +type table struct { + prefix []byte + database *Database +} + +// Get retrieves a value from the database using the given key +// prefixed with the table prefix. +// It returns the wrapped error `database.ErrKeyNotFound` if the +// prefixed key is not found. +func (t *table) Get(key []byte) (value []byte, err error) { + key = newPrefixedKey(t.prefix, key) + return t.database.Get(key) +} + +// Set sets a value at the given key prefixed with the table prefix +// in the database. +func (t *table) Set(key, value []byte) (err error) { + key = newPrefixedKey(t.prefix, key) + return t.database.Set(key, value) +} + +// Delete deletes the given key prefixed with the table prefix +// from the database. If the key is not found, no error is returned. +func (t *table) Delete(key []byte) (err error) { + key = newPrefixedKey(t.prefix, key) + return t.database.Delete(key) +} + +// NewWriteBatch returns a new write batch for the database, +// using the table prefix to prefix all keys. +func (t *table) NewWriteBatch() (writeBatch database.WriteBatch) { + badgerWriteBatch := t.database.badgerDatabase.NewWriteBatch() + return newWriteBatch(t.prefix, badgerWriteBatch) +} diff --git a/internal/database/badger/table_test.go b/internal/database/badger/table_test.go new file mode 100644 index 0000000000..00b24bafce --- /dev/null +++ b/internal/database/badger/table_test.go @@ -0,0 +1,74 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_table(t *testing.T) { + settings := Settings{}.WithPath(t.TempDir()) + db, err := New(settings) + require.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + require.NoError(t, err) + }) + + // Important: + // We use a variable instead of a constant for 'prefix' + // to have a byte slice of length 6 and capacity 8. + // See the `makePrefixedKey` function in `helpers.go`. + prefixString := "prefix" + prefix := []byte(prefixString) + dbTable := &table{ + prefix: prefix, + database: db, + } + + err = dbTable.Set([]byte{1}, []byte{1}) + require.NoError(t, err) + assertDBValue(t, db, newPrefixedKey(prefix, []byte{1}), []byte{1}) + + err = dbTable.Delete([]byte{1}) + require.NoError(t, err) + assertDBKeyNotFound(t, db, append(prefix, []byte{1}...)) + + err = dbTable.Set([]byte{2}, []byte{2}) + require.NoError(t, err) + assertDBValue(t, db, append(prefix, []byte{2}...), []byte{2}) + + value, err := dbTable.Get([]byte{2}) + require.NoError(t, err) + assert.Equal(t, []byte{2}, value) + + writeBatch := dbTable.NewWriteBatch() + + err = writeBatch.Set([]byte{3}, []byte{3}) + require.NoError(t, err) + assertDBKeyNotFound(t, db, append(prefix, []byte{3}...)) + + writeBatch.Cancel() + + writeBatch = dbTable.NewWriteBatch() + + err = writeBatch.Set([]byte{1}, []byte{1}) + require.NoError(t, err) + + err = writeBatch.Set([]byte{2}, []byte{2}) + require.NoError(t, err) + + err = writeBatch.Delete([]byte{1}) + require.NoError(t, err) + + err = writeBatch.Flush() + require.NoError(t, err) + + assertDBKeyNotFound(t, db, newPrefixedKey(prefix, []byte{1})) + assertDBValue(t, db, newPrefixedKey(prefix, []byte{2}), []byte{2}) + +} diff --git a/internal/database/badger/writebatch.go b/internal/database/badger/writebatch.go new file mode 100644 index 0000000000..e7a7674ec9 --- /dev/null +++ b/internal/database/badger/writebatch.go @@ -0,0 +1,48 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package badger + +import ( + badger "github.com/dgraph-io/badger/v3" +) + +// writeBatch uses the badger write batch and prefixes +// all keys with a certain given prefix. +type writeBatch struct { + prefix []byte + badgerWriteBatch *badger.WriteBatch +} + +func newWriteBatch(prefix []byte, badgerWriteBatch *badger.WriteBatch) *writeBatch { + return &writeBatch{ + prefix: prefix, + badgerWriteBatch: badgerWriteBatch, + } +} + +// Set sets a value at the given key prefixed with the given prefix. +func (wb *writeBatch) Set(key, value []byte) (err error) { + key = newPrefixedKey(wb.prefix, key) + err = wb.badgerWriteBatch.Set(key, value) + return transformError(err) +} + +// Delete deletes the given key prefixed with the table prefix +// from the database. +func (wb *writeBatch) Delete(key []byte) (err error) { + key = newPrefixedKey(wb.prefix, key) + err = wb.badgerWriteBatch.Delete(key) + return transformError(err) +} + +// Flush flushes the write batch to the database. +func (wb *writeBatch) Flush() (err error) { + err = wb.badgerWriteBatch.Flush() + return transformError(err) +} + +// Cancel cancels the write batch. +func (wb *writeBatch) Cancel() { + wb.badgerWriteBatch.Cancel() +} diff --git a/internal/database/database_test/databases_test.go b/internal/database/database_test/databases_test.go new file mode 100644 index 0000000000..a95b4fce06 --- /dev/null +++ b/internal/database/database_test/databases_test.go @@ -0,0 +1,147 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package test + +import ( + "bytes" + "context" + "testing" + + "github.com/ChainSafe/gossamer/internal/database" + "github.com/ChainSafe/gossamer/internal/database/badger" + "github.com/ChainSafe/gossamer/internal/database/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type Database interface { + Get(key []byte) (value []byte, err error) + Set(key, value []byte) error + Delete(key []byte) error + NewWriteBatch() database.WriteBatch + NewTable(prefix string) database.Table + Stream(ctx context.Context, prefix []byte, chooseKey func(key []byte) bool, + handle func(key []byte, value []byte) error) error + DropAll() error + Close() error +} + +func Test_Databases(t *testing.T) { + t.Parallel() + + databaseBuilders := []func() Database{ + func() Database { return memory.New() }, + func() Database { + settings := badger.Settings{}.WithInMemory(true) + database, err := badger.New(settings) + require.NoError(t, err) + return database + }, + } + + for _, databaseBuilder := range databaseBuilders { + db := databaseBuilder() + + err := db.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + + value, err := db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{2}, value) + + err = db.Delete([]byte{2}) + require.NoError(t, err) + + err = db.Delete([]byte{1}) + require.NoError(t, err) + + _, err = db.Get([]byte{1}) + require.ErrorIs(t, err, database.ErrKeyNotFound) + + err = db.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + + value, err = db.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{2}, value) + + batch := db.NewWriteBatch() + err = batch.Set([]byte{3}, []byte{4}) + require.NoError(t, err) + err = batch.Set([]byte{4}, []byte{5}) + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + value, err = db.Get([]byte{3}) + require.NoError(t, err) + assert.Equal(t, []byte{4}, value) + value, err = db.Get([]byte{4}) + require.NoError(t, err) + assert.Equal(t, []byte{5}, value) + + table := db.NewTable("x") + err = table.Set([]byte{1}, []byte{3}) + require.NoError(t, err) + value, err = table.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{3}, value) + value, err = db.Get([]byte("x\x01")) + require.NoError(t, err) + assert.Equal(t, []byte{3}, value) + + err = db.DropAll() + require.NoError(t, err) + + _, err = db.Get([]byte{1}) + require.ErrorIs(t, err, database.ErrKeyNotFound) + + streamTest(t, db) + + err = db.Close() + require.NoError(t, err) + + err = db.Set([]byte{1}, []byte{2}) + assert.ErrorIs(t, err, database.ErrClosed) + } +} + +func streamTest(t *testing.T, db Database) { + err := db.DropAll() + require.NoError(t, err) + + keyValues := map[string][]byte{ + "prefix_1": {1}, + "prefix_12": {1, 2}, + "prefix_3": {3}, + "4": {4}, + } + for keyString, value := range keyValues { + err := db.Set([]byte(keyString), value) + require.NoError(t, err) + } + + ctx := context.Background() + prefix := []byte("prefix") + chooseKey := func(key []byte) bool { + keyWithoutPrefix := bytes.TrimPrefix(key, prefix) + return keyWithoutPrefix[1] == '1' + } + expected := map[string][]byte{ + "prefix_1": {1}, + "prefix_12": {1, 2}, + } + handle := func(key []byte, value []byte) error { + keyString := string(key) + expectedValue, ok := expected[keyString] + require.True(t, ok) + assert.Equal(t, expectedValue, value) + delete(expected, keyString) + return nil + } + + err = db.Stream(ctx, prefix, chooseKey, handle) + require.NoError(t, err) + + assert.Empty(t, expected) +} diff --git a/internal/database/errors.go b/internal/database/errors.go new file mode 100644 index 0000000000..f5b52c6efe --- /dev/null +++ b/internal/database/errors.go @@ -0,0 +1,11 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package database + +import "errors" + +var ( + ErrKeyNotFound = errors.New("key not found") + ErrClosed = errors.New("database closed") +) diff --git a/internal/database/interfaces.go b/internal/database/interfaces.go new file mode 100644 index 0000000000..5b0b80b493 --- /dev/null +++ b/internal/database/interfaces.go @@ -0,0 +1,23 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +// Package database contains common interfaces and errors for all database implementations. +package database + +// WriteBatch is a batch of write operations that can be +// flushed to the database/database table, or canceled. +type WriteBatch interface { + Set(key, value []byte) error + Delete(key []byte) error + Flush() error + Cancel() +} + +// Table is a table derived from the database for a particular +// key prefix. +type Table interface { + Get(key []byte) ([]byte, error) + Set(key, value []byte) error + Delete(key []byte) error + NewWriteBatch() (writeBatch WriteBatch) +} diff --git a/internal/database/memory/database.go b/internal/database/memory/database.go new file mode 100644 index 0000000000..8e70e163da --- /dev/null +++ b/internal/database/memory/database.go @@ -0,0 +1,144 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +// Package memory provides an in-memory database implementation. +package memory + +import ( + "bytes" + "context" + "fmt" + "sync" + "sync/atomic" + + "github.com/ChainSafe/gossamer/internal/database" +) + +// Database is an in-memory database implementation. +type Database struct { + closed *atomic.Bool + keyValues map[string][]byte + mutex sync.RWMutex +} + +// New returns a new in-memory database. +func New() *Database { + return &Database{ + closed: new(atomic.Bool), + keyValues: make(map[string][]byte), + } +} + +// Get retrieves a value from the database using the given key. +// It returns `ErrKeyNotFound` if the key is not found. +func (db *Database) Get(key []byte) (value []byte, err error) { + if db.closed.Load() { + return nil, fmt.Errorf("%w", database.ErrClosed) + } + + db.mutex.RLock() + defer db.mutex.RUnlock() + value, ok := db.keyValues[string(key)] + if !ok { + return nil, fmt.Errorf("%w: 0x%x", database.ErrKeyNotFound, key) + } + + return value, nil +} + +// Set sets a value at the given key in the database. +// The value byte slice is deep copied to avoid any mutation surprises. +// The error returned is always nil. +func (db *Database) Set(key, value []byte) (err error) { + if db.closed.Load() { + return fmt.Errorf("%w", database.ErrClosed) + } + + db.mutex.Lock() + defer db.mutex.Unlock() + + db.keyValues[string(key)] = copyBytes(value) + + return nil +} + +// Delete deletes a the given key in the database. +// If the key is not found, no error is returned. +func (db *Database) Delete(key []byte) (err error) { + if db.closed.Load() { + return fmt.Errorf("%w", database.ErrClosed) + } + + db.mutex.Lock() + defer db.mutex.Unlock() + + delete(db.keyValues, string(key)) + + return nil +} + +// NewWriteBatch returns a new write batch for the database. +// It is not thread-safe to write to the batch, but flushing it is +// thread-safe for the database. +func (db *Database) NewWriteBatch() (writeBatch database.WriteBatch) { + const prefix = "" + return newWriteBatch(prefix, db) +} + +// NewTable returns a new table using the database. +// All keys on the table will be prefixed with the given prefix. +func (db *Database) NewTable(prefix string) (writeBatch database.Table) { + return &table{ + prefix: prefix, + database: db, + } +} + +// Stream streams data from the database to the `handle` +// function given. The `prefix` is used to filter the keys +// as well as the `chooseKey` function. Note the whole stream +// operation locks the database for reading. +func (db *Database) Stream(_ context.Context, prefix []byte, + chooseKey func(key []byte) bool, + handle func(key, value []byte) error) (err error) { + if db.closed.Load() { + return fmt.Errorf("%w", database.ErrClosed) + } + + db.mutex.RLock() + defer db.mutex.RUnlock() + + for keyString, value := range db.keyValues { + key := []byte(keyString) + if !bytes.HasPrefix(key, prefix) || !chooseKey(key) { + continue + } + + if err := handle(key, value); err != nil { + return err + } + } + + return nil +} + +// Close closes the database. +func (db *Database) Close() (err error) { + closed := db.closed.Swap(true) + if closed { + return fmt.Errorf("%w", database.ErrClosed) + } + + db.keyValues = nil + return nil +} + +// DropAll drops all data from the database. +func (db *Database) DropAll() (err error) { + if db.closed.Load() { + return fmt.Errorf("%w", database.ErrClosed) + } + + db.keyValues = make(map[string][]byte) + return nil +} diff --git a/internal/database/memory/database_test.go b/internal/database/memory/database_test.go new file mode 100644 index 0000000000..0b2103f6d7 --- /dev/null +++ b/internal/database/memory/database_test.go @@ -0,0 +1,304 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "sync/atomic" + "testing" + + "github.com/ChainSafe/gossamer/internal/database" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func atomicTrue() *atomic.Bool { + b := new(atomic.Bool) + b.Store(true) + return b +} + +func Test_Database_Get(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + db *Database + key []byte + value []byte + errWrapped error + errMessage string + }{ + "database_closed": { + db: &Database{ + closed: atomicTrue(), + }, + errWrapped: database.ErrClosed, + errMessage: "database closed", + }, + "key_not_found": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + }, + key: []byte{1}, + errWrapped: database.ErrKeyNotFound, + errMessage: "key not found: 0x01", + }, + "key_found": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {2}, + }, + }, + key: []byte{1}, + value: []byte{2}, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + value, err := testCase.db.Get(testCase.key) + + assert.ErrorIs(t, err, testCase.errWrapped) + if testCase.errWrapped != nil { + assert.EqualError(t, err, testCase.errMessage) + } + assert.Equal(t, testCase.value, value) + }) + } +} + +func Test_Database_Set(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + db *Database + key []byte + value []byte + errWrapped error + errMessage string + expectedDB *Database + }{ + "database_is_closed": { + db: &Database{ + closed: atomicTrue(), + }, + errWrapped: database.ErrClosed, + errMessage: "database closed", + expectedDB: &Database{ + closed: atomicTrue(), + }, + }, + "set_at_new_key": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + }, + key: []byte{1}, + value: []byte{2}, + expectedDB: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {2}, + }, + }, + }, + "override_value_at_key": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {1}}, + }, + key: []byte{1}, + value: []byte{2}, + expectedDB: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {2}, + }, + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + err := testCase.db.Set(testCase.key, testCase.value) + + require.ErrorIs(t, err, testCase.errWrapped) + if testCase.errWrapped != nil { + assert.EqualError(t, err, testCase.errMessage) + } + assert.Equal(t, testCase.expectedDB, testCase.db) + }) + } + + t.Run("value mutation safety", func(t *testing.T) { + t.Parallel() + + database := &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + } + + key := []byte{1} + value := []byte{2} + err := database.Set(key, value) + require.NoError(t, err) + + value[0]++ + value, err = database.Get(key) + require.NoError(t, err) + assert.Equal(t, []byte{2}, value) + }) +} + +func Test_Database_Delete(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + db *Database + key []byte + errWrapped error + expectedDB *Database + }{ + "database_closed": { + db: &Database{ + closed: atomicTrue(), + }, + errWrapped: database.ErrClosed, + expectedDB: &Database{ + closed: atomicTrue(), + }, + }, + "key_not_found": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + }, + key: []byte{1}, + expectedDB: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + }, + }, + "key_deleted": { + db: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {1}}, + }, + key: []byte{1}, + expectedDB: &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + err := testCase.db.Delete(testCase.key) + + require.ErrorIs(t, err, testCase.errWrapped) + assert.Equal(t, testCase.expectedDB, testCase.db) + }) + } +} + +func Test_Database_NewWriteBatch(t *testing.T) { + t.Parallel() + + database := &Database{ + keyValues: map[string][]byte{}, + } + writeBatch := database.NewWriteBatch() + + err := writeBatch.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + + err = writeBatch.Flush() + require.NoError(t, err) + + expectedDB := &Database{ + keyValues: map[string][]byte{ + "\x01": {2}, + }, + } + assert.Equal(t, expectedDB, database) +} + +func Test_Database_Close(t *testing.T) { + t.Parallel() + + t.Run("already closed", func(t *testing.T) { + t.Parallel() + + db := &Database{ + closed: atomicTrue(), + } + err := db.Close() + assert.ErrorIs(t, err, database.ErrClosed) + }) + + t.Run("closing", func(t *testing.T) { + db := &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + } + + err := db.Close() + require.NoError(t, err) + + expectedDB := &Database{ + closed: atomicTrue(), + } + assert.Equal(t, expectedDB, db) + + _, err = db.Get([]byte{1}) + assert.ErrorIs(t, err, database.ErrClosed) + }) +} + +func Test_Database_DropAll(t *testing.T) { + t.Parallel() + + t.Run("database is closed", func(t *testing.T) { + t.Parallel() + + db := &Database{ + closed: atomicTrue(), + } + err := db.DropAll() + assert.ErrorIs(t, err, database.ErrClosed) + }) + + t.Run("success", func(t *testing.T) { + t.Parallel() + + database := &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{ + "\x01": {1}, + }, + } + + err := database.DropAll() + require.NoError(t, err) + + expectedDB := &Database{ + closed: new(atomic.Bool), + keyValues: map[string][]byte{}, + } + assert.Equal(t, expectedDB, database) + }) +} diff --git a/internal/database/memory/helpers.go b/internal/database/memory/helpers.go new file mode 100644 index 0000000000..70fae44492 --- /dev/null +++ b/internal/database/memory/helpers.go @@ -0,0 +1,10 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +func copyBytes(b []byte) (bCopy []byte) { + bCopy = make([]byte, len(b)) + copy(bCopy, b) + return bCopy +} diff --git a/internal/database/memory/race_test.go b/internal/database/memory/race_test.go new file mode 100644 index 0000000000..ad0fcc7360 --- /dev/null +++ b/internal/database/memory/race_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "context" + "sync" + "testing" + "time" +) + +func Test_Database_threadSafety(t *testing.T) { + // This test consists in checking for concurrent access + // using the -race detector. + t.Parallel() + + var startWg, endWg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + + const operations = 5 + const parallelism = 3 + const goroutines = parallelism * operations + startWg.Add(goroutines) + endWg.Add(goroutines) + + const testDuration = 50 * time.Millisecond + go func() { + timer := time.NewTimer(time.Hour) + startWg.Wait() + _ = timer.Reset(testDuration) + <-timer.C + cancel() + }() + + runInLoop := func(f func()) { + defer endWg.Done() + startWg.Done() + startWg.Wait() + for ctx.Err() == nil { + f() + } + } + + database := New() + key := []byte{1} + value := []byte{2} + + for i := 0; i < parallelism; i++ { + go runInLoop(func() { + _, _ = database.Get(key) + }) + + go runInLoop(func() { + _ = database.Set(key, value) + }) + + go runInLoop(func() { + _ = database.Delete(key) + }) + + go runInLoop(func() { + batch := database.NewWriteBatch() + _ = batch.Set(key, value) + _ = batch.Delete(key) + _ = batch.Set(key, value) + _ = batch.Flush() + }) + go runInLoop(func() { + _ = database.DropAll() + }) + } + + endWg.Wait() +} diff --git a/internal/database/memory/table.go b/internal/database/memory/table.go new file mode 100644 index 0000000000..4609bff1c9 --- /dev/null +++ b/internal/database/memory/table.go @@ -0,0 +1,42 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "github.com/ChainSafe/gossamer/internal/database" +) + +type table struct { + prefix string + database *Database +} + +// Get retrieves a value from the database using the given key +// prefixed with the table prefix. +// It returns the wrapped error `database.ErrKeyNotFound` if the +// prefixed key is not found. +func (t *table) Get(key []byte) (value []byte, err error) { + key = []byte(t.prefix + string(key)) + return t.database.Get(key) +} + +// Set sets a value at the given key prefixed with the table prefix +// in the database. +func (t *table) Set(key, value []byte) (err error) { + key = []byte(t.prefix + string(key)) + return t.database.Set(key, value) +} + +// Delete deletes the given key prefixed with the table prefix +// from the database. If the key is not found, no error is returned. +func (t *table) Delete(key []byte) (err error) { + key = []byte(t.prefix + string(key)) + return t.database.Delete(key) +} + +// NewWriteBatch returns a new write batch for the database, +// using the table prefix to prefix all keys. +func (t *table) NewWriteBatch() (writeBatch database.WriteBatch) { + return newWriteBatch(t.prefix, t.database) +} diff --git a/internal/database/memory/table_test.go b/internal/database/memory/table_test.go new file mode 100644 index 0000000000..c00cd006f3 --- /dev/null +++ b/internal/database/memory/table_test.go @@ -0,0 +1,64 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_table(t *testing.T) { + t.Parallel() + + db := New() + dbTable := db.NewTable("x") + + db.Set([]byte{1}, []byte{2}) + + dbTable.Set([]byte{1}, []byte{3}) + expectedKeyValues := map[string][]byte{ + "\x01": {2}, + "x\x01": {3}, + } + assert.Equal(t, expectedKeyValues, db.keyValues) + + value, err := dbTable.Get([]byte{1}) + require.NoError(t, err) + assert.Equal(t, []byte{3}, value) + + err = dbTable.Delete([]byte{1}) + require.NoError(t, err) + expectedKeyValues = map[string][]byte{ + "\x01": {2}, + } + assert.Equal(t, expectedKeyValues, db.keyValues) + + batch := dbTable.NewWriteBatch() + err = batch.Set([]byte{1}, []byte{4}) + require.NoError(t, err) + err = batch.Set([]byte{2}, []byte{5}) + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + expectedKeyValues = map[string][]byte{ + "\x01": {2}, + "x\x01": {4}, + "x\x02": {5}, + } + assert.Equal(t, expectedKeyValues, db.keyValues) + + typedDBTable := dbTable.(*table) + for key := range typedDBTable.database.keyValues { + if strings.HasPrefix(key, typedDBTable.prefix) { + delete(typedDBTable.database.keyValues, key) + } + } + expectedKeyValues = map[string][]byte{ + "\x01": {2}, + } + assert.Equal(t, expectedKeyValues, db.keyValues) +} diff --git a/internal/database/memory/writebatch.go b/internal/database/memory/writebatch.go new file mode 100644 index 0000000000..f941c2975c --- /dev/null +++ b/internal/database/memory/writebatch.go @@ -0,0 +1,102 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "errors" + "fmt" +) + +var ( + ErrWriteBatchDiscarded = errors.New("write batch has been discarded") +) + +type operationKind uint8 + +const ( + operationSet operationKind = iota + operationDelete +) + +type operation struct { + kind operationKind + key string + value []byte +} + +// writeBatch implements an in-memory write batch and prefixes +// all keys with a certain given prefix. +// It is NOT thread safe, but its Flush operation is thread safe for +// the database injected. +type writeBatch struct { + discarded bool + prefix string + database *Database + operations []operation +} + +func newWriteBatch(prefix string, database *Database) *writeBatch { + return &writeBatch{ + prefix: prefix, + database: database, + } +} + +// Set sets a value at the given key prefixed with the given prefix. +func (wb *writeBatch) Set(key, value []byte) (err error) { + if wb.discarded { + return fmt.Errorf("%w", ErrWriteBatchDiscarded) + } + + op := operation{ + kind: operationSet, + key: wb.prefix + string(key), + value: copyBytes(value), + } + wb.operations = append(wb.operations, op) + return nil +} + +// Delete deletes the given key prefixed with the table prefix +// from the database. +func (wb *writeBatch) Delete(key []byte) (err error) { + if wb.discarded { + return fmt.Errorf("%w", ErrWriteBatchDiscarded) + } + + op := operation{ + kind: operationDelete, + key: wb.prefix + string(key), + } + wb.operations = append(wb.operations, op) + return nil +} + +// Flush flushes the write batch to the database. +func (wb *writeBatch) Flush() (err error) { + if wb.discarded { + return fmt.Errorf("%w", ErrWriteBatchDiscarded) + } + + wb.database.mutex.Lock() + defer wb.database.mutex.Unlock() + defer wb.Cancel() + + for _, op := range wb.operations { + switch op.kind { + case operationSet: + wb.database.keyValues[op.key] = op.value + case operationDelete: + delete(wb.database.keyValues, op.key) + } + } + + return nil +} + +// Cancel cancels the write batch. +func (wb *writeBatch) Cancel() { + wb.discarded = true + wb.operations = nil +} diff --git a/internal/database/memory/writebatch_test.go b/internal/database/memory/writebatch_test.go new file mode 100644 index 0000000000..60dd1a0565 --- /dev/null +++ b/internal/database/memory/writebatch_test.go @@ -0,0 +1,151 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package memory + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_writeBatch(t *testing.T) { + t.Parallel() + + db := &Database{ + keyValues: map[string][]byte{ + "\x03": {2}, + }, + } + writeBatch := db.NewWriteBatch() + + err := writeBatch.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + err = writeBatch.Delete([]byte{3}) + require.NoError(t, err) + + writeBatch.Cancel() + + expectedDB := &Database{ + keyValues: map[string][]byte{ + "\x03": {2}, + }, + } + assert.Equal(t, expectedDB, db) + + writeBatch = db.NewWriteBatch() + err = writeBatch.Set([]byte{1}, []byte{2}) + require.NoError(t, err) + err = writeBatch.Delete([]byte{3}) + require.NoError(t, err) + + err = writeBatch.Flush() + require.NoError(t, err) + + expectedDB = &Database{ + keyValues: map[string][]byte{ + "\x01": {2}, + }, + } + assert.Equal(t, expectedDB, db) +} + +func Test_writeBatch_Set(t *testing.T) { + t.Parallel() + + wb := &writeBatch{} + + key := []byte{1} + value := []byte{2} + + err := wb.Set(key, value) + require.NoError(t, err) + + expectedWb := &writeBatch{ + operations: []operation{{ + kind: operationSet, + key: "\x01", + value: []byte{2}, + }}, + } + assert.Equal(t, expectedWb, wb) + + // Check it is resistant to value mutation. + value[0]++ + assert.Equal(t, expectedWb, wb) +} + +func Test_writeBatch_Delete(t *testing.T) { + t.Parallel() + + wb := &writeBatch{} + + key := []byte{1} + + err := wb.Delete(key) + require.NoError(t, err) + + expectedWb := &writeBatch{ + operations: []operation{{ + kind: operationDelete, + key: "\x01", + }}, + } + assert.Equal(t, expectedWb, wb) +} + +func Test_writeBatch_Flush(t *testing.T) { + t.Parallel() + + wb := &writeBatch{ + database: &Database{ + keyValues: map[string][]byte{}, + }, + operations: []operation{{ + kind: operationSet, + key: "\x02", + value: []byte{3}, + }, { + kind: operationSet, + key: "\x01", + value: []byte{2}, + }, { + kind: operationDelete, + key: "\x01", + value: []byte{2}, + }}, + } + + err := wb.Flush() + require.NoError(t, err) + + expectedWb := &writeBatch{ + discarded: true, + database: &Database{ + keyValues: map[string][]byte{ + "\x02": {3}, + }, + }, + } + assert.Equal(t, expectedWb, wb) +} + +func Test_writeBatch_Cancel(t *testing.T) { + t.Parallel() + + wb := &writeBatch{ + operations: []operation{{ + kind: operationSet, + key: "\x02", + value: []byte{3}, + }}, + } + + wb.Cancel() + + expectedWb := &writeBatch{ + discarded: true, + } + assert.Equal(t, expectedWb, wb) +}