From ac64e2645ac2581320f28a9f66ed0380d4b8c520 Mon Sep 17 00:00:00 2001 From: Erick Daniszewski Date: Wed, 27 Mar 2019 16:46:05 -0400 Subject: [PATCH] add transform fns to allow generalized plugins to manipulate reading values (#399) --- sdk/config/device.go | 7 ++ sdk/device.go | 16 +++++ sdk/device_test.go | 39 +++++++++++ sdk/funcs/builtins.go | 40 +++++++++++ sdk/funcs/builtins_test.go | 79 ++++++++++++++++++++++ sdk/funcs/functions.go | 67 +++++++++++++++++++ sdk/funcs/functions_test.go | 120 +++++++++++++++++++++++++++++++++ sdk/output/output.go | 2 +- sdk/scheduler.go | 54 +++++++++++++-- sdk/scheduler_test.go | 129 ++++++++++++++++++++++++++++++++++++ 10 files changed, 548 insertions(+), 5 deletions(-) create mode 100644 sdk/funcs/builtins.go create mode 100644 sdk/funcs/builtins_test.go create mode 100644 sdk/funcs/functions.go create mode 100644 sdk/funcs/functions_test.go diff --git a/sdk/config/device.go b/sdk/config/device.go index 7ca09954..36d03f88 100644 --- a/sdk/config/device.go +++ b/sdk/config/device.go @@ -136,6 +136,13 @@ type DeviceInstance struct { // e.g. "1e-2". ScalingFactor string `yaml:"scalingFactor,omitempty"` + // Apply defines a list of functions which are to be applied to the device + // reading values, in the order in which they are defined. + // + // There are some built-in functions that the SDK provides. A plugin can also + // register their own functions. + Apply []string `yaml:"apply,omitempty"` + // WriteTimeout defines a custom write timeout for the device instance. This // is the time within which the write transaction will remain valid. If left // unspecified, it will fall back to the default value of 30s. diff --git a/sdk/device.go b/sdk/device.go index 84cccdf6..4199fcd5 100644 --- a/sdk/device.go +++ b/sdk/device.go @@ -27,6 +27,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vapor-ware/synse-sdk/sdk/config" "github.com/vapor-ware/synse-sdk/sdk/errors" + "github.com/vapor-ware/synse-sdk/sdk/funcs" "github.com/vapor-ware/synse-sdk/sdk/output" "github.com/vapor-ware/synse-sdk/sdk/utils" synse "github.com/vapor-ware/synse-server-grpc/go" @@ -99,6 +100,10 @@ type Device struct { // populated via the SDK on device loading and parsing and uses the Handler // field to match the name of the handler to the actual instance. handler *DeviceHandler + + // fns defines a list of functions which should be applied to the reading value(s) + // for the device. This is called internally, if any fns are defined. + fns []*funcs.Func } // NewDeviceFromConfig creates a new instance of a Device from its device prototype @@ -179,6 +184,16 @@ func NewDeviceFromConfig(proto *config.DeviceProto, instance *config.DeviceInsta } } + var fns []*funcs.Func + for _, fn := range instance.Apply { + f := funcs.Get(fn) + if f == nil { + // fixme: err message + return nil, fmt.Errorf("device specified unknown transform function") + } + fns = append(fns, f) + } + // Override write timeout, if set. if instance.WriteTimeout != 0 { writeTimeout = instance.WriteTimeout @@ -200,6 +215,7 @@ func NewDeviceFromConfig(proto *config.DeviceProto, instance *config.DeviceInsta ScalingFactor: instance.ScalingFactor, WriteTimeout: writeTimeout, Output: instance.Output, + fns: fns, } if err := d.setAlias(instance.Alias); err != nil { diff --git a/sdk/device_test.go b/sdk/device_test.go index 49b10fa3..f41c0673 100644 --- a/sdk/device_test.go +++ b/sdk/device_test.go @@ -91,6 +91,7 @@ func TestNewDeviceFromConfig(t *testing.T) { assert.Equal(t, "2", device.ScalingFactor) assert.Equal(t, 5*time.Second, device.WriteTimeout) assert.Equal(t, "temperature", device.Output) + assert.Equal(t, 0, len(device.fns)) } func TestNewDeviceFromConfig2(t *testing.T) { @@ -115,6 +116,7 @@ func TestNewDeviceFromConfig2(t *testing.T) { "address": "localhost", }, Output: "temperature", + Apply: []string{"FtoC"}, SortIndex: 1, Alias: &config.DeviceAlias{ Name: "foo", @@ -136,6 +138,7 @@ func TestNewDeviceFromConfig2(t *testing.T) { assert.Equal(t, "2", device.ScalingFactor) assert.Equal(t, 3*time.Second, device.WriteTimeout) assert.Equal(t, "temperature", device.Output) + assert.Equal(t, 1, len(device.fns)) } func TestNewDeviceFromConfig3(t *testing.T) { @@ -211,6 +214,7 @@ func TestNewDeviceFromConfig4(t *testing.T) { assert.Equal(t, "2", device.ScalingFactor) assert.Equal(t, 30*time.Second, device.WriteTimeout) // takes the default value assert.Equal(t, "", device.Output) + assert.Equal(t, 0, len(device.fns)) } func TestNewDeviceFromConfig5(t *testing.T) { @@ -255,6 +259,7 @@ func TestNewDeviceFromConfig5(t *testing.T) { assert.Equal(t, "2", device.ScalingFactor) assert.Equal(t, 30*time.Second, device.WriteTimeout) // takes the default value assert.Equal(t, "", device.Output) + assert.Equal(t, 0, len(device.fns)) } func TestNewDeviceFromConfig6(t *testing.T) { @@ -401,6 +406,40 @@ func TestNewDeviceFromConfig9(t *testing.T) { assert.Nil(t, device) } +func TestNewDeviceFromConfig10(t *testing.T) { + // Unknown transformation function specified + proto := &config.DeviceProto{ + Type: "type1", + Metadata: map[string]string{ + "a": "b", + }, + Data: map[string]interface{}{ + "port": 5000, + }, + Tags: []string{"default/foo"}, + Handler: "testhandler", + WriteTimeout: 3 * time.Second, + } + instance := &config.DeviceInstance{ + Type: "type2", + Info: "testdata", + Tags: []string{"vapor/io"}, + Data: map[string]interface{}{ + "address": "localhost", + }, + SortIndex: 1, + Handler: "testhandler2", + Apply: []string{"unknown-fn"}, + ScalingFactor: "2", + WriteTimeout: 5 * time.Second, + DisableInheritance: false, + } + + device, err := NewDeviceFromConfig(proto, instance) + assert.Error(t, err) + assert.Nil(t, device) +} + func TestDevice_setAlias_noConf(t *testing.T) { device := Device{} diff --git a/sdk/funcs/builtins.go b/sdk/funcs/builtins.go new file mode 100644 index 00000000..4d691fe2 --- /dev/null +++ b/sdk/funcs/builtins.go @@ -0,0 +1,40 @@ +// Synse SDK +// Copyright (c) 2019 Vapor IO +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package funcs + +import "github.com/vapor-ware/synse-sdk/sdk/utils" + +// GetBuiltins returns all of the built-in Funcs supplied by the SDK. +func GetBuiltins() []*Func { + return []*Func{ + &FtoC, + } +} + +// FtoC is a Func which converts a value from degrees Fahrenheit to +// degrees Celsius. +var FtoC = Func{ + Name: "FtoC", + Fn: func(value interface{}) (interface{}, error) { + f, err := utils.ConvertToFloat64(value) + if err != nil { + return nil, err + } + c := float64((f - 32.0) * 5.0 / 9.0) + return c, nil + }, +} diff --git a/sdk/funcs/builtins_test.go b/sdk/funcs/builtins_test.go new file mode 100644 index 00000000..a654edcc --- /dev/null +++ b/sdk/funcs/builtins_test.go @@ -0,0 +1,79 @@ +// Synse SDK +// Copyright (c) 2019 Vapor IO +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package funcs + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetBuiltins(t *testing.T) { + fns := GetBuiltins() + assert.NotEmpty(t, fns) +} + +func TestFtoC_Fn(t *testing.T) { + cases := []struct { + f float64 + c float64 + }{ + {f: -459.67, c: -273.15}, + {f: -50, c: -45.56}, + {f: -40, c: -40.00}, + {f: -30, c: -34.44}, + {f: -20, c: -28.89}, + {f: -10, c: -23.33}, + {f: 0, c: -17.78}, + {f: 10, c: -12.22}, + {f: 20, c: -6.67}, + {f: 30, c: -1.11}, + {f: 32, c: 0}, + {f: 40, c: 4.44}, + {f: 50, c: 10.00}, + {f: 60, c: 15.56}, + {f: 70, c: 21.11}, + {f: 80, c: 26.67}, + {f: 90, c: 32.22}, + {f: 100, c: 37.78}, + {f: 110, c: 43.33}, + {f: 120, c: 48.89}, + {f: 130, c: 54.44}, + {f: 140, c: 60.00}, + {f: 150, c: 65.56}, + {f: 160, c: 71.11}, + {f: 170, c: 76.67}, + {f: 180, c: 82.22}, + {f: 190, c: 87.78}, + {f: 200, c: 93.33}, + {f: 212, c: 100}, + {f: 300, c: 148.89}, + {f: 400, c: 204.44}, + {f: 500, c: 260.00}, + {f: 600, c: 315.56}, + {f: 700, c: 371.11}, + {f: 800, c: 426.67}, + {f: 900, c: 482.22}, + {f: 1000, c: 537.78}, + } + + for _, c := range cases { + val, err := FtoC.Fn(c.f) + assert.NoError(t, err) + assert.InDelta(t, c.c, val, 0.01) + } +} diff --git a/sdk/funcs/functions.go b/sdk/funcs/functions.go new file mode 100644 index 00000000..3304bdd1 --- /dev/null +++ b/sdk/funcs/functions.go @@ -0,0 +1,67 @@ +// Synse SDK +// Copyright (c) 2019 Vapor IO +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package funcs + +import ( + "fmt" + + "github.com/vapor-ware/synse-sdk/sdk/errors" +) + +var registeredFuncs map[string]*Func + +func init() { + registeredFuncs = make(map[string]*Func) + for _, f := range GetBuiltins() { + registeredFuncs[f.Name] = f + } +} + +// Get gets a Func by its name. If a func with the specified name +// is not found, nil is returned. +func Get(name string) *Func { + return registeredFuncs[name] +} + +// Register registers new funcs to the tracked funcs. +func Register(funcs ...*Func) error { + multiErr := errors.NewMultiError("func registration") + + for _, f := range funcs { + if _, exists := registeredFuncs[f.Name]; exists { + multiErr.Add(fmt.Errorf("conflict: Func with name '%s' already exists", f.Name)) + continue + } + registeredFuncs[f.Name] = f + } + return multiErr.Err() +} + +// Func is a function that can be applied to a device reading. +type Func struct { + // Name is the name of the function. This is how it is identified + // and referenced. + Name string + + // Fn is the function which will be called on the reading value. + Fn func(value interface{}) (interface{}, error) +} + +// Call calls the function defined for the Func. +func (fn *Func) Call(value interface{}) (interface{}, error) { + return fn.Fn(value) +} diff --git a/sdk/funcs/functions_test.go b/sdk/funcs/functions_test.go new file mode 100644 index 00000000..d4a27a75 --- /dev/null +++ b/sdk/funcs/functions_test.go @@ -0,0 +1,120 @@ +// Synse SDK +// Copyright (c) 2019 Vapor IO +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package funcs + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGet_notExists(t *testing.T) { + f := Get("nonexistent") + assert.Nil(t, f) +} + +func TestGet_exists(t *testing.T) { + // The func here is built-in so it should always exist. + f := Get("FtoC") + assert.NotNil(t, f) + assert.Equal(t, "FtoC", f.Name) +} + +func TestRegister_noOutputs(t *testing.T) { + // Copy the map and reset it once we're done so we don't + // pollute it for other tests. + var registeredCopy = map[string]*Func{} + for k, v := range registeredFuncs { + registeredCopy[k] = v + } + defer func() { + registeredFuncs = registeredCopy + }() + + initLen := len(registeredFuncs) + + err := Register() + assert.NoError(t, err) + assert.Len(t, registeredFuncs, initLen) +} + +func TestRegister_oneOutput(t *testing.T) { + // Copy the map and reset it once we're done so we don't + // pollute it for other tests. + var registeredCopy = map[string]*Func{} + for k, v := range registeredFuncs { + registeredCopy[k] = v + } + defer func() { + registeredFuncs = registeredCopy + }() + + initLen := len(registeredFuncs) + + err := Register(&Func{ + Name: "test-func-1", + }) + assert.NoError(t, err) + assert.Len(t, registeredFuncs, initLen+1) +} + +func TestRegister_conflict(t *testing.T) { + // Copy the map and reset it once we're done so we don't + // pollute it for other tests. + var registeredCopy = map[string]*Func{} + for k, v := range registeredFuncs { + registeredCopy[k] = v + } + defer func() { + registeredFuncs = registeredCopy + }() + + initLen := len(registeredFuncs) + + err := Register(&Func{ + Name: "FtoC", // same name as a built-in, should conflict + }) + assert.Error(t, err) + assert.Len(t, registeredFuncs, initLen) +} + +func TestFunc_Call_ok(t *testing.T) { + fn := Func{ + Name: "test", + Fn: func(value interface{}) (i interface{}, e error) { + return value, nil + }, + } + + val, err := fn.Call(1) + assert.NoError(t, err) + assert.Equal(t, 1, val.(int)) +} + +func TestFunc_Call_err(t *testing.T) { + fn := Func{ + Name: "test", + Fn: func(value interface{}) (i interface{}, e error) { + return nil, fmt.Errorf("test error") + }, + } + + val, err := fn.Call(1) + assert.Error(t, err) + assert.Nil(t, val) +} diff --git a/sdk/output/output.go b/sdk/output/output.go index 45e75a1c..7daca533 100644 --- a/sdk/output/output.go +++ b/sdk/output/output.go @@ -39,7 +39,7 @@ func Get(name string) *Output { return registeredOutputs[name] } -// Register registers outputs to the tracked slice of outputs. +// Register registers new outputs to the tracked outputs. func Register(output ...*Output) error { multiErr := errors.NewMultiError("output registration") diff --git a/sdk/scheduler.go b/sdk/scheduler.go index 03c85ad3..b1519542 100644 --- a/sdk/scheduler.go +++ b/sdk/scheduler.go @@ -446,6 +446,43 @@ func (scheduler *scheduler) scheduleListen() { } } +// applyTransformations is a helper function to apply any transformation functions +// which a device specifies to its readings. +func applyTransformations(device *Device, rctx *ReadContext) error { + if len(device.fns) > 0 { + log.WithFields(log.Fields{ + "device": device.id, + }).Info("[scheduler] applying reading transform fns") + + for _, reading := range rctx.Reading { + for _, fn := range device.fns { + log.WithFields(log.Fields{ + "device": device.id, + "fn": fn.Name, + "value": reading.Value, + }).Debug("[scheduler] reading value pre-transform") + newVal, err := fn.Call(reading.Value) + if err != nil { + log.WithFields(log.Fields{ + "device": device.id, + "fn": fn.Name, + "value": reading.Value, + "error": err, + }).Error("[scheduler] failed to apply transform function") + return err + } + log.WithFields(log.Fields{ + "device": device.id, + "fn": fn.Name, + "value": newVal, + }).Debug("[scheduler] reading value post-transform") + reading.Value = newVal + } + } + } + return nil +} + // read reads from a single device using a handler's Read function. func (scheduler *scheduler) read(device *Device) { delay := scheduler.config.Read.Delay @@ -488,7 +525,12 @@ func (scheduler *scheduler) read(device *Device) { rlog.Error("[scheduler] failed device read") } } else { - scheduler.stateManager.readChan <- response + err := applyTransformations(device, response) + if err != nil { + log.Error("[scheduler] discarding readings") + } else { + scheduler.stateManager.readChan <- response + } } // If a delay is configured, wait for the delay before continuing @@ -540,16 +582,20 @@ func (scheduler *scheduler) bulkRead(handler *DeviceHandler) { rlog.WithField("error", err).Error("[scheduler] handler failed bulk read") } else { for _, readCtx := range response { - scheduler.stateManager.readChan <- readCtx + device := scheduler.deviceManager.GetDevice(readCtx.Device) + err := applyTransformations(device, readCtx) + if err != nil { + log.Error("[scheduler] discarding readings") + } else { + scheduler.stateManager.readChan <- readCtx + } } } // If a delay is configured, wait for the delay before continuing // (and relinquishing the lock, if in serial mode). if delay != 0 { - //rlog.Debug("[scheduler] sleeping for bulk read delay") time.Sleep(delay) - //rlog.Debug("[scheduler] waking up for bulk read delay") } } } diff --git a/sdk/scheduler_test.go b/sdk/scheduler_test.go index 40047848..09ec2712 100644 --- a/sdk/scheduler_test.go +++ b/sdk/scheduler_test.go @@ -17,12 +17,14 @@ package sdk import ( + "fmt" "testing" "time" "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/vapor-ware/synse-sdk/sdk/config" + "github.com/vapor-ware/synse-sdk/sdk/funcs" "github.com/vapor-ware/synse-sdk/sdk/health" "github.com/vapor-ware/synse-sdk/sdk/output" synse "github.com/vapor-ware/synse-server-grpc/go" @@ -495,3 +497,130 @@ func TestScheduler_scheduleListen(t *testing.T) { assert.True(t, isOpen) assert.Equal(t, "123", reading.Device) } + +func TestScheduler_applyTransformations_noFns(t *testing.T) { + device := &Device{ + fns: []*funcs.Func{}, + } + rctx := &ReadContext{ + Reading: []*output.Reading{ + {Value: 2}, + }, + } + + err := applyTransformations(device, rctx) + assert.NoError(t, err) + + // Verify that the reading value did not change. + assert.Equal(t, 2, rctx.Reading[0].Value.(int)) +} + +func TestScheduler_applyTransformations_oneFnOk(t *testing.T) { + device := &Device{ + fns: []*funcs.Func{ + { + Name: "test-fn-1", + Fn: func(value interface{}) (interface{}, error) { + return (value.(int)) * 2, nil + }, + }, + }, + } + rctx := &ReadContext{ + Reading: []*output.Reading{ + {Value: 2}, + }, + } + + err := applyTransformations(device, rctx) + assert.NoError(t, err) + + // Verify that the reading value changed. + assert.Equal(t, 4, rctx.Reading[0].Value.(int)) +} + +func TestScheduler_applyTransformations_multipleFnsOk(t *testing.T) { + device := &Device{ + fns: []*funcs.Func{ + { + Name: "test-fn-1", + Fn: func(value interface{}) (interface{}, error) { + return (value.(int)) * 2, nil + }, + }, + { + Name: "test-fn-2", + Fn: func(value interface{}) (interface{}, error) { + return (value.(int)) + 3, nil + }, + }, + }, + } + rctx := &ReadContext{ + Reading: []*output.Reading{ + {Value: 2}, + }, + } + + err := applyTransformations(device, rctx) + assert.NoError(t, err) + + // Verify that the reading value changed. + assert.Equal(t, 7, rctx.Reading[0].Value.(int)) +} + +func TestScheduler_applyTransformations_oneFnErr(t *testing.T) { + device := &Device{ + fns: []*funcs.Func{ + { + Name: "test-fn-1", + Fn: func(value interface{}) (interface{}, error) { + return nil, fmt.Errorf("test error") + }, + }, + }, + } + rctx := &ReadContext{ + Reading: []*output.Reading{ + {Value: 2}, + }, + } + + err := applyTransformations(device, rctx) + assert.Error(t, err) + + // Verify that the reading value did not change. + assert.Equal(t, 2, rctx.Reading[0].Value.(int)) +} + +func TestScheduler_applyTransformations_multipleFnsErr(t *testing.T) { + device := &Device{ + fns: []*funcs.Func{ + { + Name: "test-fn-1", + Fn: func(value interface{}) (interface{}, error) { + return (value.(int)) * 2, nil + }, + }, + { + Name: "test-fn-2", + Fn: func(value interface{}) (interface{}, error) { + return nil, fmt.Errorf("test err") + }, + }, + }, + } + rctx := &ReadContext{ + Reading: []*output.Reading{ + {Value: 2}, + }, + } + + err := applyTransformations(device, rctx) + assert.Error(t, err) + + // Verify that the reading value changed. It should change because the first + // fn was applied successfully. It is up to the upstream caller to check the + // error and make sure all transforms succeed before using the value. + assert.Equal(t, 4, rctx.Reading[0].Value.(int)) +}