Hello Jacek,

Yes, this is a Spark Streaming application.
I have removed all the code and created a new project with just the base code
needed to open a stream and loop over it, to see what I am doing wrong.

Not adding the packages results in the following error:

21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka010.KafkaRDDPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)


This should not really be the case, because that class should already be
included in the Kubernetes pod. Is there any way I can confirm this?
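
To check that, I guess I could do something along these lines, assuming the
stock Spark image layout with the distribution jars under /opt/spark/jars, a
driver pod in the spark namespace, and unzip available in the image (the pod
name is just a placeholder):

# list the jars baked into the running driver pod and look for the Kafka connector
kubectl exec -n spark <driver-pod-name> -- ls /opt/spark/jars | grep -i kafka

# or search every jar for the missing class
kubectl exec -n spark <driver-pod-name> -- sh -c \
  'for j in /opt/spark/jars/*.jar; do unzip -l "$j" | grep -q KafkaRDDPartition && echo "$j"; done'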


So my simple class is as follows:


// Create a streaming context with a 5-second batch interval
streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));

// Open a direct stream subscribed to the configured Kafka topics
stream = KafkaUtils.createDirectStream(streamingContext,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.Subscribe(topics, kafkaConfiguration));

// Log topic, partition and offset for every record received
stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
   try {
      rdd.foreachPartition(partition -> {
         while (partition.hasNext()) {
            ConsumerRecord<String, byte[]> consumerRecord = partition.next();
            LOGGER.info("WORKING " + consumerRecord.topic()
                  + consumerRecord.partition() + ": " + consumerRecord.offset());
         }
      });
   } catch (Exception e) {
      e.printStackTrace();
   }
});

streamingContext.start();
try {
   streamingContext.awaitTermination();
} catch (InterruptedException e) {
   e.printStackTrace();
} finally {
   streamingContext.stop();
   javaSparkContext.stop();
}


This is all there is to the class, which is a Spring Boot @Component.

Next, my pom is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.kafka</groupId>
  <artifactId>SimpleKafkaStream</artifactId>
  <version>1.0</version>

  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <start-class>com.kafka.Main</start-class>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.2</version>
    <relativePath/>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>3.1.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

    </plugins>
  </build>

</project>

This is a simple pom; whether the spark-streaming-kafka-0-10_2.12 dependency
has provided scope or not, it still gives the same error.
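
One thing I could try instead of listing the jars by hand is to let
spark-submit resolve the connector from Maven with --packages; a minimal
sketch, assuming the coordinates that match the Spark 3.1.2 / Scala 2.12
build above (the remaining options would stay as in my earlier submit):

spark-submit \
  --master=k8s://https://url:port \
  --deploy-mode cluster \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
  <remaining --conf/--files options as before> \
  target/SimpleKafkaStream.jar consume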

I have also tried to build an uber jar in order to test with that, but I was
still unable to make it work. The build section for that is:

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <fork>true</fork>
        <mainClass>com.kafka.Main</mainClass>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>repackage</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>com.kafka.Main</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>

  </plugins>

</build>
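
A quick way to sanity-check the uber jar itself would be to look for the
missing class inside it (the exact artifact name depends on the assembly
configuration, so treat the file name below as a placeholder):

# confirm the Kafka connector classes actually ended up in the assembled jar
jar tf target/SimpleKafkaStream-1.0-jar-with-dependencies.jar | grep KafkaRDDPartition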

I am open to any suggestions on why this is not working and what needs to be done.


Thank you for your time,

Stelios

On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> No idea still, but noticed
> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
> \" that bothers me quite a lot.
>
> First of all, it's a Spark Streaming (not Structured Streaming) app.
> Correct? Please upgrade at your earliest convenience since it's no longer
> in active development (if supported at all).
>
> Secondly, why are these jars listed explicitly since they're part of
> Spark? You should not really be doing such risky config changes (unless
> you've got no other choice and you know what you're doing).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
>
> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <stevo...@gmail.com>
> wrote:
>
>> Yes you are right.
>> I am using Spring Boot for this.
>>
>> The same does work for the event that does not involve any kafka events.
>> But again i am not sending out extra jars there so nothing is replaced and
>> we are using the default ones.
>>
>> If i do not use the userClassPathFirst which will force the service to
>> use the newer version i will end up with the same problem
>>
>> We are using protobuf v3+ and as such we need to push that version since
>> apache core uses an older version.
>>
>> So all we should really need is the following : --jars
>> "protobuf-java-3.17.3.jar" \
>> and here we need the userClassPathFirst=true in order to use the latest
>> version.
>>
>>
>> Using only this jar as it works on local or no jars defined we ended up
>> with the following error.
>>
>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.Class.forName0(Native Method)
>>
>> at java.base/java.lang.Class.forName(Unknown Source)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>>
>>
>> Which can be resolved with passing more jars.
>>
>>
>> Any idea about this error ?
>>
>> K8 does not seem to like this, but Java Spring should be the one that is
>> responsible for the version but it seems K8 does not like this versions.
>>
>> Perhaps miss configuration on K8 ?
>>
>> I haven't set that up so i am not aware of what was done there.
>>
>>
>>
>> For downgrading to java 8 on my K8 might not be so easy. I want to
>> explore if there is something else before doing that as we will need to
>> spin off new instances of K8 to check that.
>>
>>
>>
>> Thank you for the time taken
>>
>>
>>
>>
>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Stelios,
>>>
>>> I've never seen this error before, but a couple of things caught
>>> my attention that I would look at closer to chase the root cause of the
>>> issue.
>>>
>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>> Application run failed" seem to indicate that you're using Spring Boot
>>> (that I know almost nothing about so take the following with a pinch of
>>> salt :))
>>>
>>> Spring Boot manages the classpath by itself and together with another
>>> interesting option in your
>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>> much this exception:
>>>
>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>> subtype
>>>
>>> could be due to casting compatible types from two different classloaders?
>>>
>>> Just a thought but wanted to share as I think it's worth investigating.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <stevo...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have been facing the current issue for some time now and I was
>>>> wondering if someone might have some inside on how I can resolve the
>>>> following.
>>>>
>>>> The code (java 11) is working correctly on my local machine but
>>>> whenever I try to launch the following on K8 I am getting the following
>>>> error.
>>>>
>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>> initializing SparkContext.
>>>>
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>>
>>>>
>>>> I have a spark that will monitor some directories and handle the data
>>>> accordingly.
>>>>
>>>> That part is working correctly on K8 and the SparkContext has no issue
>>>> being initialized there.
>>>>
>>>>
>>>> This is the spark-submit for that
>>>>
>>>>
>>>> spark-submit \
>>>> --master=k8s://https://url:port \
>>>> --deploy-mode cluster \
>>>> --name a-name\
>>>> --conf spark.driver.userClassPathFirst=true  \
>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>> --conf spark.kubernetes.namespace=spark \
>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>> --conf spark.dynamicAllocation.enabled=false \
>>>> --driver-memory 525m --executor-memory 525m \
>>>> --num-executors 1 --executor-cores 1 \
>>>> target/SparkStream.jar continuous-merge
>>>>
>>>>
>>>> My issue comes when I try to launch the service in order to listen to
>>>> kafka events and store them in HDFS.
>>>>
>>>>
>>>> spark-submit \
>>>> --master=k8s://https://url:port \
>>>> --deploy-mode cluster \
>>>> --name consume-data \
>>>> --conf spark.driver.userClassPathFirst=true  \
>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path\
>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>> --jars 
>>>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>>>>  \
>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>> --conf spark.kubernetes.namespace=spark \
>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>> --conf spark.dynamicAllocation.enabled=false \
>>>> --driver-memory 1g --executor-memory 1g \
>>>> --num-executors 1 --executor-cores 1 \
>>>> target/SparkStream.jar consume
>>>>
>>>>
>>>> It could be that I am launching the application wrongly or perhaps that
>>>> my K8 is not configured correctly ?
>>>>
>>>>
>>>>
>>>> I have stripped down my code and left it barebone and will end up with
>>>> the following issue :
>>>>
>>>>
>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>> initializing SparkContext.
>>>>
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>
>>>> at
>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
>>>> Source)
>>>>
>>>> at
>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
>>>> Source)
>>>>
>>>> at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>
>>>> at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>
>>>> at
>>>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>
>>>>
>>>> 21/08/31 07:28:42 WARN  
>>>> org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>> Exception encountered during context initialization - cancelling refresh
>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>> expressed through field 'javaSparkContext'; nested exception is
>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>> bean with name 'javaSparkContext' defined in class path resource
>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>> factory method failed; nested exception is
>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>> 'javaSparkContext' threw exception; nested exception is
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> 21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>> Application run failed
>>>>
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'mainApplication': Unsatisfied dependency expressed
>>>> through field 'streamAllKafkaData'; nested exception is
>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>> expressed through field 'javaSparkContext'; nested exception is
>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>> bean with name 'javaSparkContext' defined in class path resource
>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>> factory method failed; nested exception is
>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>> 'javaSparkContext' threw exception; nested exception is
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>>
>>>>
>>>>
>>>> It could be that i am launching the application for Kafka wrongly with
>>>> all the extra jars added ?
>>>>
>>>> Just that those seem to be needed or i am getting other errors when not
>>>> including those.
>>>>
>>>>
>>>>
>>>>
>>>> Any help will be greatly appreciated.
>>>>
>>>>
>>>>
>>>> Cheers,
>>>>
>>>> Stelios
>>>>
>>>>
>>>>
>>>>
