[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-07 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501435613



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -122,7 +158,7 @@ public boolean setFlushListener(final 
CacheFlushListener, V> listene
 @Override
 public void put(final K key,
 final V value) {
-put(key, value, context.timestamp());
+put(key, value, context != null ? context.timestamp() : 0L);

Review comment:
   Yeah, it's not something I normally like to do, either. In this case, 
though, it's necessary. The thing is that all our internal StateStoreContexts 
are InternalProcessorContext implementations, and therefore, they are also 
ProcessorContext implementations, so they have a `timestamp()` method.
   
   The thing that makes this unavoidable is that it's ok for users to `init` a 
state store using the MockProcessorContext we provide for them in `test-utils`. 
This is a bit of a bleed-over from the _next_ PR, which I'm still finishing up, 
but it's better if we keep their context "pure". I.e., I'm going to propose 
adding a new context that's _just_ an `api.ProcessorContext` and a separate 
implementation that's _just_ a `StateStoreContext`. We should discuss on that PR 
whether that's really the best way to present it, but if you ultimately agree, 
then the mock won't be an InternalProcessorContext, which means we have to 
expect a null context here.
   
   Note that this only affects two things: the recording of metrics, which 
probably don't matter in unit tests, and this stub behavior for a deprecated 
method that people shouldn't be using anyway.
   
   If after reviewing the next PR, we do wind up converging the 
implementations, I'll come back and undo these checks here.
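
   For illustration, here is a minimal, self-contained sketch of the pattern 
described above. `Ctx`, `InternalCtx`, and `SketchStore` are hypothetical 
stand-ins, not the real Kafka Streams types; the only point is that the store 
keeps a context reference solely for internal contexts and falls back to `0L` 
otherwise.

```java
// Hypothetical stand-ins for the public and internal context types (assumptions, not Kafka APIs).
interface Ctx {
    String taskId();
}

interface InternalCtx extends Ctx {
    long timestamp();
}

class SketchStore {
    // Stays null when the store is initialized with a non-internal (e.g. mock) context.
    private InternalCtx context;
    // Records the timestamp used by the most recent put, so the fallback is observable.
    long lastTimestamp = -1L;

    void init(final Ctx ctx) {
        context = ctx instanceof InternalCtx ? (InternalCtx) ctx : null;
    }

    // Stub for the deprecated two-argument put: uses 0L when no internal context is available.
    void put(final String key, final String value) {
        put(key, value, context != null ? context.timestamp() : 0L);
    }

    void put(final String key, final String value, final long timestamp) {
        lastTimestamp = timestamp;
    }
}
```

   With a plain `Ctx`, the two-argument `put` records timestamp `0L`; with an 
`InternalCtx`, it records whatever `timestamp()` returns.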





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-07 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501433923



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -83,14 +85,40 @@
 this.valueSerde = valueSerde;
 }
 
+@Deprecated
 @Override
 public void init(final ProcessorContext context,
  final StateStore root) {
-this.context = context;
+this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
 taskId = context.taskId().toString();
 initStoreSerde(context);
 streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+registerMetrics();
+final Sensor restoreSensor =
+StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, 
name(), streamsMetrics);
+
+// register and possibly restore the state from the logs
+maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
+}
+
+@Override
+public void init(final StateStoreContext context,
+ final StateStore root) {
+this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
+taskId = context.taskId().toString();
+initStoreSerde(context);
+streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+registerMetrics();
+final Sensor restoreSensor =

Review comment:
   Actually, I'll do that in a quick follow-up PR, so I can go ahead and 
merge this.









[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-07 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501432967



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -83,14 +85,40 @@
 this.valueSerde = valueSerde;
 }
 
+@Deprecated
 @Override
 public void init(final ProcessorContext context,
  final StateStore root) {
-this.context = context;
+this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
 taskId = context.taskId().toString();
 initStoreSerde(context);
 streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+registerMetrics();
+final Sensor restoreSensor =
+StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, 
name(), streamsMetrics);
+
+// register and possibly restore the state from the logs
+maybeMeasureLatency(() -> super.init(context, root), time, 
restoreSensor);
+}
+
+@Override
+public void init(final StateStoreContext context,
+ final StateStore root) {
+this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
+taskId = context.taskId().toString();
+initStoreSerde(context);
+streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+registerMetrics();
+final Sensor restoreSensor =

Review comment:
   Huh. I'll double-check and take it out.









[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-07 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501432866



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -29,7 +29,9 @@
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;

Review comment:
   Oh, probably just overlooked it.









[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

2020-10-06 Thread GitBox


vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r500715702



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##
@@ -34,6 +34,7 @@
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link 
Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
+@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437

Review comment:
   That ticket (KAFKA-10437) also needs to go into 2.7.0, but I split it out 
for reviewability.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
##
@@ -45,12 +46,19 @@ public void flush() {
 throw new UnsupportedOperationException(ERROR_MESSAGE);
 }
 
+@Deprecated
 @Override
 public void init(final ProcessorContext context,
  final StateStore root) {
 throw new UnsupportedOperationException(ERROR_MESSAGE);
 }
 
+@Override
+public void init(final StateStoreContext context,
+ final StateStore root) {
+throw new UnsupportedOperationException(ERROR_MESSAGE);

Review comment:
   There are going to be a lot of duplicated init methods. It's not great, 
but hopefully we can drop the old API before too long.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final 
ProcessorContext context)
 return (StreamsMetricsImpl) context.metrics();
 }
 
+/**
+ * Should be removed as part of KAFKA-10217
+ */
+public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext 
context) {
+return (StreamsMetricsImpl) context.metrics();
+}
+
 public static String changelogFor(final ProcessorContext context, final 
String storeName) {
 return context instanceof InternalProcessorContext
 ? ((InternalProcessorContext) context).changelogFor(storeName)
 : 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
 }
+
+public static String changelogFor(final StateStoreContext context, final 
String storeName) {
+return context instanceof InternalProcessorContext
+? ((InternalProcessorContext) context).changelogFor(storeName)
+: 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+}
+
+public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+if (context instanceof InternalProcessorContext) {
+return (InternalProcessorContext) context;
+} else {
+throw new IllegalArgumentException(
+"This component requires internal features of Kafka Streams and must be disabled for unit tests."
+);
+}
+}

Review comment:
   I replaced a lot of casts with this checked-cast method, which also lets 
us get rid of a lot of similar cast-checking blocks, which were inconsistently 
applied.
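
   As a rough sketch of why the checked cast is nicer than a bare cast: the 
types below (`PublicContext`, `InternalContext`, `ContextCasts`) are 
hypothetical stand-ins that only mirror the shape of the method in the diff 
above. A caller with a non-internal context gets an `IllegalArgumentException` 
with an explanatory message instead of a bare `ClassCastException`.

```java
// Hypothetical stand-ins for the public and internal context interfaces (not Kafka APIs).
interface PublicContext { }

interface InternalContext extends PublicContext {
    String changelogFor(String storeName);
}

final class ContextCasts {
    private ContextCasts() { }

    // Checked cast: one place that validates the type and explains the failure.
    static InternalContext asInternal(final PublicContext context) {
        if (context instanceof InternalContext) {
            return (InternalContext) context;
        }
        throw new IllegalArgumentException(
            "This component requires internal features and cannot run with a plain context.");
    }
}
```

   Call sites then read `ContextCasts.asInternal(context).changelogFor(name)` 
rather than repeating their own `instanceof` check.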

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -83,14 +85,40 @@
 this.valueSerde = valueSerde;
 }
 
+@Deprecated
 @Override
 public void init(final ProcessorContext context,
  final StateStore root) {
-this.context = context;
+this.context = context instanceof InternalProcessorContext ? 
(InternalProcessorContext) context : null;
 taskId = context.taskId().toString();
 initStoreSerde(context);
 streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+registerMetrics();

Review comment:
   I wasn't able to extract out quite as much common code in the Metered 
implementations because they need to work regardless of whether the context is 
an InternalProcessorContext or whether it's a straight mock (for unit tests).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##
@@ -65,7 +65,11 @@
  *
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
+ * @deprecated Since 2.7.0. Callers should invoke {@link 
this#init(StateStoreContext, StateStore)} instead.
+ * Implementers may choose to implement this method for 
backward compatibility or to throw an
+ * informative exception instead.
  */
+@Deprecated

Review comment:
   Adding the deprecation tag right now lets us be sure we've found every 
place this method appears in the codebase.
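
   A small sketch of the mechanism, using hypothetical types (`OldCtx`, 
`NewCtx`, `SketchStateStore`, `CountingStore`, `LegacyCaller`) rather than the 
real interfaces: once the old overload is marked `@Deprecated`, compiling with 
`javac -Xlint:deprecation` reports each call site and each override that is not 
itself deprecated or suppressed, which is what surfaces every remaining use.

```java
// Hypothetical context types standing in for the old and new APIs.
interface OldCtx { }

interface NewCtx { }

interface SketchStateStore {
    /** @deprecated Callers should use {@link #init(NewCtx)} instead. */
    @Deprecated
    void init(OldCtx context);

    void init(NewCtx context);
}

class CountingStore implements SketchStateStore {
    int oldInits;
    int newInits;

    // Marking the override as deprecated too keeps the compiler quiet here.
    @Deprecated
    @Override
    public void init(final OldCtx context) {
        oldInits++;
    }

    @Override
    public void init(final NewCtx context) {
        newInits++;
    }
}

class LegacyCaller {
    // Without this annotation, -Xlint:deprecation would flag the call below.
    @SuppressWarnings("deprecation")
    static void initLegacy(final SketchStateStore store) {
        store.init(new OldCtx() { });
    }
}
```

   Each `@SuppressWarnings("deprecation")` that has to be added this way marks 
a spot to revisit when the old API is dropped.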

##
File path: