This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dcaf95a35f4 KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249) dcaf95a35f4 is described below commit dcaf95a35f4ba7de73433aa5a5b1a33427744e45 Author: Victoria Xia <victoria....@confluent.io> AuthorDate: Wed Feb 15 18:07:47 2023 -0800 KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249) Introduces a new Serde that serializes a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store). Part of KIP-889. Reviewers: Matthias J. Sax <matth...@confluent.io> --- gradle/spotbugs-exclude.xml | 7 ++ .../kafka/streams/state/ValueAndTimestamp.java | 14 ++- .../NullableValueAndTimestampDeserializer.java | 105 +++++++++++++++++++++ .../internals/NullableValueAndTimestampSerde.java | 89 +++++++++++++++++ .../NullableValueAndTimestampSerializer.java | 92 ++++++++++++++++++ .../NullableValueAndTimestampSerdeTest.java | 72 ++++++++++++++ 6 files changed, 378 insertions(+), 1 deletion(-) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index efc1f03b950..e040e54d306 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -488,6 +488,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read <!-- END Suppress warnings for unused members that are undetectably used by Jackson --> + <Match> + <!-- Boolean deserializer intentionally returns null on null input. --> + <Class name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/> + <Method name="deserialize"/> + <Bug pattern="NP_BOOLEAN_RETURN_NULL"/> + </Match> + <Match> <!-- Suppress a warning about ignoring return value because this is intentional. 
--> <Class name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java index 1cf727712b2..227021d6cf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java @@ -31,7 +31,6 @@ public final class ValueAndTimestamp<V> { private ValueAndTimestamp(final V value, final long timestamp) { - Objects.requireNonNull(value); this.value = value; this.timestamp = timestamp; } @@ -50,6 +49,19 @@ public final class ValueAndTimestamp<V> { return value == null ? null : new ValueAndTimestamp<>(value, timestamp); } + /** + * Create a new {@link ValueAndTimestamp} instance. The provided {@code value} may be {@code null}. + * + * @param value the value + * @param timestamp the timestamp + * @param <V> the type of the value + * @return a new {@link ValueAndTimestamp} instance + */ + public static <V> ValueAndTimestamp<V> makeAllowNullable( + final V value, final long timestamp) { + return new ValueAndTimestamp<>(value, timestamp); + } + /** * Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter * if the parameter is not {@code null}. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java new file mode 100644 index 00000000000..7d70708ff12 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer; +import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH; +import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH; + +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer; + +/** + * See {@link NullableValueAndTimestampSerde}. 
+ */ +public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> { + public final Deserializer<V> valueDeserializer; + private final Deserializer<Long> timestampDeserializer; + private final Deserializer<Boolean> booleanDeserializer; + + NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) { + this.valueDeserializer = Objects.requireNonNull(valueDeserializer); + timestampDeserializer = new LongDeserializer(); + booleanDeserializer = new BooleanDeserializer(); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + valueDeserializer.configure(configs, isKey); + timestampDeserializer.configure(configs, isKey); + booleanDeserializer.configure(configs, isKey); + } + + @Override + public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) { + if (rawValueAndTimestamp == null) { + return null; + } + + final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp)); + final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp)); + if (isTombstone) { + return ValueAndTimestamp.makeAllowNullable(null, timestamp); + } else { + final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp)); + if (value == null) { + throw new SerializationException("Deserializer cannot deserialize non-null bytes as null"); + } + return ValueAndTimestamp.make(value, timestamp); + } + } + + @Override + public void close() { + valueDeserializer.close(); + timestampDeserializer.close(); + booleanDeserializer.close(); + } + + @Override + public void setIfUnset(final SerdeGetter getter) { + // NullableValueAndTimestampDeserializer never wraps a null deserializer (or configure would throw), + // but it may wrap a deserializer that itself wraps a null deserializer. 
+ initNullableDeserializer(valueDeserializer, getter); + } + + private static byte[] rawTimestamp(final byte[] rawValueAndTimestamp) { + final byte[] rawTimestamp = new byte[RAW_TIMESTAMP_LENGTH]; + System.arraycopy(rawValueAndTimestamp, 0, rawTimestamp, 0, RAW_TIMESTAMP_LENGTH); + return rawTimestamp; + } + + private static byte[] rawIsTombstone(final byte[] rawValueAndTimestamp) { + final byte[] rawIsTombstone = new byte[RAW_BOOLEAN_LENGTH]; + System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH, rawIsTombstone, 0, RAW_BOOLEAN_LENGTH); + return rawIsTombstone; + } + + private static byte[] rawValue(final byte[] rawValueAndTimestamp) { + final int rawValueLength = rawValueAndTimestamp.length - RAW_TIMESTAMP_LENGTH - RAW_BOOLEAN_LENGTH; + final byte[] rawValue = new byte[rawValueLength]; + System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH, rawValue, 0, rawValueLength); + return rawValue; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java new file mode 100644 index 00000000000..090a4daa35c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +/** + * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing + * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}. + * <p> + * The serialized format is: + * <pre> + * <timestamp> + <bool indicating whether value is null> + <raw value> + * </pre> + * where the boolean is needed in order to distinguish between null and empty values (i.e., between + * tombstones and {@code byte[0]} values). 
+ */ +public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> { + + static final int RAW_TIMESTAMP_LENGTH = 8; + static final int RAW_BOOLEAN_LENGTH = 1; + + public NullableValueAndTimestampSerde(final Serde<V> valueSerde) { + super( + new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()), + new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer()) + ); + } + + static final class BooleanSerde { + private static final byte TRUE = 0x01; + private static final byte FALSE = 0x00; + + static class BooleanSerializer implements Serializer<Boolean> { + @Override + public byte[] serialize(final String topic, final Boolean data) { + if (data == null) { + return null; + } + + return new byte[] { + data ? TRUE : FALSE + }; + } + } + + static class BooleanDeserializer implements Deserializer<Boolean> { + @Override + public Boolean deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + + if (data.length != 1) { + throw new SerializationException("Size of data received by BooleanDeserializer is not 1"); + } + + if (data[0] == TRUE) { + return true; + } else if (data[0] == FALSE) { + return false; + } else { + throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]); + } + } + } + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java new file mode 100644 index 00000000000..93eaca0ff21 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer; +import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH; +import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer; + +/** + * See {@link NullableValueAndTimestampSerde}. 
+ */ +public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> { + public final Serializer<V> valueSerializer; + private final Serializer<Long> timestampSerializer; + private final Serializer<Boolean> booleanSerializer; + + NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) { + this.valueSerializer = Objects.requireNonNull(valueSerializer); + timestampSerializer = new LongSerializer(); + booleanSerializer = new BooleanSerializer(); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + valueSerializer.configure(configs, isKey); + timestampSerializer.configure(configs, isKey); + booleanSerializer.configure(configs, isKey); + } + + @Override + public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) { + if (data == null) { + return null; + } + final byte[] rawValue = valueSerializer.serialize(topic, data.value()); + final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null); + final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp()); + if (rawIsTombstone.length != RAW_BOOLEAN_LENGTH) { + throw new SerializationException("Unexpected length for serialized boolean: " + rawIsTombstone.length); + } + if (rawTimestamp.length != RAW_TIMESTAMP_LENGTH) { + throw new SerializationException("Unexpected length for serialized timestamp: " + rawTimestamp.length); + } + + final byte[] nonNullRawValue = rawValue == null ? 
new byte[0] : rawValue; + return ByteBuffer + .allocate(RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH + nonNullRawValue.length) + .put(rawTimestamp) + .put(rawIsTombstone) + .put(nonNullRawValue) + .array(); + } + + @Override + public void close() { + valueSerializer.close(); + timestampSerializer.close(); + booleanSerializer.close(); + } + + @Override + public void setIfUnset(final SerdeGetter getter) { + // NullableValueAndTimestampSerializer never wraps a null serializer (or configure would throw), + // but it may wrap a serializer that itself wraps a null serializer. + initNullableSerializer(valueSerializer, getter); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java new file mode 100644 index 00000000000..3b5bb7fd618 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class NullableValueAndTimestampSerdeTest { + + private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde()); + private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer(); + private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer(); + + @Test + public void shouldSerdeNull() { + assertThat(SERIALIZER.serialize(null, null), is(nullValue())); + assertThat(DESERIALIZER.deserialize(null, null), is(nullValue())); + } + + @Test + public void shouldSerdeNonNull() { + final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L); + + final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp); + assertThat(rawValueAndTimestamp, is(notNullValue())); + + assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp)); + } + + @Test + public void shouldSerdeNonNullWithNullValue() { + final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L); + + final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp); + assertThat(rawValueAndTimestamp, is(notNullValue())); + + assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp)); + } + + @Test + public void shouldSerializeNonNullWithEmptyBytes() { + // empty string serializes to empty bytes + final ValueAndTimestamp<String> 
valueAndTimestamp = ValueAndTimestamp.make("", 10L); + + final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp); + assertThat(rawValueAndTimestamp, is(notNullValue())); + + assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp)); + } +} \ No newline at end of file