我想你的格式错了 下面我修改了一下 tenv.executeSql( " create table t_upsert_kafka( " + " userid int , " + " username string, " + " age int, " + " `partition` int , " + " PRIMARY KEY (userid) NOT ENFORCED " + " ) with ( " + " 'connector' = 'upsert-kafka', " + " 'topic' = 'test02', " + " 'properties.bootstrap.servers' = '192.168.0.82:9092', " + " 'key.format' = 'json', " + " 'value.format' = 'json' " + " ) " );
*下面是官方案例* CREATE TABLE pageviews_per_region ( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY (user_region) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'pageviews_per_region', 'properties.bootstrap.servers' = '...', 'key.format' = 'avro', 'value.format' = 'avro'); 左岩 <13520871...@163.com> 于2022年10月31日周一 09:57写道: > > > > > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > conf.setInteger("rest.port", 10041); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > // env.setParallelism(1); > > env.enableCheckpointing(3000); > env.setStateBackend(new HashMapStateBackend()); > env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt"); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(20 * 1000); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); > > env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > // 创建目标 kafka映射表 > tenv.executeSql( > " create table t_upsert_kafka( " > + " userid int primary key not enforced, " > + " username string, " > + " age int, " > + " `partition` int " > + " ) with ( " > + " 'connector' = 'upsert-kafka', " > + " 'topic' = 'test02', " > + " 'properties.bootstrap.servers' = '192.168.0.82:9092', " > + " 'key.format' = 'json', " > + " 'value.format' = 'json' " > + " ) " > ); > > tenv.executeSql("select * from t_upsert_kafka").print(); > > tenv.executeSql( > " CREATE TABLE t_kafka_connector ( " > + " userid int , " > + " username string, " > + " age int, " > + " `partition` int " > + " ) WITH ( " > + " 'connector' = 'kafka', " > + " 'topic' = 'test02', " > + " 'properties.bootstrap.servers' = '192.168.0.82:9092', " > + " 'properties.group.id' = 'testGroup1', " > + " 'scan.startup.mode' = 'earliest-offset', " > + " 'format'='json' " > + " ) " > > ); > > tenv.executeSql("select * from t_kafka_connector").print(); > > env.execute(); > > > > > > t_upsert_kafka 消费不到 t_kafka_connector可以消费到 > > > > > > > > > > > > 在 2022-10-31 09:43:49,"Shengkai Fang" <fskm...@gmail.com> 写道: > >hi, > > > >看不到的图片。能不能直接展示文字或者用图床工具? > > > >Best, > >Shengkai > > > >左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道: > > > >> upsert kafka作为source时,消费不到kafka中的数据 > >> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka > >> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下 > >> >