[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645857#comment-16645857 ]
ASF GitHub Bot commented on KAFKA-7277: --------------------------------------- mjsax closed pull request #5759: KAFKA-7277: default implementation for new window store overloads URL: https://github.com/apache/kafka/pull/5759 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index ad74ae1e74d..50ce386f13f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -17,9 +17,12 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import java.time.Instant; + /** * A windowed store interface extending {@link StateStore}. * @@ -87,6 +90,13 @@ */ WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); + @Override + default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) { + ApiUtils.validateMillisecondInstant(from, "from"); + ApiUtils.validateMillisecondInstant(to, "to"); + return fetch(key, from.toEpochMilli(), to.toEpochMilli()); + } + /** * Get all the key-value pairs in the given key range and time range from all the existing windows. 
* <p> @@ -102,6 +112,13 @@ */ KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); + @Override + default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) { + ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); + ApiUtils.validateMillisecondInstant(toTime, "toTime"); + return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); + } + /** * Gets all the key-value pairs that belong to the windows within in the given time range. * @@ -112,4 +129,11 @@ * @throws NullPointerException if {@code null} is used for any key */ KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo); + + @Override + default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) { + ApiUtils.validateMillisecondInstant(from, "from"); + ApiUtils.validateMillisecondInstant(to, "to"); + return fetchAll(from.toEpochMilli(), to.toEpochMilli()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index f6b62b2b935..b55e5448139 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; @@ -205,13 +203,6 @@ public synchronized void put(final Bytes key, final byte[] value, final long win return new 
MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator); } - @Override - public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { // since this function may not access the underlying inner store, we need to validate @@ -241,16 +232,6 @@ public synchronized void put(final Bytes key, final byte[] value, final long win ); } - @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, - final Bytes to, - final Instant fromTime, - final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - private V fetchPrevious(final Bytes key, final long timestamp) { final byte[] value = underlying.fetch(key, timestamp); if (value != null) { @@ -294,11 +275,4 @@ private V fetchPrevious(final Bytes key, final long timestamp) { cacheFunction ); } - - @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index d4e47c6d18f..9808ca967cf 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -16,10 +16,8 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -58,28 +56,11 @@ return bytesStore.fetch(key, from, to); } - @Override - public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { return bytesStore.fetch(keyFrom, keyTo, from, to); } - @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, - final Bytes to, - final Instant fromTime, - final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<Bytes>, byte[]> all() { return bytesStore.all(); @@ -90,13 +71,6 @@ return bytesStore.fetchAll(timeFrom, timeTo); } - @Override - public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return 
fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } - @Override public void put(final Bytes key, final byte[] value) { put(key, value, context.timestamp()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index e1b6cd1d52e..5162eac8848 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -149,13 +147,6 @@ public V fetch(final K key, final long timestamp) { time); } - @Override - public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<K>, V> all() { return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time); @@ -170,13 +161,6 @@ public V fetch(final K key, final long timestamp) { time); } - @Override - public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); 
- } - @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), @@ -186,13 +170,6 @@ public V fetch(final K key, final long timestamp) { time); } - @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public void flush() { final long startNs = time.nanoseconds(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index e8037bc8163..d7bb523b049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -16,10 +16,8 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -93,26 +91,12 @@ public V fetch(final K key, final long timestamp) { return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator(); } - @Override - public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return 
fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } - @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public KeyValueIterator<Windowed<K>, V> all() { final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all(); @@ -125,13 +109,6 @@ public V fetch(final K key, final long timestamp) { return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } - @Override - public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } - private void maybeUpdateSeqnumForDups() { if (retainDuplicates) { seqnum = (seqnum + 1) & 0x7FFFFFFF; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Migrate Streams API to Duration instead of longMs times > ------------------------------------------------------- > > Key: KAFKA-7277 > URL: https://issues.apache.org/jira/browse/KAFKA-7277 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Assignee: Nikolay Izhikov > Priority: Major > Labels: kip, newbie > Fix For: 2.1.0 > > > Right now the Streams API universally represents time as ms-since-unix-epoch. > There's nothing wrong, per se, with this, but Duration is more ergonomic for > an API. > What we don't want is to present a heterogeneous API, so we need to make sure > the whole Streams API is in terms of Duration. > > Implementation note: Durations potentially worsen memory pressure and GC > performance, so internally, we will still use longMs as the representation. > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times] -- This message was sent by Atlassian JIRA (v7.6.3#76005)