It still does not consume any data. Please see the image in the attachment.
On 2022-10-31 10:03:05, "guozhi mang" <rookiegao...@gmail.com> wrote:
>I think your format is wrong.
>I have modified it below:
>tenv.executeSql(
>" create table t_upsert_kafka( "
>+ " userid int , "
>+ " username string, "
>+ " age int, "
>+ " `partition` int , "
>+ " PRIMARY KEY (userid) NOT ENFORCED "
>+ " ) with ( "
>+ " 'connector' = 'upsert-kafka', "
>+ " 'topic' = 'test02', "
>+ " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>+ " 'key.format' = 'json', "
>+ " 'value.format' = 'json' "
>+ " ) "
>);
>
>*Below is the official example*
>
>CREATE TABLE pageviews_per_region (
> user_region STRING,
> pv BIGINT,
> uv BIGINT,
> PRIMARY KEY (user_region) NOT ENFORCED) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'pageviews_per_region',
> 'properties.bootstrap.servers' = '...',
> 'key.format' = 'avro',
> 'value.format' = 'avro');
>
>
>左岩 <13520871...@163.com> wrote on Mon, Oct 31, 2022 at 09:57:
>
>> public static void main(String[] args) throws Exception {
>> Configuration conf = new Configuration();
>> conf.setInteger("rest.port", 10041);
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> // env.setParallelism(1);
>>
>> env.enableCheckpointing(3000);
>> env.setStateBackend(new HashMapStateBackend());
>> env.getCheckpointConfig().setCheckpointStorage("file:///d:/zuoyanckpt");
>>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setCheckpointTimeout(20 * 1000);
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>>
>> env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> // Create the target Kafka mapping table
>> tenv.executeSql(
>> " create table t_upsert_kafka( "
>> + " userid int primary key not enforced, "
>> + " username string, "
>> + " age int, "
>> + " `partition` int "
>> + " ) with ( "
>> + " 'connector' = 'upsert-kafka', "
>> + " 'topic' = 'test02', "
>> + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>> + " 'key.format' = 'json', "
>> + " 'value.format' = 'json' "
>> + " ) "
>> );
>>
>> tenv.executeSql("select * from t_upsert_kafka").print();
>>
>> tenv.executeSql(
>> " CREATE TABLE t_kafka_connector ( "
>> + " userid int , "
>> + " username string, "
>> + " age int, "
>> + " `partition` int "
>> + " ) WITH ( "
>> + " 'connector' = 'kafka', "
>> + " 'topic' = 'test02', "
>> + " 'properties.bootstrap.servers' = '192.168.0.82:9092', "
>> + " 'properties.group.id' = 'testGroup1', "
>> + " 'scan.startup.mode' = 'earliest-offset', "
>> + " 'format'='json' "
>> + " ) "
>>
>> );
>>
>> tenv.executeSql("select * from t_kafka_connector").print();
>>
>> env.execute();
>>
>>
>> t_upsert_kafka cannot consume any data, but t_kafka_connector can.
>>
>> On 2022-10-31 09:43:49, "Shengkai Fang" <fskm...@gmail.com> wrote:
>> >hi,
>> >
>> >I can't see the image. Could you paste the text directly or use an image hosting service?
>> >
>> >Best,
>> >Shengkai
>> >
>> >左岩 <13520871...@163.com> wrote on Fri, Oct 28, 2022 at 18:34:
>> >
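>> >> When upsert-kafka is used as a source, it does not consume the data in Kafka.
>> >> I defined a table with the upsert-kafka connector in Flink SQL to read a topic, but it reads no data,
>> >> whereas the regular kafka connector reads the same topic without problems. The code is as follows: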
>> >>
>>