Hi Folks,
I am writing a pipeline that reads from Kafka, applies some
transformations, and then persists the results to HDFS.
Obviously such an operation (appending every batch to a single file) is not
supported for a DStream, since the *DStream.save(Path)* method treats the
Path as a directory, not a file. Using *repartition(1)* together with
*mode(SaveMode.Append)* before persisting did not work out either.
Any thoughts on how to solve this? Below is a code snippet.
import org.apache.spark.sql.SaveMode

// Consume from Kafka and keep only the record values that match youtubeRegex
val inputStream = kafkaStreamingUtil.streamConsume(streamingContext,
    Set(srcTopic), consumerGroupId)
  .filter(_.value().matches(youtubeRegex))
  .map(_.value())

inputStream.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  val rddAsDataFrame = rdd.toDF()
  // dstPath is treated as a directory: every batch adds new part files to it
  rddAsDataFrame.coalesce(1).write.mode(SaveMode.Append).csv(dstPath)
}
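One possible workaround, sketched here under assumptions rather than as a tested fix: because every write through the DataFrame writer into dstPath produces new part files, each batch could instead be appended to one HDFS file directly with Hadoop's FileSystem.append. The names SingleFileSink, appendLines and appendToSingleFile are hypothetical. The sketch assumes small batches (it collects each batch to the driver), a file system that supports append (HDFS does, Hadoop's checksummed local file system does not), and a single writer, since HDFS allows only one appender per file at a time. It writes the raw strings one per line instead of going through the CSV writer.

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of Spark or Hadoop.
object SingleFileSink {

  /** Appends `lines` to the single file `file`, creating it on first use. */
  def appendLines(fs: FileSystem, file: Path, lines: Seq[String]): Unit = {
    if (lines.nonEmpty) {
      // create() for the first batch, append() afterwards; append() needs a
      // file system that supports it (e.g. HDFS)
      val out = if (fs.exists(file)) fs.append(file) else fs.create(file)
      try {
        lines.foreach { line =>
          out.write((line + "\n").getBytes(StandardCharsets.UTF_8))
        }
      } finally {
        out.close()
      }
    }
  }

  /** Appends every micro-batch of `stream` to the single file `file`. */
  def appendToSingleFile(stream: DStream[String], file: String): Unit =
    stream.foreachRDD { rdd =>
      // collect() pulls the whole batch into driver memory, so this
      // assumes batches are small
      val lines = rdd.collect().toSeq
      val conf: Configuration = rdd.sparkContext.hadoopConfiguration
      val path = new Path(file)
      appendLines(path.getFileSystem(conf), path, lines)
    }
}
```

With that in place, the foreachRDD block above would be replaced by something like `SingleFileSink.appendToSingleFile(inputStream, dstPath)`, where dstPath now names a file rather than a directory. For large batches, writing per-batch directories and merging them afterwards is likely the safer design.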