ableegoldman commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1385859246


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowStoreMaterializer.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {

Review Comment:
   nit: just call it `SessionStoreMaterializer`
   
   Technically they are "session windows", yes, but we have SessionStore vs WindowStore rather than SessionWindowStore vs TimeWindowStore, so for both clarity and brevity, let's remove the "Window" part of the name.
   (I get that for the Materializer we actually do have different implementations for different types of time windows, but I think it's best to keep the session window stuff distinct from the time window variations; otherwise it seems like this is just another subset of a WindowStore. I hope that explanation makes sense?)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java:
##########
@@ -108,56 +106,4 @@ public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
             sessionMerger);
     }
 
-    private StoreFactory materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {

Review Comment:
   I LOVE that we are able to extract all this repeated logic into a single place ❤️



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowStoreMaterializer.java:
##########
@@ -0,0 +1,136 @@
+public class SessionWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KeyValueStoreMaterializer.class);

Review Comment:
   copy/paste error: this logger should be created for this class, not `KeyValueStoreMaterializer` (I always check b/c I make this exact mistake myself all the time haha)
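   One common Java idiom for avoiding this class of mistake is to let the lookup context supply the class, so that a copied line can't point at the wrong class. The minimal sketch below uses `java.util.logging` only so that it is self-contained. With SLF4J, as used in this file, the equivalent would be `LoggerFactory.getLogger(MethodHandles.lookup().lookupClass())`. `LoggerNameDemo` is a hypothetical class name, and whether this idiom fits the Kafka code style is a separate question. The direct fix is simply to reference this class's own literal.

```java
import java.lang.invoke.MethodHandles;
import java.util.logging.Logger;

public class LoggerNameDemo {
    // Derive the owning class from the lookup context instead of naming it
    // explicitly, so copying this line into another class cannot leave it
    // pointing at the wrong class (e.g. KeyValueStoreMaterializer.class).
    private static final Logger LOG =
            Logger.getLogger(MethodHandles.lookup().lookupClass().getName());

    static String loggerName() {
        return LOG.getName();
    }

    public static void main(final String[] args) {
        // Prints the fully qualified name of the enclosing class.
        System.out.println(loggerName());
    }
}
```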



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowStoreMaterializer.java:
##########
@@ -0,0 +1,133 @@
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class TimeWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
+
+    private final Windows<?> windows;
+    private final EmitStrategy emitStrategy;
+
+    public TimeWindowStoreMaterializer(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized,
+            final Windows<?> windows,
+            final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.windows = windows;
+        this.emitStrategy = emitStrategy;
+    }
+
+    @Override
+    public StateStore build() {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            final long retentionPeriod = retentionPeriod();
+
+            if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                        + materialized.storeName() + " must be no smaller than its window size plus the grace period."
+                        + " Got size=[" + windows.size() + "],"
+                        + " grace=[" + windows.gracePeriodMs() + "],"
+                        + " retention=[" + retentionPeriod + "]");
+            }
+
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemoryWindowStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.size()),
+                            false
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                            RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false,
+                                    false
+                            ) :
+                            Stores.persistentTimestampedWindowStore(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false
+                            );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        // TODO(agavra): remove before merging, should we do what we do with other stores
+        // and disable caching in the case StrategyType.ON_WINDOW_CLOSE is used?

Review Comment:
   Would be good to have @guozhangwang confirm, or maybe @mjsax or @lihaosky can chime in here, but my understanding is that we have to disable caching when using `ON_WINDOW_CLOSE` because the alternative store implementation has a different ordering scheme than the CachingWindowStore/CachingSessionStore. That breaks range queries, since we can't properly merge results between the cache and the underlying store.
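   To make the ordering problem concrete, here is a toy model. It is not Kafka's actual caching code. A caching layer answers a range query by merging its own sorted entries with the underlying store's sorted iterator, and that merge is only correct if both sides use the same order. The "key-first" and "time-first" comparators below are simplifying assumptions that stand in for the real byte layouts. `ToyWindowedKey`, `merge`, and `mergeIsSorted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CacheMergeOrderingDemo {
    // Toy stand-in for a windowed key: a record key plus its window start.
    static final class ToyWindowedKey {
        final String key;
        final long windowStart;

        ToyWindowedKey(final String key, final long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        String key() { return key; }

        long windowStart() { return windowStart; }
    }

    // Assumed key-first order, standing in for the regular caching layer.
    static final Comparator<ToyWindowedKey> KEY_FIRST =
            Comparator.comparing(ToyWindowedKey::key).thenComparingLong(ToyWindowedKey::windowStart);

    // Assumed time-first order, standing in for a time-ordered store.
    static final Comparator<ToyWindowedKey> TIME_FIRST =
            Comparator.comparingLong(ToyWindowedKey::windowStart).thenComparing(ToyWindowedKey::key);

    // Classic two-way merge of sorted runs; only correct if BOTH inputs
    // are already sorted by `order`.
    static List<ToyWindowedKey> merge(final List<ToyWindowedKey> cache,
                                      final List<ToyWindowedKey> store,
                                      final Comparator<ToyWindowedKey> order) {
        final List<ToyWindowedKey> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < cache.size() && j < store.size()) {
            if (order.compare(cache.get(i), store.get(j)) <= 0) {
                out.add(cache.get(i++));
            } else {
                out.add(store.get(j++));
            }
        }
        while (i < cache.size()) {
            out.add(cache.get(i++));
        }
        while (j < store.size()) {
            out.add(store.get(j++));
        }
        return out;
    }

    static boolean isSorted(final List<ToyWindowedKey> xs, final Comparator<ToyWindowedKey> order) {
        for (int k = 1; k < xs.size(); k++) {
            if (order.compare(xs.get(k - 1), xs.get(k)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Merges a key-first cache with a store sorted by `storeOrder` and reports
    // whether the merged range-query result is still in key-first order.
    static boolean mergeIsSorted(final Comparator<ToyWindowedKey> storeOrder) {
        final List<ToyWindowedKey> cache = new ArrayList<>(Arrays.asList(new ToyWindowedKey("a", 5)));
        final List<ToyWindowedKey> store = new ArrayList<>(Arrays.asList(
                new ToyWindowedKey("a", 10), new ToyWindowedKey("b", 0)));
        store.sort(storeOrder);
        return isSorted(merge(cache, store, KEY_FIRST), KEY_FIRST);
    }

    public static void main(final String[] args) {
        System.out.println("same ordering:       " + mergeIsSorted(KEY_FIRST));  // true
        System.out.println("mismatched ordering: " + mergeIsSorted(TIME_FIRST)); // false
    }
}
```

   With matching orders, the merge yields `(a,5), (a,10), (b,0)`. With a time-first store, it yields `(a,5), (b,0), (a,10)`, which is no longer a valid key-ordered range result.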
   
   Of course, in the DSL, sliding and session window aggregations use range queries, whereas time window aggregations do not. And since this seems to be a DSL-specific feature, I guess we just don't care about the incompatible cache ordering for time windows, since we'll only be doing point lookups.
   
   I also noticed that we added a new caching layer specifically to work with the RocksDbIndexedTimeOrderedWindowBytesStore, called [TimeOrderedCachingWindowStore](https://github.com/apache/kafka/pull/12030/files#diff-65aaedea554a4017e0e1a673883e3b73f609f117f71e86541c9d45092bf0dbd2R59). I'm not sure how that comes into play here, but I think it makes it clear that we do intend to allow caching with time windows (only).
   
   Code-wise, the difference between sliding windows and time windows seems to be that the former is "indexed", as you can see by following the invocations of `RocksDbIndexedTimeOrderedWindowBytesStoreSupplier#create`. Presumably this means the TimeOrderedCachingWindowStore works only for time window stores, not for sliding window stores.
   
   I suppose it would be nice to include a condensed version of this explanation in the comments of the session and sliding window materializers. Right now they only state that caching _is_ disabled (which we can plainly see from the code), but not _why_ it is disabled, which is the important part that can't be understood from the code.
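   As a rough illustration, the rationale above could be condensed into a single decision like the hypothetical helper below. It only sketches the reasoning in this thread, which still needs confirmation from the folks tagged above. The real materializers are separate classes and use `EmitStrategy.StrategyType`, so `CachingDecisionSketch`, `WindowKind`, and `EmitType` are all illustrative stand-ins.

```java
public class CachingDecisionSketch {
    // Hypothetical stand-ins; the real code has separate materializer classes
    // and uses EmitStrategy.StrategyType rather than these enums.
    enum WindowKind { TIME, SLIDING, SESSION }

    enum EmitType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Encodes the (still unconfirmed) reasoning from this review thread.
    static boolean cachingEnabled(final boolean requestedByUser,
                                  final WindowKind kind,
                                  final EmitType emit) {
        if (emit == EmitType.ON_WINDOW_CLOSE && kind != WindowKind.TIME) {
            // Session and sliding window aggregations issue range queries, and the
            // time-ordered store's ordering can't be merged with the caching layer's
            // ordering, so caching is forced off regardless of the user's setting.
            return false;
        }
        // Otherwise (time windows, which only do point lookups and have
        // TimeOrderedCachingWindowStore, or ON_WINDOW_UPDATE, which this sketch
        // assumes uses the regular stores), honor the user's setting.
        return requestedByUser;
    }

    public static void main(final String[] args) {
        System.out.println(cachingEnabled(true, WindowKind.SESSION, EmitType.ON_WINDOW_CLOSE)); // false
        System.out.println(cachingEnabled(true, WindowKind.TIME, EmitType.ON_WINDOW_CLOSE));    // true
    }
}
```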


