Skip to content

Commit

Permalink
Adapt code to internal/database packages
Browse files Browse the repository at this point in the history
  • Loading branch information
qdm12 committed Dec 14, 2022
1 parent 127c377 commit e524268
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 472 deletions.
68 changes: 34 additions & 34 deletions internal/pruner/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"fmt"
"sync"

"github.com/ChainSafe/chaindb"
"github.com/ChainSafe/gossamer/internal/database"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
)
Expand All @@ -30,7 +30,7 @@ type FullNode struct {

// Dependency injected
logger Logger
storageDatabase ChainDBNewBatcher
storageDatabase NewWriteBatcher
journalDatabase JournalDatabase
blockState BlockState

Expand Down Expand Up @@ -62,7 +62,7 @@ type journalRecord struct {
}

// NewFullNode creates a full node pruner.
func NewFullNode(journalDB JournalDatabase, storageDB ChainDBNewBatcher, retainBlocks uint32,
func NewFullNode(journalDB JournalDatabase, storageDB NewWriteBatcher, retainBlocks uint32,
blockState BlockState, logger Logger) (pruner *FullNode, err error) {
highestBlockNumber, err := getBlockNumberFromKey(journalDB, []byte(highestBlockNumberKey))
if err != nil {
Expand All @@ -89,10 +89,10 @@ func NewFullNode(journalDB JournalDatabase, storageDB ChainDBNewBatcher, retainB

// Prune all block numbers necessary, if for example the
// user lowers the retainBlocks parameter.
journalDBBatch := journalDB.NewBatch()
journalDBBatch := journalDB.NewWriteBatch()
err = pruner.pruneAll(journalDBBatch)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return nil, fmt.Errorf("pruning: %w", err)
}
err = journalDBBatch.Flush()
Expand Down Expand Up @@ -120,11 +120,11 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
// Delist re-inserted keys from being pruned.
// WARNING: this must be before the pruning to avoid
// pruning still needed database keys.
journalDBBatch := p.journalDatabase.NewBatch()
journalDBBatch := p.journalDatabase.NewWriteBatch()
err = p.handleInsertedKeys(insertedNodeHashes, blockNumber,
blockHash, journalDBBatch)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return fmt.Errorf("handling inserted keys: %w", err)
}

Expand All @@ -133,7 +133,7 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
return fmt.Errorf("flushing re-inserted keys updates to journal database: %w", err)
}

journalDBBatch = p.journalDatabase.NewBatch()
journalDBBatch = p.journalDatabase.NewWriteBatch()

// Update highest block number only in memory so `pruneAll` can use it,
// prune and flush the deletions in the journal and storage databases.
Expand All @@ -144,14 +144,14 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
// Prune before inserting a new journal record
err = p.pruneAll(journalDBBatch)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return fmt.Errorf("pruning database: %w", err)
}

if blockNumber > p.highestBlockNumber {
err = storeBlockNumberAtKey(journalDBBatch, []byte(highestBlockNumberKey), blockNumber)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return fmt.Errorf("storing highest block number in journal database: %w", err)
}
}
Expand All @@ -162,7 +162,7 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
// block number encountered.
err = appendBlockHash(blockNumber, blockHash, p.journalDatabase, journalDBBatch)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return fmt.Errorf("recording block hash in journal database: %w", err)
}

Expand All @@ -172,7 +172,7 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
}
err = storeJournalRecord(journalDBBatch, blockNumber, blockHash, record)
if err != nil {
journalDBBatch.Reset()
journalDBBatch.Cancel()
return fmt.Errorf("storing journal record for block number %d: %w", blockNumber, err)
}

Expand All @@ -186,7 +186,7 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
}

