项目中使用精准一次语义写入kafka,代码和配置如下: Properties producerProperties = MyKafkaUtil.getProducerProperties(); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic(Event2Kafka.parameterTool.get("feature.topic.name")) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setKafkaProducerConfig(producerProperties) .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("streamx_flow_1261") .build();
eventJsonStream.sinkTo(kafkaSink).setParallelism(14) .name("event2kafka").uid("kafkasink"); public static Properties getProducerProperties(){ Properties kafkaProducerProps = new Properties(); kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get(bootstrap.server")); kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000"); kafkaProducerProps.setProperty("auto.offset.reset", "latest"); kafkaProducerProps.setProperty("session.timeout.ms", "5000"); kafkaProducerProps.setProperty("transaction.timeout.ms",12*60000 +""); kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT"); kafkaProducerProps.put("sasl.kerberos.service.name","kafka"); return kafkaProducerProps; }