aliehsaeedii commented on code in PR #21486:
URL: https://github.com/apache/kafka/pull/21486#discussion_r2817496621


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used 
for recording operation metrics,
+ * and hence its inner WindowStore implementation does not need to provide its 
own metrics collecting functionality.
+ * The inner {@link WindowStore} of this class is of type 
<Bytes,byte[]>, hence we use {@link Serde}s
+ * to convert from <K,ValueTimestampHeaders<V>> to 
<Bytes,byte[]>.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class MeteredTimestampedWindowStoreWithHeaders<K, V>
+    extends MeteredWindowStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedWindowStoreWithHeaders<K, V> {
+
+    MeteredTimestampedWindowStoreWithHeaders(final WindowStore<Bytes, byte[]> 
inner,
+                                             final long windowSizeMs,
+                                             final String metricScope,
+                                             final Time time,
+                                             final Serde<K> keySerde,
+                                             final 
Serde<ValueTimestampHeaders<V>> valueSerde) {
+        super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<ValueTimestampHeaders<V>> prepareValueSerde(final 
Serde<ValueTimestampHeaders<V>> valueSerde, final SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new ValueTimestampHeadersSerde<>((Serde<V>) 
getter.valueSerde());
+        } else {
+            return super.prepareValueSerde(valueSerde, getter);
+        }
+    }
+
+    @Override
+    public void put(final K key, final V value, final long 
windowStartTimestamp, final long timestamp, final Headers headers) {

Review Comment:
   Could we re-use the original put in `MeteredWindowStore`? The `V` value can 
be `ValueTimestampHeaders`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used 
for recording operation metrics,
+ * and hence its inner WindowStore implementation does not need to provide its 
own metrics collecting functionality.
+ * The inner {@link WindowStore} of this class is of type 
&lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
+ * to convert from &lt;K,ValueTimestampHeaders&lt;V&gt;&gt; to 
&lt;Bytes,byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class MeteredTimestampedWindowStoreWithHeaders<K, V>
+    extends MeteredWindowStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedWindowStoreWithHeaders<K, V> {
+
+    MeteredTimestampedWindowStoreWithHeaders(final WindowStore<Bytes, byte[]> 
inner,
+                                             final long windowSizeMs,
+                                             final String metricScope,
+                                             final Time time,
+                                             final Serde<K> keySerde,
+                                             final 
Serde<ValueTimestampHeaders<V>> valueSerde) {
+        super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Serde<ValueTimestampHeaders<V>> prepareValueSerde(final 
Serde<ValueTimestampHeaders<V>> valueSerde, final SerdeGetter getter) {
+        if (valueSerde == null) {
+            return new ValueTimestampHeadersSerde<>((Serde<V>) 
getter.valueSerde());
+        } else {
+            return super.prepareValueSerde(valueSerde, getter);
+        }
+    }
+
+    @Override
+    public void put(final K key, final V value, final long 
windowStartTimestamp, final long timestamp, final Headers headers) {
+        put(key, ValueTimestampHeaders.make(value, timestamp, headers), 
windowStartTimestamp);
+    }
+
+    @Override
+    public WindowStoreIterator<ValueTimestampHeaders<V>> 
fetchWithHeaders(final K key, final long timeFrom, final long timeTo) {

Review Comment:
   the same about all these methods



##########
streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStoreWithHeaders.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.time.Instant;
+
+/**
+ * Interface for storing the aggregated values of fixed-size time windows.
+ * <p>
+ * Note, that the stores's physical key type is {@link Windowed 
Windowed&lt;K&gt;}.
+ * In contrast to a {@link WindowStore} that stores plain windowedKeys-value 
pairs,
+ * a {@code TimestampedWindowStore} stores windowedKeys-(value/timestamp) 
pairs.
+ * <p>
+ * While the window start- and end-timestamp are fixed per window, the 
value-side timestamp is used
+ * to store the last update timestamp of the corresponding window.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
+public interface TimestampedWindowStoreWithHeaders<K, V> extends 
WindowStore<K, ValueTimestampHeaders<V>> {

Review Comment:
   why it cannot be the same as `TimestampedWindowStore`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to