[ https://issues.apache.org/jira/browse/SPARK-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-4392.
------------------------------
    Resolution: Won't Fix

WontFix, per PR

> Event proration based on event timestamps
> -----------------------------------------
>
>                 Key: SPARK-4392
>                 URL: https://issues.apache.org/jira/browse/SPARK-4392
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Bijay Singh Bisht
>
> Topic: Spark Streaming using event timestamps
> Spark Streaming creates a batch (RDD) of events every T duration. The batch
> is formed on a schedule, and the timestamp associated with the batch is the
> time at which Spark scheduled it. This Spark-applied timestamp can be less
> relevant than the timestamp the event was originally meant to carry. The
> fundamental reasons the event timestamp differs from the Spark timestamp
> are the delay in event generation in the upstream system and the delay in
> transporting the event to Spark after it is generated. The problem is
> compounded for events that have both a start and an end, with both
> timestamps packed into a single record generated after the event ends, as
> illustrated in the following diagram.
> (upstream) -------s--------e----g------------->
> (spark)    --------------------------r-------->
> Horizontal axis is time. The event starts at s and ends at e; the event
> record is generated at g and is received by Spark at r.
> So there is a need to create batches which contain only the relevant
> events, or the relevant proportion of each event, according to the original
> timestamps passed to Spark as part of the received tuples. Let's refer to a
> batch in which all the events occurred in the time window it represents as
> a bin. So a bin from T1 - T2 will 'only have events' which occurred in the
> period T1 - T2. The definition of the bin could be extended to include 'all
> the events' which occurred in that period; however, this second constraint
> is harder to satisfy in practice, because events can be arbitrarily
> delayed. For the rest of the discussion, the batch and the bin are defined
> as in the previous paragraph.
> Bin size determines the granularity of the time series and is an
> independent consideration in itself, i.e. independent of the batch size,
> the events and the delay.
> Let's say the batch size is T, the bin size is n*T, and an event is delayed
> (for reception) by at most d*T. Then, in order to generate a bin, n + d
> batches of size T are required. Conversely, every batch contributes to the
> current bin and up to ceiling((n + d) / n) bins in total.
> For a batch @ t, the contents can be seen as T1 @ t (where the notation
> T1 @ t means the events belonging to bin T1 that arrived in batch t),
> T1 - 1 @ t, T1 - 2 @ t, ..., T1 - m @ t (where T1 - 1 is the bin previous
> to T1 and m = ceiling((n + d) / n)).
> We can then de-multiplex the contributions of batch @ t into bins T1,
> T1 - 1, T1 - 2, T1 - 3, resulting in streams which represent partial bins
> relative to the batch stream. So stream i represents the partial bin T1 - i
> received at t. This way the Spark application can deliver incremental bins
> downstream as close to real time as possible. Now, depending on how the
> downstream application can handle the partial bins, the definition and the
> generation of the streams need to be controlled.
> Cases:
> Case 1: The downstream application can handle incremental updates to the
> bin (i.e. partial bin = current partial bin + latest partial bin). For
> this, what is required is m streams which send updates every T interval,
> where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t
> ...
> Stream m: T1 - m @ t
> Case 2: The downstream application can only handle full updates to the bin
> (i.e. partial bin = latest partial bin). For this, what is required is m
> streams which send updates every T interval, where
> Stream 1: T1 @ t
> Stream 2: (T1 - 1 @ t) + (T1 - 1 @ t - 1)
> ...
> Stream m: (T1 - m @ t) + ... + (T1 - m @ t - m)
> i.e. a bin keeps getting updated every T until it is final. The first
> stream represents the most current bin with the latest cumulative update,
> the next stream represents the previous bin with the latest cumulative
> update, and so on, until the last stream, which represents a final bin.
> Case 3: The downstream application cannot handle updates to a bin. This is
> basically the last stream from case 2, with the exception that it slides by
> n*T and not T. Note that the next bin after T1 @ t is T1 + 1 @ t + n*T,
> because the size of the bin is n*T.
> Typically each stream needs to be treated similarly, because each
> represents the same kind of content; however, there can be use cases where
> the streams need to be treated differently. A consideration for the API.
> Implementation:
> In order to transform the batch stream into partial bin streams, we can
> filter the events and put the prorated events into bin streams representing
> T1 @ t, T1 - 1 @ t, and so on. For this we can define a new DStream which
> generates its stream by prorating the data from each batch into the bin
> corresponding to that stream.
> Use case 2 requires progressively accumulating all the events for a bin.
> For it, a new DStream is required which generates a pulsating window going
> from (s*n + 1) to (s*n + n) batches, where s is the partial stream index. A
> stream index of 0 implies the most current partial bin stream.
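> Spark Streaming has no pulsating-window primitive today, so as a rough
> substitute sketch (not the proposed DStream), the cumulative partial bins
> of case 2 can be approximated by keying each prorated event by its bin
> index and accumulating with updateStateByKey; Event, binIndexOf and
> binSizeMs are illustrative assumptions, and state checkpointing must be
> enabled:
>   import org.apache.spark.streaming.dstream.DStream
>
>   case class Event(start: Long, end: Long)      // assumed record type
>   def binIndexOf(e: Event, binSizeMs: Long): Long =
>     e.start / binSizeMs                         // bin the event starts in
>
>   // events: (event, prorated fraction) pairs, as produced by prorate below
>   def cumulativePartialBins(events: DStream[(Event, Double)],
>                             binSizeMs: Long): DStream[(Long, Double)] =
>     events
>       .map { case (e, f) => (binIndexOf(e, binSizeMs), f) }
>       .updateStateByKey((batch: Seq[Double], total: Option[Double]) =>
>         Some(total.getOrElse(0.0) + batch.sum)) // cumulative total per bin
>       // NB: real code would also expire a bin's state once it is final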
> APIs
> BinStreamer[T](DStream[T], start, end)
> This will return a BinStreamer object. The BinStreamer object can be used
> to generate incremental bin streams (case 1), a final bin stream (case 3),
> or updated bin streams (case 2) using the following APIs:
> BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int):
> BinStream[(T, percentage)]
> BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> Parameters:
> DStream[T]: the batch stream.
> start: closure to get the start time from a record.
> end: closure to get the end time from a record.
> sizeInNumBatches: the size of a bin as a multiple of the batch size.
> delayIndex: the maximum delay between the event's relevance and its
> reception, as a multiple of the batch size.
> numStreams: the number of bin streams. Even though this is fixed by the
> batch size, the bin size and the delayIndex, it is an optional parameter to
> control the number of output streams, which it does by delaying the most
> current bin.
> Each BinStream will wrap a DStream.
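> For illustration, here is a hypothetical call against the proposed
> signatures (nothing below exists in Spark; Flow, flows and the field names
> are made up for the sketch):
>   // case class Flow(startTime: Time, endTime: Time, bytes: Long)
>   // val flows: DStream[Flow] = ...              // the batch stream
>   // val binner = BinStreamer(flows,
>   //   start = (f: Flow) => f.startTime,
>   //   end   = (f: Flow) => f.endTime)
>   // // incremental partial bins (case 1), one stream per bin offset:
>   // val bins: Seq[BinStream[(Flow, Double)]] =
>   //   binner.incrementalStreams(sizeInNumBatches = 6, delayIndex = 3,
>   //     numStreams = 2)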
> import org.apache.spark.streaming.Time
>
> // startFunc / endFunc are the start and end closures passed to BinStreamer.
> def prorate(binStart: Time, binEnd: Time)(x: T): (T, Double) = {
>   val sx = startFunc(x)
>   val ex = endFunc(x)
>   // Even though binStart is not inclusive, binStart here stands for the
>   // limit as x approaches binStart from the right (binStart+).
>   val s = if (sx > binStart) sx else binStart   // clip event start to the bin
>   val e = if (ex < binEnd) ex else binEnd       // clip event end to the bin
>   if (ex == sx) {
>     (x, 1.0)   // point event: wholly inside the bin (filter ran first)
>   } else {
>     // fraction of the event's duration that falls inside this bin
>     (x, (e - s).milliseconds.toDouble / (ex - sx).milliseconds)
>   }
> }
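> A worked check with illustrative numbers: an event spanning [5, 25]
> prorated into the bin (10, 20] is clipped to [10, 20], so it overlaps the
> bin for 10 of its 20 time units:
>   val fraction = (20.0 - 10.0) / (25.0 - 5.0)   // prorate returns (x, 0.5)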
> def filter(binStart: Time, binEnd: Time)(x: T): Boolean = {
>   // The event starts after this bin ends: it belongs to a later bin.
>   if (startFunc(x) > binEnd) false
>   // The event ended in a prior bin (binStart itself is exclusive).
>   else if (endFunc(x) <= binStart) false
>   // The event starts exactly at binEnd and extends past it, i.e. start(x)
>   // approaches binEnd from the right: it belongs to the next bin.
>   else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
>   else true
> }
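> Tying the two helpers together, the partial-bin stream for one bin offset
> could then be derived from the batch stream roughly as follows (a sketch;
> events and the bin bounds are assumed to be in scope):
>   // def partialBin(events: DStream[T], binStart: Time, binEnd: Time) =
>   //   events.filter(filter(binStart, binEnd))   // drop irrelevant events
>   //         .map(prorate(binStart, binEnd))     // DStream[(T, Double)]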


