Suggest logging a new issue with details provided in this thread. Thanks
On Sun, Mar 6, 2016 at 9:56 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:

> Hello Ted,
>
> The JIRA you pointed to is a different issue. Here are more details of the
> issue I am talking about:
>
> Consider this code block:
>
> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
>
> val totalVideoImps = streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
> val totalImps = streamingContext.sparkContext.accumulator(0, "TotalImpressions")
>
> val stream = KafkaReader.KafkaDirectStream(streamingContext)
> stream.map(KafkaAdLogParser.parseAdLogRecord)
>   .filter(record => {
>     totalImps += 1
>     KafkaAdLogParser.isVideoRecord(record)
>   })
>   .map(record => {
>     totalVideoImps += 1
>     record.url
>   })
>   .window(Seconds(120))
>   .count().foreachRDD((rdd, time) => {
>     println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
>     println("Count: " + rdd.collect()(0))
>     println("Total Impressions: " + totalImps.value)
>     totalImps.setValue(0)
>     println("Total Video Impressions: " + totalVideoImps.value)
>     totalVideoImps.setValue(0)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
>
> The batch size before the window operation is 2 seconds, and after the window
> each batch covers 120 seconds. The output of the above program for the first
> three 120-second batches is:
>
> Timestamp: 2016-03-06 12:02:56,000
> Count: 362195
> Total Impressions: 16882431
> Total Video Impressions: 362195
>
> Timestamp: 2016-03-06 12:04:56,000
> Count: 367168
> Total Impressions: 19480293
> Total Video Impressions: 367168
>
> Timestamp: 2016-03-06 12:06:56,000
> Count: 177711
> Total Impressions: 10196677
> Total Video Impressions: 177711
>
> The Spark UI, however, shows different numbers, as attached in the image.
> Also, if we check the start and end Kafka partition offsets reported by
> subsequent batch entries on the UI, they do not form a continuous overall
> range. All numbers are fine if we remove the window operation, though.
>
> I think this is a bug, and I couldn't find any existing bug report about it.
>
> --
> Thanks
> Jatin
>
> On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you taken a look at SPARK-12739?
>>
>> FYI
>>
>> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello all,
>>>
>>> Consider the following two code blocks:
>>>
>>> val ssc = new StreamingContext(sparkConfig, Seconds(2))
>>> val stream = KafkaUtils.createDirectStream(...)
>>>
>>> a) stream.filter(filterFunc).count().foreachRDD(rdd => println(rdd.collect()))
>>> b) stream.filter(filterFunc).window(Seconds(60), Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))
>>>
>>> I have observed that in case:
>>> a) the UI behaves correctly and the numbers reported for each batch are correct.
>>> b) the UI reports numbers every 60 seconds, but the batch-id/input-size etc.
>>> are those of the single 2-second batch at each 60-second boundary, i.e. the
>>> 30th batch, the 60th batch, and so on. These numbers become useless, in fact
>>> confusing, in this case, though the delay/processing-time numbers are still
>>> helpful.
>>>
>>> Is someone working on a fix to show aggregated numbers, which would be more
>>> useful?
>>>
>>> --
>>> Thanks
>>> Jatin
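
For anyone wanting to reproduce the pattern described above locally before filing the issue, the following is a minimal, self-contained sketch. It keeps the structure of the thread's code (accumulators incremented inside filter/map, a 120-second window over 2-second batches, values read and reset in foreachRDD), but the input source, record type, and "is video" predicate are placeholders: a local queueStream of integers stands in for the Kafka direct stream and the internal KafkaReader/KafkaAdLogParser/ImpressionAggregator helpers, which are not shown in the thread.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object WindowAccumulatorRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowAccumulatorRepro")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Accumulators incremented inside transformations, mirroring the code in the thread.
    val totalImps = ssc.sparkContext.accumulator(0, "TotalImpressions")
    val totalVideoImps = ssc.sparkContext.accumulator(0, "TotalVideoImpressions")

    // Placeholder input: one RDD of 100 integers per 2-second batch (an assumption,
    // standing in for the Kafka direct stream used in the thread).
    val queue = mutable.Queue[RDD[Int]]()
    for (_ <- 1 to 300) queue += ssc.sparkContext.parallelize(1 to 100)
    val stream = ssc.queueStream(queue)

    stream
      .filter { record =>
        totalImps += 1          // side effect inside a transformation
        record % 4 == 0         // placeholder for isVideoRecord
      }
      .map { record =>
        totalVideoImps += 1     // second side effect inside a transformation
        record.toString
      }
      .window(Seconds(120), Seconds(120))  // 120-second window over 2-second batches
      .count()
      .foreachRDD { (rdd, time) =>
        println("Timestamp: " + time.milliseconds)
        println("Count: " + rdd.collect()(0))
        println("Total Impressions: " + totalImps.value)
        totalImps.setValue(0)
        println("Total Video Impressions: " + totalVideoImps.value)
        totalVideoImps.setValue(0)
      }

    ssc.start()
    ssc.awaitTermination()
  }
}

With oneAtATime left at its default, the queue stream feeds one RDD per 2-second batch, so each window covers 60 small batches. Comparing the printed window counts and accumulator totals against the per-batch numbers on the Streaming tab of the UI should surface the same discrepancy reported in this thread.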