[ https://issues.apache.org/jira/browse/SPARK-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-4392.
------------------------------
    Resolution: Won't Fix

WontFix, per PR

> Event proration based on event timestamps
> -----------------------------------------
>
>                 Key: SPARK-4392
>                 URL: https://issues.apache.org/jira/browse/SPARK-4392
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Bijay Singh Bisht
>
> Topic: Spark Streaming using event timestamps
>
> Spark Streaming creates a batch (RDD) of events every T duration. The batch
> is based on a schedule, and the timestamp associated with the batch is the
> time at which it was scheduled by Spark. This Spark-applied timestamp may be
> less relevant than the timestamp the event was originally meant to carry.
> The fundamental reasons for the event timestamp to differ from the Spark
> timestamp are the delay in event generation in the upstream system and the
> delay in transporting the event to Spark after it is generated. The problem
> is compounded for events that have a start and an end, with both timestamps
> packed in a single record generated after the event ends, as illustrated in
> the following diagram.
>
> (upstream) -------s--------e----g---------------->
> (spark   ) ------------------------r------------->
>
> The horizontal axis is time. The event starts at s, ends at e, and the event
> record is generated at g, which is then received by Spark at r.
> So there is a need to create batches which only contain the relevant events,
> or the relevant proportion of the events, according to the original timestamp
> passed to Spark as part of the received tuples. Let's refer to a batch which
> has all the events occurring in the time window it represents as a bin. So a
> bin from T1 to T2 will 'only have events' which occurred in the period T1 -
> T2. The definition of the bin can be extended to include 'all the events'
> which occurred in a given period; however, the second constraint is harder
> to satisfy in practice, because events can be arbitrarily delayed.
> For the rest of the discussion, the definitions of the batch and the bin are
> as per the previous paragraph.
> Bin size determines the granularity of the time series and is an independent
> consideration in itself, i.e. independent of the batch/event/delay.
> Let's say that the batch size is T, the bin size is n*T, and an event is
> delayed (for reception) by at most d*T. So in order to generate a bin, n + d
> batches of size T are required.
> Conversely, every batch is going to contribute to the current bin and up to
> the last ceiling((n + d)/n) bins.
> For a batch @ t, the contents can be seen as T1 @ t (where the notation
> T1 @ t means the events corresponding to bin T1 from the batch at t),
> T1 - 1 @ t, T1 - 2 @ t, ..., T1 - m @ t (where T1 - 1 represents the bin
> previous to T1 and m = ceiling((n + d)/n)). For example, with n = 6 and
> d = 3, m = ceiling(9/6) = 2, so a batch contributes to T1, T1 - 1 and T1 - 2.
> We can then de-multiplex the contributions from the batch @ t into bins T1,
> T1 - 1, T1 - 2, T1 - 3, ..., resulting in streams which represent partial
> bins relative to the batch stream. So stream i represents the partial bin
> T1 - i received at t (a small sketch of this de-multiplexing follows below).
> This way the Spark application can deliver incremental bins to the
> downstream in as close to real time as possible. Depending on how the
> downstream application can handle the partial bins, the definition and the
> generation of the streams need to be controlled.
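> To make the de-multiplexing concrete, here is a minimal, self-contained
> sketch (an editorial illustration, not part of the proposal; all names are
> hypothetical). Times are in milliseconds, with batch size batchMs and bin
> size n*batchMs:
>
>   // m = ceiling((n + d)/n): how many bins back a batch can contribute to.
>   // E.g. n = 6, d = 3 gives m = ceiling(9/6) = 2.
>   def maxBinsBack(n: Int, d: Int): Int =
>     math.ceil((n + d).toDouble / n).toInt
>
>   // For an event time seen in the batch at batchTimeMs, the index i of the
>   // partial bin T1 - i it contributes to (0 = the current bin T1).
>   def binIndex(eventTimeMs: Long, batchTimeMs: Long, batchMs: Long, n: Int): Int = {
>     val binMs = n * batchMs
>     val currentBinStart = (batchTimeMs / binMs) * binMs
>     if (eventTimeMs >= currentBinStart) 0
>     else (((currentBinStart - eventTimeMs) + binMs - 1) / binMs).toInt
>   }
>
>   // De-multiplex one batch into (bin index, event) pairs; stream i then
>   // keeps the pairs with index i, i.e. the partial bin T1 - i @ t.
>   def demux[A](batch: Seq[(Long, A)], batchTimeMs: Long, batchMs: Long, n: Int): Seq[(Int, A)] =
>     batch.map { case (ts, ev) => (binIndex(ts, batchTimeMs, batchMs, n), ev) }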
> Cases:
> Case 1: The downstream application can handle incremental updates to the bin
> (i.e. partial bin = current partial bin + latest partial bin). For this, what
> is required is m streams which send the updates every T interval, where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t
> ...
> Stream m: T1 - m @ t
> Case 2: The downstream application can only handle full updates to the bin
> (i.e. partial bin = latest partial bin). For this, what is required is m
> streams which send the updates every T interval, where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t + @ t - 1
> ...
> Stream m: T1 - m @ t + ... + @ t - m
> i.e. a bin is updated every T until it is final. The first stream represents
> the most current bin with the latest cumulative update. The next stream
> represents the previous bin with the latest cumulative update, and so on,
> until the last stream, which represents a final bin.
> Case 3: The downstream application cannot handle updates to a bin. This is
> basically the last stream from case 2 (Stream m above), with the exception
> that it slides by n*T and not T. Note that the next bin after T1 @ t is
> T1 + 1 @ t + n*T, because the size of the bin is n*T.
> Typically each stream needs to be treated similarly, because it represents
> the same kind of content; however, there can be use cases where the streams
> may need to be treated differently. A consideration for the API.
> Implementation:
> In order to transform a batch stream into partial bin streams, we can filter
> the events and put the prorated events into bin streams representing T1 @ t,
> T1 - 1 @ t, and so on.
> For this we can define a new DStream which prorates the data from each batch
> into the bin corresponding to the stream.
> Use case 2 requires progressively accumulating all the events for a bin. For
> this, a new DStream is required which generates a pulsating window going
> from (s*n + 1) to (s*n + n) batches, where s is the partial stream index. A
> stream index of 0 implies the most current partial bin stream.
> APIs
> BinStreamer[T]
> This will return a BinStreamer object.
> The BinStreamer object can be used to generate incremental bin streams
> (case 1), a final bin stream (case 3), or updated bin streams (case 2)
> using the following APIs:
> BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int):
> BinStream[(T, percentage)]
> BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int,
> numStreams: Int): Seq[BinStream[(T, percentage)]]
> DStream[T]: This is the batch stream.
> start: Closure to get the start time from a record.
> end: Closure to get the end time from a record.
> sizeInNumBatches: The size of the bin as a multiple of the batch size.
> delayIndex: The maximum delay between the event's relevance and its
> reception, as a multiple of the batch size.
> numStreams: The number of bin streams. Even though it is fixed by the batch
> size, the bin size, and the delayIndex, this is an optional parameter to
> control the number of output streams, and it does so by delaying the most
> current bin.
> Each BinStream will wrap a DStream.
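> For illustration only, usage of the proposed API might look like the
> following. BinStreamer and BinStream were never implemented (the issue was
> resolved as Won't Fix), so the calls are shown as comments against the
> signatures above; Flow is a made-up record type, and percentage is assumed
> to be a Double:
>
>   // Hypothetical usage of the proposed (never-implemented) API.
>   case class Flow(startMs: Long, endMs: Long, bytes: Long)
>
>   // val flows: DStream[Flow] = ...   // the batch stream from a receiver
>   // val binner = BinStreamer(flows, (f: Flow) => f.startMs, (f: Flow) => f.endMs)
>
>   // Case 1: incremental partial-bin streams, bin = 6 batches, max delay = 3
>   // batches, one stream per partial bin T1 ... T1 - m (m = 2, so 3 streams):
>   // val incremental: Seq[BinStream[(Flow, Double)]] =
>   //   binner.incrementalStreams(sizeInNumBatches = 6, delayIndex = 3, numStreams = 3)
>
>   // Case 2: cumulative updates per bin:
>   // val updated = binner.updatedStreams(sizeInNumBatches = 6, delayIndex = 3, numStreams = 3)
>
>   // Case 3: final bins only, sliding by n*T rather than T:
>   // val finalBins = binner.finalStream(sizeInNumBatches = 6, delayIndex = 3)
>
> The proration and filtering closures that such streams would apply: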
> // Assumes org.apache.spark.streaming.Time and the start/end closures
> // (startFunc, endFunc: T => Time) from the BinStreamer above are in scope.
> def prorate(binStart: Time, binEnd: Time)(x: T): (T, Double) = {
>   val sx = startFunc(x)
>   val ex = endFunc(x)
>   // Even though binStart is not inclusive, binStart here stands for the
>   // limit as x approaches binStart from above (binStart+).
>   val s = if (sx > binStart) sx else binStart
>   val e = if (ex < binEnd) ex else binEnd
>   if (ex == sx) {
>     // An instantaneous event that passed the filter lies entirely in the bin.
>     (x, 1.0)
>   } else {
>     // Fraction of the event's lifetime that falls inside this bin.
>     (x, (e - s).milliseconds.toDouble / (ex - sx).milliseconds)
>   }
> }
>
> def filter(binStart: Time, binEnd: Time)(x: T): Boolean = {
>   // The flow starts in a subsequent bin
>   if (startFunc(x) > binEnd) false
>   // The flow ended in a prior bin
>   else if (endFunc(x) <= binStart) false
>   // start(x) approaches from binEnd+, so the flow belongs to the next bin
>   else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
>   else true
> }
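> As a self-contained check of the same logic, with Long millisecond
> timestamps standing in for Spark's Time (all values made up for
> illustration):
>
>   case class Flow(startMs: Long, endMs: Long)
>
>   def keep(binStart: Long, binEnd: Long)(f: Flow): Boolean =
>     !(f.startMs > binEnd ||                       // starts in a subsequent bin
>       f.endMs <= binStart ||                      // ended in a prior bin
>       (f.startMs == binEnd && f.endMs > binEnd))  // starts exactly at binEnd, runs past it
>
>   def prorate(binStart: Long, binEnd: Long)(f: Flow): (Flow, Double) = {
>     val s = math.max(f.startMs, binStart)
>     val e = math.min(f.endMs, binEnd)
>     if (f.endMs == f.startMs) (f, 1.0)
>     else (f, (e - s).toDouble / (f.endMs - f.startMs))
>   }
>
>   // Bin (10000, 20000]:
>   val flows = Seq(Flow(5000, 15000), Flow(12000, 18000), Flow(1000, 9000))
>   flows.filter(keep(10000, 20000)).map(prorate(10000, 20000))
>   // Flow(5000, 15000)  -> 0.5  (half of its lifetime falls in the bin)
>   // Flow(12000, 18000) -> 1.0  (entirely inside the bin)
>   // Flow(1000, 9000)   -> dropped (ended before the bin)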