Thanks, rdd.rdd() did the trick. I now have the offsets via
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
But how can I commit the offsets back to Kafka?
Casting the JavaInputDStream throws a ClassCastException (CCE):
CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream; // Throws CCE
org.apache.spark.streaming.api.java.JavaInputDStream cannot be cast to
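The same unwrapping trick should apply here: JavaInputDStream is only a Java-side wrapper, and it is the underlying Scala stream that mixes in CanCommitOffsets, so casting the wrapper itself throws. The self-contained sketch below uses hypothetical stand-in classes (DirectStream, JavaStreamWrapper, and its inputDStream() accessor are mock names mirroring Spark's structure, not the real Spark API) to show why the wrapper cast fails while the unwrapped cast works:

```java
// Stand-in for the Scala CanCommitOffsets trait (mock, for illustration only).
interface CanCommitOffsets {
    void commitAsync(String[] offsetRanges);
}

// Plays the role of the Scala DirectKafkaInputDStream: it implements CanCommitOffsets.
class DirectStream implements CanCommitOffsets {
    String[] committed;
    public void commitAsync(String[] offsetRanges) { committed = offsetRanges; }
}

// Plays the role of JavaInputDStream: a wrapper that does NOT implement CanCommitOffsets.
class JavaStreamWrapper {
    private final DirectStream wrapped;
    JavaStreamWrapper(DirectStream wrapped) { this.wrapped = wrapped; }
    // Analogous to unwrapping via rdd.rdd(): expose the wrapped Scala-side object.
    DirectStream inputDStream() { return wrapped; }
}

public class CastDemo {
    public static void main(String[] args) {
        JavaStreamWrapper stream = new JavaStreamWrapper(new DirectStream());

        // Casting the wrapper fails at runtime, just like the CCE above:
        try {
            CanCommitOffsets cco = (CanCommitOffsets) stream;
            cco.commitAsync(new String[0]);
        } catch (ClassCastException e) {
            System.out.println("wrapper cast throws CCE"); // prints: wrapper cast throws CCE
        }

        // Casting the unwrapped stream works, because that object implements the interface:
        CanCommitOffsets cco = (CanCommitOffsets) stream.inputDStream();
        cco.commitAsync(new String[] { "topic-0:0..42" });
        System.out.println("unwrapped cast commits OK"); // prints: unwrapped cast commits OK
    }
}
```

In Spark itself, the analogous move would be casting the JavaInputDStream's wrapped Scala DStream (the counterpart of rdd.rdd() on the RDD side) rather than the Java wrapper; check the kafka010 Javadoc for the exact accessor on your Spark version.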
2016-10-10 20:18 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> This should give you hints on the necessary cast:
> The main ugly thing there is that the java rdd is wrapping the scala
> rdd, so you need to unwrap one layer via rdd.rdd()
> If anyone wants to work on a PR to update the java examples in the
> docs for the 0-10 version, I'm happy to help.
> On Mon, Oct 10, 2016 at 10:34 AM, static-max <flasha...@googlemail.com> wrote:
> > Hi,
> > by following this article I managed to consume messages from Kafka 0.10 with
> > Spark 2.0:
> > http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> > However, the Java examples are missing and I would like to commit the offsets
> > myself after processing the RDD. Does anybody have a working example for this?
> > "offsetRanges" seems to be a trait and not available after casting the RDD
> > to "HasOffsetRanges".
> > Thanks a lot!
> > Scala example:
> > val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> > stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
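For completeness, a Java sketch of those two Scala lines against the 0-10 connector. This is a hedged translation, not a tested snippet: it assumes the Java wrappers expose the wrapped Scala objects (rdd.rdd() on the RDD side, and an inputDStream() accessor on JavaInputDStream, which is my reading of the Spark source and should be verified against the Javadoc for your version), and that offsetRanges() returns an OffsetRange[]:

```java
// Inside foreachRDD: unwrap the Java wrappers before casting to the Scala traits.
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// ... process the batch, then commit through the unwrapped stream,
// which implements CanCommitOffsets (the Java wrapper does not):
((CanCommitOffsets) directKafkaStream.inputDStream()).commitAsync(offsetRanges);
```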