vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1061509859


   @mjsax, @ableegoldman, @guozhangwang, @showuon one quick question here. I was able to add the expiration changes at the inner store level, but I have a doubt about CachingStore. Let's consider the backwardFetch implementation:
   ```java
   @Override
   public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
                                                                  final Bytes keyTo,
                                                                  final long timeFrom,
                                                                  final long timeTo) {
       if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
           LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
               + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
               + "Note that the built-in numerical serdes do not follow this for negative numbers");
           return KeyValueIterators.emptyIterator();
       }

       // since this function may not access the underlying inner store, we need to validate
       // if store is open outside as well.
       validateStoreOpen();

       final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
           wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
       if (context.cache() == null) {
           return underlyingIterator;
       }

       final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ?
           new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) :
           context.cache().reverseRange(
               cacheName,
               keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)),
               keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo))
           );

       final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo);
       final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
           new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

       return new MergedSortedCacheWindowStoreKeyValueIterator(
           filteredCacheIterator,
           underlyingIterator,
           bytesSerdes,
           windowSize,
           cacheFunction,
           false
       );
   }
   ```
   
   Here, we always build the underlyingIterator first, and only afterwards check the cache: if the context has no cache we return the underlying iterator directly, and if it does, we layer the caching iterator on top. I felt this could have been inverted, i.e. check for the cache before touching the underlying store (see the sketch below). Also, if we don't fix caching in this PR, then the results returned with and without caching would differ, which might be confusing. Currently that discrepancy is present only in the in-memory stores, since they implement expiration, while the RocksDB stores return the same output with or without caching. So, going by point 2 from Matthias above, I guess we are OK to live with the discrepancy for now and fix it in a follow-up PR, right?
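
   To make the suggestion concrete, here is a rough sketch of the inverted control flow I have in mind. It reuses the names from the snippet above (`context`, `wrapped()`, etc.) and is only meant to illustrate the restructuring, not a tested change:

   ```java
   // Sketch only: handle the no-cache path up front, before creating the
   // underlying iterator, instead of checking the cache afterwards.
   if (context.cache() == null) {
       // No caching layer configured: delegate straight to the inner store.
       return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
   }

   // Cache present: build the underlying iterator and merge it with the
   // (filtered) cache iterator exactly as in the current implementation.
   final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
       wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
   // ... build cacheIterator and filteredCacheIterator, then return the
   // MergedSortedCacheWindowStoreKeyValueIterator as before ...
   ```

   Behavior is unchanged either way; it just reads more naturally to me when the cache check comes first.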

