Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-27 文章 Qishang
Hi.
FYI : https://issues.apache.org/jira/browse/FLINK-25435

Qishang  于2022年4月27日周三 17:28写道:

> Hi .
> 代码好像是没有设置
> 用这个手动设置一下
> set $internal.deployment.config-dir=/opt/flink-1.14.3/conf
>
> 调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory  ->
> YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor
>
> getExecutorFactory:58, DefaultExecutorServiceLoader
> (org.apache.flink.core.execution)
> executeAsync:2032, StreamExecutionEnvironment
> (org.apache.flink.streaming.api.environment)
> executeAsync:95, DefaultExecutor
> (org.apache.flink.table.planner.delegation)
> executeQueryOperation:811, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> executeInternal:1274, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> lambda$executeOperation$3:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> wrapClassLoader:88, ExecutionContext
> (org.apache.flink.table.client.gateway.context)
> executeOperation:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> executeQuery:231, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> callSelect:532, CliClient (org.apache.flink.table.client.cli)
> callOperation:423, CliClient (org.apache.flink.table.client.cli)
> lambda$executeStatement$1:332, CliClient
> (org.apache.flink.table.client.cli)
> executeStatement:325, CliClient (org.apache.flink.table.client.cli)
> executeInteractive:297, CliClient (org.apache.flink.table.client.cli)
> executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli)
> openCli:151, SqlClient (org.apache.flink.table.client)
> start:95, SqlClient (org.apache.flink.table.client)
> startClient:187, SqlClient (org.apache.flink.table.client)
> main:161, SqlClient (org.apache.flink.table.client)
>
> ruiyun wan  于2022年4月27日周三 14:51写道:
>
>> 这个必须有,因为用yarn-session.sh创建集群会有jobmanager.log。能够找到yarn-session.sh的启动类(org.apache.flink.yarn.cli.FlinkYarnSessionCli)到
>> YarnClusterDescriptor的调用路径。
>> [image: image.png]
>> 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到
>> YarnClusterDescriptor的调用路径。这两者不在同一个包。
>>
>> Qishang  于2022年4月27日周三 13:46写道:
>>
>>> Hi.
>>> 确认下 conf 下,是否有 log4j.properties
>>>
>>> 应该是在这个地放生成的,
>>>
>>> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
>>>
>>>
>>> ruiyun wan  于2022年4月26日周二 14:41写道:
>>>
>>> > Flink版本:1.13
>>> > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target =
>>> >
>>> >
>>> yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。
>>> >
>>>
>>


Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-27 文章 Qishang
Hi .
代码好像是没有设置
用这个手动设置一下
set $internal.deployment.config-dir=/opt/flink-1.14.3/conf

调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory  ->
YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor

getExecutorFactory:58, DefaultExecutorServiceLoader
(org.apache.flink.core.execution)
executeAsync:2032, StreamExecutionEnvironment
(org.apache.flink.streaming.api.environment)
executeAsync:95, DefaultExecutor (org.apache.flink.table.planner.delegation)
executeQueryOperation:811, TableEnvironmentImpl
(org.apache.flink.table.api.internal)
executeInternal:1274, TableEnvironmentImpl
(org.apache.flink.table.api.internal)
lambda$executeOperation$3:209, LocalExecutor
(org.apache.flink.table.client.gateway.local)
wrapClassLoader:88, ExecutionContext
(org.apache.flink.table.client.gateway.context)
executeOperation:209, LocalExecutor
(org.apache.flink.table.client.gateway.local)
executeQuery:231, LocalExecutor
(org.apache.flink.table.client.gateway.local)
callSelect:532, CliClient (org.apache.flink.table.client.cli)
callOperation:423, CliClient (org.apache.flink.table.client.cli)
lambda$executeStatement$1:332, CliClient (org.apache.flink.table.client.cli)
executeStatement:325, CliClient (org.apache.flink.table.client.cli)
executeInteractive:297, CliClient (org.apache.flink.table.client.cli)
executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli)
openCli:151, SqlClient (org.apache.flink.table.client)
start:95, SqlClient (org.apache.flink.table.client)
startClient:187, SqlClient (org.apache.flink.table.client)
main:161, SqlClient (org.apache.flink.table.client)

ruiyun wan  于2022年4月27日周三 14:51写道:

> 这个必须有,因为用yarn-session.sh创建集群会有jobmanager.log。能够找到yarn-session.sh的启动类(org.apache.flink.yarn.cli.FlinkYarnSessionCli)到
> YarnClusterDescriptor的调用路径。
> [image: image.png]
> 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到
> YarnClusterDescriptor的调用路径。这两者不在同一个包。
>
> Qishang  于2022年4月27日周三 13:46写道:
>
>> Hi.
>> 确认下 conf 下,是否有 log4j.properties
>>
>> 应该是在这个地放生成的,
>>
>> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
>>
>>
>> ruiyun wan  于2022年4月26日周二 14:41写道:
>>
>> > Flink版本:1.13
>> > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target =
>> >
>> >
>> yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。
>> >
>>
>


Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-26 文章 Qishang
Hi.
确认下 conf 下,是否有 log4j.properties

