You can have the bolt maintain a sliding window of the events (the last two events, including the current one). If the current event immediately follows the previous event in the expected sequence, the current event simply becomes the head of the window. Otherwise, you can compute the missing events and emit them before acking the current event and making it the head of the window. For this to work, all events for a given device have to go to the same bolt instance, so to scale the bolt you can partition the stream by sensor/device (or some other dimension) and use the appropriate fields grouping. Another assumption is that events do not arrive out of order. Since each device emits only once a minute (and not concurrently), events from a given device should not be out of order; if they are, you can hold more events in the window and sort them. You would also have to think about fault tolerance, since this approach keeps state in memory and that state will be lost if a node dies.
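A minimal sketch of the per-device window logic, written as plain Java (16+, for the record type) so it can be tried outside Storm. The class `GapFiller`, its `Reading` record, the epoch-second timestamps and the `periodSeconds` parameter are hypothetical, not Storm API. It keeps only the previous reading per sensor (the two-event window), fills each missing slot with the average of the readings on either side (the estimate asked for in the question), and drops late events instead of buffering and sorting them. In a real bolt you would call `accept` from `execute`, emit each returned reading, then ack the input tuple, and subscribe the bolt with a fields grouping on `sensor_id` so that every reading for a sensor reaches the same instance. The map lives in memory, so the fault-tolerance caveat above applies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Storm): keeps a two-event window per sensor
// (the previous reading plus the incoming one) and fills in the readings that
// should have arrived between them.
class GapFiller {

    // One sensor reading; timestamps are epoch seconds (assumption).
    record Reading(String sensorId, long timestamp, double current, boolean estimated) {}

    private final long periodSeconds;                               // expected spacing
    private final Map<String, Reading> previous = new HashMap<>();  // window head per sensor

    GapFiller(long periodSeconds) {
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        this.periodSeconds = periodSeconds;
    }

    // Returns the readings to emit, in timestamp order: estimated fill-ins for any
    // missing slots, followed by the incoming reading. Assumes in-order delivery per
    // sensor; a late or duplicate reading (timestamp not after the previous one) is
    // dropped and an empty list is returned.
    List<Reading> accept(Reading incoming) {
        List<Reading> out = new ArrayList<>();
        Reading prev = previous.get(incoming.sensorId());
        if (prev != null) {
            if (incoming.timestamp() <= prev.timestamp()) {
                return out;
            }
            // Estimate each missing value as the average of its two neighbours.
            double estimate = (prev.current() + incoming.current()) / 2.0;
            for (long ts = prev.timestamp() + periodSeconds;
                 ts < incoming.timestamp();
                 ts += periodSeconds) {
                out.add(new Reading(incoming.sensorId(), ts, estimate, true));
            }
        }
        out.add(incoming);
        previous.put(incoming.sensorId(), incoming); // incoming becomes the new head
        return out;
    }
}
```

For example, with `new GapFiller(60)`, a reading at 11:43:00 followed by one at 11:46:00 for the same sensor makes the second `accept` call return estimated readings for 11:44:00 and 11:45:00, then the 11:46:00 reading itself. Averaging gives every filled slot the same value; linear interpolation between the two neighbours (a swapped-in alternative, not what was asked) would spread the estimates across the gap instead.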
On Tue, Aug 11, 2015 at 5:20 PM, Alec Lee <[email protected]> wrote:
> the timestamp looks like:
> 2013-03-21 11:43:00-07
> 2013-03-21 11:44:00-07
> 2013-03-21 11:45:00-07
>
> On Aug 11, 2015, at 5:05 PM, Alec Lee <[email protected]> wrote:
>
> Thanks, Javier, here is the data records I am receiving:
> id | sensor_id | timestamp | period | current | date_received |
>
> so basically what I understand, each tuple emitted, including all fields
> above, but some records are missing in terms of sequential timestamp, for
> example, I should receive the records every minute
> 2015-08-11T17:01:49
> 2015-08-11T17:01:50
> 2015-08-11T17:01:51
> 2015-08-11T17:01:52
> .
> .
> .
>
> however, I may get such type of data,
> 2015-08-11T17:01:49
> 2015-08-11T17:01:50
> 2015-08-11T17:01:53
>
> I must find the missing records corresponding to 2 timestamps between 50
> and 53, and I will estimate the miss current value by average the 01:49 and
> 01:53 current values.
>
> I am not sure if I explain clearly, thanks
>
> AL
>
> On Aug 11, 2015, at 3:35 PM, Javier Gonzalez <[email protected]> wrote:
>
> Just to make sure I'm understanding correctly: Do you have a single stream
> of sequential ids or multiple streams that need to be interpolated? Do you
> receive a stream of ids and emit a stream of timestamped ids?
> On Aug 11, 2015 5:34 PM, "Alec Lee" <[email protected]> wrote:
>
>> Hello, all
>>
>> Here I have a question about storm doing analytics, I have a data stream
>> coming in in real-time, each record associates a timestamp, it supposes to
>> be ingested every 1 second from devices, but we know some records are
>> missing, say, timestamp1, timestamp2, timestamp5, here timestamp3 and 4
>> records are missing. How can I identify these missing records, what I need
>> to find out what records are missed base on the sequential timestamp, and
>> estimate the missing values in terms of last record, and next record, i can
>> make the average as this missing value. And output of this bolt will be a
>> consecutive of data with no missing records.
>>
>> Thanks
>>
>> Al
