Hi all, I’m trying to experiment with Kotlin and Streams, and I’m encountering an error:

    $ kotlinc -cp kafka-streams-0.11.0.0-cp1.jar:kafka-clients-0.11.0.0-cp1.jar:slf4j-api-1.7.25.jar -jvm-target 1.8
    Welcome to Kotlin version 1.1.4-3 (JRE 1.8.0_141-b15)
    Type :help for help, :quit for quit
    >>> import org.apache.kafka.streams.StreamsConfig
    >>> val config = hashMapOf("application.id" to "t2p.normalizer",
    >>>                        "bootstrap.servers" to "localhost:9092")
    >>> StreamsConfig(config)
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    java.lang.ExceptionInInitializerError
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.Serdes$ByteArraySerde for configuration default.key.serde: Class org.apache.kafka.common.serialization.Serdes$ByteArraySerde could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
        at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:944)
        at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:137)
        at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:157)
        at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:196)
        at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:358)
        at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:371)
        at org.apache.kafka.streams.StreamsConfig.<clinit>(StreamsConfig.java:270)
        at Line_2.<init>(Unknown Source)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.jetbrains.kotlin.cli.common.repl.GenericReplEvaluator$eval$1$scriptInstance$1.invoke(GenericReplEvaluator.kt:93)
        <cut>

I’ve put the entire thing into this gist:
https://gist.github.com/aviflax/d33ac2fbd77882fcb80fe8d3fd26feda

I took a look at ConfigDef.java, and I _think_ the error is being thrown from line 706:
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L706

which is:

    return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());

so I tried running that in the Kotlin REPL:

    >>> import org.apache.kafka.common.utils.Utils
    >>> Class.forName("org.apache.kafka.common.serialization.Serdes\$ByteArraySerde",
    >>>               true, Utils.getContextOrKafkaClassLoader())
    java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)

(that’s the whole trace)

I thought maybe I was missing a JAR file, but this works fine:

    >>> Class.forName("org.apache.kafka.common.serialization.Serdes\$ByteArraySerde")
    class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

So I guess this has something to do with the ClassLoader being returned by Utils.getContextOrKafkaClassLoader().

But when I call that, the result looks sane to me:

    >>> import org.apache.kafka.common.utils.Utils
    >>> Utils.getContextOrKafkaClassLoader()
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    sun.misc.Launcher$AppClassLoader@55f96302

Not that I know much about class loaders, but “AppClassLoader” sounds at least somewhat reasonable.

So if anyone has any ideas on what’s going on here, I’d very much appreciate any help!

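For anyone who wants to reproduce the comparison above, here is a minimal sketch in plain Kotlin. It checks whether a class is visible to the thread's context class loader and to the loader of the calling code, which is the loader the one-argument Class.forName uses. It relies only on the JDK. canLoad and loaderChain are hypothetical helper names, not part of Kafka or Kotlin, and java.util.ArrayList is only a stand-in class name so the sketch runs without the Kafka JARs.

```kotlin
// Diagnostic sketch using only the JDK. The helper names are made up for illustration.

/** Returns true if `loader` can resolve `className`; a null loader means the bootstrap loader. */
fun canLoad(className: String, loader: ClassLoader?): Boolean =
    try {
        // initialize = false: only look the class up, do not run its static initializers
        Class.forName(className, false, loader)
        true
    } catch (e: ClassNotFoundException) {
        false
    }

/** Lists a loader and all of its parents, to show which loaders a lookup may consult. */
fun loaderChain(loader: ClassLoader?): List<String> =
    generateSequence(loader) { it.parent }.map { it.toString() }.toList()

fun main() {
    // Stand-in name. In the REPL session this would be
    // "org.apache.kafka.common.serialization.Serdes\$ByteArraySerde".
    val name = "java.util.ArrayList"

    // Loader attached to the current thread
    val contextLoader = Thread.currentThread().contextClassLoader
    // Loader that defined the calling code, which one-argument Class.forName uses
    val callerLoader = object {}.javaClass.classLoader

    println("context loader chain: ${loaderChain(contextLoader)}")
    println("caller loader chain:  ${loaderChain(callerLoader)}")
    println("context loader can load $name: ${canLoad(name, contextLoader)}")
    println("caller loader can load $name:  ${canLoad(name, callerLoader)}")
}
```

If the two results differ inside the REPL, that would suggest the REPL loads the -cp JARs through a loader other than the thread's context class loader. That is an assumption worth checking, not a confirmed diagnosis.
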
Thanks,
Avi

--
Software Architect @ Park Assist » http://tech.parkassist.com/