vcrfxia commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107815109


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally 
supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code 
null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty 
values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends 
WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new 
NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new 
NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't 
allow deserialization so

Review Comment:
   Resolved below.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally 
supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code 
null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty 
values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends 
WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new 
NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new 
NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't 
allow deserialization so
+                    // we fail here during serialization too for consistency
+                    throw new SerializationException("BooleanSerializer does 
not support null");
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't 
allow it (NP_BOOLEAN_RETURN_NULL)

Review Comment:
   OK, added a spotbugs exclusion (for `NP_BOOLEAN_RETURN_NULL`) for this particular class.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import 
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements 
WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> 
valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] 
rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, 
rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, 
rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, 
rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   You're correct that `value` should never be null here, assuming valid 
deserializer implementations which never deserialize non-null into null. 
   
   Regarding updating this call to `make()`: `make()` does not throw if
`value == null`; instead, it returns `null`. In the event of a buggy deserializer
implementation which returns null when it shouldn't, calling `make()` here and
returning null will cause cascading failures elsewhere -- for example, the
changelogging layer uses this deserializer to obtain the value to write to the
changelog (see https://github.com/apache/kafka/pull/13251) and will throw an
exception if it gets a null ValueAndTimestamp from the deserializer.
   
   I suppose I can update this to `make()` and throw an explicit exception if
`value == null`. I'll do that.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new 
NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = 
SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER 
= SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, 
valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), 
is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, 
valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), 
is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   StringSerializer serializes an empty string as `byte[0]`, which is exactly 
what this test case is for (`isTombstone` will be set to `false`, which is how 
`byte[0]` is distinguished from a null value). 
   
   Did I misunderstand your question?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import 
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements 
WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> 
data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, 
rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : 
rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + 
nonNullRawValue.length)

Review Comment:
   Yes that's correct. I extracted the expected lengths into static variables 
and added the relevant checks in order to be defensive against serialization 
surprises.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to