Hi All,
I am trying to read from Kafka using spark streaming from spark-shell but
getting the below error. Any suggestions to fix this is much appreciated.
I am running from spark-shell hence it is client mode and the files are
available in the local filesystem.
I tried to access the files as shown below. But I still get the same error.
Any suggestions to make this work from spark-shell
spark-shell \
--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.8
\
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
\
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--files
/local_dir/kafka.client.truststore.jks,/local_dir/test.kafka.client.xxx.com.jks
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1,server2")
.option("subscribe", "wm-cth-salesstreams")
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", 100)
.option("failOnDataLoss", false)
.option("kafka.security.protocol","SSL")
*
//.option("kafka.ssl.truststore.location","/local_dir/kafka.client.truststore.jks").option("kafka.ssl.truststore.location","file://"
+
org.apache.spark.SparkFiles.get("/local_dir/kafka.client.truststore.jks"))*
.option("kafka.ssl.truststore.password","pwd")
.option("kafka.ssl.keystore.password","pwd")
*
//.option("kafka.ssl.keystore.location","/local_dir/test.kafka.client.xxx.com.jks")).load.option("kafka.ssl.keystore.location","file://"
+
org.apache.spark.SparkFiles.get("/local_dir/test.kafka.client.xxx.com.jks"))).load*
Exception:
22/02/12 15:57:03 INFO org.apache.spark.sql.kafka010.KafkaMicroBatchReader:
Initial offsets:
{"wm-cth-salesstreams":{"23":167267092,"59":167276860,"50":167258479,"32":167281169,"41":167272687,"53":167256274,"17":167269072,"8":167282513,"35":167298150,"44":167244867,"26":167242913,"11":167283073,"56":167304913,"29":167307963,"38":167287380,"47":167312027,"20":167280591,"2":167248970,"5":167308945,"14":167231970,"46":167267534,"55":167275890,"58":167287699,"49":167245856,"40":167247065,"13":167249522,"4":167301468,"22":167269011,"31":167349129,"16":167266948,"7":167272315,"52":167276042,"43":167273593,"25":167232737,"34":167264787,"10":167265137,"37":167252586,"1":167312454,"19":167247237,"28":167280632,"54":167307408,"45":167280214,"27":167249248,"36":167282370,"18":167223580,"9":167223643,"57":167340670,"21":167277793,"48":167273190,"3":167294084,"12":167299093,"30":167236443,"39":167311503,"15":167274468,"42":167292272,"51":167252733,"24":167245661,"6":167241738,"33":167224273,"0":167295530}}
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:713)
... 51 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
... 55 more
*Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore /local_dir/test.kafka.client.xxx.com.jks of type JKS*
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at
org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 56 more
*Caused by: java.io.FileNotFoundException:
/local_dir/test.kafka.client.xxx.com.jks (No such file or directory)*
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at java.io.FileInputStream.(FileInputStream.java:93)
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
... 58 more