This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f7a87e  KAFKA-7652: Restrict range of fetch/findSessions in cache (#6448)
0f7a87e is described below

commit 0f7a87e93dd06659b886a75547ea576f374d01c2
Author: A. Sophie Blee-Goldman <ableegold...@gmail.com>
AuthorDate: Wed Apr 17 16:42:49 2019 -0700

    KAFKA-7652: Restrict range of fetch/findSessions in cache (#6448)
    
    Reduce the total key space that cache iterators have to search for segmented byte stores by wrapping several single-segment iterators.
    
    Summary of benchmarking results (number of records processed is the primary indicator):
    
    Session Store:
    Only single-key findSessions seems to benefit (~4x improvement), because key-range findSessions must conservatively scan potentially variable-sized keys. Key-range findSessions could improve as well if we can tell when/if keys are a fixed size, or once RocksDB offers an efficient custom comparator API.
    
    Window Store:
    Both single- and multi-key fetch saw some improvement; the gain depended on the size of the time range in the fetch (in the DSL, this would be the window size) relative to the retention period. This patch improves performance when the fetch spans multiple segments, so the larger the time range being searched, the better it does.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com>
---
 .../streams/processor/internals/RecordQueue.java   |   2 +-
 .../state/internals/CachingSessionStore.java       | 153 +++++++++++++++++++-
 .../state/internals/CachingWindowStore.java        | 160 ++++++++++++++++++++-
 .../state/internals/SegmentedCacheFunction.java    |  18 ++-
 4 files changed, 320 insertions(+), 13 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 572e629..33c4d49e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -36,7 +36,7 @@ import java.util.ArrayDeque;
  */
 public class RecordQueue {
 
-    static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
+    public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
     private final SourceNode source;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 9599105..adbaf4c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.NoSuchElementException;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 
@@ -43,11 +47,14 @@ class CachingSessionStore
     private CacheFlushListener<byte[], byte[]> flushListener;
     private boolean sendOldValues;
 
+    private long maxObservedTimestamp; // Refers to the window end time 
(determines segmentId)
+
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                         final long segmentInterval) {
         super(bytesStore);
         this.keySchema = new SessionKeySchema();
         this.cacheFunction = new SegmentedCacheFunction(keySchema, 
segmentInterval);
+        this.maxObservedTimestamp = RecordQueue.UNKNOWN;
     }
 
     @Override
@@ -123,6 +130,8 @@ class CachingSessionStore
                 context.partition(),
                 context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
+
+        maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), 
maxObservedTimestamp);
     }
 
     @Override
@@ -136,9 +145,13 @@ class CachingSessionStore
                                                                   final long 
earliestSessionEndTime,
                                                                   final long 
latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
earliestSessionEndTime));
-        final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
+            new CacheIteratorWrapper(key, earliestSessionEndTime, 
latestSessionStartTime) :
+            cache.range(cacheName,
+                        
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
earliestSessionEndTime)),
+                        
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime))
+            );
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().findSessions(key,
                                                                                
                earliestSessionEndTime,
@@ -224,4 +237,138 @@ class CachingSessionStore
         cache.close(cacheName);
         super.close();
     }
