Skip to content

Commit

Permalink
Order result-set columns by query projection ordering
Browse files Browse the repository at this point in the history
Closes #14
  • Loading branch information
flobernd committed Jun 19, 2023
1 parent a929b19 commit a267d03
Showing 1 changed file with 37 additions and 19 deletions.
56 changes: 37 additions & 19 deletions pkg/plugin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/SnellerInc/sneller/ion"
"github.com/grafana/grafana-plugin-sdk-go/data"
"golang.org/x/exp/slices"
)

// frameFromSnellerResult builds a Grafana data frame from a raw Sneller query result.
Expand Down Expand Up @@ -362,10 +363,11 @@ type snellerColumn struct {
}

type snellerFinalStatus struct {
Hits int64 `ion:"hits"`
Misses int64 `ion:"misses"`
Scanned int64 `ion:"scanned"`
Error string `ion:"error"`
Hits int64 `ion:"hits"`
Misses int64 `ion:"misses"`
Scanned int64 `ion:"scanned"`
Error string `ion:"error"`
ResultSet ion.Datum `ion:"result_set"`
}

type snellerQueryError struct {
Expand All @@ -374,20 +376,21 @@ type snellerQueryError struct {

// snellerSchema represents the derived schema of a Sneller query result-set.
type snellerSchema struct {
RowCount int // The total number of rows returned by the query
Columns map[string]*snellerColumn // The individual columns indexed by their field names
FinalStatus *snellerFinalStatus // The final query status
RowCount int // The total number of rows returned by the query
Columns []*snellerColumn // The individual columns
FinalStatus *snellerFinalStatus // The final query status
}

func deriveSchema(buf []byte) (*snellerSchema, error) {
schema := snellerSchema{
RowCount: 0,
Columns: map[string]*snellerColumn{},
Columns: []*snellerColumn{},
}
lookup := map[string]*snellerColumn{}

status, err := iterateRows(buf, func(reader *IonReader, index int) error {
schema.RowCount += 1
return analyzeRow(reader, &schema)
return analyzeRow(reader, &schema, lookup)
})
if err != nil {
return nil, err
Expand All @@ -402,11 +405,30 @@ func deriveSchema(buf []byte) (*snellerSchema, error) {
}
}

// Restore column order
index := 0
err = status.ResultSet.UnpackStruct(func(field ion.Field) error {
for _, col := range schema.Columns {
if col.Name == field.Label {
col.Index = index
break
}
}
index++
return nil
})
if err != nil {
return nil, err
}

slices.SortFunc(schema.Columns, func(a, b *snellerColumn) bool {
return a.Index < b.Index
})

return &schema, nil
}

func analyzeRow(reader *IonReader, schema *snellerSchema) error {
index := 0
func analyzeRow(reader *IonReader, schema *snellerSchema, lookup map[string]*snellerColumn) error {
for reader.Next() {
name, err := reader.FieldName()
if err != nil {
Expand All @@ -416,23 +438,20 @@ func analyzeRow(reader *IonReader, schema *snellerSchema) error {
ionType := reader.Type()
snellerType := snellerType(ionType)

col, ok := schema.Columns[name]
col, ok := lookup[name]
if !ok {
col = &snellerColumn{
Index: index,
Name: name,
Typ: snellerType,
Nullable: snellerType == snellerTypeNull,
Signed: ionType == ion.IntType || ionType == ion.FloatType,
Optional: schema.RowCount != 1,
Count: 0,
}
schema.Columns[name] = col
lookup[name] = col
schema.Columns = append(schema.Columns, col)
}

if index != col.Index {
col.Index = -1
}
col.Count++

// Adjust column type if required
Expand Down Expand Up @@ -461,9 +480,8 @@ func analyzeRow(reader *IonReader, schema *snellerSchema) error {
}
// TODO: Required bits
}

index++
}

return reader.Error()
}

Expand Down

0 comments on commit a267d03

Please sign in to comment.