I don't think you can use offsetRanges this way. When you transform a
DirectKafkaInputDStream into a WindowedDStream, the KafkaRDDs are internally
turned into ordinary RDDs (here a MapPartitionsRDD), but offsetRanges is an
attribute specific to KafkaRDD. So when you cast an ordinary RDD to
HasOffsetRanges, you get this exception.
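To make the mechanism concrete, here is a plain-Java model (no Spark) of what goes wrong. The names OffsetRanges, Batch and SourceBatch are hypothetical stand-ins for HasOffsetRanges, a generic RDD and KafkaRDD. Only the source object carries offsets, and every transformation returns a new object that does not, so the cast fails just like the MapPartitionsRDD cast in the stack trace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java model, not Spark code. OffsetRanges stands in for HasOffsetRanges,
// Batch for a generic RDD, SourceBatch for KafkaRDD (all hypothetical names).
public class CastModel {
    public interface OffsetRanges {
        long[] untilOffsets();
    }

    public static class Batch<T> {
        final List<T> data;

        public Batch(List<T> data) {
            this.data = data;
        }

        // Like an RDD transformation: returns a new, plain Batch.
        // The source's offset metadata is not carried over.
        public <U> Batch<U> map(Function<T, U> f) {
            return new Batch<>(data.stream().map(f).collect(Collectors.toList()));
        }
    }

    // Only the object created directly from Kafka knows its offsets.
    public static class SourceBatch<T> extends Batch<T> implements OffsetRanges {
        private final long[] until;

        public SourceBatch(List<T> data, long[] until) {
            super(data);
            this.until = until;
        }

        @Override
        public long[] untilOffsets() {
            return until;
        }
    }

    // Mirrors ((HasOffsetRanges) rdd.rdd()).offsetRanges() in the question.
    public static long[] offsetsOf(Batch<?> batch) {
        return ((OffsetRanges) batch).untilOffsets();
    }

    public static void main(String[] args) {
        SourceBatch<String> source =
            new SourceBatch<>(Arrays.asList("a", "bb"), new long[] {42L});
        System.out.println(offsetsOf(source)[0]); // prints 42

        Batch<Integer> mapped = source.map(String::length);
        try {
            offsetsOf(mapped);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace");
        }
    }
}
```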
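You can only do something like this:
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaInputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... use offsetRanges here ...
}
That is, apply foreachRDD directly on the DirectKafkaInputDStream, before any
transformation.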
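If the windowed output still needs to know which offsets it covers, one option is to record each batch's ranges on the driver from that direct-stream foreachRDD (the Spark Kafka integration guide shows a similar trick with transformToPair and an AtomicReference) and derive the window's span from the last few batches. The plain-Java model below sketches only that bookkeeping. WindowOffsets, Range, record and windowSpan are hypothetical names, and the model assumes each partition's batches are contiguous and that the recording runs before the windowed output in each batch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (no Spark): record each batch's offset ranges at the
// source stage, then derive the span a sliding window covers per partition.
public class WindowOffsets {
    // Stand-in for OffsetRange: one Kafka partition's slice of one batch.
    public static final class Range {
        final int partition;
        final long from;  // inclusive
        final long until; // exclusive

        public Range(int partition, long from, long until) {
            this.partition = partition;
            this.from = from;
            this.until = until;
        }
    }

    private final int windowBatches; // window duration / batch interval
    private final Deque<Range[]> recent = new ArrayDeque<>();

    public WindowOffsets(int windowBatches) {
        this.windowBatches = windowBatches;
    }

    // Called once per batch with the ranges read on the direct stream.
    // Keeps only the batches that fall inside the current window.
    public void record(Range[] batchRanges) {
        recent.addLast(batchRanges);
        if (recent.size() > windowBatches) {
            recent.removeFirst();
        }
    }

    // Per partition: [earliest from, latest until) over the batches in the window.
    public Map<Integer, long[]> windowSpan() {
        Map<Integer, long[]> span = new TreeMap<>();
        for (Range[] batch : recent) {
            for (Range r : batch) {
                long[] s = span.get(r.partition);
                if (s == null) {
                    span.put(r.partition, new long[] {r.from, r.until});
                } else {
                    s[0] = Math.min(s[0], r.from);
                    s[1] = Math.max(s[1], r.until);
                }
            }
        }
        return span;
    }

    public static void main(String[] args) {
        WindowOffsets w = new WindowOffsets(2); // window = 2 batches
        w.record(new Range[] {new Range(0, 0, 5), new Range(1, 0, 3)});
        w.record(new Range[] {new Range(0, 5, 9), new Range(1, 3, 4)});
        w.record(new Range[] {new Range(0, 9, 12), new Range(1, 4, 4)});
        for (Map.Entry<Integer, long[]> e : w.windowSpan().entrySet()) {
            System.out.println("partition " + e.getKey() + ": "
                + e.getValue()[0] + " until " + e.getValue()[1]);
        }
    }
}
```

With a two-batch window, after three batches the model prints partition 0 as 5 until 12 and partition 1 as 3 until 4. Which end to commit is a separate choice: until marks everything the window has read, while from marks where a restarted job would have to resume to rebuild the same window.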
2015-06-12 16:10 GMT+08:00 ZIGEN <[email protected]>:
> Hi, I'm using Spark Streaming (1.3.1).
> I want exactly-once messaging from Kafka, and I also want to use window
> operations on a DStream.
>
> When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the
> Kafka Direct API, the following java.lang.ClassCastException occurs:
>
> --- stacktrace ---
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>     at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>     at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
>
>
> --- my source ---
>
> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
> jssc.checkpoint("checkpoint");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
>
> JavaPairDStream<String, List<String>> windowDs = pairDS.reduceByKeyAndWindow(
>     new Function2<List<String>, List<String>, List<String>>() {
>       @Override
>       public List<String> call(List<String> list1, List<String> list2) throws Exception {
>         ...
>       }
>     }, windowDuration, slideDuration);
>
> windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
>   @Override
>   public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
>     OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>     // ClassCastException occurred
>
>     KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>     for (OffsetRange offsets : offsetsList) {
>       TopicAndPartition topicAndPartition =
>           new TopicAndPartition(offsets.topic(), offsets.partition());
>       HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
>       map.put(topicAndPartition, offsets.untilOffset());
>       kc.setConsumerOffsets("group1", toScalaMap(map));
>     }
>     return null;
>   }
> });
>
> Thanks!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.