DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3246255396
##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -195,6 +219,35 @@ protected List<SubscriptionMessage> poll(final Set<String>
topicNames, final lon
return messages;
}
+ // Apply processor chain if configured
+ List<SubscriptionMessage> processed = messages;
+ if (!processors.isEmpty()) {
+ for (final SubscriptionMessageProcessor processor : processors) {
+ processed = processor.process(processed);
+ }
+ }
+
+ // Update watermark timestamp before stripping watermark events
+ for (final SubscriptionMessage m : processed) {
+ if (m.getMessageType() == SubscriptionMessageType.WATERMARK.getType()) {
+ final long ts = m.getWatermarkTimestamp();
+ if (ts > latestWatermarkTimestamp) {
+ latestWatermarkTimestamp = ts;
+ }
+ }
+ }
+
+ // Strip system messages — they are only for processors, not for users
+ processed.removeIf(
+ m -> {
+ final short type = m.getMessageType();
+ return type == SubscriptionMessageType.WATERMARK.getType();
+ });
+
+ if (processed.isEmpty()) {
+ return processed;
+ }
+
// add to uncommitted messages
Review Comment:
Fixed. Processor-buffered commit contexts are now tracked explicitly and
refreshed through heartbeat, so messages held inside processors remain pinned
on the server until they are emitted/released. Topic-scoped reset was also
added to avoid clearing unrelated buffered state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]