vvcephei commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r501351457



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -382,9 +479,23 @@ private boolean setInnerIterators() {
                 currentKey = nextKeyEntry.getKey();
 
                 if (latestSessionStartTime == Long.MAX_VALUE) {
-                    recordIterator = 
nextKeyEntry.getValue().entrySet().iterator();
+                    final Set<Entry<Long, byte[]>> entries;
+                    if (forward) entries = 
nextKeyEntry.getValue().descendingMap().entrySet();
+                    else entries = nextKeyEntry.getValue().entrySet();

Review comment:
       The code style discourages inline conditionals. It's more maintainable 
to always use blocks.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
##########
@@ -24,35 +24,156 @@
  * Implementations should be thread-safe as concurrent reads and writes
  * are expected.
  *
- * @param <K> the key type
+ * @param <K>   the key type
  * @param <AGG> the aggregated value type
  */
 public interface ReadOnlySessionStore<K, AGG> {
+
     /**
-     * Retrieve all aggregated sessions for the provided key.
+     * Fetch any sessions with the matching key and the sessions end is &ge; 
earliestSessionEndTime and the sessions
+     * start is &le; latestSessionStartTime iterating from earliest to latest.
+     * <p>
      * This iterator must be closed after use.
      *
+     * @param key                    the key to return sessions for
+     * @param earliestSessionEndTime the end timestamp of the earliest session 
to search for, where iteration starts.
+     * @param latestSessionStartTime the end timestamp of the latest session 
to search for, where iteration ends.
+     * @return iterator of sessions with the matching key and aggregated 
values, from earliest to latest session time.
+     * @throws NullPointerException If null is used for key.
+     */
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                            final long 
earliestSessionEndTime,
+                                                            final long 
latestSessionStartTime) {
+        throw new UnsupportedOperationException("Moved from SessionStore");

Review comment:
       It won't matter to users whether this method was moved from another 
interface or not. They just need to know why they're getting the exception. 
I.e., we just need to tell them that the store implementation they selected 
didn't implement the method.
   
   ```suggestion
           throw new UnsupportedOperationException("This API is not supported 
by this implementation of ReadOnlySessionStore.");
   ```
   
   We should say the exact same thing in all default implementations. Right 
now, they're a bit inconsistent.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##########
@@ -150,22 +149,44 @@ public void remove(final Windowed<Bytes> sessionKey) {
         validateStoreOpen();
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
-            new CacheIteratorWrapper(key, earliestSessionEndTime, 
latestSessionStartTime) :
+            new CacheIteratorWrapper(key, earliestSessionEndTime, 
latestSessionStartTime, true) :
             context.cache().range(cacheName,
-                        
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
earliestSessionEndTime)),
-                        
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime))
+                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
earliestSessionEndTime)),
+                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime))

Review comment:
       There are a lot of unnecessary whitespace changes in this PR. You don't 
need to back them all out right now, but in the future, please clean up the 
diff before submitting a PR. These extra changes make it harder to review.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to