-
Notifications
You must be signed in to change notification settings - Fork 0
/
plugin.cpp
247 lines (229 loc) · 6.95 KB
/
plugin.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/*
* Fledge "delta" filter plugin.
*
* Copyright (c) 2018 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Massimiliano Pinto, Mark Riddoch
*/
#include <plugin_api.h>
#include <config_category.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <string>
#include <iostream>
#include <filter_plugin.h>
#include <filter.h>
#include <reading_set.h>
#include <map>
#include <rapidjson/writer.h>
#include <delta_filter.h>
#include <version.h>
#define FILTER_NAME "delta"
/**
 * Default configuration of the delta filter, written as a JSON document.
 *
 * The document is wrapped in the QUOTE macro, which is defined outside
 * this file (presumably it turns its argument into a string literal -
 * confirm against the header that provides it). The resulting string is
 * used as the last member of the PLUGIN_INFORMATION structure below.
 *
 * Configuration items declared here:
 *   plugin            read-only item whose default is FILTER_NAME
 *   enable            boolean switch, default "false"
 *   toleranceMeasure  "Percentage" or "Absolute Value"
 *   tolerance         float, minimum 0.0, default 1.0
 *   processingMode    which readings/datapoints are forwarded when the
 *                     tolerance is exceeded
 *   minRate           integer, minimum 0, default 0
 *   rateUnit          per second / minute / hour / day
 *   overrides         JSON object of per-asset tolerances, default empty
 *
 * The "order" values control presentation order only as far as this file
 * shows; note that "enable" is placed last (order 7).
 */
