flink canal json格式忽略不识别的type
不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如 例1: {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null} 例2: { "action":"ALTER", "before":[], "bid":0, "data":[], "db":"db_test", "dbValType":{ "col1":"varchar(22)", "col2":"varchar(22)", "col_pk":"varchar(22)" }, "ddl":true, "entryType":"ROWDATA", "execTs":1669789188000, "jdbcType":{ "col1":12, "col2":12, "col_pk":12 }, "pks":[], "schema":"db_test", "sendTs":1669789189533, "sql":"alter table table_test add col2 varchar(22) null", "table":"table_test", "tableChanges":{ "table":{ "columns":[ { "jdbcType":12, // jdbc 类型。 "name":"col1",// 字段名称。 "position":0, // 字段的顺序。 "typeExpression":"varchar(22)", // 类型描述。 "typeName":"varchar" // 类型名称。 }, { "jdbcType":12, "name":"col2", "position":1, "typeExpression":"varchar(22)", "typeName":"varchar" }, { "jdbcType":12, "name":"col_pk", "position":2, "typeExpression":"varchar(22)", "typeName":"varchar" } ], "primaryKeyColumnNames":["col_pk"] // 主键名列表。 }, "type":"ALTER" } }
Re: Re: Kafka 数据源无法实现基于事件时间的窗口聚合
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。 wei_yuze 于2023年2月8日周三 13:30写道: > > 非常感谢各位的回答! > > > > Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot > 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。 > > > 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。 > > > 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism > 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。 > > > > 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据? > > > > > > Best, > > Lucas > > > > Original Email > > > > Sender:"Weihua Hu"< huweihua@gmail.com ; > > Sent Time:2023/2/7 18:48 > > To:"user-zh"< user-zh@flink.apache.org ; > > Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合 > > > Hi, > > 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task > 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 > 可以尝试通过以下办法解决: > 1. 将 source 并发控制为 1 > 2. 为 watermark 策略开始 idleness 处理,参考 [#1] > > fromElement 数据源会强制指定并发为 1 > > [#1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources > > > Best, > Weihua > > > On Tue, Feb 7, 2023 at 1:31 PM wei_yuze wrote: > > 您好! > > > > > > 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource > 和 kafkaSource > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 > > > > > public class WindowReduceTest2 { public static void > main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > // 使用fromElement数据源 > DataStreamSource env.fromElements( > new > Event2("Alice", "./home", "2023-02-04 17:10:11"), > new Event2("Bob", > "./cart", "2023-02-04 17:10:12"), > new > Event2("Alice", "./home", "2023-02-04 17:10:13"), > new > Event2("Alice", "./home", "2023-02-04 17:10:15"), > new > Event2("Cary", > "./home", "2023-02-04 17:10:16"), > new > Event2("Cary", > "./home", "2023-02-04 17:10:16") > ); > > > // 使用Kafka数据源 > JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<(Event2.class); > KafkaSource > KafkaSource. > .setBootstrapServers(Config.KAFKA_BROKERS) > > .setTopics(Config.KAFKA_TOPIC) > > .setGroupId("my-group") > > .setStartingOffsets(OffsetsInitializer.earliest()) > > .setValueOnlyDeserializer(jsonFormat) > .build(); > DataStreamSource env.fromSource(source, > WatermarkStrategy.noWatermarks(), "Kafka Source"); > kafkaSource.print(); > > > // 生成watermark,从数据中提取时间作为事件时间 > SingleOutputStreamOperator watermarkedStream = > > kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. > .withTimestampAssigner(new SerializableTimestampAssigner > > @Override > > public long extractTimestamp(Event2 element, long recordTimestamp) { > >SimpleDateFormat simpleDateFormat = new > SimpleDateFormat("-MM-dd HH:mm:ss"); > >Date date = null; > >try { > > date = > simpleDateFormat.parse(element.getTime()); > >} catch (ParseException e) { > > throw new RuntimeException(e); > >} > >long time = date.getTime(); > >System.out.println(time); > >return time; >} > })); > > > // 窗口聚合 > watermarkedStream.map(new MapFunction Tuple2 > > @Override > > public Tuple2 > >// 将数据转换成二元组,方便计算 > >return Tuple2.of(value.getUser(), 1L); >} > }) > .keyBy(r - > r.f0) > // 设置滚动事件时间窗口 > > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > .reduce(new > ReduceFunction > @Override > > public Tuple2 Tuple2 > >// 定义累加规则,窗口闭合时,向下游发送累加结果 > >return Tuple2.of(value1.f0, value1.f1 + value2.f1); >} > }) > > .print("Aggregated > stream"); > > > env.execute(); >} > } > > > > > > > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 > > > > 感谢您花时间查看这个问题! > Lucas