Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] etcdserver: process the scenaro of the last WAL record being partially synced to disk #15069

Merged
merged 1 commit into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package etcdserver

import (
"errors"
"io"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -100,7 +101,7 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
}
if !wal.Repair(lg, waldir) {
Expand Down
4 changes: 2 additions & 2 deletions server/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
// The length of current WAL entry must be less than the remaining file size.
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
if recBytes > maxEntryLimit {
return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
}

data := make([]byte, recBytes+padBytes)
Expand Down
9 changes: 5 additions & 4 deletions server/wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package wal

import (
"errors"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -44,8 +45,8 @@ func Repair(lg *zap.Logger, dirpath string) bool {
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
switch err {
case nil:
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
Expand All @@ -59,11 +60,11 @@ func Repair(lg *zap.Logger, dirpath string) bool {
}
continue

case io.EOF:
case errors.Is(err, io.EOF):
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
return true

case io.ErrUnexpectedEOF:
case errors.Is(err, io.ErrUnexpectedEOF):
bf, bferr := os.Create(f.Name() + ".broken")
if bferr != nil {
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
Expand Down
10 changes: 5 additions & 5 deletions server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// We do not have to read out all entries in read mode.
// The last record maybe a partial written one, so
// ErrunexpectedEOF might be returned.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
state.Reset()
return nil, state, nil, err
}
default:
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
// We must read all the entries if WAL is opened in write mode.
if !errors.Is(err, io.EOF) {
state.Reset()
return nil, state, nil, err
}
Expand Down Expand Up @@ -598,7 +598,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down Expand Up @@ -688,7 +688,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta

// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down
77 changes: 77 additions & 0 deletions server/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package wal
import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -1155,3 +1156,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
t.Fatal(err)
}
}

func TestLastRecordLengthExceedFileEnd(t *testing.T) {
/* The data below was generated by code something like below. The length
* of the last record was intentionally changed to 1000 in order to make
* sure it exceeds the end of the file.
*
* for i := 0; i < 3; i++ {
* es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}}
* if err = w.Save(raftpb.HardState{}, es); err != nil {
* t.Fatal(err)
* }
* }
* ......
* var sb strings.Builder
* for _, ch := range buf {
* sb.WriteString(fmt.Sprintf("\\x%02x", ch))
* }
*/
// Generate WAL file
t.Log("Generate a WAL file with the last record's length modified.")
data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" +
"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" +
"\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" +
"\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" +
"\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" +
"\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" +
"\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" +
"\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" +
"\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" +
"\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" +
"\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" +
"\x00\x00\x00")

buf := bytes.NewBuffer(data)
f, err := createFileWithData(t, buf)
fileName := f.Name()
require.NoError(t, err)
t.Logf("fileName: %v", fileName)

// Verify low-level decoder directly
t.Log("Verify all records can be parsed correctly.")
rec := &walpb.Record{}
decoder := newDecoder(fileutil.NewFileReader(f))
for {
if err = decoder.decode(rec); err != nil {
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
break
}
if rec.Type == entryType {
e := mustUnmarshalEntry(rec.Data)
t.Logf("Validating normal entry: %v", e)
recData := fmt.Sprintf("waldata%d", e.Index)
require.Equal(t, raftpb.EntryNormal, e.Type)
require.Equal(t, recData, string(e.Data))
}
rec = &walpb.Record{}
}
require.NoError(t, f.Close())

// Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain.
t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain")
newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal")
require.NoError(t, os.Rename(fileName, newFileName))

w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{
Index: 0,
Term: 0,
})
require.NoError(t, err)
defer w.Close()

_, _, _, err = w.ReadAll()
// Note: The wal file will be repaired automatically in production
// environment, but only once.
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}