Skip to content

Commit

Permalink
etcdserver/*: changes to snapshots and wal logic to fix etcd-io#10219
Browse files Browse the repository at this point in the history
  • Loading branch information
Viacheslav Biriukov committed Jan 23, 2019
1 parent b3443fc commit e67b5ae
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 85 deletions.
2 changes: 1 addition & 1 deletion build
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}"
toggle_failpoints() {
mode="$1"
if command -v gofail >/dev/null 2>&1; then
gofail "$mode" etcdserver/ mvcc/backend/ wal/
gofail "$mode" etcdserver/ mvcc/backend/
elif [[ "$mode" != "disable" ]]; then
echo "FAILPOINTS set but gofail not found"
exit 1
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {

if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
Expand All @@ -250,8 +250,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftAfterSaveSnap struct{}
}

// gofail: var raftBeforeSaveAll struct{}
if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil {
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
} else {
Expand All @@ -261,7 +261,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSaveAll struct{}
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
// etcdserver now claim the snapshot has been persisted onto the disk
Expand Down
9 changes: 9 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2188,6 +2188,15 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
plog.Fatalf("save snapshot error: %v", err)
}
}

if err = s.r.storage.Release(snap); err != nil {
if lg != nil {
lg.Panic("failed to release wal", zap.Error(err))
} else {
plog.Fatalf("failed to release wal error: %v", err)
}
}

if lg != nil {
lg.Info(
"saved snapshot",
Expand Down
14 changes: 0 additions & 14 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ type Storage interface {
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error

// SaveSnapshot function saves only snapshot to the underlying stable storage.
SaveSnapshot(snap raftpb.Snapshot) error
// SaveAll function saves ents, snapshot and state to the underlying stable storage.
// SaveAll MUST block until st and ents are on stable storage.
SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error
// Release release release the locked wal files since they will not be used.
Release(snap raftpb.Snapshot) error
}
Expand All @@ -66,15 +60,7 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
if err != nil {
return err
}
err = st.Snapshotter.SaveSnap(snap)
if err != nil {
return err
}
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

// SaveSnapshot saves the snapshot to disk.
func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error {
return st.Snapshotter.SaveSnap(snap)
}

Expand Down
66 changes: 0 additions & 66 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,72 +746,6 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
return w.sync()
}

func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error {
w.mu.Lock()
defer w.mu.Unlock()

// short cut, do not call sync
if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) {
return nil
}

mustSync := raft.MustSync(st, w.state, len(ents))

if !raft.IsEmptySnap(snap) {
mustSync = true
}

// 1. Save entries
// TODO(xiangli): no more reference operator
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
return err
}
// gofail: var raftAfterSaveWALFirstEntry struct{}
}
// gofail: var raftAfterSaveWALEntries struct{}

// 2. Save snapshot
if !raft.IsEmptySnap(snap) {
e := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}

b := pbutil.MustMarshal(&e)

rec := &walpb.Record{Type: snapshotType, Data: b}
if err := w.encoder.encode(rec); err != nil {
return err
}

// update enti only when snapshot is ahead of last index
if w.enti < e.Index {
w.enti = e.Index
}
// gofail: var raftAfterSaveWALSnap struct{}
}

// 3. Save HardState
if err := w.saveState(&st); err != nil {
return err
}
// gofail: var raftAfterSaveWALState struct{}

curOff, err := w.tail().Seek(0, io.SeekCurrent)
if err != nil {
return err
}
if curOff < SegmentSizeBytes {
if mustSync {
return w.sync()
}
return nil
}

return w.cut()
}

func (w *WAL) saveCrc(prevCrc uint32) error {
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
}
Expand Down

0 comments on commit e67b5ae

Please sign in to comment.