Hi, Danny. 
When the problem occurs, can you use flame graph to confirm whether the loop in 
this code is causing the busyness?
Since I'm not particularly familiar with kafka connector, I can't give you an 
accurate reply. I think Hang Ruan is an expert in this field :). 


Hi, Ruan Hang. Can you take a look at this strange situation?




--

    Best!
    Xuyang




在 2024-03-10 16:49:16,"Daniel Peled" <daniel.peled.w...@gmail.com> 写道:

Hello,


I am sorry I am addressing you personally.
I have tried sending the request in the user group and got no response


If you can't help me please let me know
And please tell me who can help up


The problem is as followed:


We have noticed that when we add a new kafka sink operator to the graph, and 
start from the last save point, the operator is 100% busy for several minutes 
and even 1/2-1 hour !!!


The problematic code seems to be the following for-loop in 
getTransactionalProducer() method:


org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer



private FlinkKafkaInternalProducer<byte[], byte[]> 
getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create 
non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) 
transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, 
kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", 
producer.getTransactionalId());
        return producer;
    }





Since we added a new sink operator the lastCheckpointId is 1, 
And if for example the checkpointId is 20000,
The loop will be executed for 20000 times !!!


We have several questions:
1. Is this behaviour expected ?
2. Are we doing something wrong ?
3. Is there a way to avoid this behavior ?

Best Regards,
Danny

Reply via email to