Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: audit store/streaming/file/service.go (backport #14234) #14241

Merged
merged 2 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,10 @@ func DefaultConfig() *Config {
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
WriteDir: "data/file_streamer",
WriteDir: "",
OutputMetadata: true,
StopNodeOnError: true,
// NOTICE: the default config don't protect the streamer data integrity
// NOTICE: The default config doesn't protect the streamer data integrity
// in face of system crash.
Fsync: false,
},
Expand Down
3 changes: 3 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"

# output-metadata specifies if output the metadata file which includes the abci request/responses
# during processing the block.
output-metadata = "{{ .Streamers.File.OutputMetadata }}"

# stop-node-on-error specifies if propagate the file streamer errors to consensus state machine.
stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}"

# fsync specifies if call fsync after writing the files.
fsync = "{{ .Streamers.File.Fsync }}"
`
Expand Down
112 changes: 71 additions & 41 deletions store/streaming/file/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

var _ baseapp.StreamingService = &StreamingService{}

// StreamingService is a concrete implementation of StreamingService that writes state changes out to files
// StreamingService is a concrete implementation of StreamingService that writes
// state changes out to files.
type StreamingService struct {
storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore
filePrefix string // optional prefix for each of the generated files
Expand All @@ -30,101 +31,122 @@ type StreamingService struct {

currentBlockNumber int64
blockMetadata types.BlockMetadata
// if write the metadata file, otherwise only data file is outputted.

// outputMetadata, if true, writes additional metadata to file per block
outputMetadata bool
// if true, when commit failed it will panic and stop the consensus state machine to ensure the
// eventual consistency of the output, otherwise the error is ignored and have the risk of lossing data.

// stopNodeOnErr, if true, will panic and stop the node during ABCI Commit
// to ensure eventual consistency of the output, otherwise, any errors are
// logged and ignored which could yield data loss in streamed output.
stopNodeOnErr bool
// if true, the file.Sync() is called to make sure the data is persisted onto disk, otherwise it risks lossing data when system crash.

// fsync, if true, will execute file Sync to make sure the data is persisted
// onto disk, otherwise there is a risk of data loss during any crash.
fsync bool
}

// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) {
func NewStreamingService(
writeDir, filePrefix string,
storeKeys []types.StoreKey,
cdc codec.BinaryCodec,
outputMetadata, stopNodeOnErr, fsync bool,
) (*StreamingService, error) {
// sort storeKeys for deterministic output
sort.SliceStable(storeKeys, func(i, j int) bool {
return storeKeys[i].Name() < storeKeys[j].Name()
})

// NOTE: We use the same listener for each store.
listeners := make([]*types.MemoryListener, len(storeKeys))
// in this case, we are using the same listener for each Store
for i, key := range storeKeys {
listeners[i] = types.NewMemoryListener(key)
}
// check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not
// we don't open a dstFile until we receive our first ABCI message

// Check that the writeDir exists and is writable so that we can catch the
// error here at initialization. If it is not we don't open a dstFile until we
// receive our first ABCI message.
if err := isDirWriteable(writeDir); err != nil {
return nil, err
}

return &StreamingService{
storeListeners: listeners,
filePrefix: filePrefix,
writeDir: writeDir,
codec: c,
codec: cdc,
outputMetadata: outputMetadata,
stopNodeOnErr: stopNodeOnErr,
fsync: fsync,
}, nil
}

// Listeners satisfies the baseapp.StreamingService interface
// It returns the StreamingService's underlying WriteListeners
// Use for registering the underlying WriteListeners with the BaseApp
// Listeners satisfies the StreamingService interface. It returns the
// StreamingService's underlying WriteListeners. Use for registering the
// underlying WriteListeners with the BaseApp.
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners))
for _, listener := range fss.storeListeners {
listeners[listener.StoreKey()] = []types.WriteListener{listener}
}

return listeners
}

// ListenBeginBlock satisfies the baseapp.ABCIListener interface
// It writes the received BeginBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) {
// ListenBeginBlock satisfies the ABCIListener interface. It sets the received
// BeginBlock request, response and the current block number. Note, these are
// not written to file until ListenCommit is executed and outputMetadata is set,
// after which it will be reset again on the next block.
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
fss.blockMetadata.RequestBeginBlock = &req
fss.blockMetadata.ResponseBeginBlock = &res
fss.currentBlockNumber = req.Header.Height
return nil
}

// ListenDeliverTx satisfies the baseapp.ABCIListener interface
// It writes the received DeliverTx request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) {
// ListenDeliverTx satisfies the ABCIListener interface. It appends the received
// DeliverTx request and response to a list of DeliverTxs objects. Note, these
// are not written to file until ListenCommit is executed and outputMetadata is
// set, after which it will be reset again on the next block.
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{
Request: &req,
Response: &res,
})

return nil
}

// ListenEndBlock satisfies the baseapp.ABCIListener interface
// It writes the received EndBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) {
// ListenEndBlock satisfies the ABCIListener interface. It sets the received
// EndBlock request, response and the current block number. Note, these are
// not written to file until ListenCommit is executed and outputMetadata is set,
// after which it will be reset again on the next block.
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
fss.blockMetadata.RequestEndBlock = &req
fss.blockMetadata.ResponseEndBlock = &res
return nil
}

// ListenEndBlock satisfies the baseapp.ABCIListener interface
// ListenCommit satisfies the ABCIListener interface. It is executed during the
// ABCI Commit request and is responsible for writing all staged data to files.
// It will only return a non-nil error when stopNodeOnErr is set.
func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
err := fss.doListenCommit(ctx, res)
if err != nil {
if err := fss.doListenCommit(ctx, res); err != nil {
if fss.stopNodeOnErr {
return err
}
}

return nil
}

func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) {
fss.blockMetadata.ResponseCommit = &res

// write to target files, the file size is written at the beginning, which can be used to detect completeness.
// Write to target files, the file size is written at the beginning, which can
// be used to detect completeness.
metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber)
dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber)

if fss.filePrefix != "" {
metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName)
dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName)
Expand All @@ -135,6 +157,7 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
if err != nil {
return err
}

if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil {
return err
}
Expand All @@ -144,42 +167,44 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon
if err := fss.writeBlockData(&buf); err != nil {
return err
}

return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync)
}

func (fss *StreamingService) writeBlockData(writer io.Writer) error {
for _, listener := range fss.storeListeners {
cache := listener.PopStateCache()

for i := range cache {
bz, err := fss.codec.MarshalLengthPrefixed(&cache[i])
if err != nil {
return err
}
if _, err = writer.Write(bz); err != nil {

if _, err := writer.Write(bz); err != nil {
return err
}
}
}
return nil
}

// Stream satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
return nil
}

// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Close() error {
return nil
}
// Stream satisfies the StreamingService interface. It performs a no-op.
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil }

// Close satisfies the StreamingService interface. It performs a no-op.
func (fss *StreamingService) Close() error { return nil }

// isDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
// to dir. It returns nil if dir is writable. We have to do this as there is no
// platform-independent way of determining if a directory is writeable.
func isDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
if err := os.WriteFile(f, []byte(""), 0o600); err != nil {
return err
}

return os.Remove(f)
}

Expand All @@ -189,25 +214,30 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) {
if err != nil {
return sdkerrors.Wrapf(err, "open file failed: %s", path)
}

defer func() {
// avoid overriding the real error with file close error
if err1 := f.Close(); err1 != nil && err == nil {
err = sdkerrors.Wrapf(err, "close file failed: %s", path)
}
}()

_, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data))))
if err != nil {
return sdkerrors.Wrapf(err, "write length prefix failed: %s", path)
}

_, err = f.Write(data)
if err != nil {
return sdkerrors.Wrapf(err, "write block data failed: %s", path)
}

if fsync {
err = f.Sync()
if err != nil {
return sdkerrors.Wrapf(err, "fsync failed: %s", path)
}
}
return

return err
}
Loading