Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process batch serialise concurrently #3877

Merged
merged 19 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion action/action_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import "github.com/iotexproject/iotex-proto/golang/iotextypes"
// Currently the parameter is EVM network ID for tx in web3 format, it is called like
//
// act, err := (&Deserializer{}).SetEvmNetworkID(id).ActionToSealedEnvelope(pbAction)
//
type Deserializer struct {
evmNetworkID uint32
}
Expand Down
2 changes: 1 addition & 1 deletion action/protocol/execution/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func getChainConfig(g genesis.Blockchain, height uint64, id uint32) *params.Chai
return &chainConfig
}

//Error in executeInEVM is a consensus issue
// Error in executeInEVM is a consensus issue
func executeInEVM(ctx context.Context, evmParams *Params, stateDB *StateDBAdapter, g genesis.Blockchain, gasLimit uint64, blockHeight uint64) ([]byte, uint64, uint64, string, iotextypes.ReceiptStatus, error) {
remainingGas := evmParams.gas
if err := securityDeposit(evmParams, stateDB, gasLimit); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion action/protocol/vote/probationlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/iotexproject/iotex-proto/golang/iotextypes"
)

//ProbationList defines a map where key is candidate's name and value is the counter which counts the unproductivity during probation epoch.
// ProbationList defines a map where key is candidate's name and value is the counter which counts the unproductivity during probation epoch.
type ProbationList struct {
ProbationInfo map[string]uint32
IntensityRate uint32
Expand Down
4 changes: 2 additions & 2 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,9 @@ func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) err
return nil
}

//======================================
// ======================================
// private functions
//======================================
// ======================================
func (ap *actPool) enqueueAction(ctx context.Context, addr address.Address, act action.SealedEnvelope, actHash hash.Hash256, actNonce uint64) error {
span := tracer.SpanFromContext(ctx)
defer span.End()
Expand Down
1 change: 0 additions & 1 deletion blockchain/block/block_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
//
// blk, err := (&Deserializer{}).SetEvmNetworkID(id).FromBlockProto(pbBlock)
// blk, err := (&Deserializer{}).SetEvmNetworkID(id).DeserializeBlock(buf)
//
type Deserializer struct {
evmNetworkID uint32
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (bc *blockchain) MintNewBlock(timestamp time.Time) (*block.Block, error) {
return &blk, nil
}

// CommitBlock validates and appends a block to the chain
// CommitBlock validates and appends a block to the chain
func (bc *blockchain) CommitBlock(blk *block.Block) error {
bc.mu.Lock()
defer bc.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion consensus/scheme/rolldpos/endorsementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
_statusKey = []byte("status")
)

//EndorsedByMajorityFunc defines a function to give an information of consensus status
// EndorsedByMajorityFunc defines a function to give an information of consensus status
type EndorsedByMajorityFunc func(blockHash []byte, topics []ConsensusVoteTopic) bool

type endorserEndorsementCollection struct {
Expand Down
33 changes: 22 additions & 11 deletions db/batch/batch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,30 @@ func (b *baseKVStoreBatch) Entry(index int) (*WriteInfo, error) {
func (b *baseKVStoreBatch) SerializeQueue(serialize WriteInfoSerialize, filter WriteInfoFilter) []byte {
b.mutex.Lock()
defer b.mutex.Unlock()
// 1. This could be improved by being processed in parallel
// 2. Digest could be replaced by merkle root if we need proof
bytes := make([]byte, 0)
// 1. Digest could be replaced by merkle root if we need proof
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b.mutex.Lock()
defer b.mutex.Unlock()

should the mtx be kept?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Liuhaai , @envestcc said it's not needed as the concurrent writes now happen on different indices within the slice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could write an unit test to verify the concurrency problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mtx is for b.writeQueue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the mtx back @Liuhaai

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the Unlock can be called earlier? It seems like that it can be placed just before wg.Wait().

but it seems that this is unrelated to the current issue. It may not improve the serialization process. Perhaps it can be discussed in a new issue.

bytesChan := make(chan []byte, len(b.writeQueue))
wg := sync.WaitGroup{}
wg.Add(len(b.writeQueue))
for _, wi := range b.writeQueue {
if filter != nil && filter(wi) {
continue
}
if serialize != nil {
bytes = append(bytes, serialize(wi)...)
} else {
bytes = append(bytes, wi.Serialize()...)
}
go func(info *WriteInfo) {
if filter != nil && filter(info) {
return
}
pocockn marked this conversation as resolved.
Show resolved Hide resolved
if serialize != nil {
bytesChan <- serialize(info)
} else {
bytesChan <- info.Serialize()
}
wg.Done()
}(wi)
}
wg.Wait()

bytes := make([]byte, 0)
for itemBytes := range bytesChan {
bytes = append(bytes, itemBytes...)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general this is fine, but probably won't work for our case. Because the total bytes will be hashed to serve as a digest of the batch's content, which is used later in validation. So the bytes has to be in fixed order, and working parallel breaks this order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, does this mean the PR is no longer needed? Thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should ensure that the order of writing bytes matches the order in the writeQueue. As for the order of concurrent serialization in between does not matter .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@envestcc modified it so the writing of the bytes matches the order 👍


return bytes
}

Expand Down
2 changes: 1 addition & 1 deletion db/batch/batch_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBaseKVStoreBatch(t *testing.T) {
require.Equal(0.5, p)

// test serialize/translate
require.True(bytes.Equal([]byte{0, 110, 115, 1, 110, 115}, b.SerializeQueue(nil, nil)))
require.True(bytes.Equal([]byte{1, 110, 115, 0, 110, 115}, b.SerializeQueue(nil, nil)))
pocockn marked this conversation as resolved.
Show resolved Hide resolved
require.True(bytes.Equal([]byte{}, b.SerializeQueue(nil, func(wi *WriteInfo) bool {
return wi.Namespace() == "ns"
})))
Expand Down
2 changes: 1 addition & 1 deletion ioctl/cmd/action/stake2.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (

var _stake2AutoStake bool

//Stake2Cmd represent stake2 command
// Stake2Cmd represent stake2 command
var Stake2Cmd = &cobra.Command{
Use: "stake2",
Short: config.TranslateInLang(_stake2CmdShorts, config.UILanguage),
Expand Down
2 changes: 1 addition & 1 deletion ioctl/cmd/action/xrc20.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
}
)

//Xrc20Cmd represent xrc20 standard command-line
// Xrc20Cmd represent xrc20 standard command-line
var Xrc20Cmd = &cobra.Command{
Use: "xrc20",
Short: config.TranslateInLang(_xrc20CmdShorts, config.UILanguage),
Expand Down
1 change: 0 additions & 1 deletion tools/executiontester/blockchain/erc721_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (f *erc721Token) CreateToken(tokenid, creditor string) (string, error) {
return h, nil
}

//
func (f *erc721Token) Transfer(token, sender, prvkey, receiver string, tokenid string) (string, error) {
TokenID, ok := new(big.Int).SetString(tokenid, 10)
if !ok {
Expand Down