Skip to content

Commit

Permalink
Add support for multiple schemas within a single tree
Browse files Browse the repository at this point in the history
Track statistics separately for each schema when a single tree contains
more than one. Distinguish between the schemas in statrange output so the
user can figure out how to handle them. Store a statistics object for each
schema present in the inner nodes of the tree.

This will allow the system to support users changing schemas of topics
over time. I considered instead requiring that each tree have a distinct
schema, but this ended up making "schema hashes" more of a user-facing
concept than I think we will want it to be -- though we don't completely
escape it and will need a schema API down the line.
  • Loading branch information
wkalt committed Apr 1, 2024
1 parent 866d1e7 commit 70905e5
Show file tree
Hide file tree
Showing 15 changed files with 472 additions and 305 deletions.
146 changes: 72 additions & 74 deletions README.md

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions client/dp3/cmd/statrange.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ var statrangeCmd = &cobra.Command{
bailf("error decoding response: %s", err)
}

headers := []string{"Start", "End", "Type", "Field", "Name", "Value"}
headers := []string{"Start", "End", "Schema", "Type", "Field", "Name", "Value"}
data := [][]string{}
for _, record := range response {
start := time.Unix(0, int64(record.Start)).Format(time.RFC3339)
end := time.Unix(0, int64(record.End)).Format(time.RFC3339)
schema := record.SchemaHash[:7]
data = append(data, []string{
start, end, string(record.Type), record.Field, record.Name, fmt.Sprintf("%v", record.Value)},
start, end, schema, string(record.Type), record.Field, record.Name, fmt.Sprintf("%v", record.Value)},
)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/alecthomas/participle/v2 v2.1.1
github.com/foxglove/go-rosbag v0.0.5
github.com/minio/minio-go/v7 v7.0.68
github.com/prometheus/client_golang v1.18.0
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -189,6 +188,7 @@ require (
github.com/posener/complete v1.2.3 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/pquerna/cachecontrol v0.2.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
40 changes: 40 additions & 0 deletions mcap/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,46 @@ func ReadFile(t *testing.T, r io.Reader) []uint64 {
return timestamps
}

// WriteFileExtended writes a test MCAP file to w whose single schema has
// fieldCount string fields named data0, data1, and so on.
//
// One channel is written per entry in timestampsets; the channel at index i
// uses the topic "topic-<i>" and receives one message for every timestamp in
// timestampsets[i], with the timestamp used as the message log time. Any
// failure while writing fails the test immediately.
func WriteFileExtended(t *testing.T, w io.Writer, fieldCount int, timestampsets ...[]int64) {
	t.Helper()

	// Every channel refers to the one schema written below.
	const schemaID = 1

	out, err := NewWriter(w)
	require.NoError(t, err)

	header := &mcap.Header{}
	require.NoError(t, out.WriteHeader(header))

	// The message definition is one "string dataN" line per field.
	var definition string
	for n := 0; n < fieldCount; n++ {
		definition = definition + fmt.Sprintf("string data%d\n", n)
	}
	testSchema := &mcap.Schema{
		ID:       schemaID,
		Name:     "test",
		Encoding: "ros1msg",
		Data:     []byte(definition),
	}
	require.NoError(t, out.WriteSchema(testSchema))

	// All channels are declared up front, before any message is written.
	for idx := 0; idx < len(timestampsets); idx++ {
		channel := &mcap.Channel{
			ID:       uint16(idx),
			SchemaID: schemaID,
			Topic:    fmt.Sprintf("topic-%d", idx),
		}
		require.NoError(t, out.WriteChannel(channel))
	}

	for idx := 0; idx < len(timestampsets); idx++ {
		for _, logTime := range timestampsets[idx] {
			// Each field contributes testutils.U32b(5) followed by the five
			// bytes of "hello" -- presumably a length-prefixed ros1 string;
			// confirm against testutils.U32b.
			payload := []byte{}
			for n := 0; n < fieldCount; n++ {
				payload = append(payload, testutils.U32b(5)...)
				payload = append(payload, "hello"...)
			}
			message := &mcap.Message{
				ChannelID: uint16(idx),
				LogTime:   uint64(logTime),
				Data:      payload,
			}
			require.NoError(t, out.WriteMessage(message))
		}
	}

	require.NoError(t, out.Close())
}

func WriteFile(t *testing.T, w io.Writer, timestampsets ...[]int64) {
t.Helper()
writer, err := NewWriter(w)
Expand Down
8 changes: 4 additions & 4 deletions nodestore/inner_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type InnerNode struct {

// Child represents a child of an inner node.
type Child struct {
ID NodeID `json:"id"`
Version uint64 `json:"version"`
Statistics *Statistics `json:"statistics"`
ID NodeID `json:"id"`
Version uint64 `json:"version"`
Statistics map[string]*Statistics `json:"statistics"`
}

// Size returns the size of the node in bytes.
Expand Down Expand Up @@ -71,7 +71,7 @@ func (n *InnerNode) FromBytes(data []byte) error {
}

// PlaceChild sets the child at the given index to the given ID and version.
func (n *InnerNode) PlaceChild(index uint64, id NodeID, version uint64, statistics *Statistics) {
func (n *InnerNode) PlaceChild(index uint64, id NodeID, version uint64, statistics map[string]*Statistics) {
n.Children[index] = &Child{
ID: id,
Version: version,
Expand Down
193 changes: 115 additions & 78 deletions nodestore/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,43 @@ type NumericalSummary struct {
Sum float64 `json:"sum"`
}

// ranges expands the summary into four Float-typed StatRange records for the
// given field -- "mean", "min", "max", and "sum", in that order -- each
// carrying the supplied start, end, and schemaHash.
func (n *NumericalSummary) ranges(field string, start, end uint64) []StatRange {
func (n *NumericalSummary) ranges(field string, start, end uint64, schemaHash string) []StatRange {
return []StatRange{
{
Start: start,
End: end,
Type: Float,
Name: "mean",
Field: field,
Value: n.Mean,
Start: start,
End: end,
Type: Float,
Name: "mean",
SchemaHash: schemaHash,
Field: field,
Value: n.Mean,
},
{
Start: start,
End: end,
Type: Float,
Name: "min",
Field: field,
Value: n.Min,
Start: start,
End: end,
Type: Float,
Name: "min",
SchemaHash: schemaHash,
Field: field,
Value: n.Min,
},
{
Start: start,
End: end,
Type: Float,
Name: "max",
Field: field,
Value: n.Max,
Start: start,
End: end,
Type: Float,
Name: "max",
SchemaHash: schemaHash,
Field: field,
Value: n.Max,
},
{
Start: start,
End: end,
Type: Float,
Name: "sum",
Field: field,
Value: n.Sum,
Start: start,
End: end,
Type: Float,
Name: "sum",
SchemaHash: schemaHash,
Field: field,
Value: n.Sum,
},
}
}
Expand All @@ -84,50 +88,55 @@ type TextSummary struct {
// todo: bloom filters, trigrams, etc.
}

// ranges expands the summary into two Text-typed StatRange records for the
// given field -- "min" followed by "max" -- each carrying the supplied start,
// end, and schemaHash.
func (s *TextSummary) ranges(field string, start, end uint64) []StatRange {
func (s *TextSummary) ranges(field string, start, end uint64, schemaHash string) []StatRange {
return []StatRange{
{
Start: start,
End: end,
Type: Text,
Name: "min",
Field: field,
Value: s.Min,
Start: start,
End: end,
Type: Text,
SchemaHash: schemaHash,
Name: "min",
Field: field,
Value: s.Min,
},
{
Start: start,
End: end,
Type: Text,
Name: "max",
Field: field,
Value: s.Max,
Start: start,
End: end,
Type: Text,
SchemaHash: schemaHash,
Name: "max",
Field: field,
Value: s.Max,
},
}
}

// StatRange is a range of statistics.
type StatRange struct {
Start uint64 `json:"start"`
End uint64 `json:"end"`
Type StatType `json:"type"`
Field string `json:"field"`
Name string `json:"name"`
Value any `json:"value"`
Start uint64 `json:"start"`
End uint64 `json:"end"`
SchemaHash string `json:"schemaHash"`
Type StatType `json:"type"`
Field string `json:"field"`
Name string `json:"name"`
Value any `json:"value"`
}

// NewStatRange builds a StatRange from the given schema hash, start and end
// bounds, statistic type, field name, statistic name, and value.
func NewStatRange(
schemaHash string,
start, end uint64,
typ StatType,
field, name string,
value any,
) StatRange {
return StatRange{
Start: start,
End: end,
Type: typ,
Field: field,
Name: name,
Value: value,
Start: start,
End: end,
Type: typ,
Field: field,
SchemaHash: schemaHash,
Name: name,
Value: value,
}
}

Expand All @@ -144,49 +153,53 @@ type Statistics struct {

// Ranges converts a statistics object into an array of StatRange objects,
// suitable for returning to a user.
func (s *Statistics) Ranges(start, end uint64) []StatRange {
func (s *Statistics) Ranges(start, end uint64, schemaHash string) []StatRange {
ranges := make([]StatRange, 0, len(s.NumStats)+len(s.TextStats))
for i, field := range s.Fields {
if numstat, ok := s.NumStats[i]; ok {
ranges = append(ranges, numstat.ranges(field.Name, start, end)...)
ranges = append(ranges, numstat.ranges(field.Name, start, end, schemaHash)...)
continue
}
if textstat, ok := s.TextStats[i]; ok {
ranges = append(ranges, textstat.ranges(field.Name, start, end)...)
ranges = append(ranges, textstat.ranges(field.Name, start, end, schemaHash)...)
}
}
ranges = append(ranges, []StatRange{
{
Start: start,
End: end,
Type: Int,
Field: "",
Name: "messageCount",
Value: s.MessageCount,
Start: start,
End: end,
Type: Int,
Field: "",
SchemaHash: schemaHash,
Name: "messageCount",
Value: s.MessageCount,
},
{
Start: start,
End: end,
Type: Int,
Field: "",
Name: "byteCount",
Value: s.ByteCount,
Start: start,
End: end,
Type: Int,
Field: "",
SchemaHash: schemaHash,
Name: "byteCount",
Value: s.ByteCount,
},
{
Start: start,
End: end,
Type: Int,
Field: "",
Name: "minObservedTime",
Value: s.MinObservedTime,
Start: start,
End: end,
Type: Int,
Field: "",
SchemaHash: schemaHash,
Name: "minObservedTime",
Value: s.MinObservedTime,
},
{
Start: start,
End: end,
Type: Int,
Field: "",
Name: "maxObservedTime",
Value: s.MaxObservedTime,
Start: start,
End: end,
Type: Int,
Field: "",
SchemaHash: schemaHash,
Name: "maxObservedTime",
Value: s.MaxObservedTime,
},
}...)
return ranges
Expand Down Expand Up @@ -295,6 +308,30 @@ func NewStatistics(fields []util.Named[schema.PrimitiveType]) *Statistics {
}
}

// Clone returns a copy of the statistics that does not share its Fields
// slice or its NumStats/TextStats maps with the receiver. Each numerical and
// text summary is copied by value into a freshly allocated struct, so
// updating a summary on the clone leaves the original's summary untouched.
func (s *Statistics) Clone() *Statistics {
	clone := &Statistics{
		Fields:          make([]util.Named[schema.PrimitiveType], len(s.Fields)),
		NumStats:        make(map[int]*NumericalSummary, len(s.NumStats)),
		TextStats:       make(map[int]*TextSummary, len(s.TextStats)),
		MessageCount:    s.MessageCount,
		ByteCount:       s.ByteCount,
		MinObservedTime: s.MinObservedTime,
		MaxObservedTime: s.MaxObservedTime,
	}
	copy(clone.Fields, s.Fields)

	// Copy each summary into its own allocation rather than reusing the
	// receiver's pointers.
	for idx, summary := range s.NumStats {
		dup := *summary
		clone.NumStats[idx] = &dup
	}
	for idx, summary := range s.TextStats {
		dup := *summary
		clone.TextStats[idx] = &dup
	}
	return clone
}

// Add adds the statistics from another Statistics object to this one.
func (s *Statistics) Add(other *Statistics) error {
if s.MessageCount == 0 {
Expand Down Expand Up @@ -332,5 +369,5 @@ func (s *Statistics) Add(other *Statistics) error {

// String returns a string representation of the statistics.
func (s *Statistics) String() string {
return fmt.Sprintf("(count=%d)", s.MessageCount)
return fmt.Sprintf("count=%d", s.MessageCount)
}
Loading

0 comments on commit 70905e5

Please sign in to comment.