Re: Spark Stream is corrupted

2018-01-18 Thread KhajaAsmath Mohammed
Any solutions for this problem, please?

Sent from my iPhone

> On Jan 17, 2018, at 10:39 PM, KhajaAsmath Mohammed wrote:


Spark Stream is corrupted

2018-01-17 Thread KhajaAsmath Mohammed
Hi,

I have created a streaming context from a checkpoint, but it always throws a
"stream is corrupted" error when I restart the Spark Streaming job. Any
solution for this?

private def createStreamingContext(
    sparkCheckpointDir: String, sparkSession: SparkSession,
    batchDuration: Int, config: com.typesafe.config.Config) = {
  val topics = config.getString(Constants.Properties.KafkaTopics)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" ->
    config.getString(Constants.Properties.KafkaBrokerList))
  val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchDuration))
  val messages = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
  lazy val sqlCont = sparkSession.sqlContext

  hiveDBInstance = config.getString("hiveDBInstance")

  TransformDatapoint.readDstreamData(sparkSession, sqlCont,
    datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
    fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)

  ssc.checkpoint(sparkCheckpointDir)
  ssc
}



// calling the streaming context method

val streamingContext = StreamingContext.getOrCreate(
  config.getString(Constants.Properties.CheckPointDir),
  () => createStreamingContext(
    config.getString(Constants.Properties.CheckPointDir),
    sparkSession, config.getInt(Constants.Properties.BatchInterval), config))

ERROR:
org.apache.spark.SparkException: Failed to read checkpoint from directory
hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC

java.io.IOException: Stream is corrupted


Thanks,
Asmath
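
The thread ends without a resolution. One documented cause of unreadable Spark Streaming checkpoints is restarting with a recompiled or upgraded application jar, since checkpoints contain serialized code; in that situation the usual fallback is to discard the checkpoint and rebuild the context, accepting the loss of in-flight state. A generic sketch of that fallback pattern (all names here are hypothetical, not from the thread):

```scala
// Hedged sketch: try to load from a checkpoint, and if the read fails
// (e.g. "Stream is corrupted"), fall back to recreating from scratch.
// `load` and `recreate` are hypothetical placeholders for illustration.
def loadOrRecreate[T](load: () => T, recreate: () => T): T =
  try load()
  catch {
    case _: Exception =>
      // checkpoint unreadable: start fresh, discarding in-flight state
      recreate()
  }
```

In the Spark case, `load` would be the `StreamingContext.getOrCreate(...)` call above, and `recreate` would first remove the unreadable checkpoint directory (for HDFS, via the Hadoop `FileSystem.delete` API) before calling `createStreamingContext` directly. This trades exactly-once recovery for the ability to restart at all.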