TD, it looks like your instincts were correct; I misunderstood what you meant.
If I force an eval on the inputStream using foreachRDD, the windowing works
correctly.  If I don't, lazy evaluation somehow corrupts the window batches I
eventually receive.  Any reason the bug is categorized as minor?  It seems that
anyone who uses the windowing functionality would run into it, and I imagine
that includes anyone who wants to use Spark Streaming to aggregate data in
fixed time batches, which seems like a fairly common use case.
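
For anyone who hits the same thing, the workaround (using the names from the
code quoted below) is roughly this -- a sketch, where the no-op foreachRDD just
forces the raw input stream to be materialized every batch interval:

// Workaround: register a no-op output action on the raw input stream so it is
// evaluated every batch, then window and process as before.
inputStream.foreachRDD(rdd => { })

val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
windowedStream.foreachRDD(outputFunc)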

Alan


On Jul 22, 2014, at 11:30 PM, Alan Ngai <a...@opsclarity.com> wrote:

> foreachRDD is how I extracted values in the first place, so that's not going 
> to make a difference.  I don't think it's related to SPARK-1312 because I'm 
> generating data every second and I'm using foreachRDD right after the window 
> operation.  The code looks something like:
> 
> val batchInterval = 5
> val windowInterval = 25
> val slideInterval = 15
> 
> val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
> 
> val outputFunc = (r: RDD[MetricEvent], t: Time) => {
>   println("======================================== %s".format(t.milliseconds / 1000))
>   r.foreach { metric =>
>     // floor the timestamp to the start of its batch interval
>     val timeKey = metric.timeStamp / batchInterval * batchInterval
>     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
>   }
> }
> windowedStream.foreachRDD(outputFunc)
> 
> On Jul 22, 2014, at 10:13 PM, Tathagata Das <tathagata.das1...@gmail.com> 
> wrote:
> 
>> It could be related to this bug that is currently open. 
>> https://issues.apache.org/jira/browse/SPARK-1312
>> 
>> Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) in 
>> and try these combos again?
>> 
>> TD
>> 
>> 
>> On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai <a...@opsclarity.com> wrote:
>> I have a sample application pumping out records 1 per second.  The batch 
>> interval is set to 5 seconds.  Here’s a list of “observed window intervals” 
>> vs what was actually set
>> 
>> window=25, slide=25 : observed-window=25, overlapped-batches=0
>> window=25, slide=20 : observed-window=20, overlapped-batches=0
>> window=25, slide=15 : observed-window=15, overlapped-batches=0
>> window=25, slide=10 : observed-window=20, overlapped-batches=2
>> window=25, slide=5 : observed-window=25, overlapped-batches=3
>> 
>> Can someone explain this behavior to me?  I'm trying to aggregate metrics by 
>> time batches but want to skip partial batches, so I'm looking for a 
>> combination that results in exactly 1 overlapped batch; no combination I 
>> tried gets me there.
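>> 
>> For what it's worth, if partial batches weren't a concern, the aggregation 
>> I'm after would look roughly like this (a sketch, assuming the usual 
>> org.apache.spark.streaming imports and a numeric MetricEvent.value; a 
>> tumbling window with window == slide, so every record lands in exactly one 
>> window):
>> 
>> val batchInterval = 5
>> val tumbling = inputStream
>>   .window(Seconds(25), Seconds(25))
>>   .map(metric => (metric.timeStamp / batchInterval * batchInterval, metric.value))
>>   .reduceByKey(_ + _)   // sum values within each fixed time bucket
>> tumbling.foreachRDD((rdd, t) => rdd.foreach(println))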
>> 
>> Alan
>> 
>> 
> 
