Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [pip] PIP-363: Add callback parameters to the method: org.apache.pulsar.client.impl.SendCallback.sendComplete. #22940

Merged
merged 8 commits into from
Aug 13, 2024
111 changes: 111 additions & 0 deletions pip/pip-363.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# PIP-363: Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`.

# Background knowledge


As introduced in [PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar has been fully integrated into the `OpenTelemetry` system, which defines some metric specifications for [messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration).

In the current Pulsar client code, it is not possible to obtain the number of messages sent in batches(as well as some other sending data), making it impossible to implement `messaging.publish.messages` metric.

In the `opentelemetry-java-instrumentation` code, the `org.apache.pulsar.client.impl.SendCallback` interface is used to instrument data points. For specific implementation details, we can refer to [this](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135).

# Motivation


In the current situation, `org.apache.pulsar.client.impl.ProducerImpl` does not provide a public method to obtain the `numMessagesInBatch`.

So, we can add some of `org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg`'s key data into the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method.

# Detailed Design

Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`:

```java
public interface SendCallback {

/**
* invoked when send operation completes.
*
* @param e
*/
void sendComplete(Exception e, OpSendMsgStats stats);
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
}

public interface OpSendMsgStats {
long getUncompressedSize();

long getSequenceId();

int getRetryCount();

long getBatchSizeByte();

int getNumMessagesInBatch();

long getHighestSequenceId();

int getTotalChunks();

int getChunkId();
}

@Builder
public class OpSendMsgStatsImpl implements OpSendMsgStats {
private long uncompressedSize;
private long sequenceId;
private int retryCount;
private long batchSizeByte;
private int numMessagesInBatch;
private long highestSequenceId;
private int totalChunks;
private int chunkId;

@Override
public long getUncompressedSize() {
return uncompressedSize;
}

@Override
public long getSequenceId() {
return sequenceId;
}

@Override
public int getRetryCount() {
return retryCount;
}

@Override
public long getBatchSizeByte() {
return batchSizeByte;
}

@Override
public int getNumMessagesInBatch() {
return numMessagesInBatch;
}

@Override
public long getHighestSequenceId() {
return highestSequenceId;
}

@Override
public int getTotalChunks() {
return totalChunks;
}

@Override
public int getChunkId() {
return chunkId;
}
}
```

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9
* Mailing List voting thread: