I'd suggest logging a new JIRA issue with the details provided in this thread.

Thanks

On Sun, Mar 6, 2016 at 9:56 AM, Jatin Kumar <jku...@rocketfuelinc.com>
wrote:

> Hello Ted,
>
> The JIRA you pointed is a different issue. Here are more details of the
> issue I am talking about:
>
> Consider this code block:
>
> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
>
> val totalVideoImps = streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
> val totalImps = streamingContext.sparkContext.accumulator(0, "TotalImpressions")
>
> val stream = KafkaReader.KafkaDirectStream(streamingContext)
> stream.map(KafkaAdLogParser.parseAdLogRecord)
>   .filter(record => {
>     totalImps += 1
>     KafkaAdLogParser.isVideoRecord(record)
>   })
>   .map(record => {
>     totalVideoImps += 1
>     record.url
>   })
>   .window(Seconds(120))
>   .count().foreachRDD((rdd, time) => {
>     println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
>     println("Count: " + rdd.collect()(0))
>     println("Total Impressions: " + totalImps.value)
>     totalImps.setValue(0)
>     println("Total Video Impressions: " + totalVideoImps.value)
>     totalVideoImps.setValue(0)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
>
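> (For completeness: KafkaReader.KafkaDirectStream is just our thin wrapper
> around KafkaUtils.createDirectStream. A rough, hypothetical equivalent, with
> placeholder broker and topic names, would be:)
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> // Placeholder settings; the real values come from our own config.
> val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
> val topics = Set("ad_logs")
> val rawStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   streamingContext, kafkaParams, topics)
> val stream = rawStream.map(_._2)  // keep only the message value for parsing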
>
> The batch size before the window operation is 2 seconds, and after the window
> each batch covers 120 seconds, i.e. each windowed batch aggregates 60 of the
> underlying 2-second batches. The output of the above program for the first
> three 120-second batches is:
>
> Timestamp: 2016-03-06 12:02:56,000
> Count: 362195
> Total Impressions: 16882431
> Total Video Impressions: 362195
>
> Timestamp: 2016-03-06 12:04:56,000
> Count: 367168
> Total Impressions: 19480293
> Total Video Impressions: 367168
>
> Timestamp: 2016-03-06 12:06:56,000
> Count: 177711
> Total Impressions: 10196677
> Total Video Impressions: 177711
>
> Whereas the Spark UI shows different numbers, as attached in the image.
> Also, if we check the start and end Kafka partition offsets reported by
> successive batch entries on the UI, they do not form an overall continuous
> range. All numbers are fine if we remove the window operation, though.
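>
> (For comparison, the same offset ranges can also be dumped programmatically.
> This is only a sketch, assuming the Kafka 0.8 direct-stream API; the cast
> works only on the RDDs of the raw direct stream, before any map/filter/window:)
>
> import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
>
> // directStream: the raw output of KafkaUtils.createDirectStream,
> // before any transformation (the cast fails on derived DStreams).
> directStream.foreachRDD { rdd =>
>   val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   ranges.foreach { r =>
>     println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
>   }
> }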
>
> I think this is a bug, and I couldn't find any existing JIRA covering it.
>
> --
> Thanks
> Jatin
>
> On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you taken a look at SPARK-12739?
>>
>> FYI
>>
>> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> Consider following two code blocks:
>>>
>>> val ssc = new StreamingContext(sparkConfig, Seconds(2))
>>> val stream = KafkaUtils.createDirectStream(...)
>>>
>>> a) stream.filter(filterFunc).count().foreachRDD(rdd => println(rdd.collect()))
>>> b) stream.filter(filterFunc).window(Seconds(60), Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))
>>>
>>> I have observed that in case
>>> a) the UI behaves correctly, and the numbers reported for each batch are
>>> correct;
>>> b) the UI reports numbers every 60 seconds, but the batch-id/input-size etc.
>>> shown are those of the single 2-second batch at each 60-second boundary,
>>> i.e. the 30th batch, the 60th batch, etc. These numbers become useless, in
>>> fact confusing, in this case, though the delay/processing-time numbers are
>>> still helpful.
>>>
>>> Is someone working on a fix to show aggregated numbers which will be
>>> more useful?
>>>
>>> --
>>> Thanks
>>> Jatin
>>>
>>
>>
>
