From fe21441f101f3d6d47a243b81157e5c8bf3ad573 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 14 Aug 2024 01:28:48 +0800 Subject: [PATCH] [improve] [pip] PIP-363: Add callback parameters to the method: org.apache.pulsar.client.impl.SendCallback.sendComplete. (#22940) --- pip/pip-363.md | 111 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 pip/pip-363.md diff --git a/pip/pip-363.md b/pip/pip-363.md new file mode 100644 index 0000000000000..2b250e69871e1 --- /dev/null +++ b/pip/pip-363.md @@ -0,0 +1,111 @@ +# PIP-363: Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`. + +# Background knowledge + + +As introduced in [PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar has been fully integrated into the `OpenTelemetry` system, which defines some metric specifications for [messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration). + +In the current Pulsar client code, it is not possible to obtain the number of messages sent in batches(as well as some other sending data), making it impossible to implement `messaging.publish.messages` metric. + +In the `opentelemetry-java-instrumentation` code, the `org.apache.pulsar.client.impl.SendCallback` interface is used to instrument data points. For specific implementation details, we can refer to [this](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135). + +# Motivation + + +In the current situation, `org.apache.pulsar.client.impl.ProducerImpl` does not provide a public method to obtain the `numMessagesInBatch`. + +So, we can add some of `org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg`'s key data into the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method. + +# Detailed Design + +Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`: + +```java +public interface SendCallback { + + /** + * invoked when send operation completes. + * + * @param e + */ + void sendComplete(Throwable e, OpSendMsgStats stats); +} + +public interface OpSendMsgStats { + long getUncompressedSize(); + + long getSequenceId(); + + int getRetryCount(); + + long getBatchSizeByte(); + + int getNumMessagesInBatch(); + + long getHighestSequenceId(); + + int getTotalChunks(); + + int getChunkId(); +} + +@Builder +public class OpSendMsgStatsImpl implements OpSendMsgStats { + private long uncompressedSize; + private long sequenceId; + private int retryCount; + private long batchSizeByte; + private int numMessagesInBatch; + private long highestSequenceId; + private int totalChunks; + private int chunkId; + + @Override + public long getUncompressedSize() { + return uncompressedSize; + } + + @Override + public long getSequenceId() { + return sequenceId; + } + + @Override + public int getRetryCount() { + return retryCount; + } + + @Override + public long getBatchSizeByte() { + return batchSizeByte; + } + + @Override + public int getNumMessagesInBatch() { + return numMessagesInBatch; + } + + @Override + public long getHighestSequenceId() { + return highestSequenceId; + } + + @Override + public int getTotalChunks() { + return totalChunks; + } + + @Override + public int getChunkId() { + return chunkId; + } +} +``` + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9 +* Mailing List voting thread: https://lists.apache.org/thread/t0olt3722j17gjtdxqqsl3cpy104ogpr