This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 87d493f KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842) 87d493f is described below commit 87d493f07219a215816783557a5981a74f192f06 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Sat Jun 1 19:12:58 2019 -0700 KAFKA-8446: Kafka Streams restoration crashes with NPE when the record value is null (#6842) When the restored record value is null, we are in danger of NPE during restoration phase. Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/state/internals/RecordConverters.java | 11 +- .../StateRestorationIntegrationTest.java | 122 +++++++++++++++++++++ .../state/internals/RecordConvertersTest.java | 49 +++++++++ 3 files changed, 177 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java index f65cc32..8305d52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java @@ -27,6 +27,11 @@ public final class RecordConverters { private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> { final byte[] rawValue = record.value(); final long timestamp = record.timestamp(); + final byte[] recordValue = rawValue == null ? null : + ByteBuffer.allocate(8 + rawValue.length) + .putLong(timestamp) + .put(rawValue) + .array(); return new ConsumerRecord<>( record.topic(), record.partition(), @@ -37,11 +42,7 @@ public final class RecordConverters { record.serializedKeySize(), record.serializedValueSize(), record.key(), - ByteBuffer - .allocate(8 + rawValue.length) - .putLong(timestamp) - .put(rawValue) - .array(), + recordValue, record.headers(), record.leaderEpoch() ); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java new file mode 100644 index 0000000..e22ff4f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +@Category({IntegrationTest.class}) +public class StateRestorationIntegrationTest { + private StreamsBuilder builder = new StreamsBuilder(); + + private static final String APPLICATION_ID = "restoration-test-app"; + private static final String STATE_STORE_NAME = "stateStore"; + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private Properties streamsConfiguration; + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private final MockTime mockTime = CLUSTER.time; + + @Before + public void setUp() throws Exception { + final Properties props = new Properties(); + + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + APPLICATION_ID, + CLUSTER.bootstrapServers(), + Serdes.Integer().getClass().getName(), + Serdes.ByteArray().getClass().getName(), + props); + + CLUSTER.createTopics(INPUT_TOPIC); + CLUSTER.createTopics(OUTPUT_TOPIC); + + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldRestoreNullRecord() throws InterruptedException, ExecutionException { + builder.table(INPUT_TOPIC, Materialized.<Integer, Bytes>as( + Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME)) + .withKeySerde(Serdes.Integer()) + .withValueSerde(Serdes.Bytes()) + .withCachingDisabled()).toStream().to(OUTPUT_TOPIC); + + final Properties producerConfig = TestUtils.producerConfig( + CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class); + + final List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList( + KeyValue.pair(3, new Bytes(new byte[]{3})), + KeyValue.pair(3, null), + KeyValue.pair(1, new Bytes(new byte[]{1}))); + + IntegrationTestUtils.produceKeyValuesSynchronously( + INPUT_TOPIC, initialKeyValues, producerConfig, mockTime); + + KafkaStreams streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration); + streams.start(); + + final Properties consumerConfig = TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class); + + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + consumerConfig, OUTPUT_TOPIC, initialKeyValues); + + // wipe out state store to trigger restore process on restart + streams.close(); + streams.cleanUp(); + + // Restart the stream instance. There should not be exception handling the null value within changelog topic. + final List<KeyValue<Integer, Bytes>> newKeyValues = + Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3]))); + IntegrationTestUtils.produceKeyValuesSynchronously( + INPUT_TOPIC, newKeyValues, producerConfig, mockTime); + streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration); + streams.start(); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + consumerConfig, OUTPUT_TOPIC, newKeyValues); + streams.close(); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java new file mode 100644 index 0000000..bacbacd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; + +public class RecordConvertersTest { + + private final RecordConverter timestampedValueConverter = rawValueToTimestampedValue(); + + @Test + public void shouldPreserveNullValueOnConversion() { + final ConsumerRecord<byte[], byte[]> nullValueRecord = new ConsumerRecord<>("", 0, 0L, new byte[0], null); + assertNull(timestampedValueConverter.convert(nullValueRecord).value()); + } + + @Test + public void shouldAddTimestampToValueOnConversionWhenValueIsNotNull() { + final long timestamp = 10L; + final byte[] value = new byte[1]; + final ConsumerRecord<byte[], byte[]> inputRecord = new ConsumerRecord<>( + "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], value); + final byte[] expectedValue = ByteBuffer.allocate(9).putLong(timestamp).put(value).array(); + final byte[] actualValue = timestampedValueConverter.convert(inputRecord).value(); + assertArrayEquals(expectedValue, actualValue); + } +}