Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AggregateFunction(quantileDD(...), ...) #1

Merged
merged 3 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/ddsketch/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"context"
"fmt"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
)

func main() {
ctx := context.Background()
c, err := ch.Dial(ctx, ch.Options{Address: "localhost:9000"})
if err != nil {
panic(err)
}

data := proto.NewAggregateFunctionDD([]interface{}{0.01, 0.9}, proto.ColumnTypeUInt64)

if err := c.Do(ctx, ch.Query{
Body: "SELECT sketch FROM default.02919_ddsketch_quantile",
Result: proto.Results{
{Name: "sketch", Data: data},
},
// OnResult will be called on next received data block.
OnResult: func(ctx context.Context, b proto.Block) error {
fmt.Println("OnResult", data.Rows())
fmt.Println(data.Debug())
return nil
},
}); err != nil {
panic(err)
}
}
6 changes: 6 additions & 0 deletions proto/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func (b *Buffer) PutUVarInt(x uint64) {
b.Buf = append(b.Buf, buf[:n]...)
}

func (b *Buffer) PutVarInt(x int64) {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(buf, x)
b.Buf = append(b.Buf, buf[:n]...)
}

// PutInt encodes integer as uvarint.
func (b *Buffer) PutInt(x int) {
b.PutUVarInt(uint64(x))
Expand Down
81 changes: 81 additions & 0 deletions proto/col_agg_func_dd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package proto

import (
"fmt"
"strings"
)

// AggregateFunctionDD implements Column interface.
type AggregateFunctionDD struct {
data []DD
args []interface{}
typ ColumnType
}

// NewAggregateFunctionDD returns new AggregateFunctionDD.
func NewAggregateFunctionDD(args []interface{}, typ ColumnType) *AggregateFunctionDD {
return &AggregateFunctionDD{args: args, typ: typ}
}

// Append appends DD to column.
func (a *AggregateFunctionDD) Append(v DD) {
a.data = append(a.data, v)
}

// AppendArr appends DD array to column.
func (a *AggregateFunctionDD) AppendArr(v []DD) {
a.data = append(a.data, v...)
}

// Row returns DD at index i.
func (a AggregateFunctionDD) Row(i int) DD {
return a.data[i]
}

// Type returns ColumnTypeAggregateFunction.
func (a AggregateFunctionDD) Type() ColumnType {
argsStr := make([]string, len(a.args))
for i, arg := range a.args {
argsStr[i] = fmt.Sprintf("%v", arg)
}
return ColumnType(fmt.Sprintf("%s(%s, %s)", string(ColumnTypeAggregateFunction), strings.Join(argsStr, ", "), a.typ))
}

// Rows returns number of rows in column.
func (a AggregateFunctionDD) Rows() int { return len(a.data) }

// DecodeColumn decodes column from reader.
func (a *AggregateFunctionDD) DecodeColumn(r *Reader, rows int) error {
for i := 0; i < rows; i++ {
var v DD
if err := v.Decode(r); err != nil {
return err
}
a.Append(v)
}
return nil
}

// Reset resets column data.
func (a *AggregateFunctionDD) Reset() {
a.data = nil
}

// EncodeColumn encodes column to buffer.
func (a AggregateFunctionDD) EncodeColumn(b *Buffer) {
if b == nil {
return
}
for _, v := range a.data {
v.Encode(b)
}
}

// Debug returns string representation of column.
func (a AggregateFunctionDD) Debug() string {
var sketches = make([]string, len(a.data))
for _, sketch := range a.data {
sketches = append(sketches, sketch.Debug())
}
return strings.Join(sketches, "\n")
}
126 changes: 87 additions & 39 deletions proto/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ func (c ColumnType) Conflicts(b ColumnType) bool {
if c.Base() != b.Base() {
return true
}
if c.Base() == b.Base() && c.Base() == ColumnTypeAggregateFunction {
// Compare function names.
aName, aType := aggregateFuncNameAndType(string(c))
bName, bType := aggregateFuncNameAndType(string(b))
if aName == bName {
return false
}
if aType == bType {
return false
}
return true
}
if c.normalizeCommas() == b.normalizeCommas() {
return false
}
Expand Down Expand Up @@ -174,45 +186,46 @@ func (c ColumnType) Array() ColumnType {
//
// For example: Array(Int8) or even Array(Array(String)).
const (
ColumnTypeNone ColumnType = ""
ColumnTypeInt8 ColumnType = "Int8"
ColumnTypeInt16 ColumnType = "Int16"
ColumnTypeInt32 ColumnType = "Int32"
ColumnTypeInt64 ColumnType = "Int64"
ColumnTypeInt128 ColumnType = "Int128"
ColumnTypeInt256 ColumnType = "Int256"
ColumnTypeUInt8 ColumnType = "UInt8"
ColumnTypeUInt16 ColumnType = "UInt16"
ColumnTypeUInt32 ColumnType = "UInt32"
ColumnTypeUInt64 ColumnType = "UInt64"
ColumnTypeUInt128 ColumnType = "UInt128"
ColumnTypeUInt256 ColumnType = "UInt256"
ColumnTypeFloat32 ColumnType = "Float32"
ColumnTypeFloat64 ColumnType = "Float64"
ColumnTypeString ColumnType = "String"
ColumnTypeFixedString ColumnType = "FixedString"
ColumnTypeArray ColumnType = "Array"
ColumnTypeIPv4 ColumnType = "IPv4"
ColumnTypeIPv6 ColumnType = "IPv6"
ColumnTypeDateTime ColumnType = "DateTime"
ColumnTypeDateTime64 ColumnType = "DateTime64"
ColumnTypeDate ColumnType = "Date"
ColumnTypeDate32 ColumnType = "Date32"
ColumnTypeUUID ColumnType = "UUID"
ColumnTypeEnum8 ColumnType = "Enum8"
ColumnTypeEnum16 ColumnType = "Enum16"
ColumnTypeLowCardinality ColumnType = "LowCardinality"
ColumnTypeMap ColumnType = "Map"
ColumnTypeBool ColumnType = "Bool"
ColumnTypeTuple ColumnType = "Tuple"
ColumnTypeNullable ColumnType = "Nullable"
ColumnTypeDecimal32 ColumnType = "Decimal32"
ColumnTypeDecimal64 ColumnType = "Decimal64"
ColumnTypeDecimal128 ColumnType = "Decimal128"
ColumnTypeDecimal256 ColumnType = "Decimal256"
ColumnTypePoint ColumnType = "Point"
ColumnTypeInterval ColumnType = "Interval"
ColumnTypeNothing ColumnType = "Nothing"
ColumnTypeNone ColumnType = ""
ColumnTypeInt8 ColumnType = "Int8"
ColumnTypeInt16 ColumnType = "Int16"
ColumnTypeInt32 ColumnType = "Int32"
ColumnTypeInt64 ColumnType = "Int64"
ColumnTypeInt128 ColumnType = "Int128"
ColumnTypeInt256 ColumnType = "Int256"
ColumnTypeUInt8 ColumnType = "UInt8"
ColumnTypeUInt16 ColumnType = "UInt16"
ColumnTypeUInt32 ColumnType = "UInt32"
ColumnTypeUInt64 ColumnType = "UInt64"
ColumnTypeUInt128 ColumnType = "UInt128"
ColumnTypeUInt256 ColumnType = "UInt256"
ColumnTypeFloat32 ColumnType = "Float32"
ColumnTypeFloat64 ColumnType = "Float64"
ColumnTypeString ColumnType = "String"
ColumnTypeFixedString ColumnType = "FixedString"
ColumnTypeArray ColumnType = "Array"
ColumnTypeIPv4 ColumnType = "IPv4"
ColumnTypeIPv6 ColumnType = "IPv6"
ColumnTypeDateTime ColumnType = "DateTime"
ColumnTypeDateTime64 ColumnType = "DateTime64"
ColumnTypeDate ColumnType = "Date"
ColumnTypeDate32 ColumnType = "Date32"
ColumnTypeUUID ColumnType = "UUID"
ColumnTypeEnum8 ColumnType = "Enum8"
ColumnTypeEnum16 ColumnType = "Enum16"
ColumnTypeLowCardinality ColumnType = "LowCardinality"
ColumnTypeMap ColumnType = "Map"
ColumnTypeBool ColumnType = "Bool"
ColumnTypeTuple ColumnType = "Tuple"
ColumnTypeNullable ColumnType = "Nullable"
ColumnTypeDecimal32 ColumnType = "Decimal32"
ColumnTypeDecimal64 ColumnType = "Decimal64"
ColumnTypeDecimal128 ColumnType = "Decimal128"
ColumnTypeDecimal256 ColumnType = "Decimal256"
ColumnTypePoint ColumnType = "Point"
ColumnTypeInterval ColumnType = "Interval"
ColumnTypeNothing ColumnType = "Nothing"
ColumnTypeAggregateFunction ColumnType = "AggregateFunction"
)

// colWrap wraps Column with type t.
Expand Down Expand Up @@ -288,3 +301,38 @@ func (s *ColInfoInput) DecodeResult(r *Reader, version int, b Block) error {
}
return nil
}

const (
colType = string(ColumnTypeAggregateFunction)
sep = ", "
)

func aggregateFuncNameAndType(columnType string) (string, string) {

subType := columnSubType(columnType)

if subType == "" {
return "", ""
}

idx := strings.LastIndex(subType, sep)
if idx == -1 {
return "", ""
}

funcName := subType[:idx]
funcType := subType[idx+len(sep):]

if idx := strings.IndexByte(funcName, '('); idx >= 0 {
funcName = funcName[:idx]
}

return funcName, funcType
}

func columnSubType(s string) string {
if strings.HasPrefix(s, colType) && strings.HasSuffix(s, ")") {
return s[len(colType)+1 : len(s)-1]
}
return ""
}
Loading