This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0f7a87e KAFKA-7652: Restrict range of fetch/findSessions in cache (#6448) 0f7a87e is described below commit 0f7a87e93dd06659b886a75547ea576f374d01c2 Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Apr 17 16:42:49 2019 -0700 KAFKA-7652: Restrict range of fetch/findSessions in cache (#6448) Reduce the total key space that cache iterators have to search for segmented byte stores by wrapping several single-segment iterators. Summary of Benchmarking Results (# records processed as the primary indicator): Session Store: Only single-key findSessions seems to benefit (~4x improvement), due to conservative scanning of potentially variable-sized keys in key-range findSessions. Key-range findSessions could improve as well if we can tell when/if keys are a fixed size, or once an efficient custom comparator API is available from RocksDB. Window Store: Both single- and multi-key fetch saw some improvement; this depends on the size of the time range in the fetch (in the DSL this would be the window size) relative to the retention period. This patch improves performance when the fetch spans multiple segments; hence, the larger the time range being searched, the greater the benefit. 
Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../streams/processor/internals/RecordQueue.java | 2 +- .../state/internals/CachingSessionStore.java | 153 +++++++++++++++++++- .../state/internals/CachingWindowStore.java | 160 ++++++++++++++++++++- .../state/internals/SegmentedCacheFunction.java | 18 ++- 4 files changed, 320 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 572e629..33c4d49e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -36,7 +36,7 @@ import java.util.ArrayDeque; */ public class RecordQueue { - static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP; + public static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP; private final Logger log; private final SourceNode source; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 9599105..adbaf4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -16,12 +16,16 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.NoSuchElementException; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordQueue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -43,11 +47,14 @@ class CachingSessionStore private CacheFlushListener<byte[], byte[]> flushListener; private boolean sendOldValues; + private long maxObservedTimestamp; // Refers to the window end time (determines segmentId) + CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore, final long segmentInterval) { super(bytesStore); this.keySchema = new SessionKeySchema(); this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); + this.maxObservedTimestamp = RecordQueue.UNKNOWN; } @Override @@ -123,6 +130,8 @@ class CachingSessionStore context.partition(), context.topic()); cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); + + maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), maxObservedTimestamp); } @Override @@ -136,9 +145,13 @@ class CachingSessionStore final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)); - final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? 
+ new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) : + cache.range(cacheName, + cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), + cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)) + ); final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = wrapped().findSessions(key, earliestSessionEndTime, @@ -224,4 +237,138 @@ class CachingSessionStore cache.close(cacheName); super.close(); } + + private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { + + private final long segmentInterval; + + private final Bytes keyFrom; + private final Bytes keyTo; + private final long latestSessionStartTime; + private long lastSegmentId; + + private long currentSegmentId; + private Bytes cacheKeyFrom; + private Bytes cacheKeyTo; + + private ThreadCache.MemoryLRUCacheBytesIterator current; + + private CacheIteratorWrapper(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + this(key, key, earliestSessionEndTime, latestSessionStartTime); + } + + private CacheIteratorWrapper(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + this.keyFrom = keyFrom; + this.keyTo = keyTo; + this.latestSessionStartTime = latestSessionStartTime; + this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); + this.segmentInterval = cacheFunction.getSegmentInterval(); + + this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime); + + setCacheKeyRange(earliestSessionEndTime, currentSegmentLastTime()); + + this.current = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); + } + + @Override + public boolean hasNext() { + if (current == null) { + return false; + } + + if (current.hasNext()) { + return true; + } + + while (!current.hasNext()) { + getNextSegmentIterator(); + if (current == null) { + return false; + } + } + return true; + } + + @Override 
+ public Bytes peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.peekNextKey(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> peekNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.peekNext(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.next(); + } + + @Override + public void close() { + current.close(); + } + + private long currentSegmentBeginTime() { + return currentSegmentId * segmentInterval; + } + + private long currentSegmentLastTime() { + return currentSegmentBeginTime() + segmentInterval - 1; + } + + private void getNextSegmentIterator() { + ++currentSegmentId; + lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); + + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } + + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + current = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); + } + + private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { + if (cacheFunction.segmentId(lowerRangeEndTime) != cacheFunction.segmentId(upperRangeEndTime)) { + throw new IllegalStateException("Error iterating over segments: segment interval has changed"); + } + + if (keyFrom == keyTo) { + cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); + cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); + } else { + cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); + cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime), currentSegmentId); + } + } + + private Bytes segmentLowerRangeFixedSize(final Bytes key, final long segmentBeginTime) { + final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, 
Math.max(0, segmentBeginTime))); + return SessionKeySchema.toBinary(sessionKey); + } + + private Bytes segmentUpperRangeFixedSize(final Bytes key, final long segmentEndTime) { + final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(Math.min(latestSessionStartTime, segmentEndTime), segmentEndTime)); + return SessionKeySchema.toBinary(sessionKey); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 3875a79..b64c0eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,14 +16,17 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.NoSuchElementException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.RecordQueue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -47,6 +50,8 @@ class CachingWindowStore private StateSerdes<Bytes, byte[]> bytesSerdes; private CacheFlushListener<byte[], byte[]> flushListener; + private long maxObservedTimestamp; + private final SegmentedCacheFunction cacheFunction; CachingWindowStore(final WindowStore<Bytes, byte[]> underlying, @@ -55,6 +60,7 @@ class 
CachingWindowStore super(underlying); this.windowSize = windowSize; this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); + this.maxObservedTimestamp = RecordQueue.UNKNOWN; } @Override @@ -150,6 +156,8 @@ class CachingWindowStore context.partition(), context.topic()); cache.put(name, cacheFunction.cacheKey(keyBytes), entry); + + maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(keyBytes), maxObservedTimestamp); } @Override @@ -182,9 +190,13 @@ class CachingWindowStore if (cache == null) { return underlyingIterator; } - final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)); - final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? + new CacheIteratorWrapper(key, timeFrom, timeTo) : + cache.range(name, + cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)), + cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)) + ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator( @@ -216,9 +228,13 @@ class CachingWindowStore if (cache == null) { return underlyingIterator; } - final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom)); - final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo)); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); + + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? 
+ new CacheIteratorWrapper(from, to, timeFrom, timeTo) : + cache.range(name, + cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom)), + cacheFunction.cacheKey(keySchema.upperRange(to, timeTo)) + ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from, to, timeFrom, timeTo); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); @@ -281,4 +297,136 @@ class CachingWindowStore cache.close(name); wrapped().close(); } + + private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { + + private final long segmentInterval; + + private final Bytes keyFrom; + private final Bytes keyTo; + private final long timeTo; + private long lastSegmentId; + + private long currentSegmentId; + private Bytes cacheKeyFrom; + private Bytes cacheKeyTo; + + private ThreadCache.MemoryLRUCacheBytesIterator current; + + private CacheIteratorWrapper(final Bytes key, + final long timeFrom, + final long timeTo) { + this(key, key, timeFrom, timeTo); + } + + private CacheIteratorWrapper(final Bytes keyFrom, + final Bytes keyTo, + final long timeFrom, + final long timeTo) { + this.keyFrom = keyFrom; + this.keyTo = keyTo; + this.timeTo = timeTo; + this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp)); + + this.segmentInterval = cacheFunction.getSegmentInterval(); + this.currentSegmentId = cacheFunction.segmentId(timeFrom); + + setCacheKeyRange(timeFrom, currentSegmentLastTime()); + + this.current = cache.range(name, cacheKeyFrom, cacheKeyTo); + } + + @Override + public boolean hasNext() { + if (current == null) { + return false; + } + + if (current.hasNext()) { + return true; + } + + while (!current.hasNext()) { + getNextSegmentIterator(); + if (current == null) { + return false; + } + } + return true; + } + + @Override + public Bytes peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + 
return current.peekNextKey(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> peekNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.peekNext(); + } + + @Override + public KeyValue<Bytes, LRUCacheEntry> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return current.next(); + } + + @Override + public void close() { + current.close(); + } + + private long currentSegmentBeginTime() { + return currentSegmentId * segmentInterval; + } + + private long currentSegmentLastTime() { + return Math.min(timeTo, currentSegmentBeginTime() + segmentInterval - 1); + } + + private void getNextSegmentIterator() { + ++currentSegmentId; + lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp)); + + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } + + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + current = cache.range(name, cacheKeyFrom, cacheKeyTo); + } + + private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { + if (cacheFunction.segmentId(lowerRangeEndTime) != cacheFunction.segmentId(upperRangeEndTime)) { + throw new IllegalStateException("Error iterating over segments: segment interval has changed"); + } + + if (keyFrom == keyTo) { + cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); + cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); + } else { + cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); + cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); + } + } + + private Bytes segmentLowerRangeFixedSize(final Bytes key, final long segmentBeginTime) { + return WindowKeySchema.toStoreKeyBinary(key, Math.max(0, segmentBeginTime), 0); + } + + private Bytes segmentUpperRangeFixedSize(final Bytes key, final long 
segmentEndTime) { + return WindowKeySchema.toStoreKeyBinary(key, segmentEndTime, Integer.MAX_VALUE); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java index 8f5768c..68a40b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java @@ -41,9 +41,13 @@ class SegmentedCacheFunction implements CacheFunction { @Override public Bytes cacheKey(final Bytes key) { + return cacheKey(key, segmentId(key)); + } + + Bytes cacheKey(final Bytes key, final long segmentId) { final byte[] keyBytes = key.get(); final ByteBuffer buf = ByteBuffer.allocate(SEGMENT_ID_BYTES + keyBytes.length); - buf.putLong(segmentId(key)).put(keyBytes); + buf.putLong(segmentId).put(keyBytes); return Bytes.wrap(buf.array()); } @@ -52,9 +56,17 @@ class SegmentedCacheFunction implements CacheFunction { System.arraycopy(cacheKey.get(), SEGMENT_ID_BYTES, binaryKey, 0, binaryKey.length); return binaryKey; } - + public long segmentId(final Bytes key) { - return keySchema.segmentTimestamp(key) / segmentInterval; + return segmentId(keySchema.segmentTimestamp(key)); + } + + long segmentId(final long timestamp) { + return timestamp / segmentInterval; + } + + long getSegmentInterval() { + return segmentInterval; } int compareSegmentedKeys(final Bytes cacheKey, final Bytes storeKey) {