Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助
Hi. FYI : https://issues.apache.org/jira/browse/FLINK-25435 Qishang 于2022年4月27日周三 17:28写道: > Hi . > 代码好像是没有设置 > 用这个手动设置一下 > set $internal.deployment.config-dir=/opt/flink-1.14.3/conf > > 调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory -> > YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor > > getExecutorFactory:58, DefaultExecutorServiceLoader > (org.apache.flink.core.execution) > executeAsync:2032, StreamExecutionEnvironment > (org.apache.flink.streaming.api.environment) > executeAsync:95, DefaultExecutor > (org.apache.flink.table.planner.delegation) > executeQueryOperation:811, TableEnvironmentImpl > (org.apache.flink.table.api.internal) > executeInternal:1274, TableEnvironmentImpl > (org.apache.flink.table.api.internal) > lambda$executeOperation$3:209, LocalExecutor > (org.apache.flink.table.client.gateway.local) > wrapClassLoader:88, ExecutionContext > (org.apache.flink.table.client.gateway.context) > executeOperation:209, LocalExecutor > (org.apache.flink.table.client.gateway.local) > executeQuery:231, LocalExecutor > (org.apache.flink.table.client.gateway.local) > callSelect:532, CliClient (org.apache.flink.table.client.cli) > callOperation:423, CliClient (org.apache.flink.table.client.cli) > lambda$executeStatement$1:332, CliClient > (org.apache.flink.table.client.cli) > executeStatement:325, CliClient (org.apache.flink.table.client.cli) > executeInteractive:297, CliClient (org.apache.flink.table.client.cli) > executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli) > openCli:151, SqlClient (org.apache.flink.table.client) > start:95, SqlClient (org.apache.flink.table.client) > startClient:187, SqlClient (org.apache.flink.table.client) > main:161, SqlClient (org.apache.flink.table.client) > > ruiyun wan 于2022年4月27日周三 14:51写道: > >> 这个必须有,因为用yarn-session.sh创建集群会有jobmanager.log。能够找到yarn-session.sh的启动类(org.apache.flink.yarn.cli.FlinkYarnSessionCli)到 >> YarnClusterDescriptor的调用路径。 >> [image: image.png] >> 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到 >> YarnClusterDescriptor的调用路径。这两者不在同一个包。 >> >> Qishang 于2022年4月27日周三 13:46写道: >> >>> Hi. >>> 确认下 conf 下,是否有 log4j.properties >>> >>> 应该是在这个地放生成的, >>> >>> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699 >>> >>> >>> ruiyun wan 于2022年4月26日周二 14:41写道: >>> >>> > Flink版本:1.13 >>> > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target = >>> > >>> > >>> yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。 >>> > >>> >>
Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助
Hi . 代码好像是没有设置 用这个手动设置一下 set $internal.deployment.config-dir=/opt/flink-1.14.3/conf 调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory -> YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor getExecutorFactory:58, DefaultExecutorServiceLoader (org.apache.flink.core.execution) executeAsync:2032, StreamExecutionEnvironment (org.apache.flink.streaming.api.environment) executeAsync:95, DefaultExecutor (org.apache.flink.table.planner.delegation) executeQueryOperation:811, TableEnvironmentImpl (org.apache.flink.table.api.internal) executeInternal:1274, TableEnvironmentImpl (org.apache.flink.table.api.internal) lambda$executeOperation$3:209, LocalExecutor (org.apache.flink.table.client.gateway.local) wrapClassLoader:88, ExecutionContext (org.apache.flink.table.client.gateway.context) executeOperation:209, LocalExecutor (org.apache.flink.table.client.gateway.local) executeQuery:231, LocalExecutor (org.apache.flink.table.client.gateway.local) callSelect:532, CliClient (org.apache.flink.table.client.cli) callOperation:423, CliClient (org.apache.flink.table.client.cli) lambda$executeStatement$1:332, CliClient (org.apache.flink.table.client.cli) executeStatement:325, CliClient (org.apache.flink.table.client.cli) executeInteractive:297, CliClient (org.apache.flink.table.client.cli) executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli) openCli:151, SqlClient (org.apache.flink.table.client) start:95, SqlClient (org.apache.flink.table.client) startClient:187, SqlClient (org.apache.flink.table.client) main:161, SqlClient (org.apache.flink.table.client) ruiyun wan 于2022年4月27日周三 14:51写道: > 这个必须有,因为用yarn-session.sh创建集群会有jobmanager.log。能够找到yarn-session.sh的启动类(org.apache.flink.yarn.cli.FlinkYarnSessionCli)到 > YarnClusterDescriptor的调用路径。 > [image: image.png] > 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到 > YarnClusterDescriptor的调用路径。这两者不在同一个包。 > > Qishang 于2022年4月27日周三 13:46写道: > >> Hi. >> 确认下 conf 下,是否有 log4j.properties >> >> 应该是在这个地放生成的, >> >> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699 >> >> >> ruiyun wan 于2022年4月26日周二 14:41写道: >> >> > Flink版本:1.13 >> > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target = >> > >> > >> yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。 >> > >> >
Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助
Hi. 确认下 conf 下,是否有 log4j.properties 应该是在这个地放生成的, https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699 ruiyun wan 于2022年4月26日周二 14:41写道: > Flink版本:1.13 > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target = > > yarn-per-job)时,在YARN集群侧生成的launch_container.sh中,启动org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的参数中无-Dlog.file和-Dlog4j.configuration属性参数,导致没有jobmanager.log日志文件,如何设置才能影响Yarn生成的launch_container.sh包含上述参数。 >
Re: Flink CDC 2.0 整库同步如何实现?
Hi 暂时还不支持,你看到的应该是未来规划的内容。 casel.chen 于2021年12月24日周五 20:50写道: > 看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec > 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。 > 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!
Re: Re: 关于flink sql自定义udf,eval方法被调用两次问题
Hi 之前社区发过一个 JD 的解决方案,可以参考下[1]。 [1]: https://mp.weixin.qq.com/s/YluIj3vmebFmZjRbymKBZw 黄志高 于2021年8月16日周一 上午11:04写道: > == Physical Execution Plan == > > Stage 1 : Data Source > > content : Source: TableSourceScan(table=[[default_catalog, > default_database, test_kafka]], fields=[tz]) > > > > > Stage 2 : Operator > > content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL]) > > ship_strategy : FORWARD > > > > 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避 > > > > > > > > > > > > > > > > > > 在 2021-08-16 09:47:28,"Qishang" 写道: > >Hi. > > > >应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573 > > > >打印一下执行计划和code gen > > > > > >黄志高 于2021年8月15日周日 下午10:06写道: > > > >> hi all, > >> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from > >> test_kafka) as t where tz is not null > >> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from > >> test_kafka) as t这个sql时,不进行where tz is not null > 操作,eval方法此时只会调用一次,如果将where > >> tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null > >> 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from > >> test_kafka这个阶段发生一次,第二次是在where tz is not > >> > null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12 > >> > >> > >> > >> > >> >
Re: 关于flink sql自定义udf,eval方法被调用两次问题
Hi. 应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573 打印一下执行计划和code gen 黄志高 于2021年8月15日周日 下午10:06写道: > hi all, > 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from > test_kafka) as t where tz is not null > 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select tt1(tz) from > test_kafka) as t这个sql时,不进行where tz is not null 操作,eval方法此时只会调用一次,如果将where > tz is not null 改成 where tz ='某一个具体值'此时eval方法也只会调用一次,一开始我以为是is not null > 问题,我进行重写is not null方法,发现eval方法还是调用两次,不过此时发现eval方法是在select tt1(tz) from > test_kafka这个阶段发生一次,第二次是在where tz is not > null执行后发生,虽然最后输出的结果不是双份,但是我认为eval方法执行两次,对效率有一定的影响,以下附件是我的代码截图,望各位大佬帮忙分析下,flink版本1.12 > > > > >
Re: createTemporaryView接口注册table时,fieldname支持中横线(-)
Hi Jun Zou. tEnv.createTemporaryView("`Word-Count`", input, $("word"), $("frequency")); 加上 ` 试一下。 Jun Zou 于2021年5月26日周三 下午4:33写道: > Hi,all: > 我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为: > > > tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray: > > _*) > > > 其中,tableEnv为 StreamTableEnvironment类型,source是 DataStream[Row] 类型,代表source > connector生成的算子,fields是 由处理过的source table的 filed name 转换成的 Expression,将filed > name转换成expression 使用 *ExpressionParser.parseExpression* 这个方法 > > 正常情况下,都能注册成功。 > 但是,当field name带中横线,如 source中一个字段名称为 > “X-Oem”时,经过 ExpressionParser.parseExpression 会被解析为 “minus(X, Model)” > 而非预期的“X-Oem”,导致注册成的表与DML语句中操作的字段名不一致报错。 > > 有什么方法能够处理这种情况么? >
Re: avro.ComplexPayloadAvro
Hi. 会生成 `${project.basedir}/target/generated-sources/` https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97 r pp 于2021年5月25日周二 上午9:58写道: > 各位好,请问下, > > https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java > > 在该类下的 > > > /flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java > 下面两个类,在代码哪里? > import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro; > import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro; > -- > Best, > pp >
Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?
Hi casel. flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。 https://github.com/ververica/flink-cdc-connectors/blob/master/README.md casel.chen 于2021年4月20日周二 下午6:18写道: > 目标是用flink作业实现类似canal server的功能 > > > CREATE TABLE `binlog_table` ( > > `id` INT, > > `name` STRING, > > `sys_id` STRING, > > `sequence` INT, > > `filter` STRING, > > `tag` STRING, > > `remark` STRING, > > `create_date` TIMESTAMP, > > `update_date` TIMESTAMP, > > `reserve` STRING, > > `sys_name` STRING, > > `metric_seq` INT, > > `advanced_function` STRING, > > `value_type` STRING, > > `value_field` STRING, > > `status` INT, > > `syn_date` TIMESTAMP, > > `confirmer` STRING, > > `confirm_time` TIMESTAMP, > > `index_explain` STRING, > > `field_name` STRING, > > `tag_values` STRING, > > PRIMARY KEY (`id`) NOT ENFORCED > > ) WITH ( > > 'connector' = 'mysql-cdc', > > 'hostname' = '${mysql.hostname}', > > 'port' = '3306', > > 'username' = '${mysql.username}', > > 'password' = '${mysql.password}', > > 'database-name' = '${mysql.database}', > > 'table-name' = '${mysql.table}' > > ); > > > > > CREATE TABLE `kafka_sink` ( > > `id` INT, > > `name` STRING, > > `sys_id` STRING, > > `sequence` INT, > > `filter` STRING, > > `tag` STRING, > > `remark` STRING, > > `create_date` TIMESTAMP, > > `update_date` TIMESTAMP, > > `reserve` STRING, > > `sys_name` STRING, > > `metric_seq` INT, > > `advanced_function` STRING, > > `value_type` STRING, > > `value_field` STRING, > > `status` INT, > > `syn_date` TIMESTAMP, > > `confirmer` STRING, > > `confirm_time` TIMESTAMP, > > `index_explain` STRING, > > `field_name` STRING, > > `tag_values` STRING, > > PRIMARY KEY (`id`) NOT ENFORCED > > ) WITH ( > > 'connector' = 'kafka', > > 'topic' = '${topic}', > > 'properties.bootstrap.servers' = '${bootstrap.servers}', > > 'format' = 'canal-json' > > ); > > > > > INSERT INTO `kafka_sink` > > (SELECT * > > FROM `binlog_table`); > > 出来的结果是这样: > > > { > "data": [ > { > "id": 3, > "name": "自动付款接口BuyETC金额", > "sys_id": "0184", > "sequence": 2, > "filter": "(a=1)", > "tag": "MerId(商户号)", > "remark": "O", > "create_date": "2020-11-02 15:01:31", > "update_date": "2021-04-07 09:23:59", > "reserve": "", > "sys_name": "NHL", > "metric_seq": 0, > "advanced_function": "", > "value_type": "sum", > "value_field": "value", > "status": 1, > "syn_date": "2021-01-28 19:31:36", > "confirmer": null, > "confirm_time": null, > "index_explain": "aa", > "field_name": null, > "tag_values": null > } > ], > "type": "INSERT" > } > 并不是标准的canal json格式。改用upsert-kafka connector试了也不行 > > > > CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING, > `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING, > `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING, > `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING, > `value_type` STRING, `value_field` STRING, `status` INT, `syn_date` > TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain` > STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT > ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}', > 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' = > 'json', > 'value.format' = 'json' ); > > > 出来的数据长这样 > > > > {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 > 12:27:30","update_date":"2021-04-06 >
Re: flink1.11版本 -C 指令并未上传udf jar包
Hi todd. -C 不会上传对应路径下的 jar,最终会被添加到集群的 classpath 中,需要运行的机器对应的路径下要有同样的Jar包才可以。 可以放在私服或者oss的服务,通过 http 的方式加载的 udf jar -C "http://host:port/xxx.jar; 希望能帮到你。 todd 于2021年4月19日周一 下午10:22写道: > 执行指令:flink run \ > -m yarn-cluster \ > -C file:////flink-demo-1.0.jar \ > x > > 在Client端能够构建成功jobgraph,但是在yarn上会报UDF类找不到。我看Classpath中并未上传该JAR包。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: SinkFunction与OutputFormat选择哪个?
Hi Luna Wong. RichOutputFormat 实现的最终是由 Flink 提供的 OutputFormatSinkFunction 再包装成 SinkFunction。 OutputFormatSinkFunction 很早就 Deprecated 了,没有实现 CheckpointedFunction 。 jdbc 的是实现了 RichOutputFormat ,但是最后用 GenericJdbcSinkFunction 包装了一次, GenericJdbcSinkFunction 实现了 CheckpointedFunction, 刚好最近遇到 https://issues.apache.org/jira/browse/FLINK-20552 ,可以看下这个 BUG 的修复。 1.12 之前 : org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala#L97-L101 最新 1.13 : org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L153-L163 Luna Wong 于2021年4月8日周四 下午5:42写道: > 自定义Connector时,RichSinkFunction和RichOutputFormat可以选择一个进行实现,那么作为开发者应该如何选择呢? >
Re: flink消费kafka数据open()方法初始换多次的问题
Hi guoxb. 没有全部代码,我猜你 addSink() 走了两次,调试看下。 guoxb__...@sina.com 于2021年4月9日周五 下午2:36写道: > hi: >情景: > 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承 > RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法 > 个人理解: > 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接 > 2. close()方法在程序结束的时候也是只走一次 > 3. invoke()方法在获取到每一条数据走一次这个方法 > 实际情况及问题(env.setParallelism(1)): > 1. open()在程序启动的时候运行了两次 > 2. invoke()方法在每条消息过来也会被处理两次 > > code: > reader端: > ```java > public class FlinkKafkaReader extends > DataKafkaConnect { > > @Override > protected DataStream reader(StreamExecutionEnvironment env, > KafkaConfig cfg) throws JobException { > > DataStream stream = null; > try { > Properties kafkaProps = new Properties(); > kafkaProps.setProperty("bootstrap.servers", > cfg.getBootstrapServers()); > kafkaProps.setProperty("group.id", cfg.getGroupId()); > kafkaProps.setProperty("auto.offset.reset", > cfg.getOffsetReset()); > kafkaProps.setProperty("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaProps.setProperty("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaProps.setProperty("enable.auto.commit", > cfg.getAutoCommit()); > kafkaProps.setProperty("max.poll.interval.ms", > cfg.getIntervalMs()); > > KafkaDeserializationSchema deserializationKdl = null; > // 根据不同的配置进行选择不同的消息解析器 > switch (cfg.getMessageType()) { > case "mysql": > deserializationKdl = new > KafkaDataDeserialiaztionBinlogSchemal(); > break; > case "mongodb": > deserializationKdl = new > KafkaDataDeserialiaztionOplogSchema(); > break; > } > FlinkKafkaConsumer flinkKafkaConsumer = new > FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")), > deserializationKdl, kafkaProps); > > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > // 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费 > String consumptionStartOffset = cfg.getConsumptionStartTime(); > if (StringUtils.isBlank(consumptionStartOffset)) { > flinkKafkaConsumer.setStartFromGroupOffsets(); > } else { > flinkKafkaConsumer.setStartFromTimestamp( > new SimpleDateFormat("-MM-dd HH:mm:ss") > .parse(cfg.getConsumptionStartTime()) > .getTime() > ); > } > // 设置并行度 > env.setParallelism(1); > //env.getCheckpointConfig().setCheckpointInterval(1000 * 30); > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > // 设置可容忍checkpoint失败的次数 > > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); > stream = env.addSource(flinkKafkaConsumer) > .filter(new FilterFunction() { > @Override > public boolean filter(Object value) throws > Exception { > return null != value; > } > }); > } catch (Exception e) { > throw new JobException(e.getMessage()); > } > return stream; > } > } > ``` > sink端: > ```java > public class MysqlSink extends RichSinkFunction { > @Override > public void open(Configuration config) throw Exception{ > ... > } > @Override > public void close(){ > ... > } > @Override > public void invoke(Object obj,Context context){ > //业务逻辑,这里的逻辑每一条数据过来会运行两次,这里也是我的问题 > ... > } > } > ``` > > 还请知悉原因的道友给点指引,万分感谢 > > > guoxb__...@sina.com >
Re: flink cdc读取Maxwell格式的binlog,如何获取表名等元信息
Hi chen310. Canal 和 debezium 已经实现,Maxwell 还没有完成,可以关注下 https://issues.apache.org/jira/browse/FLINK-20926 chen310 <1...@163.com> 于2021年4月8日周四 下午5:35写道: > 请教下,看了这篇文章https://developer.aliyun.com/article/771438,flink-cdc 读取mysql > Maxwell 格式binlog,怎么在flink 源表上获取mysql表名,通过这样的方式并没有生效 `origin_table` STRING > METADATA FROM 'value.source.table' VIRTUAL, > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html > > < > http://apache-flink.147419.n8.nabble.com/file/t572/lALPDg7mQSzBJifNAmzNBpY_1686_620.png> > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: 在FlinkKafkaProducer获取sink表的建表key
Hi Jimmy. FlinkKafkaProducer 里面是没有的,可以试着从 KafkaDynamicSink 里面传到 FlinkKafkaProducer 中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType 这个里面可以拿到 Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道: > Hi!大家好。 > 目前有一个需求,需要获取Kafka > sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢! > 例如:CREATE TABLE kafkaTable ( > > user_id BIGINT, > item_id BIGINT, > category_id BIGINT, > behavior STRING, > ts TIMESTAMP(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'testGroup', > 'format' = 'csv', > 'scan.startup.mode' = 'earliest-offset' > ) > 想获取到 user_id, item_id ,category_id ,behavior这四个字段。 > > > | | > Jimmy Zhang > | > | > 13669299...@163.com > | > 签名由网易邮箱大师定制
Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
Hi Jark. 对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 > 学到了,感谢。 > > Jark Wu 于2021年3月4日周四 下午11:11写道: > >> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize >> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json >> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true >> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 >> forward。 >> >> Best, >> Jark >> >> [1]: >> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate >> >> On Thu, 4 Mar 2021 at 15:30, Qishang wrote: >> >> > Hi 社区。 >> > Flink 1.12.1 >> > >> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition >> ,设置大的并发,对于只有 >> > forword 的ETL没有作用。 >> > >> > insert into table_a select id,udf(a),b,c from table_b; >> > >> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 >> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? >> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka >> 中生效吗?可以用在我上面说的场景上面吗? >> > >> > ``` >> > == Physical Execution Plan == >> > Stage 1 : Data Source >> > content : Source: TableSourceScan(table=[[default_catalog, >> > default_database, temp_table]], fields=[id...]) >> > >> > Stage 3 : Operator >> > content : ChangelogNormalize(key=[id]) >> > ship_strategy : HASH >> > >> > Stage 4 : Operator >> > content : Calc(select=[...]) >> > ship_strategy : FORWARD >> > >> > Stage 5 : Data Sink >> > content : Sink: Sink(table=[default_catalog.default_database.table_a], >> > fields=[id...]) >> > ship_strategy : FORWARD >> > ``` >> > >> >
Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 学到了,感谢。 Jark Wu 于2021年3月4日周四 下午11:11写道: > 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize > 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json > 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true > 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 > forward。 > > Best, > Jark > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate > > On Thu, 4 Mar 2021 at 15:30, Qishang wrote: > > > Hi 社区。 > > Flink 1.12.1 > > > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition > ,设置大的并发,对于只有 > > forword 的ETL没有作用。 > > > > insert into table_a select id,udf(a),b,c from table_b; > > > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > > > ``` > > == Physical Execution Plan == > > Stage 1 : Data Source > > content : Source: TableSourceScan(table=[[default_catalog, > > default_database, temp_table]], fields=[id...]) > > > > Stage 3 : Operator > > content : ChangelogNormalize(key=[id]) > > ship_strategy : HASH > > > > Stage 4 : Operator > > content : Calc(select=[...]) > > ship_strategy : FORWARD > > > > Stage 5 : Data Sink > > content : Sink: Sink(table=[default_catalog.default_database.table_a], > > fields=[id...]) > > ship_strategy : FORWARD > > ``` > > >
Re: Flink1.12 如何使用代码提交Batch的Sql
Hi shougou. 你要找的是不是这个[1] // **// BLINK BATCH QUERY// **import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment shougou <80562...@qq.com> 于2021年3月5日周五 上午9:59写道: > 我们知道如果在1.12里使用Table API来提交Batch的作业,比如: > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > > 但是,如果提交Sql作业的话: > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > Table result = tableEnv.sqlQuery(...); > 文档里也找不到如何使用StreamTableEnvironment 来跑Batch的SQL,又或者使用BatchTableEnvironment? > > 感谢各位提供思路! > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
Hi 社区。 Flink 1.12.1 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 forword 的ETL没有作用。 insert into table_a select id,udf(a),b,c from table_b; 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? ``` == Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, temp_table]], fields=[id...]) Stage 3 : Operator content : ChangelogNormalize(key=[id]) ship_strategy : HASH Stage 4 : Operator content : Calc(select=[...]) ship_strategy : FORWARD Stage 5 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[id...]) ship_strategy : FORWARD ```
Re: UDF 重复调用的问题、
Hi Benchao. 现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。 这个 feature 社区有规划了吗? Benchao Li 于2021年3月3日周三 上午10:23写道: > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。 > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。 > > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在 > plan的过程中会将表达式完全展开,比如下面的SQL: > ```SQL > SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as > key3 > FROM ( > SELECT dump_json_to_map(col1) as my_map > FROM T > ) > ``` > 这种写法也会将`dump_json_to_map`这个函数执行3次。 > > HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道: > > > 为什么4次是没问题的,感觉只执行一次才是最优的 > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > -- > > Best, > Benchao Li >
Re: UDF 重复调用的问题、
2. 是我搞错了,是四次,没问题 Qishang 于2021年3月2日周二 下午6:50写道: > Hi 社区。 > 版本 : Flink 1.12.1 > 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。 > e.g. > INSERT INTO table_a > SELECT > update_time, > MD5(p_key) AS id, > p_key > FROM > ( > SELECT > LOCALTIMESTAMP AS update_time , > findkeyudf(p_name) AS p_key > FROM table_b > ) T > WHERE COALESCE(p_key, '')<> '' > ; > > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, table_b]], fields=[p_name, xxx, ...]) > > Stage 2 : Operator > content : Calc(select=[CAST(()) AS update_date, > CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key], > where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <> > _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)]) > ship_strategy : FORWARD > > Stage 3 : Data Sink > content : Sink: Sink(table=[default_catalog.default_database.table_a], > fields=[update_date, comp_name, p_key]) > ship_strategy : FORWARD > > 查看 explain, udf 调用有四次,但是从日志发现同一个Key 执行了 8次。 > > 现在有2个问题: > 1. udf 调用不会被优化成一次,结果复用吗? > 2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> '' > )是执行了2次的。 > 3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗? > > >
UDF 重复调用的问题、
Hi 社区。 版本 : Flink 1.12.1 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。 e.g. INSERT INTO table_a SELECT update_time, MD5(p_key) AS id, p_key FROM ( SELECT LOCALTIMESTAMP AS update_time , findkeyudf(p_name) AS p_key FROM table_b ) T WHERE COALESCE(p_key, '')<> '' ; == Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, table_b]], fields=[p_name, xxx, ...]) Stage 2 : Operator content : Calc(select=[CAST(()) AS update_date, CAST(MD5(findkeyudf(p_name))) AS comp_name, findkeyudf(p_name) AS p_key], where=[(findkeyudf(p_name) IS NOT NULL CASE (CAST(findkeyudf(p_name)) <> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE false)]) ship_strategy : FORWARD Stage 3 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[update_date, comp_name, p_key]) ship_strategy : FORWARD 查看 explain, udf 调用有四次,但是从日志发现同一个Key 执行了 8次。 现在有2个问题: 1. udf 调用不会被优化成一次,结果复用吗? 2. 查看 explain,不应该是四次吗,执行了八次有点不理解,没有加 过滤条件( WHERE COALESCE(p_key, '')<> '' )是执行了2次的。 3. 顺便问下,JDBC 维表异步Join的 Feature 有对应的规划吗?
Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory
Hi . 我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 . flink2021 于2021年2月19日周五 下午12:39写道: > 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢? > JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC > -XX:MaxDirectMemorySize=8192m" > 其它也就是写常规的配置: > og.segment.bytes=1073741824 > log.retention.check.interval.ms=30 > #broker能接收消息的最大字节数 > message.max.bytes=2 > #broker可复制的消息的最大字节数 > replica.fetch.max.bytes=204857600 > #消费者端的可读取的最大消息 > fetch.message.max.bytes=204857600 > max.poll.records=500 > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Yarn资源本地化的地方报权限错误
HI. 环境 :2.6.0+cdh5.15.2+ islon FlinkX (基于Flink 1.8.1) 提交任务报错。这个问题卡了好长时间了。提交任务的地方Kerberos是正常通过的, Yarn资源本地化的地方报权限错误,很不理解,各位大佬能不能帮忙提供一点排除思路。 1. Flinkx的任务是正常提交的; 2. 还有一个测试环境也是CDH + Kerberos , Flinkx 提交也是正常的; 3. 升级到FlinkX 1.10.1 + Flink 1.10.1 也是同样的问题。 提交命令 : /opt/flinkx/bin/flinkx -mode yarnPer -job /tmp/flink_tmp_json/mysql2mysql.json -pluginRoot /opt/flinkx/plugins -flinkconf /opt/flink-1.8.1/conf -flinkLibJar /opt/flink-1.8.1/lib -yarnconf /etc/hadoop/conf -queue root.liu -jobid 10698 报错如下: 13:51:45.298 [main] INFO com.dtstack.flinkx.launcher.perjob.PerJobSubmitter - start to submit per-job task, LauncherOptions = com.dtstack.flinkx.options.Options@32eebfca log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 13:51:45.530 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6124 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8082 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.checkpoints.disable, true 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.archive.fs.dir, hdfs://xxx:8020/data/flink/flink-completed-jobs 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 2048 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.preallocate, false 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: io.tmp.dirs, /tmp/flink/taskmanager 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 2 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.per-job-cluster.include-user-jar, last 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: akka.lookup.timeout, 30 s 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: web.checkpoints.history, 30 13:51:45.531 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend, filesystem 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend.fs.checkpointdir, hdfs://xxx:8020/data/flink/flink-checkpoints 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.dir, hdfs://xxx:8020/data/flink/flink-checkpoints 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.num-retained, 5 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, hdfs://xxx:8020/data/flink/flink-savepoints 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.dir, hdfs://xxx:8020/data/flink/flink-completed-jobs 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 16899 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application-attempts, 10 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: restart-strategy.fixed-delay.attempts, 1 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: restart-strategy.fixed-delay.delay, 30s 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.kerberos.login.contexts, Client 13:51:45.532 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: security.kerberos.login.keytab, /root/keytab/hive.keytab 13:51:45.532 [main]
Job manager 异常信息
Flink 1.11.2 Per Job On Yarn Job manager 报异常 : 这个大概是什么问题。重启之后就没有了。 2020-10-21 16:34:39,657 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1603269279616 for job 3acdf1d32591165ddbf5ef277bfbe260. 2020-10-21 16:34:40,370 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator [] - Completed checkpoint 1 for job 3acdf1d32591165ddbf5ef277bfbe260 (53879 bytes in 687 ms). 2020-10-21 16:34:59,621 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator [] - Triggering checkpoint 2 (type=CHECKPOINT) @ 1603269299614 for job 3acdf1d32591165ddbf5ef277bfbe260. 2020-10-21 16:34:59,846 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator [] - Completed checkpoint 2 for job 3acdf1d32591165ddbf5ef277bfbe260 (53879 bytes in 76 ms). 2020-10-21 16:35:00,764 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - error while responding java.nio.file.FileAlreadyExistsException: /tmp/flink-web-492d85b8-f971-45ef -bad7-c00af9d50c35/flink-web-ui/index.html at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) ~[?:1.8.0_141] at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_141] at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_141] at sun.nio.fs.UnixFileSystemProvider.newByteChannel( UnixFileSystemProvider.java:214) ~[?:1.8.0_141] at java.nio.file.spi.FileSystemProvider.newOutputStream( FileSystemProvider.java:434) ~[?:1.8.0_141] at java.nio.file.Files.newOutputStream(Files.java:216) ~[?:1.8.0_141] at java.nio.file.Files.copy(Files.java:3016) ~[?:1.8.0_141] at org.apache.flink.runtime.rest.handler.legacy.files. StaticFileServerHandler.respondToRequest(StaticFileServerHandler.java:184) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.legacy.files. StaticFileServerHandler.respondAsLeader(StaticFileServerHandler.java:143) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler .lambda$channelRead0$0(LeaderRetrievalHandler.java:81) ~[flink-dist_2.11- 1.11.2.jar:1.11.2] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_141] at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer .java:46) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler .channelRead0(LeaderRetrievalHandler.java:78) [flink-dist_2.11-1.11.2.jar: 1.11.2] at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler .channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.11-1.11.2.jar: 1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:360) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext .java:352) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed( RouterHandler.java:110) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.router.RouterHandler .channelRead0(RouterHandler.java:89) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.runtime.rest.handler.router.RouterHandler .channelRead0(RouterHandler.java:54) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:360) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext .java:352) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.handler.codec. MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:374) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.shaded.netty4.io.netty.channel. AbstractChannelHandlerContext.invokeChannelRead(
Per-job mode 任务失败 jm没有退出
Flink 1.11.1 CDH 5.15.2 提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive /tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar flink-conf.yaml 重启策策略 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 5 restart-strategy.fixed-delay.delay: 10 s 我在测试失败重启策略,发现任务失败之后会在重试次数之后,Task停止。Web UI 显示在Completed Jobs里面,jm没有挂掉,看yarn上面任务在Runing状态。占用的资源是只有jm的资源了。 1. per-job 任务失败重试次数之后jm不会退出吗,还是我某些参数设置的不对? 是我在flapmap里面手动抛出的异常,报错: 2020-09-17 15:48:47 org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=1) at org.apache.flink.runtime.executiongraph.failover.flip1. ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1. ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler .java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler .handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler .maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler .updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase .updateTaskExecutionState(SchedulerBase.java:503) at org.apache.flink.runtime.jobmaster.JobMaster .updateTaskExecutionState(JobMaster.java:386) at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage( AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor .handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage( AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool .java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread .java:107) Caused by: java.lang.Exception: Test failed at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap( TidbBinlogSyncHive.java:231) at com.qcc.hive.TidbBinlogSyncHive$BinlogFlatMapFunction.flatMap( TidbBinlogSyncHive.java:178) at org.apache.flink.streaming.api.operators.StreamFlatMap .processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.tasks. OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask .java:161) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput .emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor .processInput(StreamOneInputProcessor.java:67) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( StreamTask.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxStep(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor .runMailboxLoop(MailboxProcessor.java:181) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( StreamTask.java:558) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:530) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)
Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs
Hi. 大罗 试一下这个方法 org.apache.flink.table.api.StatementSet#execute ss.execute(); 大罗 于2020年9月9日周三 下午3:13写道: > Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下: > > 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type" > :2}, > {"pid":"a", "val":1, "data_type": 1, "app_type" :2}] > > 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1, > "app_type" :2} > > 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink()) > > 然后,再创建临时表,比如: > tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator, > $("pid"), $("val"), $("app_type"), $("data_type")); > > 接着定义不同的sql,比如: > String sql1 = "insert into ods_data_10 select pid, val where data_type = 1 > and app_type = 0" > String sql2 = "insert into ods_data_11 select pid, val where data_type = 1 > and app_type = 1" > String sql3 = "insert into ods_data_01 select pid, val where data_type = 0 > and app_type = 1" > String sql4 = "insert into ods_data_00 select pid, val where data_type = 0 > and app_type = 0" > > 使用StatementSet运行它们: > StatementSet ss = tableEnv.createStatementSet(); > ss.addInsertSql(sql1); > ss.addInsertSql(sql2); > ss.addInsertSql(sql3); > ss.addInsertSql(sql4); > > 最后执行作业: > env.execute(jobName); > > 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图: > > < > http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png> > > > 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA), > > 作业"insert-into_myhive.dw.ods_analog_sems > ***"对应的应该是写入4个表的操作(假设作业ID为jobB),如图: > > < > http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png> > > > 其中,顶端的operator的定义如下: > Source: Custom Source -> Map -> Flat Map -> Filter -> > SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et, > run_data_type]) -> > (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE > _UTF-16LE'BP.%')))]) -> StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE > _UTF-16LE'BP.%')))]) -> StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) > -> > StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) > -> > StreamingFileWriter) > > 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m :8081 jobA" > 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a > savepoint." > 相应的停止作业jobB的时候也会生成这个savepoint。 > > 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢? > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询
Hi. 我们也遇到一样的场景,现在您是否有一些具体实施和思考可以交流一下吗? USERNAME 于2020年8月13日周四 下午3:27写道: > > > 任务流程: > OGG->KAFKA->FLINK->HIVE > > > KAFKA数据样例: > 其中会有多个 > "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。 > { > "table": "SCOOT.TABLENAME", > "op_type": "U", > "op_ts": "2020-08-11 07:53:40.008001", > "current_ts": "2020-08-11T15:56:41.233000", > "pos": "980119769930", > "before": { > "C1": 4499000, > "C2": null, > "C3": null, > "C4": null, > "C5": null > }, > "after": { > "C1": 4499000, > "C2": null, > "C3": "", > "C4": "", > "C5": "通过" > } > } > 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致? > 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的? > > > 例如 样例数据在hive中建表 > create table TABLENAME > ( > op_type STRING, > op_ts STRING, > current_ts STRING, > pos STRING, > "C1" STRING, > "C2" STRING, > "C3" STRING, > "C4" STRING, > "C5" STRING > ) > 理解的难点, > 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的 > 2.同一FLINK任务会有新增的表,需自动适配 > 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等 > > > 或者只能采用通过表结构 > create table TABLENAME > ( > table STRING, > op_type STRING, > op_ts STRING, > current_ts STRING, > pos STRING, > "before" STRING, > "after" STRING > ) > 然后剩下的在HIVE中解决。 > > > 或者有其他更好的方案? > >
Re: tidb Binlog 整库同步到 hive
Hi Rui Li. > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了 这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配? Rui Li 于2020年8月28日周五 下午1:47写道: > Hi, > > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive > connector里现有的sink。 > > On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu wrote: > > > Hi > > > > > 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog > > create > > > table 是否可以在运行中来调用吗? > > > 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。 > > > > 用dataStream是会更灵活些,思路也差不多,在运行中可以调用的建表动作的,但是运行的拓扑是不可以动态调整的,不管DataStream 还是 > > SQL 的拓扑。 > > > > 祝好 > > Leonard > > > > > > -- > Best regards! > Rui Li >
Re: tidb Binlog 整库同步到 hive
Hi Leonard. > 除了多个sql作业的方式,如果需要在一个SQL作业中可以试试在一个作业里把所有表的binlog 格式统一用一个字段(如string) 接入,然后写针对每个表的schema写一个udtf解析对应的数据,最后多路输出到hive的不同表。 如果不限定SQL作业的话,用DataSteam API的话是不是可以实现这样的功能。 多路输出是个动态的,因为是整库的binlog ,不确定库中是否有新增表,作业要触发hive里面去创建表的动作。hive catalog create table 是否可以在运行中来调用吗? 程序启动之后生成的jobgraph运行中是不是不可以调整的?如果可以的话,是否有类似的案例可以参考。 Leonard Xu 于2020年8月28日周五 上午9:30写道: > Hi, qishang > > > 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 > ? > > 我看了一些邮件好像说不可以的,在问一下。 > 在一个SQL作业中是不行的,因为SQL是强依赖Schema的,schema需要事先声明。 > > > 2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。 > > 除了多个sql作业的方式,如果需要在一个SQL作业中可以试试在一个作业里把所有表的binlog 格式统一用一个字段(如string) > 接入,然后写针对每个表的schema写一个udtf解析对应的数据,最后多路输出到hive的不同表。 > > 祝好 > Leonard
tidb Binlog 整库同步到 hive
大家好 . 我现在有一个场景需要调研。 背景:对整库 Tidb binlog 做实时落 Hive,好几个库的binlog发送到一个Topic或者几个Topic里面,一个Topic里面有复数个表的binlog。 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 ? 我看了一些邮件好像说不可以的,在问一下。 2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。 感谢!
quickstart工程默认的log不打印到console
我从 flink-quickstart-java 1.11.1 建的工程,IDEA console log 默认是没有打印的。 pom 里面的 jar 换成 1.10 的2个就可以打印了。 是有问题还是我的姿势不对。。 1.10的 jar : org.slf4j slf4j-log4j12 1.7.7 runtime log4j log4j 1.2.17 runtime
CEP匹配乱序数据的问题
HI,大家好。 咨询一个问题,flink-training-exercises练习的工程里面 com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution Pattern completedRides = Pattern.begin("start") .where(new SimpleCondition() { @Override public boolean filter(TaxiRide ride) throws Exception { return ride.isStart; } }) .next("end") .where(new SimpleCondition() { @Override public boolean filter(TaxiRide ride) throws Exception { return !ride.isStart; } }); 现在有一个类似的监控场景,也是需要超时后输出没有匹配到的数据,但是流的数据有可能产生乱序。 是不是就不能匹配例子中的Pattern? 如果我想乱序的数据也要匹配上,不作为超时输出有什么对应的解决方案吗?