Hi Arpan,
The error indicates that new DStream operations are being defined after
streamingContext.start() has been called: in the code below, the
map/filter/foreachRDD chain is declared inside the foreachRDD closure,
which only runs once the context has already started.
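To illustrate (a minimal sketch, not your exact code), this is the
anti-pattern that triggers the exception:

// Runs per batch, i.e. after ssc.start(); calling stream.map here
// creates a new DStream on a started context and throws
// "Adding new inputs, transformations, and output operations after
// starting a context is not supported".
stream.foreachRDD { rdd =>
  stream.map(_.value).foreachRDD(processRDD _)
}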
A suggested pattern to manage the offsets is the following:
var offsetRanges: Array[OffsetRange] = Array.empty

// create streaming context, streams, ...

// As the first operation after the stream has been created, capture
// the offsets of each batch:
stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// Then declare the other desired operations on the streaming data:
val resultStream = stream.map(...).filter(...).transform(...)

// Materialize the resulting stream:
resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic, ... whatever ...

  // At the end of the process, commit the offsets (note that I use the
  // original stream instance, not `resultStream`):
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
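Putting it all together, here is a minimal, self-contained sketch of the
pattern (broker address, group id, batch interval, and the output step
are placeholder assumptions, not values from your setup):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

val conf = new SparkConf().setAppName("kafka-offsets-example")
val ssc = new StreamingContext(conf, Seconds(10)) // batch interval: assumption

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",             // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-streaming-app",                  // placeholder
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // we commit ourselves
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

var offsetRanges: Array[OffsetRange] = Array.empty

// 1) First output operation: capture the offsets of the current batch.
stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// 2) Declare all transformations BEFORE the context is started.
val resultStream = stream
  .map(record => ("topicName", record.value))
  .filter { case (_, value) => !value.trim.isEmpty }

// 3) Materialize the result and commit the offsets at the end of each
//    batch, on the original stream instance.
resultStream.foreachRDD { rdd =>
  // write rdd to HDFS / a db / another topic ... (placeholder)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()

The key point is that the whole DStream lineage (offset capture,
transformations, output operation) is declared before ssc.start(); the
foreachRDD closures then run per batch without creating new DStreams.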
I hope this helps,
kr, Gerard.
On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani <[email protected]> wrote:
> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to make the streaming application
> restartable. (I already implemented this with checkpoints, but since we
> will need to change the code in production, checkpointing won't work.)
>
> Checking the Spark Streaming documentation for the "storing offsets in
> Kafka itself" approach:
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>
> which describes:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code like following:
>
> val kafkaMap: Map[String, Object] = KakfaConfigs
>
> val stream: InputDStream[ConsumerRecord[String, String]] =
>   KafkaUtils.createDirectStream(ssc, PreferConsistent,
>     Subscribe[String, String](Array("topicName"), kafkaMap))
>
> stream.foreachRDD { rdd =>
>   val offsetRanges: Array[OffsetRange] =
>     rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // Filter out the values which have empty values and get the tuple of type
>   // ( topicname, stringValue_read_from_kafka_topic)
>   stream.map(x => ("topicName", x.value)).filter(x =>
>     !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
>   // Sometime later, after outputs have completed.
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd: RDD[(String, String)]): Unit = {
>   // Process further to HDFS
> }
>
> Now, when I try to start the streaming application, it does not start,
> and looking at the logs, here is what we see:
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after starting a context is not supported
> at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
>
>
> Can anyone suggest, or help us understand, what we are missing here?
>
>
> Regards,
> Arpan
>