Re: Question about the field list when defining a flink sql source table

2021-05-17, post by HunterXHunter
You don't need to provide all of the fields.
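For illustration, a minimal sketch (topic, field names and connector options are assumed here, not taken from this thread) of a Kafka source table in JSON format that declares only the fields the job needs; fields present in the JSON payload but not declared in the DDL are simply ignored:

CREATE TABLE orders_src (
    order_no  STRING,
    trans_amt DECIMAL(24, 2)
    -- other fields in the JSON payload can be left out entirely
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'orders-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);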





Re: Re: How can a change log stream be converted into an append log stream in flink sql?

2021-05-17, post by HunterXHunter
// Imports needed by this snippet:
// import java.time.Duration;
// import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// import org.apache.flink.api.common.functions.FilterFunction;
// import org.apache.flink.api.common.functions.MapFunction;
// import org.apache.flink.api.java.functions.KeySelector;
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
// import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
// import org.apache.flink.streaming.api.windowing.time.Time;
// import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
// import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
// import org.apache.flink.types.Row;
// import org.apache.flink.util.Collector;

tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
        // convert the changelog table into a retract stream of (changeFlag, Row)
        .toRetractStream(tableEnv.from("order_info"), Row.class)
        // keep only the accumulate messages (f0 == true), i.e. drop retractions
        .filter((FilterFunction<Tuple2<Boolean, Row>>) booleanRowTuple2 -> booleanRowTuple2.f0)
        .map((MapFunction<Tuple2<Boolean, Row>, Row>) booleanRowTuple2 -> booleanRowTuple2.f1)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis()))
        .keyBy((KeySelector<Row, String>) row -> row.getField("consignee").toString())
        .window(TumblingEventTimeWindows.of(Time.seconds(100)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
        .process(new ProcessWindowFunction<Row, Tuple2<TimeWindow, Long>, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Row> elements,
                                Collector<Tuple2<TimeWindow, Long>> out) throws Exception {
                long count = 0L;
                for (Row element : elements) {
                    count += 1;
                }
                out.collect(new Tuple2<>(context.window(), count));
            }
        })
        .print();
streamEnv.execute();





Re: How to ensure that stateful jobs automatically recover from the most recent checkpoint after a cluster restart?

2021-05-17, post by 5599





------------------ Original message ------------------
From: <62...@163.com>
Date: 2021-05-17 (Mon) 6:23
To: "user-zh"

Garbled Chinese characters when writing to mysql from flink sql

2021-05-17, post by casel.chen
My flink sql job is as follows:


SELECT
    product_name,
    window_start,
    window_end,
    CAST(SUM(trans_amt) AS DECIMAL(24, 2)) AS trans_amt,
    CAST(COUNT(order_no) AS BIGINT) AS trans_cnt,
    -- LOCALTIMESTAMP AS insert_time,
    '微支付事业部' AS bus_name
FROM (


The mysql sink table is defined as follows:
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;


After the job runs, the Chinese data written to the mysql table is garbled (shows up as ??).



Looking at the job's runtime logs, I found it uses the UTF-16LE character set. Is there any way to make it use utf8mb4?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Received task GroupAggregate(groupBy=[product_name, window_start, 
window_end], select=[product_name, window_start, window_end, 
SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> 
Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS 
trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
bus_name]) -> Sink: 
Sink(table=[default_catalog.default_database.all_trans_5m_new], 
fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into 
slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, 
mer_cust_id, order_no, trans_date], select=[product_name, window_start, 
window_end, id, data_type, mer_cust_id, order_no, trans_date, 
MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
window_start, window_end, trans_amt, order_no]) (1/1)#0 
(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
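A commonly suggested workaround (an assumption for illustration, not something confirmed in this thread) is to force the MySQL JDBC driver to send UTF-8 by appending useUnicode/characterEncoding parameters to the sink URL; host, credentials and column types below are placeholders:

CREATE TABLE all_trans_5m_new (
    product_name STRING,
    window_start STRING,
    window_end   STRING,
    trans_amt    DECIMAL(24, 2),
    trans_cnt    BIGINT,
    insert_time  TIMESTAMP(3),
    bus_name     STRING
) WITH (
    'connector' = 'jdbc',
    -- the URL parameters tell Connector/J to send UTF-8 to the server,
    -- which the utf8mb4 table can store
    'url' = 'jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'all_trans_5m_new',
    'username' = '...',
    'password' = '...'
);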

Question about the field list when defining a flink sql source table

2021-05-17, post by casel.chen
When defining a source table with flink sql, which connectors support declaring only part of the fields, and which connectors require the full set of fields?
The connectors we commonly use are kafka, jdbc, clickhouse and mysql-cdc.
Does cdc require declaring all fields of the corresponding table? If the upstream mysql business table adds a new column, will the flink sql job fail?
Does a kafka table definition support declaring only some of the fields?

Re: A question about flink

2021-05-17, post by Leonard Xu
Hello,
Could you describe the problem in more detail, for example by providing some data and the sql that reproduce the issue, so that others can help investigate?

Best regards,
Leonard Xu


> On 2021-05-18, at 09:49, 清酌 wrote:
> 
> Hi!
>   While using flink sql cdc on version 1.11 to join multiple tables into a real-time wide table with sql, I often find that the wide table's data is inaccurate, especially when the source tables change via cdc. For example, 10 rows of the wide table should have been updated, but only 3 actually were.
>   I'd like to know whether this is caused by my incorrect usage or by a problem in version 1.11; if it is a version problem, will it be fixed in a later release?



A question about flink

2021-05-17, post by 清酌
Hi!
While using flink sql cdc on version 1.11 to join multiple tables into a real-time wide table with sql, I often find that the wide table's data is inaccurate, especially when the source tables change via cdc. For example, 10 rows of the wide table should have been updated, but only 3 actually were.
I'd like to know whether this is caused by my incorrect usage or by a problem in version 1.11; if it is a version problem, will it be fixed in a later release?

Re: Dimension table and processing time

2021-05-17, post by Leonard Xu
If you only need the latest dimension data, you can use processing time; that way every fact record looks up the latest dimension data from mysql in real time.
If the business can accept approximately-latest dimension data, you can also cache the lookup results to reduce mysql IO, using these two options [1]:
lookup.cache.max-rows
lookup.cache.ttl

Best regards,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#%E8%BF%9E%E6%8E%A5%E5%99%A8%E5%8F%82%E6%95%B0
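A rough sketch of the setup described above (table names, column names and connection details are made up for illustration; the fact table is assumed to declare a processing-time attribute via proctime AS PROCTIME()): a JDBC dimension table with the lookup cache enabled, joined with FOR SYSTEM_TIME AS OF on processing time:

CREATE TABLE dim_product (
    product_id   STRING,
    product_name STRING,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'dim_product',
    'username' = '...',
    'password' = '...',
    -- cache up to 10000 rows; entries expire after 10 minutes,
    -- so joined data is at most ~10 minutes behind mysql
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '10min'
);

SELECT o.order_no, o.trans_amt, d.product_name
FROM fact_orders AS o
LEFT JOIN dim_product FOR SYSTEM_TIME AS OF o.proctime AS d
    ON o.product_id = d.product_id;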

> On 2021-05-18, at 08:46, 流弊 <1353637...@qq.com> wrote:
> 
> Hi all, we have a scenario with a left join against a mysql dimension table. The dimension table updates slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need the latest data in the join. With a mysql cdc approach, watermark alignment would introduce a long delay. Is there a good way to join against the latest data, e.g. using processing time?



Dimension table and processing time

2021-05-17, post by 流弊
Hi all, we have a scenario with a left join against a mysql dimension table. The dimension table updates slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need to join against the latest data. With a mysql cdc approach, watermark alignment would introduce a long delay. Is there a good way to join against the latest data, e.g. using processing time?

Dimension table and processing time

2021-05-17, post by 流弊
Hi all, we have a scenario with a left join against a mysql dimension table. The dimension table updates slowly, roughly one row every 10 minutes, while the fact table is fast, tens of thousands of rows per second, and we need the latest data in the join. With a mysql cdc approach, watermark alignment would introduce a long delay. Is there a good way to join against the latest data, e.g. using processing time?

Re: How can a change log stream be converted into an append log stream in flink sql?

2021-05-17, post by casel.chen
Does nobody know?

On 2021-05-13 17:20:15, "casel.chen" wrote:

How can a change log stream be converted into an append log stream in flink sql?
After ingesting a change log stream through cdc, window aggregations such as Tumble, Hop and Session windows can no longer be used; the only way left to aggregate is state ttl + group by timestamp.
Is there any way to convert the change log stream into an append log stream so that the window aggregations above can be used? Thanks!

Re: How can the flink sql execution plan be modified?

2021-05-17, post by casel.chen
Does nobody know?

On 2021-05-13 08:19:24, "casel.chen" wrote:
>How can I modify a flink sql execution plan? For example, setting different parallelisms for upstream and downstream operators, or manually breaking operator chains.
>I know how to obtain the flink sql execution plan, but how can I intervene in it manually? Advice would be much appreciated, thanks!


Testing flink with hibench

2021-05-17, post by penguin.
When testing flink with hibench, the hibench.report file under hibench's report directory contains no throughput-related information.
How can I obtain flink's throughput and processing latency when benchmarking flink with hibench?

How to ensure that stateful jobs automatically recover from the most recent checkpoint after a cluster restart?

2021-05-17, post by 董建
How can I ensure that stateful jobs automatically recover from the most recent checkpoint after a cluster restart?



Re: Exception: Could not perform checkpoint

2021-05-17, post by HunterXHunter
Looking at the source code, this happens inside the catch block (the error presumably occurred during executeCheckpointing, but there is also an uncaught NullPointerException inside the catch block, which makes the program exit):
if (LOG.isDebugEnabled()) {
    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
            owner.getName(),
            checkpointMetaData.getCheckpointId(),
            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
            checkpointMetrics.getSyncDurationMillis());
}

The failure comes from the checkpointMetrics.getAlignmentDurationNanos() / 1_000_000 part. I suggest turning off debug logging and trying again.





Testing flink with hibench's identity program

2021-05-17, post by penguin.
According to the documentation, identity reads data from kafka and writes it back to kafka.
When testing flink with the identity program in hibench, I set the parallelism to 20 in the flink.conf file under hibench's conf directory.
After submitting the job, the UI shows only one subtask, i.e. only one slot on one node was assigned work. How can I get multiple subtasks when testing flink with identity?


(Images never seem to display when I attach them, so I haven't included a screenshot.)

Flink upgraded to version 1.12.0: error when starting from a savepoint

2021-05-17, post by 王炳焱
When I upgraded from Flink 1.10.0 to Flink 1.12.0, the job could not restore from its savepoint and reported the following error:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at