+
+    private class CacheIteratorWrapper implements 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+
+        private final long segmentInterval;
+
+        private final Bytes keyFrom;
+        private final Bytes keyTo;
+        private final long latestSessionStartTime;
+        private long lastSegmentId;
+
+        private long currentSegmentId;
+        private Bytes cacheKeyFrom;
+        private Bytes cacheKeyTo;
+
+        private ThreadCache.MemoryLRUCacheBytesIterator current;
+
+        private CacheIteratorWrapper(final Bytes key,
+                                     final long earliestSessionEndTime,
+                                     final long latestSessionStartTime) {
+            this(key, key, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        private CacheIteratorWrapper(final Bytes keyFrom,
+                                     final Bytes keyTo,
+                                     final long earliestSessionEndTime,
+                                     final long latestSessionStartTime) {
+            this.keyFrom = keyFrom;
+            this.keyTo = keyTo;
+            this.latestSessionStartTime = latestSessionStartTime;
+            this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
+            this.segmentInterval = cacheFunction.getSegmentInterval();
+
+            this.currentSegmentId = 
cacheFunction.segmentId(earliestSessionEndTime);
+
+            setCacheKeyRange(earliestSessionEndTime, currentSegmentLastTime());
+
+            this.current = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (current == null) {
+                return false;
+            }
+
+            if (current.hasNext()) {
+                return true;
+            }
+
+            while (!current.hasNext()) {
+                getNextSegmentIterator();
+                if (current == null) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.peekNextKey();
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.peekNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.next();
+        }
+
+        @Override
+        public void close() {
+            current.close();
+        }
+
+        private long currentSegmentBeginTime() {
+            return currentSegmentId * segmentInterval;
+        }
+
+        private long currentSegmentLastTime() {
+            return currentSegmentBeginTime() + segmentInterval - 1;
+        }
+
+        private void getNextSegmentIterator() {
+            ++currentSegmentId;
+            lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
+
+            if (currentSegmentId > lastSegmentId) {
+                current = null;
+                return;
+            }
+
+            setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
+
+            current.close();
+            current = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
+        }
+
+        private void setCacheKeyRange(final long lowerRangeEndTime, final long 
upperRangeEndTime) {
+            if (cacheFunction.segmentId(lowerRangeEndTime) != 
cacheFunction.segmentId(upperRangeEndTime)) {
+                throw new IllegalStateException("Error iterating over 
segments: segment interval has changed");
+            }
+
+            if (keyFrom == keyTo) {
+                cacheKeyFrom = 
cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
+                cacheKeyTo = 
cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
+            } else {
+                cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), 
currentSegmentId);
+                cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime), 
currentSegmentId);
+            }
+        }
+
+        private Bytes segmentLowerRangeFixedSize(final Bytes key, final long 
segmentBeginTime) {
+            final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
SessionWindow(0, Math.max(0, segmentBeginTime)));
+            return SessionKeySchema.toBinary(sessionKey);
+        }
+
+        private Bytes segmentUpperRangeFixedSize(final Bytes key, final long 
segmentEndTime) {
+            final Windowed<Bytes> sessionKey = new Windowed<>(key, new 
SessionWindow(Math.min(latestSessionStartTime, segmentEndTime), 
segmentEndTime));
+            return SessionKeySchema.toBinary(sessionKey);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 3875a79..b64c0eb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,14 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.NoSuchElementException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -47,6 +50,8 @@ class CachingWindowStore
     private StateSerdes<Bytes, byte[]> bytesSerdes;
     private CacheFlushListener<byte[], byte[]> flushListener;
 
+    private long maxObservedTimestamp;
+
     private final SegmentedCacheFunction cacheFunction;
 
     CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
@@ -55,6 +60,7 @@ class CachingWindowStore
         super(underlying);
         this.windowSize = windowSize;
         this.cacheFunction = new SegmentedCacheFunction(keySchema, 
segmentInterval);
+        this.maxObservedTimestamp = RecordQueue.UNKNOWN;
     }
 
     @Override
@@ -150,6 +156,8 @@ class CachingWindowStore
                 context.partition(),
                 context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
+
+        maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(keyBytes), 
maxObservedTimestamp);
     }
 
     @Override
@@ -182,9 +190,13 @@ class CachingWindowStore
         if (cache == null) {
             return underlyingIterator;
         }