static const char *default_config = QUOTE({
"plugin" : {
"description" : "Delta filter plugin",
"type" : "string",
"default" : FILTER_NAME,
"readonly" : "true"
},
"enable": {
"description": "A switch that can be used to enable or disable execution of the delta filter.",
"type": "boolean",
"displayName": "Enabled",
"default": "false",
"order" : "7"
},
"toleranceMeasure": {
"description": "Whether tolerance is specified as a percentage or in absolute terms",
"type": "enumeration",
"options" : [ "Percentage", "Absolute Value" ],
"default": "Percentage",
"order" : "1",
"displayName" : "Tolerance Measure"
},
"tolerance": {
"description": "A percentage/absolute difference that will be tolerated when determining if values are equal.",
"type": "float",
"minimum": "0.0",
"default": "1.0",
"mandatory": "true",
"order" : "2",
"displayName" : "Tolerance Value"
},
"processingMode": {
"description": "Reading processing mode",
"type": "enumeration",
"options" : [ "Include full reading if any Datapoint exceeds tolerance", "Include full reading if all Datapoints exceed tolerance",
"Include only the Datapoints that exceed tolerance" ],
"default": "Include full reading if any Datapoint exceeds tolerance",
"order" : "3",
"displayName" : "Reading Processing Mode"
},
"minRate": {
"description": "The minimum rate at which data must be sent",
"type": "integer",
"minimum": "0",
"default": "0",
"mandatory": "true",
"order" : "4",
"displayName" : "Minimum Rate"
},
"rateUnit": {
"description": "The unit used to evaluate the minimum rate",
"type": "enumeration",
"options" : [ "per second", "per minute", "per hour", "per day" ],
"default": "per second",
"order" : "5",
"displayName" : "Minimum Rate Units"
},
"overrides" : {
"description": "Individual asset tolerances, if different from the global tolerance",
"type": "JSON",
"default": "{ }",
"order" : "6",
"displayName" : "Individual Tolerances"
}
});
using namespace std;
using namespace rapidjson;
/**
* The Filter plugin interface
*/
extern "C" {
/**
 * The plugin information structure.
 *
 * A single file-scope instance describing this plugin; plugin_info()
 * returns its address. The members are initialised positionally, so the
 * order below must match the declaration of PLUGIN_INFORMATION (declared
 * outside this file).
 */
static PLUGIN_INFORMATION info = {
FILTER_NAME, // Name: the FILTER_NAME macro, "delta"
VERSION, // Version: VERSION macro, defined outside this file
0, // Flags: none set
PLUGIN_TYPE_FILTER, // Type: this is a filter plugin
"1.0.0", // Interface version
default_config // Default plugin configuration (JSON string defined above)
};
/**
 * State kept for one instance of the plugin. plugin_init allocates one
 * of these and hands its address back as the opaque plugin handle; the
 * other entry points cast the handle back to this type.
 */
struct FILTER_INFO
{
	// The DeltaFilter object created in plugin_init
	DeltaFilter	*handle;
	// Name of the configuration category passed to plugin_init
	std::string	configCatName;
};
/**
 * Plugin entry point that exposes the static description of this plugin.
 *
 * @return	Address of the file-scope PLUGIN_INFORMATION structure
 */
PLUGIN_INFORMATION *plugin_info()
{
	PLUGIN_INFORMATION *description = &info;
	return description;
}
/**
 * Create a new instance of the filter.
 *
 * A DeltaFilter is constructed from the supplied configuration together
 * with the output handle and the output stream. The output stream is a
 * function pointer that is invoked as
 *	(*output)(outHandle, readings);
 * The plugin is not obliged to call it: when filtering leaves nothing to
 * send onwards, or when data is being held back to be combined with later
 * data, no call is made.
 *
 * The filter and the name of its configuration category are stored in a
 * newly allocated FILTER_INFO, whose address is the returned handle.
 *
 * @param config	The configuration category for the filter
 * @param outHandle	A handle that will be passed to the output stream
 * @param output	The output stream (function pointer) to which data is passed
 * @return		An opaque handle that is used in all subsequent calls to the plugin
 */
PLUGIN_HANDLE plugin_init(ConfigCategory* config,
			  OUTPUT_HANDLE *outHandle,
			  OUTPUT_STREAM output)
{
	FILTER_INFO *pluginState = new FILTER_INFO;
	DeltaFilter *deltaFilter = new DeltaFilter(FILTER_NAME, *config, outHandle, output);

	pluginState->handle = deltaFilter;
	pluginState->configCatName = config->getName();

	return (PLUGIN_HANDLE)pluginState;
}
/**
* Ingest a set of readings into the plugin for processing
*
* @param handle The plugin handle returned from plugin_init
* @param readingSet The readings to process
*/
void plugin_ingest(PLUGIN_HANDLE *handle,
READINGSET *readingSet)
{
FILTER_INFO *info = (FILTER_INFO *) handle;
DeltaFilter *filter = info->handle;
if (!filter->isEnabled())
{
// Current filter is not active: just pass the readings set
filter->m_func(filter->m_data, readingSet);
return;
}
vector<Reading *> newReadings;
filter->ingest(readingSet->getAllReadingsPtr(), newReadings);
const vector<Reading *>& readings = readingSet->getAllReadings();
for (vector<Reading *>::const_iterator elem = readings.begin();
elem != readings.end();
++elem)
{
AssetTracker::getAssetTracker()->addAssetTrackingTuple(info->configCatName, (*elem)->getAssetName(), string("Filter"));
}
// Remove the input readingSet data
delete (ReadingSet *)readingSet;
AssetTracker *tracker = AssetTracker::getAssetTracker();
// Create a new ReadingSet from new reading data
ReadingSet *newReadingSet = new ReadingSet(&newReadings);
const vector<Reading *>& readings2 = newReadingSet->getAllReadings();
for (vector<Reading *>::const_iterator elem = readings2.begin();
elem != readings2.end();
++elem)
{
if (tracker)
{
tracker->addAssetTrackingTuple(info->configCatName, (*elem)->getAssetName(), string("Filter"));
}
}
// Pass newReadings to filter->m_func
filter->m_func(filter->m_data, newReadingSet);
}
/**
 * Shut the plugin down, releasing everything plugin_init allocated:
 * first the DeltaFilter, then the FILTER_INFO that referred to it.
 * Nothing is returned.
 *
 * @param handle	The plugin handle returned from plugin_init
 */
void plugin_shutdown(PLUGIN_HANDLE *handle)
{
	FILTER_INFO *pluginState = (FILTER_INFO *) handle;
	DeltaFilter *deltaFilter = pluginState->handle;

	delete deltaFilter;
	delete pluginState;
}
/**
 * Apply a new configuration to a running plugin by forwarding it to
 * DeltaFilter::reconfigure. Nothing is returned.
 *
 * @param handle	The plugin handle returned from plugin_init
 * @param newConfig	The new configuration
 */
void plugin_reconfigure(PLUGIN_HANDLE *handle, const string& newConfig)
{
	FILTER_INFO *pluginState = (FILTER_INFO *) handle;
	pluginState->handle->reconfigure(newConfig);
}
/**
 * Plugin start with plugin data from storage service
 *
 * This plugin makes no use of stored data, so the call is a no-op. The
 * handle is the FILTER_INFO pointer returned by plugin_init; it must not
 * be treated as a filter object directly.
 *
 * @param handle	The plugin handle returned from plugin_init (unused)
 * @param storedData	The stored plugin data
 *			from last run (unused)
 */
void plugin_start(PLUGIN_HANDLE *handle,
		  const string& storedData)
{
	// Explicitly mark both parameters as intentionally unused
	(void)handle;
	(void)storedData;
}
// End of extern "C"
};