Process batch serialise concurrently #3877

Merged Jun 21, 2023 · 19 commits
Changes from 16 commits
47 changes: 32 additions & 15 deletions db/batch/batch_impl.go
@@ -93,22 +93,39 @@ func (b *baseKVStoreBatch) Entry(index int) (*WriteInfo, error) {
}

func (b *baseKVStoreBatch) SerializeQueue(serialize WriteInfoSerialize, filter WriteInfoFilter) []byte {
-	b.mutex.Lock()
-	defer b.mutex.Unlock()
-	// 1. This could be improved by being processed in parallel
-	// 2. Digest could be replaced by merkle root if we need proof
-	bytes := make([]byte, 0)
-	for _, wi := range b.writeQueue {
-		if filter != nil && filter(wi) {
-			continue
-		}
-		if serialize != nil {
-			bytes = append(bytes, serialize(wi)...)
-		} else {
-			bytes = append(bytes, wi.Serialize()...)
-		}
+	// 1. Digest could be replaced by merkle root if we need proof
+	b.mutex.Lock()
+	defer b.mutex.Unlock()

Member: should the mtx be kept?

Contributor Author: Hey @Liuhaai, @envestcc said it's not needed, as the concurrent writes now happen on different indices within the slice.

Member: Could write a unit test to verify the concurrency problem.
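Such a test could look roughly like the following minimal sketch, run under the race detector (`go test -race` or `go run -race`). The names here (`kvBatch`, `Put`, `SerializeQueue`) are illustrative stand-ins, not the real db/batch API:

```go
package main

// Hammer the batch with concurrent writes and serializations. If
// SerializeQueue dropped its lock while Put can still append to the
// queue, the race detector flags the conflicting access.

import (
	"fmt"
	"sync"
)

type kvBatch struct {
	mu    sync.Mutex
	queue [][]byte
}

func (b *kvBatch) Put(entry []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queue = append(b.queue, entry)
}

func (b *kvBatch) SerializeQueue() []byte {
	b.mu.Lock() // removing this lock is exactly what -race would catch
	defer b.mu.Unlock()
	var out []byte
	for _, e := range b.queue {
		out = append(out, e...)
	}
	return out
}

func main() {
	b := &kvBatch{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); b.Put([]byte{'x'}) }()
		go func() { defer wg.Done(); _ = b.SerializeQueue() }()
	}
	wg.Wait()
	fmt.Println(len(b.SerializeQueue())) // 100
}
```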

Member: mtx is for b.writeQueue

Contributor Author: Added the mtx back @Liuhaai

Member: Can the Unlock be called earlier? It seems it could be placed just before wg.Wait().

That said, this seems unrelated to the current issue and may not improve the serialization process; perhaps it can be discussed in a new issue.
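The suggestion above can be sketched as follows. `batch` and `entry` are illustrative stand-ins for the real types, and whether the early release is actually safe depends on how the queue is mutated elsewhere:

```go
package main

// Hold the mutex only while snapshotting the queue and spawning the
// workers, then release it before wg.Wait() so writers are not blocked
// for the whole serialization.

import (
	"fmt"
	"sync"
)

type entry struct{ payload string }

type batch struct {
	mu    sync.Mutex
	queue []*entry
}

func (b *batch) serializeQueue() []byte {
	b.mu.Lock()
	queue := b.queue // snapshot under the lock; goroutines read only this slice
	serialised := make([][]byte, len(queue))
	var wg sync.WaitGroup
	wg.Add(len(queue))
	for i, e := range queue {
		go func(i int, e *entry) {
			defer wg.Done()
			serialised[i] = []byte(e.payload)
		}(i, e)
	}
	b.mu.Unlock() // released before Wait, per the suggestion
	wg.Wait()

	var out []byte
	for _, sb := range serialised {
		out = append(out, sb...)
	}
	return out
}

func main() {
	b := &batch{queue: []*entry{{"x"}, {"y"}, {"z"}}}
	fmt.Printf("%s\n", b.serializeQueue()) // xyz
}
```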

+	var (
+		serialisedBytes = make([][]byte, len(b.writeQueue))
+		wg              = sync.WaitGroup{}
+	)
+
+	wg.Add(len(b.writeQueue))
+	for i, wi := range b.writeQueue {
+		go func(i int, info *WriteInfo) {
+			defer wg.Done()
+			if filter != nil && filter(info) {
+				return
+			}

+			idx := i
+			var data []byte
+			if serialize != nil {
+				data = serialize(info)
+			} else {
+				data = info.Serialize()
+			}
+
+			serialisedBytes[idx] = data
+		}(i, wi)
+	}
-	return bytes
+	wg.Wait()
+
+	var returnedBytes []byte
+	for _, sb := range serialisedBytes {
+		returnedBytes = append(returnedBytes, sb...)
+	}
Member: In general this is fine, but it probably won't work for our case: the total bytes will be hashed to serve as a digest of the batch's content, which is used later in validation. So the bytes have to be in a fixed order, and working in parallel breaks that order.

Contributor Author: I see, does this mean the PR is no longer needed? Thanks

Member: I think you should ensure that the order of the written bytes matches the order of the writeQueue; the order of the concurrent serialization in between does not matter.

Contributor Author: @envestcc modified it so the writing of the bytes matches the order 👍
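The digest concern is easy to demonstrate: hashing the same serialized chunks concatenated in a different order yields a different digest, which is why the final write-out must follow the writeQueue order even when serialization itself runs in parallel. A minimal sketch, with sha256 standing in for whatever hash the real batch digest uses:

```go
package main

// Hashing the same chunks in a different concatenation order produces a
// different digest, so a digest used in validation needs a fixed order.

import (
	"crypto/sha256"
	"fmt"
)

// digest concatenates the chunks in the given order and hashes the result.
func digest(chunks ...[]byte) [32]byte {
	var all []byte
	for _, c := range chunks {
		all = append(all, c...)
	}
	return sha256.Sum256(all)
}

func main() {
	a, b := []byte("put:k1=v1"), []byte("del:k2")
	fmt.Println(digest(a, b) == digest(b, a)) // false: order changes the digest
}
```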


+	return returnedBytes
}
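Putting the thread's conclusions together, the merged approach can be sketched in a self-contained form. `writeInfo` and `serializeQueue` here are simplified stand-ins for the db/batch types: each entry is serialized concurrently into its own slot, and the slots are concatenated in queue order so the digest stays deterministic; a filtered entry leaves a nil slot, which appends nothing.

```go
package main

// Serialize entries concurrently into per-index slots, then concatenate
// the slots in queue order to keep the resulting byte stream (and any
// digest over it) deterministic.

import (
	"fmt"
	"sync"
)

type writeInfo struct{ key, value string }

func (wi *writeInfo) serialize() []byte { return []byte(wi.key + "=" + wi.value + ";") }

func serializeQueue(queue []*writeInfo, filter func(*writeInfo) bool) []byte {
	serialised := make([][]byte, len(queue))
	var wg sync.WaitGroup
	wg.Add(len(queue))
	for i, wi := range queue {
		go func(i int, info *writeInfo) {
			defer wg.Done()
			if filter != nil && filter(info) {
				return // slot i stays nil; appending nil adds nothing
			}
			serialised[i] = info.serialize()
		}(i, wi)
	}
	wg.Wait()

	var out []byte
	for _, sb := range serialised { // concatenation follows queue order
		out = append(out, sb...)
	}
	return out
}

func main() {
	q := []*writeInfo{{"a", "1"}, {"b", "2"}, {"c", "3"}}
	skipB := func(wi *writeInfo) bool { return wi.key == "b" }
	fmt.Printf("%s\n", serializeQueue(q, skipB)) // a=1;c=3;
}
```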

// Clear clear write queue