Skip to content

Commit

Permalink
[FAB-5803] Initialize pvtdata store correctly
Browse files Browse the repository at this point in the history
This CR fixes a bug in the initialization of the pvtdata store in the scenario
when a blockchain already exists from version 1.0/1.0.1

Change-Id: Ifa466750340569892fcbe3245b5fc45c31402c77
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Aug 18, 2017
1 parent 13724f7 commit 561275a
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 4 deletions.
46 changes: 44 additions & 2 deletions core/ledger/ledgerstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (p *Provider) Open(ledgerid string) (*Store, error) {
var blockStore blkstorage.BlockStore
var pvtdataStore pvtdatastorage.Store
var err error

if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,14 +137,55 @@ func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter)
return pvtdata, nil
}

// init checks whether the block storage and pvt data store are in sync
// init first invokes function `initFromExistingBlockchain`
// in order to check whether the pvtdata store is present because of an upgrade
// of peer from 1.0 and need to be updated with the existing blockchain. If, this is
// not the case then this init will invoke function `syncPvtdataStoreWithBlockStore`
// to follow the normal course
func (s *Store) init() error {
var initialized bool
var err error
if initialized, err = s.initPvtdataStoreFromExistingBlockchain(); err != nil || initialized {
return err
}
return s.syncPvtdataStoreWithBlockStore()
}

// initPvtdataStoreFromExistingBlockchain updates the initial state of the pvtdata store
// if an existing block store has a blockchain and the pvtdata store is empty.
// This situation is expected to happen when a peer is upgrated from version 1.0
// and an existing blockchain is present that was generated with version 1.0.
// Under this scenario, the pvtdata store is brought upto the point as if it has
// processed exisitng blocks with no pvt data. This function returns true if the
// above mentioned condition is found to be true and pvtdata store is successfully updated
func (s *Store) initPvtdataStoreFromExistingBlockchain() (bool, error) {
var bcInfo *common.BlockchainInfo
var pvtdataStoreEmpty bool
var err error

if bcInfo, err = s.BlockStore.GetBlockchainInfo(); err != nil {
return false, err
}
if pvtdataStoreEmpty, err = s.pvtdataStore.IsEmpty(); err != nil {
return false, err
}
if pvtdataStoreEmpty && bcInfo.Height > 0 {
if err = s.pvtdataStore.InitLastCommittedBlock(bcInfo.Height - 1); err != nil {
return false, err
}
return true, nil
}
return false, nil
}

