Hello everyone,

I just experienced something weird and I'd like to know if anyone has any idea 
of what could have happened.

I have a simple Flink cluster version 1.11.3 running on Kubernetes with a 
single worker.
I was testing a pipeline that connects 2 keyed streams and processes the result 
with a KeyedCoProcessFunction before writing the results to a database.
I enabled tracing in my logs and started sending test input data that would 
generate two keys, therefore the job would have 4 streams with 2 keys that 
would be connected into 2 streams.
In the logs I could see the data from the 4 streams with the correct keys, but 
the logs of the KeyedCoProcessFunction showed data for only one of the keys, 
and indeed the other key was never seen in my database.
I re-submitted the job and now it's behaving as expected without changing the 
code at all.

Is this a known issue? Has anyone else experienced something similar?

A sample of the code in case it's useful:

KeyedStream<A, String> allEventsWithTopology = openedEventsStream
        .getSideOutput(Filter.ALL_EVENTS_DISREGARDING_FILTER)
        .flatMap(new TopologicalPartitionKeyAssigner(...))
        .name("all-topological-events-stream")
        .uid(operatorPrefix + "all-topological-events-stream")
        .keyBy(keySelector);

DataStream<B> validCorrelationEvents = correlationEventStream
        .keyBy(new CorrelationEventKeySelector())
        .connect(allEventsWithTopology)
        .process(new CorrelationEventValidator(...))
        .name("valid-correlation-events")
        .uid(operatorPrefix + "valid-correlation-events");

Regards,
Alexis.

Reply via email to