Skip to content

Commit

Permalink
Segregate trees in storage by topic/producer
Browse files Browse the repository at this point in the history
This partitions the tree storage by topic/producer prefixes. Topic/producer
was chosen over producer/topic on the assumption that potential direct
readers are more likely to read everything on one topic than all topics
from one producer.

Slashes in the prefixes are replaced with a substitution string, $()$.
Slashes are very common in ROS topics, so the resulting files look a
little strange, though they are easy to interpret.
  • Loading branch information
wkalt committed Apr 11, 2024
1 parent 5c72879 commit 20c80fe
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 82 deletions.
3 changes: 2 additions & 1 deletion nodestore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

// NodeNotFoundError reports that a node could not be found under a storage
// prefix.
type NodeNotFoundError struct {
// Prefix is the storage prefix the node was looked up under.
Prefix string
// NodeID identifies the node that was not found.
NodeID NodeID
}

func (e NodeNotFoundError) Error() string {
return fmt.Sprintf("node %s not found", e.NodeID)
return fmt.Sprintf("node %s/%s not found", e.Prefix, e.NodeID)
}

func (e NodeNotFoundError) Is(target error) bool {
Expand Down
16 changes: 8 additions & 8 deletions nodestore/nodestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func NewNodestore(
}

// Put an object to storage.
func (n *Nodestore) Put(ctx context.Context, oid uint64, data []byte) error {
objectname := strconv.FormatUint(oid, 10)
func (n *Nodestore) Put(ctx context.Context, prefix string, oid uint64, data []byte) error {
objectname := prefix + "/" + strconv.FormatUint(oid, 10)
if err := n.store.Put(ctx, objectname, data); err != nil {
return fmt.Errorf("failed to put object %d: %w", oid, err)
}
Expand All @@ -75,14 +75,14 @@ func (n *Nodestore) Put(ctx context.Context, oid uint64, data []byte) error {

// Get retrieves a node from the nodestore. It will check the cache prior to
// storage.
func (n *Nodestore) Get(ctx context.Context, id NodeID) (Node, error) {
func (n *Nodestore) Get(ctx context.Context, prefix string, id NodeID) (Node, error) {
if value, ok := n.cache.Get(id); ok {
return value, nil
}
reader, err := n.store.GetRange(ctx, id.OID(), int(id.Offset()), int(id.Length()))
reader, err := n.store.GetRange(ctx, prefix+"/"+id.OID(), int(id.Offset()), int(id.Length()))
if err != nil {
if errors.Is(err, storage.ErrObjectNotFound) {
return nil, NodeNotFoundError{id}
return nil, NodeNotFoundError{prefix, id}
}
return nil, fmt.Errorf("failed to get node %s: %w", id, err)
}
Expand All @@ -102,13 +102,13 @@ func (n *Nodestore) Get(ctx context.Context, id NodeID) (Node, error) {
// GetLeafData retrieves a leaf node from the nodestore. It returns a ReadSeekCloser
// over the leaf data, the closing of which is the caller's responsibility. It
// does not cache data.
func (n *Nodestore) GetLeafData(ctx context.Context, id NodeID) (
func (n *Nodestore) GetLeafData(ctx context.Context, prefix string, id NodeID) (
ancestor NodeID, reader io.ReadSeekCloser, err error,
) {
reader, err = n.store.GetRange(ctx, id.OID(), int(id.Offset()), int(id.Length()))
reader, err = n.store.GetRange(ctx, prefix+"/"+id.OID(), int(id.Offset()), int(id.Length()))
if err != nil {
if errors.Is(err, storage.ErrObjectNotFound) {
return ancestor, nil, NodeNotFoundError{id}
return ancestor, nil, NodeNotFoundError{prefix, id}
}
return ancestor, nil, fmt.Errorf("failed to get node %s: %w", id, err)
}
Expand Down
9 changes: 5 additions & 4 deletions nodestore/nodestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
)

func TestNodestore(t *testing.T) {
var prefix = "prefix"
t.Run("Put", func(t *testing.T) {
ctx := context.Background()
store := storage.NewMemStore()
cache := util.NewLRU[nodestore.NodeID, nodestore.Node](1024)
ns := nodestore.NewNodestore(store, cache)
node := nodestore.NewLeafNode([]byte("test"), nil, nil)
bytes := node.ToBytes()
require.NoError(t, ns.Put(ctx, 1, bytes))
require.NoError(t, ns.Put(ctx, prefix, 1, bytes))
})

t.Run("Get", func(t *testing.T) {
Expand All @@ -40,9 +41,9 @@ func TestNodestore(t *testing.T) {
require.NoError(t, err)
addr := nodestore.NewNodeID(1, uint64(n), uint64(len(bytes)))

require.NoError(t, ns.Put(ctx, 1, buf.Bytes()))
require.NoError(t, ns.Put(ctx, prefix, 1, buf.Bytes()))

retrieved, err := ns.Get(ctx, addr)
retrieved, err := ns.Get(ctx, prefix, addr)
require.NoError(t, err)

leaf, ok := retrieved.(*nodestore.LeafNode)
Expand All @@ -53,7 +54,7 @@ func TestNodestore(t *testing.T) {
require.Equal(t, data, found)

// hits the cache
_, err = ns.Get(ctx, addr)
_, err = ns.Get(ctx, prefix, addr)
require.NoError(t, err)
})
}
19 changes: 11 additions & 8 deletions rootmap/memrootmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type root struct {
producerID string
topic string
version uint64
prefix string
nodeID nodestore.NodeID
}

Expand Down Expand Up @@ -54,40 +55,42 @@ func (rm *memrootmap) GetLatestByTopic(
maxVersion = item.version
}
}
listings = append(listings, RootListing{topic, max.nodeID, max.version, topics[topic]})
listings = append(listings, RootListing{max.prefix, topic, max.nodeID, max.version, topics[topic]})
}
return listings, nil
}

func (rm *memrootmap) GetLatest(
ctx context.Context, producerID string, topic string) (nodestore.NodeID, uint64, error) {
ctx context.Context, producerID string, topic string) (string, nodestore.NodeID, uint64, error) {
for i := len(rm.roots) - 1; i >= 0; i-- { // nb: assumes roots added in ascending order
root := rm.roots[i]
if root.producerID == producerID && root.topic == topic {
return root.nodeID, root.version, nil
return root.prefix, root.nodeID, root.version, nil
}
}
return nodestore.NodeID{}, 0, StreamNotFoundError{producerID, topic}
return "", nodestore.NodeID{}, 0, StreamNotFoundError{producerID, topic}
}

func (rm *memrootmap) Get(
ctx context.Context, producerID string, topic string, version uint64) (nodestore.NodeID, error) {
ctx context.Context, producerID string, topic string, version uint64) (string, nodestore.NodeID, error) {
for i := len(rm.roots) - 1; i >= 0; i-- {
root := rm.roots[i]
if root.producerID == producerID && root.topic == topic && root.version == version {
return root.nodeID, nil
return root.prefix, root.nodeID, nil
}
}
return nodestore.NodeID{}, StreamNotFoundError{producerID, topic}
return "", nodestore.NodeID{}, StreamNotFoundError{producerID, topic}
}

func (rm *memrootmap) Put(
ctx context.Context, producerID string, topic string,
version uint64, nodeID nodestore.NodeID) error {
version uint64, prefix string, nodeID nodestore.NodeID,
) error {
rm.roots = append(rm.roots, root{
producerID: producerID,
topic: topic,
version: version,
prefix: prefix,
nodeID: nodeID,
})
return nil
Expand Down
7 changes: 4 additions & 3 deletions rootmap/rootmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ recover.
var ErrRootAlreadyExists = errors.New("root already exists")

// RootListing describes the latest root found for a topic, as returned by
// Rootmap.GetLatestByTopic.
type RootListing struct {
// Prefix is the storage prefix recorded with the root.
Prefix string
// Topic is the topic the root belongs to.
Topic string
// NodeID is the ID of the root node.
NodeID nodestore.NodeID
// NewMinVersion is the version of the listed root.
NewMinVersion uint64
// RequestedMinVersion is the version the caller supplied for this topic.
RequestedMinVersion uint64
}

type Rootmap interface {
GetLatest(ctx context.Context, producerID string, topic string) (nodestore.NodeID, uint64, error)
GetLatest(ctx context.Context, producerID string, topic string) (string, nodestore.NodeID, uint64, error)
GetLatestByTopic(ctx context.Context, producerID string, topics map[string]uint64) ([]RootListing, error)
Get(ctx context.Context, producerID string, topic string, version uint64) (nodestore.NodeID, error)
Put(ctx context.Context, producerID string, topic string, version uint64, nodeID nodestore.NodeID) error
Get(ctx context.Context, producerID string, topic string, version uint64) (string, nodestore.NodeID, error)
Put(ctx context.Context, producerID string, topic string, version uint64, prefix string, nodeID nodestore.NodeID) error
}
26 changes: 15 additions & 11 deletions rootmap/rootmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func TestRootmaps(t *testing.T) {
require.NoError(t, err)
defer db.Close()

var testPrefix = "test"

cases := []struct {
assertion string
f func(*testing.T) rootmap.Rootmap
Expand All @@ -43,50 +45,52 @@ func TestRootmaps(t *testing.T) {
rm := c.f(t)
t.Run("put", func(t *testing.T) {
expected := randNodeID()
err := rm.Put(ctx, "my-device", "my-topic", 10, expected)
err := rm.Put(ctx, "my-device", "my-topic", 10, testPrefix, expected)
require.NoError(t, err)

nodeID, err := rm.Get(ctx, "my-device", "my-topic", 10)
prefix, nodeID, err := rm.Get(ctx, "my-device", "my-topic", 10)
require.NoError(t, err)
require.Equal(t, expected, nodeID)
require.Equal(t, testPrefix, prefix)
})

t.Run("get latest", func(t *testing.T) {
node1 := randNodeID()
err := rm.Put(ctx, "my-device", "my-topic", 20, node1)
err := rm.Put(ctx, "my-device", "my-topic", 20, testPrefix, node1)
require.NoError(t, err)

node2 := randNodeID()
err = rm.Put(ctx, "my-device", "my-topic", 30, node2)
err = rm.Put(ctx, "my-device", "my-topic", 30, testPrefix, node2)
require.NoError(t, err)

nodeID, _, err := rm.GetLatest(ctx, "my-device", "my-topic")
prefix, nodeID, _, err := rm.GetLatest(ctx, "my-device", "my-topic")
require.NoError(t, err)
require.Equal(t, node2, nodeID)
require.Equal(t, testPrefix, prefix)
})
t.Run("get latest by topic", func(t *testing.T) {
node1 := randNodeID()
err := rm.Put(ctx, "my-device", "topic1", 40, node1)
err := rm.Put(ctx, "my-device", "topic1", 40, testPrefix, node1)
require.NoError(t, err)

node2 := randNodeID()
err = rm.Put(ctx, "my-device", "topic2", 50, node2)
err = rm.Put(ctx, "my-device", "topic2", 50, testPrefix, node2)
require.NoError(t, err)

listings, err := rm.GetLatestByTopic(ctx, "my-device", map[string]uint64{"topic1": 0, "topic2": 0})
require.NoError(t, err)

require.ElementsMatch(t, []rootmap.RootListing{
{"topic1", node1, 40, 0},
{"topic2", node2, 50, 0},
{testPrefix, "topic1", node1, 40, 0},
{testPrefix, "topic2", node2, 50, 0},
}, listings)
})
t.Run("get version that does not exist", func(t *testing.T) {
_, err := rm.Get(ctx, "fake-device", "my-topic", 1e9)
_, _, err := rm.Get(ctx, "fake-device", "my-topic", 1e9)
require.ErrorIs(t, err, rootmap.StreamNotFoundError{})
})
t.Run("get latest version that does not exist", func(t *testing.T) {
_, _, err := rm.GetLatest(ctx, "fake-device", "my-topic")
_, _, _, err := rm.GetLatest(ctx, "fake-device", "my-topic")
require.ErrorIs(t, err, rootmap.StreamNotFoundError{})
})
})
Expand Down
49 changes: 30 additions & 19 deletions rootmap/sqlrootmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (rm *sqlRootmap) initialize() error {
producer_id text not null,
topic text not null,
version bigint not null,
storage_prefix text not null,
node_id text not null,
timestamp text not null default current_timestamp,
primary key (producer_id, topic, version)
Expand All @@ -57,13 +58,14 @@ func (rm *sqlRootmap) Put(
producerID string,
topic string,
version uint64,
prefix string,
nodeID nodestore.NodeID,
) error {
rm.mtx.Lock()
defer rm.mtx.Unlock()
_, err := rm.db.ExecContext(ctx, `
insert into rootmap (producer_id, topic, version, node_id) values ($1, $2, $3, $4)`,
producerID, topic, version, hex.EncodeToString(nodeID[:]),
insert into rootmap (producer_id, topic, version, storage_prefix, node_id) values ($1, $2, $3, $4, $5)`,
producerID, topic, version, prefix, hex.EncodeToString(nodeID[:]),
)
if err != nil {
var err sqlite3.Error
Expand All @@ -83,7 +85,7 @@ func (rm *sqlRootmap) GetLatestByTopic(
sb := strings.Builder{}
params := []any{producerID}
sb.WriteString(`
select r1.topic, r1.node_id, r1.version
select r1.topic, r1.storage_prefix, r1.node_id, r1.version
from rootmap r1 left join rootmap r2
on (r1.topic = r2.topic and r1.producer_id = r2.producer_id and r1.version < r2.version)
where r2.rowid is null
Expand Down Expand Up @@ -112,7 +114,8 @@ func (rm *sqlRootmap) GetLatestByTopic(
var nodeID string
var version uint64
var topic string
if err := rows.Scan(&topic, &nodeID, &version); err != nil {
var prefix string
if err := rows.Scan(&topic, &prefix, &nodeID, &version); err != nil {
return nil, fmt.Errorf("failed to read from rootmap: %w", err)
}
if version > maxVersion {
Expand All @@ -124,7 +127,7 @@ func (rm *sqlRootmap) GetLatestByTopic(
}
if version > topics[topic] {
listings = append(listings, RootListing{
topic, nodestore.NodeID(decoded), version, topics[topic],
prefix, topic, nodestore.NodeID(decoded), version, topics[topic],
})
}
}
Expand All @@ -135,44 +138,52 @@ func (rm *sqlRootmap) GetLatestByTopic(
}

func (rm *sqlRootmap) GetLatest(
ctx context.Context, producerID string, topic string) (nodestore.NodeID, uint64, error) {
ctx context.Context,
producerID string,
topic string,
) (string, nodestore.NodeID, uint64, error) {
var prefix string
var nodeID string
var version uint64
err := rm.db.QueryRowContext(ctx, `
select node_id, version from rootmap where producer_id = $1 and topic = $2 order by version desc limit 1`,
select storage_prefix, node_id, version
from rootmap
where producer_id = $1 and topic = $2
order by version desc limit 1`,
producerID, topic,
).Scan(&nodeID, &version)
).Scan(&prefix, &nodeID, &version)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nodestore.NodeID{}, 0, StreamNotFoundError{producerID, topic}
return prefix, nodestore.NodeID{}, 0, StreamNotFoundError{producerID, topic}
}
return nodestore.NodeID{}, 0, fmt.Errorf("failed to read from rootmap: %w", err)
return prefix, nodestore.NodeID{}, 0, fmt.Errorf("failed to read from rootmap: %w", err)
}
decoded, err := hex.DecodeString(nodeID)
if err != nil {
return nodestore.NodeID{}, 0, fmt.Errorf("failed to decode node ID: %w", err)
return prefix, nodestore.NodeID{}, 0, fmt.Errorf("failed to decode node ID: %w", err)
}
return nodestore.NodeID(decoded), version, nil
return prefix, nodestore.NodeID(decoded), version, nil
}

func (rm *sqlRootmap) Get(
ctx context.Context, producerID string, topic string, version uint64) (nodestore.NodeID, error) {
ctx context.Context, producerID string, topic string, version uint64) (string, nodestore.NodeID, error) {
var nodeID string
var prefix string
err := rm.db.QueryRowContext(ctx, `
select node_id from rootmap where producer_id = $1 and topic = $2 and version = $3`,
select storage_prefix, node_id from rootmap where producer_id = $1 and topic = $2 and version = $3`,
producerID, topic, version,
).Scan(&nodeID)
).Scan(&prefix, &nodeID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nodestore.NodeID{}, StreamNotFoundError{producerID, topic}
return prefix, nodestore.NodeID{}, StreamNotFoundError{producerID, topic}
}
return nodestore.NodeID{}, fmt.Errorf("failed to read from rootmap: %w", err)
return prefix, nodestore.NodeID{}, fmt.Errorf("failed to read from rootmap: %w", err)
}
decoded, err := hex.DecodeString(nodeID)
if err != nil {
return nodestore.NodeID{}, fmt.Errorf("failed to decode node ID: %w", err)
return prefix, nodestore.NodeID{}, fmt.Errorf("failed to decode node ID: %w", err)
}
return nodestore.NodeID(decoded), nil
return prefix, nodestore.NodeID(decoded), nil
}

func NewSQLRootmap(db *sql.DB) (Rootmap, error) {
Expand Down
4 changes: 4 additions & 0 deletions storage/directorystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewDirectoryStore(root string) *DirectoryStore {

// Put stores an object in the directory.
func (d *DirectoryStore) Put(_ context.Context, id string, data []byte) error {
dir, _ := filepath.Split(d.root + "/" + id)
if err := util.EnsureDirectoryExists(dir); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
err := os.WriteFile(d.root+"/"+id, data, 0600)
if err != nil {
return fmt.Errorf("write failure: %w", err)
Expand Down
Loading

0 comments on commit 20c80fe

Please sign in to comment.