Are there any solutions for this problem, please?
Sent from my iPhone
> On Jan 17, 2018, at 10:39 PM, KhajaAsmath Mohammed
> wrote:
>
> Hi,
>
> I have created a streaming context from a checkpoint, but it always throws a
> "Stream is corrupted" error when I restart the Spark Streaming job. Is there
> any solution for this?
>
> private def createStreamingContext(
> sparkCheckpointDir: String, sparkSession: SparkSession,
> batchDuration: Int, config: com.typesafe.config.Config) = {
> val topics = config.getString(Constants.Properties.KafkaTopics)
> val topicsSet = topics.split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> config.getString(Constants.Properties.KafkaBrokerList))
> val ssc = new StreamingContext(sparkSession.sparkContext,
> Seconds(batchDuration))
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> val datapointDStream =
> messages.map(_._2).map(TransformDatapoint.parseDataPointText)
> lazy val sqlCont = sparkSession.sqlContext
>
> hiveDBInstance = config.getString("hiveDBInstance")
>
> TransformDatapoint.readDstreamData(sparkSession, sqlCont,
> datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
> fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
>
> ssc.checkpoint(sparkCheckpointDir)
> ssc
> }
>
>
>
> // calling streaming context method
>
> val streamingContext =
> StreamingContext.getOrCreate(config.getString(Constants.Properties.CheckPointDir),
> () =>
> createStreamingContext(config.getString(Constants.Properties.CheckPointDir),
> sparkSession, config.getInt(Constants.Properties.BatchInterval), config))
>
> ERROR:
> org.apache.spark.SparkException: Failed to read checkpoint from directory
> hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC
>
> java.io.IOException: Stream is corrupted
>
>
> Thanks,
> Asmath