-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve] [pip] PIP-363: Add callback parameters to the method: org.a…
…pache.pulsar.client.impl.SendCallback.sendComplete. (#22940)
- Loading branch information
1 parent
9bf714f
commit fe21441
Showing
1 changed file
with
111 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
# PIP-363: Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`. | ||
|
||
# Background knowledge | ||
|
||
|
||
As introduced in [PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md), Pulsar has been fully integrated into the `OpenTelemetry` system, which defines some metric specifications for [messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingpublishduration). | ||
|
||
In the current Pulsar client code, it is not possible to obtain the number of messages sent in batches(as well as some other sending data), making it impossible to implement `messaging.publish.messages` metric. | ||
|
||
In the `opentelemetry-java-instrumentation` code, the `org.apache.pulsar.client.impl.SendCallback` interface is used to instrument data points. For specific implementation details, we can refer to [this](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java#L89-L135). | ||
|
||
# Motivation | ||
|
||
|
||
In the current situation, `org.apache.pulsar.client.impl.ProducerImpl` does not provide a public method to obtain the `numMessagesInBatch`. | ||
|
||
So, we can add some of `org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg`'s key data into the `org.apache.pulsar.client.impl.SendCallback.sendComplete` method. | ||
|
||
# Detailed Design | ||
|
||
Add callback parameters to the method: `org.apache.pulsar.client.impl.SendCallback.sendComplete`: | ||
|
||
```java | ||
public interface SendCallback { | ||
|
||
/** | ||
* invoked when send operation completes. | ||
* | ||
* @param e | ||
*/ | ||
void sendComplete(Throwable e, OpSendMsgStats stats); | ||
} | ||
|
||
public interface OpSendMsgStats { | ||
long getUncompressedSize(); | ||
|
||
long getSequenceId(); | ||
|
||
int getRetryCount(); | ||
|
||
long getBatchSizeByte(); | ||
|
||
int getNumMessagesInBatch(); | ||
|
||
long getHighestSequenceId(); | ||
|
||
int getTotalChunks(); | ||
|
||
int getChunkId(); | ||
} | ||
|
||
@Builder | ||
public class OpSendMsgStatsImpl implements OpSendMsgStats { | ||
private long uncompressedSize; | ||
private long sequenceId; | ||
private int retryCount; | ||
private long batchSizeByte; | ||
private int numMessagesInBatch; | ||
private long highestSequenceId; | ||
private int totalChunks; | ||
private int chunkId; | ||
|
||
@Override | ||
public long getUncompressedSize() { | ||
return uncompressedSize; | ||
} | ||
|
||
@Override | ||
public long getSequenceId() { | ||
return sequenceId; | ||
} | ||
|
||
@Override | ||
public int getRetryCount() { | ||
return retryCount; | ||
} | ||
|
||
@Override | ||
public long getBatchSizeByte() { | ||
return batchSizeByte; | ||
} | ||
|
||
@Override | ||
public int getNumMessagesInBatch() { | ||
return numMessagesInBatch; | ||
} | ||
|
||
@Override | ||
public long getHighestSequenceId() { | ||
return highestSequenceId; | ||
} | ||
|
||
@Override | ||
public int getTotalChunks() { | ||
return totalChunks; | ||
} | ||
|
||
@Override | ||
public int getChunkId() { | ||
return chunkId; | ||
} | ||
} | ||
``` | ||
|
||
# Links | ||
|
||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: https://lists.apache.org/thread/8pgmsvx1bxz4z1w8prpvpnfpt1kb57c9 | ||
* Mailing List voting thread: https://lists.apache.org/thread/t0olt3722j17gjtdxqqsl3cpy104ogpr |