guozhangwang commented on a change in pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#discussion_r561459260



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -78,15 +89,149 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
+    PartitionGroup(final TaskId id,
+                   final Map<TopicPartition, RecordQueue> partitionQueues,
+                   final Sensor recordLatenessSensor,
+                   final Sensor enforcedProcessingSensor,
+                   final long maxTaskIdleMs) {
+        this.id = id;
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
+        this.enforcedProcessingSensor = enforcedProcessingSensor;
+        this.maxTaskIdleMs = maxTaskIdleMs;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
     }
 
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
+        final Long lag = metadata.lag();
+        if (lag != null) {
+            LOG.debug("[{}] added fetched lag {}: {}", id, partition, lag);
+            fetchedLags.put(partition, lag);
+        }
+    }
+
+    public boolean readyToProcess(final long wallClockTime) {
+        if (LOG.isTraceEnabled()) {
+            for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                LOG.trace(
+                    "[{}] buffered/lag {}: {}/{}",
+                    id,
+                    entry.getKey(),
+                    entry.getValue().size(),
+                    fetchedLags.get(entry.getKey())
+                );
+            }
+        }
+        // Log-level strategy:
+        //  TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency
+        //  DEBUG when we waited for a fetch and decided to wait some more, as configured
+        //  DEBUG when we are ready for processing and didn't have to enforce processing
+        //  INFO  when we enforce processing, since this has to wait for fetches AND may result in disorder
+
+        if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) {
+            if (LOG.isTraceEnabled() && !allBuffered && totalBuffered > 0) {
+                final Set<TopicPartition> bufferedPartitions = new HashSet<>();
+                final Set<TopicPartition> emptyPartitions = new HashSet<>();
+                for (final Map.Entry<TopicPartition, RecordQueue> entry : partitionQueues.entrySet()) {
+                    if (entry.getValue().isEmpty()) {
+                        emptyPartitions.add(entry.getKey());
+                    } else {
+                        bufferedPartitions.add(entry.getKey());
+                    }
+                }
+                LOG.trace("[{}] Ready for processing because max.task.idle.ms is disabled." +
+                              "\n\tThere may be out-of-order processing for this task as a result." +
+                              "\n\tBuffered partitions: {}" +
+                              "\n\tNon-buffered partitions: {}",
+                          id,
+                          bufferedPartitions,
+                          emptyPartitions);
+            }
+            return true;

Review comment:
       Should we log at INFO if we are indeed enforcing processing, i.e. when there are some empty partitions?
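       For illustration, a hedged sketch of what that INFO log could look like once the idle deadline has passed; the `deadline`, `bufferedPartitions`, and `emptyPartitions` locals are assumptions for this sketch, not the PR's actual code:

    ```
    // Sketch only: the shape of the enforced-processing branch, with the suggested INFO log.
    if (wallClockTime >= deadline) {
        enforcedProcessingSensor.record(1.0d, wallClockTime);
        LOG.info("[{}] Continuing to process although some partition queues are empty," +
                     " since max.task.idle.ms ({}) has elapsed." +
                     "\n\tThere may be out-of-order processing for this task as a result." +
                     "\n\tBuffered partitions: {}" +
                     "\n\tNon-buffered partitions: {}",
                 id, maxTaskIdleMs, bufferedPartitions, emptyPartitions);
        return true;
    } else {
        LOG.debug("[{}] Waiting to process; the max.task.idle.ms deadline is {} ms away", id, deadline - wallClockTime);
        return false;
    }
    ```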

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
 
+    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
       nit: move this down below, to line 147?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
##########
@@ -73,28 +82,22 @@
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
     private final Metrics metrics = new Metrics();
+    private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
     private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
 
-    private PartitionGroup group;
 
     private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
         final Sensor lastRecordedValue = metrics.sensor(metricName.name());
         lastRecordedValue.add(metricName, new Value());
         return lastRecordedValue;
     }
 
-    @Before

Review comment:
       Good refactoring!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -52,15 +58,20 @@
  * (i.e., it increases or stays the same over time).
  */
 public class PartitionGroup {
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class);

Review comment:
       Would it be more convenient to pass in the `log` object from AbstractTask to the PartitionGroup constructor? It is created with the logContext, which includes the task-type / task-id.
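       For illustration, a minimal sketch of that alternative, assuming the constructor simply takes the already-contextualized SLF4J `Logger`; the extra parameter and the call site shown are hypothetical, not the PR's actual code:

    ```
    // PartitionGroup.java -- hypothetical variant: take the task's contextual logger
    // instead of the class-level static LOG.
    private final Logger logger;

    PartitionGroup(final TaskId id,
                   final Map<TopicPartition, RecordQueue> partitionQueues,
                   final Sensor recordLatenessSensor,
                   final Sensor enforcedProcessingSensor,
                   final long maxTaskIdleMs,
                   final Logger logger) {
        this.logger = logger; // already prefixed with the task-type / task-id via LogContext
        // ... rest of the constructor unchanged ...
    }

    // Call site in StreamTask, where `logContext` already carries the task id:
    // new PartitionGroup(id, partitionQueues, recordLatenessSensor, enforcedProcessingSensor,
    //                    maxTaskIdleMs, logContext.logger(PartitionGroup.class));
    ```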

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -510,9 +516,7 @@ public StreamThread(final Time time,
         this.nextProbingRebalanceMs = nextProbingRebalanceMs;
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
-        final int dummyThreadIdx = 1;
-        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx))
-            .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.maxPollTimeMs = maxPollTimeMs;

Review comment:
       What's the rationale for this refactoring?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -134,6 +134,8 @@
 @SuppressWarnings("deprecation")
 public class StreamsConfig extends AbstractConfig {
 
+    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

Review comment:
       Also a nit: the line below should be
   
   ```
   private static final Logger
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -910,6 +900,11 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
         }
     }
 
+    @Override
+    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {

Review comment:
       The only reason we need to add this function to `Task` seems to be `tasks.activeTasksForInputPartition(partition)` in `TaskManager`, and there's a TODO to convert its return type to `StreamTask` anyway. So let's move this function to `StreamTask` only, and in `TaskManager` force-convert the `task` to `StreamTask`. Then we can remove it from `StandbyTask`.
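       A rough sketch of the shape of that change in `TaskManager`; the surrounding method body and the single-task return of `activeTasksForInputPartition` are assumptions inferred from the comment, not the PR's actual code:

    ```
    // TaskManager.java -- hypothetical sketch: keep addFetchedMetadata on StreamTask only
    // and cast at the lookup site, since an input partition belongs to one active stream task.
    public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) {
        final Task task = tasks.activeTasksForInputPartition(partition);
        if (task == null) {
            log.warn("Ignoring fetched metadata for partition {} that has no active task", partition);
        } else {
            // force-convert as suggested; only StreamTask tracks fetched lags
            ((StreamTask) task).addFetchedMetadata(partition, metadata);
        }
    }
    ```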




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

