Hi Aaron,

Please note that the consumer positions (offsets) of your storm-kafka spouts won't be migrated, so the new storm-kafka-client spout will start over on the topics you subscribe to. Do you need the offsets migrated, or is it fine if the consumers start from scratch?
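In case it helps when deciding: as far as I know, where the new spout begins reading when its consumer group has no committed offsets is governed by the spout's FirstPollOffsetStrategy setting. Here is a minimal Scala sketch, assuming storm-kafka-client 1.1.x is on the classpath. The broker address, topic name, group id and the object name SpoutSketch are placeholders I made up for illustration, not values from your setup.

```scala
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
// In the 1.x line this enum is nested inside KafkaSpoutConfig (assumption: you stay on 1.x).
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

object SpoutSketch {
  // Hypothetical helper: builds a spout that reads String keys and values.
  def buildSpout(bootstrapServers: String, topic: String, groupId: String): KafkaSpout[String, String] = {
    val config: KafkaSpoutConfig[String, String] =
      KafkaSpoutConfig
        .builder(bootstrapServers, topic)
        // Passed through to the underlying Kafka consumer.
        .setProp("group.id", groupId)
        // With no committed offset for this group, begin at the earliest available record.
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .build()
    new KafkaSpout[String, String](config)
  }
}
```

You would then pass something like SpoutSketch.buildSpout("broker1:6667", "my-topic", "my-group") to TopologyBuilder.setSpout. Switching to UNCOMMITTED_LATEST should make a fresh consumer group skip the existing backlog instead of replaying it.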
I don't know whether HDP has special requirements, but here's the general how-to for setting up storm-kafka-client:

The storm-kafka-client library should work with any Kafka version 0.10.0.0 and above. You should use the latest version of storm-kafka-client, which is currently 1.1.1. If you want a decent number of extra fixes, you might consider building version 1.2.0 yourself from https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client. We are hoping to release this version before too long.

Once you've declared a dependency on storm-kafka-client, you should also add a dependency on org.apache.kafka:kafka-clients, in the version that matches your Kafka broker version.

Looking at your sbt file, you should not need org.apache.kafka:kafka on the classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on storm-kafka-client are unnecessary as well, since I don't believe those dependencies are part of the storm-kafka-client dependency tree.

In case making those changes doesn't solve it, could you post a bit more of the stack trace you get?

2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <[email protected]>:

> All,
>
> I have resurrected some old code that includes a Kafka Producer class as
> well as a storm topology that includes a Kafka Spout to ingest the messages
> coming from the aforementioned producer.
>
> I was using the storm-kafka library with Scala 2.11, but when I changed to
> the newer code base, which is using Scala 2.12, I found that the older
> library wouldn't work...thus I wanted to switch to the new
> storm-kafka-client, but am not sure what version I should be using. here
> is my build.sbt file, as well as the error messages I am seeing when the
> Spout actually runs (my favorite assistant, Google, tells me it is due to a
> version mismatch between Storm and Kafka). I am using HDP 2.6.2, for
> whatever that is worth...
>
> java.lang.*NoSuchMethodError*: kafka.javaapi.consumer.
> SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>
> name := "apachestormstreaming"
>
> version := "0.1"
>
> scalaVersion := "2.12.4"
>
> scalacOptions := Seq("-unchecked", "-deprecation")
>
> resolvers ++= Seq(
>   "clojars" at "http://clojars.org/repo/",
>   "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>   "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
> )
>
> libraryDependencies ++= Seq(
>   "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>   "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>     exclude("org.slf4j", "slf4j-log4j12")
>     exclude("log4j","log4j"),
>   "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>   "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>   "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>     exclude("org.slf4j", "slf4j-log4j12")
>     exclude("log4j","log4j")
>     exclude("commons-beanutils", "commons-beanutils-core")
>     exclude("commons-beanutils", "commons-beanutils")
>     exclude("commons-collections", "commons-collections"),
>   "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>     exclude("org.apache.kafka","kafka-clients")
>     exclude("org.slf4j", "slf4j-log4j12")
>     exclude("log4j","log4j")
>     exclude("org.apache.zookeeper","zookeeper"),
>   "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>   "org.apache.kafka" %% "kafka" % "0.10.1.1"
>     exclude("org.slf4j", "slf4j-log4j12")
>     exclude("log4j","log4j")
>     exclude("org.apache.zookeeper","zookeeper"),
>   "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>     exclude("org.slf4j","slf4j-log4j12")
>     exclude("log4j","log4j"),
>   "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>     exclude("org.slf4j","slf4j-log4j12")
>     exclude("log4j","log4j"),
>   "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>   "org.apache.commons" % "commons-lang3" % "3.6",
>   "org.influxdb" % "influxdb-java" % "2.7"
> )
>
> assemblyMergeStrategy in assembly := {
>   case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>     MergeStrategy.rename
>   case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>     MergeStrategy.rename
>   case PathList("org","apache","http", _ @ _*) =>
>     MergeStrategy.first
>   case PathList("org","apache","commons","codec", _ @ _*) =>
>     MergeStrategy.first
>   case PathList("io","netty", _ @ _*) =>
>     MergeStrategy.last
>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>     MergeStrategy.first
>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>     MergeStrategy.first
>   case x =>
>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>     oldStrategy(x)
> }
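
To make the dependency suggestions concrete, here is a rough sketch of how the Kafka-related entries in your build.sbt might look afterwards. It is only a fragment, and the versions are assumptions: I have used the 1.1.1 release mentioned earlier, but on HDP you may need to keep the vendor build from your original file, and kafka-clients should match whatever your brokers actually run. The other entries (storm-core, hadoop, hbase and so on) would stay as they are.

```scala
// build.sbt fragment (sketch only): shows just the Kafka-related dependencies.
libraryDependencies ++= Seq(
  // New spout library, declared without the zookeeper/log4j/slf4j-log4j12 exclusions.
  // Assumption: the Apache release; an HDP build such as "1.1.0.2.6.2.0-205" may be required instead.
  "org.apache.storm" % "storm-kafka-client" % "1.1.1",
  // Plain Java client; the version here is taken from the original file and
  // should match the Kafka broker version.
  "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
  // The "org.apache.kafka" %% "kafka" entry is intentionally dropped.
)
```

If the NoSuchMethodError on kafka.javaapi.consumer.SimpleConsumer still shows up after this, that would suggest something on the classpath is still using the old consumer API, so it may be worth checking the output of your dependency tree for a leftover storm-kafka or kafka core jar.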
