Skip to content

Commit

Permalink
Merge pull request #180 from ably/ECO-4650/remove-message-grouping-fo…
Browse files Browse the repository at this point in the history
…r-batch

[ECO-4650] fix: remove message grouping for Ably Batch calls
  • Loading branch information
ttypic committed Mar 13, 2024
2 parents fcdcd2d + 7e4dc40 commit 49d21ad
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 247 deletions.
87 changes: 0 additions & 87 deletions src/main/java/com/ably/kafka/connect/batch/MessageGroup.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,36 @@
import com.ably.kafka.connect.mapping.MessageConverter;
import com.ably.kafka.connect.mapping.RecordMapping;
import com.ably.kafka.connect.mapping.RecordMappingException;
import com.google.common.collect.Lists;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class MessageGrouper {
private static final Logger logger = LoggerFactory.getLogger(MessageGrouper.class);
public class MessageTransformer {
private static final Logger logger = LoggerFactory.getLogger(MessageTransformer.class);

private final RecordMapping channelMapping;
private final RecordMapping messageNameMapping;
private final ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure;
@Nullable private final ErrantRecordReporter dlqReporter;
@Nullable
private final ErrantRecordReporter dlqReporter;

/**
* Construct a new message grouper, for generating Ably BatchSpecs and converting
* Construct a new message transformer, for generating Ably BatchSpecs and converting
* records to messages as needed.
*
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param channelMapping The RecordMapping to use to generate Ably channel names
* @param messageNameMapping The RecordMapping to use to generate Ably Message names
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
* @param actionOnFailure Action to perform when a message mapping attempt fails
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
*/
public MessageGrouper(
public MessageTransformer(
RecordMapping channelMapping,
RecordMapping messageNameMapping,
ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure,
Expand All @@ -45,44 +45,32 @@ public MessageGrouper(
}

/**
* Construct a message group for an incoming batch of Kafka records
* Construct Ably messages for an incoming batch of Kafka records
*
* @param records Kafka sink records to group by channel and transform to Ably messages
* @return MessageGroup for outgoing message batch
* @param records Kafka sink records to transform to Ably messages
* @return List of Kafka sink records with transformed Ably messages
* @throws FatalBatchProcessingException if a fatal error occurred processing records
*/
public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingException {
final Map<String, List<MessageGroup.RecordMessagePair>> groupedRecords = new HashMap<>();
for (SinkRecord record : records) {
public List<RecordMessagePair> transform(List<SinkRecord> records) throws FatalBatchProcessingException {
return records.stream().map(record -> {
try {
final String channel = channelMapping.map(record);
final String messageName = messageNameMapping.map(record);
final Message message = MessageConverter.toAblyMessage(messageName, record);

groupedRecords.compute(channel, (ch, recs) -> {
final MessageGroup.RecordMessagePair pair = new MessageGroup.RecordMessagePair(record, message);
if (recs != null) {
recs.add(pair);
return recs;
} else {
return Lists.newArrayList(pair);
}
});

String channel = channelMapping.map(record);
String messageName = messageNameMapping.map(record);
Message message = MessageConverter.toAblyMessage(messageName, record);
return new RecordMessagePair(record, message, channel);
} catch (RecordMappingException mappingError) {
handleMappingFailure(record, mappingError);
return null;
}
}

return new MessageGroup(groupedRecords);
}).filter(Objects::nonNull).collect(Collectors.toList());
}


/**
* Process a record that we're unable to forward to Ably due to a failed channel or
* message name mapping according to the configured handling behaviour.
*
* @param record The SinkRecord we weren't able to map
* @param record The SinkRecord we weren't able to map
* @param mappingError The error raised by the RecordMapping
*/
private void handleMappingFailure(
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/com/ably/kafka/connect/batch/RecordMessagePair.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.ably.kafka.connect.batch;

import com.ably.kafka.connect.client.BatchSpec;
import io.ably.lib.types.Message;
import org.apache.kafka.connect.sink.SinkRecord;

/**
* Kafka Records with outgoing Ably Batch Spec.
*/
public class RecordMessagePair {
private final SinkRecord kafkaRecord;
private final Message message;
private final String channelName;
/**
* Construct a new record-message pairing.
*/
RecordMessagePair(SinkRecord kafkaRecord, Message message, String channelName) {
this.kafkaRecord = kafkaRecord;
this.message = message;
this.channelName = channelName;
}

/**
* Returns the incoming Kafka SinkRecord
*/
public SinkRecord getKafkaRecord() {
return kafkaRecord;
}

/**
* Returns the outgoing Ably Message
*/
public BatchSpec getBatchSpec() {
return new BatchSpec(channelName, message);
}

/**
* Returns Ably message associated with Kafka Record
*/
public Message getMessage() {
return message;
}

/**
* Returns channel name
*/
public String getChannelName() {
return channelName;
}
}
5 changes: 4 additions & 1 deletion src/main/java/com/ably/kafka/connect/client/BatchSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.ably.lib.types.Message;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -18,6 +18,9 @@ public BatchSpec(Set<String> channels, List<Message> messages) {
this.channels = channels;
this.messages = messages;
}
public BatchSpec(String channelName, Message message) {
this(Collections.singleton(channelName), Collections.singletonList(message));
}
public Set<String> getChannels() {
return channels;
}
Expand Down
Loading

0 comments on commit 49d21ad

Please sign in to comment.