ableegoldman commented on a change in pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#discussion_r628817360



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -198,6 +198,10 @@ public StreamTask(final TaskId id,
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         this.committedOffsets = new HashMap<>();
         this.highWatermark = new HashMap<>();
+        for (final TopicPartition topicPartition: inputPartitions) {
+            this.committedOffsets.put(topicPartition, -1L);
+            this.highWatermark.put(topicPartition, -1L);
+        }

Review comment:
       nit: none of these need the `this.` qualifier. Can you remove it here and 
in the two lines above?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+    private AtomicBoolean process;
+    private AtomicBoolean commit;
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+
+        process = new AtomicBoolean(true);

Review comment:
       ```suggestion
           processed = new AtomicBoolean(true);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1091,7 +1091,7 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
                     try {
                         tasks.streamsProducerForTask(task.id())
                             .commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
-                        updateTaskMetadata(taskToCommit.getValue());
+                        this.updateTaskCommitMetadata(taskToCommit.getValue());

Review comment:
       `this` seems unnecessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -900,6 +902,14 @@ private long pollPhase() {
 
         final int numRecords = records.count();
 
+        for (final TopicPartition topicPartition: records.partitions()) {
+            records
+                .records(topicPartition)
+                .stream()
+                .max(Comparator.comparing(ConsumerRecord::offset))
+                .ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));

Review comment:
       Dumb question: what do the TaskMetadata's `endOffsets` actually refer 
to? The KIP says that they reflect `"the highest offset seen so far"`, but I'm 
wondering what "seen" means in this context: is it the highest offset 
processed, or the highest offset consumed? I'm guessing the former, since that 
feels more useful, but an example of how this will be used would help me 
understand. (Asking because if it is the former, then we don't want to update 
them based on records that have been polled but not yet processed.)
   
   Also, can you add javadocs explaining this and the other TaskMetadata getter 
methods, like you had in the KIP?
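
   To make the two readings of "seen" concrete, here is a minimal sketch with 
no Kafka dependency. `OffsetTracker`, `onPolled` and `onProcessed` are 
hypothetical names used only for illustration, and partitions are plain strings 
rather than `TopicPartition`s. It only shows that the two values can diverge 
while polled records are still waiting to be processed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class OffsetTracker {
    // Highest offset handed back by a poll, per partition.
    private final Map<String, Long> highestConsumed = new HashMap<>();
    // Highest offset that has actually been processed, per partition.
    private final Map<String, Long> highestProcessed = new HashMap<>();

    void onPolled(final String partition, final long offset) {
        // Keep the maximum so an older offset never moves the value backwards.
        highestConsumed.merge(partition, offset, Long::max);
    }

    void onProcessed(final String partition, final long offset) {
        highestProcessed.merge(partition, offset, Long::max);
    }

    // Returns -1 when nothing has been polled for the partition yet.
    long consumed(final String partition) {
        return highestConsumed.getOrDefault(partition, -1L);
    }

    // Returns -1 when nothing has been processed for the partition yet.
    long processed(final String partition) {
        return highestProcessed.getOrDefault(partition, -1L);
    }
}
```

   If `endOffsets` is meant to be the "processed" value, updating it from the 
poll loop would report the "consumed" value instead.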

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -0,0 +1,200 @@
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+    private AtomicBoolean process;
+    private AtomicBoolean commit;
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;

Review comment:
       nit: initialize `appId` with the prefix you want (e.g. `appId_`, or 
something more descriptive like `TaskMetadataTest_`) and then just append 
testId, i.e.
   ```
   appId = appId + testId;
   ```
   
   That just makes it easier to see what the prefix is (same with `inputTopic` 
below).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -302,6 +302,11 @@ public void updateCommittedOffsets(final TopicPartition topicPartition, final Lo
 
     }
 
+    @Override
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+

Review comment:
       We should probably never have an empty method like this. Can you either 
give it a default no-op implementation, or just make it a method on StreamTask 
and check + cast the `Task` object? The latter is probably safer tbh, plus 
eventually we will have better Stream vs Standby Task safety in the TaskManager, 
so you won't even need the cast.
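
   A rough sketch of the "check + cast" option, using simplified stand-in 
types. `SketchTask`, `SketchStreamTask`, `SketchStandbyTask` and 
`SketchTaskManager` are hypothetical and far smaller than the real `Task`, 
`StreamTask`, `StandbyTask` and `TaskManager`. Only `updateEndOffsets` and 
`updateTaskEndMetadata` borrow their names from this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration; not the real Kafka Streams types.
interface SketchTask {
    boolean hasInputPartition(String partition);
}

// The standby task no longer needs an empty updateEndOffsets override.
final class SketchStandbyTask implements SketchTask {
    @Override
    public boolean hasInputPartition(final String partition) {
        return true;
    }
}

final class SketchStreamTask implements SketchTask {
    private final String inputPartition;
    private final Map<String, Long> endOffsets = new HashMap<>();

    SketchStreamTask(final String inputPartition) {
        this.inputPartition = inputPartition;
    }

    @Override
    public boolean hasInputPartition(final String partition) {
        return inputPartition.equals(partition);
    }

    // Lives only on the stream task, so standby tasks cannot be updated by accident.
    void updateEndOffsets(final String partition, final long offset) {
        endOffsets.put(partition, offset);
    }

    Long endOffset(final String partition) {
        return endOffsets.get(partition);
    }
}

final class SketchTaskManager {
    static void updateTaskEndMetadata(final List<SketchTask> tasks, final String partition, final long offset) {
        for (final SketchTask task : tasks) {
            // Check the concrete type first, then cast.
            if (task instanceof SketchStreamTask && task.hasInputPartition(partition)) {
                ((SketchStreamTask) task).updateEndOffsets(partition, offset);
            }
        }
    }
}
```

   Once the task collection is typed as stream tasks only, the `instanceof` 
check and the cast can both go away.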

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1162,6 +1162,15 @@ private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> all
         }
     }
 
+    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
+        for (final Task task: tasks.activeTasks()) {
+            if (task.inputPartitions().contains(topicPartition)) {
+                task.updateEndOffsets(topicPartition, offset);
+            }
+        }
+    }
+

Review comment:
       super nit: extra linebreak

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1162,6 +1162,15 @@ private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> all
         }
     }
 
+    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
+        for (final Task task: tasks.activeTasks()) {

Review comment:
       Yeah, there are some TODOs to make `tasks` more type safe by 
distinguishing between StreamTask and StandbyTask more explicitly where 
appropriate, so I think it's safe to say this will eventually return a 
collection of actual `StreamTask`s rather than plain `Task`s, in which case it 
will make even more sense for `#updateEndOffsets` to be a method (only) on 
StreamTask itself

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -0,0 +1,200 @@
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+    private AtomicBoolean process;
+    private AtomicBoolean commit;
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+
+        process = new AtomicBoolean(true);
+        commit = new AtomicBoolean(true);
+
+        final KStream<String, String> stream = builder.stream(inputTopic);
+        stream.process(PauseProcessor::new);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1L)
+                )
+        );
+    }
+
+    @Test
+    public void shouldReportCorrectCommittedOffsetInformation() {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
+            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
+            assertThat(taskMetadata.committedOffsets().size(), equalTo(1));
+            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+
+            produceMessages(0L, inputTopic, "test");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 1L, "the record was processed");
+            process.set(true);
+
+            produceMessages(0L, inputTopic, "test1");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 2L, "the record was processed");
+            process.set(true);
+
+            produceMessages(0L, inputTopic, "test1");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 3L, "the record was processed");
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void shouldReportCorrectEndOffsetInformation() {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
+            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
+            assertThat(taskMetadata.endOffsets().size(), equalTo(1));
+            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+            commit.set(false);
+
+            for (int i = 0; i < 10; i++) {
+                produceMessages(0L, inputTopic, "test");
+                TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+                process.set(true);
+            }
+            assertThat(taskMetadata.endOffsets().get(topicPartition), equalTo(9L));
+
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+        final List<TaskMetadata> taskMetadataList = kafkaStreams.localThreadsMetadata().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
+        assertThat("only one task", taskMetadataList.size() == 1);
+        return taskMetadataList.get(0);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+
+    private class PauseProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            while (!process.get()) {

Review comment:
       Is this just to prevent it from processing anything until you're ready 
to proceed? It seems like you can (and already do) control that just by choosing 
when to produce input messages, and producing them one at a time.
   
   (If that's accurate, then WDYT about renaming `process` to `processed` and 
flipping the boolean, so it more clearly indicates whether a record has been 
processed yet?)
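
   Roughly what I have in mind, sketched without any Kafka dependency. 
`RecordingProcessor` is a hypothetical stand-in for the test's 
`PauseProcessor`, and starting the flag at `false` is my assumption about how 
the flipped boolean would be used.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the test's processor; it does not extend any Kafka class.
final class RecordingProcessor {
    // Shared with the test thread: true once a record has been processed.
    private final AtomicBoolean processed;

    RecordingProcessor(final AtomicBoolean processed) {
        this.processed = processed;
    }

    // No busy-wait here: the test controls pacing by producing one record at a time.
    void process(final String key, final String value) {
        processed.set(true);
    }
}
```

   The test would then call `processed.set(false)` before producing each record 
and wait for `processed.get()` to become true.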

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1193,6 +1196,11 @@ public void updateCommittedOffsets(final TopicPartition topicPartition, final Lo
         committedOffsets.put(topicPartition, offset);
     }
 
+    @Override
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+        highWatermark.put(topicPartition, offset);

Review comment:
       Does this actually improve the endOffsets accuracy? This part of the 
code is new to me so I just had a look around, and I'm a bit confused about how 
it's supposed to work. Here's my reading of the code: 
   
   The endOffsets are exposed to the user via the TaskMetadata, which is 
updated/created in StreamThread#updateThreadMetadata. That method seems to be 
called only when the StreamThread's state is set to `RUNNING`, which in turn 
only occurs when it completes restoration. So it would seem that it never gets 
updated during normal processing, when the end offset is actually increasing. 
In the ticket you say it's not updated "optimally"; is it possible that it's 
not updated at all? 
   
   Also, assuming the task metadata update is refactored to occur more 
regularly, I'm not sure it helps to update the `highWatermark` more 
quickly/frequently, since it is only read out and used when it's returned by 
the `highWaterMark()` method, which is where it was being updated before. So we 
might not need this part at all, and refactoring the TaskMetadata update alone 
may be sufficient.
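
   To illustrate the staleness concern, here is a small sketch. It assumes the 
metadata handed to users is a copy taken at one point in time, which is my 
reading of the code and may not match the real TaskMetadata. `LiveOffsets` and 
`snapshot` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a live map versus a point-in-time copy.
final class LiveOffsets {
    private final Map<String, Long> endOffsets = new HashMap<>();

    // Called during normal processing as the end offset grows.
    void update(final String partition, final long offset) {
        endOffsets.put(partition, offset);
    }

    // Returns an immutable copy; later calls to update() do not change it.
    Map<String, Long> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(endOffsets));
    }
}
```

   A snapshot taken once, on the transition to `RUNNING`, keeps reporting the 
value it was created with no matter how often the live map is updated 
afterwards. Only taking a fresh snapshot makes newer offsets visible.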




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