-        final Bytes cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
-        final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
+            new CacheIteratorWrapper(key, timeFrom, timeTo) :
+            cache.range(name,
+                        
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)),
+                        
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo))
+            );
 
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key, key, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(
@@ -216,9 +228,13 @@ class CachingWindowStore
         if (cache == null) {
             return underlyingIterator;
         }
-        final Bytes cacheKeyFrom = 
cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
-        final Bytes cacheKeyTo = 
cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(name, cacheKeyFrom, cacheKeyTo);
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
+            new CacheIteratorWrapper(from, to, timeFrom, timeTo) :
+            cache.range(name,
+                        cacheFunction.cacheKey(keySchema.lowerRange(from, 
timeFrom)),
+                        cacheFunction.cacheKey(keySchema.upperRange(to, 
timeTo))
+            );
 
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(from, to, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(cacheIterator, 
hasNextCondition, cacheFunction);
@@ -281,4 +297,136 @@ class CachingWindowStore
         cache.close(name);
         wrapped().close();
     }
+
+    /**
+     * Iterates over the cache one segment at a time, issuing a separate, narrower
+     * {@code cache.range} query per segment instead of a single range query over the
+     * whole requested span. Segments run from the segment of {@code timeFrom} up to the
+     * segment of {@code min(timeTo, maxObservedTimestamp)}; that upper bound is re-read
+     * every time the iterator advances to a new segment.
+     */
+    private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+
+        private final long segmentInterval;
+
+        private final Bytes keyFrom;
+        private final Bytes keyTo;
+        private final long timeTo;
+
+        private long currentSegmentId;
+        private Bytes cacheKeyFrom;
+        private Bytes cacheKeyTo;
+
+        // Iterator over the current segment; null once every segment has been consumed
+        // or after close().
+        private ThreadCache.MemoryLRUCacheBytesIterator current;
+
+        private CacheIteratorWrapper(final Bytes key,
+                                     final long timeFrom,
+                                     final long timeTo) {
+            this(key, key, timeFrom, timeTo);
+        }
+
+        private CacheIteratorWrapper(final Bytes keyFrom,
+                                     final Bytes keyTo,
+                                     final long timeFrom,
+                                     final long timeTo) {
+            this.keyFrom = keyFrom;
+            this.keyTo = keyTo;
+            this.timeTo = timeTo;
+
+            this.segmentInterval = cacheFunction.getSegmentInterval();
+            this.currentSegmentId = cacheFunction.segmentId(timeFrom);
+
+            // NOTE(review): if timeFrom > timeTo and they fall in different segments,
+            // setCacheKeyRange throws IllegalStateException with a misleading
+            // "segment interval has changed" message; confirm callers reject such ranges.
+            setCacheKeyRange(timeFrom, currentSegmentLastTime());
+
+            this.current = cache.range(name, cacheKeyFrom, cacheKeyTo);
+        }
+
+        /**
+         * Returns true if any remaining segment still has an entry, advancing past
+         * exhausted segments as needed.
+         */
+        @Override
+        public boolean hasNext() {
+            while (current != null && !current.hasNext()) {
+                getNextSegmentIterator();
+            }
+            return current != null;
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.peekNextKey();
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.peekNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.next();
+        }
+
+        /**
+         * Closes the current segment iterator, if any. {@code current} is null once
+         * iteration has run past the last segment, so it must be guarded; clearing it
+         * afterwards makes repeated calls harmless.
+         */
+        @Override
+        public void close() {
+            if (current != null) {
+                current.close();
+                current = null;
+            }
+        }
+
+        private long currentSegmentBeginTime() {
+            return currentSegmentId * segmentInterval;
+        }
+
+        // Last timestamp of the current segment, clamped to the requested timeTo.
+        private long currentSegmentLastTime() {
+            return Math.min(timeTo, currentSegmentBeginTime() + segmentInterval - 1);
+        }
+
+        /**
+         * Moves to the next segment. Sets {@code current} to null, after closing the
+         * exhausted iterator, when no segment up to
+         * {@code min(timeTo, maxObservedTimestamp)} remains.
+         */
+        private void getNextSegmentIterator() {
+            ++currentSegmentId;
+            // Re-read on every advance: put() may have raised maxObservedTimestamp since
+            // the previous segment was opened.
+            final long lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp));
+
+            if (currentSegmentId > lastSegmentId) {
+                // Release the exhausted iterator instead of dropping the reference.
+                current.close();
+                current = null;
+                return;
+            }
+
+            setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
+
+            current.close();
+            current = cache.range(name, cacheKeyFrom, cacheKeyTo);
+        }
+
+        /**
+         * Computes the cache-key bounds for the current segment.
+         *
+         * @throws IllegalStateException if the two times fall in different segments
+         */
+        private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {
+            if (cacheFunction.segmentId(lowerRangeEndTime) != cacheFunction.segmentId(upperRangeEndTime)) {
+                throw new IllegalStateException("Error iterating over segments: segment interval has changed");
+            }
+
+            // Reference equality is deliberate: the single-key constructor passes the same
+            // instance for both bounds, which enables the tighter fixed-size key range.
+            if (keyFrom == keyTo) {
+                cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
+                cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
+            } else {
+                cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
+                cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
+            }
+        }
+
+        private Bytes segmentLowerRangeFixedSize(final Bytes key, final long segmentBeginTime) {
+            return WindowKeySchema.toStoreKeyBinary(key, Math.max(0, segmentBeginTime), 0);
+        }
+
+        private Bytes segmentUpperRangeFixedSize(final Bytes key, final long segmentEndTime) {
+            return WindowKeySchema.toStoreKeyBinary(key, segmentEndTime, Integer.MAX_VALUE);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
index 8f5768c..68a40b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -41,9 +41,13 @@ class SegmentedCacheFunction implements CacheFunction {
 
     @Override
     public Bytes cacheKey(final Bytes key) {
+        return cacheKey(key, segmentId(key));
+    }
+
+    Bytes cacheKey(final Bytes key, final long segmentId) {
         final byte[] keyBytes = key.get();
         final ByteBuffer buf = ByteBuffer.allocate(SEGMENT_ID_BYTES + 
keyBytes.length);
-        buf.putLong(segmentId(key)).put(keyBytes);
+        buf.putLong(segmentId).put(keyBytes);
         return Bytes.wrap(buf.array());
     }
 
@@ -52,9 +56,17 @@ class SegmentedCacheFunction implements CacheFunction {
         System.arraycopy(cacheKey.get(), SEGMENT_ID_BYTES, binaryKey, 0, 
binaryKey.length);
         return binaryKey;
     }
-    
+
     public long segmentId(final Bytes key) {
-        return keySchema.segmentTimestamp(key) / segmentInterval;
+        return segmentId(keySchema.segmentTimestamp(key));
+    }
+
+    long segmentId(final long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    long getSegmentInterval() {
+        return segmentInterval;
     }
 
     int compareSegmentedKeys(final Bytes cacheKey, final Bytes storeKey) {

Reply via email to