Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: klaviyo jobs order #3686

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
eventNameMapping,
jsonNameMapping,
} = require('./config');
const { processRouterDestV2, processV2 } = require('./transformV2');
const { processRouter: processRouterV2, processV2 } = require('./transformV2');
const {
createCustomerProperties,
subscribeUserToList,
Expand All @@ -35,6 +35,7 @@ const {
adduserIdFromExternalId,
getSuccessRespEvents,
handleRtTfSingleEventError,
groupEventsByType,
flattenJson,
isNewStatusCodesAccepted,
} = require('../../util');
Expand Down Expand Up @@ -333,11 +334,12 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => {
}
};

const processRouterDest = async (inputs, reqMetadata) => {
const processRouter = async (inputs, reqMetadata) => {
const { destination } = inputs[0];
// This is used to switch to latest API version
if (destination.Config?.apiVersion === 'v2') {
return processRouterDestV2(inputs, reqMetadata);
const a = processRouterV2(inputs, reqMetadata);
return a;
anantjain45823 marked this conversation as resolved.
Show resolved Hide resolved
anantjain45823 marked this conversation as resolved.
Show resolved Hide resolved
}
let batchResponseList = [];
const batchErrorRespList = [];
Expand Down Expand Up @@ -403,7 +405,26 @@ const processRouterDest = async (inputs, reqMetadata) => {

batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList];

return [...batchResponseList, ...batchErrorRespList];
return { successEvents: batchResponseList, errorEvents: batchErrorRespList };
};
const processRouterDest = async (inputs, reqMetadata) => {
/**
We are doing this to maintain the order of events not only fo transformation but for delivery as well
Job Id: 1 2 3 4 5 6
Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3']
Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']]
Output after transformation: [1, [2,4], [3,5,6]]
*/
const inputsGroupedByType = groupEventsByType(inputs);
const respList = [];
const errList = [];
await Promise.all(
inputsGroupedByType.map(async (typedEventList) => {
const { successEvents, errorEvents } = await processRouter(typedEventList, reqMetadata);
respList.push(...successEvents);
errList.push(...errorEvents);
}),
);
return [...respList, ...errList];
};

module.exports = { process, processRouterDest };
22 changes: 1 addition & 21 deletions src/v0/destinations/klaviyo/transformV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
handleRtTfSingleEventError,
addExternalIdToTraits,
adduserIdFromExternalId,
groupEventsByType,
flattenJson,
isDefinedAndNotNull,
} = require('../../util');
Expand Down Expand Up @@ -242,23 +241,4 @@ const processRouter = (inputs, reqMetadata) => {
return { successEvents: batchResponseList, errorEvents: batchErrorRespList };
};

const processRouterDestV2 = (inputs, reqMetadata) => {
/**
We are doing this to maintain the order of events not only fo transformation but for delivery as well
Job Id: 1 2 3 4 5 6
Input : ['user1 track1', 'user1 identify 1', 'user1 track 2', 'user2 identify 1', 'user2 track 1', 'user1 track 3']
Output after batching : [['user1 track1'],['user1 identify 1', 'user2 identify 1'], [ 'user1 track 2', 'user2 track 1', 'user1 track 3']]
Output after transformation: [1, [2,4], [3,5,6]]
*/
const inputsGroupedByType = groupEventsByType(inputs);
const respList = [];
const errList = [];
inputsGroupedByType.forEach((typedEventList) => {
const { successEvents, errorEvents } = processRouter(typedEventList, reqMetadata);
respList.push(...successEvents);
errList.push(...errorEvents);
});
return [...respList, ...errList];
};

module.exports = { processV2, processRouterDestV2 };
module.exports = { processV2, processRouter };
39 changes: 38 additions & 1 deletion test/integrations/destinations/klaviyo/router/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,43 @@
list_id: 'XUepkK',
subscriptions: [
{ email: 'test@rudderstack.com', phone_number: '+12 345 678 900' },
],
},
},
},
JSON_ARRAY: {},
XML: {},
FORM: {},
},
files: {},
},
],
metadata: [generateMetadata(3)],
batched: true,
statusCode: 200,
destination,
},
{
batchedRequest: [
{
version: '1',
type: 'REST',
method: 'POST',
endpoint: 'https://a.klaviyo.com/api/profile-subscription-bulk-create-jobs',
headers: {
Authorization: 'Klaviyo-API-Key dummyPrivateApiKey',
Dismissed Show dismissed Hide dismissed
'Content-Type': 'application/json',
Accept: 'application/json',
revision: '2023-02-22',
},
params: {},
body: {
JSON: {
data: {
type: 'profile-subscription-bulk-create-job',
attributes: {
list_id: 'XUepkK',
subscriptions: [
{
email: 'test@rudderstack.com',
phone_number: '+12 345 578 900',
Expand Down Expand Up @@ -120,7 +157,7 @@
files: {},
},
],
metadata: [generateMetadata(3), generateMetadata(2)],
metadata: [generateMetadata(2)],
batched: true,
statusCode: 200,
destination,
Expand Down
Loading