vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r702070313
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
time);
}
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }
Review comment:
@mjsax I have a question here. In the Jira ticket you mentioned that the best place for this filtering is the MeteredStore, because that way the logic also applies implicitly to custom state stores. For the most part this approach (fetching the relevant records and then filtering them in the MeteredStore) has worked fine, but there is one case where it fails: tests like `shouldNotThrowConcurrentModificationException`. The filter copies the non-expired records into a list before returning the iterator, so records that are `put()` into the store while iteration is in progress only show up in the wrapped store's iterator and are not visible through the filtered one.

Looking at this, do you think it would be a good idea to move this logic into the actual RocksDB implementations? Or do you see a better way to do it here in the MeteredStore class itself?
--
This is an automated message from the Apache Git Service.