func (p *FullNode) handleInsertedKeys(insertedNodeHashes map[common.Hash]struct{},
blockNumber uint32, blockHash common.Hash, journalDBBatch Putter) (err error) {
blockNumber uint32, blockHash common.Hash, journalDBBatch Setter) (err error) {
for insertedNodeHash := range insertedNodeHashes {
err = p.handleInsertedKey(insertedNodeHash, blockNumber, blockHash, journalDBBatch)
if err != nil {
Expand All @@ -199,12 +199,12 @@ func (p *FullNode) handleInsertedKeys(insertedNodeHashes map[common.Hash]struct{
}

func (p *FullNode) handleInsertedKey(insertedNodeHash common.Hash, blockNumber uint32,
blockHash common.Hash, journalDBBatch Putter) (err error) {
blockHash common.Hash, journalDBBatch Setter) (err error) {
// Try to find if the node hash was deleted in another block before
// since we no longer want to prune it, as it was re-inserted.
deletedNodeHashKey := makeDeletedKey(insertedNodeHash)
journalKeyDeletedAt, err := p.journalDatabase.Get(deletedNodeHashKey)
nodeHashDeletedInAnotherBlock := errors.Is(err, chaindb.ErrKeyNotFound)
nodeHashDeletedInAnotherBlock := errors.Is(err, database.ErrKeyNotFound)
if !nodeHashDeletedInAnotherBlock {
return nil
} else if err != nil {
Expand Down Expand Up @@ -251,25 +251,25 @@ func (p *FullNode) handleInsertedKey(insertedNodeHash common.Hash, blockNumber u
return fmt.Errorf("encoding updated journal record: %w", err)
}

err = journalDBBatch.Put(journalKeyDeletedAt, encodedJournalRecord)
err = journalDBBatch.Set(journalKeyDeletedAt, encodedJournalRecord)
if err != nil {
return fmt.Errorf("putting updated journal record in journal database batch: %w", err)
}

return nil
}

func (p *FullNode) pruneAll(journalDBBatch PutDeleter) (err error) {
func (p *FullNode) pruneAll(journalDBBatch SetDeleter) (err error) {
if p.highestBlockNumber-p.nextBlockNumberToPrune <= p.retainBlocks {
return nil
}

storageBatch := p.storageDatabase.NewBatch()
storageBatch := p.storageDatabase.NewWriteBatch()
blockNumberToPrune := p.nextBlockNumberToPrune
for p.highestBlockNumber-blockNumberToPrune > p.retainBlocks {
err := prune(blockNumberToPrune, p.journalDatabase, journalDBBatch, storageBatch)
if err != nil {
storageBatch.Reset()
storageBatch.Cancel()
return fmt.Errorf("pruning block number %d: %w", blockNumberToPrune, err)
}
blockNumberToPrune++
Expand All @@ -279,7 +279,7 @@ func (p *FullNode) pruneAll(journalDBBatch PutDeleter) (err error) {

err = storeBlockNumberAtKey(journalDBBatch, []byte(lastPrunedKey), lastBlockNumberPruned)
if err != nil {
storageBatch.Reset()
storageBatch.Cancel()
return fmt.Errorf("writing last pruned block number to journal database batch: %w", err)
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func pruneStorage(blockNumber uint32, blockHashes []common.Hash,
}

for deletedNodeHash := range record.DeletedNodeHashes {
err = batch.Del(deletedNodeHash.ToBytes())
err = batch.Delete(deletedNodeHash.ToBytes())
if err != nil {
return fmt.Errorf("deleting key from batch: %w", err)
}
Expand All @@ -350,15 +350,15 @@ func pruneJournal(blockNumber uint32, blockHashes []common.Hash,
return fmt.Errorf("scale encoding journal key: %w", err)
}

err = batch.Del(encodedKey)
err = batch.Delete(encodedKey)
if err != nil {
return fmt.Errorf("deleting journal key from batch: %w", err)
}
}
return nil
}

func storeJournalRecord(batch Putter, blockNumber uint32, blockHash common.Hash,
func storeJournalRecord(batch Setter, blockNumber uint32, blockHash common.Hash,
record journalRecord) (err error) {
key := journalKey{
BlockNumber: blockNumber,
Expand All @@ -374,7 +374,7 @@ func storeJournalRecord(batch Putter, blockNumber uint32, blockHash common.Hash,
// so a node hash can quickly be checked for from the journal database
// when running `handleInsertedKey`.
databaseKey := makeDeletedKey(deletedNodeHash)
err = batch.Put(databaseKey, encodedKey)
err = batch.Set(databaseKey, encodedKey)
if err != nil {
return fmt.Errorf("putting journal key in database batch: %w", err)
}
Expand All @@ -387,7 +387,7 @@ func storeJournalRecord(batch Putter, blockNumber uint32, blockHash common.Hash,

// We store the journal record (block hash + deleted node hashes + inserted node hashes)
// in the journal database at the key (block hash + block number)
err = batch.Put(encodedKey, encodedRecord)
err = batch.Set(encodedKey, encodedRecord)
if err != nil {
return fmt.Errorf("putting journal record in database batch: %w", err)
}
Expand Down Expand Up @@ -419,13 +419,13 @@ func getJournalRecord(database Getter, blockNumber uint32,
return record, nil
}

func storeBlockNumberAtKey(batch Putter, key []byte, blockNumber uint32) error {
func storeBlockNumberAtKey(batch Setter, key []byte, blockNumber uint32) error {
encodedBlockNumber, err := scale.Marshal(blockNumber)
if err != nil {
return fmt.Errorf("encoding block number: %w", err)
}

err = batch.Put(key, encodedBlockNumber)
err = batch.Set(key, encodedBlockNumber)
if err != nil {
return fmt.Errorf("putting block number %d: %w", blockNumber, err)
}
Expand All @@ -435,10 +435,10 @@ func storeBlockNumberAtKey(batch Putter, key []byte, blockNumber uint32) error {

// getBlockNumberFromKey obtains the block number from the database at the given key.
// If the key is not found, the block number `0` is returned without error.
func getBlockNumberFromKey(database Getter, key []byte) (blockNumber uint32, err error) {
encodedBlockNumber, err := database.Get(key)
func getBlockNumberFromKey(getter Getter, key []byte) (blockNumber uint32, err error) {
encodedBlockNumber, err := getter.Get(key)
if err != nil {
if errors.Is(err, chaindb.ErrKeyNotFound) {
if errors.Is(err, database.ErrKeyNotFound) {
return 0, nil
}
return 0, fmt.Errorf("getting block number from database: %w", err)
Expand Down Expand Up @@ -474,17 +474,17 @@ func loadBlockHashes(blockNumber uint32, journalDB Getter) (blockHashes []common
}

func appendBlockHash(blockNumber uint32, blockHash common.Hash, journalDB Getter,
batch Putter) (err error) {
batch Setter) (err error) {
keyString := blockNumberToHashPrefix + fmt.Sprint(blockNumber)
key := []byte(keyString)
encodedBlockHashes, err := journalDB.Get(key)
if err != nil && !errors.Is(err, chaindb.ErrKeyNotFound) {
if err != nil && !errors.Is(err, database.ErrKeyNotFound) {
return fmt.Errorf("getting block hashes for block number %d: %w", blockNumber, err)
}

encodedBlockHashes = append(encodedBlockHashes, blockHash.ToBytes()...)

err = batch.Put(key, encodedBlockHashes)
err = batch.Set(key, encodedBlockHashes)
if err != nil {
return fmt.Errorf("putting block hashes for block number %d: %w", blockNumber, err)
}
Expand All @@ -495,7 +495,7 @@ func appendBlockHash(blockNumber uint32, blockHash common.Hash, journalDB Getter
func pruneBlockHashes(blockNumber uint32, batch Deleter) (err error) {
keyString := blockNumberToHashPrefix + fmt.Sprint(blockNumber)
key := []byte(keyString)
err = batch.Del(key)
err = batch.Delete(key)
if err != nil {
return fmt.Errorf("deleting block hashes for block number %d from database: %w",
blockNumber, err)
Expand Down
Loading

0 comments on commit e524268

Please sign in to comment.