[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229867769


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RocksDBTimeOrderedKeyValueBufferTest {
+public RocksDBTimeOrderedKeyValueBuffer buffer;
+@Mock
+public SerdeGetter serdeGetter;
+public InternalProcessorContext context;
+public StreamsMetricsImpl streamsMetrics;
+@Mock
+public Sensor sensor;
+public long offset;
+
+@Before
+public void setUp() {
+when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
+when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+final Metrics metrics = new Metrics();
+offset = 0;
+streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+context = new 
MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
+}
+
+private void createBuffer(final Duration grace) {
+final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();
+buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, 
"testing");
+buffer.setSerdesIfNull(serdeGetter);
+buffer.init((StateStoreContext) context, store);
+}
+
+private void pipeRecord(final String key, final String value, final long 
time) {
+final Record record = new Record<>(key, value, time);
+context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, 
"testing", new RecordHeaders()));
+buffer.put(time, record, context.recordContext());
+}
+
+@Test
+public void shouldPutInBufferAndUpdateFields() {
+createBuffer(Duration.ofMinutes(1));
+assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
+pipeRecord("1", "0", 0L);
+assertNumSizeAndTimestamp(buffer, 1, 0, 42);
+pipeRecord("3", "0", 2L);
+assertNumSizeAndTimestamp(buffer, 2, 0, 84);
+

Review Comment:
   ```suggestion
   ```



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may 

[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229370444


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RocksDBTimeOrderedKeyValueBufferTest {
+public RocksDBTimeOrderedKeyValueBuffer buffer;
+@Mock
+public SerdeGetter serdeGetter;
+public InternalProcessorContext context;
+public StreamsMetricsImpl streamsMetrics;
+@Mock
+public Sensor sensor;
+public long offset;
+
+@Before
+public void setUp() {
+when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
+when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
+final Metrics metrics = new Metrics();
+offset = 0;
+streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
+context = new 
MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
+}
+
+public void createJoin(final Duration grace) {
+final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDbTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();
+buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, 
"testing");
+buffer.setSerdesIfNull(serdeGetter);
+store.init((StateStoreContext) context, store);
+buffer.init((StateStoreContext) context, store);
+}
+
+private void pipeRecord(final String key, final String value, final long 
time) {
+final Record record = new Record<>(key, value, time);
+context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, 
"testing", new RecordHeaders()));
+buffer.put(time, record, context.recordContext());
+}
+
+@Test
+public void shouldAddAndEvictRecord() {
+createJoin(Duration.ZERO);
+final AtomicInteger count = new AtomicInteger(0);
+pipeRecord("1", "0", 0L);
+buffer.evictWhile(() -> buffer.numRecords() > 0, r -> 
count.getAndIncrement());
+assertThat(count.get(), equalTo(1));
+}

Review Comment:
   Could you please add verifications that `numRecords()`, `bufferSize()`, and 
`minTimestamp()` are updated correctly? 



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the 

[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-14 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1229288396


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+if (wrapped().observedStreamTime - gracePeriod > 
minTimestamp) {
+return;
+}
+
+final V value = 
valueSerde.deserializer().deserialize(topic, bufferValue.newValue());
+
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+bufferSize 

[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-13 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1227894014


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {

Review Comment:
   @wcarlson5 Sorry for commenting again on the segmented state store but I 
really want to understand since they are quite new to me. 
   As far as I understand, you extend the 
`AbstractRocksDBTimeOrderedSegmentedBytesStore` which is a state store that 
stores records in timestamp-order into segments and it has an index on the 
record key. It seems to me that you do not use the segments nor the index. I 
understand that `AbstractRocksDBTimeOrderedSegmentedBytesStore` implements 
interface `SegmentedByteStore` which gives you the ability to fetch data by 
time ranges. Furthermore, `AbstractRocksDBTimeOrderedSegmentedBytesStore` 
maintains the observed stream time which is also something that comes in handy 
for your PR.
   
   My proposal is to hide the segmented aspect of the state store as much as 
possible. More precisely: 
   - Rename `RocksDBTimeOrderedKeyValueSegmentedBytesStore` to 
`RocksDBTimeOrderedKeyValueBytesStore`
   - Do not expose `retention` and `segmentInterval` in 
`RocksDBTimeOrderedKeyValueBytesStore`'s constructor and in the corresponding 
supplier.
   - Set both `retention` and `segmentInterval` in the call to the super 
constructor to high values as @vcrfxia suggested.
   
   In such a way, you can use the features you want to use from the time 
ordered segmented state store and at the same hide all the aspects that you do 
not need behind your abstraction. WDYT? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-12 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1226821666


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private Serde valueSerde;
+private final String topic;
+private int seqnum;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+seqnum = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? getter.valueSerde() : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+while (iterator.hasNext() && predicate.get()) {
+keyValue = iterator.next();
+
+final BufferValue bufferValue = 
BufferValue.deserialize(ByteBuffer.wrap(keyValue.value));
+final K key = keySerde.deserializer().deserialize(topic,
+
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(keyValue.key.get()));
+minTimestamp = bufferValue.context().timestamp();
+
+if (wrapped().observedStreamTime - gracePeriod > 
minTimestamp) {
+return;
+}
+
+V value = valueSerde.deserializer().deserialize(topic, 
bufferValue.newValue());
+
+if (bufferValue.context().timestamp() != minTimestamp) {
+throw new IllegalStateException(
+"minTimestamp [" + minTimestamp + "] did not match 
the actual min timestamp [" +
+bufferValue.context().timestamp() + "]"
+);
+}
+
+callback.accept(new Eviction(key, value, 
bufferValue.context()));
+
+wrapped().remove(keyValue.key);
+numRecords--;
+bufferSize = 

[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-08 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1223073798


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private FullChangeSerde valueSerde;
+private final String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+if (iterator.hasNext()) {
+keyValue = iterator.next();
+} else {
+if (numRecords() == 0) {

Review Comment:
   Why do you not directly access the field?



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package