Hi Mario,

Try adding this to your libraryDependencies (in your sbt file):

  "org.apache.kafka" % "kafka_2.10" % "0.8.0" exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri") exclude("org.slf4j", "slf4j-simple")

Regards,
--Flávio R. Santos
Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200
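(For concreteness, a sketch of how Flávio's suggested line could sit alongside the libraryDependencies from Mario's build.sbt further down the thread. The surrounding entries and versions are copied from that build.sbt; the explicit .exclude calls and the parentheses are only a line-break-safe way of writing the same excludes.)

  libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
    "joda-time" % "joda-time" % "2.6",
    // suggested addition: Kafka with the problematic transitive deps excluded
    ("org.apache.kafka" % "kafka_2.10" % "0.8.0")
      .exclude("javax.jms", "jms")
      .exclude("com.sun.jdmk", "jmxtools")
      .exclude("com.sun.jmx", "jmxri")
      .exclude("org.slf4j", "slf4j-simple")
  )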
"org.apache.kafka" % "kafka_2.10" % "0.8.0" exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri") exclude("org.slf4j", "slf4j-simple") Regards, *--Flávio R. Santos* Chaordic | *Platform* *www.chaordic.com.br <http://www.chaordic.com.br/>* +55 48 3232.3200 On Thu, Dec 11, 2014 at 12:32 PM, Mario Pastorelli < mario.pastore...@teralytics.ch> wrote: > Thanks akhil for the answer. > > I am using sbt assembly and the build.sbt is in the first email. Do you > know why those classes are included in that way? > > > Thanks, > Mario > > > On 11.12.2014 14:51, Akhil Das wrote: > > Yes. You can do/use *sbt assembly* and create a big fat jar with all > dependencies bundled inside it. > > Thanks > Best Regards > > On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli < > mario.pastore...@teralytics.ch> wrote: > >> In this way it works but it's not portable and the idea of having a fat >> jar is to avoid exactly this. Is there any system to create a >> self-contained portable fatJar? >> >> >> On 11.12.2014 13:57, Akhil Das wrote: >> >> Add these jars while creating the Context. >> >> val sc = new SparkContext(conf) >> >> >> sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/ >> *spark-streaming-kafka_2.10-1.1.0.jar*") >> sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/ >> *zkclient-0.3.jar*") >> >> sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/ >> *metrics-core-2.2.0.jar*") >> >> sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/ >> *kafka_2.10-0.8.0.jar*") >> >> val ssc = new StreamingContext(sc, Seconds(10)) >> >> >> Thanks >> Best Regards >> >> On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli < >> mario.pastore...@teralytics.ch> wrote: >> >>> Hi, >>> >>> I'm trying to use spark-streaming with kafka but I get a strange error >>> on class that are missing. I would like to ask if my way to build the fat >>> jar is correct or no. 
On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <mario.pastore...@teralytics.ch> wrote:

Hi,

I'm trying to use spark-streaming with Kafka, but I get a strange error about classes that are missing. I would like to ask whether my way of building the fat jar is correct or not. My program is:

  val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum, kafkaGroupId, kafkaTopicsWithThreads)
    .map(_._2)

  kafkaStream.foreachRDD((rdd, t) => rdd.foreachPartition { iter: Iterator[CellWithLAC] =>
    println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
  })

I use sbt to manage my project, and my build.sbt (with the assembly 0.12.0 plugin) is:

  name := "spark_example"

  version := "0.0.1"

  scalaVersion := "2.10.4"

  scalacOptions ++= Seq("-deprecation", "-feature")

  libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
    "joda-time" % "joda-time" % "2.6"
  )

  assemblyMergeStrategy in assembly := {
    case p if p startsWith "com/esotericsoftware/minlog" => MergeStrategy.first
    case p if p startsWith "org/apache/commons/beanutils" => MergeStrategy.first
    case p if p startsWith "org/apache/" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.discard
    case p if p startsWith "META-INF" => MergeStrategy.discard
    case x =>
      val oldStrategy = (assemblyMergeStrategy in assembly).value
      oldStrategy(x)
  }
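(Side note: the assemblyMergeStrategy syntax above is the sbt-assembly 0.12.x style, so the plugin also has to be declared under project/. A minimal project/plugins.sbt for that version would look roughly like this; the plugin coordinates are the usual ones for sbt-assembly and are an assumption here, not something quoted in the thread.)

  // project/plugins.sbt -- assumed coordinates; version 0.12.0 as mentioned above
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")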
I create the jar with sbt assembly and then run it with:

  $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 test-consumer-group test1

where master:7077 is the Spark master, localhost:2181 is ZooKeeper, test-consumer-group is the Kafka group id, and test1 is the Kafka topic. The program starts and keeps running, but I get an error and nothing is printed. In the log I found the following stack trace:

  14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325]
  14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to [jpl-devvax/127.0.1.1:38767]
  14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/127.0.1.1:38767], 1 messages pending
  14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
  14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
  14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown Source)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
          at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
          at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
          at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
          at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
          at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
          at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
          at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
          at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
          at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
          at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
          at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
          at org.apache.spark.scheduler.Task.run(Task.scala:54)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)

I have searched inside the fat jar and found that this class is not in it:

  > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
  >

The problem is the double dollar before anonfun: if you put only one, then the class is there:

  > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
  [...]
  kafka/consumer/ZookeeperConsumerConnector.class
  >

I'm submitting my job to spark-1.1.1 compiled with hadoop2.4, downloaded from the Spark website.

My question is: how can I solve this problem? I guess the problem is my sbt script, but I don't understand why.

Thanks,
Mario Pastorelli
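(A small caveat about the two grep checks above, offered as a note rather than a diagnosis: inside double quotes the shell expands $$ to its own process id and treats the other $-prefixed names as variables, so the pattern grep actually receives is not the literal class name. A single-quoted, fixed-string search keeps the dollar signs literal; a minimal sketch, reusing the jar name from above:)

  # single quotes stop the shell from expanding $$ and $names before grep sees the pattern
  jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar \
    | grep -F 'ZKRebalancerListener$$anonfun'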