Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-06-05 Thread via GitHub


mjsax commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2151048809

   Thanks for the PR @AyoubOm. Merged to `trunk` and cherry-picked to `3.8` 
branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-06-05 Thread via GitHub


mjsax merged PR #15790:
URL: https://github.com/apache/kafka/pull/15790


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-31 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1622840595


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Thanks everyone for your input ! I will make the change accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1621827532


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Good point -- I agree with Bruno, the catch block should be for just 
ConfigException or StreamsException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-30 Thread via GitHub


cadonna commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1620256284


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Why `RuntimeException`? Can't we catch `ConfigException` and 
`StreamsException` in one catch clause and then throw a `StreamsException`. 
This seems safer to me, because otherwise with future code changes that might 
throw exceptions like `IllegalStateException` we would wrap unexpected 
exception due to mistakes in Streams in `StreamsException`.
   
   ```java
   catch (final ConfigException | final StreamsException ex) {
   // handle the exception
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-29 Thread via GitHub


ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Can't say what it's original purpose was but StreamsException has definitely 
morphed into a catch-all for exceptions throughout Streams. It's definitely not 
exclusive to the state of a task though (that would be 
ProcessorStateException). The nice thing about StreamsException is you can add 
other useful metadata such as the taskId where the error originated, so I 
always prefer to just throw the StreamsException. We also know for a fact that 
StreamsException will be caught and handled properly as it gets bubbled up. So 
I'd go for merging this into a single `catch RuntimeException` block, then wrap 
it in a StreamsException. 
   
   And don't forget to add the task id too! 😄  (you can get it from the 
processor context)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-29 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619318104


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   It's a corner case I would say -- also, we do wrap a log of exception as 
StreamsException throughout the whole code base, and given that we wrap the 
original `ConfigException` inside the StreamsException we would not lose 
information. It's just the idea to simplify the code a little bit.
   
   Let's hear what @cadonna or @ableegoldman think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-29 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1618387142


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   @mjsax the idea was to preserve the exception that was thrown, as I think 
StreamsException and ConfigException don't have the same nature. AFAIK 
StreamsException captures problems related to the state of the task. If you 
believe it's ok to throw StreamsException here, I can do the change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-28 Thread via GitHub


mjsax commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2136299144

   Sorry for the long wait... KIP and feature freeze deadline for 3.8 got me 
busy. But this PR look overall good, and can easily get it merge before code 
freeze :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-28 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1618014606


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Wondering if we should merge both blocks and just catch `RuntimeException`? 
Seems to be duplicate code? And just pass in whatever exception we catch and 
rethrow as `new StreamsException(..., e);` -- this way we preserve the original 
exception and don't need to "awkwardly" add `e.getMessage()` for the first 
case. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-28 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1618014606


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Wondering if we should merge both blocks and just catch `RuntimeException`? 
Seems to be duplicate code? And just pass in whatever exception we catch and 
rethrow as `new StreamsException(..., e);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-23 Thread via GitHub


AyoubOm commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2126424513

   @mjsax please let me know if the changes are fine for you ;)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496660


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
 
 final ConfigException se = assertThrows(ConfigException.class, () -> 
new TopologyTestDriver(topology));
 final String msg = se.getMessage();
-assertTrue("Error about class cast with serdes", 
msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));

Review Comment:
   recovered test



##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+static  StateSerdes prepareStoreSerde(final StateStoreContext 
context, final String storeName,
+  final String 
changelogTopic, final Serde keySerde,

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496472


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));

Review Comment:
   added original message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596492735


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());

Review Comment:
   Nice catch ! Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596366478


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   `prepareSerializer` calls `context.keySerde` which calls 
`config.defaultKeySerde()`. The latter can throw both exceptions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596366478


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   prepareSerializer calls context.keySerde which calls 
config.defaultKeySerde(). The latter can throw both exceptions



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   `prepareSerializer` calls `context.keySerde` which calls 
`config.defaultKeySerde().` The latter can throw both exceptions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-09 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596180605


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+static  StateSerdes prepareStoreSerde(final StateStoreContext 
context, final String storeName,
+  final String 
changelogTopic, final Serde keySerde,

Review Comment:
   nit formatting. We should have a single parameter per line, not multiple 
(both line above) -- also below



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));

Review Comment:
   Should we somehow preserve `e.getMessage()` -- it seems useful?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());

Review Comment:
   I did dig into `prepareKeySerializer` and `prepareValueSerializer` which 
both use `WrappingNullableUtils#prepareSerializer()` which might call both 
`context.keySerde()` and `context.valueSerde()`, and thus, I believe we could 
currently get an exception when trying to get the key serde, even if default 
key serde is set, but default value serde is not set?
   
   I think this internal helper method needs some updated, too.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+} catch (final StreamsException e) {

Review Comment:
   Why are we catching `StreamsException`?  Seems the only exception that might 
bubble up her is a `ConfigException`?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
 
 final ConfigException se = assertThrows(ConfigException.c

Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-04 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589953917


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##
@@ -173,21 +172,15 @@ protected Serde prepareValueSerdeForStore(final 
Serde valueSerde, final Se
 protected void initStoreSerde(final ProcessorContext context) {
 final String storeName = name();
 final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-serdes = new StateSerdes<>(
-changelogTopic,
-prepareKeySerde(keySerde, new SerdeGetter(context)),
-prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-);
+serdes = StoreSerdeInitializer.prepareStoreSerde(
+context, storeName, changelogTopic, keySerde, valueSerde, 
this::prepareValueSerdeForStore);
 }
 
 protected void initStoreSerde(final StateStoreContext context) {
 final String storeName = name();
 final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-serdes = new StateSerdes<>(
-changelogTopic,
-prepareKeySerde(keySerde, new SerdeGetter(context)),
-prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-);
+serdes = StoreSerdeInitializer.prepareStoreSerde(
+context, storeName, changelogTopic, keySerde, valueSerde, 
this::prepareValueSerdeForStore);

Review Comment:
   I added a parameter function prepareValueSerde to be able to use the correct 
function in children TimestampedStore classes. These don't directly use 
`WrappingNullableUtils.prepareValueSerde` by overriding the behavior of 
prepareValueSerdeForStore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-04 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589952122


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize 
serdes for sink node %s", name()), e);

Review Comment:
   Good idea, I moved the exception handling in a separate class to share the 
code between the store classes, with corresponding unit tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-01 Thread via GitHub


mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1587028687


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize 
serdes for sink node %s", name()), e);

Review Comment:
   Should we split this up further, and have two try-catch blocks, one for the 
key, and one for the value, to narrow it down further and add key/value as 
information to the error message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-04-25 Thread via GitHub


AyoubOm commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2077173169

   ping @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org