[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-27 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244739834


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timestamp());
+if (maybeDropRecord(record)) {
+return;
+}
+
+if (!useBuffer) {
+doJoin(record);
+} else {
+if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+doJoin(record);
+}
+buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   Sure, that is just fine with me. I'll add a test adding records out of the 
grace period too.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
 }
 }
 
+
+private void makeJoin(final Duration grace) {
+final KStream stream;
+final KTable table;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+builder = new StreamsBuilder();
+
+final Consumed consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+stream = builder.stream(streamTopic, consumed);
+table = builder.table("tableTopic2", consumed, Materialized.as(
+Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5;
+stream.join(table,
+MockValueJoiner.TOSTRING_JOINER,
+Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+).process(supplier);
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+processor = supplier.theCapturedProcessor();
+}
+
+@Test
+public void shouldFailIfTableIsNotVersioned() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+() -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+assertThat(
+exception.getMessage(),
+is("KTable must be versioned to use a grace period in a stream 
table join.")
+);
+}
+
+@Test
+public void shouldDelayJoinByGracePeriod() {
+makeJoin(Duration.ofMillis(2));
+
+// push four items to the table. this should not produce any item.
+pushToTableNonRandom(4, "Y");
+processor.checkAndClearProcessResult(EMPTY);
+
+// push all four items to the primary stream. this should produce two 
items.
+pushToStream(4, "X");
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "X0+Y0", 0),
+new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+// push all items to the table. this should not produce any item
+pushToTableNonRandom(4, "YY");
+processor.checkAndClearP

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-27 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244730962


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timestamp());
+if (maybeDropRecord(record)) {
+return;
+}
+
+if (!useBuffer) {
+doJoin(record);
+} else {
+if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+doJoin(record);
+}
+buffer.get().evictWhile(() -> true, this::emit);
+}
+}
+
+private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) 
{
+final Record record = new Record<>(toEmit.key(), 
toEmit.value(), toEmit.recordContext().timestamp())
+.withHeaders(toEmit.recordContext().headers());
+internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   certainly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-27 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244729657


##
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##
@@ -189,7 +221,22 @@ public Joined withOtherValueSerde(final 
Serde otherValueSerde) {
  */
 @Override
 public Joined withName(final String name) {
-return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+}
+
+/**
+ * Set the grace period on the stream side of the join. Records will enter 
a buffer before being processed. Out of order records in the grace period will 
be processed in timestamp order. Late records, out of the grace period, will be 
executed right as they come in, if it is past the table history retention this 
could result in joins on the wrong version or a null join. Long gaps in stream 
side arriving records will cause records to be delayed in processing, even 
resulting in be processed out of the grace period window.

Review Comment:
   updated and clarified 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-22 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1238801056


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,60 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+//cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)

Review Comment:
   Yeah we can remove



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1256,10 +1261,25 @@ private  KStream doStreamTableJoin(final 
KTable table,
 final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
 final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+Optional> buffer = Optional.empty();
+
+if (joined.gracePeriod() != null) {

Review Comment:
   This seems to be the main sticking point of the PR. 
   
   The thought was that we would need some way to stop using the 
grace period without losing records, but you bring up a good point.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -122,12 +122,12 @@ public Maybe> 
priorValueForBuffered(final K key) {
 }
 
 @Override
-public void put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {
+public boolean put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {

Review Comment:
   added `shouldReturnIfRecordWasAdded`



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-19 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1234687152


##
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   Also https://github.com/apache/kafka/pull/13878



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-19 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232839068


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+private final static KeyValueTimestamp[] EMPTY = new 
KeyValueTimestamp[0];
+
+private final String streamTopic = "streamTopic";
+private final String tableTopic = "tableTopic";
+private TestInputTopic inputStreamTopic;
+private TestInputTopic inputTableTopic;
+private final int[] expectedKeys = {0, 1, 2, 3};
+
+private MockApiProcessor processor;
+private TopologyTestDriver driver;

Review Comment:
   So I was following the normal KStreamKTableJoinTest. I think it's best if 
they align with each other.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-16 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +75,63 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (buffer.isPresent()) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   If you find it easier to understand, sure. That is fine.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1256,10 +1261,25 @@ private  KStream doStreamTableJoin(final 
KTable table,
 final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
 final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+Optional> buffer = Optional.empty();
+
+if (joined.gracePeriod() != null) {

Review Comment:
   I see what you mean but we actually were going to create the store and leave 
it empty for zero duration. The point was to make it easier to change the grace 
period if desired so the store isn't getting created and destroyed. Something 
about making it more backward compatible.



##
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   yep sure.
   
   (Edit: The class it extends also uses JUnit 4 and I can't change this one 
without also changing that and all other join integration tests)



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +75,63 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (buffer.isPresent()) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+//cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timesta