应该是在这个地放生成的,
https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699


ruiyun wan  于2022年4月26日周二 14:41写道:

> Flink版本:1.13
> 问题描述:使用sql-client.sh启动yarn-per-job(execution.target =
>
> yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。
>


Re: Flink CDC 2.0 整库同步如何实现?

2021-12-26 文章 Qishang
Hi
暂时还不支持,你看到的应该是未来规划的内容。

casel.chen  于2021年12月24日周五 20:50写道:

> 看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec
> 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。
> 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!


Re: Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章 Qishang
Hi
之前社区发过一个 JD 的解决方案,可以参考下[1]。

[1]: https://mp.weixin.qq.com/s/YluIj3vmebFmZjRbymKBZw



黄志高  于2021年8月16日周一 上午11:04写道:

> == Physical Execution Plan ==
>
> Stage 1 : Data Source
>
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, test_kafka]], fields=[tz])
>
>
>
>
> Stage 2 : Operator
>
> content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL])
>
> ship_strategy : FORWARD
>
>
>
> 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-08-16 09:47:28,"Qishang"  写道:
> >Hi.
> >
> >应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573
> >
> >打印一下执行计划和code gen
> >
> >
> >黄志高  于2021年8月15日周日 下午10:06写道:
> >
> >> hi all,
> >> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from
> >> test_kafka) as t where tz is not null
> >> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from
> >> test_kafka) as t这个sql时,不进行where tz is not null
> 操作,eval方法此时只会调用一次,如果将where
> >> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null
> >> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from
> >> test_kafka这个阶段发生一次,第二次是在where tz is not
> >>
> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12
> >>
> >>
> >>
> >>
> >>
>


Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章 Qishang
Hi.

应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573

打印一下执行计划和code gen


黄志高  于2021年8月15日周日 下午10:06写道:

> hi all,
> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from
> test_kafka) as t where tz is not null
> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from
> test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where
> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null
> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from
> test_kafka这个阶段发生一次,第二次是在where tz is not
> null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12
>
>
>
>
>


Re: createTemporaryView接口注册table时,fieldname支持中横线(-)

