[
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512813#comment-16512813
]
ASF GitHub Bot commented on KAFKA-7021:
---------------------------------------
guozhangwang closed pull request #5207: KAFKA-7021: checkpoint offsets from
committed
URL: https://github.com/apache/kafka/pull/5207
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0b028e67381..0c611199d88 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -107,7 +107,6 @@ public InternalStreamsBuilder(final InternalTopologyBuilder
internalTopologyBuil
name);
internalTopologyBuilder.addStateStore(storeBuilder, name);
-
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
return kTable;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..bf6ceded143 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -164,7 +164,7 @@ public String toString(final String indent) {
return sb.toString();
}
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
return Collections.emptyMap();
}
@@ -239,7 +239,7 @@ void closeStateManager(final boolean writeCheckpoint)
throws ProcessorStateExcep
ProcessorStateException exception = null;
log.trace("Closing state manager");
try {
- stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+ stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets()
: null);
} catch (final ProcessorStateException e) {
exception = e;
} finally {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7d4b592b6a6..a9d5a93f228 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -122,7 +122,7 @@
private Map<Integer, Set<String>> nodeGroups = null;
- interface StateStoreFactory {
+ public interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
StateStore build();
@@ -1883,4 +1883,10 @@ public void updateSubscribedTopics(final Set<String>
topics, final String logPre
subscriptionUpdates.updateTopics(topics);
updateSubscriptions(subscriptionUpdates, logPrefix);
}
+
+ // following functions are for test only
+
+ public synchronized Map<String, StateStoreFactory> getStateStores() {
+ return stateFactories;
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 195ea99f62d..b0761ac5507 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@
private final boolean isStandby;
private final ChangelogReader changelogReader;
private final Map<TopicPartition, Long> offsetLimits;
- private final Map<TopicPartition, Long> restoredOffsets;
+ private final Map<TopicPartition, Long> standbyRestoredOffsets;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used
for standby tasks, keyed by state topic name
private final Map<String, String> storeToChangelogTopic;
private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -79,9 +79,9 @@ public ProcessorStateManager(final TaskId taskId,
partitionForTopic.put(source.topic(), source);
}
offsetLimits = new HashMap<>();
- restoredOffsets = new HashMap<>();
+ standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
- restoreCallbacks = isStandby ? new HashMap<>() : null;
+ restoreCallbacks = isStandby ? new HashMap<String,
StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
// load the checkpoint information
@@ -168,7 +168,11 @@ public void reinitializeStateStoresForPartitions(final
Collection<TopicPartition
final int partition = getPartition(topicName);
final TopicPartition storePartition = new
TopicPartition(topicName, partition);
- partitionsAndOffsets.put(storePartition,
checkpointableOffsets.getOrDefault(storePartition, -1L));
+ if (checkpointableOffsets.containsKey(storePartition)) {
+ partitionsAndOffsets.put(storePartition,
checkpointableOffsets.get(storePartition));
+ } else {
+ partitionsAndOffsets.put(storePartition, -1L);
+ }
}
return partitionsAndOffsets;
}
@@ -207,7 +211,7 @@ public void reinitializeStateStoresForPartitions(final
Collection<TopicPartition
}
// record the restored offset for its change log partition
- restoredOffsets.put(storePartition, lastOffset + 1);
+ standbyRestoredOffsets.put(storePartition, lastOffset + 1);
return remainingRecords;
}
@@ -288,8 +292,8 @@ public void close(final Map<TopicPartition, Long>
ackedOffsets) throws Processor
// write the checkpoint
@Override
- public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
- checkpointableOffsets.putAll(changelogReader.restoredOffsets());
+ public void checkpoint(final Map<TopicPartition, Long>
checkpointableOffsets) {
+ this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
for (final StateStore store : stores.values()) {
final String storeName = store.name();
// only checkpoint the offset to the offsets file if
@@ -297,11 +301,11 @@ public void checkpoint(final Map<TopicPartition, Long>
ackedOffsets) {
if (store.persistent() &&
storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic =
storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new
TopicPartition(changelogTopic, getPartition(storeName));
- if (ackedOffsets.containsKey(topicPartition)) {
+ if (checkpointableOffsets.containsKey(topicPartition)) {
// store the last offset + 1 (the log position after
restoration)
- checkpointableOffsets.put(topicPartition,
ackedOffsets.get(topicPartition) + 1);
- } else if (restoredOffsets.containsKey(topicPartition)) {
- checkpointableOffsets.put(topicPartition,
restoredOffsets.get(topicPartition));
+ this.checkpointableOffsets.put(topicPartition,
checkpointableOffsets.get(topicPartition) + 1);
+ } else if (standbyRestoredOffsets.containsKey(topicPartition))
{
+ this.checkpointableOffsets.put(topicPartition,
standbyRestoredOffsets.get(topicPartition));
}
}
}
@@ -310,9 +314,9 @@ public void checkpoint(final Map<TopicPartition, Long>
ackedOffsets) {
checkpoint = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
}
- log.trace("Writing checkpoint: {}", checkpointableOffsets);
+ log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
try {
- checkpoint.write(checkpointableOffsets);
+ checkpoint.write(this.checkpointableOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {}: {}",
checkpoint, e);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index c33ade6f36e..7623c66cd3b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -91,7 +91,7 @@ public StateDirectory(final StreamsConfig config,
* @return directory for the {@link TaskId}
* @throws ProcessorStateException if the task directory does not exists
and could not be created
*/
- File directoryForTask(final TaskId taskId) {
+ public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 21a4750f954..1f3f0b57a6a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -311,7 +311,7 @@ void commit(final boolean startNewTransaction) {
public void run() {
flushState();
if (!eosEnabled) {
- stateMgr.checkpoint(recordCollectorOffsets());
+ stateMgr.checkpoint(activeTaskCheckpointableOffsets());
}
commitOffsets(startNewTransaction);
}
@@ -322,8 +322,15 @@ public void run() {
}
@Override
- protected Map<TopicPartition, Long> recordCollectorOffsets() {
- return recordCollector.offsets();
+ protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+ final Map<TopicPartition, Long> checkpointableOffsets =
recordCollector.offsets();
+ for (final Map.Entry<TopicPartition, Long> entry :
consumedOffsets.entrySet()) {
+ if (!checkpointableOffsets.containsKey(entry.getKey())) {
+ checkpointableOffsets.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return checkpointableOffsets;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2ba66a5b1bd..76190a00e2e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1186,4 +1186,8 @@ public String toString(final String indent) {
TaskManager taskManager() {
return taskManager;
}
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords()
{
+ return standbyRecords;
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4a496b8dd28..e7ce819677b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -293,7 +293,26 @@ public void shouldUseDefaultNodeAndStoreNames() {
assertFalse(stores.hasNext());
assertFalse(subtopologies.hasNext());
}
-
+
+ @Test
+ public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+ final String topic = "topic";
+ builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store"));
+
+ final InternalTopologyBuilder internalTopologyBuilder =
InternalTopologyAccessor.getInternalTopologyBuilder(builder.build());
+ internalTopologyBuilder.setApplicationId("appId");
+
+ assertThat(internalTopologyBuilder.build().storeToChangelogTopic(),
equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+
+ assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
+
+
+
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
equalTo(true));
+
+
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog")));
+ }
+
+
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
builder.stream(Collections.<String>emptyList());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedfdc94..9d554c5425c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -16,18 +16,15 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -44,11 +41,14 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -57,11 +57,10 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -76,32 +75,33 @@
@Category({IntegrationTest.class})
public class RestoreIntegrationTest {
private static final int NUM_BROKERS = 1;
+ private static final String APPID = "restore-test";
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
private static final String INPUT_STREAM = "input-stream";
private static final String INPUT_STREAM_2 = "input-stream-2";
private final int numberOfKeys = 10000;
private KafkaStreams kafkaStreams;
- private String applicationId = "restore-test";
@BeforeClass
public static void createTopics() throws InterruptedException {
CLUSTER.createTopic(INPUT_STREAM, 2, 1);
CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+ CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
}
private Properties props(final String applicationId) {
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
return streamsConfiguration;
}
@@ -112,26 +112,38 @@ public void shutdown() throws IOException {
}
}
-
@Test
- public void shouldRestoreState() throws ExecutionException,
InterruptedException {
+ public void shouldRestoreStateFromChangelogTopic() throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- createStateForRestoration();
+ final Properties props = props(APPID);
+
+ // restoring from 1000 to 5000, and then process from 5000 to 10000 on
each of the two partitions
+ final int offsetCheckpointed = 1000;
+ createStateForRestoration(APPID + "-store-changelog");
+ createStateForRestoration(INPUT_STREAM);
+
+ final StateDirectory stateDirectory = new StateDirectory(new
StreamsConfig(props), new MockTime());
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 0)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(APPID +
"-store-changelog", 0), (long) offsetCheckpointed));
+ new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new
TaskId(0, 1)), ".checkpoint"))
+ .write(Collections.singletonMap(new TopicPartition(APPID +
"-store-changelog", 1), (long) offsetCheckpointed));
- builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(),
Serdes.Integer()))
+ final CountDownLatch startupLatch = new CountDownLatch(1);
+ final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(),
Serdes.Integer()), Materialized.<Integer, Integer, KeyValueStore<Bytes,
byte[]>>as("store"))
.toStream()
.foreach(new ForeachAction<Integer, Integer>() {
@Override
public void apply(final Integer key, final Integer value) {
- numReceived.incrementAndGet();
+ if (numReceived.incrementAndGet() == numberOfKeys)
+ shutdownLatch.countDown();
}
});
-
- final CountDownLatch startupLatch = new CountDownLatch(1);
- kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+ kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@Override
public void onChange(final KafkaStreams.State newState, final
KafkaStreams.State oldState) {
@@ -161,10 +173,11 @@ public void onRestoreEnd(final TopicPartition
topicPartition, final String store
kafkaStreams.start();
assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
- assertThat(restored.get(), equalTo((long) numberOfKeys));
- assertThat(numReceived.get(), equalTo(0));
- }
+ assertThat(restored.get(), equalTo((long) numberOfKeys - 2 *
offsetCheckpointed));
+ assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+ assertThat(numReceived.get(), equalTo(numberOfKeys));
+ }
@Test
public void shouldSuccessfullyStartWhenLoggingDisabled() throws
InterruptedException {
@@ -180,7 +193,7 @@ public Integer apply(final Integer value1, final Integer
value2) {
}, Materialized.<Integer, Integer, KeyValueStore<Bytes,
byte[]>>as("reduce-store").withLoggingDisabled());
final CountDownLatch startupLatch = new CountDownLatch(1);
- kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+ kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@Override
public void onChange(final KafkaStreams.State newState, final
KafkaStreams.State oldState) {
@@ -230,7 +243,7 @@ public void
shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedE
final Topology topology = streamsBuilder.build();
- kafkaStreams = new KafkaStreams(topology, props(applicationId +
"-logging-disabled"));
+ kafkaStreams = new KafkaStreams(topology, props(APPID +
"-logging-disabled"));
final CountDownLatch latch = new CountDownLatch(1);
kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@@ -250,6 +263,7 @@ public void onChange(final KafkaStreams.State newState,
final KafkaStreams.State
}
+
public static class KeyValueStoreProcessor implements Processor<Integer,
Integer> {
private String topic;
@@ -285,9 +299,8 @@ public void close() {
}
}
-
- private void createStateForRestoration()
- throws ExecutionException, InterruptedException {
+
+ private void createStateForRestoration(final String changelogTopic) {
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -295,31 +308,8 @@ private void createStateForRestoration()
new KafkaProducer<>(producerConfig, new
IntegerSerializer(), new IntegerSerializer())) {
for (int i = 0; i < numberOfKeys; i++) {
- producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
+ producer.send(new ProducerRecord<>(changelogTopic, i, i));
}
}
-
- final Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
-
- final Consumer consumer = new KafkaConsumer(consumerConfig);
- final List<TopicPartition> partitions = Arrays.asList(new
TopicPartition(INPUT_STREAM, 0),
- new
TopicPartition(INPUT_STREAM, 1));
-
- consumer.assign(partitions);
- consumer.seekToEnd(partitions);
-
- final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- for (TopicPartition partition : partitions) {
- final long position = consumer.position(partition);
- offsets.put(partition, new OffsetAndMetadata(position + 1));
- }
-
- consumer.commitSync(offsets);
- consumer.close();
}
-
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index b5e6fcb63b9..5fab6660c4e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -55,8 +55,8 @@ public void prepareTopology() throws InterruptedException {
appID = "table-table-join-integration-test";
builder = new StreamsBuilder();
- leftTable = builder.table(INPUT_TOPIC_LEFT);
- rightTable = builder.table(INPUT_TOPIC_RIGHT);
+ leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long,
String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long,
String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
}
final private String expectedFinalJoinResult = "D-d";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index b9ba6089497..61a45f2898a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -153,7 +153,7 @@ public void
shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() thro
assertEquals(storeName, topology.stateStores().get(0).name());
assertEquals(1, topology.storeToChangelogTopic().size());
- assertEquals("topic2",
topology.storeToChangelogTopic().get(storeName));
+ assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog",
topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 4df2bd1c6a1..2373e4c69c5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -123,7 +123,7 @@ public void
shouldRestoreStoreWithBatchingRestoreSpecification() throws Exceptio
assertThat(batchingRestoreCallback.getRestoredRecords().size(),
is(1));
assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@@ -141,7 +141,7 @@ public void
shouldRestoreStoreWithSinglePutRestoreSpecification() throws Excepti
assertThat(persistentStore.keys.size(), is(1));
assertTrue(persistentStore.keys.contains(intKey));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@@ -169,7 +169,7 @@ public void testRegisterPersistentStore() throws
IOException {
stateMgr.register(persistentStore,
persistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new
TopicPartition(persistentStoreTopicName, 2)));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@@ -196,7 +196,7 @@ public void testRegisterNonPersistentStore() throws
IOException {
stateMgr.register(nonPersistentStore,
nonPersistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new
TopicPartition(nonPersistentStoreTopicName, 2)));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@@ -257,7 +257,7 @@ public void testChangeLogOffsets() throws IOException {
assertEquals(-1L, (long) changeLogOffsets.get(partition3));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@@ -269,7 +269,7 @@ public void testGetStore() throws IOException {
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -280,13 +280,13 @@ public void testGetStore() throws IOException {
assertEquals(mockStateStore,
stateMgr.getStore(nonPersistentStoreName));
} finally {
- stateMgr.close(Collections.emptyMap());
+ stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
}
@Test
public void testFlushAndClose() throws IOException {
- checkpoint.write(Collections.emptyMap());
+ checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
// set up ack'ed offsets
final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
@@ -339,7 +339,7 @@ public void
shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throw
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -358,7 +358,7 @@ public void shouldNotChangeOffsetsIfAckedOffsetsIsNull()
throws IOException {
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -408,7 +408,7 @@ public void shouldWriteCheckpointForStandbyReplica() throws
IOException {
bytes,
bytes)));
- stateMgr.checkpoint(Collections.emptyMap());
+ stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
final Map<TopicPartition, Long> read = checkpoint.read();
assertThat(read,
equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
@@ -433,7 +433,7 @@ public void shouldNotWriteCheckpointForNonPersistent()
throws IOException {
stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(Collections.emptyMap()));
+ assertThat(read, equalTo(Collections.<TopicPartition,
Long>emptyMap()));
}
@Test
@@ -443,7 +443,7 @@ public void
shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOEx
noPartitions,
true, // standby
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -453,7 +453,7 @@ public void
shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOEx
stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition,
987L));
final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(Collections.emptyMap()));
+ assertThat(read, equalTo(Collections.<TopicPartition,
Long>emptyMap()));
}
@Test
@@ -463,7 +463,7 @@ public void
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFile
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -483,7 +483,7 @@ public void
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeen
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
false,
logContext);
@@ -550,7 +550,7 @@ public void close() {
stateManager.register(stateStore, stateStore.stateRestoreCallback);
try {
- stateManager.close(Collections.emptyMap());
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
fail("Should throw ProcessorStateException if store close throws
exception");
} catch (final ProcessorStateException e) {
// pass
@@ -622,7 +622,7 @@ public void close() {
stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
try {
- stateManager.close(Collections.emptyMap());
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
} catch (final ProcessorStateException expected) { /* ignode */ }
Assert.assertTrue(closedStore.get());
}
@@ -639,7 +639,7 @@ public void
shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOExceptio
noPartitions,
false,
stateDirectory,
- Collections.emptyMap(),
+ Collections.<String, String>emptyMap(),
changelogReader,
true,
logContext);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a71aaad805a..73761eb6548 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -40,11 +40,13 @@
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -55,6 +57,8 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -65,6 +69,8 @@
import java.util.Set;
import java.util.UUID;
+import static java.util.Collections.singletonList;
+import static
org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -100,13 +106,16 @@ public void setUp() {
}
private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+ private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
// task0 is unused
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
+ private final TaskId task3 = new TaskId(1, 1);
private Properties configProps(final boolean enableEos) {
return new Properties() {
@@ -789,6 +798,75 @@ public void
shouldReturnStandbyTaskMetadataWhileRunningState() {
assertTrue(threadMetadata.activeTasks().isEmpty());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldUpdateStandbyTask() throws IOException {
+ final String storeName1 = "count-one";
+ final String storeName2 = "table-two";
+ final String changelogName1 = applicationId + "-" + storeName1 +
"-changelog";
+ final String changelogName2 = applicationId + "-" + storeName2 +
"-changelog";
+ final TopicPartition partition1 = new TopicPartition(changelogName1,
1);
+ final TopicPartition partition2 = new TopicPartition(changelogName2,
1);
+ internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+ .groupByKey().count(Materialized.<Object, Long,
KeyValueStore<Bytes, byte[]>>as(storeName1));
+ final MaterializedInternal materialized = new
MaterializedInternal(Materialized.<Object, Long, KeyValueStore<Bytes,
byte[]>>as(storeName2),
+ internalStreamsBuilder, "");
+ internalStreamsBuilder.table(topic2, new ConsumedInternal(),
materialized);
+
+ final StreamThread thread = createStreamThread(clientId, config,
false);
+ final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
+ restoreConsumer.updatePartitions(changelogName1,
+ singletonList(
+ new PartitionInfo(
+ changelogName1,
+ 1,
+ null,
+ new Node[0],
+ new Node[0]
+ )
+ )
+ );
+
+ restoreConsumer.assign(Utils.mkSet(partition1, partition2));
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1,
10L));
+
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1,
0L));
+ restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2,
10L));
+
restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2,
0L));
+ // let the store1 be restored from 0 to 10; store2 be restored from 5
(checkpointed) to 10
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+ checkpoint.write(Collections.singletonMap(partition2, 5L));
+
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1,
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+ }
+
+ thread.setState(StreamThread.State.RUNNING);
+
+ thread.rebalanceListener.onPartitionsRevoked(null);
+
+ final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+ // assign single partition
+ standbyTasks.put(task1, Collections.singleton(t1p1));
+ standbyTasks.put(task3, Collections.singleton(t2p1));
+
+ thread.taskManager().setAssignmentMetadata(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), standbyTasks);
+
+
thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+
+ thread.runOnce(-1);
+
+ final StandbyTask standbyTask1 =
thread.taskManager().standbyTask(partition1);
+ final StandbyTask standbyTask2 =
thread.taskManager().standbyTask(partition2);
+ final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object,
Long>) standbyTask1.getStore(storeName1);
+ final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object,
Long>) standbyTask2.getStore(storeName2);
+
+ assertEquals(10L, store1.approximateNumEntries());
+ assertEquals(5L, store2.approximateNumEntries());
+ assertEquals(0, thread.standbyRecords().size());
+ }
+
@Test
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
final StreamThread thread = createStreamThread(clientId, config,
false);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Source KTable checkpoint is not correct
> ---------------------------------------
>
> Key: KAFKA-7021
> URL: https://issues.apache.org/jira/browse/KAFKA-7021
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Kafka Streams treats source KTables, i.e., tables created via `builder.table()`,
> differently. Instead of creating a changelog topic, the original source topic
> is used to avoid unnecessary data redundancy.
> However, Kafka Streams does not write a correct local state checkpoint file.
> This results in unnecessary state restore after a rebalance. Instead of the
> latest committed offset, the latest restored offset is written into the
> checkpoint file in `ProcessorStateManager#close()`.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)