Hi,
I have a spark kafka streaming application that works when I run with a local
spark context, but not with a remote one.
My code consists of:
1. A spring-boot application that creates the context
2. A shaded jar file containing all of my spark code
On my pc (windows), I have a spark (1.5.1) master and worker running.
The entry point for my application is the start() method.
The code is:
@throws(classOf[Exception])
def start {
val ssc: StreamingContext = createStreamingContext
val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)
def datasRDD = messagesRDD.map((line : String) =>
MapFunctions.lineToSparkEventData(line))
def count = datasRDD.count()
datasRDD.print(1)
ssc.start
ssc.awaitTermination
}
private def createStreamingContext: StreamingContext = {
System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
System.setProperty("spark.streaming.concurrentJobs",
String.valueOf(configContainer.getStreamingConcurrentJobs))
def sparkConf = createSparkConf()
val ssc = new StreamingContext(sparkConf,
Durations.seconds(configContainer.getStreamingContextDurationSeconds))
ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context
start")
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
ssc
}
private def createSparkConf() : SparkConf = {
def masterString = "spark://<<my_pc>>:7077"
def conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")
// This is not working
// def conf = new
SparkConf().setMaster("local[4]").setAppName("devAppLocal") // This IS working
conf.set("spark.scheduler.allocation.file",
"D:\\valid_path_to\\fairscheduler.xml");
val pathToShadedApplicationJar: String =
configContainer.getApplicationJarPaths.get(0)
val jars: Array[String] = Array[String](pathToShadedApplicationJar)
conf.setJars(jars)
conf.set("spark.scheduler.mode", "FAIR")
}
private def createKafkaDStream(ssc: StreamingContext, topics: String,
numThreads: Int): DStream[String] = {
val zkQuorum: String = configContainer.getZkQuorum
val groupId: String = configContainer.getGroupId
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
def lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
lines
}
}
The Error that I get is:
2015-11-18 10:41:19.191 WARN 3044 --- [result-getter-3]
o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 2.0 (TID 70,
169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaReceiver
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaReceiver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 19 more
I'm building using maven - to create a shaded jar file.
Properties in the parent pom are:
<properties>
<springframework.version>3.2.13.RELEASE</springframework.version>
<springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
<ibm.mq.version>7.0.1.3</ibm.mq.version>
<hibernate.version>3.5.6-Final</hibernate.version>
<hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
<validation-api.version>1.0.0.GA</validation-api.version>
<hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
<jackson.version>1.8.5</jackson.version>
<powermock.version>1.4.12</powermock.version>
<scala.binary.version>2.10</scala.binary.version>
<spark.version>1.5.1</spark.version>
<scala.version>2.10.5</scala.version>
</properties>
The pom file for the shaded jar is:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>my-spark-client</artifactId>
...
<build>
<finalName>my-spark-client</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>org/datanucleus/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
</transformers>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>compile-tests-scala</id>
<phase>compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
...
...
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.2.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
</dependencies>
....
</project>
org.apache.spark.streaming.kafka.KafkaReceiver is inside the
spark-streaming-kafka jar file, so I'm not sure why I get the
ClassNotFoundException.
Please help.
Thanks,
Tim
_____________________________________________________________________
The information transmitted in this message and its attachments (if any) is
intended
only for the person or entity to which it is addressed.
The message may contain confidential and/or privileged material. Any review,
retransmission, dissemination or other use of, or taking of any action in
reliance
upon this information, by persons or entities other than the intended recipient
is
prohibited.
If you have received this in error, please contact the sender and delete this
e-mail
and associated material from any computer.
The intended recipient of this e-mail may only use, reproduce, disclose or
distribute
the information contained in this e-mail and any attached files, with the
permission
of the sender.
This message has been scanned for viruses.
_____________________________________________________________________