Stig,
This project is still at the POC stage, so I can wipe and restart
anything... no big deal. Thanks! So, I am now seeing some build errors
when I try to build my assembly. Is there a different method for setting
up the KafkaSpout?
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
[error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
[error]        ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
[error] val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]              ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
[error] val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
[error]                                ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
[error] val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                    ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
[error] val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
[error]                                      ^
[error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
[error] val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
[error]                          ^
[error] 6 errors found
And here is the relevant part of my topology:
// Set up the Kafka Spout
val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
val pcapKafkaTopic: String = "packets"
val pcapZkRoot: String = "/packets"
val pcapClientId: String = "PcapKafkaSpout"
val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts, pcapKafkaTopic, pcapZkRoot, pcapClientId)
pcapKafkaConf.ignoreZkOffsets = true
pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
pcapKafkaConf.outputStreamId = "packets"
pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
  .setNumTasks(PcapFileSpoutTasks)
  .setMemoryLoad(16 * 1024)
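For comparison, here is roughly what the same spout looks like against the storm-kafka-client builder API. This is only a sketch of the 1.1.x API as I understand it: the new spout connects to the Kafka brokers directly rather than to ZooKeeper, so the broker host:port list below is an assumption (6667 is the HDP default Kafka port), and the old SchemeAsMultiScheme/outputStreamId pair would still need to be ported to a RecordTranslator, which I have not shown:

```scala
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

// ZkHosts/SpoutConfig go away entirely: the new spout is configured with
// the brokers' bootstrap.servers list, not the ZooKeeper quorum.
// Broker host:port values here are an assumption -- use your cluster's brokers.
val pcapSpoutConf: KafkaSpoutConfig[String, String] =
  KafkaSpoutConfig.builder("r7u07:6667,r7u08:6667,r7u09:6667", "packets")
    // Rough equivalent of ignoreZkOffsets = true plus EarliestTime:
    // always start reading from the beginning of the topic.
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build()

val pcapKafkaSpout = new KafkaSpout[String, String](pcapSpoutConf)
topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
```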
On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <[email protected]> wrote:
> Hi Aaron,
>
> Please note that the position of your storm-kafka consumers won't be
> migrated, so the new storm-kafka-client spout will start over on the topics
> you subscribe to. Do you need offsets migrated, or is it fine if the
> consumers start from scratch?
>
> I don't know whether HDP has special requirements but here's the general
> how-to for setting up storm-kafka-client:
>
> The storm-kafka-client library should work with any Kafka version 0.10.0.0
> and above. You should use the latest version of storm-kafka-client, which
> is currently 1.1.1. If you want a decent number of extra fixes, you might
> consider building version 1.2.0 yourself from
> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
> we are hoping to release this version before too long. Once you've declared a dependency on
> storm-kafka-client, you should also add a dependency on
> org.apache.kafka:kafka-clients, in the version that matches your Kafka
> broker version.
>
> Looking at your sbt, you should not need org.apache.kafka:kafka on the
> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
> storm-kafka-client are unnecessary as well. I don't believe those
> dependencies are part of the storm-kafka-client dependency tree.
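>
> Concretely, a trimmed dependency block along those lines might look like
> the following. This is only a sketch: the kafka-clients version should be
> whatever matches your brokers (0.10.1.x on HDP 2.6.2, if I recall
> correctly), and I have kept your HDP build numbers as-is:
>
> ```scala
> libraryDependencies ++= Seq(
>   "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided,
>   // no kafka-clients/zookeeper/log4j exclusions needed here
>   "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205",
>   // pin kafka-clients to the broker version; drop org.apache.kafka:kafka
>   "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
> )
> ```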
>
> In case making those changes doesn't solve it, could you post a bit more
> of the stack trace you get?
>
> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <[email protected]>:
>
>> All,
>>
>> I have resurrected some old code that includes a Kafka Producer class as
>> well as a storm topology that includes a Kafka Spout to ingest the messages
>> coming from the aforementioned producer.
>>
>> I was using the storm-kafka library with Scala 2.11, but when I changed
>> to the newer code base, which is using Scala 2.12, I found that the older
>> library wouldn't work... thus I wanted to switch to the new
>> storm-kafka-client, but am not sure what version I should be using. Here
>> is my build.sbt file, as well as the error messages I am seeing when the
>> Spout actually runs (my favorite assistant, Google, tells me it is due to a
>> version mismatch between Storm and Kafka). I am using HDP 2.6.2, for
>> whatever that is worth...
>>
>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>
>> name := "apachestormstreaming"
>>
>> version := "0.1"
>>
>> scalaVersion := "2.12.4"
>>
>> scalacOptions := Seq("-unchecked", "-deprecation")
>>
>> resolvers ++= Seq(
>>   "clojars" at "http://clojars.org/repo/",
>>   "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>   "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>> )
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>   "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j"),
>>   "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>   "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>   "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j")
>>     exclude("commons-beanutils", "commons-beanutils-core")
>>     exclude("commons-beanutils", "commons-beanutils")
>>     exclude("commons-collections", "commons-collections"),
>>   "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>     exclude("org.apache.kafka", "kafka-clients")
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j")
>>     exclude("org.apache.zookeeper", "zookeeper"),
>>   "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>   "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j")
>>     exclude("org.apache.zookeeper", "zookeeper"),
>>   "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j"),
>>   "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>     exclude("org.slf4j", "slf4j-log4j12")
>>     exclude("log4j", "log4j"),
>>   "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>   "org.apache.commons" % "commons-lang3" % "3.6",
>>   "org.influxdb" % "influxdb-java" % "2.7"
>> )
>>
>> assemblyMergeStrategy in assembly := {
>>   case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>     MergeStrategy.rename
>>   case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>     MergeStrategy.rename
>>   case PathList("org", "apache", "http", _ @ _*) =>
>>     MergeStrategy.first
>>   case PathList("org", "apache", "commons", "codec", _ @ _*) =>
>>     MergeStrategy.first
>>   case PathList("io", "netty", _ @ _*) =>
>>     MergeStrategy.last
>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>     MergeStrategy.first
>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>     MergeStrategy.first
>>   case x =>
>>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>>     oldStrategy(x)
>> }