I'm working through your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there, so I can't assign a timestamp. Also, this solution seems to work on
processing time, while I care about event time. I couldn't get it to run
yet, but from what I understand, it is slightly different from what I need.

2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> I think I have a solution for your problem:
>
> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>
> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>   .keyBy(_._1) // key by id
>   .flatMap(new StateUpdater)
>
> StateUpdater is a stateful FlatMapFunction. It has a keyed state that
> stores the last state of each id. For each input record it returns two
> records: (oldState, -1), (newState, +1).
>
> stateChanges ensures that counts of previous states are subtracted.
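
A minimal sketch of the update rule that StateUpdater is described as applying, written as plain Scala over an in-memory map rather than Flink keyed state. The names `StateUpdaterSketch` and `update` are made up for illustration. Emitting only `(newState, +1)` for an id seen for the first time is an assumption, since there is no old state to subtract in that case.

```scala
// Plain-Scala model of the StateUpdater logic (no Flink API involved).
object StateUpdaterSketch {
  // Stand-in for Flink keyed state: the last known state per id.
  private val lastState = scala.collection.mutable.Map.empty[Long, Int]

  // Returns the count deltas caused by one (id, newState) record.
  def update(id: Long, newState: Int): Seq[(Int, Int)] = {
    val deltas = lastState.get(id) match {
      case Some(old) => Seq((old, -1), (newState, 1)) // id leaves old, enters newState
      case None      => Seq((newState, 1))            // assumption: first sighting only adds
    }
    lastState(id) = newState
    deltas
  }
}
```

Summing the deltas per state over all records gives the number of ids currently in that state, which is why the downstream steps only need to add numbers up.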
>
> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges // (state, cntUpdate, time)
>   .keyBy(_._1) // key by state
>   .window() // your window; it should be non-overlapping, e.g. tumbling
>   .apply(new SumReducer(), new YourWindowFunction())
>
> SumReducer sums the cntUpdates and YourWindowFunction assigns the
> timestamp of your window.
>
> this step aggregates all state changes for each state in a window
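
A minimal sketch of this aggregation as a plain Scala model, assuming each delta carries an explicit, non-negative event timestamp in milliseconds (in the Flink job the timestamp is attached to the record rather than stored in the tuple). `WindowSumSketch` and `sumPerWindow` are illustrative names. The output timestamp is the exclusive end of the tumbling window that contains the event time, so it depends only on the data and not on when the record is processed.

```scala
// Plain-Scala model of summing count deltas per state in tumbling event-time windows.
object WindowSumSketch {
  // Input: (state, cntUpdate, eventTimeMs). Output: (state, summedUpdate, windowEndMs).
  def sumPerWindow(changes: Seq[(Int, Int, Long)], windowSizeMs: Long): Seq[(Int, Int, Long)] =
    changes
      .groupBy { case (state, _, ts) =>
        // End (exclusive) of the tumbling window that contains ts.
        val windowEnd = ts - (ts % windowSizeMs) + windowSizeMs
        (state, windowEnd)
      }
      .map { case ((state, windowEnd), group) => (state, group.map(_._2).sum, windowEnd) }
      .toSeq
      .sortBy { case (state, _, windowEnd) => (windowEnd, state) }
}
```

In Flink, the equivalent value would come from the window object handed to the window function (`TimeWindow` exposes `getStart` and `getEnd`), provided the job is configured for event time.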
>
> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
>   .keyBy(_._1) // key by state again
>   .map(new CountUpdater)
>
> CountUpdater is a stateful MapFunction. It has a keyed state that stores
> the current count. For each incoming record, the count is adjusted and a
> record (state, newCount, time) is emitted.
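
A minimal sketch of the running count that CountUpdater is described as keeping, again as plain Scala with an in-memory map standing in for Flink keyed state. `CountUpdaterSketch` and `update` are illustrative names, and starting every state at a count of 0 is an assumption.

```scala
// Plain-Scala model of the CountUpdater logic (no Flink API involved).
object CountUpdaterSketch {
  // Stand-in for Flink keyed state: the current count per state, starting at 0.
  private val counts = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)

  // Applies one windowed delta and returns (state, newCount, time).
  def update(state: Int, cntUpdate: Int, time: Long): (Int, Int, Long) = {
    val newCount = counts(state) + cntUpdate
    counts(state) = newCount
    (state, newCount, time)
  }
}
```

A record is only emitted for states that changed in a window, so a consumer has to keep the last emitted count for states that did not appear.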
>
> Now you have the new counts for your states in multiple records. If
> possible, you can update your Elasticsearch index using these. Otherwise,
> you have to collect them into one record using another window.
>
> Also note that the state size of this program depends on the number of
> unique ids. That might cause problems if the id space grows very fast.
>
> Please let me know if you have questions or if that works ;-)
>
> Cheers, Fabian
>
>
> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello,
>>
>> in the last few days I tried to create my first real-time analytics job
>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>> Kafka where we receive a message for every change of state of any entity.
>>
>> So the messages are of the form
>>
>> (id,newStatus, timestamp)
>>
>> We want to compute, for every time window, the count of items in a given
>> status. So the output should be of the form
>>
>> (outputTimestamp, state1:count1,state2:count2 ...)
>>
>> or equivalent. These rows should contain, at any given time, the count of
>> the items in a given status, where the status associated to an Id is the
>> most recent message observed for that id. The status for an id should be
>> counted in any case, even if the event is way older than those getting
>> processed. So the sum of all the counts should be equal to the number of
>> different IDs observed in the system. A next step could be to forget
>> items that have been in a final state for a while, but this is not a
>> strict requirement right now.
>>
>> This will be written to Elasticsearch and then queried.
>>
>> I tried many different paths and none of them completely satisfied the
>> requirement. Using a sliding window I could easily achieve the expected
>> behaviour, except that when the beginning of the sliding window surpassed
>> the timestamp of an event, it was lost for the count, as you may expect.
>> Other approaches failed to be consistent when working with a backlog
>> because I did some tricks with keys and timestamps that failed when the
>> data was processed all at once.
>>
>> So I would like to know, even at a high level, how I should approach
>> this problem. It looks like a relatively common use-case but the fact that
>> the relevant information for a given ID must be retained indefinitely to
>> count the entities correctly creates a lot of problems.
>>
>> Thank you in advance,
>>
>> Simone
>>
>>
>
