????????????issues, Fix Version/s:None 
??????????????????????????????????dataStream


------------------ ???????? ------------------
??????:&nbsp;"??????"<[email protected]&gt;;
????????:&nbsp;2020??7??1??(??????) ????3:31
??????:&nbsp;"user-zh"<[email protected]&gt;;"????"<[email protected]&gt;;

????:&nbsp;Re: flink sql ddl CREATE TABLE kafka011 sink 
????????????exactly-once??



??????????????????????????????????????https://issues.apache.org/jira/browse/FLINK-15221

???? <[email protected]&gt; ??2020??7??1?????? ????3:13??????

&gt; 
????,??????????????????Kafka011TableSourceSinkFactory??Kafka011TableSink??????exactly-once
&gt;
&gt; Kafka011TableSink
&gt;
&gt;
&gt; @Override
&gt; protected SinkFunction<Row&gt; createKafkaProducer(
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String topic,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SerializationSchema<Row&gt; 
serializationSchema,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Optional<FlinkKafkaPartitioner<Row&gt;&gt; partitioner) {
&gt;&nbsp;&nbsp;&nbsp; return new FlinkKafkaProducer011<&gt;(
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; topic,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new 
KeyedSerializationSchemaWrapper<&gt;(serializationSchema),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; partitioner,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 5);
&gt; }
&gt; ????????????????????,????????????KafkaTableSourceSinkFactoryBase
&gt;
&gt; ????:
&gt; 
https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
&gt; ------------------------------------------------------------------
&gt; ???????????????? <[email protected]&gt;
&gt; ??????????2020??7??1??(??????) 14:33
&gt; ????????user-zh <[email protected]&gt;
&gt; ?? ????flink sql ddl CREATE TABLE kafka011 sink ????????????exactly-once??
&gt;
&gt; &amp;nbsp;flink sql CREATE TABLE kafka 
sink????????checkpoint????????????sql
&gt; sink????????????????????exactly-once?????????? ??
&gt; ??????????
&gt; Consistency guarantees: By default, a Kafka sink ingests data with
&gt; at-least-once guarantees into a Kafka topic if the query is executed with
&gt; checkpointing enabled.??&amp;nbsp;&amp;nbsp;
&gt; CREATE TABLE ?????? at-least-once
&gt;
&gt;

回复