Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-27 文章 Qishang
Hi. FYI : https://issues.apache.org/jira/browse/FLINK-25435 Qishang 于2022年4月27日周三 17:28写道: > Hi . > 代码好像是没有设置 > 用这个手动设置一下 > set $internal.deployment.config-dir=/opt/flink-1.14.3/conf > > 调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory -> > YarnJobClusterExecutor -&g

Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-27 文章 Qishang
Descriptor的调用路径。 > [image: image.png] > 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到 > YarnClusterDescriptor的调用路径。这两者不在同一个包。 > > Qishang 于2022年4月27日周三 13:46写道: > >> Hi. >> 确认下 conf 下,是否有 log4j.properties >> >> 应该是在这个地放生成的, >> >

Re: 关于使用SQL Client提交执行目标为yarn-per-job的作业时无日志文件(jobmanager.log)的求助

2022-04-26 文章 Qishang
Hi. 确认下 conf 下,是否有 log4j.properties 应该是在这个地放生成的, https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699 ruiyun wan 于2022年4月26日周二 14:41写道: > Flink版本:1.13 > 问题描述:使用sql-client.sh启动yarn-per-job(execution.target = > >

Re: Flink CDC 2.0 整库同步如何实现?

2021-12-26 文章 Qishang
Hi 暂时还不支持,你看到的应该是未来规划的内容。 casel.chen 于2021年12月24日周五 20:50写道: > 看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec > 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。 > 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!

Re: Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章 Qishang
afka]], fields=[tz]) > > > > > Stage 2 : Operator > > content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL]) > > ship_strategy : FORWARD > > > > 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避 > > > > > > > &g

Re: 关于flink sql自定义udf,eval方法被调用两次问题

2021-08-15 文章 Qishang
Hi. 应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573 打印一下执行计划和code gen 黄志高 于2021年8月15日周日 下午10:06写道: > hi all, > 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from > test_kafka) as t where tz is not null > 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select

Re: createTemporaryView接口注册table时,fieldname支持中横线(-)

2021-05-27 文章 Qishang
Hi Jun Zou. tEnv.createTemporaryView("`Word-Count`", input, $("word"), $("frequency")); 加上 ` 试一下。 Jun Zou 于2021年5月26日周三 下午4:33写道: > Hi,all: > 我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为: > > > tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray: > > _*) > > >

Re: avro.ComplexPayloadAvro

2021-05-25 文章 Qishang
Hi. 会生成 `${project.basedir}/target/generated-sources/` https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97 r pp 于2021年5月25日周二 上午9:58写道: > 各位好,请问下, > >

Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 Qishang
Hi casel. flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。 https://github.com/ververica/flink-cdc-connectors/blob/master/README.md casel.chen 于2021年4月20日周二 下午6:18写道: > 目标是用flink作业实现类似canal server的功能 > > > CREATE TABLE `binlog_table` ( > > `id` INT, > >

Re: flink1.11版本 -C 指令并未上传udf jar包

2021-04-20 文章 Qishang
Hi todd. -C 不会上传对应路径下的 jar,最终会被添加到集群的 classpath 中,需要运行的机器对应的路径下要有同样的Jar包才可以。 可以放在私服或者oss的服务,通过 http 的方式加载的 udf jar -C "http://host:port/xxx.jar; 希望能帮到你。 todd 于2021年4月19日周一 下午10:22写道: > 执行指令:flink run \ > -m yarn-cluster \ > -C file:////flink-demo-1.0.jar \ > x > >

Re: SinkFunction与OutputFormat选择哪个?

2021-04-09 文章 Qishang
Hi Luna Wong. RichOutputFormat 实现的最终是由 Flink 提供的 OutputFormatSinkFunction 再包装成 SinkFunction。 OutputFormatSinkFunction 很早就 Deprecated 了,没有实现 CheckpointedFunction 。 jdbc 的是实现了 RichOutputFormat ,但是最后用 GenericJdbcSinkFunction 包装了一次, GenericJdbcSinkFunction 实现了 CheckpointedFunction, 刚好最近遇到

Re: flink消费kafka数据open()方法初始换多次的问题

2021-04-09 文章 Qishang
Hi guoxb. 没有全部代码,我猜你 addSink() 走了两次,调试看下。 guoxb__...@sina.com 于2021年4月9日周五 下午2:36写道: > hi: >情景: > 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承 > RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法 > 个人理解: > 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接 >

Re: flink cdc读取Maxwell格式的binlog,如何获取表名等元信息

2021-04-09 文章 Qishang
Hi chen310. Canal 和 debezium 已经实现,Maxwell 还没有完成,可以关注下 https://issues.apache.org/jira/browse/FLINK-20926 chen310 <1...@163.com> 于2021年4月8日周四 下午5:35写道: > 请教下,看了这篇文章https://developer.aliyun.com/article/771438,flink-cdc 读取mysql > Maxwell 格式binlog,怎么在flink 源表上获取mysql表名,通过这样的方式并没有生效

Re: 在FlinkKafkaProducer获取sink表的建表key

2021-03-28 文章 Qishang
Hi Jimmy. FlinkKafkaProducer 里面是没有的,可以试着从 KafkaDynamicSink 里面传到 FlinkKafkaProducer 中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType 这个里面可以拿到 Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道: > Hi!大家好。 > 目前有一个需求,需要获取Kafka >

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
Hi Jark. 对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 > forward。 > > Best, > Jark > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate > > On Thu, 4 Mar 2021 at 15:30, Qis

Re: Flink1.12 如何使用代码提交Batch的Sql

2021-03-04 文章 Qishang
Hi shougou. 你要找的是不是这个[1] // **// BLINK BATCH QUERY// **import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings =

Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-03 文章 Qishang
Hi 社区。 Flink 1.12.1 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 forword 的ETL没有作用。 insert into table_a select id,udf(a),b,c from table_b; 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? 2. 这个可以改变默认

Re: UDF 重复调用的问题、

2021-03-02 文章 Qishang
Hi Benchao. 现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。 这个 feature 社区有规划了吗? Benchao Li 于2021年3月3日周三 上午10:23写道: > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。 > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。 > > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在 > plan的过程中会将表达式完全展开,比如下面的SQL: > ```SQL >

