Yes, there is no code overlap between storm-kafka and storm-kafka-client; the setup code is completely different. The code you posted still uses the storm-kafka classes (e.g. the old SpoutConfig instead of the new KafkaSpoutConfig), so you're still on the old spout. I'm a little surprised that code even compiled for you before, since the build.sbt you posted no longer lists storm-kafka.
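For reference, a rough sketch of what the equivalent setup might look like with the new spout (the broker host:port list below is an assumption based on your ZooKeeper hosts; the new spout connects to the Kafka brokers directly, typically on port 9092, not to ZooKeeper):

```scala
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

// Broker addresses are an assumption -- substitute your actual Kafka brokers.
val pcapKafkaConf: KafkaSpoutConfig[String, String] =
  KafkaSpoutConfig
    .builder("r7u07:9092,r7u08:9092,r7u09:9092", "packets")
    // Roughly replaces the old ignoreZkOffsets/startOffsetTime settings
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build()

// The old SchemeAsMultiScheme/outputStreamId mapping is handled by a
// RecordTranslator in the new API -- see the linked example topology.
val pcapKafkaSpout = new KafkaSpout[String, String](pcapKafkaConf)
```

This is only a sketch, not a drop-in replacement; check the storm-kafka-client documentation linked below for the full set of builder options.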
There's documentation for the new spout at https://storm.apache.org/releases/1.1.1/storm-kafka-client.html, and some example code at https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.

2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <[email protected]>:

> Stig,
>
> This project is still in POC status, so I can wipe and restart
> anything... no big deal. Thanks! So, I am now seeing some build errors
> when I try to run my assembly. Is there a different method for setting up
> the KafkaSpout?
>
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
> [error]        ^
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
> [error]   val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
> [error]                ^
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
> [error]   val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
> [error]                                    ^
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
> [error]   val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts, pcapKafkaTopic, pcapZkRoot, pcapClientId)
> [error]                      ^
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
> [error]   val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts, pcapKafkaTopic, pcapZkRoot, pcapClientId)
> [error]                                        ^
> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
> [error]   val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
> [error]                            ^
> [error] 6 errors found
>
> and here is the relevant part of my topology:
>
> // Set up the Kafka Spout
> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
> val pcapKafkaTopic: String = "packets"
> val pcapZkRoot: String = "/packets"
> val pcapClientId: String = "PcapKafkaSpout"
>
> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts, pcapKafkaTopic, pcapZkRoot, pcapClientId)
> pcapKafkaConf.ignoreZkOffsets = true
> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
> pcapKafkaConf.outputStreamId = "packets"
> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>
> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>
> // Generates list of PCAP files to process, periodically scanning a magic
> // directory for new files
> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
>   .setNumTasks(PcapFileSpoutTasks)
>   .setMemoryLoad(16 * 1024)
>
> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <[email protected]> wrote:
>
>> Hi Aaron,
>>
>> Please note that the position of your storm-kafka consumers won't be
>> migrated, so the new storm-kafka-client spout will start over on the
>> topics you subscribe to. Do you need offsets migrated, or is it fine if
>> the consumers start from scratch?
>>
>> I don't know whether HDP has special requirements, but here's the general
>> how-to for setting up storm-kafka-client:
>>
>> The storm-kafka-client library should work with any Kafka version
>> 0.10.0.0 and above. You should use the latest version of
>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>> of extra fixes, you might consider building version 1.2.0 yourself from
>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>> we are hoping to release that version before too long. Once you've
>> declared a dependency on storm-kafka-client, you should also add a
>> dependency on org.apache.kafka:kafka-clients, in the version that matches
>> your Kafka broker version.
>>
>> Looking at your sbt, you should not need org.apache.kafka:kafka on the
>> classpath. I think your zookeeper, log4j, and slf4j-log4j12 exclusions on
>> storm-kafka-client are unnecessary as well; I don't believe those
>> dependencies are part of the storm-kafka-client dependency tree.
>>
>> In case making those changes doesn't solve it, could you post a bit more
>> of the stack trace you get?
>>
>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <[email protected]>:
>>
>>> All,
>>>
>>> I have resurrected some old code that includes a Kafka Producer class as
>>> well as a storm topology that includes a Kafka Spout to ingest the
>>> messages coming from the aforementioned producer.
>>>
>>> I was using the storm-kafka library with Scala 2.11, but when I changed
>>> to the newer code base, which uses Scala 2.12, I found that the older
>>> library wouldn't work, so I wanted to switch to the new
>>> storm-kafka-client, but am not sure what version I should be using. Here
>>> is my build.sbt file, as well as the error messages I am seeing when the
>>> Spout actually runs (my favorite assistant, Google, tells me it is due
>>> to a version mismatch between Storm and Kafka). I am using HDP 2.6.2,
>>> for whatever that is worth...
>>>
>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>
>>> name := "apachestormstreaming"
>>>
>>> version := "0.1"
>>>
>>> scalaVersion := "2.12.4"
>>>
>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>
>>> resolvers ++= Seq(
>>>   "clojars" at "http://clojars.org/repo/",
>>>   "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>   "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>> )
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>   "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j"),
>>>   "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>   "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>   "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j")
>>>     exclude("commons-beanutils", "commons-beanutils-core")
>>>     exclude("commons-beanutils", "commons-beanutils")
>>>     exclude("commons-collections", "commons-collections"),
>>>   "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>     exclude("org.apache.kafka", "kafka-clients")
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j")
>>>     exclude("org.apache.zookeeper", "zookeeper"),
>>>   "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>   "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j")
>>>     exclude("org.apache.zookeeper", "zookeeper"),
>>>   "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j"),
>>>   "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>     exclude("log4j", "log4j"),
>>>   "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>   "org.apache.commons" % "commons-lang3" % "3.6",
>>>   "org.influxdb" % "influxdb-java" % "2.7"
>>> )
>>>
>>> assemblyMergeStrategy in assembly := {
>>>   case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>     MergeStrategy.rename
>>>   case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>     MergeStrategy.rename
>>>   case PathList("org", "apache", "http", _ @ _*) =>
>>>     MergeStrategy.first
>>>   case PathList("org", "apache", "commons", "codec", _ @ _*) =>
>>>     MergeStrategy.first
>>>   case PathList("io", "netty", _ @ _*) =>
>>>     MergeStrategy.last
>>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>     MergeStrategy.first
>>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>     MergeStrategy.first
>>>   case x =>
>>>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>     oldStrategy(x)
>>> }
