Skip to content

Commit

Permalink
feat: webhook v2 (#3651)
Browse files Browse the repository at this point in the history
* feat: webhook v2

* feat: webhook v2

* refactor: getAuthHeaders utility

* feat: add xml support

* feat: add batching

* feat: update batching logic

* chore: remove logs

* fix: excludeMappedFields utility

* test: add testcases

* fix: use sha256 for better hashing
  • Loading branch information
Gauravudia committed Aug 29, 2024
1 parent 9fb463e commit e21ebd0
Show file tree
Hide file tree
Showing 10 changed files with 1,171 additions and 6 deletions.
19 changes: 15 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"@ndhoule/extend": "^2.0.0",
"@pyroscope/nodejs": "^0.2.9",
"@rudderstack/integrations-lib": "^0.2.10",
"@rudderstack/json-template-engine": "^0.17.1",
"@rudderstack/json-template-engine": "^0.18.0",
"@rudderstack/workflow-engine": "^0.8.13",
"@shopify/jest-koa-mocks": "^5.1.1",
"ajv": "^8.12.0",
Expand All @@ -90,6 +90,7 @@
"json-diff": "^1.0.3",
"json-size": "^1.0.0",
"jsontoxml": "^1.0.1",
"jstoxml": "^5.0.2",
"koa": "^2.14.1",
"koa-bodyparser": "^4.4.0",
"koa2-swagger-ui": "^5.7.0",
Expand Down
67 changes: 67 additions & 0 deletions src/cdk/v2/destinations/webhook_v2/procWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
bindings:
- name: EventType
path: ../../../../constants
- path: ../../bindings/jsontemplate
exportAll: true
- path: ../../../../v0/destinations/webhook/utils
- name: getHashFromArray
path: ../../../../v0/util
- name: getIntegrationsObj
path: ../../../../v0/util
- name: removeUndefinedAndNullValues
path: ../../../../v0/util
- name: defaultRequestConfig
path: ../../../../v0/util
- name: isEmptyObject
path: ../../../../v0/util
- path: ./utils

steps:
- name: validateInput
template: |
$.assertConfig(.destination.Config.webhookUrl, "Webhook URL required. Aborting");
$.assertConfig(!(.destination.Config.auth === "basicAuth" && !(.destination.Config.username)), "Username is required for Basic Authentication. Aborting");
$.assertConfig(!(.destination.Config.auth === "bearerTokenAuth" && !(.destination.Config.bearerToken)), "Token is required for Bearer Token Authentication. Aborting");
$.assertConfig(!(.destination.Config.auth === "apiKeyAuth" && !(.destination.Config.apiKeyName)), "API Key Name is required for API Key Authentication. Aborting");
$.assertConfig(!(.destination.Config.auth === "apiKeyAuth" && !(.destination.Config.apiKeyValue)), "API Key Value is required for API Key Authentication. Aborting");
- name: deduceMethod
template: |
$.context.method = .destination.Config.method ?? 'POST';
- name: deduceBodyFormat
template: |
$.context.format = .destination.Config.format ?? 'JSON';
- name: buildHeaders
template: |
const configAuthHeaders = $.getAuthHeaders(.destination.Config);
const additionalConfigHeaders = $.getCustomMappings(.message, .destination.Config.headers);
$.context.headers = {
...configAuthHeaders,
...additionalConfigHeaders
}
- name: prepareParams
template: |
$.context.params = $.getCustomMappings(.message, .destination.Config.queryParams)
- name: deduceEndPoint
template: |
$.context.endpoint = $.addPathParams(.message, .destination.Config.webhookUrl);
- name: prepareBody
template: |
const payload = $.getCustomMappings(.message, .destination.Config.propertiesMapping);
$.context.payload = $.removeUndefinedAndNullValues($.excludeMappedFields(payload, .destination.Config.propertiesMapping))
$.context.format === "XML" && !$.isEmptyObject($.context.payload) ? $.context.payload = {payload: $.getXMLPayload($.context.payload)};
- name: buildResponseForProcessTransformation
template: |
const response = $.defaultRequestConfig();
$.context.format === "JSON" ? response.body.JSON = $.context.payload: response.body.XML = $.context.payload;
response.endpoint = $.context.endpoint;
response.headers = $.context.headers;
response.method = $.context.method;
response.params = $.context.params ?? {};
response
60 changes: 60 additions & 0 deletions src/cdk/v2/destinations/webhook_v2/rtWorkflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
bindings:
- name: handleRtTfSingleEventError
path: ../../../../v0/util/index
- path: ./utils
exportAll: true
- name: BatchUtils
path: '@rudderstack/workflow-engine'

steps:
- name: validateInput
template: |
$.assert(Array.isArray(^) && ^.length > 0, "Invalid event array")
- name: transform
externalWorkflow:
path: ./procWorkflow.yaml
loopOverInput: true

- name: successfulEvents
template: |
$.outputs.transform#idx.output.({
"batchedRequest": .,
"batched": false,
"destination": ^[idx].destination,
"metadata": ^[idx].metadata[],
"statusCode": 200
})[]
- name: failedEvents
template: |
$.outputs.transform#idx.error.(
$.handleRtTfSingleEventError(^[idx], .originalError ?? ., {})
)[]
- name: bodyFormat
template: |
$.outputs.successfulEvents[0].destination.Config.format ?? "JSON";
- name: batchingEnabled
template: |
$.outputs.successfulEvents[0].destination.Config.isBatchingEnabled;
- name: batchSize
template: |
$.outputs.successfulEvents[0].destination.Config.maxBatchSize;
- name: batchSuccessfulEvents
description: Batches the successfulEvents
condition: $.outputs.batchingEnabled && $.outputs.bodyFormat === "JSON"
template: |
$.batchSuccessfulEvents($.outputs.successfulEvents, $.outputs.batchSize);
- name: finalPayloadWithBatching
condition: $.outputs.batchingEnabled && $.outputs.bodyFormat === "JSON"
template: |
[...$.outputs.batchSuccessfulEvents, ...$.outputs.failedEvents]
else:
name: finalPayloadWithoutBatching
template: |
[...$.outputs.successfulEvents, ...$.outputs.failedEvents]
146 changes: 146 additions & 0 deletions src/cdk/v2/destinations/webhook_v2/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
const { toXML } = require('jstoxml');
const { groupBy } = require('lodash');
const { createHash } = require('crypto');
const { ConfigurationError } = require('@rudderstack/integrations-lib');
const { BatchUtils } = require('@rudderstack/workflow-engine');
const { base64Convertor, applyCustomMappings, isEmptyObject } = require('../../../../v0/util');

