From cf2b1b3a91abc2552b1374368a4ec6b4fa61a607 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 3 Sep 2024 22:45:09 +0800 Subject: [PATCH 1/8] impl sized file dao --- blockchain/filedao/sized.go | 324 ++++++++++++++++++++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 blockchain/filedao/sized.go diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go new file mode 100644 index 0000000000..9cc3d93a56 --- /dev/null +++ b/blockchain/filedao/sized.go @@ -0,0 +1,324 @@ +package filedao + +import ( + "context" + "os" + "sync" + + "github.com/holiman/billy" + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +type ( + blockstore []byte + sizedDao struct { + size uint64 + dataDir string + + store billy.Database + + tip uint64 + base uint64 + heightToHash map[uint64]hash.Hash256 + hashToHeight map[hash.Hash256]uint64 + heightToID map[uint64]uint64 + lock sync.RWMutex + + deser *block.Deserializer + } +) + +func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (FileDAO, error) { + return &sizedDao{ + size: size, + dataDir: dataDir, + heightToHash: make(map[uint64]hash.Hash256), + hashToHeight: make(map[hash.Hash256]uint64), + heightToID: make(map[uint64]uint64), + deser: deser, + }, nil +} + +func (sd *sizedDao) Start(ctx context.Context) error { + dir := sd.dataDir + if err := os.MkdirAll(dir, 0700); err != nil { + return errors.Wrap(err, "failed to create blob store directory") + } + + var fails []uint64 + index := func(id uint64, size uint32, blob []byte) { + blk, err := blockstore(blob).Block(sd.deser) + if err != nil { + fails = append(fails, id) + log.L().Warn("Failed to decode block", zap.Error(err)) + return + } + h := blk.HashBlock() + height := blk.Height() + sd.hashToHeight[h] = height + sd.heightToHash[height] = h + sd.heightToID[height] = id + if height > sd.tip || sd.tip == 0 { + sd.tip = height + } + if height < sd.base || sd.base == 0 { + sd.base = height + } + } + + store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index) + if err != nil { + return errors.Wrap(err, "failed to open blob store") + } + sd.store = store + if len(fails) > 0 { + return errors.Errorf("failed to decode blocks %v", fails) + } + return nil +} + +func (sd *sizedDao) Stop(ctx context.Context) error { + return sd.store.Close() +} + +func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { + if blk.Height() != sd.tip+1 { + return ErrInvalidTipHeight + } + data, err := serializeBlock(blk) + if err != nil { + return err + } + + sd.lock.Lock() + defer sd.lock.Unlock() + if blk.Height() != sd.tip+1 { + return ErrInvalidTipHeight + } + id, err := sd.store.Put(data) + if err != nil { + return err + } + sd.tip++ + hash := blk.HashBlock() + sd.heightToHash[sd.tip] = hash + sd.hashToHeight[hash] = sd.tip + sd.heightToID[sd.tip] = id + + if sd.tip-sd.base > sd.size { + sd.drop() + } + return nil +} + +func (sd *sizedDao) Height() (uint64, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + return sd.tip, nil +} + +func (sd *sizedDao) GetBlockHash(height uint64) (hash.Hash256, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + h, ok := sd.heightToHash[height] + if !ok { + return hash.ZeroHash256, db.ErrNotExist + } + return h, nil +} + +func (sd *sizedDao) GetBlockHeight(h hash.Hash256) (uint64, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return 0, db.ErrNotExist + } + return height, nil +} + +func (sd *sizedDao) GetBlock(h hash.Hash256) (*block.Block, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return nil, db.ErrNotExist + } + return sd.getBlock(height) +} + +func (sd *sizedDao) GetBlockByHeight(height uint64) (*block.Block, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + return sd.getBlock(height) +} + +func (sd *sizedDao) GetReceipts(height uint64) ([]*action.Receipt, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return blk.Receipts, nil +} + +func (sd *sizedDao) ContainsTransactionLog() bool { + return true +} + +func (sd *sizedDao) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + id, ok := sd.heightToID[height] + if !ok { + return nil, db.ErrNotExist + } + data, err := sd.store.Get(id) + if err != nil { + return nil, err + } + return blockstore(data).TransactionLogs() +} + +func (sd *sizedDao) DeleteTipBlock() error { + panic("not supported") +} + +func (sd *sizedDao) Header(h hash.Hash256) (*block.Header, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + height, ok := sd.hashToHeight[h] + if !ok { + return nil, db.ErrNotExist + } + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Header, nil +} + +func (sd *sizedDao) HeaderByHeight(height uint64) (*block.Header, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Header, nil +} + +func (sd *sizedDao) FooterByHeight(height uint64) (*block.Footer, error) { + sd.lock.RLock() + defer sd.lock.RUnlock() + blk, err := sd.getBlock(height) + if err != nil { + return nil, err + } + return &blk.Footer, nil +} + +func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { + id, ok := sd.heightToID[height] + if !ok { + return nil, db.ErrNotExist + } + data, err := sd.store.Get(id) + if err != nil { + return nil, err + } + return blockstore(data).Block(sd.deser) +} + +func (sd *sizedDao) drop() { + id := sd.heightToID[sd.base] + if err := sd.store.Delete(id); err != nil { + log.L().Error("Failed to delete block", zap.Error(err)) + return + } + hash := sd.heightToHash[sd.base] + delete(sd.heightToHash, sd.base) + delete(sd.heightToID, sd.base) + delete(sd.hashToHeight, hash) + sd.base++ +} + +func serializeBlock(blk *block.Block) (blockstore, error) { + data := make(blockstore, 0) + s := &block.Store{ + Block: blk, + Receipts: blk.Receipts, + } + tmp, err := s.Serialize() + if err != nil { + return nil, err + } + data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) + data = append(data, tmp...) + txLog := blk.TransactionLog() + if txLog != nil { + tmp = txLog.Serialize() + data = append(data, tmp...) + } + return data, nil +} + +func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { + size, err := s.blockSize() + if err != nil { + return nil, err + } + if uint64(len(s)) < size+8 { + return nil, errors.New("blockstore is too short") + } + bs, err := deser.DeserializeBlockStore(s[8 : size+8]) + if err != nil { + return nil, err + } + bs.Block.Receipts = bs.Receipts + return bs.Block, nil +} + +func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { + size, err := s.blockSize() + if err != nil { + return nil, err + } + if uint64(len(s)) < size+8 { + return nil, errors.New("blockstore is too short") + } else if uint64(len(s)) == size+8 { + return nil, nil + } + + return block.DeserializeSystemLogPb(s[size+8:]) +} + +func (s blockstore) blockSize() (uint64, error) { + if len(s) < 8 { + return 0, errors.New("blockstore is too short") + } + return byteutil.BytesToUint64BigEndian(s[:8]), nil +} + +func newSlotter() func() (uint32, bool) { + // TODO: set emptySize and delta according to the actual block size + emptySize := uint32(1024) + delta := uint32(2048) + slotsize := uint32(emptySize) // empty block + slotsize -= uint32(delta) // underflows, it's ok, will overflow back in the first return + + return func() (size uint32, done bool) { + slotsize += delta + return slotsize, false + } +} From 0700bcc5e131252dc4cc902d6ab5add23dc8bbce Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 12:23:57 +0800 Subject: [PATCH 2/8] blockstore --- blockchain/filedao/blockstore.go | 102 ++++++++++++++++++++++++++ blockchain/filedao/sized.go | 120 ++++++++++++------------------- 2 files changed, 149 insertions(+), 73 deletions(-) create mode 100644 blockchain/filedao/blockstore.go diff --git a/blockchain/filedao/blockstore.go b/blockchain/filedao/blockstore.go new file mode 100644 index 0000000000..b280c833cd --- /dev/null +++ b/blockchain/filedao/blockstore.go @@ -0,0 +1,102 @@ +package filedao + +import ( + "fmt" + + "github.com/iotexproject/iotex-proto/golang/iotextypes" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +const ( + blockstoreDefaultVersion = byte(0) + blockstoreHeaderSize = 8 +) + +type ( + // blockstore is a byte slice that contains a block, its receipts and transaction logs + // the first 8 bytes is the size of the block + // the next n bytes is the serialized block and receipts, n is the size of the block + // the rest bytes is the serialized transaction logs + blockstore []byte +) + +var ( + errInvalidBlockstore = fmt.Errorf("invalid blockstore") +) + +func convertToBlockStore(blk *block.Block) (blockstore, error) { + data := make(blockstore, 0) + s := &block.Store{ + Block: blk, + Receipts: blk.Receipts, + } + tmp, err := s.Serialize() + if err != nil { + return nil, err + } + data = append(data, blockstoreDefaultVersion) + data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) + data = append(data, tmp...) + txLog := blk.TransactionLog() + if txLog != nil { + tmp = txLog.Serialize() + data = append(data, tmp...) + } + return data, nil +} + +func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { + size := s.blockSize() + bs, err := deser.DeserializeBlockStore(s[blockstoreHeaderSize : size+blockstoreHeaderSize]) + if err != nil { + return nil, err + } + bs.Block.Receipts = bs.Receipts + return bs.Block, nil +} + +func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { + size := s.blockSize() + if uint64(len(s)) == size+blockstoreHeaderSize { + return nil, nil + } + return block.DeserializeSystemLogPb(s[size+blockstoreHeaderSize:]) +} + +func (s blockstore) Serialize() []byte { + return append([]byte{blockstoreDefaultVersion}, s...) +} + +func (s *blockstore) Deserialize(data []byte) error { + if len(data) == 0 { + return errInvalidBlockstore + } + switch data[0] { + case blockstoreDefaultVersion: + bs := blockstore(data[1:]) + if err := bs.Validate(); err != nil { + return err + } + *s = bs + return nil + default: + return errInvalidBlockstore + } +} + +func (s blockstore) blockSize() uint64 { + return byteutil.BytesToUint64BigEndian(s[:blockstoreHeaderSize]) +} + +func (s blockstore) Validate() error { + if len(s) < blockstoreHeaderSize { + return errInvalidBlockstore + } + blkSize := s.blockSize() + if uint64(len(s)) < blkSize+blockstoreHeaderSize { + return errInvalidBlockstore + } + return nil +} diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index 9cc3d93a56..9590cf9018 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -15,12 +15,10 @@ import ( "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/db" "github.com/iotexproject/iotex-core/pkg/log" - "github.com/iotexproject/iotex-core/pkg/util/byteutil" ) type ( - blockstore []byte - sizedDao struct { + sizedDao struct { size uint64 dataDir string @@ -49,6 +47,8 @@ func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (Fi } func (sd *sizedDao) Start(ctx context.Context) error { + sd.lock.Lock() + defer sd.lock.Unlock() dir := sd.dataDir if err := os.MkdirAll(dir, 0700); err != nil { return errors.Wrap(err, "failed to create blob store directory") @@ -56,7 +56,14 @@ func (sd *sizedDao) Start(ctx context.Context) error { var fails []uint64 index := func(id uint64, size uint32, blob []byte) { - blk, err := blockstore(blob).Block(sd.deser) + bs := new(blockstore) + err := bs.Deserialize(blob) + if err != nil { + fails = append(fails, id) + log.L().Warn("Failed to decode block store", zap.Error(err)) + return + } + blk, err := bs.Block(sd.deser) if err != nil { fails = append(fails, id) log.L().Warn("Failed to decode block", zap.Error(err)) @@ -87,6 +94,8 @@ func (sd *sizedDao) Start(ctx context.Context) error { } func (sd *sizedDao) Stop(ctx context.Context) error { + sd.lock.Lock() + defer sd.lock.Unlock() return sd.store.Close() } @@ -94,7 +103,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight } - data, err := serializeBlock(blk) + bs, err := convertToBlockStore(blk) if err != nil { return err } @@ -104,7 +113,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight } - id, err := sd.store.Put(data) + id, err := sd.store.Put(bs.Serialize()) if err != nil { return err } @@ -187,7 +196,12 @@ func (sd *sizedDao) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, if err != nil { return nil, err } - return blockstore(data).TransactionLogs() + bs := new(blockstore) + err = bs.Deserialize(data) + if err != nil { + return nil, err + } + return bs.TransactionLogs() } func (sd *sizedDao) DeleteTipBlock() error { @@ -237,7 +251,12 @@ func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { if err != nil { return nil, err } - return blockstore(data).Block(sd.deser) + bs := new(blockstore) + err = bs.Deserialize(data) + if err != nil { + return nil, err + } + return bs.Block(sd.deser) } func (sd *sizedDao) drop() { @@ -253,72 +272,27 @@ func (sd *sizedDao) drop() { sd.base++ } -func serializeBlock(blk *block.Block) (blockstore, error) { - data := make(blockstore, 0) - s := &block.Store{ - Block: blk, - Receipts: blk.Receipts, - } - tmp, err := s.Serialize() - if err != nil { - return nil, err - } - data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) - data = append(data, tmp...) - txLog := blk.TransactionLog() - if txLog != nil { - tmp = txLog.Serialize() - data = append(data, tmp...) - } - return data, nil -} - -func (s blockstore) Block(deser *block.Deserializer) (*block.Block, error) { - size, err := s.blockSize() - if err != nil { - return nil, err - } - if uint64(len(s)) < size+8 { - return nil, errors.New("blockstore is too short") - } - bs, err := deser.DeserializeBlockStore(s[8 : size+8]) - if err != nil { - return nil, err - } - bs.Block.Receipts = bs.Receipts - return bs.Block, nil -} - -func (s blockstore) TransactionLogs() (*iotextypes.TransactionLogs, error) { - size, err := s.blockSize() - if err != nil { - return nil, err - } - if uint64(len(s)) < size+8 { - return nil, errors.New("blockstore is too short") - } else if uint64(len(s)) == size+8 { - return nil, nil - } - - return block.DeserializeSystemLogPb(s[size+8:]) -} - -func (s blockstore) blockSize() (uint64, error) { - if len(s) < 8 { - return 0, errors.New("blockstore is too short") - } - return byteutil.BytesToUint64BigEndian(s[:8]), nil -} - func newSlotter() func() (uint32, bool) { - // TODO: set emptySize and delta according to the actual block size - emptySize := uint32(1024) - delta := uint32(2048) - slotsize := uint32(emptySize) // empty block - slotsize -= uint32(delta) // underflows, it's ok, will overflow back in the first return - + sizeList := []uint32{ + 1024 * 4, // empty block + 1024 * 8, // 2 execution + 1024 * 16, + 1024 * 128, // 250 transfer + 1024 * 512, + 1024 * 1024, + 1024 * 1024 * 4, // 5000 transfer + 1024 * 1024 * 8, + 1024 * 1024 * 16, + 1024 * 1024 * 128, + 1024 * 1024 * 512, + 1024 * 1024 * 1024, // max block size + } + i := -1 return func() (size uint32, done bool) { - slotsize += delta - return slotsize, false + i++ + if i >= len(sizeList)-1 { + return sizeList[i], true + } + return sizeList[i], true } } From 44874e29d9106a8c2a9a8c7000cfd344d423fbaf Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 21:52:19 +0800 Subject: [PATCH 3/8] fix --- blockchain/filedao/blockstore.go | 1 - blockchain/filedao/sized.go | 51 +++++++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/blockchain/filedao/blockstore.go b/blockchain/filedao/blockstore.go index b280c833cd..491fdace00 100644 --- a/blockchain/filedao/blockstore.go +++ b/blockchain/filedao/blockstore.go @@ -36,7 +36,6 @@ func convertToBlockStore(blk *block.Block) (blockstore, error) { if err != nil { return nil, err } - data = append(data, blockstoreDefaultVersion) data = append(data, byteutil.Uint64ToBytesBigEndian(uint64(len(tmp)))...) data = append(data, tmp...) txLog := blk.TransactionLog() diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index 9590cf9018..c3218c90ae 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -29,21 +29,25 @@ type ( heightToHash map[uint64]hash.Hash256 hashToHeight map[hash.Hash256]uint64 heightToID map[uint64]uint64 + dropCh chan uint64 lock sync.RWMutex + wg sync.WaitGroup deser *block.Deserializer } ) func NewSizedFileDao(size uint64, dataDir string, deser *block.Deserializer) (FileDAO, error) { - return &sizedDao{ + sd := &sizedDao{ size: size, dataDir: dataDir, heightToHash: make(map[uint64]hash.Hash256), hashToHeight: make(map[hash.Hash256]uint64), heightToID: make(map[uint64]uint64), deser: deser, - }, nil + dropCh: make(chan uint64, size), + } + return sd, nil } func (sd *sizedDao) Start(ctx context.Context) error { @@ -54,7 +58,9 @@ func (sd *sizedDao) Start(ctx context.Context) error { return errors.Wrap(err, "failed to create blob store directory") } - var fails []uint64 + var ( + fails []uint64 + ) index := func(id uint64, size uint32, blob []byte) { bs := new(blockstore) err := bs.Deserialize(blob) @@ -90,15 +96,47 @@ func (sd *sizedDao) Start(ctx context.Context) error { if len(fails) > 0 { return errors.Errorf("failed to decode blocks %v", fails) } + // block continous check + for i := sd.base; i <= sd.tip; i++ { + if i == 0 { + continue + } + if _, ok := sd.heightToID[i]; !ok { + return errors.Errorf("missing block %d", i) + } + } + // start drop routine + go func() { + sd.wg.Add(1) + defer sd.wg.Done() + for id := range sd.dropCh { + if err := sd.store.Delete(id); err != nil { + log.L().Error("Failed to delete block", zap.Error(err)) + } + } + }() return nil } func (sd *sizedDao) Stop(ctx context.Context) error { sd.lock.Lock() defer sd.lock.Unlock() + close(sd.dropCh) + sd.wg.Wait() return sd.store.Close() } +func (sd *sizedDao) SetStart(height uint64) error { + sd.lock.Lock() + defer sd.lock.Unlock() + if len(sd.hashToHeight) > 0 || len(sd.heightToHash) > 0 || len(sd.heightToID) > 0 { + return errors.New("cannot set start height after start") + } + sd.base = height - 1 + sd.tip = height - 1 + return nil +} + func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { if blk.Height() != sd.tip+1 { return ErrInvalidTipHeight @@ -123,7 +161,7 @@ func (sd *sizedDao) PutBlock(ctx context.Context, blk *block.Block) error { sd.hashToHeight[hash] = sd.tip sd.heightToID[sd.tip] = id - if sd.tip-sd.base > sd.size { + if sd.tip-sd.base >= sd.size { sd.drop() } return nil @@ -261,10 +299,7 @@ func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { func (sd *sizedDao) drop() { id := sd.heightToID[sd.base] - if err := sd.store.Delete(id); err != nil { - log.L().Error("Failed to delete block", zap.Error(err)) - return - } + sd.dropCh <- id hash := sd.heightToHash[sd.base] delete(sd.heightToHash, sd.base) delete(sd.heightToID, sd.base) From a9748dd304d7927f1b18779f58fef675a6d37346 Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 5 Sep 2024 21:52:48 +0800 Subject: [PATCH 4/8] test --- blockchain/filedao/blockstore_test.go | 40 ++++++++++++++++++ blockchain/filedao/sized_test.go | 59 +++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 blockchain/filedao/blockstore_test.go create mode 100644 blockchain/filedao/sized_test.go diff --git a/blockchain/filedao/blockstore_test.go b/blockchain/filedao/blockstore_test.go new file mode 100644 index 0000000000..f5ce3470ee --- /dev/null +++ b/blockchain/filedao/blockstore_test.go @@ -0,0 +1,40 @@ +package filedao + +import ( + "testing" + + "github.com/iotexproject/go-pkgs/hash" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/blockchain/block" +) + +func TestBlockStore(t *testing.T) { + r := require.New(t) + builder := block.NewTestingBuilder() + blk := createTestingBlock(builder, 1, hash.ZeroHash256) + bs, err := convertToBlockStore(blk) + r.NoError(err) + data := bs.Serialize() + dbs := new(blockstore) + r.NoError(dbs.Deserialize(data)) + r.Equal(bs[:], (*dbs)[:], "serialized block store should be equal to deserialized block store") + // check deserialized block + deser := block.NewDeserializer(0) + dBlk, err := dbs.Block(deser) + r.NoError(err) + dTxLogs, err := dbs.TransactionLogs() + r.NoError(err) + r.NoError(fillTransactionLog(dBlk.Receipts, dTxLogs.Logs)) + r.Equal(blk.Header, dBlk.Header) + r.Equal(blk.Body, dBlk.Body) + r.Equal(blk.Footer, dBlk.Footer) + r.Equal(len(blk.Receipts), len(dBlk.Receipts)) + for i := range blk.Receipts { + r.Equal(blk.Receipts[i].Hash(), dBlk.Receipts[i].Hash()) + r.Equal(len(blk.Receipts[i].TransactionLogs()), len(dBlk.Receipts[i].TransactionLogs())) + for j := range blk.Receipts[i].TransactionLogs() { + r.Equal(blk.Receipts[i].TransactionLogs()[j], dBlk.Receipts[i].TransactionLogs()[j]) + } + } +} diff --git a/blockchain/filedao/sized_test.go b/blockchain/filedao/sized_test.go new file mode 100644 index 0000000000..f40b3113ae --- /dev/null +++ b/blockchain/filedao/sized_test.go @@ -0,0 +1,59 @@ +package filedao + +import ( + "context" + "crypto/tls" + "testing" + + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-proto/golang/iotexapi" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/blockchain/block" +) + +func TestBlockSize(t *testing.T) { + r := require.New(t) + conn, err := grpc.NewClient("api.mainnet.iotex.one:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}))) + r.NoError(err) + defer conn.Close() + + cli := iotexapi.NewAPIServiceClient(conn) + for _, h := range []uint64{29276276} { + resp, err := cli.GetRawBlocks(context.Background(), &iotexapi.GetRawBlocksRequest{ + StartHeight: h, + Count: 1, + WithReceipts: true, + WithTransactionLogs: true, + }) + r.NoError(err) + r.Len(resp.Blocks, 1) + deserializer := block.NewDeserializer(4689) + blk, err := deserializer.FromBlockProto(resp.Blocks[0].Block) + r.NoError(err) + receipts := make([]*action.Receipt, 0, len(resp.Blocks[0].Receipts)) + for _, receiptpb := range resp.Blocks[0].Receipts { + receipt := &action.Receipt{} + receipt.ConvertFromReceiptPb(receiptpb) + receipts = append(receipts, receipt) + } + blk.Receipts = receipts + data, err := convertToBlockStore(blk) + r.NoError(err) + t.Logf("block %d size= %d", h, len(data)) + } +} + +func TestSizedDao(t *testing.T) { + r := require.New(t) + dao, err := NewSizedFileDao(10, t.TempDir(), block.NewDeserializer(4089)) + r.NoError(err) + + ctx := context.Background() + r.NoError(dao.Start(ctx)) + r.NoError(testCommitBlocks(t, dao, 1, 100, hash.ZeroHash256)) + r.NoError(dao.Stop(ctx)) +} From 022b1abd6578f2ea0092468703515d28bee4d262 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 6 Sep 2024 08:25:57 +0800 Subject: [PATCH 5/8] fix --- blockchain/filedao/sized.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index c3218c90ae..9c87ffd20b 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -1,8 +1,11 @@ package filedao import ( + "bytes" "context" + "math/big" "os" + "slices" "sync" "github.com/holiman/billy" @@ -331,3 +334,37 @@ func newSlotter() func() (uint32, bool) { return sizeList[i], true } } + +func fillTransactionLog(receipts []*action.Receipt, txLogs []*iotextypes.TransactionLog) error { + for _, l := range txLogs { + idx := slices.IndexFunc(receipts, func(r *action.Receipt) bool { + return bytes.Equal(r.ActionHash[:], l.ActionHash) + }) + if idx < 0 { + return errors.Errorf("missing receipt for log %x", l.ActionHash) + } + txLogs := make([]*action.TransactionLog, len(l.GetTransactions())) + for j, tx := range l.GetTransactions() { + txlog, err := convertToTxLog(tx) + if err != nil { + return err + } + txLogs[j] = txlog + } + receipts[idx].AddTransactionLogs(txLogs...) + } + return nil +} + +func convertToTxLog(tx *iotextypes.TransactionLog_Transaction) (*action.TransactionLog, error) { + amount, ok := big.NewInt(0).SetString(tx.Amount, 10) + if !ok { + return nil, errors.Errorf("failed to parse amount %s", tx.Amount) + } + return &action.TransactionLog{ + Type: tx.Type, + Amount: amount, + Sender: tx.Sender, + Recipient: tx.Recipient, + }, nil +} From 65bb0d868c50e08afdf55509791646fbf1bb768d Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 10 Sep 2024 17:12:57 +0800 Subject: [PATCH 6/8] fix & keep continous blocks --- blockchain/filedao/sized.go | 44 +++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/blockchain/filedao/sized.go b/blockchain/filedao/sized.go index 9c87ffd20b..1631aa3d77 100644 --- a/blockchain/filedao/sized.go +++ b/blockchain/filedao/sized.go @@ -61,20 +61,15 @@ func (sd *sizedDao) Start(ctx context.Context) error { return errors.Wrap(err, "failed to create blob store directory") } - var ( - fails []uint64 - ) index := func(id uint64, size uint32, blob []byte) { bs := new(blockstore) err := bs.Deserialize(blob) if err != nil { - fails = append(fails, id) log.L().Warn("Failed to decode block store", zap.Error(err)) return } blk, err := bs.Block(sd.deser) if err != nil { - fails = append(fails, id) log.L().Warn("Failed to decode block", zap.Error(err)) return } @@ -90,22 +85,27 @@ func (sd *sizedDao) Start(ctx context.Context) error { sd.base = height } } - + sd.base = 1 store, err := billy.Open(billy.Options{Path: dir}, newSlotter(), index) if err != nil { return errors.Wrap(err, "failed to open blob store") } sd.store = store - if len(fails) > 0 { - return errors.Errorf("failed to decode blocks %v", fails) - } // block continous check - for i := sd.base; i <= sd.tip; i++ { - if i == 0 { - continue - } + for i := sd.tip; i >= sd.base; i-- { if _, ok := sd.heightToID[i]; !ok { - return errors.Errorf("missing block %d", i) + // remove non-continous blocks[base to i] + for j := sd.base; j < i; j++ { + if id, ok := sd.heightToID[j]; ok { + sd.dropCh <- id + h := sd.heightToHash[j] + delete(sd.heightToHash, j) + delete(sd.hashToHeight, h) + delete(sd.heightToID, j) + } + } + sd.base = i + 1 + break } } // start drop routine @@ -301,12 +301,14 @@ func (sd *sizedDao) getBlock(height uint64) (*block.Block, error) { } func (sd *sizedDao) drop() { - id := sd.heightToID[sd.base] - sd.dropCh <- id - hash := sd.heightToHash[sd.base] - delete(sd.heightToHash, sd.base) - delete(sd.heightToID, sd.base) - delete(sd.hashToHeight, hash) + id, ok := sd.heightToID[sd.base] + if ok { + sd.dropCh <- id + hash := sd.heightToHash[sd.base] + delete(sd.heightToHash, sd.base) + delete(sd.heightToID, sd.base) + delete(sd.hashToHeight, hash) + } sd.base++ } @@ -331,7 +333,7 @@ func newSlotter() func() (uint32, bool) { if i >= len(sizeList)-1 { return sizeList[i], true } - return sizeList[i], true + return sizeList[i], false } } From 38ec12ad1eb24d3518288370a56e6bc2737ac3fc Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 10 Sep 2024 17:13:09 +0800 Subject: [PATCH 7/8] add tests --- blockchain/filedao/sized_test.go | 131 ++++++++++++++++++++++++++++++- blockchain/filedao/testing.go | 67 +++++++++++++--- 2 files changed, 182 insertions(+), 16 deletions(-) diff --git a/blockchain/filedao/sized_test.go b/blockchain/filedao/sized_test.go index f40b3113ae..37eb6cc1ae 100644 --- a/blockchain/filedao/sized_test.go +++ b/blockchain/filedao/sized_test.go @@ -13,9 +13,11 @@ import ( "github.com/iotexproject/iotex-core/action" "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db" ) func TestBlockSize(t *testing.T) { + // t.Skip("used for estimating block size") r := require.New(t) conn, err := grpc.NewClient("api.mainnet.iotex.one:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}))) r.NoError(err) @@ -41,19 +43,142 @@ func TestBlockSize(t *testing.T) { receipts = append(receipts, receipt) } blk.Receipts = receipts + r.NoError(fillTransactionLog(receipts, resp.Blocks[0].TransactionLogs.Logs)) data, err := convertToBlockStore(blk) r.NoError(err) t.Logf("block %d size= %d", h, len(data)) } } -func TestSizedDao(t *testing.T) { +func TestSizedDaoIntegrity(t *testing.T) { r := require.New(t) - dao, err := NewSizedFileDao(10, t.TempDir(), block.NewDeserializer(4089)) + desr := block.NewDeserializer(4689) + ctx := context.Background() + blocks := make([]*block.Block, 0, 20) + builder := block.NewTestingBuilder() + h := hash.ZeroHash256 + for i := 1; i <= cap(blocks); i++ { + blk := createTestingBlock(builder, uint64(i), h) + blocks = append(blocks, blk) + h = blk.HashBlock() + t.Logf("block height %d hash: %x", i, h) + } + // case: putblocks 1 - 10 + datadir := t.TempDir() + dao, err := NewSizedFileDao(10, datadir, desr) + r.NoError(err) + r.NoError(dao.Start(ctx)) + for i := 0; i < 10; i++ { + r.NoError(dao.PutBlock(ctx, blocks[i])) + } + // verify + testVerifyChainDB(t, dao, 1, 10) + // case: put block not expected + r.ErrorIs(dao.PutBlock(ctx, blocks[8]), ErrInvalidTipHeight) + r.ErrorIs(dao.PutBlock(ctx, blocks[9]), ErrInvalidTipHeight) + r.ErrorIs(dao.PutBlock(ctx, blocks[11]), ErrInvalidTipHeight) + // case: put blocks 11 - 20 + for i := 10; i < 20; i++ { + r.NoError(dao.PutBlock(ctx, blocks[i])) + blk, err := dao.GetBlockByHeight(11) + r.NoError(err) + r.Equal(blocks[10].HashBlock(), blk.HashBlock(), "height %d", i) + } + // verify new blocks added and old blocks removed + testVerifyChainDB(t, dao, 11, 20) + testVerifyChainDBNotExists(t, dao, blocks[:10]) + // case: reload + r.NoError(dao.Stop(ctx)) + dao, err = NewSizedFileDao(10, datadir, desr) + r.NoError(err) + r.NoError(dao.Start(ctx)) + // verify blocks reloaded + testVerifyChainDB(t, dao, 11, 20) + testVerifyChainDBNotExists(t, dao, blocks[:10]) + // case: damaged db + inner := dao.(*sizedDao) + _, err = inner.store.Put([]byte("damaged")) + r.NoError(err) + r.NoError(dao.Stop(ctx)) + dao, err = NewSizedFileDao(10, datadir, desr) + r.NoError(err) + // verify invalid data is ignored + r.NoError(dao.Start(ctx)) + testVerifyChainDB(t, dao, 11, 20) + testVerifyChainDBNotExists(t, dao, blocks[:10]) + // case: remove non-continous blocks + inner = dao.(*sizedDao) + inner.store.Delete(inner.heightToID[15]) + r.NoError(dao.Stop(ctx)) + dao, err = NewSizedFileDao(10, datadir, desr) r.NoError(err) + // verify blocks 11 - 15 are removed + r.NoError(dao.Start(ctx)) + testVerifyChainDB(t, dao, 16, 20) + testVerifyChainDBNotExists(t, dao, blocks[:15]) + r.NoError(dao.Stop(ctx)) +} +func TestSizedDaoStorage(t *testing.T) { + r := require.New(t) + desr := block.NewDeserializer(4689) ctx := context.Background() + + datadir := t.TempDir() + dao, err := NewSizedFileDao(10, datadir, desr) + r.NoError(err) r.NoError(dao.Start(ctx)) - r.NoError(testCommitBlocks(t, dao, 1, 100, hash.ZeroHash256)) + + // put empty block + height := uint64(1) + builder := block.NewTestingBuilder() + h := hash.ZeroHash256 + blk := createTestingBlock(builder, height, h) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) + // put block with 5 actions + blk = createTestingBlock(builder, height, h, withActionNum(5)) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) + // put block with 100 actions + blk = createTestingBlock(builder, height, h, withActionNum(100)) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) + // put block with 1000 actions + blk = createTestingBlock(builder, height, h, withActionNum(1000)) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) + // put block with 5000 actions + blk = createTestingBlock(builder, height, h, withActionNum(5000)) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) + // put block with 10000 actions + blk = createTestingBlock(builder, height, h, withActionNum(10000)) + height++ + h = blk.HashBlock() + r.NoError(dao.PutBlock(ctx, blk)) r.NoError(dao.Stop(ctx)) } + +func testVerifyChainDBNotExists(t *testing.T, fd FileDAO, blocks []*block.Block) { + r := require.New(t) + for _, blk := range blocks { + _, err := fd.GetBlockHash(blk.Height()) + r.ErrorIs(err, db.ErrNotExist) + _, err = fd.GetBlockHeight(blk.HashBlock()) + r.ErrorIs(err, db.ErrNotExist) + _, err = fd.GetBlockByHeight(blk.Height()) + r.ErrorIs(err, db.ErrNotExist) + _, err = fd.GetBlock(blk.HashBlock()) + r.ErrorIs(err, db.ErrNotExist) + _, err = fd.GetReceipts(blk.Height()) + r.ErrorIs(err, db.ErrNotExist) + _, err = fd.TransactionLogs(blk.Height()) + r.ErrorIs(err, db.ErrNotExist) + } +} diff --git a/blockchain/filedao/testing.go b/blockchain/filedao/testing.go index 3df58a6244..19bc5a28c7 100644 --- a/blockchain/filedao/testing.go +++ b/blockchain/filedao/testing.go @@ -9,6 +9,7 @@ import ( "context" "encoding/hex" "math/big" + "math/rand" "testing" "github.com/pkg/errors" @@ -223,24 +224,64 @@ func testVerifyChainDB(t *testing.T, fd FileDAO, start, end uint64) { } } -func createTestingBlock(builder *block.TestingBuilder, height uint64, h hash.Hash256) *block.Block { +type ( + testBlockCfg struct { + actionNum int + } + testBlockOption func(*testBlockCfg) +) + +func withActionNum(num int) testBlockOption { + return func(cfg *testBlockCfg) { + cfg.actionNum = num + } +} + +func createTestingBlock(builder *block.TestingBuilder, height uint64, h hash.Hash256, opts ...testBlockOption) *block.Block { + cfg := &testBlockCfg{} + for _, opt := range opts { + opt(cfg) + } block.LoadGenesisHash(&genesis.Default) - r := &action.Receipt{ - Status: 1, - BlockHeight: height, - ActionHash: h, + if cfg.actionNum > 0 { + acts := make([]*action.SealedEnvelope, 0) + receipts := make([]*action.Receipt, 0) + for i := 0; i < cfg.actionNum; i++ { + amount := big.NewInt(int64(rand.Intn(100))) + sender := rand.Intn(20) + receipient := rand.Intn(20) + act, _ := action.SignedTransfer(identityset.Address(receipient).String(), identityset.PrivateKey(sender), uint64(rand.Intn(100)), amount, nil, testutil.TestGasLimit, testutil.TestGasPrice) + acts = append(acts, act) + actHash, _ := act.Hash() + r := &action.Receipt{ + Status: 1, + BlockHeight: height, + ActionHash: actHash, + } + receipts = append(receipts, r.AddTransactionLogs(&action.TransactionLog{ + Type: iotextypes.TransactionLogType_NATIVE_TRANSFER, + Amount: amount, + Sender: hex.EncodeToString(identityset.Address(sender).Bytes()), + Recipient: hex.EncodeToString(identityset.Address(receipient).Bytes()), + })) + } + builder.AddActions(acts...).SetReceipts(receipts) + } else { + r := &action.Receipt{ + Status: 1, + BlockHeight: height, + ActionHash: h, + } + builder.SetReceipts([]*action.Receipt{r.AddTransactionLogs(&action.TransactionLog{ + Type: iotextypes.TransactionLogType_NATIVE_TRANSFER, + Amount: big.NewInt(100), + Sender: hex.EncodeToString(h[:]), + Recipient: hex.EncodeToString(h[:]), + })}) } blk, _ := builder. SetHeight(height). SetPrevBlockHash(h). - SetReceipts([]*action.Receipt{ - r.AddTransactionLogs(&action.TransactionLog{ - Type: iotextypes.TransactionLogType_NATIVE_TRANSFER, - Amount: big.NewInt(100), - Sender: hex.EncodeToString(h[:]), - Recipient: hex.EncodeToString(h[:]), - }), - }). SetTimeStamp(testutil.TimestampNow().UTC()). SignAndBuild(identityset.PrivateKey(27)) return &blk From cd6767a3ea0ab0d1426b0a644adf25bc14afb017 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 10 Sep 2024 17:31:55 +0800 Subject: [PATCH 8/8] skip test --- blockchain/filedao/sized_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockchain/filedao/sized_test.go b/blockchain/filedao/sized_test.go index 37eb6cc1ae..7a900c1f8e 100644 --- a/blockchain/filedao/sized_test.go +++ b/blockchain/filedao/sized_test.go @@ -17,7 +17,7 @@ import ( ) func TestBlockSize(t *testing.T) { - // t.Skip("used for estimating block size") + t.Skip("used for estimating block size") r := require.New(t) conn, err := grpc.NewClient("api.mainnet.iotex.one:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}))) r.NoError(err)