I think your format is wrong.
I have modified it below:
tenv.executeSql(
" create table t_upsert_kafka(                 "
+ "    userid int ,        "
+ "    username string,                          "
+ "    age int,                             "
+ "    `partition` int ,                            "
+ "  PRIMARY KEY (userid) NOT ENFORCED "
+ " ) with (                                    "
+ "  'connector' = 'upsert-kafka',              "
+ "  'topic' = 'test02',                "
+ "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
+ "  'key.format' = 'json',                             "
+ "  'value.format' = 'json'                            "
+ " )                                                  "
);

*Below is the official example*

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro');
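
For reference, the table-level `PRIMARY KEY (...) NOT ENFORCED` form used in both statements above can be assembled in one place instead of through `+` concatenation. This is only a minimal sketch: `UpsertKafkaDdl` and `build` are hypothetical names, not part of Flink, and the code does nothing but build the DDL text from the table, topic, and broker address already shown in this thread.

```java
// Hypothetical helper (not a Flink API): it only assembles a DDL string.
final class UpsertKafkaDdl {

    // Builds a CREATE TABLE statement for an upsert-kafka table whose
    // primary key is declared as a table-level constraint.
    static String build(String table, String topic, String bootstrapServers) {
        return String.join("\n",
            "CREATE TABLE " + table + " (",
            "  userid INT,",
            "  username STRING,",
            "  age INT,",
            "  `partition` INT,",
            "  PRIMARY KEY (userid) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = '" + topic + "',",
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
            "  'key.format' = 'json',",
            "  'value.format' = 'json'",
            ")");
    }

    public static void main(String[] args) {
        // Print the statement so it can be inspected before it is executed.
        System.out.println(build("t_upsert_kafka", "test02", "192.168.0.82:9092"));
    }
}
```

The returned string would then be passed to `tenv.executeSql(...)` as in the code above.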


左岩 <13520871...@163.com> wrote on Mon, Oct 31, 2022 at 09:57:

>
>
>
>
> public static void main(String[] args) throws Exception {
> Configuration conf = new Configuration();
> conf.setInteger("rest.port", 10041);
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> //        env.setParallelism(1);
>
> env.enableCheckpointing(3000);
> env.setStateBackend(new HashMapStateBackend());
> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> // Create the target Kafka mapping table
> tenv.executeSql(
> " create table t_upsert_kafka(                 "
> + "    userid int primary key not enforced,        "
> + "    username string,                          "
> + "    age int,                             "
> + "    `partition` int                             "
> + " ) with (                                    "
> + "  'connector' = 'upsert-kafka',              "
> + "  'topic' = 'test02',                "
> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',  "
> + "  'key.format' = 'json',                             "
> + "  'value.format' = 'json'                            "
> + " )                                                  "
> );
>
> tenv.executeSql("select * from t_upsert_kafka").print();
>
> tenv.executeSql(
> " CREATE TABLE t_kafka_connector (                       "
> + "    userid int ,        "
> + "    username string,                          "
> + "    age int,                             "
> + "    `partition` int                             "
> + " ) WITH (                                               "
> + "  'connector' = 'kafka',                                "
> + "  'topic' = 'test02',                             "
> + "  'properties.bootstrap.servers' = '192.168.0.82:9092',      "
> + "  'properties.group.id' = 'testGroup1',                  "
> + "  'scan.startup.mode' = 'earliest-offset',           "
> + "  'format'='json'                               "
> + " )                                                   "
>
> );
>
> tenv.executeSql("select * from t_kafka_connector").print();
>
> env.execute();
>
>
>
>
>
> t_upsert_kafka cannot consume any data, while t_kafka_connector can.
>
>
>
>
>
>
>
>
>
>
>
> On 2022-10-31 09:43:49, "Shengkai Fang" <fskm...@gmail.com> wrote:
> >hi,
> >
> >I can't see the image. Could you paste the text directly or use an image hosting tool?
> >
> >Best,
> >Shengkai
> >
> >左岩 <13520871...@163.com> wrote on Fri, Oct 28, 2022 at 18:34:
> >
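> >> When upsert kafka is used as a source, it cannot consume the data in Kafka
> >> I wrote an upsert kafka connector table in Flink SQL to read a topic, but it reads no data, whereas the regular kafka
> >> connector can read this topic. Both read the same topic. The code is as follows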
> >>
>
