Skip to content

Commit

Permalink
[INLONG-10249][Manager] Fix the problem of duplicate data appears dur…
Browse files Browse the repository at this point in the history
…ing data preview (#10250)
  • Loading branch information
fuweng11 committed May 22, 2024
1 parent 2372805 commit 59ea439
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,19 +409,18 @@ private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String t
* Query topic message for the given pulsar cluster.
*/
public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName,
String subName,
Integer messageCount, InlongStreamInfo streamInfo, boolean serial) {
LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName);
LOGGER.info("begin to query message for topic {}", topicFullName);
List<BriefMQMessage> messageList = new ArrayList<>();
int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName);
for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) {
int currentPartitionNum = messageIndex % partitionCount;
int messagePosition = messageIndex / partitionCount;
int messagePosition = messageIndex / partitionCount + 1;
String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex,
streamInfo, messagePosition));
}
LOGGER.info("success query message by subs={} for topic={}", subName, topicFullName);
LOGGER.info("success query message for topic={}", topicFullName);
return messageList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
*/
public static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";

public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = "%s_%s_consumer_group_realtime_review";

@Autowired
private InlongClusterService clusterService;
@Autowired
Expand Down Expand Up @@ -320,17 +318,9 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
this.executor.execute(task);
}
queryLatch.await(30, TimeUnit.SECONDS);
log.info("success query pulsar message for groupId={}, streamId={}", streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());

// insert the consumer group info into the inlong_consume table
String topicName = streamInfo.getMqResource();
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
String groupId = streamInfo.getInlongGroupId();
log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
id, subs, groupId, topicName);

// cut
int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
if (finalMsgCount > 0) {
return briefMQMessages.subList(0, finalMsgCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
*/
public class QueryLatestMessagesRunnable implements Runnable {

public static final String PULSAR_SUBSCRIPTION_REALTIME_REVIEW = "%s_%s_consumer_group_realtime_review";

private InlongPulsarInfo inlongPulsarInfo;
private InlongStreamInfo streamInfo;
private PulsarClusterInfo clusterInfo;
Expand Down Expand Up @@ -69,11 +67,9 @@ public void run() {
String namespace = inlongPulsarInfo.getMqResource();
String topicName = streamInfo.getMqResource();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
List<BriefMQMessage> messages = pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, subs,
messageCount, streamInfo, serial);
List<BriefMQMessage> messages =
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, messageCount, streamInfo, serial);
if (CollectionUtils.isNotEmpty(messages)) {
briefMQMessages.addAll(messages);
this.latch.countDown(messages.size());
Expand Down

0 comments on commit 59ea439

Please sign in to comment.