Unable to create direct stream with SSL enabled Kafka cluster

2021-05-15 Thread dwgw
Hi
I am trying to stream a Kafka topic using createDirectStream(). The Kafka
cluster is SSL-enabled. The code is:

***

import findspark

findspark.init('/u01/idp/spark')

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


kafkaParams = {
    "metadata.broker.list": "kfk-bro2.mydomain.com:9093",
    "security.protocol": "ssl",
    "ssl.key.password": "Password123",
    "ssl.keystore.location": "/tmp/keystore.jks",
    "ssl.keystore.password": "Password123",
    "ssl.truststore.location": "/tmp/truststore.jks",
    "ssl.truststore.password": "Password123",
    "ssl.endpoint.identification.algorithm": ""
}



if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingReciever")
    ssc = StreamingContext(sc, 30)

    message = KafkaUtils.createDirectStream(ssc, ["test1_topic"], kafkaParams)

    lines = message.map(lambda x: x[1])
    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

***

I am submitting the Python script to the cluster using spark-submit:

# spark-submit --master yarn --deploy-mode client --files
/u01/idp/spark/conf/log4j.properties --conf
"spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'"
--driver-java-options
"-Dlog4j.configuration=file:/u01/idp/spark/conf/log4j.properties" 
--packages
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
streamingKafka.py

But during the execution of the above script, I am getting the following
error.

File "/home/spark/streamingKafka.py", line 23, in 
   message = KafkaUtils.createDirectStream(ssc, ["test1_topic"],
kafkaParams)
.
.
File "/u01/idp/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__
File "/u01/idp/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o43.createDirectStreamWithoutMessageHandler.



What could be the possible causes of this error?

I can stream the Kafka topic using the console consumer and can reach any
of the brokers.
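
One thing worth checking: per the Spark Streaming + Kafka integration guide, the
0.8 integration (spark-streaming-kafka-0-8, which backs pyspark.streaming.kafka.KafkaUtils)
does not support SSL/TLS; only the 0.10 integration and the Structured Streaming
Kafka source do. (Note also the 2.4.3 artifact versions versus the reported Spark
2.4.6.) Below is a minimal sketch of reading the same topic with the Structured
Streaming source instead, shown in Scala; the same "kafka."-prefixed options can be
passed from PySpark via spark.readStream as well. Host, topic, paths and passwords
are simply copied from the post.

// Hedged sketch; option names follow the Kafka consumer config, prefixed
// with "kafka." so Spark forwards them to the consumer.
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kfk-bro2.mydomain.com:9093")
  .option("subscribe", "test1_topic")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.key.password", "Password123")
  .option("kafka.ssl.keystore.location", "/tmp/keystore.jks")
  .option("kafka.ssl.keystore.password", "Password123")
  .option("kafka.ssl.truststore.location", "/tmp/truststore.jks")
  .option("kafka.ssl.truststore.password", "Password123")
  .option("kafka.ssl.endpoint.identification.algorithm", "")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

records.writeStream.format("console").start().awaitTermination()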

Kafka version: 2.12
Spark version: 2.4.6

Thanks







value col is not a member of org.apache.spark.rdd.RDD

2020-09-01 Thread dwgw
Hi
I am trying to generate a hierarchy table using Spark GraphX, but at runtime
I am getting the following error.

*error: value col is not a member of org.apache.spark.rdd.RDD[(Any, (Int,
Any, String, Int, Int))]
   val empHirearchyDF = empHirearchyExtDF.join(empDF ,
empDF.col("emp_id") ===
empHirearchyExtDF.col("emp_id_pk")).selectExpr("emp_id","first_name","last_name","title","mgr_id","level","root","path","iscyclic","isleaf")*


*Scala code:*

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3

...
...

// call the function
val empHirearchyExtDF = calcTopLevelHierarcy(empVertexDF,empEdgeDF)
.map{ case(pk,(level,root,path,iscyclic,isleaf)) =>
(pk.asInstanceOf[String],level,root.asInstanceOf[String],path,iscyclic,isleaf)}
.toDF("emp_id_pk","level","root","path","iscyclic","isleaf").cache()

// extend original table with new columns. *Errors occur in the following
line*
val empHirearchyDF = empHirearchyExtDF.join(empDF , empDF.col("emp_id") ===
empHirearchyExtDF.col("emp_id_pk")).selectExpr("emp_id","first_name","last_name","title","mgr_id","level","root","path","iscyclic","isleaf")

...
...

The error occurs during the execution of the result stage. I have found in
the Spark shell that the method 'col' is available on the DataFrame empDF:

*scala> empDF.* (tab completion)
!=   ##   +   ->   ==   agg   alias   apply   as   asInstanceOf   cache
checkpoint   coalesce   *col*   colRegex   collect   collectAsList   columns

Any insight on the issue will be appreciated.
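
A hedged observation: the type in the message, RDD[(Any, (Int, Any, String, Int,
Int))], matches what calcTopLevelHierarcy returns before the .map/.toDF chain, which
suggests empHirearchyExtDF may have ended up bound to the raw RDD rather than the
DataFrame (easy to hit if the leading-dot continuation lines are evaluated
separately, e.g. outside :paste mode in the shell). A minimal sketch, under that
assumption, that makes the DataFrame conversion explicit before calling .col;
calcTopLevelHierarcy, empVertexDF and empEdgeDF are the values from the post:

// Sketch only: spark.implicits._ is needed for .toDF on an RDD of tuples.
import spark.implicits._

val raw = calcTopLevelHierarcy(empVertexDF, empEdgeDF)   // RDD[(Any, (Int, Any, String, Int, Int))]

val empHirearchyExtDF = raw
  .map { case (pk, (level, root, path, iscyclic, isleaf)) =>
    (pk.asInstanceOf[String], level, root.asInstanceOf[String], path, iscyclic, isleaf) }
  .toDF("emp_id_pk", "level", "root", "path", "iscyclic", "isleaf")
  .cache()

empHirearchyExtDF.col("emp_id_pk")   // resolves: this is a DataFrame, not an RDD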

Regards







Streaming AVRO data in console: java.lang.ArrayIndexOutOfBoundsException

2020-08-10 Thread dwgw


Hi
I am trying to stream a Kafka topic (in Avro format) to the console. I have
loaded the Avro data from the Kafka topic into a DataFrame, but when I try
to stream it to the console I am getting the following error.

*scala>* val records = spark.
   readStream.
   format("kafka").
   option("kafka.bootstrap.servers", "broker1:9093").
   option("subscribe", "PERSON").  
   option("startingOffsets", "earliest").
   load()

records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5
more fields]

*scala>* val jsonFormatSchema = new
String(Files.readAllBytes(Paths.get("/home/spark/person.avsc")))

jsonFormatSchema: String =
"{
  "type": "record",
  "name": "Person",
  "namespace": "io.confluent.connect.avro",
  "fields": [
...
...

*scala>* val output =
records.select(from_avro(col("value"),jsonFormatSchema).as("person"))
output: org.apache.spark.sql.DataFrame = [person: struct]

*scala>*  .select("icxsession.*")

res15: org.apache.spark.sql.DataFrame = [SESSION_ID: bigint,
VERSION_STARTSCN: bigint ... 46 more fields]

*Error occurs here:*

*scala>* output.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

20/08/10 01:14:24 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0
(TID 20, workstation.com, executor 2):
*java.lang.ArrayIndexOutOfBoundsException: 1405994075*
at
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at
org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

[Stage 5:>  (0 + 1)
/ 1]20/08/10 01:14:25 ERROR scheduler.TaskSetManager: Task 0 in stage 5.0
failed 4 times; aborting job
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521
is aborting.
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521
aborted.
*20/08/10 01:14:25 ERROR streaming.MicroBatchExecution: Query [id =
5e8ffd55-fb54-45d1-8255-56ba810c1f51, runId =
1b7245de-de96-43e7-98ef-8bc62a6f697e] terminated with error
org.apache.spark.SparkException: Writing job aborted.*
at
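
A note on a common cause of exactly this failure: if the topic is produced through
the Confluent Schema Registry serializer, every value carries a 5-byte wire-format
header (a magic byte plus a 4-byte schema id) in front of the Avro payload, and
passing that straight to from_avro tends to blow up inside the Avro resolver with a
huge, garbage index like the one above. A hedged sketch, assuming that is the case
here, that strips the header before decoding (records and jsonFormatSchema are the
values defined above):

// Sketch: skip the 5-byte Confluent wire-format header before decoding.
// If the producer does not use the Schema Registry serializer, keep the
// original value column instead.
import org.apache.spark.sql.functions.expr

val payload = expr("substring(value, 6, length(value) - 5)")
val output  = records.select(from_avro(payload, jsonFormatSchema).as("person"))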

error: object functions is not a member of package org.apache.spark.sql.avro

2020-08-08 Thread dwgw
Hi
I am getting the following error while trying to import the package
org.apache.spark.sql.avro.functions._ in the scala shell:

scala> import org.apache.spark.sql.avro.functions._
:23: error: object functions is not a member of package
org.apache.spark.sql.avro
import org.apache.spark.sql.avro.functions._

I have invoked spark-shell with the following command:

# spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0,com.databricks:spark-avro_2.11:4.0.0,org.apache.spark:spark-avro_2.11:2.4.0

Which package do I have to pass to spark-shell? I am trying to implement a
few examples from
https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html
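
For reference (hedged, based on the Spark documentation): the object
org.apache.spark.sql.avro.functions only exists from Spark 3.0 onward; on Spark
2.4.x the external spark-avro module exposes from_avro/to_avro directly in the
org.apache.spark.sql.avro package. Mixing the old com.databricks:spark-avro
artifact with org.apache.spark:spark-avro, or using artifact versions that do not
match the running Spark, can also cause conflicts. A sketch, assuming a Spark 2.4.x
/ Scala 2.11 shell (adjust the version to match the cluster):

// Launch with only the matching spark-avro artifact, e.g.:
//   spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6

// Spark 2.4.x: the functions live in the package object
import org.apache.spark.sql.avro._          // from_avro, to_avro

// Spark 3.x and newer:
// import org.apache.spark.sql.avro.functions._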

Regards






Spark streaming with Confluent kafka

2020-07-03 Thread dwgw
Hi

I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
have invoked spark-shell with the following command:

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
--conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
--driver-java-options
"-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
/home/spark/kafka_jaas.conf

kafka_jaas.conf
-

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};
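
As a hedged alternative (a sketch, not necessarily required for this setup): the
Structured Streaming Kafka source forwards any option prefixed with "kafka." to the
consumer, so the JAAS entry can also be passed inline via kafka.sasl.jaas.config,
avoiding the need to ship kafka_jaas.conf to the driver and executors. Broker,
topic and credentials below are the placeholders from the post.

// Sketch: inline JAAS config instead of -Djava.security.auth.login.config.
// Note the Java-client property name is sasl.mechanism (singular).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
    """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";""")
  .option("startingOffsets", "earliest")
  .load()
  .select($"value".cast("string").alias("value"))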

Readstream
-

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:612)
at
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
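
One hedged observation on the failure above: the Java Kafka client property is
sasl.mechanism (singular; sasl.mechanisms is the librdkafka spelling), so with
kafka.sasl.mechanisms the consumer falls back to the default GSSAPI mechanism,
which does not match the PlainLoginModule entry, and construction can fail exactly
like this. A quick way to see the full underlying cause outside Spark is to build a
bare consumer with the same settings; broker and credentials below are the
placeholders from the post.

// Diagnostic sketch: construct a plain KafkaConsumer with the intended
// settings so the root cause of "Failed to construct kafka consumer"
// is reported directly.
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "pkc-XXX.cloud:9092")
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config",
  """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";""")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)   // throws here with the real cause if still misconfigured
consumer.close()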

Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I was able to correct the issue. It was due to the wrong version of a JAR
file: I removed those JAR files, copied in the correct versions, and the
error has gone away.

Regards






Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi
I am trying to stream a Kafka topic from the Spark shell but I am getting
the following error.
I am using *Spark 3.0.0 / Scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
*Java 1.8.0_212*).

*[spark@hdp-dev ~]$ spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in
central
found org.apache.kafka#kafka-clients;2.4.1 in central
found com.github.luben#zstd-jni;1.4.4-3 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.7.5 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
:: modules in use:
com.github.luben#zstd-jni;1.4.4-3 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.4.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
[default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
   
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
        ---------------------------------------------------------------------
:: retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id =
application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
  /_/
 
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val df = spark.
 | readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "XXX").
 | option("subscribe", "XXX").
 | option("kafka.sasl.mechanisms", "XXX").
 | option("kafka.security.protocol", "XXX").
 | option("kafka.sasl.username","XXX").
 | option("kafka.sasl.password", "XXX").
 | option("startingOffsets", "earliest").
 | load
java.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract
  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided
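
A hedged note: an AbstractMethodError on KafkaSourceProvider.inferSchema usually
indicates a binary mismatch between the spark-sql-kafka jar and the Spark runtime
(for example, an older spark-sql-kafka jar already present under $SPARK_HOME/jars
shadowing the 3.0.0 one), which would be consistent with the wrong-JAR resolution
in the reply above. A small sketch to check, from the same shell, where the
provider class is actually loaded from:

// Diagnostic sketch: compare the running Spark version with the jar that
// actually provides KafkaSourceProvider.
println(spark.version)

val providerJar = Class
  .forName("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .getProtectionDomain.getCodeSource.getLocation
println(providerJar)   // expect .../spark-sql-kafka-0-10_2.12-3.0.0.jar, not an older version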

Looking forward to a response.






