Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-28 Thread via GitHub


wcarlson5 merged PR #15414:
URL: https://github.com/apache/kafka/pull/15414


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-28 Thread via GitHub


wcarlson5 commented on PR #15414:
URL: https://github.com/apache/kafka/pull/15414#issuecomment-2025500658

   The test configuration issues in the build are in `core:test` and not related to this PR





Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-26 Thread via GitHub


mjsax commented on PR #15414:
URL: https://github.com/apache/kafka/pull/15414#issuecomment-2021597301

   Jenkins needs a re-run...





Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-26 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1540190176


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -252,6 +252,11 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+
+    //Visible for testing
+    public void setDeserializationExceptionHandler(final DeserializationExceptionHandler deserializationExceptionHandler) {
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
+    }
     @SuppressWarnings("unchecked")

Review Comment:
   nit: missing empty line






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-25 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1538043755


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +252,91 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    try {
+                        if (record.key() != null) {
+                            source.process(new Record<>(
+                                reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
+                                reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
+                                record.timestamp(),
+                                record.headers()));
+                            restoreCount++;
+                        }
+                    } catch (final Exception deserializationException) {
+                        handleDeserializationFailure(

Review Comment:
   If you still think we need to refactor `RecordDeserializer`, I think we should do that in a follow-up PR
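
   The restore loop quoted in the hunk above resumes from a checkpointed offset when the checkpoint file has one for the partition, and otherwise rewinds to the beginning of the topic. A minimal, self-contained sketch of that resume decision; the map and names here are illustrative stand-ins, not the real Kafka internals:

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointSeekSketch {
    // Illustrative stand-in for "seekToBeginning": offset 0.
    static final long BEGINNING = 0L;

    // Mirrors the branch in reprocessState(): use the checkpointed offset if
    // the checkpoint file cache has one for this partition, otherwise start
    // from the beginning of the changelog topic.
    static long startingOffset(Map<String, Long> checkpointFileCache, String topicPartition) {
        Long checkpoint = checkpointFileCache.get(topicPartition);
        return checkpoint != null ? checkpoint : BEGINNING;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("globalTopic-0", 42L);

        // Partition with a checkpoint: resume where restore left off.
        if (startingOffset(checkpoints, "globalTopic-0") != 42L) throw new AssertionError();
        // Partition without a checkpoint: reprocess from the beginning.
        if (startingOffset(checkpoints, "globalTopic-1") != BEGINNING) throw new AssertionError();
        System.out.println("ok");
    }
}
```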






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on PR #15414:
URL: https://github.com/apache/kafka/pull/15414#issuecomment-2004539886

   @mjsax I added some testing





Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528822628


##
streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+@Timeout(600)
+@Tag("integration")
+public class GlobalStateReprocessTest {
+private static final int NUM_BROKERS = 1;
+private static final Properties BROKER_CONFIG;
+
+static {
+BROKER_CONFIG = new Properties();
+BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+}
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+@BeforeAll
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterAll
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private final MockTime mockTime = CLUSTER.time;
+private final String globalStore = "globalStore";
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String globalStoreTopic;
+
+
+@BeforeEach
+public void before(final TestInfo testInfo) throws Exception {
+builder = new StreamsBuilder();
+
+createTopics();
+streamsConfiguration = new Properties();
+final String safeTestName = safeUniqueTestName(testInfo);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+
+final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>(
+

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528810589


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +252,91 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    try {
+                        if (record.key() != null) {
+                            source.process(new Record<>(
+                                reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
+                                reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
+                                record.timestamp(),
+                                record.headers()));
+                            restoreCount++;
+                        }
+                    } catch (final Exception deserializationException) {
+                        handleDeserializationFailure(

Review Comment:
   That was my first thought too. Maybe we could refactor it a bit more, but a `RecordDeserializer` wants a `SourceNode`. It seemed like that would be changing more surfaces than necessary. But we can






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-18 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1528807717


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##
@@ -68,40 +68,49 @@ ConsumerRecord deserialize(final ProcessorContext processo
             Optional.empty()
         );
     } catch (final Exception deserializationException) {
-        final DeserializationExceptionHandler.DeserializationHandlerResponse response;
-        try {
-            response = deserializationExceptionHandler.handle(
-                (InternalProcessorContext) processorContext,
-                rawRecord,
-                deserializationException);
-        } catch (final Exception fatalUserException) {
-            log.error(
-                "Deserialization error callback failed after deserialization error for record {}",
-                rawRecord,
-                deserializationException);
-            throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
-        }
+        handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor);
+        return null;

Review Comment:
   yeah we can add a note
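
   The `return null` in the refactored hunk above means callers of `deserialize(...)` must treat `null` as "the record was handed to the exception handler and dropped". A small, self-contained sketch of that caller-side contract, with simplified stand-in types rather than the real Kafka classes:

```java
public class NullMeansDroppedSketch {
    // Simplified deserialize(): returns null when the record could not be
    // deserialized and the handler chose to drop it, mirroring the
    // refactored RecordDeserializer's contract.
    static String deserialize(byte[] raw) {
        if (raw == null) {             // simulate a deserialization failure
            return null;               // dropped; caller must skip this record
        }
        return new String(raw);
    }

    public static void main(String[] args) {
        int processed = 0;
        byte[][] batch = { "a".getBytes(), null, "b".getBytes() };
        for (byte[] raw : batch) {
            String record = deserialize(raw);
            if (record == null) {
                continue;              // dropped record: skip, do not process
            }
            processed++;
        }
        if (processed != 2) throw new AssertionError();
        System.out.println("ok");
    }
}
```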






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-16 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1527393295


##
streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+@Timeout(600)
+@Tag("integration")
+public class GlobalStateReprocessTest {
+private static final int NUM_BROKERS = 1;
+private static final Properties BROKER_CONFIG;
+
+static {
+BROKER_CONFIG = new Properties();
+BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+}
+
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+
+@BeforeAll
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterAll
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+
+private final MockTime mockTime = CLUSTER.time;
+private final String globalStore = "globalStore";
+private StreamsBuilder builder;
+private Properties streamsConfiguration;
+private KafkaStreams kafkaStreams;
+private String globalStoreTopic;
+
+
+@BeforeEach
+public void before(final TestInfo testInfo) throws Exception {
+builder = new StreamsBuilder();
+
+createTopics();
+streamsConfiguration = new Properties();
+final String safeTestName = safeUniqueTestName(testInfo);
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+
+final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>(
+

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-15 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1527018572


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    if (record.key() != null) {
+                        source.process(new Record<>(
+                            reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),

Review Comment:
   Just refactored the RecordDeserializer and reused the same logic in there






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-15 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526870886


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    if (record.key() != null) {
+                        source.process(new Record<>(
+                            reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),

Review Comment:
   I did think about this approach, too, however, I think we should not auto-drop. Assume somebody is using a SR and when the app is running and pushing data into the store, everything just works. Later, KS crashes and we need to restore global state, but during the restore process there is some issue with the SR and deserialization fails. If we auto-drop, we would get data loss, right?
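
   The trade-off described above (failing the restore vs. silently dropping records that cannot be deserialized) can be illustrated with a small, self-contained sketch; the enum and handler here are simplified stand-ins, not the actual Kafka Streams `DeserializationExceptionHandler` classes:

```java
import java.util.Arrays;
import java.util.List;

public class RestoreDropSketch {
    // Simplified stand-in for the handler's response enum.
    enum HandlerResponse { FAIL, CONTINUE }

    // Restores a list of raw records; returns how many made it into the store.
    // A null element simulates a record whose deserialization fails (e.g. the
    // schema registry is temporarily unreachable during restore).
    static int restore(List<String> rawRecords, HandlerResponse onError) {
        int restored = 0;
        for (String raw : rawRecords) {
            try {
                if (raw == null) {
                    throw new RuntimeException("deserialization failed");
                }
                restored++;                      // record reaches the global store
            } catch (RuntimeException e) {
                if (onError == HandlerResponse.FAIL) {
                    throw e;                     // surface the problem; restore can be retried
                }
                // CONTINUE: the record is dropped and is never restored --
                // the silent data-loss case described above
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", null, "b");

        // CONTINUE quietly loses the middle record.
        if (restore(records, HandlerResponse.CONTINUE) != 2) throw new AssertionError();

        // FAIL aborts the restore instead of losing data.
        boolean failed = false;
        try {
            restore(records, HandlerResponse.FAIL);
        } catch (RuntimeException e) {
            failed = true;
        }
        if (!failed) throw new AssertionError();
        System.out.println("ok");
    }
}
```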






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-15 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526811182


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    if (record.key() != null) {
+                        source.process(new Record<>(
+                            reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),

Review Comment:
   should we just drop them? I'm actually not sure about it






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-15 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526810331


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);

Review Comment:
   should be store name






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-15 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526795838


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?> reprocessFactory,
+                                final String storeName) {
+        final Processor<?, ?, ?, ?> source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //  `poll(pollMS)` without adding the request timeout and do a more precise
+                //  timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    if (record.key() != null) {
+                        source.process(new Record<>(
+                            reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),

Review Comment:
   What about deserialization errors? Seems we should apply the regular 
exception handler for it?
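To illustrate the reviewer's suggestion, here is a hedged, self-contained sketch (plain Java, not the PR's actual code and no real Kafka dependencies) of routing a deserialization failure through a CONTINUE/FAIL handler. The `Response` enum only mimics the options of Kafka Streams' `DeserializationExceptionHandler`; the `Handler` interface and `deserialize` method are illustrative stand-ins:

```java
import java.util.List;

public class DeserializationSketch {
    // Mirrors DeserializationHandlerResponse: skip the record, or abort.
    enum Response { CONTINUE, FAIL }

    interface Handler { Response handle(byte[] rawValue, Exception e); }

    // Stand-in deserializer: fails on empty payloads.
    static String deserialize(byte[] raw) {
        if (raw.length == 0) throw new RuntimeException("empty payload");
        return new String(raw);
    }

    static int process(List<byte[]> records, Handler handler) {
        int processed = 0;
        for (byte[] raw : records) {
            String key;
            try {
                key = deserialize(raw);
            } catch (RuntimeException e) {
                // Delegate to the configured handler instead of crashing restore.
                if (handler.handle(raw, e) == Response.FAIL) {
                    throw new IllegalStateException("deserialization failed", e);
                }
                continue; // CONTINUE: skip the corrupt record, keep restoring
            }
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of("a".getBytes(), new byte[0], "b".getBytes());
        // A CONTINUE handler skips the corrupt record instead of aborting.
        int ok = process(records, (raw, e) -> Response.CONTINUE);
        System.out.println(ok); // prints 2
    }
}
```

With a FAIL handler the same corrupt record would abort the loop, which is the trade-off the exception handler configuration exposes.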



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?> reprocessFactory,
+                                final String storeName) {
+        final Processor<?, ?, ?, ?> source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);

Review Comment:
   Why do we pass 

Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-14 Thread via GitHub


wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1525585101


##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java:
##
@@ -76,6 +76,8 @@ public void init(final ProcessorContext context,
             false
         );
         // register the store
+        open = true;

Review Comment:
   The store needs to be open for the processor to write to it when starting 
up. This actually matches the RocksDB store, where it is opened right before 
registering instead of afterwards.
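The ordering point above can be sketched with stand-in types (not the actual `InMemoryKeyValueStore` code): if registering the store can immediately trigger restore writes into it, the store must already be open at that moment:

```java
public class StoreInitOrder {
    static class Store {
        boolean open = false;
        int entries = 0;
        void put(String key, String value) {
            // A closed store rejects writes, as a real store would.
            if (!open) throw new IllegalStateException("store not open");
            entries++;
        }
    }

    // Stand-in for context.register(store, callback): restoration starts
    // eagerly and writes restored records back into the store.
    static void register(Store store) {
        store.put("restored-key", "restored-value");
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.open = true; // must precede registration, as in the PR's change
        register(store);   // restore write succeeds only because it is open
        System.out.println(store.entries); // prints 1
    }
}
```

Flipping the two lines in `main` would throw `IllegalStateException`, which is the failure mode the review comment is guarding against.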






Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]

2024-03-01 Thread via GitHub


mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1509703745


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##
@@ -203,13 +202,27 @@ public void registerStore(final StateStore store,
         );
 
         try {
-            restoreState(
-                stateRestoreCallback,
-                topicPartitions,
-                highWatermarks,
-                store.name(),
-                converterForStore(store)
-            );
+            if (topology.storeNameToReprocessOnRestore().getOrDefault(store.name(), false)) {
+                globalConsumer.assign(topicPartitions);
+                globalConsumer.seekToBeginning(topicPartitions);
+                for (final TopicPartition topicPartition : topicPartitions) {
+                    stateRestoreListener.onRestoreStart(topicPartition, store.name(),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L));
+                    stateRestoreListener.onRestoreEnd(topicPartition, store.name(), 0L);

Review Comment:
   Where does the actual restore happen?
   
   Note that the original `restoreState()` is the "bootstrapping phase" before 
we move to `RUNNING`, and we should preserve this behavior. It seems your PR 
basically skips the bootstrapping and relies on the regular processing to 
re-read the data? In that case, we would go to `RUNNING` with an empty global 
store, and thus lookups might fail because the data is not loaded yet.
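The bootstrapping behavior this comment describes can be sketched as follows (illustrative names, not Kafka's API): capture the high watermark up front, keep restoring until the position reaches it, and only then allow the transition to RUNNING:

```java
public class BootstrapSketch {
    public static void main(String[] args) {
        final long highWatermark = 5L; // end offset captured before restore starts
        long offset = 0L;              // restore position (e.g. from a checkpoint)
        long restored = 0L;

        // Bootstrapping phase: stay in "restoring" until the position reaches
        // the captured watermark; only then may lookups be served.
        while (offset < highWatermark) {
            offset++;   // stand-in for polling a record and applying it to the store
            restored++;
        }

        System.out.println("restored=" + restored + " caughtUp=" + (offset >= highWatermark));
    }
}
```

Skipping this loop and going straight to RUNNING is exactly the gap the reviewer points out: the store would be empty until regular processing happens to re-read the data.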