// syncPvtdataStoreWithBlockStore checks whether the block storage and pvt data store are in sync
// this is called when the store instance is constructed and handed over for the use.
// this check whether there is a pending batch (possibly from a previous system crash)
// of pvt data that was not committed. If a pending batch exists, the check is made
// whether the associated block was successfully committed in the block storage (before the crash)
// or not. If the block was committed, the private data batch is committed
// otherwise, the pvt data batch is rolledback
func (s *Store) init() error {
func (s *Store) syncPvtdataStoreWithBlockStore() error {
var pendingPvtbatch bool
var err error
if pendingPvtbatch, err = s.pvtdataStore.HasPendingBatch(); err != nil {
Expand Down
55 changes: 55 additions & 0 deletions core/ledger/ledgerstorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"testing"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,6 +99,58 @@ func TestStore(t *testing.T) {
assert.Nil(t, blockAndPvtdata.BlockPvtData[2])
}

func TestStoreWithExistingBlockchain(t *testing.T) {
testLedgerid := "test-ledger"
testEnv := newTestEnv(t)
defer testEnv.cleanup()

// Construct a block storage
attrsToIndex := []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
blkstorage.IndexableAttrBlockTxID,
blkstorage.IndexableAttrTxValidationCode,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStoreProvider := fsblkstorage.NewProvider(
fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
indexConfig)

blkStore, err := blockStoreProvider.OpenBlockStore(testLedgerid)
assert.NoError(t, err)
testBlocks := testutil.ConstructTestBlocks(t, 10)

existingBlocks := testBlocks[0:9]
blockToAdd := testBlocks[9:][0]

// Add existingBlocks to the block storage directly without involving pvtdata store and close the block storage
for _, blk := range existingBlocks {
assert.NoError(t, blkStore.AddBlock(blk))
}
blockStoreProvider.Close()

// Simulating the upgrade from 1.0 situation:
// Open the ledger storage - pvtdata store is opened for the first time with an existing block storage
provider := NewProvider()
defer provider.Close()
store, err := provider.Open(testLedgerid)
defer store.Shutdown()

// test that pvtdata store is updated with info from existing block storage
pvtdataBlockHt, err := store.pvtdataStore.LastCommittedBlockHeight()
assert.NoError(t, err)
assert.Equal(t, uint64(9), pvtdataBlockHt)

// Add one more block with ovtdata associated with one of the trans and commit in the normal course
pvtdata := samplePvtData(t, []uint64{0})
assert.NoError(t, store.CommitWithPvtData(&ledger.BlockAndPvtData{Block: blockToAdd, BlockPvtData: pvtdata}))
pvtdataBlockHt, err = store.pvtdataStore.LastCommittedBlockHeight()
assert.NoError(t, err)
assert.Equal(t, uint64(10), pvtdataBlockHt)
}

func sampleData(t *testing.T) []*ledger.BlockAndPvtData {
var blockAndpvtdata []*ledger.BlockAndPvtData
blocks := testutil.ConstructTestBlocks(t, 10)
Expand Down
10 changes: 9 additions & 1 deletion core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type Provider interface {
// on whether the block was written successfully or not. The store implementation
// is expected to survive a server crash between the call to `Prepare` and `Commit`/`Rollback`
type Store interface {
// InitLastCommittedBlockHeight sets the last commited block height into the pvt data store
// This function is used in a special case where the peer is started up with the blockchain
// from an earlier version of a peer when the pvt data feature (and hence this store) was not
// available. This function is expected to be called only this situation and hence is
// expected to throw an error if the store is not empty. On a successful return from this
// fucntion the state of the store is expected to be same as of calling the prepare/commit
// function for block `0` through `blockNum` with no pvt data
InitLastCommittedBlock(blockNum uint64) error
// GetPvtDataByBlockNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter
// A nil filter does not filter any results
Expand All @@ -52,7 +60,7 @@ type Store interface {
Shutdown()
}

// ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback
// ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback/InitLastCommittedBlock
type ErrIllegalCall struct {
msg string
}
Expand Down
16 changes: 16 additions & 0 deletions core/ledger/pvtdatastorage/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
return pvtData, nil
}

// InitLastCommittedBlock implements the function in the interface `Store`
func (s *store) InitLastCommittedBlock(blockNum uint64) error {
if !(s.isEmpty && !s.batchPending) {
return &ErrIllegalCall{"The pvtdata store is not empty. InitLastCommittedBlock() function call is not allowed"}
}
batch := leveldbhelper.NewUpdateBatch()
batch.Put(lastCommittedBlkkey, encodeBlockNum(blockNum))
if err := s.db.WriteBatch(batch, true); err != nil {
return err
}
s.isEmpty = false
s.lastCommittedBlock = blockNum
logger.Debugf("InitLastCommittedBlock set to = %d", blockNum)
return nil
}

// LastCommittedBlockHeight implements the function in the interface `Store`
func (s *store) LastCommittedBlockHeight() (uint64, error) {
if s.isEmpty {
Expand Down
28 changes: 28 additions & 0 deletions core/ledger/pvtdatastorage/store_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ func TestStoreState(t *testing.T) {
assert.True(ok)
}

func TestInitLastCommittedBlock(t *testing.T) {
env := NewTestStoreEnv(t)
defer env.Cleanup()
assert := assert.New(t)
store := env.TestStore
existingLastBlockNum := uint64(25)
assert.NoError(store.InitLastCommittedBlock(existingLastBlockNum))

testEmpty(false, assert, store)
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(existingLastBlockNum+1, assert, store)

env.CloseAndReopen()
testEmpty(false, assert, store)
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(existingLastBlockNum+1, assert, store)

err := store.InitLastCommittedBlock(30)
_, ok := err.(*ErrIllegalCall)
assert.True(ok)
}

// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback`

func testEmpty(expectedEmpty bool, assert *assert.Assertions, store Store) {
Expand All @@ -111,6 +133,12 @@ func testPendingBatch(expectedPending bool, assert *assert.Assertions, store Sto
assert.Equal(expectedPending, hasPendingBatch)
}

func testLastCommittedBlockHeight(expectedBlockHt uint64, assert *assert.Assertions, store Store) {
blkHt, err := store.LastCommittedBlockHeight()
assert.NoError(err)
assert.Equal(expectedBlockHt, blkHt)
}

func samplePvtData(t *testing.T, txNums []uint64) []*ledger.TxPvtData {
pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV}
pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{
Expand Down
13 changes: 12 additions & 1 deletion core/ledger/pvtdatastorage/test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/stretchr/testify/assert"
)

const testStoreid = "TestStore"

// StoreEnv provides the store env for testing
type StoreEnv struct {
t testing.TB
Expand All @@ -26,11 +28,20 @@ func NewTestStoreEnv(t *testing.T) *StoreEnv {
removeStorePath(t)
assert := assert.New(t)
testStoreProvider := NewProvider()
testStore, err := testStoreProvider.OpenStore("TestStore")
testStore, err := testStoreProvider.OpenStore(testStoreid)
assert.NoError(err)
return &StoreEnv{t, testStoreProvider, testStore}
}

// CloseAndReopen closes and opens the store provider
func (env *StoreEnv) CloseAndReopen() {
var err error
env.TestStoreProvider.Close()
env.TestStoreProvider = NewProvider()
env.TestStore, err = env.TestStoreProvider.OpenStore(testStoreid)
assert.NoError(env.t, err)
}

// Cleanup cleansup the store env after testing
func (env *StoreEnv) Cleanup() {
env.TestStoreProvider.Close()
Expand Down

0 comments on commit 561275a

Please sign in to comment.