Are you creating a fat assembly jar with spark-streaming-kafka included and using that to run your code? If yes, I am not sure why it is not finding the class. If not, then make sure that your framework places the spark-streaming-kafka jar on the runtime classpath.
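For example, had this gone through spark-submit, the integration could be pulled in at runtime with --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2. Since you create the SparkContext yourself from the spring-boot app, the closer equivalent is to hand every jar the executors need to setJars, not only the shaded application jar. A minimal sketch, assuming a placeholder path for the kafka integration jar:

    // In createSparkConf(): ship the kafka integration jar to the executors
    // alongside the shaded application jar. The second path is hypothetical --
    // point it at wherever spark-streaming-kafka_2.10-1.5.2.jar lives on disk.
    val jars: Array[String] = Array(
      pathToShadedApplicationJar,
      "D:\\path\\to\\spark-streaming-kafka_2.10-1.5.2.jar"
    )
    conf.setJars(jars)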
On Tue, Nov 17, 2015 at 6:04 PM, tim_b123 <tim.barth...@iag.com.au> wrote:
>
> Hi,
>
> I have a Spark Kafka streaming application that works when I run with a
> local Spark context, but not with a remote one.
>
> My code consists of:
> 1. A spring-boot application that creates the context
> 2. A shaded jar file containing all of my Spark code
>
> On my PC (Windows), I have a Spark (1.5.2) master and worker running
> (spark-1.5.2-bin-hadoop2.6).
>
> The entry point for my application is the start() method.
>
> The code is:
>
>   @throws(classOf[Exception])
>   def start {
>     val ssc: StreamingContext = createStreamingContext
>
>     val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)
>
>     def datasRDD = messagesRDD.map((line: String) => MapFunctions.lineToSparkEventData(line))
>
>     def count = datasRDD.count()
>     datasRDD.print(1)
>
>     ssc.start
>     ssc.awaitTermination
>   }
>
>   private def createStreamingContext: StreamingContext = {
>     System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
>     System.setProperty("spark.streaming.concurrentJobs",
>       String.valueOf(configContainer.getStreamingConcurrentJobs))
>     def sparkConf = createSparkConf()
>
>     val ssc = new StreamingContext(sparkConf,
>       Durations.seconds(configContainer.getStreamingContextDurationSeconds))
>     ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
>     ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
>     ssc
>   }
>
>   private def createSparkConf(): SparkConf = {
>     def masterString = "spark://<<my_pc>>:7077"
>     def conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")    // This is not working
>     // def conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal") // This IS working
>
>     conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml");
>
>     val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
>     val jars: Array[String] = Array[String](pathToShadedApplicationJar)
>     conf.setJars(jars)
>
>     conf.set("spark.scheduler.mode", "FAIR")
>   }
>
>   private def createKafkaDStream(ssc: StreamingContext, topics: String,
>                                  numThreads: Int): DStream[String] = {
>     val zkQuorum: String = configContainer.getZkQuorum
>     val groupId: String = configContainer.getGroupId
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     def lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
>       StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
>     lines
>   }
> }
>
> The error that I get is:
>
> 2015-11-18 12:58:33.755  WARN 6132 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager :
> Lost task 0.0 in stage 2.0 (TID 70, 169.254.95.56): java.io.IOException:
> java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>     at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:274)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>     at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
>     ... 19 more
>
> I'm using Maven to build a shaded jar file.
>
> Properties in the parent pom are:
>
> <properties>
>   <springframework.version>3.2.13.RELEASE</springframework.version>
>   <springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
>   <ibm.mq.version>7.0.1.3</ibm.mq.version>
>   <hibernate.version>3.5.6-Final</hibernate.version>
>   <hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
>   <validation-api.version>1.0.0.GA</validation-api.version>
>   <hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
>   <jackson.version>1.8.5</jackson.version>
>   <powermock.version>1.4.12</powermock.version>
>   <scala.binary.version>2.10</scala.binary.version>
>   <spark.version>1.5.2</spark.version>
>   <scala.version>2.10.5</scala.version>
> </properties>
>
> The pom file for the shaded jar is:
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>                              http://maven.apache.org/maven-v4_0_0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>
>   <artifactId>my-spark-client</artifactId>
>   ...
>   <build>
>     <finalName>my-spark-client</finalName>
>     <plugins>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>2.3</version>
>         <executions>
>           <execution>
>             <phase>package</phase>
>             <goals>
>               <goal>shade</goal>
>             </goals>
>             <configuration>
>               <filters>
>                 <filter>
>                   <artifact>*:*</artifact>
>                   <excludes>
>                     <exclude>org/datanucleus/**</exclude>
>                     <exclude>META-INF/*.SF</exclude>
>                     <exclude>META-INF/*.DSA</exclude>
>                     <exclude>META-INF/*.RSA</exclude>
>                   </excludes>
>                 </filter>
>               </filters>
>               <transformers>
>                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>                   <resource>reference.conf</resource>
>                 </transformer>
>                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
>               </transformers>
>               <artifactSet>
>                 <excludes>
>                   <exclude>classworlds:classworlds</exclude>
>                   <exclude>junit:junit</exclude>
>                   <exclude>jmock:*</exclude>
>                 </excludes>
>               </artifactSet>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>
>       <plugin>
>         <groupId>org.scalatest</groupId>
>         <artifactId>scalatest-maven-plugin</artifactId>
>         <version>1.0</version>
>         <configuration>
>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>           <junitxml>.</junitxml>
>           <filereports>WDF TestSuite.txt</filereports>
>         </configuration>
>         <executions>
>           <execution>
>             <id>test</id>
>             <goals>
>               <goal>test</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>       <plugin>
>         <groupId>org.scala-tools</groupId>
>         <artifactId>maven-scala-plugin</artifactId>
>         <version>2.15.0</version>
>         <executions>
>           <execution>
>             <id>compile-scala</id>
>             <phase>compile</phase>
>             <goals>
>               <goal>compile</goal>
>             </goals>
>           </execution>
>
>           <execution>
>             <id>compile-tests-scala</id>
>             <phase>compile</phase>
>             <goals>
>               <goal>testCompile</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>     </plugins>
>   </build>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.5</version>
>     </dependency>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-api</artifactId>
>       <version>1.7.5</version>
>     </dependency>
>     <dependency>
>       <groupId>org.springframework</groupId>
>       <artifactId>spring-test</artifactId>
>       <scope>test</scope>
>     </dependency>
>     ...
>     ...
>     <dependency>
>       <groupId>com.databricks</groupId>
>       <artifactId>spark-csv_2.10</artifactId>
>       <version>1.2.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-compiler</artifactId>
>       <version>${scala.version}</version>
>       <scope>compile</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.scalatest</groupId>
>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>       <scope>test</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka_${scala.binary.version}</artifactId>
>       <version>0.8.2.1</version>
>       <scope>compile</scope>
>       <exclusions>
>         <exclusion>
>           <artifactId>jmxri</artifactId>
>           <groupId>com.sun.jmx</groupId>
>         </exclusion>
>         <exclusion>
>           <artifactId>jms</artifactId>
>           <groupId>javax.jms</groupId>
>         </exclusion>
>         <exclusion>
>           <artifactId>jmxtools</artifactId>
>           <groupId>com.sun.jdmk</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>jcl-over-slf4j</artifactId>
>           <groupId>org.slf4j</groupId>
>         </exclusion>
>         <exclusion>
>           <artifactId>javax.servlet-api</artifactId>
>           <groupId>javax.servlet</groupId>
>         </exclusion>
>         <exclusion>
>           <groupId>javax.servlet</groupId>
>           <artifactId>servlet-api</artifactId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>jcl-over-slf4j</artifactId>
>           <groupId>org.slf4j</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>       <exclusions>
>         <exclusion>
>           <artifactId>jcl-over-slf4j</artifactId>
>           <groupId>org.slf4j</groupId>
>         </exclusion>
>         <exclusion>
>           <artifactId>javax.servlet-api</artifactId>
>           <groupId>javax.servlet</groupId>
>         </exclusion>
>         <exclusion>
>           <groupId>javax.servlet</groupId>
>           <artifactId>servlet-api</artifactId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <exclusions>
>         <exclusion>
>           <artifactId>jcl-over-slf4j</artifactId>
>           <groupId>org.slf4j</groupId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>org.mockito</groupId>
>       <artifactId>mockito-all</artifactId>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>joda-time</groupId>
>       <artifactId>joda-time</artifactId>
>     </dependency>
>
>   </dependencies>
>
>   ....
>
> </project>
>
> org.apache.spark.streaming.kafka.KafkaReceiver is inside the
> spark-streaming-kafka jar file, so I'm not sure why I get the
> ClassNotFoundException.
>
> Please help.
>
> Thanks,
> Tim
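A follow-up on the pom itself: spark-core is marked provided while spark-streaming and spark-streaming-kafka are compile scope, so the shade plugin should be bundling KafkaReceiver into my-spark-client.jar. It is worth verifying that directly before digging further; a quick check, assuming the jar ends up under target\:

    jar tf target\my-spark-client.jar | findstr KafkaReceiver

If the class is listed there, the question becomes whether setJars is actually shipping that same jar to the executors.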