thanks ingo! i'll look at moving to rolling my own operator and using ConnectedStreams.transform with it.
On Tue, May 18, 2021 at 3:18 AM Ingo Bürk <i...@ververica.com> wrote:

> Hi Jin,
>
> 1) As far as I know the order is only guaranteed for events from the same
> partition. If you want events across partitions to remain in order you may
> need to use parallelism 1. I'll attach some links here which might be
> useful:
>
> https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
> https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
> https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order
>
> 2) Indeed there doesn't seem to be a way to access the
> InternalTimerService from a ProcessFunction at the moment. One approach
> could be to implement this yourself using a MapState. Otherwise I think you
> need to implement your own operator from which you can then access
> InternalTimerService similar to how KeyedCoProcessOperator does it as well.
>
> Regards
> Ingo
>
> On Wed, May 12, 2021 at 8:32 AM Jin Yi <j...@promoted.ai> wrote:
>
>> hello. thanks ahead of time for anyone who answers.
>>
>> 1. verifying my understanding: for a kafka source that's partitioned on
>> the same piece of data that is later used in a keyBy, if we are relying on
>> the kafka timestamp as the event timestamp, is it guaranteed that the event
>> stream of the source is in the kafka pipeline's insertion order for the
>> topic?
>>
>> 2. is there a way to use the InternalTimerService from within a
>> ProcessFunction (specifically, a KeyedCoProcessFunction)? i don't see an
>> easy way to do this, except by changing the TimerService interface. the
>> use case for my need is that i'd like to have timers to clean up the left
>> and right keyed state using namespaced timers like how IntervalJoin does it
>> (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
>> right now, b/c the KeyedCoProcessFunction only gives us the
>> SimpleTimerService via the Context, i can only trigger onTimer execution
>> without being able to refine the cleaning of state to just the event state
>> of the side that a timer was originated from. without this, it'll end up
>> needing to visit state associated with both event streams which isn't
>> performant as those streams can have different throughputs (and therefore,
>> expect to have different retention characteristics/needs).
>>
>> thanks.
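
For reference, the MapState workaround mentioned above could look roughly like the sketch below. It is a plain-Java model for a single key with no Flink dependency, so it only shows the bookkeeping, not the operator wiring. The class name SideTimerSketch, the Side enum, and the per-side retention values are hypothetical. In a real KeyedCoProcessFunction the maps would presumably be keyed MapState, add() would correspond to processElement1/processElement2 registering an event-time timer at the returned timestamp, and fire() would be called from onTimer.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-side cleanup timers for ONE key (not Flink API).
final class SideTimerSketch {
    enum Side { LEFT, RIGHT }

    // Buffered events per side, indexed by event timestamp.
    private final Map<Side, Map<Long, List<String>>> buffers = new EnumMap<>(Side.class);
    // Retention per side, since the two streams may need different lifetimes.
    private final Map<Side, Long> retentionMs = new EnumMap<>(Side.class);
    // Stand-in for a timer namespace: which side(s) registered a timer at a given time.
    private final Map<Long, EnumSet<Side>> timerSides = new HashMap<>();

    SideTimerSketch(long leftRetentionMs, long rightRetentionMs) {
        retentionMs.put(Side.LEFT, leftRetentionMs);
        retentionMs.put(Side.RIGHT, rightRetentionMs);
        buffers.put(Side.LEFT, new HashMap<>());
        buffers.put(Side.RIGHT, new HashMap<>());
    }

    // Buffers an event and returns the timestamp at which the caller
    // should register a cleanup timer for it.
    long add(Side side, long eventTs, String event) {
        buffers.get(side).computeIfAbsent(eventTs, t -> new ArrayList<>()).add(event);
        long timerTs = eventTs + retentionMs.get(side);
        timerSides.computeIfAbsent(timerTs, t -> EnumSet.noneOf(Side.class)).add(side);
        return timerTs;
    }

    // Handles a fired timer: only the side(s) that registered a timer at
    // timerTs are visited, and only the entry that timer was created for.
    Set<Side> fire(long timerTs) {
        EnumSet<Side> sides = timerSides.remove(timerTs);
        if (sides == null) {
            return EnumSet.noneOf(Side.class);
        }
        for (Side side : sides) {
            buffers.get(side).remove(timerTs - retentionMs.get(side));
        }
        return sides;
    }

    // Number of distinct event timestamps still buffered for a side.
    int bufferedTimestamps(Side side) {
        return buffers.get(side).size();
    }
}
```

The trade-off compared with a custom operator is an extra map lookup per timer and one more piece of state to keep consistent. In exchange it stays within the public ProcessFunction API and does not need InternalTimerService.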