Skip to content

Commit

Permalink
Add write-ahead log
Browse files Browse the repository at this point in the history
This replaces the sqlite-based write-ahead log with a more featureful
implementation. The new implementation uses append-only log files and a
"wal manager" that is in charge of scheduling batched merges from WAL to
storage and rotating logs.

This is substantially faster to write to than sqlite, and also fixes a
bug in the service around handling records that exceed the max blob size
in sqlite.

In the course of adding support for the new merging flow, I have made
changes to the nodestore/treemgr/tree relationships. There is no more
concept of "staging". Instead, the tree functions work on "tree writers"
and "tree readers", and there is one implementation (memtree) that is
used for staging writes to the WAL as well as writes to storage from
WAL.

Another implementation of the tree reader is backed by the nodestore,
allowing data from the WAL to be merged with data from storage in one
operation.

With this change, the "tree-aware" machinery in the nodestore such as
tree merging has been moved to either the tree or treemgr module. The
tree module contains methods for traversing generic tree
readers/writers while the treemgr module contains methods related to
interfacing trees with the WAL.
  • Loading branch information
wkalt committed Mar 24, 2024
1 parent 20a05cf commit 0702ee2
Show file tree
Hide file tree
Showing 39 changed files with 3,268 additions and 1,624 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ lint:

# Build the dp3 client binary into ./dp3.
build:
	go build -o dp3 ./client/dp3

# Remove locally generated state: extracted storage data, WAL files, and
# the sqlite databases. -f keeps the rule idempotent -- without it,
# `rm dp3.db*` errors (and fails the target) when no such files exist.
clean:
	rm -rf data/*
	rm -rf waldir/*
	rm -f dp3.db*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ In the service, nodes are cached on read in an LRU cache of configurable byte
capacity. In production deployments, this cache will be sized such that most
important inner nodes will fit within it at all times. Multigranular
summarization requires traversing the tree down to a sufficiently granular
depth, and then scanning the statistical summaries at that depth for the
height, and then scanning the statistical summaries at that height for the
requested range of time. If the cache is performing well this operation can be
done in RAM.

Expand Down
61 changes: 61 additions & 0 deletions client/dp3/cmd/walinspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cmd

import (
"errors"
"fmt"
"io"
"os"

"github.com/spf13/cobra"
"github.com/wkalt/dp3/wal"
)

// walinspectCmd represents the walinspect command
// walinspectCmd implements the "walinspect" subcommand, which prints a
// human-readable listing of the records in a WAL file, one line per
// record, prefixed with the record's byte offset in the file.
var walinspectCmd = &cobra.Command{
	Use: "walinspect [file] | less",
	Run: func(cmd *cobra.Command, args []string) {
		if len(args) != 1 {
			bailf("Usage: dp3 walinspect [file] | less")
		}
		f, err := os.Open(args[0])
		if err != nil {
			bailf("failed to open WAL file: %v", err)
		}
		defer f.Close()
		reader, err := wal.NewReader(f)
		if err != nil {
			bailf("failed to create WAL reader: %v", err)
		}
		for {
			// Capture the offset before the read so each printed line
			// can be correlated with a byte position in the file.
			offset := reader.Offset()
			recordType, record, err := reader.Next()
			switch {
			case errors.Is(err, io.EOF):
				// Clean end of log.
				return
			case err != nil:
				bailf("failed to read record: %v", err)
			}
			switch recordType {
			case wal.WALInsert:
				insert := wal.ParseInsertRecord(record)
				fmt.Printf("%d: insert (%s) %d bytes\n", offset, insert.Topic, len(record))
			case wal.WALMergeRequest:
				merge := wal.ParseMergeRequestRecord(record)
				var addrs string
				for _, addr := range merge.Addrs {
					addrs += fmt.Sprintf("%s ", addr)
				}
				fmt.Printf("%d: merge request %s (%s) addrs: %s\n", offset, merge.BatchID, merge.Topic, addrs)
			case wal.WALMergeComplete:
				complete := wal.ParseMergeCompleteRecord(record)
				fmt.Printf("%d: merge complete %s\n", offset, complete.BatchID)
			default:
				bailf("unknown record type: %v", recordType)
			}
		}
	},
}

// init registers the walinspect subcommand with the root dp3 command.
func init() {
	rootCmd.AddCommand(walinspectCmd)
}
65 changes: 0 additions & 65 deletions nodestore/memwal.go

This file was deleted.

17 changes: 17 additions & 0 deletions nodestore/nodeid.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func (n NodeID) Length() int {
return int(binary.LittleEndian.Uint32(n[12:]))
}

// NewNodeID creates a new node ID from an object ID, offset, and length.
// The layout is little-endian: bytes [0,8) hold the object ID, [8,12)
// the offset, and [12,16) the length.
func NewNodeID(oid uint64, offset, length int) NodeID {
	var nodeID NodeID
	binary.LittleEndian.PutUint64(nodeID[:], oid)
	binary.LittleEndian.PutUint32(nodeID[8:], uint32(offset))
	binary.LittleEndian.PutUint32(nodeID[12:], uint32(length))
	return nodeID
}

// String returns a string representation of the node ID.
func (n NodeID) String() string {
return fmt.Sprintf("%s:%d:%d", n.OID(), n.Offset(), n.Length())
Expand Down Expand Up @@ -78,3 +87,11 @@ func (n *NodeID) Scan(value interface{}) error {
// Value implements driver.Valuer so a NodeID can be bound directly as a
// SQL parameter. The stored form is the string representation produced
// by String.
func (n NodeID) Value() (driver.Value, error) {
	// The string converts to driver.Value implicitly via the return type.
	return n.String(), nil
}

// generateNodeID constructs the NodeID for a node stored at the given
// offset and length within the object identified by oid.
//
// It delegates to NewNodeID rather than duplicating the byte layout
// (little-endian: 8-byte OID, 4-byte offset, 4-byte length), so the
// encoding is defined in exactly one place.
func generateNodeID(oid objectID, offset int, length int) NodeID {
	return NewNodeID(uint64(oid), offset, length)
}
2 changes: 1 addition & 1 deletion nodestore/nodeid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestNodeIDScanner(t *testing.T) {
for i := 0; i < 1e3; i++ {
id := genNodeID(t)
id := nodestore.RandomNodeID()
value, err := id.Value()
require.NoError(t, err)
var id2 nodestore.NodeID
Expand Down
Loading

0 comments on commit 0702ee2

Please sign in to comment.