rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##########
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue<V> getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> { 
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue<V>(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp<V> newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   @ConcurrencyPractitioner @vvcephei I'm trying to understand this in order 
to debug some broken tests in ksql. A couple of questions:
   
   When the timestamp of the new value is lower than that of the stored value 
(regardless of the value itself), why do we want to put the new value into the 
store? Surely the store should hold the value with the newer timestamp? 
Otherwise we could wind up with a corrupt store.
   
   Don't we still want to put the value in the store (even if we don't forward 
it on to the next context) if the values are the same but the timestamp is 
newer? Otherwise, if we then get an out-of-order update with a different value 
but a timestamp between those of the two rows with the same value, we'd 
incorrectly put that value into the store. For example, the following updates:
   
   TS: 1, K: X, V: A
   TS: 3, K: X, V: A
   TS: 2, K: X, V: B
   
   would result in the table containing `K: X, V: B`, which is wrong: the row 
with the latest timestamp (TS: 3) has `V: A`.
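
To make the scenario concrete, here is a minimal, self-contained sketch that replays the three updates above against an in-memory map. It is a simplified model, not the Kafka Streams code: the `Update` class and both `replay...` methods are hypothetical names, and the first policy is only my reading of the diff (skip the put when the value is unchanged and the timestamp has not decreased, overwrite otherwise). The second policy is just an illustration of "the store keeps the newest timestamp", for comparison.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutOfOrderUpdateSketch {

    // Hypothetical stand-in for a timestamped record; not a Kafka class.
    static final class Update {
        final long timestamp;
        final String key;
        final String value;

        Update(final long timestamp, final String key, final String value) {
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Assumed reading of the diff: skip the put only when the value is unchanged
    // and the timestamp has not decreased; in every other case overwrite.
    static Map<String, Update> replaySkippingUnchanged(final List<Update> updates) {
        final Map<String, Update> store = new HashMap<>();
        for (final Update update : updates) {
            final Update old = store.get(update.key);
            final boolean sameValueAndNotOlder = old != null
                && old.value.equals(update.value)
                && update.timestamp >= old.timestamp;
            if (!sameValueAndNotOlder) {
                store.put(update.key, update);
            }
        }
        return store;
    }

    // Illustrative alternative: overwrite only when the incoming timestamp is
    // at least as new as the stored one, even if the value is unchanged.
    static Map<String, Update> replayKeepingNewest(final List<Update> updates) {
        final Map<String, Update> store = new HashMap<>();
        for (final Update update : updates) {
            final Update old = store.get(update.key);
            if (old == null || update.timestamp >= old.timestamp) {
                store.put(update.key, update);
            }
        }
        return store;
    }

    public static void main(final String[] args) {
        final List<Update> updates = Arrays.asList(
            new Update(1L, "X", "A"),
            new Update(3L, "X", "A"),
            new Update(2L, "X", "B"));

        final Update skipped = replaySkippingUnchanged(updates).get("X");
        final Update newest = replayKeepingNewest(updates).get("X");

        // Prints: skip-unchanged policy: V=B TS=2
        System.out.println("skip-unchanged policy: V=" + skipped.value + " TS=" + skipped.timestamp);
        // Prints: keep-newest policy: V=A TS=3
        System.out.println("keep-newest policy: V=" + newest.value + " TS=" + newest.timestamp);
    }
}
```

Under the first policy the second update (TS: 3) never reaches the store, so the stored timestamp stays at 1 and the late `V: B` row at TS: 2 looks newer than it is. Whether the real store should drop, overwrite, or otherwise handle out-of-order rows is exactly the open question here; the sketch only shows where the two readings diverge.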