2021-05-27 文章 Qishang
Hi Jun Zou.
tEnv.createTemporaryView("`Word-Count`", input, $("word"), $("frequency"));
加上 ` 试一下。

Jun Zou  于2021年5月26日周三 下午4:33写道:

> Hi,all:
> 我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为:
>
> > tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray:
> > _*)
> >
> 其中,tableEnv为 StreamTableEnvironment类型,source是 DataStream[Row] 类型,代表source
> connector生成的算子,fields是 由处理过的source table的 filed name 转换成的 Expression,将filed
> name转换成expression 使用 *ExpressionParser.parseExpression* 这个方法
>
> 正常情况下,都能注册成功。
> 但是,当field name带中横线,如 source中一个字段名称为
> “X-Oem”时,经过 ExpressionParser.parseExpression 会被解析为 “minus(X, Model)”
> 而非预期的“X-Oem”,导致注册成的表与DML语句中操作的字段名不一致报错。
>
> 有什么方法能够处理这种情况么?
>


Re: avro.ComplexPayloadAvro

2021-05-25 文章 Qishang
Hi.

会生成 `${project.basedir}/target/generated-sources/`
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97

r pp  于2021年5月25日周二 上午9:58写道:

> 各位好,请问下,
>
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
>
> 在该类下的
>
>
> /flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> 下面两个类,在代码哪里?
> import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
> import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
> --
> Best,
>   pp
>


Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 Qishang
Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。

https://github.com/ververica/flink-cdc-connectors/blob/master/README.md

casel.chen  于2021年4月20日周二 下午6:18写道:

> 目标是用flink作业实现类似canal server的功能
>
>
> CREATE TABLE `binlog_table` (
>
> `id` INT,
>
> `name` STRING,
>
> `sys_id` STRING,
>
> `sequence` INT,
>
> `filter` STRING,
>
> `tag` STRING,
>
> `remark` STRING,
>
> `create_date` TIMESTAMP,
>
> `update_date` TIMESTAMP,
>
> `reserve` STRING,
>
> `sys_name` STRING,
>
> `metric_seq` INT,
>
> `advanced_function` STRING,
>
> `value_type` STRING,
>
> `value_field` STRING,
>
> `status` INT,
>
> `syn_date` TIMESTAMP,
>
> `confirmer` STRING,
>
> `confirm_time` TIMESTAMP,
>
> `index_explain` STRING,
>
> `field_name` STRING,
>
> `tag_values` STRING,
>
> PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
>   'connector' = 'mysql-cdc',
>
>   'hostname' = '${mysql.hostname}',
>
>   'port' = '3306',
>
>   'username' = '${mysql.username}',
>
>   'password' = '${mysql.password}',
>
>   'database-name' = '${mysql.database}',
>
>   'table-name' = '${mysql.table}'
>
>   );
>
>
>
>
> CREATE TABLE `kafka_sink` (
>
>   `id` INT,
>
>   `name` STRING,
>
>   `sys_id` STRING,
>
>   `sequence` INT,
>
>   `filter` STRING,
>
>   `tag` STRING,
>
>   `remark` STRING,
>
>   `create_date` TIMESTAMP,
>
>   `update_date` TIMESTAMP,
>
>   `reserve` STRING,
>
>   `sys_name` STRING,
>
>   `metric_seq` INT,
>
>   `advanced_function` STRING,
>
>   `value_type` STRING,
>
>   `value_field` STRING,
>
>   `status` INT,
>
>   `syn_date` TIMESTAMP,
>
>   `confirmer` STRING,
>
>   `confirm_time` TIMESTAMP,
>
>   `index_explain` STRING,
>
>   `field_name` STRING,
>
>   `tag_values` STRING,
>
>   PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
>   'connector' = 'kafka',
>
>   'topic' = '${topic}',
>
>   'properties.bootstrap.servers' = '${bootstrap.servers}',
>
>   'format' = 'canal-json'
>
>   );
>
>
>
>
> INSERT INTO `kafka_sink`
>
> (SELECT *
>
>  FROM `binlog_table`);
>
> 出来的结果是这样:
>
>
> {
> "data": [
> {
> "id": 3,
> "name": "自动付款接口BuyETC金额",
> "sys_id": "0184",
> "sequence": 2,
> "filter": "(a=1)",
> "tag": "MerId(商户号)",
> "remark": "O",
> "create_date": "2020-11-02 15:01:31",
> "update_date": "2021-04-07 09:23:59",
> "reserve": "",
> "sys_name": "NHL",
> "metric_seq": 0,
> "advanced_function": "",
> "value_type": "sum",
> "value_field": "value",
> "status": 1,
> "syn_date": "2021-01-28 19:31:36",
> "confirmer": null,
> "confirm_time": null,
> "index_explain": "aa",
> "field_name": null,
> "tag_values": null
> }
> ],
> "type": "INSERT"
> }
> 并不是标准的canal json格式。改用upsert-kafka connector试了也不行
>
>
>
> CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING,
> `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING,
> `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
> `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
> `value_type` STRING, `value_field` STRING, `status` INT, `syn_date`
> TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain`
> STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT
> ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}',
> 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' =
> 'json',
> 'value.format' = 'json' );
>
>
> 出来的数据长这样
>
>
>
> {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
> 12:27:30","update_date":"2021-04-06
> 

Re: flink1.11版本 -C 指令并未上传udf jar包

2021-04-20 文章 Qishang
Hi todd.
-C 不会上传对应路径下的 jar,最终会被添加到集群的 classpath 中,需要运行的机器对应的路径下要有同样的Jar包才可以。
可以放在私服或者oss的服务,通过 http 的方式加载的 udf jar
-C "http://host:port/xxx.jar;

希望能帮到你。


todd  于2021年4月19日周一 下午10:22写道:

> 执行指令:flink  run   \
> -m yarn-cluster \
> -C file:////flink-demo-1.0.jar \
> x
>
> 在Client端能够构建成功jobgraph,但是在yarn上会报UDF类找不到。我看Classpath中并未上传该JAR包。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SinkFunction与OutputFormat选择哪个?

2021-04-09 文章 Qishang
Hi Luna Wong.

RichOutputFormat 实现的最终是由 Flink 提供的 OutputFormatSinkFunction 再包装成
SinkFunction。 OutputFormatSinkFunction 很早就 Deprecated 了,没有实现
CheckpointedFunction 。
jdbc 的是实现了 RichOutputFormat ,但是最后用 GenericJdbcSinkFunction 包装了一次,
GenericJdbcSinkFunction 实现了 CheckpointedFunction,
刚好最近遇到 https://issues.apache.org/jira/browse/FLINK-20552 ,可以看下这个 BUG 的修复。

1.12 之前 :
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink
https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala#L97-L101
最新 1.13
: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L153-L163

Luna Wong  于2021年4月8日周四 下午5:42写道:

> 自定义Connector时,RichSinkFunction和RichOutputFormat可以选择一个进行实现,那么作为开发者应该如何选择呢?
>


Re: flink消费kafka数据open()方法初始换多次的问题

2021-04-09 文章 Qishang
Hi guoxb.
没有全部代码,我猜你 addSink() 走了两次,调试看下。

guoxb__...@sina.com  于2021年4月9日周五 下午2:36写道:

> hi:
>情景:
> 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承
> RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法
> 个人理解:
> 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接
> 2. close()方法在程序结束的时候也是只走一次
> 3. invoke()方法在获取到每一条数据走一次这个方法
> 实际情况及问题(env.setParallelism(1)):
> 1. open()在程序启动的时候运行了两次
> 2. invoke()方法在每条消息过来也会被处理两次
>
> code:
> reader端:
> ```java
> public class FlinkKafkaReader extends
> DataKafkaConnect {
>
> @Override
> protected DataStream reader(StreamExecutionEnvironment env,
> KafkaConfig cfg) throws JobException {
>
> DataStream stream = null;
> try {
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("bootstrap.servers",
> cfg.getBootstrapServers());
> kafkaProps.setProperty("group.id", cfg.getGroupId());
> kafkaProps.setProperty("auto.offset.reset",
> cfg.getOffsetReset());
> kafkaProps.setProperty("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaProps.setProperty("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaProps.setProperty("enable.auto.commit",
> cfg.getAutoCommit());
> kafkaProps.setProperty("max.poll.interval.ms",
> cfg.getIntervalMs());
>
> KafkaDeserializationSchema deserializationKdl = null;
> // 根据不同的配置进行选择不同的消息解析器
> switch (cfg.getMessageType()) {
> case "mysql":
> deserializationKdl = new
> KafkaDataDeserialiaztionBinlogSchemal();
> break;
> case "mongodb":
> deserializationKdl = new
> KafkaDataDeserialiaztionOplogSchema();
> break;
> }
> FlinkKafkaConsumer flinkKafkaConsumer = new
> FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")),
> deserializationKdl, kafkaProps);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费
> String consumptionStartOffset = cfg.getConsumptionStartTime();
> if (StringUtils.isBlank(consumptionStartOffset)) {
> flinkKafkaConsumer.setStartFromGroupOffsets();
> } else {
> flinkKafkaConsumer.setStartFromTimestamp(
> new SimpleDateFormat("-MM-dd HH:mm:ss")
> .parse(cfg.getConsumptionStartTime())
> .getTime()
> );
> }
> // 设置并行度
> env.setParallelism(1);
> //env.getCheckpointConfig().setCheckpointInterval(1000 * 30);
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // 设置可容忍checkpoint失败的次数
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> stream = env.addSource(flinkKafkaConsumer)
> .filter(new FilterFunction() {
> @Override
> public boolean filter(Object value) throws
> Exception {
> return null != value;
> }
> });
> } catch (Exception e) {
> throw new JobException(e.getMessage());
> }
> return stream;
> }
> }
> ```
> sink端:
> ```java
> public class MysqlSink extends RichSinkFunction {
> @Override
> public void open(Configuration config) throw Exception{
> ...
> }
> @Override
> public void close(){
> ...
> }
> @Override
> public void invoke(Object obj,Context context){
> //业务逻辑,这里的逻辑每一条数据过来会运行两次,这里也是我的问题
> ...
> }
> }
> ```
>
> 还请知悉原因的道友给点指引,万分感谢
>
>
> guoxb__...@sina.com
>


Re: flink cdc读取Maxwell格式的binlog,如何获取表名等元信息

2021-04-09 文章 Qishang
Hi chen310.
Canal 和 debezium 已经实现,Maxwell 还没有完成,可以关注下
https://issues.apache.org/jira/browse/FLINK-20926

chen310 <1...@163.com> 于2021年4月8日周四 下午5:35写道:

> 请教下,看了这篇文章https://developer.aliyun.com/article/771438,flink-cdc 读取mysql
> Maxwell 格式binlog,怎么在flink 源表上获取mysql表名,通过这样的方式并没有生效  `origin_table` STRING
> METADATA FROM 'value.source.table' VIRTUAL,
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t572/lALPDg7mQSzBJifNAmzNBpY_1686_620.png>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 在FlinkKafkaProducer获取sink表的建表key

2021-03-28 文章 Qishang
Hi Jimmy.
FlinkKafkaProducer 里面是没有的,可以试着从  KafkaDynamicSink 里面传到 FlinkKafkaProducer
中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType
这个里面可以拿到

Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道:

> Hi!大家好。
> 目前有一个需求,需要获取Kafka
> sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
> 例如:CREATE TABLE kafkaTable (
>
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> )
> 想获取到   user_id, item_id ,category_id ,behavior这四个字段。
>
>
> | |
> Jimmy Zhang
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制


Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
Hi Jark.
对于 upsert-kafka connector 有两个疑问:

1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?


Qishang  于2021年3月5日周五 上午11:14写道:

>
> 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
> 学到了,感谢。
>
> Jark Wu  于2021年3月4日周四 下午11:11写道:
>
>> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
>> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
>> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
>> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
>> forward。
>>
>> Best,
>> Jark
>>
>> [1]:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>>
>> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>>
>> > Hi 社区。
>> > Flink 1.12.1
>> >
>> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition
>> ,设置大的并发,对于只有
>> > forword 的ETL没有作用。
>> >
>> > insert into table_a select id,udf(a),b,c from table_b;
>> >
>> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
>> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
>> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka
>> 中生效吗?可以用在我上面说的场景上面吗?
>> >
>> > ```
>> > == Physical Execution Plan ==
>> > Stage 1 : Data Source
>> > content : Source: TableSourceScan(table=[[default_catalog,
>> > default_database, temp_table]], fields=[id...])
>> >
>> > Stage 3 : Operator
>> > content : ChangelogNormalize(key=[id])
>> > ship_strategy : HASH
>> >
>> > Stage 4 : Operator
>> > content : Calc(select=[...])
>> > ship_strategy : FORWARD
>> >
>> > Stage 5 : Data Sink
>> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
>> > fields=[id...])
>> > ship_strategy : FORWARD
>> > ```
>> >
>>
>


Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
学到了,感谢。

Jark Wu  于2021年3月4日周四 下午11:11写道:

> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
> forward。
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>
> On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:
>
> > Hi 社区。
> > Flink 1.12.1
> >
> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition
> ,设置大的并发,对于只有
> > forword 的ETL没有作用。
> >
> > insert into table_a select id,udf(a),b,c from table_b;
> >
> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
> >
> > ```
> > == Physical Execution Plan ==
> > Stage 1 : Data Source
> > content : Source: TableSourceScan(table=[[default_catalog,
> > default_database, temp_table]], fields=[id...])
> >
> > Stage 3 : Operator
> > content : ChangelogNormalize(key=[id])
> > ship_strategy : HASH
> >
> > Stage 4 : Operator
> > content : Calc(select=[...])
> > ship_strategy : FORWARD
> >
> > Stage 5 : Data Sink
> > content : Sink: Sink(table=[default_catalog.default_database.table_a],
> > fields=[id...])
> > ship_strategy : FORWARD
> > ```
> >
>


Re: Flink1.12 如何使用代码提交Batch的Sql

2021-03-04 文章 Qishang
Hi shougou.

你要找的是不是这个[1]

// **// BLINK BATCH QUERY// **import
org.apache.flink.table.api.EnvironmentSettings;import
org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment
bbTableEnv = TableEnvironment.create(bbSettings);

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment


shougou <80562...@qq.com> 于2021年3月5日周五 上午9:59写道:

> 我们知道如果在1.12里使用Table API来提交Batch的作业,比如:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> 但是,如果提交Sql作业的话:
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Table result = tableEnv.sqlQuery(...);
> 文档里也找不到如何使用StreamTableEnvironment 来跑Batch的SQL,又或者使用BatchTableEnvironment?
>
> 感谢各位提供思路!
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-03 文章 Qishang
Hi 社区。
Flink 1.12.1

现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。

insert into table_a select id,udf(a),b,c from table_b;

发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?

```
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])

Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH

Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD

Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```


Re: UDF 重复调用的问题、

2021-03-02 文章 Qishang
Hi Benchao.

现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。
这个 feature 社区有规划了吗?


Benchao Li  于2021年3月3日周三 上午10:23写道:

> 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
>
> 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> plan的过程中会将表达式完全展开,比如下面的SQL:
> ```SQL
> SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> key3
> FROM (
>   SELECT dump_json_to_map(col1) as my_map
>   FROM T
> )
> ```
> 这种写法也会将`dump_json_to_map`这个函数执行3次。
>
> HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道:
>
> > 为什么4次是没问题的,感觉只执行一次才是最优的
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: UDF 重复调用的问题、

2021-03-02 文章 Qishang
2. 是我搞错了,是四次,没问题


Qishang  于2021年3月2日周二 下午6:50写道:

> Hi 社区。
> 版本 : Flink 1.12.1
> 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
> e.g.
> INSERT INTO table_a
> SELECT
> update_time,
> MD5(p_key) AS id,
> p_key
> FROM
> (
> SELECT
> LOCALTIMESTAMP AS update_time ,
> findkeyudf(p_name) AS p_key
> FROM table_b
> ) T
> WHERE COALESCE(p_key, '')<> ''
> ;
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table_b]], fields=[p_name, xxx, ...])
>
> Stage 2 : Operator
> content : Calc(select=[CAST(()) AS update_date,
> CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
> where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
> ship_strategy : FORWARD
>
> Stage 3 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[update_date, comp_name, p_key])
> ship_strategy : FORWARD
>
> 查看 explain,  udf 调用有四次,但是从日志发现同一个Key 执行了 8次。
>
> 现在有2个问题:
> 1. udf 调用不会被优化成一次,结果复用吗?
> 2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> ''
> )是执行了2次的。
> 3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?
>
>
>


UDF 重复调用的问题、

2021-03-02 文章 Qishang
Hi 社区。
版本 : Flink 1.12.1
在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
e.g.
INSERT INTO table_a
SELECT
update_time,
MD5(p_key) AS id,
p_key
FROM
(
SELECT
LOCALTIMESTAMP AS update_time ,
findkeyudf(p_name) AS p_key
FROM table_b
) T
WHERE COALESCE(p_key, '')<> ''
;

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, table_b]], fields=[p_name, xxx, ...])

Stage 2 : Operator
content : Calc(select=[CAST(()) AS update_date,
CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key],
where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <>
_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)])
ship_strategy : FORWARD

Stage 3 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[update_date, comp_name, p_key])
ship_strategy : FORWARD

查看 explain,  udf 调用有四次,但是从日志发现同一个Key 执行了 8次。

现在有2个问题:
1. udf 调用不会被优化成一次,结果复用吗?
2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> ''
)是执行了2次的。
3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?


Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory

2021-02-24 文章 Qishang
Hi .
我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 .

flink2021  于2021年2月19日周五 下午12:39写道:

> 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢?
> JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
> -XX:MaxDirectMemorySize=8192m"
> 其它也就是写常规的配置:
> og.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> #broker能接收消息的最大字节数
> message.max.bytes=2
> #broker可复制的消息的最大字节数
> replica.fetch.max.bytes=204857600
> #消费者端的可读取的最大消息
> fetch.message.max.bytes=204857600
> max.poll.records=500
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Yarn资源本地化的地方报权限错误

2020-12-29 文章 Qishang
HI.
环境 :2.6.0+cdh5.15.2+ islon
FlinkX (基于Flink 1.8.1) 提交任务报错。这个问题卡了好长时间了。提交任务的地方Kerberos是正常通过的,
Yarn资源本地化的地方报权限错误,很不理解,各位大佬能不能帮忙提供一点排除思路。
1. Flinkx的任务是正常提交的;
2. 还有一个测试环境也是CDH + Kerberos , Flinkx 提交也是正常的;
3. 升级到FlinkX 1.10.1 + Flink 1.10.1 也是同样的问题。

提交命令 :
/opt/flinkx/bin/flinkx -mode yarnPer  -job
/tmp/flink_tmp_json/mysql2mysql.json  -pluginRoot /opt/flinkx/plugins
 -flinkconf /opt/flink-1.8.1/conf  -flinkLibJar /opt/flink-1.8.1/lib
 -yarnconf /etc/hadoop/conf  -queue root.liu  -jobid 10698

报错如下:
13:51:45.298 [main] INFO com.dtstack.flinkx.launcher.perjob.PerJobSubmitter
- start to submit per-job task, LauncherOptions =
com.dtstack.flinkx.options.Options@32eebfca
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
13:51:45.530 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.rpc.address, localhost
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.rpc.port, 6124
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.heap.mb, 1024
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.web.port, 8082
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.web.checkpoints.disable, true
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: jobmanager.archive.fs.dir,
hdfs://xxx:8020/data/flink/flink-completed-jobs
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.heap.mb, 2048
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.numberOfTaskSlots, 4
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: taskmanager.memory.preallocate, false
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: io.tmp.dirs, /tmp/flink/taskmanager
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: parallelism.default, 2
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: yarn.per-job-cluster.include-user-jar,
last
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: akka.lookup.timeout, 30 s
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: web.checkpoints.history, 30
13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.backend, filesystem
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.backend.fs.checkpointdir,
hdfs://xxx:8020/data/flink/flink-checkpoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.checkpoints.dir,
hdfs://xxx:8020/data/flink/flink-checkpoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.checkpoints.num-retained, 5
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: state.savepoints.dir,
hdfs://xxx:8020/data/flink/flink-savepoints
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: historyserver.archive.fs.dir,
hdfs://xxx:8020/data/flink/flink-completed-jobs
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: historyserver.web.port, 16899
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: yarn.application-attempts, 10
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: restart-strategy.fixed-delay.attempts,
1
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: restart-strategy.fixed-delay.delay, 30s
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: security.kerberos.login.contexts, Client
13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration
- Loading configuration property: security.kerberos.login.keytab,
/root/keytab/hive.keytab
13:51:45.532 [main] 

Job manager 异常信息

2020-10-21 文章 Qishang
Flink 1.11.2
Per Job On Yarn
Job manager 报异常 :
这个大概是什么问题。重启之后就没有了。

2020-10-21 16:34:39,657 INFO org.apache.flink.runtime.checkpoint.
CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @
1603269279616 for job 3acdf1d32591165ddbf5ef277bfbe260.
2020-10-21 16:34:40,370 INFO org.apache.flink.runtime.checkpoint.
CheckpointCoordinator [] - Completed checkpoint 1 for job
3acdf1d32591165ddbf5ef277bfbe260
(53879 bytes in 687 ms).
2020-10-21 16:34:59,621 INFO org.apache.flink.runtime.checkpoint.
CheckpointCoordinator [] - Triggering checkpoint 2 (type=CHECKPOINT) @
1603269299614 for job 3acdf1d32591165ddbf5ef277bfbe260.
2020-10-21 16:34:59,846 INFO org.apache.flink.runtime.checkpoint.
CheckpointCoordinator [] - Completed checkpoint 2 for job
3acdf1d32591165ddbf5ef277bfbe260
(53879 bytes in 76 ms).
2020-10-21 16:35:00,764 ERROR
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler
[] - error while responding
java.nio.file.FileAlreadyExistsException: /tmp/flink-web-492d85b8-f971-45ef
-bad7-c00af9d50c35/flink-web-ui/index.html
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
~[?:1.8.0_141]
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
~[?:1.8.0_141]
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
~[?:1.8.0_141]
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(
UnixFileSystemProvider.java:214) ~[?:1.8.0_141]
at java.nio.file.spi.FileSystemProvider.newOutputStream(
FileSystemProvider.java:434) ~[?:1.8.0_141]
at java.nio.file.Files.newOutputStream(Files.java:216) ~[?:1.8.0_141]
at java.nio.file.Files.copy(Files.java:3016) ~[?:1.8.0_141]
at org.apache.flink.runtime.rest.handler.legacy.files.
StaticFileServerHandler.respondToRequest(StaticFileServerHandler.java:184)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.legacy.files.
StaticFileServerHandler.respondAsLeader(StaticFileServerHandler.java:143)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
.lambda$channelRead0$0(LeaderRetrievalHandler.java:81) ~[flink-dist_2.11-
1.11.2.jar:1.11.2]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_141]
at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer
.java:46) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
.channelRead0(LeaderRetrievalHandler.java:78) [flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
.channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:360) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:352) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(
RouterHandler.java:110) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.router.RouterHandler
.channelRead0(RouterHandler.java:89) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.rest.handler.router.RouterHandler
.channelRead0(RouterHandler.java:54) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:360) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:352) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(

Per-job mode 任务失败 jm没有退出

2020-09-17 文章 Qishang
Flink 1.11.1
CDH 5.15.2
提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm
2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive
/tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar

flink-conf.yaml 重启策策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10 s

我在测试失败重启策略,发现任务失败之后会在重试次数之后,Task停止。Web UI 显示在Completed
Jobs里面,jm没有挂掉,看yarn上面任务在Runing状态。占用的资源是只有jm的资源了。

1. per-job 任务失败重试次数之后jm不会退出吗,还是我某些参数设置的不对?

是我在flapmap里面手动抛出的异常,报错:
2020-09-17 15:48:47
org.apache.flink.runtime.JobException: Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5,
backoffTimeMS=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.
ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler
.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler
.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase
.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster
.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107)
Caused by: java.lang.Exception: Test failed
at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap(
TidbBinlogSyncHive.java:231)
at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap(
TidbBinlogSyncHive.java:178)
at org.apache.flink.streaming.api.operators.StreamFlatMap
.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:161)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:178)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:558)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)


Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 文章 Qishang
Hi. 大罗
试一下这个方法 org.apache.flink.table.api.StatementSet#execute
ss.execute();

大罗  于2020年9月9日周三 下午3:13写道:

> Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:
>
> 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
> :2},
> {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
>
> 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1,
> "app_type" :2}
>
> 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink())
>
> 然后,再创建临时表,比如:
> tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
> $("pid"),  $("val"), $("app_type"), $("data_type"));
>
> 接着定义不同的sql,比如:
> String sql1 = "insert into ods_data_10 select pid, val where data_type = 1
> and app_type = 0"
> String sql2 = "insert into ods_data_11 select pid, val where data_type = 1
> and app_type = 1"
> String sql3 = "insert into ods_data_01 select pid, val where data_type = 0
> and app_type = 1"
> String sql4 = "insert into ods_data_00 select pid, val where data_type = 0
> and app_type = 0"
>
> 使用StatementSet运行它们:
> StatementSet ss = tableEnv.createStatementSet();
> ss.addInsertSql(sql1);
> ss.addInsertSql(sql2);
> ss.addInsertSql(sql3);
> ss.addInsertSql(sql4);
>
> 最后执行作业:
> env.execute(jobName);
>
> 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png>
>
>
> 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),
>
> 作业"insert-into_myhive.dw.ods_analog_sems
> ***"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png>
>
>
> 其中,顶端的operator的定义如下:
> Source: Custom Source -> Map -> Flat Map -> Filter ->
> SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> run_data_type]) ->
> (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter)
>
> 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m :8081 jobA"
> 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> savepoint."
> 相应的停止作业jobB的时候也会生成这个savepoint。
>
> 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-31 文章 Qishang
Hi.
我们也遇到一样的场景,现在您是否有一些具体实施和思考可以交流一下吗?

USERNAME  于2020年8月13日周四 下午3:27写道:

>
>
> 任务流程:
> OGG->KAFKA->FLINK->HIVE
>
>
> KAFKA数据样例:
> 其中会有多个
> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
> {
> "table": "SCOOT.TABLENAME",
> "op_type": "U",
> "op_ts": "2020-08-11 07:53:40.008001",
> "current_ts": "2020-08-11T15:56:41.233000",
> "pos": "980119769930",
> "before": {
> "C1": 4499000,
> "C2": null,
> "C3": null,
> "C4": null,
> "C5": null
> },
> "after": {
> "C1": 4499000,
> "C2": null,
> "C3": "",
> "C4": "",
> "C5": "通过"
> }
> }
> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>
>
> 例如 样例数据在hive中建表
> create table TABLENAME
> (
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "C1" STRING,
> "C2" STRING,
> "C3" STRING,
> "C4" STRING,
> "C5" STRING
> )
> 理解的难点,
> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
> 2.同一FLINK任务会有新增的表,需自动适配
> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>
>
> 或者只能采用通过表结构
> create table TABLENAME
> (
> table   STRING,
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "before"  STRING,
> "after" STRING
> )
> 然后剩下的在HIVE中解决。
>
>
> 或者有其他更好的方案?
>
>


Re: tidb Binlog 整库同步到 hive

2020-08-28 文章 Qishang
Hi Rui Li.

> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了
这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配?

Rui Li  于2020年8月28日周五 下午1:47写道:

> Hi,
>
> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive
> connector里现有的sink。
>
> On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu  wrote:
>
> > Hi
> >
> > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog
> > create
> > > table 是否可以在运行中来调用吗?
> > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。
> >
> > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream 还是
> > SQL 的拓扑。
> >
> > 祝好
> > Leonard
> >
> >
>
> --
> Best regards!
> Rui Li
>


Re: tidb Binlog 整库同步到 hive

2020-08-27 文章 Qishang
Hi Leonard.
> 除了多个sql作业的方式,如果需要在一个SQL作业中可以试试在一个作业里把所有表的binlog 格式统一用一个字段(如string)
接入,然后写针对每个表的schema写一个udtf解析对应的数据,最后多路输出到hive的不同表。
如果不限定SQL作业的话,用DataSteam API的话是不是可以实现这样的功能。

多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog create
table 是否可以在运行中来调用吗?

程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。


Leonard Xu  于2020年8月28日周五 上午9:30写道:

> Hi, qishang
>
> > 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗
> ?
> > 我看了一些邮件好像说不可以的,在问一下。
> 在一个SQL作业中是不行的,因为SQL是强依赖Schema的,schema需要事先声明。
>
> > 2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。
>
>  除了多个sql作业的方式,如果需要在一个SQL作业中可以试试在一个作业里把所有表的binlog 格式统一用一个字段(如string)
> 接入,然后写针对每个表的schema写一个udtf解析对应的数据,最后多路输出到hive的不同表。
>
> 祝好
> Leonard


tidb Binlog 整库同步到 hive

2020-08-27 文章 Qishang
大家好 .
我现在有一个场景需要调研。
背景:对整库 Tidb binlog 做实时落
Hive,好几个库的binlog发送到一个Topic或者几个Topic里面,一个Topic里面有复数个表的binlog。

1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 ?
我看了一些邮件好像说不可以的,在问一下。
2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。

感谢!


quickstart工程默认的log不打印到console

2020-08-27 文章 Qishang
我从 flink-quickstart-java 1.11.1 建的工程,IDEA console log 默认是没有打印的。
pom 里面的 jar 换成 1.10 的2个就可以打印了。
是有问题还是我的姿势不对。。

1.10的 jar :
 
org.slf4j
slf4j-log4j12
1.7.7
runtime


log4j
log4j
1.2.17
runtime



CEP匹配乱序数据的问题

2019-12-23 文章 qishang zhong
HI,大家好。

咨询一个问题,flink-training-exercises练习的工程里面
com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution

Pattern completedRides =
Pattern.begin("start")
.where(new SimpleCondition() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return ride.isStart;
}
})
.next("end")
.where(new SimpleCondition() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return !ride.isStart;
}
});

现在有一个类似的监控场景,也是需要超时后输出没有匹配到的数据,但是流的数据有可能产生乱序。
是不是就不能匹配例子中的Pattern?
如果我想乱序的数据也要匹配上,不作为超时输出有什么对应的解决方案吗?