This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 95378f0 KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900) 95378f0 is described below commit 95378f0dd3bb1293f2e0d19a54d4a2febfc14e7b Author: Gitomain <lingxiaowan...@gmail.com> AuthorDate: Tue Jun 12 20:54:07 2018 +0200 KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900) Reviewer: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .gitignore | 1 + kafka | 1 + .../internals/GlobalStateManagerImpl.java | 2 +- ...st.java => GlobalKTableEOSIntegrationTest.java} | 117 +++++++++++++++------ .../integration/GlobalKTableIntegrationTest.java | 60 ++--------- .../integration/utils/IntegrationTestUtils.java | 35 ++++-- .../processor/internals/StreamTaskTest.java | 6 +- 7 files changed, 127 insertions(+), 95 deletions(-) diff --git a/.gitignore b/.gitignore index 6088349..964c8f6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ dist *classes +*.class target/ build/ build_eclipse/ diff --git a/kafka b/kafka new file mode 160000 index 0000000..cc43e77 --- /dev/null +++ b/kafka @@ -0,0 +1 @@ +Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1 diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 6052f96..036bb1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -192,8 +192,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager { if (record.key() != null) { restoreRecords.add(KeyValue.pair(record.key(), record.value())); } - offset = consumer.position(topicPartition); } + offset = consumer.position(topicPartition); stateRestoreAdapter.restoreAll(restoreRecords); stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); restoreCount += restoreRecords.size(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java similarity index 77% copy from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java copy to streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index ba8841a..9c202f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -58,7 +57,7 @@ import java.util.Map; import java.util.Properties; @Category({IntegrationTest.class}) -public class GlobalKTableIntegrationTest { +public class GlobalKTableEOSIntegrationTest { private static final int NUM_BROKERS = 1; private static final Properties BROKER_CONFIG; static { @@ -85,17 +84,15 @@ public class GlobalKTableIntegrationTest { return value1 + "+" + value2; } }; + private final String globalStore = "globalStore"; + private final Map<String, String> results = new HashMap<>(); private StreamsBuilder builder; private Properties streamsConfiguration; private KafkaStreams kafkaStreams; - private String globalOne; - private String inputStream; - private String inputTable; - private final String globalStore = "globalStore"; + private String globalTableTopic; + private String streamTopic; private GlobalKTable<Long, String> globalTable; private KStream<String, Long> stream; - private KTable<String, Long> table; - final Map<String, String> results = new HashMap<>(); private ForeachAction<String, String> foreachAction; @Before @@ -104,22 +101,21 @@ public class GlobalKTableIntegrationTest { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); - final String applicationId = "globalOne-table-test-" + testNo; + final String applicationId = "globalTableTopic-table-eos-test-" + testNo; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - globalTable = builder.globalTable(globalOne, Consumed.with(Serdes.Long(), Serdes.String()), + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore) .withKeySerde(Serdes.Long()) .withValueSerde(Serdes.String())); final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); - stream = builder.stream(inputStream, stringLongConsumed); - table = builder.table(inputTable, stringLongConsumed); + stream = builder.stream(streamTopic, stringLongConsumed); foreachAction = new ForeachAction<String, String>() { @Override public void apply(final String key, final String value) { @@ -142,7 +138,7 @@ public class GlobalKTableIntegrationTest { streamTableJoin.foreach(foreachAction); produceInitialGlobalTableValues(); startStreams(); - produceTopicValues(inputStream); + produceTopicValues(streamTopic); final Map<String, String> expected = new HashMap<>(); expected.put("a", "1+A"); @@ -169,7 +165,7 @@ public class GlobalKTableIntegrationTest { return "J".equals(replicatedStore.get(5L)); } }, 30000, "waiting for data in replicated store"); - produceTopicValues(inputStream); + produceTopicValues(streamTopic); expected.put("a", "1+F"); expected.put("b", "2+G"); @@ -191,7 +187,7 @@ public class GlobalKTableIntegrationTest { streamTableJoin.foreach(foreachAction); produceInitialGlobalTableValues(); startStreams(); - produceTopicValues(inputStream); + produceTopicValues(streamTopic); final Map<String, String> expected = new HashMap<>(); expected.put("a", "1+A"); @@ -218,7 +214,7 @@ public class GlobalKTableIntegrationTest { } }, 30000, "waiting for data in replicated store"); - produceTopicValues(inputStream); + produceTopicValues(streamTopic); expected.put("a", "1+F"); expected.put("b", "2+G"); @@ -236,7 +232,8 @@ public class GlobalKTableIntegrationTest { @Test public void shouldRestoreTransactionalMessages() throws Exception { - produceInitialGlobalTableValues(true); + produceInitialGlobalTableValues(); + startStreams(); final Map<Long, String> expected = new HashMap<>(); @@ -263,17 +260,49 @@ public class GlobalKTableIntegrationTest { return result.equals(expected); } }, 30000L, "waiting for initial values"); - System.out.println("no failed test"); } + + @Test + public void shouldNotRestoreAbortedMessages() throws Exception { + produceAbortedMessages(); + produceInitialGlobalTableValues(); + produceAbortedMessages(); - private void createTopics() throws InterruptedException { - inputStream = "input-stream-" + testNo; - inputTable = "input-table-" + testNo; - globalOne = "globalOne-" + testNo; - CLUSTER.createTopics(inputStream, inputTable); - CLUSTER.createTopic(globalOne, 2, 1); + startStreams(); + + final Map<Long, String> expected = new HashMap<>(); + expected.put(1L, "A"); + expected.put(2L, "B"); + expected.put(3L, "C"); + expected.put(4L, "D"); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + ReadOnlyKeyValueStore<Long, String> store = null; + try { + store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); + } catch (InvalidStateStoreException ex) { + return false; + } + Map<Long, String> result = new HashMap<>(); + Iterator<KeyValue<Long, String>> it = store.all(); + while (it.hasNext()) { + KeyValue<Long, String> kv = it.next(); + result.put(kv.key, kv.value); + } + return result.equals(expected); + } + }, 30000L, "waiting for initial values"); } + private void createTopics() throws InterruptedException { + streamTopic = "stream-" + testNo; + globalTableTopic = "globalTable-" + testNo; + CLUSTER.createTopics(streamTopic); + CLUSTER.createTopic(globalTableTopic, 2, 1); + } + private void startStreams() { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); @@ -296,23 +325,43 @@ public class GlobalKTableIntegrationTest { mockTime); } - private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException { - produceInitialGlobalTableValues(false); + private void produceAbortedMessages() throws Exception { + final Properties properties = new Properties(); + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + properties.put(ProducerConfig.RETRIES_CONFIG, 1); + IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp( + globalTableTopic, Arrays.asList( + new KeyValue<>(1L, "A"), + new KeyValue<>(2L, "B"), + new KeyValue<>(3L, "C"), + new KeyValue<>(4L, "D") + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + StringSerializer.class, + properties), + mockTime.milliseconds()); } - private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException { - Properties properties = new Properties(); + private void produceInitialGlobalTableValues() throws Exception { + produceInitialGlobalTableValues(true); + } + + private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { + final Properties properties = new Properties(); if (enableTransactions) { properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); properties.put(ProducerConfig.RETRIES_CONFIG, 1); } IntegrationTestUtils.produceKeyValuesSynchronously( - globalOne, + globalTableTopic, Arrays.asList( new KeyValue<>(1L, "A"), new KeyValue<>(2L, "B"), new KeyValue<>(3L, "C"), - new KeyValue<>(4L, "D")), + new KeyValue<>(4L, "D") + ), TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, @@ -324,7 +373,7 @@ public class GlobalKTableIntegrationTest { private void produceGlobalTableValues() throws Exception { IntegrationTestUtils.produceKeyValuesSynchronously( - globalOne, + globalTableTopic, Arrays.asList( new KeyValue<>(1L, "F"), new KeyValue<>(2L, "G"), @@ -338,6 +387,4 @@ public class GlobalKTableIntegrationTest { new Properties()), mockTime); } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index ba8841a..0816aba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.ForeachAction; @@ -53,23 +52,16 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Properties; @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { private static final int NUM_BROKERS = 1; - private static final Properties BROKER_CONFIG; - static { - BROKER_CONFIG = new Properties(); - BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1); - BROKER_CONFIG.put("transaction.state.log.min.isr", 1); - } @ClassRule public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + new EmbeddedKafkaCluster(NUM_BROKERS); private static volatile int testNo = 0; private final MockTime mockTime = CLUSTER.time; @@ -233,39 +225,7 @@ public class GlobalKTableIntegrationTest { } }, 30000L, "waiting for final values"); } - - @Test - public void shouldRestoreTransactionalMessages() throws Exception { - produceInitialGlobalTableValues(true); - startStreams(); - - final Map<Long, String> expected = new HashMap<>(); - expected.put(1L, "A"); - expected.put(2L, "B"); - expected.put(3L, "C"); - expected.put(4L, "D"); - - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - ReadOnlyKeyValueStore<Long, String> store = null; - try { - store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); - } catch (InvalidStateStoreException ex) { - return false; - } - Map<Long, String> result = new HashMap<>(); - Iterator<KeyValue<Long, String>> it = store.all(); - while (it.hasNext()) { - KeyValue<Long, String> kv = it.next(); - result.put(kv.key, kv.value); - } - return result.equals(expected); - } - }, 30000L, "waiting for initial values"); - System.out.println("no failed test"); - } - + private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; @@ -273,7 +233,7 @@ public class GlobalKTableIntegrationTest { CLUSTER.createTopics(inputStream, inputTable); CLUSTER.createTopic(globalOne, 2, 1); } - + private void startStreams() { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); @@ -296,11 +256,11 @@ public class GlobalKTableIntegrationTest { mockTime); } - private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException { + private void produceInitialGlobalTableValues() throws Exception { produceInitialGlobalTableValues(false); } - private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException { + private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { Properties properties = new Properties(); if (enableTransactions) { properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); @@ -312,14 +272,14 @@ public class GlobalKTableIntegrationTest { new KeyValue<>(1L, "A"), new KeyValue<>(2L, "B"), new KeyValue<>(3L, "C"), - new KeyValue<>(4L, "D")), + new KeyValue<>(4L, "D") + ), TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, - StringSerializer.class, - properties), - mockTime, - enableTransactions); + StringSerializer.class + ), + mockTime); } private void produceGlobalTableValues() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index e8cd59e..304a3e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -140,16 +140,38 @@ public class IntegrationTestUtils { producer.flush(); } } + + public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection<KeyValue<K, V>> records, + final Properties producerConfig, + final Long timestamp) + throws ExecutionException, InterruptedException { + try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) { + producer.initTransactions(); + for (final KeyValue<K, V> record : records) { + producer.beginTransaction(); + final Future<RecordMetadata> f = producer + .send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); + f.get(); + producer.abortTransaction(); + } + } + } - public static <V> void produceValuesSynchronously( - final String topic, final Collection<V> records, final Properties producerConfig, final Time time) + public static <V> void produceValuesSynchronously(final String topic, + final Collection<V> records, + final Properties producerConfig, + final Time time) throws ExecutionException, InterruptedException { IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false); } - public static <V> void produceValuesSynchronously( - final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions) - throws ExecutionException, InterruptedException { + public static <V> void produceValuesSynchronously(final String topic, + final Collection<V> records, + final Properties producerConfig, + final Time time, + final boolean enableTransactions) + throws ExecutionException, InterruptedException { final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>(); for (final V value : records) { final KeyValue<Object, V> kv = new KeyValue<>(null, value); @@ -161,10 +183,9 @@ public class IntegrationTestUtils { public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { - return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } - + /** * Wait until enough data (key-value records) has been consumed. * diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index caa6cb7..3321da5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -905,10 +905,12 @@ public class StreamTaskTest { @Test public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() { - task = createStatelessTask(true); + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); - assertTrue(!producer.transactionInFlight()); task.close(false, false); + task = null; } @Test -- To stop receiving notification emails like this one, please contact mj...@apache.org.