The other thing to keep in mind about Spark window operations against Kafka
is that Spark Streaming windows are based on the current system clock, not
the time embedded in your messages.

So you're going to get a fundamentally wrong answer from a window operation
after a failure / restart, because the replayed messages are windowed by when
they are processed rather than when they were produced. That holds regardless
of whether you're using the createStream or createDirectStream API.
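
To make the clock issue concrete, here is a minimal plain-Java sketch (no
Spark involved). TimedMessage, EventTimeWindows and the 60-second tumbling
window are hypothetical names and values chosen for illustration, and tumbling
windows are a simplification of Spark's sliding windows. It counts the same
backlog once by the time embedded in each message and once by the time the
backlog is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a Kafka message that carries its own timestamp.
final class TimedMessage {
    final long eventTimeMs; // time embedded in the message
    final String value;

    TimedMessage(long eventTimeMs, String value) {
        this.eventTimeMs = eventTimeMs;
        this.value = value;
    }
}

final class EventTimeWindows {
    // Start of the tumbling window that contains timeMs.
    static long windowStart(long timeMs, long windowMs) {
        return Math.floorDiv(timeMs, windowMs) * windowMs;
    }

    // Counts messages per window, keyed by the time embedded in each message.
    static Map<Long, Integer> countByEventTime(List<TimedMessage> messages, long windowMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (TimedMessage m : messages) {
            counts.merge(windowStart(m.eventTimeMs, windowMs), 1, Integer::sum);
        }
        return counts;
    }

    // Counts messages per window, keyed by the wall-clock time at which the
    // whole list is processed (as happens when a backlog is replayed).
    static Map<Long, Integer> countByProcessingTime(
            List<TimedMessage> messages, long processedAtMs, long windowMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        if (!messages.isEmpty()) {
            counts.put(windowStart(processedAtMs, windowMs), messages.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        long windowMs = 60_000L;
        List<TimedMessage> backlog = new ArrayList<>();
        backlog.add(new TimedMessage(5_000L, "a"));
        backlog.add(new TimedMessage(65_000L, "b"));
        backlog.add(new TimedMessage(125_000L, "c"));

        // Assume the job restarts and replays the backlog at t = 600 s.
        System.out.println(countByEventTime(backlog, windowMs));
        System.out.println(countByProcessingTime(backlog, 600_000L, windowMs));
    }
}
```

Keyed by embedded time, the three messages fall into three separate windows.
Keyed by processing time, the replayed backlog collapses into the single
window that happens to be open when the job comes back up.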

On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Casting to HasOffsetRanges would be meaningless anyway if done after an
> operation that changes partitioning.
>
> You can still use the messageHandler argument to createDirectStream to get
> access to offsets on a per-message basis.
>
> Also, it doesn't look like what you're doing is particularly concerned
> about transactional correctness (since you're saving offsets to a Kafka API
> backed by ZooKeeper), so you can try doing a transform as the first step in
> your stream, and casting to HasOffsetRanges there.
>
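
A rough plain-Java sketch of the per-message idea, with no Spark or Kafka
classes involved: if every message carries its own topic, partition and
offset, the position to save can be recomputed after any regrouping.
OffsetTaggedMessage and PerMessageOffsets are hypothetical names standing in
for whatever a messageHandler would emit, and "highest offset seen plus one"
is my assumption for mirroring an exclusive untilOffset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the value a messageHandler could emit per message.
final class OffsetTaggedMessage {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    OffsetTaggedMessage(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

final class PerMessageOffsets {
    // For each "topic-partition" key, returns the highest offset seen plus one,
    // i.e. the exclusive end of the consumed range.
    static Map<String, Long> untilOffsets(List<OffsetTaggedMessage> messages) {
        Map<String, Long> until = new TreeMap<>();
        for (OffsetTaggedMessage m : messages) {
            String key = m.topic + "-" + m.partition;
            until.merge(key, m.offset + 1, Long::max);
        }
        return until;
    }

    public static void main(String[] args) {
        // Order and grouping do not matter: the offsets travel with the data.
        List<OffsetTaggedMessage> windowContents = new ArrayList<>();
        windowContents.add(new OffsetTaggedMessage("events", 0, 10L, "a"));
        windowContents.add(new OffsetTaggedMessage("events", 1, 7L, "b"));
        windowContents.add(new OffsetTaggedMessage("events", 0, 12L, "c"));
        System.out.println(untilOffsets(windowContents));
    }
}
```

The trade-off is that every record gets larger, and the reduce function has
to keep the offset fields around instead of dropping them.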
>
>
> On Fri, Jun 12, 2015 at 5:03 AM, zigen <dbviewer.zi...@gmail.com> wrote:
>
>> Hi Shao,
>>
>> Thank you for your quick reply.
>> I was disappointed.
>> I will try window operations with the receiver-based approach
>> (KafkaUtils.createStream).
>>
>> Thank you again,
>> ZIGEN
>>
>>
>> On 2015/06/12 at 17:18, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>
>> I don't think you can use offsetRanges that way. When you transform a
>> DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally
>> replaced by an ordinary RDD. offsetRanges is specific to KafkaRDD, so
>> casting an ordinary RDD to HasOffsetRanges throws this exception.
>>
>> You can only do something like:
>>
>> directKafkaInputDStream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   ...
>> }
>>
>> Apply foreachRDD directly on DirectKafkaInputDStream.
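
The cast failure described above can be reproduced with a toy model in plain
Java. SimpleRdd, OffsetAware, SourceRdd and DerivedRdd are hypothetical
stand-ins, not Spark classes; the only point is that a transformation returns
a different type which no longer implements the offset interface.

```java
// Hypothetical stand-ins for illustration only; these are not Spark classes.
interface SimpleRdd { }

// Plays the role of HasOffsetRanges: only the source type implements it.
interface OffsetAware {
    long untilOffset();
}

// Plays the role of the RDD created directly from the source.
final class SourceRdd implements SimpleRdd, OffsetAware {
    private final long untilOffset;

    SourceRdd(long untilOffset) {
        this.untilOffset = untilOffset;
    }

    @Override
    public long untilOffset() {
        return untilOffset;
    }

    // Any transformation yields a different type that carries no offsets.
    DerivedRdd map() {
        return new DerivedRdd(this);
    }
}

final class DerivedRdd implements SimpleRdd {
    final SimpleRdd parent;

    DerivedRdd(SimpleRdd parent) {
        this.parent = parent;
    }
}

final class CastDemo {
    // Returns the offset if the value exposes one, or -1 if it does not.
    static long untilOffsetOrMinusOne(SimpleRdd rdd) {
        if (rdd instanceof OffsetAware) {
            return ((OffsetAware) rdd).untilOffset();
        }
        return -1L;
    }

    // Returns true if an unchecked cast to OffsetAware throws.
    static boolean castFails(SimpleRdd rdd) {
        try {
            OffsetAware unused = (OffsetAware) rdd;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SourceRdd source = new SourceRdd(42L);
        System.out.println(castFails(source));        // cast on the source
        System.out.println(castFails(source.map()));  // cast after a transformation
    }
}
```

Reading the offsets has to happen while the value is still the source type,
which is why the cast belongs in the first operation on the stream.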
>>
>> 2015-06-12 16:10 GMT+08:00 ZIGEN <dbviewer.zi...@gmail.com>:
>>
>>> Hi, I'm using Spark Streaming (1.3.1).
>>> I want to get exactly-once messaging from Kafka and use the window
>>> operations of DStream.
>>>
>>> When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the
>>> Kafka direct API, a java.lang.ClassCastException occurs as follows.
>>>
>>> --- stacktrace --
>>> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>         at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>>>         at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>>>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>         at scala.util.Try$.apply(Try.scala:161)
>>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:722)
>>>
>>>
>>> --- my source ---
>>>
>>> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
>>> jssc.checkpoint("checkpoint");
>>>
>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>         jssc, String.class, String.class, StringDecoder.class,
>>>         StringDecoder.class, kafkaParams, topicsSet);
>>>
>>> JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
>>>
>>> JavaPairDStream<String, List<String>> windowDs = pairDS.reduceByKeyAndWindow(
>>>         new Function2<List<String>, List<String>, List<String>>() {
>>>             @Override
>>>             public List<String> call(List<String> list1, List<String> list2)
>>>                     throws Exception {
>>>                 ...
>>>             }
>>>         }, windowDuration, slideDuration);
>>>
>>> windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
>>>     @Override
>>>     public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
>>>         // ClassCastException occurs on the next statement
>>>         OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>>
>>>         KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>>>         for (OffsetRange offsets : offsetsList) {
>>>             TopicAndPartition topicAndPartition =
>>>                     new TopicAndPartition(offsets.topic(), offsets.partition());
>>>
>>>             HashMap<TopicAndPartition, Object> map =
>>>                     new HashMap<TopicAndPartition, Object>();
>>>             map.put(topicAndPartition, offsets.untilOffset());
>>>             kc.setConsumerOffsets("group1", toScalaMap(map));
>>>         }
>>>
>>>         return null;
>>>     }
>>> });
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
>>>
>>
>
