thanks ingo!  i'll look into rolling my own operator and using
ConnectedStreams.transform with it.
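
for anyone finding this later, here's a rough plain-java model of what i
mean by namespaced cleanup timers.  it is not flink code: every class and
method name below (NamespacedTimerSketch, Side, add, advanceWatermark,
buffered) is made up for illustration, and the per-side retention values
are an assumption.  the point is only that a timer carrying its side as a
namespace lets cleanup touch one buffer instead of both.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Plain-Java model of namespaced cleanup timers. Every name here is
// hypothetical; nothing in this class is a Flink API.
class NamespacedTimerSketch {
    enum Side { LEFT, RIGHT }

    // A timer remembers its namespace (the side) and the buffered timestamp it cleans.
    static final class Timer {
        final String key;
        final Side side;
        final long fireAt;
        final long eventTime;

        Timer(String key, Side side, long fireAt, long eventTime) {
            this.key = key;
            this.side = side;
            this.fireAt = fireAt;
            this.eventTime = eventTime;
        }
    }

    private final long leftRetention;   // assumed retention for the left stream
    private final long rightRetention;  // assumed retention for the right stream
    private final Map<Side, Map<String, TreeMap<Long, List<String>>>> buffers = new HashMap<>();
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.fireAt));

    NamespacedTimerSketch(long leftRetention, long rightRetention) {
        this.leftRetention = leftRetention;
        this.rightRetention = rightRetention;
        buffers.put(Side.LEFT, new HashMap<>());
        buffers.put(Side.RIGHT, new HashMap<>());
    }

    // Buffers an event and registers a cleanup timer in that side's namespace.
    void add(Side side, String key, long eventTime, String value) {
        buffers.get(side)
                .computeIfAbsent(key, k -> new TreeMap<>())
                .computeIfAbsent(eventTime, t -> new ArrayList<>())
                .add(value);
        long retention = (side == Side.LEFT) ? leftRetention : rightRetention;
        timers.add(new Timer(key, side, eventTime + retention, eventTime));
    }

    // Fires every timer due at or before the watermark; each one touches only its own side.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().fireAt <= watermark) {
            Timer t = timers.poll();
            Map<String, TreeMap<Long, List<String>>> sideBuffer = buffers.get(t.side);
            TreeMap<Long, List<String>> perKey = sideBuffer.get(t.key);
            if (perKey != null) {
                perKey.remove(t.eventTime);
                if (perKey.isEmpty()) {
                    sideBuffer.remove(t.key);
                }
            }
        }
    }

    // Number of events still buffered for a key on one side.
    int buffered(Side side, String key) {
        TreeMap<Long, List<String>> perKey = buffers.get(side).get(key);
        if (perKey == null) {
            return 0;
        }
        int n = 0;
        for (List<String> values : perKey.values()) {
            n += values.size();
        }
        return n;
    }
}
```

with a short retention on the left and a long one on the right, advancing
the watermark past the left retention clears only the left buffer for that
key and never reads the right one.  in a real operator the timer queue and
the buffers would be flink-managed keyed state rather than in-memory maps.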

On Tue, May 18, 2021 at 3:18 AM Ingo Bürk <i...@ververica.com> wrote:

> Hi Jin,
>
> 1) As far as I know, the order is only guaranteed for events from the same
> partition. If you want events across partitions to remain in order, you may
> need to use parallelism 1. I'll attach some links here which might be
> useful:
>
>
> https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
>
> https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
>
> https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order
>
> 2) Indeed, there doesn't seem to be a way to access the
> InternalTimerService from a ProcessFunction at the moment. One approach
> could be to implement this yourself using a MapState. Otherwise, I think you
> need to implement your own operator, from which you can then access the
> InternalTimerService, similar to how KeyedCoProcessOperator does it.
>
>
> Regards
> Ingo
>
> On Wed, May 12, 2021 at 8:32 AM Jin Yi <j...@promoted.ai> wrote:
>
>> hello.  thanks ahead of time for anyone who answers.
>>
>> 1.  verifying my understanding: for a kafka source that's partitioned on
>> the same piece of data that is later used in a keyBy, if we are relying on
>> the kafka timestamp as the event timestamp, is it guaranteed that the event
>> stream of the source is in the kafka pipeline's insertion order for the
>> topic?
>>
>> 2.  is there a way to use the InternalTimerService from within a
>> ProcessFunction (specifically, a KeyedCoProcessFunction)?  i don't see an
>> easy way to do this, except by changing the TimerService interface.  the
>> use case for my need is that i'd like to have timers to clean up the left
>> and right keyed state using namespaced timers like how IntervalJoin does it
>> (
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
>> right now, b/c the KeyedCoProcessFunction only gives us the
>> SimpleTimerService via the Context, i can only trigger onTimer execution
>> without being able to restrict the state cleanup to just the event state
>> of the side that the timer originated from.  without this, it ends up
>> needing to visit state associated with both event streams, which isn't
>> performant as those streams can have different throughputs (and therefore
>> are expected to have different retention characteristics/needs).
>>
>> thanks.
>>
>