Re: UDF 重复调用的问题、

2021-03-02 文章 Qishang
2. 是我搞错了,是四次,没问题 Qishang 于2021年3月2日周二 下午6:50写道: > Hi 社区。 > 版本 : Flink 1.12.1 > 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。 > e.g. > INSERT INTO table_a > SELECT > update_time, > MD5(p_key) AS id, > p_key > FROM > ( > SELECT >

UDF 重复调用的问题、

2021-03-02 文章 Qishang
Hi 社区。 版本 : Flink 1.12.1 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。 e.g. INSERT INTO table_a SELECT update_time, MD5(p_key) AS id, p_key FROM ( SELECT LOCALTIMESTAMP AS update_time , findkeyudf(p_name) AS p_key FROM table_b ) T WHERE COALESCE(p_key,

Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory

2021-02-24 文章 Qishang
Hi . 我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 . flink2021 于2021年2月19日周五 下午12:39写道: > 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢? > JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC > -XX:MaxDirectMemorySize=8192m" > 其它也就是写常规的配置: > og.segment.bytes=1073741824 >

Yarn资源本地化的地方报权限错误

2020-12-29 文章 Qishang
HI. 环境 :2.6.0+cdh5.15.2+ islon FlinkX (基于Flink 1.8.1) 提交任务报错。这个问题卡了好长时间了。提交任务的地方Kerberos是正常通过的, Yarn资源本地化的地方报权限错误,很不理解,各位大佬能不能帮忙提供一点排除思路。 1. Flinkx的任务是正常提交的; 2. 还有一个测试环境也是CDH + Kerberos , Flinkx 提交也是正常的; 3. 升级到FlinkX 1.10.1 + Flink 1.10.1 也是同样的问题。 提交命令 : /opt/flinkx/bin/flinkx -mode yarnPer -job

Job manager 异常信息

2020-10-21 文章 Qishang
Flink 1.11.2 Per Job On Yarn Job manager 报异常 : 这个大概是什么问题。重启之后就没有了。 2020-10-21 16:34:39,657 INFO org.apache.flink.runtime.checkpoint. CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1603269279616 for job 3acdf1d32591165ddbf5ef277bfbe260. 2020-10-21 16:34:40,370 INFO

Per-job mode 任务失败 jm没有退出

2020-09-17 文章 Qishang
Flink 1.11.1 CDH 5.15.2 提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive /tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar flink-conf.yaml 重启策策略 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 5

Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 文章 Qishang
Hi. 大罗 试一下这个方法 org.apache.flink.table.api.StatementSet#execute ss.execute(); 大罗 于2020年9月9日周三 下午3:13写道: > Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下: > > 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type" > :2}, > {"pid":"a", "val":1, "data_type": 1, "app_type"

Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-31 文章 Qishang
Hi. 我们也遇到一样的场景,现在您是否有一些具体实施和思考可以交流一下吗? USERNAME 于2020年8月13日周四 下午3:27写道: > > > 任务流程: > OGG->KAFKA->FLINK->HIVE > > > KAFKA数据样例: > 其中会有多个 > "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。 > { > "table": "SCOOT.TABLENAME", > "op_type": "U", > "op_ts":

Re: tidb Binlog 整库同步到 hive

2020-08-28 文章 Qishang
Hi Rui Li. > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了 这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配? Rui Li 于2020年8月28日周五 下午1:47写道: > Hi, > > 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive > connector里现有的sink。 > > On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu wrote: > > > Hi > > > >

Re: tidb Binlog 整库同步到 hive

2020-08-27 文章 Qishang
可以调整的?如果可以的话,是否有类似的案例可以参考。 Leonard Xu 于2020年8月28日周五 上午9:30写道: > Hi, qishang > > > 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 > ? > > 我看了一些邮件好像说不可以的,在问一下。 > 在一个SQL作业中是不行的,因为SQL是强依赖Schema的,schema需要事先声明。 > > > 2. 或者有什么好的解决方式吗?因为数据量都不是很大

tidb Binlog 整库同步到 hive

2020-08-27 文章 Qishang
大家好 . 我现在有一个场景需要调研。 背景:对整库 Tidb binlog 做实时落 Hive,好几个库的binlog发送到一个Topic或者几个Topic里面,一个Topic里面有复数个表的binlog。 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 ? 我看了一些邮件好像说不可以的,在问一下。 2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。 感谢!

quickstart工程默认的log不打印到console

2020-08-27 文章 Qishang
我从 flink-quickstart-java 1.11.1 建的工程,IDEA console log 默认是没有打印的。 pom 里面的 jar 换成 1.10 的2个就可以打印了。 是有问题还是我的姿势不对。。 1.10的 jar : org.slf4j slf4j-log4j12 1.7.7 runtime log4j log4j 1.2.17

CEP匹配乱序数据的问题

2019-12-23 文章 qishang zhong
HI,大家好。 咨询一个问题,flink-training-exercises练习的工程里面 com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution Pattern completedRides = Pattern.begin("start") .where(new SimpleCondition() { @Override public boolean filter(TaxiRide ride) throws Exception { return ride.isStart; } })