mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661045444
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final
Object maxInFlightReque
public Serde defaultKeySerde() {
final Object keySerdeConfigSetting =
get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
try {
+ if (keySerdeConfigSetting == null) {
+ return null;
Review comment:
It's been a while, but I am still wondering whether we should throw a
`ConfigException` directly instead of returning `null`. This would then be the
only place in the code where we throw a `ConfigException` for this case. Below,
there is some repetitive code that calls `defaultKeySerde` (or
`defaultValueSerde`) and throws a `ConfigException` if `null` is returned.
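To illustrate the idea, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `SketchConfigException` and `SketchStreamsConfig` are hypothetical stand-ins for `ConfigException` and `StreamsConfig`, and the `set` helper exists only for this example. The point is that the lookup itself throws, so callers no longer need their own `null` checks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigException so the sketch compiles on its own.
class SketchConfigException extends RuntimeException {
    SketchConfigException(final String message) {
        super(message);
    }
}

// Hypothetical stand-in for StreamsConfig; only the lookup under discussion is modeled.
class SketchStreamsConfig {
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";

    private final Map<String, Object> settings = new HashMap<>();

    // Illustration-only helper to populate the config.
    SketchStreamsConfig set(final String key, final Object value) {
        settings.put(key, value);
        return this;
    }

    // Throws in one place instead of returning null to every caller.
    Object defaultKeySerde() {
        final Object keySerdeConfigSetting = settings.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            throw new SketchConfigException(
                "Please specify a key serde or set one through "
                    + "StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
        }
        return keySerdeConfigSetting;
    }
}
```

With this shape, the repeated "call, check for `null`, throw" blocks at the call sites could collapse into a plain call.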
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -69,6 +70,9 @@
Optional.empty()
);
} catch (final Exception deserializationException) {
+ if (deserializationException instanceof ConfigException) {
Review comment:
Where does the `ConfigException` come from?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,20 +31,37 @@
public class WrappingNullableUtils {
@SuppressWarnings("unchecked")
- private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final
boolean isKey) {
- Deserializer<T> deserializerToUse = specificDeserializer;
- if (deserializerToUse == null) {
+ private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final ProcessorContext context, final
boolean isKey, final String name) {
+ final Deserializer<?> contextKeyDeserializer = context.keySerde() ==
null ? null : context.keySerde().deserializer();
+ final Deserializer<?> contextValueDeserializer = context.valueSerde()
== null ? null : context.valueSerde().deserializer();
+ final Deserializer<T> deserializerToUse;
+
+ if (specificDeserializer == null) {
deserializerToUse = (Deserializer<T>) (isKey ?
contextKeyDeserializer : contextValueDeserializer);
+ } else {
+ deserializerToUse = specificDeserializer;
+ }
+ if (deserializerToUse == null) {
+ final String serde = isKey ? "key" : "value";
+ throw new ConfigException("Failed to create deserializers. Please
specify a " + serde + " serde through produced or materialized, or set one
through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) +
"_SERDE_CLASS_CONFIG for node " + name);
Review comment:
Following the comments above: if we throw in `context.keySerde()`, we can
avoid this redundant code. Of course, we should then only call
`context.keySerde()` if `specificDeserializer == null`.
Similar below.
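A rough sketch of the "only look up the default when needed" part, assuming the default lookup may throw. `LazyDefaultSketch` and `specificOrDefault` are hypothetical names, and the `Supplier` stands in for a call such as `context.keySerde().deserializer()`:

```java
import java.util.function.Supplier;

final class LazyDefaultSketch {
    private LazyDefaultSketch() { }

    // Returns the explicitly supplied instance when present; the (possibly
    // throwing) default lookup is only evaluated when nothing was supplied.
    static <T> T specificOrDefault(final T specific, final Supplier<? extends T> defaultLookup) {
        return specific != null ? specific : defaultLookup.get();
    }
}
```

Because the supplier is evaluated lazily, a topology that sets all serdes explicitly would never hit the exception, even if no default serde is configured.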
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1445,6 +1447,9 @@ public Serde defaultKeySerde() {
public Serde defaultValueSerde() {
final Object valueSerdeConfigSetting =
get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
try {
+ if (valueSerdeConfigSetting == null) {
+ return null;
Review comment:
As above
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
@Override
public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer,
final Deserializer<T> defaultValueDeserializer) {
Review comment:
Could we pass the context instead of both deserializers, and simplify to
(relying on the context to throw if necessary):
```
if (inner == null) {
inner = (Deserializer<T>) context.valueSerde().deserializer();
}
```
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
##########
@@ -54,10 +55,14 @@ public CombinedKeySchema(final Supplier<String>
foreignKeySerdeTopicSupplier,
public void init(final ProcessorContext context) {
primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
- primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>)
context.keySerde().serializer() : primaryKeySerializer;
- primaryKeyDeserializer = primaryKeyDeserializer == null ?
(Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
- foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>)
context.keySerde().serializer() : foreignKeySerializer;
- foreignKeyDeserializer = foreignKeyDeserializer == null ?
(Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
+ try {
+ primaryKeySerializer = primaryKeySerializer == null ?
(Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
+ primaryKeyDeserializer = primaryKeyDeserializer == null ?
(Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
+ foreignKeySerializer = foreignKeySerializer == null ?
(Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
+ foreignKeyDeserializer = foreignKeyDeserializer == null ?
(Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
+ } catch (final NullPointerException e) {
+ throw new ConfigException("Please specify a key serde or set one
through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
Review comment:
If we let `context.keySerde()` throw, there won't be an NPE, and we also
don't need to throw `ConfigException` here explicitly; we can delegate to the
`keySerde()` / `valueSerde()` methods.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
valueSerdeTopic = valueSerdeTopicSupplier.get();
// get default key serde if it wasn't supplied directly at
construction
if (foreignKeySerializer == null) {
+ if (context.keySerde() == null) {
+ throw new ConfigException("Please specify a key serde or
set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
Review comment:
as above (same below)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -60,9 +60,12 @@ public void close() {
}
public void setIfUnset(final Serde<InnerK> defaultKeySerde, final
Serde<InnerV> defaultValueSerde) {
- Objects.requireNonNull(defaultKeySerde);
- Objects.requireNonNull(defaultValueSerde);
- serializer.setIfUnset(defaultKeySerde.serializer(),
defaultValueSerde.serializer());
- deserializer.setIfUnset(defaultKeySerde.deserializer(),
defaultValueSerde.deserializer());
+ if (defaultKeySerde != null && defaultValueSerde != null) {
Review comment:
Why is this a single condition? Should we not handle keySerde and
valueSerde independently? At the moment, if one is null but the other is not,
we would execute the `else` and set _both_ to `null`; is this intentional?
Also, following the comment above, should we pass the context into
this method?
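For reference, handling both sides independently could look roughly like the sketch below. `IndependentDefaultsSketch` is a hypothetical, simplified holder and not the actual `WrappingNullableSerde`; it only shows that a missing default on one side need not affect the other.

```java
final class IndependentDefaultsSketch<K, V> {
    private K key;
    private V value;

    IndependentDefaultsSketch(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    // Each side is filled only if it is unset and a default for that side exists;
    // a null default on one side leaves the other side untouched.
    void setIfUnset(final K defaultKey, final V defaultValue) {
        if (key == null && defaultKey != null) {
            key = defaultKey;
        }
        if (value == null && defaultValue != null) {
            value = defaultValue;
        }
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }
}
```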
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
@Override
public void setIfUnset(final Serializer<Void> defaultKeySerializer, final
Serializer<T> defaultValueSerializer) {
if (inner == null) {
- inner = Objects.requireNonNull(defaultValueSerializer);
+ if (defaultValueSerializer == null) {
Review comment:
as above (to avoid redundant code)