diff --git a/core/ledger/ledgerstorage/store.go b/core/ledger/ledgerstorage/store.go index f1948ecf246..e56d2d38439 100644 --- a/core/ledger/ledgerstorage/store.go +++ b/core/ledger/ledgerstorage/store.go @@ -116,7 +116,7 @@ func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsColl if block, err = s.RetrieveBlockByNumber(blockNum); err != nil { return nil, err } - if pvtdata, err = s.GetPvtDataByNum(blockNum, filter); err != nil { + if pvtdata, err = s.getPvtDataByNumWithoutLock(blockNum, filter); err != nil { return nil, err } return &ledger.BlockAndPvtData{Block: block, BlockPvtData: constructPvtdataMap(pvtdata)}, nil @@ -128,7 +128,13 @@ func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsColl func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { s.rwlock.RLock() defer s.rwlock.RUnlock() + return s.getPvtDataByNumWithoutLock(blockNum, filter) +} +// getPvtDataByNumWithoutLock returns only the pvt data corresponding to the given block number. +// This function does not acquire a readlock and it is expected that in most of the circumstances, the caller +// posesses a read lock on `s.rwlock` +func (s *Store) getPvtDataByNumWithoutLock(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { var pvtdata []*ledger.TxPvtData var err error if pvtdata, err = s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil { diff --git a/core/ledger/ledgerstorage/store_test.go b/core/ledger/ledgerstorage/store_test.go index 0daf03823d5..81e7e994755 100644 --- a/core/ledger/ledgerstorage/store_test.go +++ b/core/ledger/ledgerstorage/store_test.go @@ -17,8 +17,11 @@ limitations under the License. package ledgerstorage import ( + "fmt" "os" + "runtime" "testing" + "time" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/blkstorage" @@ -26,6 +29,7 @@ import ( "github.com/hyperledger/fabric/common/ledger/testutil" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -38,6 +42,50 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestStoreConcurrentReadWrite(t *testing.T) { + testEnv := newTestEnv(t) + defer testEnv.cleanup() + provider := NewProvider() + defer provider.Close() + store, err := provider.Open("testLedger") + assert.NoError(t, err) + defer store.Shutdown() + + // Modify store to have a BlockStore that has a custom slowdown + store.BlockStore = newSlowBlockStore(store.BlockStore, time.Second) + + sampleData := sampleData(t) + // Commit first block + store.CommitWithPvtData(sampleData[0]) + go func() { + time.Sleep(time.Millisecond * 500) + // Commit all but first block + for _, sampleDatum := range sampleData[1:] { + store.CommitWithPvtData(sampleDatum) + } + + }() + + c := make(chan struct{}) + go func() { + // Read first block + _, err := store.GetPvtDataAndBlockByNum(0, nil) + assert.NoError(t, err) + c <- struct{}{} + }() + + select { + case <-c: + t.Log("Obtained private data and block by number") + case <-time.After(time.Second * 10): + assert.Fail(t, "Didn't finish within a timely manner, perhaps the system is deadlocked?") + buf := make([]byte, 1<<16) + runtime.Stack(buf, true) + fmt.Printf("%s", buf) + } + +} + func TestStore(t *testing.T) { testEnv := newTestEnv(t) defer testEnv.cleanup() @@ -188,3 +236,25 @@ func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData { } return constructPvtdataMap(pvtData) } + +type slowBlockStore struct { + delay time.Duration + blkstorage.BlockStore +} + +func newSlowBlockStore(store blkstorage.BlockStore, delay time.Duration) blkstorage.BlockStore { + return &slowBlockStore{ + delay: delay, + BlockStore: store, + } +} + +func (bs *slowBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) { + time.Sleep(bs.delay) + return bs.BlockStore.RetrieveBlockByNumber(blockNum) +} + +func (bs *slowBlockStore) AddBlock(block *common.Block) error { + time.Sleep(bs.delay) + return bs.BlockStore.AddBlock(block) +}