GitHub user wesleymiao opened a pull request:
https://github.com/apache/spark/pull/5871
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't
work all the time
@tdas
The problem most likely resides in the DStream.slice() implementation, shown
below.
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
  if (!isInitialized) {
    throw new SparkException(this + " has not been initialized")
  }
  if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
    logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
      + slideDuration + ")")
  }
  if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
    logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
      + slideDuration + ")")
  }
  // floor() here rounds down relative to 0, not relative to zeroTime
  val alignedToTime = toTime.floor(slideDuration)
  val alignedFromTime = fromTime.floor(slideDuration)
  logInfo("Slicing from " + fromTime + " to " + toTime +
    " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
  alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
    if (time >= zeroTime) getOrCompute(time) else None
  })
}
After floor() is applied to fromTime and toTime, the differences
(alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be
multiples of slideDuration, so the isTimeValid() check fails for every
subsequent computation.
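To make the misalignment concrete, here is a small worked example on plain
millisecond values. The numbers (a zeroTime of 1003 ms, a slideDuration of
10 ms) are hypothetical, and floorAbsolute and isAligned are illustrative
stand-ins, not Spark's actual methods.

```scala
object MisalignedFloorExample {
  // Hypothetical values, chosen only for illustration.
  val zeroTime = 1003L      // stream start in ms; not a multiple of the slide
  val slideDuration = 10L   // slide duration in ms

  // Stand-in for a floor that rounds down to a multiple of `d` counted
  // from 0, ignoring zeroTime.
  def floorAbsolute(t: Long, d: Long): Long = (t / d) * d

  // Mirrors the alignment condition described above: a time is usable only
  // if its offset from zeroTime is a multiple of the slide duration.
  def isAligned(t: Long): Boolean = (t - zeroTime) % slideDuration == 0

  def main(args: Array[String]): Unit = {
    val toTime = 1043L  // zeroTime + 4 * slideDuration, so already aligned
    val floored = floorAbsolute(toTime, slideDuration)
    println(s"toTime = $toTime, aligned: ${isAligned(toTime)}")
    println(s"floored = $floored, aligned: ${isAligned(floored)}")
  }
}
```

In this sketch an input that was aligned to zeroTime becomes misaligned once
it is floored relative to 0.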
The fix is to add a new floor() overload in Time.scala that takes zeroTime
into account when flooring:
def floor(that: Duration, zeroTime: Time): Time = {
  val t = that.milliseconds
  new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}
DStream.slice() is then changed to call this new floor() overload, passing in
its zeroTime:
val alignedToTime = toTime.floor(slideDuration, zeroTime)
val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
This way alignedToTime and alignedFromTime are *really* aligned with respect
to zeroTime, whose value is generally not 0.
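As a rough check of that claim, the sketch below applies the same arithmetic
as the new floor() to raw millisecond values and walks the resulting range in
steps of the slide duration. The name floorFrom and all numeric values are
assumptions made for illustration only.

```scala
object ZeroRelativeFloorExample {
  // Same arithmetic as floor(that, zeroTime), expressed on raw milliseconds.
  def floorFrom(t: Long, d: Long, zero: Long): Long = ((t - zero) / d) * d + zero

  def main(args: Array[String]): Unit = {
    val zeroTime = 1003L     // hypothetical stream start, in ms
    val slideDuration = 10L  // hypothetical slide duration, in ms

    val alignedFrom = floorFrom(1017L, slideDuration, zeroTime)
    val alignedTo = floorFrom(1048L, slideDuration, zeroTime)

    // Analogous to alignedFromTime.to(alignedToTime, slideDuration) in slice().
    val times = alignedFrom to alignedTo by slideDuration
    println(times.mkString(", "))

    // Every generated time sits a whole number of slides after zeroTime.
    println(times.forall(t => (t - zeroTime) % slideDuration == 0))
  }
}
```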
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/wesleymiao/spark spark-7326
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/5871.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5871
----
commit 261174533048be2823e01a82071648794a86da8d
Author: Wesley Miao <[email protected]>
Date: 2015-05-03T12:15:35Z
[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't
work all the time
----