GitHub user wesleymiao opened a pull request:

    https://github.com/apache/spark/pull/5871

    [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time

    @tdas 
    
    The problem most likely resides in the DStream.slice() implementation,
    shown below.
    
      def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
        if (!isInitialized) {
          throw new SparkException(this + " has not been initialized")
        }
        if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
          logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
            + slideDuration + ")")
        }
        if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
          logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
            + slideDuration + ")")
        }
        val alignedToTime = toTime.floor(slideDuration)
        val alignedFromTime = fromTime.floor(slideDuration)
    
        logInfo("Slicing from " + fromTime + " to " + toTime +
          " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
    
        alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
          if (time >= zeroTime) getOrCompute(time) else None
        })
      }
    
    Here, after floor() is performed on both fromTime and toTime without regard
    to zeroTime, the results (alignedFromTime - zeroTime) and
    (alignedToTime - zeroTime) may no longer be multiples of slideDuration,
    which makes the isTimeValid() check fail for all the remaining computation.
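
    To make the misalignment concrete, here is a minimal sketch that uses plain
    millisecond values rather than Spark's Time and Duration classes. The
    numbers (zeroTime = 1500 ms, slideDuration = 2000 ms) and the names
    NaiveFloorDemo, naiveFloor and isAligned are made up for illustration;
    isAligned only mirrors the (time - zeroTime).isMultipleOf(slideDuration)
    condition quoted above.

```scala
object NaiveFloorDemo {
  // Hypothetical values: a stream that started at 1500 ms and slides every 2000 ms.
  val zeroTime: Long = 1500L
  val slideDuration: Long = 2000L

  // Floor that ignores zeroTime: rounds down to a multiple of `d` counted from 0.
  def naiveFloor(t: Long, d: Long): Long = (t / d) * d

  // Mirrors the (time - zeroTime).isMultipleOf(slideDuration) condition.
  def isAligned(t: Long): Boolean = (t - zeroTime) % slideDuration == 0

  def main(args: Array[String]): Unit = {
    val toTime = 5500L // 1500 + 2 * 2000, so it sits on a slide boundary
    val floored = naiveFloor(toTime, slideDuration)
    println(s"isAligned($toTime) = ${isAligned(toTime)}")
    println(s"naiveFloor($toTime) = $floored, isAligned = ${isAligned(floored)}")
  }
}
```

    With these assumed values, an input that was aligned to zeroTime (5500)
    is floored to 4000, which is 500 ms off the nearest slide boundary.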
    
    The fix is to add a new floor() function in Time.scala that respects
    zeroTime when performing the floor:
    
      def floor(that: Duration, zeroTime: Time): Time = {
        val t = that.milliseconds
        new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
      }
    
    Then change DStream.slice() to call this new floor() function, passing in
    its zeroTime:
    
        val alignedToTime = toTime.floor(slideDuration, zeroTime)
        val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
    
    This way alignedToTime and alignedFromTime are *really* aligned with
    respect to zeroTime, whose value is generally not 0.
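
    As a rough check of the arithmetic, the sketch below applies the same
    formula to a few sample inputs. Time and Duration here are simplified
    stand-ins, not Spark's real classes, and isAlignedTo, ZeroAwareFloorDemo
    and the sample values are hypothetical.

```scala
// Simplified stand-ins for Spark Streaming's Duration and Time (not the real classes).
final case class Duration(milliseconds: Long)

final case class Time(milliseconds: Long) {
  // Same arithmetic as the floor(that, zeroTime) function described above.
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    Time(((milliseconds - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

  // Hypothetical helper: true when (this - zeroTime) is a multiple of `slide`.
  def isAlignedTo(zeroTime: Time, slide: Duration): Boolean =
    (milliseconds - zeroTime.milliseconds) % slide.milliseconds == 0
}

object ZeroAwareFloorDemo {
  val zeroTime: Time = Time(1500L)
  val slide: Duration = Duration(2000L)

  def main(args: Array[String]): Unit = {
    // Every input in [5500, 7500) should floor to 5500 = 1500 + 2 * 2000.
    for (raw <- Seq(5500L, 6200L, 7499L)) {
      val aligned = Time(raw).floor(slide, zeroTime)
      println(s"$raw -> ${aligned.milliseconds}, aligned = ${aligned.isAlignedTo(zeroTime, slide)}")
    }
  }
}
```

    One caveat of this sketch: integer division truncates toward zero, so an
    input earlier than zeroTime would be rounded up toward zeroTime rather
    than down.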

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wesleymiao/spark spark-7326

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5871.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5871
    
----
commit 261174533048be2823e01a82071648794a86da8d
Author: Wesley Miao <[email protected]>
Date:   2015-05-03T12:15:35Z

    [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
    

----

