项目中使用精准一次语义写入kafka,代码和配置如下:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setKafkaProducerConfig(producerProperties)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("streamx_flow_1261")
.build();

eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
.name("event2kafka").uid("kafkasink");
public static Properties getProducerProperties(){
Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", 
parameterTool.get(bootstrap.server"));
kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
kafkaProducerProps.setProperty("auto.offset.reset", "latest");
kafkaProducerProps.setProperty("session.timeout.ms", "5000");
kafkaProducerProps.setProperty("transaction.timeout.ms",12*60000 +"");
kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
kafkaProducerProps.put("sasl.kerberos.service.name","kafka");

        return kafkaProducerProps;
}

回复