Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the DataAvailabilityHeader to BlockID #304

Closed
wants to merge 13 commits into from
Closed
220 changes: 176 additions & 44 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
block, err := bcR.store.LoadBlock(nil, msg.Height)
if err != nil {
return false
}
if block != nil {
bl, err := block.ToProto()
if err != nil {
Expand Down
44 changes: 37 additions & 7 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v0

import (
"context"
"crypto/sha256"
"fmt"
"os"
Expand All @@ -11,6 +12,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-ipfs/core/coreapi"
coremock "github.com/ipfs/go-ipfs/core/mock"
iface "github.com/ipfs/interface-go-ipfs-core"
abci "github.com/lazyledger/lazyledger-core/abci/types"
cfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/db/memdb"
Expand Down Expand Up @@ -71,7 +75,8 @@ func newBlockchainReactor(
blockDB := memdb.NewDB()
stateDB := memdb.NewDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(blockDB)
ipfsAPI := mockedIpfsAPI()
blockStore := store.NewBlockStore(blockDB, ipfsAPI)

state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand Down Expand Up @@ -99,8 +104,10 @@ func newBlockchainReactor(
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)

lastBlock, err := blockStore.LoadBlock(nil, blockHeight-1)
if err != nil {
panic(err)
}
vote, err := types.MakeVote(
lastBlock.Header.Height,
lastBlockMeta.BlockID,
Expand All @@ -118,8 +125,13 @@ func newBlockchainReactor(

thisBlock := makeBlock(blockHeight, state, lastCommit)

err = thisBlock.PutBlock(context.Background(), ipfsAPI.Dag())
if err != nil {
panic(err)
}

thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header(), DataAvailabilityHeader: &thisBlock.DataAvailabilityHeader}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just asking: will this PR remove the PartSetHeader from the BlockID or will this rather happen in a separate PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that will likely be in #287


state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
Expand Down Expand Up @@ -182,12 +194,15 @@ func TestNoBlockResponse(t *testing.T) {

assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
defer cancel()

for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
block, _ := reactorPairs[1].reactor.store.LoadBlock(ctx, tt.height)
if tt.existent {
assert.True(t, block != nil)
assert.True(t, block != nil, fmt.Sprintf("height %d", tt.height))
} else {
assert.True(t, block == nil)
assert.True(t, block == nil, tt.height, fmt.Sprintf("height %d", tt.height))
}
}
}
Expand Down Expand Up @@ -331,3 +346,18 @@ func (app *testApp) Commit() abci.ResponseCommit {
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
}

func mockedIpfsAPI() iface.CoreAPI {
ipfsNode, err := coremock.NewMockNode()
if err != nil {
panic(err)
}

// issue a new API object
ipfsAPI, err := coreapi.NewCoreAPI(ipfsNode)
if err != nil {
panic(err)
}

return ipfsAPI
}
8 changes: 6 additions & 2 deletions blockchain/v2/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -21,7 +22,7 @@ const (
)

type blockStore interface {
LoadBlock(height int64) *types.Block
LoadBlock(ctx context.Context, height int64) (*types.Block, error)
SaveBlock(*types.Block, *types.PartSet, *types.Commit)
Base() int64
Height() int64
Expand Down Expand Up @@ -488,7 +489,10 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}

case *bcproto.BlockRequest:
block := r.store.LoadBlock(msg.Height)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
block, err := r.store.LoadBlock(ctx, msg.Height)

if block != nil {
if err = r.io.sendBlockToPeer(block, src); err != nil {
logger.Error("Could not send block message to src peer", "err", err)
Expand Down
15 changes: 13 additions & 2 deletions blockchain/v2/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2

import (
"context"
"crypto/sha256"
"fmt"
"net"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/lazyledger/lazyledger-core/proxy"
sm "github.com/lazyledger/lazyledger-core/state"
"github.com/lazyledger/lazyledger-core/store"
"github.com/lazyledger/lazyledger-core/test/mockipfs"
"github.com/lazyledger/lazyledger-core/types"
tmtime "github.com/lazyledger/lazyledger-core/types/time"
)
Expand Down Expand Up @@ -515,7 +517,8 @@ func newReactorStore(
}

stateDB := memdb.NewDB()
blockStore := store.NewBlockStore(memdb.NewDB())
ipfsAPI := mockipfs.MockedIpfsAPI()
blockStore := store.NewBlockStore(memdb.NewDB(), ipfsAPI)
stateStore := sm.NewStore(stateDB)
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand All @@ -539,7 +542,10 @@ func newReactorStore(
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
lastBlock, err := blockStore.LoadBlock(nil, blockHeight-1)
if err != nil {
panic(err)
}
vote, err := types.MakeVote(
lastBlock.Header.Height,
lastBlockMeta.BlockID,
Expand All @@ -557,6 +563,11 @@ func newReactorStore(

thisBlock := makeBlock(blockHeight, state, lastCommit)

err = thisBlock.PutBlock(context.Background(), ipfsAPI.Dag())
if err != nil {
panic(err)
}

thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

Expand Down
172 changes: 83 additions & 89 deletions cmd/tendermint/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@ package commands

import (
"fmt"
"os"
"path/filepath"
"sync"

ipfscfg "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/ipfs/interface-go-ipfs-core/options"

"github.com/spf13/cobra"

cfg "github.com/lazyledger/lazyledger-core/config"
tmos "github.com/lazyledger/lazyledger-core/libs/os"
tmrand "github.com/lazyledger/lazyledger-core/libs/rand"
"github.com/lazyledger/lazyledger-core/node"
"github.com/lazyledger/lazyledger-core/p2p"
"github.com/lazyledger/lazyledger-core/privval"
tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types"
Expand Down Expand Up @@ -106,90 +100,90 @@ func initFilesWithConfig(config *cfg.Config) error {
logger.Info("Generated genesis file", "path", genFile)
}

if err := InitIpfs(config); err != nil {
if err := node.InitIpfs(config, logger); err != nil {
return err
}

return nil
}

// InitIpfs takes a few config flags from the tendermint config.IPFS
// and applies them to the freshly created IPFS repo.
// The IPFS config will stored under config.IPFS.ConfigRootPath.
// TODO(ismail) move into separate file, and consider making IPFS initialization
// independent from the `tendermint init` subcommand.
// TODO(ismail): add counter part in ResetAllCmd
func InitIpfs(config *cfg.Config) error {
repoRoot := config.IPFSRepoRoot()
if fsrepo.IsInitialized(repoRoot) {
logger.Info("IPFS was already initialized", "ipfs-path", repoRoot)
return nil
}
var conf *ipfscfg.Config

identity, err := ipfscfg.CreateIdentity(os.Stdout, []options.KeyGenerateOption{
options.Key.Type(options.Ed25519Key),
})
if err != nil {
return err
}

logger.Info("initializing IPFS node", "ipfs-path", repoRoot)

if err := tmos.EnsureDir(repoRoot, 0700); err != nil {
return err
}

conf, err = ipfscfg.InitWithIdentity(identity)
if err != nil {
return err
}

applyFromTmConfig(conf, config.IPFS)
if err := setupPlugins(repoRoot); err != nil {
return err
}

if err := fsrepo.Init(repoRoot, conf); err != nil {
return err
}
return nil
}

// Inject replies on several global vars internally.
// For instance fsrepo.AddDatastoreConfigHandler will error
// if called multiple times with the same datastore.
// But for CI and integration tests, we want to setup the plugins
// for each repo but only inject once s.t. we can init multiple
// repos from the same runtime.
// TODO(ismail): find a more elegant way to achieve the same.
var injectPluginsOnce sync.Once

func setupPlugins(path string) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

injectPluginsOnce.Do(func() {
err = plugins.Inject()
})
if err != nil {
return fmt.Errorf("error injecting plugins once: %w", err)
}

return nil
}

func applyFromTmConfig(ipfsConf *ipfscfg.Config, tmConf *cfg.IPFSConfig) {
ipfsConf.Addresses.API = ipfscfg.Strings{tmConf.API}
ipfsConf.Addresses.Gateway = ipfscfg.Strings{tmConf.Gateway}
ipfsConf.Addresses.Swarm = tmConf.Swarm
ipfsConf.Addresses.Announce = tmConf.Announce
ipfsConf.Addresses.NoAnnounce = tmConf.NoAnnounce
}
// // InitIpfs takes a few config flags from the tendermint config.IPFS
// // and applies them to the freshly created IPFS repo.
// // The IPFS config will stored under config.IPFS.ConfigRootPath.
// // TODO(ismail) move into separate file, and consider making IPFS initialization
// // independent from the `tendermint init` subcommand.
// // TODO(ismail): add counter part in ResetAllCmd
// func InitIpfs(config *cfg.Config) error {
// repoRoot := config.IPFSRepoRoot()
// if fsrepo.IsInitialized(repoRoot) {
// logger.Info("IPFS was already initialized", "ipfs-path", repoRoot)
// return nil
// }
// var conf *ipfscfg.Config

// identity, err := ipfscfg.CreateIdentity(os.Stdout, []options.KeyGenerateOption{
// options.Key.Type(options.Ed25519Key),
// })
// if err != nil {
// return err
// }

// logger.Info("initializing IPFS node", "ipfs-path", repoRoot)

// if err := tmos.EnsureDir(repoRoot, 0700); err != nil {
// return err
// }

// conf, err = ipfscfg.InitWithIdentity(identity)
// if err != nil {
// return err
// }

// applyFromTmConfig(conf, config.IPFS)
// if err := setupPlugins(repoRoot); err != nil {
// return err
// }

// if err := fsrepo.Init(repoRoot, conf); err != nil {
// return err
// }
// return nil
// }

// // Inject replies on several global vars internally.
// // For instance fsrepo.AddDatastoreConfigHandler will error
// // if called multiple times with the same datastore.
// // But for CI and integration tests, we want to setup the plugins
// // for each repo but only inject once s.t. we can init multiple
// // repos from the same runtime.
// // TODO(ismail): find a more elegant way to achieve the same.
// var injectPluginsOnce sync.Once

// func setupPlugins(path string) error {
// // Load plugins. This will skip the repo if not available.
// plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
// if err != nil {
// return fmt.Errorf("error loading plugins: %s", err)
// }

// if err := plugins.Initialize(); err != nil {
// return fmt.Errorf("error initializing plugins: %s", err)
// }

// injectPluginsOnce.Do(func() {
// err = plugins.Inject()
// })
// if err != nil {
// return fmt.Errorf("error injecting plugins once: %w", err)
// }

// return nil
// }

// func applyFromTmConfig(ipfsConf *ipfscfg.Config, tmConf *cfg.IPFSConfig) {
// ipfsConf.Addresses.API = ipfscfg.Strings{tmConf.API}
// ipfsConf.Addresses.Gateway = ipfscfg.Strings{tmConf.Gateway}
// ipfsConf.Addresses.Swarm = tmConf.Swarm
// ipfsConf.Addresses.Announce = tmConf.Announce
// ipfsConf.Addresses.NoAnnounce = tmConf.NoAnnounce
// }
Loading