ableegoldman commented on a change in pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#discussion_r628817360


##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
```
@@ -198,6 +198,10 @@ public StreamTask(final TaskId id,
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         this.committedOffsets = new HashMap<>();
         this.highWatermark = new HashMap<>();
+        for (final TopicPartition topicPartition: inputPartitions) {
+            this.committedOffsets.put(topicPartition, -1L);
+            this.highWatermark.put(topicPartition, -1L);
+        }
```
Review comment:
nit: none of these require a `this.`, can you remove that here and in the two lines above?
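For illustration, the snippet would then read roughly like this (just a sketch of the nit, same code as above minus the qualifier):

```java
        // same initialization, without the redundant `this.` qualifier
        committedOffsets = new HashMap<>();
        highWatermark = new HashMap<>();
        for (final TopicPartition topicPartition : inputPartitions) {
            committedOffsets.put(topicPartition, -1L);
            highWatermark.put(topicPartition, -1L);
        }
```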
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
```
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class TaskMetadataIntegrationTest {
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+    private AtomicBoolean process;
+    private AtomicBoolean commit;
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder = new StreamsBuilder();
+
+        process = new AtomicBoolean(true);
```
Review comment:
```suggestion
        processed = new AtomicBoolean(true);
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
```
@@ -1091,7 +1091,7 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
                 try {
                     tasks.streamsProducerForTask(task.id())
                         .commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
-                    updateTaskMetadata(taskToCommit.getValue());
+                    this.updateTaskCommitMetadata(taskToCommit.getValue());
```
Review comment:
`this` seems unnecessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
```
@@ -900,6 +902,14 @@ private long pollPhase() {
         final int numRecords = records.count();
+        for (final TopicPartition topicPartition: records.partitions()) {
+            records
+                .records(topicPartition)
+                .stream()
+                .max(Comparator.comparing(ConsumerRecord::offset))
+                .ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));
```
Review comment:
Dumb question: what do the TaskMetadata's `endOffsets` actually refer to? The KIP says that they reflect `"the highest offset seen so far"`, but I'm wondering what "seen" means in this context: is it the highest offset processed, or the highest offset consumed? I'm guessing the former since that feels more useful, but maybe an example of how this will be used would help me understand. (Asking because, if it is the former, we don't want to update them based on records that have been polled but not yet processed.)

Also, can you add javadocs explaining this and the other TaskMetadata getter methods like you had in the KIP?
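If it is the former (highest offset processed), I'd expect the update to be driven from the record-processing path rather than from `pollPhase` — very roughly something like this, where `onRecordProcessed` is a made-up hook and `highWatermark` is the map this PR adds to StreamTask (illustrative sketch only, not code from this PR):

```java
// Hypothetical sketch: advance the end-offset metadata only once a record has
// actually been processed, and never let it move backwards.
void onRecordProcessed(final TopicPartition partition, final long offset) {
    highWatermark.merge(partition, offset, Math::max);
}
```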
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
```
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
```
Review comment:
nit: initialize `appId` with the prefix you want (e.g. `appId_`, or something more descriptive like `TaskMetadataTest_`) and then just append testId, i.e.
```
        appId = appId + testId;
```
Just makes it easier to locate what the prefix is (same with `inputTopic` below).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
```
@@ -302,6 +302,11 @@ public void updateCommittedOffsets(final TopicPartition topicPartition, final Lo
     }
 
+    @Override
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+
```
Review comment:
We should probably never have an empty method like this, can you either give it a default no-op implementation or just make it a method on StreamTask and check + cast the `Task` object? The latter is probably safer tbh, plus eventually we will have better Stream vs Standby Task safety in the TaskManager so you won't even need the cast.
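Concretely, the call site could look something like this (just a sketch of what I mean — the loop is the one this PR adds to TaskManager, the `instanceof` guard is the new part):

```java
    // Sketch: keep updateEndOffsets as a StreamTask-only method and guard the call,
    // instead of adding an empty override to StandbyTask.
    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
        for (final Task task : tasks.activeTasks()) {
            if (task instanceof StreamTask && task.inputPartitions().contains(topicPartition)) {
                ((StreamTask) task).updateEndOffsets(topicPartition, offset);
            }
        }
    }
```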
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
```
@@ -1162,6 +1162,15 @@ private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> all
         }
     }
 
+    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
+        for (final Task task: tasks.activeTasks()) {
+            if (task.inputPartitions().contains(topicPartition)) {
+                task.updateEndOffsets(topicPartition, offset);
+            }
+        }
+    }
+
```
Review comment:
super nit: extra linebreak

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
```
@@ -1162,6 +1162,15 @@ private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> all
         }
     }
 
+    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
+        for (final Task task: tasks.activeTasks()) {
```
Review comment:
Yeah, there are some TODOs to make `tasks` more type safe by distinguishing between StreamTask and StandbyTask more explicitly where appropriate, so I think it's safe to say this will eventually return a collection of actual `StreamTask`s rather than plain `Task`s, in which case it will make even more sense for `#updateEndOffsets` to be a method (only) on StreamTask itself
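i.e. once that happens, this could collapse to something like the following (illustrative only — `activeStreamTasks()` is a made-up accessor, nothing like it exists today):

```java
    // Hypothetical future shape, assuming `tasks` exposes its active tasks as StreamTasks:
    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
        for (final StreamTask task : tasks.activeStreamTasks()) {   // assumed typed accessor
            if (task.inputPartitions().contains(topicPartition)) {
                task.updateEndOffsets(topicPartition, offset);      // no cast needed
            }
        }
    }
```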
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
```
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder = new StreamsBuilder();
+
+        process = new AtomicBoolean(true);
+        commit = new AtomicBoolean(true);
+
+        final KStream<String, String> stream = builder.stream(inputTopic);
+        stream.process(PauseProcessor::new);
+
+        properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1L)
+            )
+        );
+    }
+
+    @Test
+    public void shouldReportCorrectCommittedOffsetInformation() {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
+            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
+            assertThat(taskMetadata.committedOffsets().size(), equalTo(1));
+            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+
+            produceMessages(0L, inputTopic, "test");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 1L, "the record was processed");
+            process.set(true);
+
+            produceMessages(0L, inputTopic, "test1");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 2L, "the record was processed");
+            process.set(true);
+
+            produceMessages(0L, inputTopic, "test1");
+            TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 3L, "the record was processed");
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void shouldReportCorrectEndOffsetInformation() {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
+            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
+            assertThat(taskMetadata.endOffsets().size(), equalTo(1));
+            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
+            commit.set(false);
+
+            for (int i = 0; i < 10; i++) {
+                produceMessages(0L, inputTopic, "test");
+                TestUtils.waitForCondition(() -> !process.get(), "the record was processed");
+                process.set(true);
+            }
+            assertThat(taskMetadata.endOffsets().get(topicPartition), equalTo(9L));
+
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+        final List<TaskMetadata> taskMetadataList = kafkaStreams.localThreadsMetadata().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
+        assertThat("only one task", taskMetadataList.size() == 1);
+        return taskMetadataList.get(0);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            streamOneInput,
+            Collections.singletonList(new KeyValue<>("1", msg)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            timestamp);
+    }
+
+    private class PauseProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            while (!process.get()) {
```
Review comment:
Is this just to prevent it from processing anything until you're ready to proceed? It seems like you can/are doing that just by controlling when to produce input messages and doing so one at a time (if that's accurate, then WDYT about renaming `process` to `processed` and flipping the boolean so it more clearly serves the purpose of indicating whether a record has yet been processed)
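Something along these lines is what I'm picturing (rough sketch only, exact wiring up to you):

```java
    // Sketch: `processed` starts false and is flipped to true once the processor
    // has handled a record, so the test can wait on it directly.
    private final AtomicBoolean processed = new AtomicBoolean(false);

    private class PauseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // per-record work would go here, then signal the test thread
            processed.set(true);
        }
    }

    // and in the test:
    //   TestUtils.waitForCondition(processed::get, "the record was processed");
    //   processed.set(false);
```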
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
```
@@ -1193,6 +1196,11 @@ public void updateCommittedOffsets(final TopicPartition topicPartition, final Lo
         committedOffsets.put(topicPartition, offset);
     }
 
+    @Override
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+        highWatermark.put(topicPartition, offset);
```
Review comment:
Does this actually improve the endOffsets accuracy? This part of the code is new to me so I just had a look around, and I'm a bit confused about how it's supposed to work.

Here's my reading of the code: the endOffsets are exposed to the user via the TaskMetadata, which is updated/created in StreamThread#updateThreadMetadata. That method seems to be called only when the StreamThread's state is set to `RUNNING`, which in turn only occurs when it completes restoration. So it would seem that it never gets updated during normal processing, when the end offset is actually increasing. In the ticket you say it's not updated "optimally"; is it possible that it's not updated at all?

Also, assuming the task metadata update is refactored to occur more regularly, I'm not sure it helps anything to update the `highWatermark` more quickly/frequently, since this is only read out and used when it's returned by the `highWaterMark()` method, which is where it was being updated before. So it might be that we don't need this part of it at all, and just refactoring the TaskMetadata update alone will be sufficient.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org