Still not consuming any data. Please see the image in the attachment.

On 2022-10-31 10:03:05, "guozhi mang" <rookiegao...@gmail.com> wrote:
>I think your syntax is wrong.
>Here is my corrected version:
>tenv.executeSql(
>" create table t_upsert_kafka (                          "
>+ "   userid int,                                        "
>+ "   username string,                                   "
>+ "   age int,                                           "
>+ "   `partition` int,                                   "
>+ "   PRIMARY KEY (userid) NOT ENFORCED                  "
>+ " ) with (                                             "
>+ "   'connector' = 'upsert-kafka',                      "
>+ "   'topic' = 'test02',                                "
>+ "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
>+ "   'key.format' = 'json',                             "
>+ "   'value.format' = 'json'                            "
>+ " )                                                    "
>);
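>
>As a quick sanity check (a sketch, with made-up values): writing one row
>through this table should put a keyed record onto test02, which the same
>table can then read back as a source:
>
>tenv.executeSql(
>" insert into t_upsert_kafka values (1, 'alice', 20, 0) "
>);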
>
>*Below is the official example*
>
>CREATE TABLE pageviews_per_region (
>  user_region STRING,
>  pv BIGINT,
>  uv BIGINT,
>  PRIMARY KEY (user_region) NOT ENFORCED
>) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'pageviews_per_region',
>  'properties.bootstrap.servers' = '...',
>  'key.format' = 'avro',
>  'value.format' = 'avro'
>);
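>
>One detail that matters on the source side: with 'key.format' = 'json', the
>record key on the topic must itself be JSON containing the primary-key
>column(s), not a bare string. Roughly, a record the upsert-kafka source can
>read looks like this (illustrative values):
>
>key:   {"userid": 1}
>value: {"userid": 1, "username": "alice", "age": 20, "partition": 0}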
>
>
左岩 <13520871...@163.com> wrote on Mon, Oct 31, 2022, at 09:57:
>
>>
>> public static void main(String[] args) throws Exception {
>>     Configuration conf = new Configuration();
>>     conf.setInteger("rest.port", 10041);
>>     StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>     StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>>     // env.setParallelism(1);
>>
>>     env.enableCheckpointing(3000);
>>     env.setStateBackend(new HashMapStateBackend());
>>     env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>>     env.getCheckpointConfig().setExternalizedCheckpointCleanup(
>>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>     // create the target kafka mapping table
>>     tenv.executeSql(
>>     " create table t_upsert_kafka (                      "
>>     + "   userid int primary key not enforced,           "
>>     + "   username string,                               "
>>     + "   age int,                                       "
>>     + "   `partition` int                                "
>>     + " ) with (                                         "
>>     + "   'connector' = 'upsert-kafka',                  "
>>     + "   'topic' = 'test02',                            "
>>     + "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
>>     + "   'key.format' = 'json',                         "
>>     + "   'value.format' = 'json'                        "
>>     + " )                                                "
>>     );
>>
>>     tenv.executeSql("select * from t_upsert_kafka").print();
>>
>>     tenv.executeSql(
>>     " CREATE TABLE t_kafka_connector (                   "
>>     + "   userid int,                                    "
>>     + "   username string,                               "
>>     + "   age int,                                       "
>>     + "   `partition` int                                "
>>     + " ) WITH (                                         "
>>     + "   'connector' = 'kafka',                         "
>>     + "   'topic' = 'test02',                            "
>>     + "   'properties.bootstrap.servers' = '192.168.0.82:9092', "
>>     + "   'properties.group.id' = 'testGroup1',          "
>>     + "   'scan.startup.mode' = 'earliest-offset',       "
>>     + "   'format' = 'json'                              "
>>     + " )                                                "
>>     );
>>
>>     tenv.executeSql("select * from t_kafka_connector").print();
>>
>>     env.execute();
>> }
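>>
>> One thing worth checking in this driver, independent of the connector
>> (general Table API behavior): TableResult.print() blocks on an unbounded
>> streaming query, so in this main() the statements after the first print(),
>> including the CREATE TABLE for t_kafka_connector, may never run. A minimal
>> sketch of a safer ordering (ddlUpsertKafka and ddlPlainKafka are
>> hypothetical variables holding the two DDL strings above):
>>
>> // register both tables first; DDL statements return immediately
>> tenv.executeSql(ddlUpsertKafka);   // hypothetical: the upsert-kafka DDL above
>> tenv.executeSql(ddlPlainKafka);    // hypothetical: the plain kafka DDL above
>> // then test one unbounded query per run, since print() does not return:
>> tenv.executeSql("select * from t_upsert_kafka").print();
>> // tenv.executeSql("select * from t_kafka_connector").print();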
>>
>> t_upsert_kafka gets no data, while t_kafka_connector consumes fine.
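>>
>> That symptom, where the plain kafka connector reads the topic but
>> upsert-kafka does not, is consistent with the records on test02 lacking a
>> key, or carrying a key that is not the JSON object that 'key.format' =
>> 'json' expects; the upsert-kafka source treats the Kafka record key as the
>> primary key. A minimal producer sketch (plain kafka-clients API; class
>> name and values are made up) writing one record in the shape the table
>> declares:
>>
>> import java.util.Properties;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.common.serialization.StringSerializer;
>>
>> public class KeyedJsonProducer {
>>     public static void main(String[] args) {
>>         Properties props = new Properties();
>>         props.put("bootstrap.servers", "192.168.0.82:9092");
>>         props.put("key.serializer", StringSerializer.class.getName());
>>         props.put("value.serializer", StringSerializer.class.getName());
>>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>>             // key must be JSON holding the PK column, matching 'key.format' = 'json'
>>             producer.send(new ProducerRecord<>("test02",
>>                 "{\"userid\": 1}",
>>                 "{\"userid\": 1, \"username\": \"alice\", \"age\": 20, \"partition\": 0}"));
>>         }
>>     }
>> }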
>>
>> On 2022-10-31 09:43:49, "Shengkai Fang" <fskm...@gmail.com> wrote:
>> >Hi,
>> >
>> >The image doesn't come through. Could you paste the text directly, or use
>> >an image-hosting service?
>> >
>> >Best,
>> >Shengkai
>> >
>> >左岩 <13520871...@163.com> wrote on Fri, Oct 28, 2022, at 18:34:
>> >
>> >> With upsert kafka as the source, no data is consumed from kafka.
>> >> I wrote an upsert-kafka connector table in Flink SQL to read a topic, and
>> >> it reads nothing, while a plain kafka connector consuming the very same
>> >> topic does read data. The code is as follows:
>> >>
>>
