Skip to content

Commit

Permalink
[FAB-6260] ledgerstore uses RLock^2 and deadlocks
Browse files Browse the repository at this point in the history
Re-entrant RLocking while a Lock is requested in between causes a deadlock
in Go.
This makes the ledgerstore deadlock if it writes blocks while some peer
fetches blocks from it via state transfer.

Full details in the JIRA.

Change-Id: I17f159120c370cacdbeec92c04e9b662c9c6b213
Signed-off-by: yacovm <yacovm@il.ibm.com>
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
yacovm committed Sep 22, 2017
1 parent ff57401 commit 881f38e
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
8 changes: 7 additions & 1 deletion core/ledger/ledgerstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsColl
if block, err = s.RetrieveBlockByNumber(blockNum); err != nil {
return nil, err
}
if pvtdata, err = s.GetPvtDataByNum(blockNum, filter); err != nil {
if pvtdata, err = s.getPvtDataByNumWithoutLock(blockNum, filter); err != nil {
return nil, err
}
return &ledger.BlockAndPvtData{Block: block, BlockPvtData: constructPvtdataMap(pvtdata)}, nil
Expand All @@ -128,7 +128,13 @@ func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsColl
func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
	// Take the read lock here and delegate to the lock-free variant, so that
	// callers already holding a read lock on s.rwlock can use that variant
	// directly instead of re-entering RLock through this method.
	s.rwlock.RLock()
	defer s.rwlock.RUnlock()

	pvtdata, err := s.getPvtDataByNumWithoutLock(blockNum, filter)
	return pvtdata, err
}

// getPvtDataByNumWithoutLock returns only the pvt data corresponding to the given block number.
// This function does not acquire a read lock; it is expected that, in most circumstances, the caller
// already possesses a read lock on `s.rwlock`.
func (s *Store) getPvtDataByNumWithoutLock(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
var pvtdata []*ledger.TxPvtData
var err error
if pvtdata, err = s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil {
Expand Down
70 changes: 70 additions & 0 deletions core/ledger/ledgerstorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package ledgerstorage

import (
"fmt"
"os"
"runtime"
"testing"
"time"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand All @@ -38,6 +42,50 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

// TestStoreConcurrentReadWrite is a regression test for FAB-6260: it reads a
// block together with its private data while another goroutine commits
// further blocks, and fails if the read does not complete within ten seconds.
//
// The block store is wrapped in a slowBlockStore so that the read stays in
// flight long enough for the writer goroutine (which starts after 500ms) to
// attempt its commit while the read is still in progress.
func TestStoreConcurrentReadWrite(t *testing.T) {
	testEnv := newTestEnv(t)
	defer testEnv.cleanup()
	provider := NewProvider()
	defer provider.Close()
	store, err := provider.Open("testLedger")
	assert.NoError(t, err)
	defer store.Shutdown()

	// Modify store to have a BlockStore that has a custom slowdown
	store.BlockStore = newSlowBlockStore(store.BlockStore, time.Second)

	// Named `blocks` rather than `sampleData` so the helper function is not shadowed.
	blocks := sampleData(t)
	// Commit first block
	store.CommitWithPvtData(blocks[0])
	go func() {
		// Give the reader below a head start so the commit overlaps with the read.
		time.Sleep(time.Millisecond * 500)
		// Commit all but first block
		for _, blk := range blocks[1:] {
			store.CommitWithPvtData(blk)
		}
	}()

	// Buffered with capacity one: if the select below has already timed out,
	// a late-finishing reader can still deliver its signal and exit instead of
	// blocking forever on the send.
	done := make(chan struct{}, 1)
	go func() {
		// Read first block
		_, err := store.GetPvtDataAndBlockByNum(0, nil)
		assert.NoError(t, err)
		done <- struct{}{}
	}()

	select {
	case <-done:
		t.Log("Obtained private data and block by number")
	case <-time.After(time.Second * 10):
		assert.Fail(t, "Didn't finish within a timely manner, perhaps the system is deadlocked?")
		// Dump all goroutine stacks to help diagnose where things are stuck.
		// runtime.Stack reports how many bytes it wrote; print only that
		// prefix rather than the whole buffer with its trailing zero bytes.
		buf := make([]byte, 1<<16)
		n := runtime.Stack(buf, true)
		fmt.Printf("%s", buf[:n])
	}
}

func TestStore(t *testing.T) {
testEnv := newTestEnv(t)
defer testEnv.cleanup()
Expand Down Expand Up @@ -188,3 +236,25 @@ func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData {
}
return constructPvtdataMap(pvtData)
}

// slowBlockStore is a test double that decorates a blkstorage.BlockStore,
// pausing for a fixed duration before RetrieveBlockByNumber and AddBlock.
type slowBlockStore struct {
	// BlockStore is the wrapped store; it is embedded so that every method
	// not overridden on slowBlockStore is promoted from it unchanged.
	blkstorage.BlockStore

	// delay is how long the overridden methods sleep before delegating.
	delay time.Duration
}

// newSlowBlockStore wraps store in a slowBlockStore configured with the given
// delay and returns it as a blkstorage.BlockStore.
func newSlowBlockStore(store blkstorage.BlockStore, delay time.Duration) blkstorage.BlockStore {
	slow := new(slowBlockStore)
	slow.BlockStore = store
	slow.delay = delay
	return slow
}

// RetrieveBlockByNumber sleeps for the configured delay and then forwards the
// lookup to the wrapped BlockStore, returning its result unchanged.
func (bs *slowBlockStore) RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
	time.Sleep(bs.delay)

	block, err := bs.BlockStore.RetrieveBlockByNumber(blockNum)
	return block, err
}

// AddBlock sleeps for the configured delay and then forwards the block to the
// wrapped BlockStore, returning whatever error it reports.
func (bs *slowBlockStore) AddBlock(block *common.Block) error {
	time.Sleep(bs.delay)

	err := bs.BlockStore.AddBlock(block)
	return err
}

0 comments on commit 881f38e

Please sign in to comment.