vamossagar12 commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-1061509859
@mjsax, @ableegoldman, @guozhangwang, @showuon one quick question here. I was able to add the expiration changes at the inner-store level, but I have a doubt about the CachingStore. Let's consider the backwardFetch implementation:

```java
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
                                                               final Bytes keyTo,
                                                               final long timeFrom,
                                                               final long timeTo) {
    if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
        LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
            "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
            "Note that the built-in numerical serdes do not follow this for negative numbers");
        return KeyValueIterators.emptyIterator();
    }

    // since this function may not access the underlying inner store, we need to validate
    // if store is open outside as well.
    validateStoreOpen();

    final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
        wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
    if (context.cache() == null) {
        return underlyingIterator;
    }

    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ?
        new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) :
        context.cache().reverseRange(
            cacheName,
            keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
            keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
        );

    final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
        new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

    return new MergedSortedCacheWindowStoreKeyValueIterator(
        filteredCacheIterator,
        underlyingIterator,
        bytesSerdes,
        windowSize,
        cacheFunction,
        false
    );
}
```

Here, we always create the underlyingIterator first, and only afterwards check whether the context has a cache: if it doesn't, we return the underlying iterator as-is; if it does, we go on to build the caching-layer iterator and merge the two. I felt this ordering could be inverted, no?

Also, if we don't fix caching in this PR, then the results returned with and without caching would differ, which might be confusing. Currently that discrepancy is present only in the in-memory stores, since they implement expiration, while the RocksDB stores return the same output with or without caching. So, going by point 2 from Matthias above, I guess we are OK to keep the discrepancy for now and fix it later in a follow-up PR, right?
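To make the suggested inversion concrete, here is a minimal, hedged sketch: the `context.cache() == null` check is hoisted above the rest of the work, so the no-cache path returns early and the cache-handling code is only reached when a cache exists. Plain `List`s stand in for Kafka's iterators and cache here; the method name `backwardFetch`, the naive concatenation "merge", and the parameters are illustrative assumptions, not the real store API (the real code performs a sorted merge via `MergedSortedCacheWindowStoreKeyValueIterator`).

```java
import java.util.ArrayList;
import java.util.List;

public class BackwardFetchSketch {
    // Sketch of the inverted control flow: handle the "no cache configured"
    // case first, then do the cache-specific work only when needed.
    static List<String> backwardFetch(final List<String> underlying, final List<String> cache) {
        if (cache == null) {
            // No caching layer: the underlying store's result is final.
            return underlying;
        }
        // Cache present: combine cached entries with the underlying result.
        // (Simplified concatenation; the real code merges two sorted iterators.)
        final List<String> merged = new ArrayList<>(cache);
        merged.addAll(underlying);
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(backwardFetch(List.of("b"), null));       // prints [b]
        System.out.println(backwardFetch(List.of("b"), List.of("a"))); // prints [a, b]
    }
}
```

Whether this restructuring matters much in practice is debatable, since the underlying iterator is needed in both branches anyway; the early return mainly flattens the method.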