I'm working with your suggestions, thank you very much. What I'm missing here is what YourWindowFunction should do. I have no notion of event time there, so I can't assign a timestamp. Also, this solution seems to work in processing time, while I care about event time. I couldn't make it run yet, but from what I understand, this is slightly different from what I need.
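To make the question concrete, this is what I have so far for the event-time side, plus my best guess at YourWindowFunction (just a sketch against your pipeline below; the 10-second out-of-orderness bound, the tuple positions, and using the window end as the timestamp are my own assumptions):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Work in event time instead of the default processing time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time), as in your sketch

// Extract the event timestamp from the third field; the 10s bound on
// out-of-orderness is a guess and would need tuning.
val withTimestamps: DataStream[(Long, Int, Long)] = s.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(Long, Int, Long)](Time.seconds(10)) {
    override def extractTimestamp(element: (Long, Int, Long)): Long = element._3
  })

// My guess at YourWindowFunction: it only attaches the window's end time to
// the single (state, cntUpdate) record that SumReducer leaves in the window.
class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow,
                     input: Iterable[(Int, Int)],
                     out: Collector[(Int, Int, Long)]): Unit = {
    val (state, cntUpdate) = input.head // exactly one element after the reduce
    out.collect((state, cntUpdate, window.getEnd))
  }
}

Is that the idea, or should the timestamp come from somewhere else?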
2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> I think I have a solution for your problem:
>
> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>
> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>   .keyBy(_._1) // key by id
>   .flatMap(new StateUpdater) // StateUpdater is a stateful FlatMapFunction. It
>     // has a keyed state that stores the last state of each id. For each input
>     // record it returns two records: (oldState, -1), (newState, +1)
>
> stateChanges ensures that the counts of previous states are subtracted.
>
> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges // (state, cntUpdate, time)
>   .keyBy(_._1) // key by state
>   .window(...) // your window; should be non-overlapping, so go for instance for tumbling windows
>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums the
>     // cntUpdates and YourWindowFunction assigns the timestamp of your window
>
> This step aggregates all state changes for each state in a window.
>
> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
>   .keyBy(_._1) // key by state again
>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has a
>     // keyed state that stores the current count. For each incoming record,
>     // the count is adjusted and a record (state, newCount, time) is emitted.
>
> Now you have the new counts for your states in multiple records. If possible,
> you can update your Elasticsearch index using these. Otherwise, you have to
> collect them into one record using another window.
>
> Also note that the state size of this program depends on the number of unique
> ids. That might cause problems if the id space grows very fast.
>
> Please let me know if you have questions or if that works ;-)
>
> Cheers, Fabian
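For reference, this is how I read the StateUpdater described above (again just a sketch, assuming a recent Flink version; the state descriptor name and the null check for unseen ids are my own choices):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Turns each (id, state, time) record into a retraction of the id's
// previous state and an addition of its new state.
class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {

  private var lastState: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    // Keyed state: the last known state of the current id (null if unseen).
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("last-state", classOf[Integer]))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val previous = lastState.value()
    if (previous != null) {
      out.collect((previous, -1)) // retract the count of the previous state
    }
    out.collect((in._2, +1)) // count the new state
    lastState.update(in._2)
  }
}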
> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello,
>>
>> In the last few days I tried to create my first real-time analytics job in
>> Flink. The approach is kappa-architecture-like, so I have my raw data on
>> Kafka, where we receive a message for every change of state of any entity.
>>
>> So the messages are of the form
>>
>> (id, newStatus, timestamp)
>>
>> We want to compute, for every time window, the count of items in a given
>> status. So the output should be of the form
>>
>> (outputTimestamp, state1:count1, state2:count2, ...)
>>
>> or equivalent. These rows should contain, at any given time, the count of
>> the items in a given status, where the status associated with an id is the
>> most recent message observed for that id. The status of an id should be
>> counted in any case, even if the event is much older than those being
>> processed. So the sum of all the counts should be equal to the number of
>> distinct ids observed in the system. A later step could be forgetting about
>> items in a final state after a while, but this is not a strict requirement
>> right now.
>>
>> This will be written to Elasticsearch and then queried.
>>
>> I tried many different paths and none of them completely satisfied the
>> requirement. Using a sliding window I could easily achieve the expected
>> behaviour, except that when the beginning of the sliding window surpassed
>> the timestamp of an event, it was lost for the count, as you may expect.
>> Other approaches failed to be consistent when working with a backlog,
>> because I did some tricks with keys and timestamps that failed when the
>> data was processed all at once.
>>
>> So I would like to know, even at a high level, how I should approach this
>> problem. It looks like a relatively common use case, but the fact that the
>> relevant information for a given id must be retained indefinitely to count
>> the entities correctly creates a lot of problems.
>>
>> Thank you in advance,
>>
>> Simone