Re: flink sink kafka exactly once plz help me

2022-08-17 Thread David Anderson
You can keep the same transaction ID if you are restarting the job as a
continuation of what was running before. You need distinct IDs for
different jobs that will run against the same Kafka brokers. I think of
the transaction ID as an application identifier.
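The distinction can be illustrated in plain Java: a prefix derived only from
the job name is stable across restarts, while a timestamp-based prefix like
the one in the quoted code below changes on every start, so a restarted job
cannot identify the transactions of its previous run. (The job name
"cleanup-job" and the helper methods are illustrative placeholders, not
Flink API.)

```java
// Sketch: why a timestamp-based transactional-id prefix breaks recovery.
// The prefix must be stable across restarts of the *same* job, and unique
// across *different* jobs sharing the brokers.
public class TransactionalIdPrefix {

    // Stable: same value every time this job restarts.
    static String stablePrefix(String jobName) {
        return jobName + "-transactional-id";
    }

    // Unstable: a new value on every start, as in the original post.
    static String timestampPrefix(String jobName) {
        return jobName + System.currentTimeMillis() + "transactional.id";
    }

    public static void main(String[] args) throws InterruptedException {
        String first = stablePrefix("cleanup-job");
        String second = stablePrefix("cleanup-job");
        System.out.println(first.equals(second));   // prints "true"

        String a = timestampPrefix("cleanup-job");
        Thread.sleep(5);                            // force a later millisecond
        String b = timestampPrefix("cleanup-job");
        System.out.println(a.equals(b));            // prints "false"
    }
}
```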

See [1] for a complete list of what needs to be done to achieve
exactly-once with Kafka.

[1]
https://www.docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/
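Concretely, a configuration sketch of the sink from the quoted code below
with a stable prefix (a fragment, not a runnable program: BOOTSTRAP_SERVERS,
kafkaProducerConfig, JOB_NAME, and SINK_TOPICS are the original post's
placeholders, and the builder method name setDeliverGuarantee is the
Flink 1.14 spelling used in the post):

```java
// Stable per-application prefix: no System.currentTimeMillis(), so a
// restart from a checkpoint continues the same transactional identity.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(BOOTSTRAP_SERVERS)
        .setKafkaProducerConfig(kafkaProducerConfig)
        .setTransactionalIdPrefix(JOB_NAME + "-transactional-id")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(SINK_TOPICS)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build();
```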

On Wed, Aug 17, 2022 at 1:02 AM kcz <573693...@qq.com> wrote:

> flink-1.14.4
> kafka-2.4.0
> I have a small question about setTransactionalIdPrefix: when I start the
> next job, it cannot reuse the previous transaction ID and needs to
> generate a new one automatically. I just tested restarting from a
> checkpoint, and that also produced a new transaction ID; won't this lead
> to data loss? Can I guarantee no data loss by generating a new
> transaction ID with System.currentTimeMillis()? I need this feature in a
> production job that reads from Kafka, cleans the data, and writes it back
> to Kafka, but I don't know how to write it correctly. Please help me.
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(BOOTSTRAP_SERVERS)
>         .setKafkaProducerConfig(kafkaProducerConfig)
>         .setTransactionalIdPrefix(JOB_NAME + System.currentTimeMillis() + "transactional.id")
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic(SINK_TOPICS)
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .build())
>         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>         .build();
>
>

