Re: Kafka Topic to Parquet HDFS with Structured Streaming

2020-11-19 Thread AlbertoMarq
Hi Chetan,
I'm having the exact same issue with Spark Structured Streaming and Kafka
trying to write to HDFS.
Can you please tell me how you fixed it?
I'm using Spark 3.0.1 and Hadoop 3.3.0.

Thanks!






Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-10 Thread Chetan Khatri
Hello Deng, thank you for your email.
The issue turned out to be with the Spark / Hadoop (HDFS) configuration settings.
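
The exact settings weren't shared here, but a frequent culprit behind
"mkdir of /sample-topic/_spark_metadata failed" is the job resolving
bare paths against the wrong filesystem, or lacking write permission
there. A minimal sketch of pinning the default filesystem explicitly;
the namenode address is a placeholder assumption, not something from
this thread:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("DemoSparkKafka")
  // Make bare paths like "/sample-topic" resolve to HDFS rather than
  // the local filesystem; adjust host/port to your cluster.
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
  .getOrCreate()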

Thanks

On Mon, Jun 10, 2019 at 5:28 AM Deng Ching-Mallete wrote:

> Hi Chetan,
>
> Best to check if the user account that you're using to run the job has
> permission to write to the path in HDFS. I would suggest writing the
> parquet files to a different path, perhaps to a project space or user home,
> rather than at the root directory.
>
> HTH,
> Deng
>
> On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri wrote:
> > [...]

Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-10 Thread Deng Ching-Mallete
Hi Chetan,

Best to check if the user account that you're using to run the job has
permission to write to the path in HDFS. I would suggest writing the
parquet files to a different path, perhaps to a project space or user home,
rather than at the root directory.

HTH,
Deng
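
To make that concrete, here is a sketch of the sink writing under a
user home; the user name and checkpoint path are placeholders, not
values from the original code:

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  // Write under the job user's HDFS home instead of the root, where
  // ordinary accounts usually cannot create directories.
  .option("path", "/user/myname/sample-topic")
  .option("checkpointLocation", "/user/myname/checkpoints/sample-topic")
  .start()

You can verify what the job's account may write to with
"hdfs dfs -ls /" and "hdfs dfs -ls /user" before starting the query.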

On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri wrote:

> [...]


Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Also, does anyone have an idea how to resolve this issue:
https://stackoverflow.com/questions/56390492/spark-metadata-0-doesnt-exist-while-compacting-batch-9-structured-streaming-er
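
One detail that may be related: in the original posting (quoted in
full at the bottom of this thread), the console query and the parquet
query share the same checkpointLocation, "/tmp/checkpoint". Sharing a
checkpoint directory between queries, or deleting a sink's output
directory while keeping its old checkpoint, can leave the metadata log
inconsistent, which is a common cause of "_spark_metadata/0 doesn't
exist while compacting batch N" errors. A sketch with one checkpoint
per query; the paths are placeholder assumptions:

val topicQuery = dataFromTopicDF.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/console")  // per query
  .start()

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("path", "/user/myname/sample-topic")
  .option("checkpointLocation", "/tmp/checkpoints/parquet")  // not shared
  .start()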

On Fri, Jun 7, 2019 at 5:59 PM Chetan Khatri wrote:

> [...]


Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Hello Dear Spark Users,

I am trying to write data from a Kafka topic to Parquet on HDFS with
Structured Streaming, but I am getting failures. Please help.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val spark: SparkSession =
  SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
import spark.implicits._

val dataFromTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
val topicQuery = dataFromTopicDF.writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

topicQuery.awaitTermination()
topicQuery.stop()
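
A note on the control flow above: awaitTermination() blocks until the
query stops, so the stop() call after it only runs once the query has
already terminated, and any code below it will not execute while the
query is still running. To run the console and parquet sinks from one
application, start both queries first and then wait on either; a
sketch, with placeholder checkpoint and output paths:

val consoleQuery = dataFromTopicDF.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/console")
  .start()

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("path", "/user/myname/sample-topic")
  .option("checkpointLocation", "/tmp/checkpoints/parquet")
  .start()

// Block until any of the active queries terminates or fails.
spark.streams.awaitAnyTermination()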


The above code works well, but when I try to write to Parquet on HDFS I
get exceptions.


logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("startingOffsets", "earliest")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", "/sample-topic")
  .start()

parquetQuery.awaitTermination()
parquetQuery.stop()


*Exception Details:*


Exception in thread "main" java.io.IOException: mkdir of /sample-topic/_spark_metadata failed
  at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
  at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
  at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
  at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
  at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
  at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
  at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66)
  at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
  at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
  at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
  at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
  at com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
  at com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
  at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
  at scala.App$$anonfun$main$1.apply(App.scala:76)
  at scala.App$$anonfun$main$1.apply(App.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
  at scala.App$class.main(App.scala:76)
  at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
  at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
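
The trace shows the failure happening when the FileStreamSink tries to
create /sample-topic/_spark_metadata, a directory directly under the
HDFS root. A hedged rewrite of the sink that avoids the pitfalls
discussed in the replies above; the paths are placeholders:

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  // startingOffsets is a source option; set it on readStream, not on
  // the sink, where it has no effect.
  // Write under a user or project directory rather than the HDFS root,
  // which the job's account typically cannot create directories in.
  .option("path", "/user/myname/sample-topic")
  // One checkpoint per query; do not reuse /tmp/checkpoint from the
  // console query above.
  .option("checkpointLocation", "/user/myname/checkpoints/sample-topic")
  .start()

parquetQuery.awaitTermination()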


Thanks