flink-1.14.4
kafka-2.4.0
I have a small question about setTransactionalIdPrefix. My understanding is 
that when I start the next job it cannot reuse the previous transactional ID, 
so a new one has to be generated automatically. I just tested restarting from a 
checkpoint, and that also generated a new transactional ID. This will not lead 
to data loss, right? I would appreciate guidance from the experts here: I don't 
know whether generating a new transactional ID prefix with 
System.currentTimeMillis() guarantees no data loss in production. Please help 
me. I need to use this feature in a production environment, but I don't know 
how to write it correctly. The job consumes data from Kafka, cleans it, and 
writes the result back to Kafka.


KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(BOOTSTRAP_SERVERS)
        .setKafkaProducerConfig(kafkaProducerConfig)
        .setTransactionalIdPrefix(JOB_NAME + System.currentTimeMillis() + "transactional.id")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(SINK_TOPICS)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build();
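
For comparison, here is a minimal sketch of the two ways the prefix string 
could be built. It only illustrates the string construction and does not touch 
Flink or Kafka. The class name, the helper names, and the job name 
"cleanup-job" are hypothetical. Which variant is safe with EXACTLY_ONCE is the 
open question of this post; the only requirement I am sure of from the Flink 
documentation is that the prefix must be unique across applications running on 
the same Kafka cluster.

```java
public class TransactionalIdPrefixSketch {

    // Hypothetical helper: the same job name always yields the same prefix,
    // so a job restarted from a checkpoint would reuse it.
    public static String stablePrefix(String jobName) {
        return jobName + "-transactional.id";
    }

    // The approach from the snippet above: the prefix embeds the start time,
    // so every start (including a restart from a checkpoint) gets a new prefix.
    public static String timeBasedPrefix(String jobName, long startMillis) {
        return jobName + startMillis + "transactional.id";
    }

    public static void main(String[] args) {
        String jobName = "cleanup-job"; // hypothetical job name
        System.out.println(stablePrefix(jobName));
        System.out.println(timeBasedPrefix(jobName, System.currentTimeMillis()));
    }
}
```

Either string could then be passed to setTransactionalIdPrefix in the builder 
above.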
