Skip to content

Commit

Permalink
Merge pull request #5 from wkalt/task/wal-refresh
Browse files Browse the repository at this point in the history
Add write-ahead log
  • Loading branch information
wkalt committed Mar 24, 2024
2 parents 20a05cf + 0702ee2 commit 9a616f6
Show file tree
Hide file tree
Showing 39 changed files with 3,268 additions and 1,624 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ lint:

build:
go build -o dp3 ./client/dp3

clean:
rm -rf data/*
rm -rf waldir/*
rm dp3.db*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ In the service, nodes are cached on read in an LRU cache of configurable byte
capacity. In production deployments, this cache will be sized such that most
important inner nodes will fit within it at all times. Multigranular
summarization requires traversing the tree down to a sufficiently granular
depth, and then scanning the statistical summaries at that depth for the
height, and then scanning the statistical summaries at that height for the
requested range of time. If the cache is performing well this operation can be
done in RAM.

Expand Down
61 changes: 61 additions & 0 deletions client/dp3/cmd/walinspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cmd

import (
"errors"
"fmt"
"io"
"os"

"github.com/spf13/cobra"
"github.com/wkalt/dp3/wal"
)

// walinspectCmd represents the walinspect command
var walinspectCmd = &cobra.Command{
Use: "walinspect [file] | less",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
bailf("Usage: dp3 walinspect [file] | less")
}
f, err := os.Open(args[0])
if err != nil {
bailf("failed to open WAL file: %v", err)
}
defer f.Close()
reader, err := wal.NewReader(f)
if err != nil {
bailf("failed to create WAL reader: %v", err)
}
for {
offset := reader.Offset()
rectype, record, err := reader.Next()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
bailf("failed to read record: %v", err)
}
switch rectype {
case wal.WALInsert:
req := wal.ParseInsertRecord(record)
fmt.Printf("%d: insert (%s) %d bytes\n", offset, req.Topic, len(record))
case wal.WALMergeRequest:
req := wal.ParseMergeRequestRecord(record)
addrs := ""
for _, addr := range req.Addrs {
addrs += fmt.Sprintf("%s ", addr)
}
fmt.Printf("%d: merge request %s (%s) addrs: %s\n", offset, req.BatchID, req.Topic, addrs)
case wal.WALMergeComplete:
req := wal.ParseMergeCompleteRecord(record)
fmt.Printf("%d: merge complete %s\n", offset, req.BatchID)
default:
bailf("unknown record type: %v", rectype)
}
}
},
}

func init() {
rootCmd.AddCommand(walinspectCmd)
}
65 changes: 0 additions & 65 deletions nodestore/memwal.go

This file was deleted.

17 changes: 17 additions & 0 deletions nodestore/nodeid.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func (n NodeID) Length() int {
return int(binary.LittleEndian.Uint32(n[12:]))
}

// NewNodeID creates a new node ID from an object ID, offset, and length.
func NewNodeID(oid uint64, offset, length int) NodeID {
var id NodeID
binary.LittleEndian.PutUint64(id[:8], oid)
binary.LittleEndian.PutUint32(id[8:12], uint32(offset))
binary.LittleEndian.PutUint32(id[12:], uint32(length))
return id
}

// String returns a string representation of the node ID.
func (n NodeID) String() string {
return fmt.Sprintf("%s:%d:%d", n.OID(), n.Offset(), n.Length())
Expand Down Expand Up @@ -78,3 +87,11 @@ func (n *NodeID) Scan(value interface{}) error {
func (n NodeID) Value() (driver.Value, error) {
return driver.Value(n.String()), nil
}

func generateNodeID(oid objectID, offset int, length int) NodeID {
var id NodeID
binary.LittleEndian.PutUint64(id[:], uint64(oid))
binary.LittleEndian.PutUint32(id[8:], uint32(offset))
binary.LittleEndian.PutUint32(id[12:], uint32(length))
return id
}
2 changes: 1 addition & 1 deletion nodestore/nodeid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestNodeIDScanner(t *testing.T) {
for i := 0; i < 1e3; i++ {
id := genNodeID(t)
id := nodestore.RandomNodeID()
value, err := id.Value()
require.NoError(t, err)
var id2 nodestore.NodeID
Expand Down
Loading

0 comments on commit 9a616f6

Please sign in to comment.