const getAuthHeaders = (config) => {
let headers;
switch (config.auth) {
case 'basicAuth': {
const credentials = `${config.username}:${config.password}`;
const encodedCredentials = base64Convertor(credentials);
headers = {
Authorization: `Basic ${encodedCredentials}`,
};
break;
}
case 'bearerTokenAuth':
headers = { Authorization: `Bearer ${config.bearerToken}` };
break;
case 'apiKeyAuth':
headers = { [config.apiKeyName]: `${config.apiKeyValue}` };
break;
default:
headers = {};
}
return headers;
};

const getCustomMappings = (message, mapping) => {
try {
return applyCustomMappings(message, mapping);
} catch (e) {
throw new ConfigurationError(`[Webhook]:: Error in custom mappings: ${e.message}`);
}
};

// TODO: write a func to evaluate json path template
const addPathParams = (message, webhookUrl) => webhookUrl;

const excludeMappedFields = (payload, mapping) => {
const rawPayload = { ...payload };
if (mapping) {
mapping.forEach(({ from, to }) => {
// continue when from === to
if (from === to) return;

// Remove the '$.' prefix and split the remaining string by '.'
const keys = from.replace(/^\$\./, '').split('.');
let current = rawPayload;

// Traverse to the parent of the key to be removed
keys.slice(0, -1).forEach((key) => {
if (current && current[key]) {
current = current[key];
} else {
current = null;
}
});

if (current) {
// Remove the 'from' field from input payload
delete current[keys[keys.length - 1]];
}
});
}

return rawPayload;
};

const getXMLPayload = (payload) =>
toXML(payload, {
header: true,
});

const getMergedEvents = (batch) => {
const events = [];
batch.forEach((event) => {
if (!isEmptyObject(event.batchedRequest.body.JSON)) {
events.push(event.batchedRequest.body.JSON);
}
});
return events;
};

const mergeMetadata = (batch) => batch.map((event) => event.metadata[0]);

const createHashKey = (endpoint, headers, params) => {
const hash = createHash('sha256');
hash.update(endpoint);
hash.update(JSON.stringify(headers));
hash.update(JSON.stringify(params));
return hash.digest('hex');
};

const buildBatchedRequest = (batch) => ({
batchedRequest: {
body: {
JSON: {},
JSON_ARRAY: { batch: JSON.stringify(getMergedEvents(batch)) },
XML: {},
FORM: {},
},
version: '1',
type: 'REST',
method: batch[0].batchedRequest.method,
endpoint: batch[0].batchedRequest.endpoint,
headers: batch[0].batchedRequest.headers,
params: batch[0].batchedRequest.params,
files: {},
},
metadata: mergeMetadata(batch),
batched: true,
statusCode: 200,
destination: batch[0].destination,
});

const batchSuccessfulEvents = (events, batchSize) => {
const response = [];
// group events by endpoint, headers and query params
const groupedEvents = groupBy(events, (event) => {
const { endpoint, headers, params } = event.batchedRequest;
return createHashKey(endpoint, headers, params);
});

// batch the each grouped event
Object.keys(groupedEvents).forEach((groupKey) => {
const batches = BatchUtils.chunkArrayBySizeAndLength(groupedEvents[groupKey], {
maxItems: batchSize,
}).items;
batches.forEach((batch) => {
response.push(buildBatchedRequest(batch));
});
});
return response;
};

module.exports = {
getAuthHeaders,
getCustomMappings,
addPathParams,
excludeMappedFields,
getXMLPayload,
batchSuccessfulEvents,
};
3 changes: 2 additions & 1 deletion src/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
"WUNDERKIND": true,
"CLICKSEND": true,
"ZOHO": true,
"CORDIAL": true
"CORDIAL": true,
"WEBHOOK_V2": true
},
"regulations": [
"BRAZE",
Expand Down
Loading

0 comments on commit e21ebd0

Please sign in to comment.