Hello,
I'm planning on adding a listener to update ZooKeeper (for monitoring
purposes) when a batch completes. What would be a consistent way to index
the offsets for a given batch? In the PR above, it seems like batchTime was
used, but is there a way to create this batchTime -> offsets mapping in the
streaming app itself?
Something like:
    var currOffsetRanges = Array[OffsetRange]()
    directKafkaStream.transform { rdd =>
      currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      ... /* do stuff with shuffling involved, then update DynamoDB */
    }
    offsetMap += ((validTime, currOffsetRanges))
Then in the listener (onBatchCompleted), retrieve the offsetRanges
associated with the completed batchTime and update ZK accordingly.
I'm unsure how to define validTime above. Any help/advice/words of warning
would be appreciated.
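A rough, Spark-free sketch of the bookkeeping described above, in case it
helps frame the question. TopicRange, OffsetRegistry and updateZk are
made-up names for illustration only. The commented wiring assumes that the
Time handed to the two-argument transform overload can play the role of
validTime and matches the batchTime later reported to the listener; that
assumption is exactly the part in doubt.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for Spark's OffsetRange, so the sketch runs without Spark.
final case class TopicRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical helper: maps a batch time (in ms) to that batch's offset ranges.
object OffsetRegistry {
  private val byBatch = TrieMap.empty[Long, Vector[TopicRange]]

  // Driver side: remember the ranges seen for this batch time.
  def record(batchTimeMs: Long, ranges: Seq[TopicRange]): Unit =
    byBatch.update(batchTimeMs, ranges.toVector)

  // Listener side: fetch and drop the entry so the map does not grow forever.
  def complete(batchTimeMs: Long): Option[Vector[TopicRange]] =
    byBatch.remove(batchTimeMs)
}

// Assumed wiring in the streaming app (needs Spark, so left as comments):
//   directKafkaStream.transform { (rdd, time) =>
//     // convert rdd.asInstanceOf[HasOffsetRanges].offsetRanges to TopicRange here
//     OffsetRegistry.record(time.milliseconds, ranges)
//     rdd
//   }
//   // inside StreamingListener.onBatchCompleted(event); updateZk is hypothetical:
//   OffsetRegistry.complete(event.batchInfo.batchTime.milliseconds).foreach(updateZk)
```

Removing the entry on completion keeps the map bounded, at the cost of
losing the ranges if the ZK update itself fails and needs a retry.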
Thanks!
Side note: I also considered the partition/batch updating examples in
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/,
but since shuffling occurs and DynamoDB/ZK aren't really the same
"database", neither would work in this case. If I'm missing something about
ZK here, please let me know too.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.