Re: Flink SQL: question about the column list in source table definitions
You do not need to declare all of the fields.
-- Sent from: http://apache-flink.147419.n8.nabble.com/
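As a rough illustration of the reply above, a Kafka source table using the JSON format can declare only the columns a job actually needs. This is a sketch under assumptions: the topic, broker address, consumer group, and field names (`orders`, `order_id`, `consignee`) are hypothetical, and it relies on the JSON format mapping fields by name, so fields that are present in a message but not declared in the table are simply not read. Whether the same holds for the other connectors mentioned in the question is best checked against each connector's documentation.

```sql
-- Hypothetical topic whose JSON messages carry more fields than are declared here.
CREATE TABLE orders_src (
  order_id  STRING,
  consignee STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                  -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',   -- assumed broker address
  'properties.group.id' = 'partial-fields-demo',       -- assumed consumer group
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```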
Re: Re: How can Flink SQL convert a changelog stream into an append stream?
tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
    .toRetractStream(tableEnv.from("order_info"), Row.class)
    // keep only the add messages (f0 == true); retraction messages are dropped
    .filter((FilterFunction<Tuple2<Boolean, Row>>) booleanRowTuple2 -> booleanRowTuple2.f0)
    .map((MapFunction<Tuple2<Boolean, Row>, Row>) booleanRowTuple2 -> booleanRowTuple2.f1)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis()))
    .keyBy((KeySelector<Row, String>) row -> row.getField("consignee").toString())
    .window(TumblingEventTimeWindows.of(Time.seconds(100)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
    .process(new ProcessWindowFunction<Row, Tuple2<TimeWindow, Long>, String, TimeWindow>() {
        @Override
        public void process(String s, Context context, Iterable<Row> elements,
                            Collector<Tuple2<TimeWindow, Long>> out) throws Exception {
            // count the rows that fell into this window for the current key
            Long count = 0L;
            for (Row element : elements) {
                count += 1;
            }
            out.collect(new Tuple2<>(context.window(), count));
        }
    })
    .print();
streamEnv.execute();
[Message body unreadable because of an encoding error; only the word "checkpoint" survives.]
---- Original message ---- From: <62...@163.com> Sent: May 17, 2021, 6:23 To: "user-zh"
Flink SQL: garbled Chinese characters when writing to MySQL
My Flink SQL job is as follows:
SELECT
  product_name,
  window_start,
  window_end,
  CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
  CAST(COUNT(order_no) AS BIGINT) trans_cnt,
  -- LOCALTIMESTAMP AS insert_time,
  '微支付事业部' AS bus_name
FROM (
The MySQL sink table is defined as follows:
CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
Once the job is running, the Chinese text written to the MySQL table is garbled: ??
The job log shows that it uses the UTF-16LE character set. Is there any way to make it use utf8mb4 instead?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
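One commonly suggested remedy for this symptom is to set the character encoding on the JDBC connection itself. The `_UTF-16LE` prefix in the log appears to be how the planner prints string literals rather than the encoding used when writing to MySQL, so it is probably not the cause. The sketch below assumes the sink uses Flink's `jdbc` connector; the host, database name, credentials, and column types are hypothetical, and only the table and column names are taken from the log above.

```sql
-- Sink DDL sketch: column types are assumptions, names come from the job log.
CREATE TABLE all_trans_5m_new (
  product_name STRING,
  window_start STRING,
  window_end   STRING,
  trans_amt    DECIMAL(24, 2),
  trans_cnt    BIGINT,
  insert_time  TIMESTAMP(3),
  bus_name     STRING
) WITH (
  'connector' = 'jdbc',
  -- useUnicode and characterEncoding are MySQL Connector/J URL properties
  'url' = 'jdbc:mysql://localhost:3306/demo_db?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'all_trans_5m_new',
  'username' = 'demo_user',       -- assumed credentials
  'password' = 'demo_password'
);
```

If the data is still garbled after this change, the character set settings of the MySQL server and of the target table would be the next things to check.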
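Flink SQL: question about the column list in source table definitions
When defining a source table in Flink SQL, which connectors allow declaring only some of the fields, and which require the full set of fields? The connectors we mainly use are kafka, jdbc, clickhouse, and mysql-cdc. Does cdc require all columns of the corresponding table to be declared? If a column is added to the upstream MySQL business table, will the Flink SQL job fail? Does a Kafka table definition support declaring only some of the fields?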
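Re: Flink question
Hello,
Could you describe the problem more specifically, for example by providing some data and SQL that reproduce it? That will make it possible for people to help investigate.
Best,
Leonard Xu
> On May 18, 2021, at 09:49, 清酌 wrote:
>
> Hello!
> I am using Flink SQL CDC on version 1.11 to join multiple tables in SQL and produce a real-time wide table, and I often find that the data in the wide table is inaccurate, especially when the source tables change through CDC. For example, 10 rows in the wide table should have been updated, but only 3 actually were.
> I would like to know whether this is caused by misuse on my part or by a problem in version 1.11, and, if it is a version problem, whether it will be fixed.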
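Flink question
Hello! I am using Flink SQL CDC on version 1.11 to join multiple tables in SQL and produce a real-time wide table, and I often find that the data in the wide table is inaccurate, especially when the source tables change through CDC. For example, 10 rows in the wide table should have been updated, but only 3 actually were. I would like to know whether this is caused by misuse on my part or by a problem in version 1.11, and, if it is a version problem, whether it will be fixed.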
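Re: Dimension table and processing time
If you only need the latest dimension-table data, you can use processing time. In that case every fact-table record queries MySQL in real time for the latest dimension data. If the business can accept approximately up-to-date dimension data, you can also cache the lookup results to reduce I/O against MySQL, using these two options [1]:
lookup.cache.max-rows
lookup.cache.ttl
Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%E8%BF%9E%E6%8E%A5%E5%99%A8%E5%8F%82%E6%95%B0
> On May 18, 2021, at 08:46, 流弊 <1353637...@qq.com> wrote:
>
> Hi all, we have a scenario where we left join a MySQL dimension table. The dimension table is updated slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need to pick up the latest data. If we use mysql cdc, aligning the watermarks would introduce a long delay. Is there a good way to join against the latest data? Should we use processing time?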
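The advice above can be sketched as a processing-time lookup join against a JDBC dimension table with the two cache options set. This is only an illustration under assumptions: the table names, columns, connection details, and cache sizes are hypothetical, and the fact table is shown as a Kafka JSON source purely for completeness. In the connector versions current at the time of this thread, the lookup cache is disabled unless both options are set.

```sql
-- Fact stream with a processing-time attribute (all source options are assumptions).
CREATE TABLE orders (
  order_id  STRING,
  dim_id    BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'lookup-join-demo',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- MySQL dimension table read through the JDBC connector, with the lookup cache enabled.
CREATE TABLE dim_info (
  id       BIGINT,
  dim_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/demo_db',
  'table-name' = 'dim_info',
  'username' = 'demo_user',
  'password' = 'demo_password',
  'lookup.cache.max-rows' = '10000',  -- at most this many rows are cached
  'lookup.cache.ttl' = '60s'          -- a cached row may be up to 60 seconds stale
);

-- Each fact row looks up the dimension row as of its processing time.
SELECT o.order_id, d.dim_name
FROM orders AS o
LEFT JOIN dim_info FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.dim_id = d.id;
```

With a dimension table that changes about once every 10 minutes, the TTL is the knob that trades freshness against load on MySQL: leaving the cache off gives the latest row on every lookup, while a short TTL bounds the staleness.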
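Dimension table and processing time
Hi all, we have a scenario where we left join a MySQL dimension table. The dimension table is updated slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need to join against the latest data. If we use mysql cdc, aligning the watermarks would introduce a long delay. Is there a good way to join against the latest data? Should we use processing time?
Dimension table and processing time
Hi all, we have a scenario where we left join a MySQL dimension table. The dimension table is updated slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need to pick up the latest data. If we use mysql cdc, aligning the watermarks would introduce a long delay. Is there a good way to join against the latest data? Should we use processing time?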
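Re: How can Flink SQL convert a changelog stream into an append stream?
Does nobody know?
On 2021-05-13 17:20:15, "casel.chen" wrote:
How can Flink SQL convert a changelog stream into an append stream? Once a changelog stream has been ingested through CDC, window aggregations such as Tumble, Hop, and Session windows can no longer be used downstream. The only option is to aggregate with state TTL plus GROUP BY on a timestamp. Is there a way to convert the changelog stream into an append stream so that the window aggregations above can be used? Thanks!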
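Re: How can the execution plan of a Flink SQL job be modified?
Does nobody know?
On 2021-05-13 08:19:24, "casel.chen" wrote:
>How can the execution plan of a Flink SQL job be modified? For example, giving upstream and downstream operators different parallelism, or manually breaking an operator chain.
>I know how to obtain the Flink SQL execution plan, but how can I intervene in it by hand? I would appreciate an answer. Thanks!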
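Testing Flink with HiBench
When testing Flink with HiBench, the hibench.report file in HiBench's report directory contains no throughput information. How can I obtain Flink's throughput and processing latency when benchmarking it with HiBench?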
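After a cluster restart, how can stateful jobs be made to recover automatically from the most recent checkpoint?
After a cluster restart, how can stateful jobs be made to recover automatically from the most recent checkpoint?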
Re: Exception: Could not perform checkpoint
Looking at the source, the failure is inside the catch block (executeCheckpointing probably threw, and then a NullPointerException that is not caught inside the catch block caused the program to exit):
if (LOG.isDebugEnabled()) {
    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " + "Alignment duration: {} ms, snapshot duration {} ms",
        owner.getName(),
        checkpointMetaData.getCheckpointId(),
        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
        checkpointMetrics.getSyncDurationMillis());
}
The part that throws is checkpointMetrics.getAlignmentDurationNanos() / 1_000_000. I suggest turning off debug logging and checking whether the problem goes away.
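Testing Flink with HiBench's identity program
According to its description, identity reads data from Kafka and writes it back to Kafka. When testing Flink with the identity program in HiBench, I set the parallelism to 20 in the flink.conf file under HiBench's conf directory. After submitting the job, the web UI shows only one subtask, that is, only one slot on one node was assigned work. How can I get multiple subtasks when testing Flink with identity? (Images never seem to display when I attach them, so I have not included a screenshot.)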
Error restoring from a savepoint after upgrading Flink to 1.12.0
After upgrading from Flink 1.10.0 to Flink 1.12.0, the job cannot be restored from its savepoint and reports the following error:
2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    at