Hi,

I have a Spark Kafka streaming application that works when I run it with a local Spark context, but not with a remote one. My code consists of:

1. A Spring Boot application that creates the context
2. A shaded jar file containing all of my Spark code

On my PC (Windows), I have a Spark 1.5.2 master and worker running (spark-1.5.2-bin-hadoop2.6). The entry point for my application is the start() method. The code is:

  @throws(classOf[Exception])
  def start() {
    val ssc: StreamingContext = createStreamingContext
    val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)
    val datasRDD = messagesRDD.map((line: String) => MapFunctions.lineToSparkEventData(line))
    val count = datasRDD.count()
    datasRDD.print(1)
    ssc.start()
    ssc.awaitTermination()
  }

  private def createStreamingContext: StreamingContext = {
    System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
    System.setProperty("spark.streaming.concurrentJobs", String.valueOf(configContainer.getStreamingConcurrentJobs))
    val sparkConf = createSparkConf()
    val ssc = new StreamingContext(sparkConf, Durations.seconds(configContainer.getStreamingContextDurationSeconds))
    ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
    ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
    ssc
  }

  private def createSparkConf(): SparkConf = {
    val masterString = "spark://<<my_pc>>:7077"
    val conf = new SparkConf().setMaster(masterString).setAppName("devAppRem") // This is NOT working
    // val conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal") // This IS working
    conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml")
    val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
    val jars: Array[String] = Array[String](pathToShadedApplicationJar)
    conf.setJars(jars)
    conf.set("spark.scheduler.mode", "FAIR")
  }

  private def createKafkaDStream(ssc: StreamingContext, topics: String, numThreads: Int): DStream[String] = {
    val zkQuorum: String = configContainer.getZkQuorum
    val groupId: String = configContainer.getGroupId
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    lines
  }
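As a side note, in case it helps narrow things down: the receiver-less "direct" Kafka stream should not put a KafkaReceiver on the executors at all. This is only a rough sketch of that variant, not code I run; the metadata.broker.list value is a placeholder, not my real config:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.kafka.KafkaUtils

  // Sketch: the direct (receiver-less) API reads from the Kafka brokers
  // in each batch instead of running a long-lived KafkaReceiver task.
  private def createDirectKafkaDStream(ssc: StreamingContext, topics: String): DStream[String] = {
    // "localhost:9092" is a placeholder broker list, not my actual setup
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topicsSet = topics.split(",").toSet
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2)
  }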
The error that I get is:

2015-11-18 12:58:33.755 WARN 6132 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
    ... 19 more
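Since the failure happens while the executor deserializes a task, one thing I can do is probe whether the executors can load the class at all (as opposed to just the driver). This is a diagnostic sketch I put together for this mail, not part of the application; the partition count of 8 is arbitrary:

  import scala.util.Try

  // Diagnostic sketch: run a trivial job and ask each executor JVM whether
  // it can load the class the deserializer fails to find. If this prints
  // false for the remote worker, the shaded jar never reached its classpath.
  val probe = ssc.sparkContext
    .parallelize(1 to 8, 8) // 8 partitions just to make sure tasks hit the worker
    .map { _ =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      val ok = Try(Class.forName("org.apache.spark.streaming.kafka.KafkaReceiver")).isSuccess
      (host, ok)
    }
    .collect()
  probe.foreach(println)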
I'm using Maven to build the shaded jar file. The properties in the parent POM are:

<properties>
    <springframework.version>3.2.13.RELEASE</springframework.version>
    <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
    <ibm.mq.version>7.0.1.3</ibm.mq.version>
    <hibernate.version>3.5.6-Final</hibernate.version>
    <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
    <validation-api.version>1.0.0.GA</validation-api.version>
    <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
    <jackson.version>1.8.5</jackson.version>
    <powermock.version>1.4.12</powermock.version>
    <scala.binary.version>2.10</scala.binary.version>
    <spark.version>1.5.2</spark.version>
    <scala.version>2.10.5</scala.version>
</properties>

The POM file for the shaded jar is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>my-spark-client</artifactId>
    ...
    <build>
        <finalName>my-spark-client</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>org/datanucleus/**</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.0</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>compile-tests-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
        ...
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>0.8.2.1</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>javax.servlet-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>javax.servlet-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>jcl-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
        </dependency>
    </dependencies>
    ...
</project>

org.apache.spark.streaming.kafka.KafkaReceiver is inside the spark-streaming-kafka jar file, so I'm not sure why I get the ClassNotFoundException.
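One check I can run to confirm the shade plugin actually bundled the receiver class into the artifact I pass to setJars is to scan the shaded jar's entries. This is just a diagnostic sketch; the jar path below is a placeholder for my real build output:

  import java.util.jar.JarFile
  import scala.collection.JavaConverters._

  // Diagnostic sketch: look for the class the executors cannot find inside
  // the shaded jar. The path is a placeholder for my actual target jar.
  val shadedJarPath = "D:\\path\\to\\target\\my-spark-client.jar"
  val jar = new JarFile(shadedJarPath)
  val hit = jar.entries().asScala
    .map(_.getName)
    .find(_ == "org/apache/spark/streaming/kafka/KafkaReceiver.class")
  jar.close()
  println(hit.getOrElse("KafkaReceiver.class is NOT in the shaded jar"))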
Please help.

Thanks,
Tim