Hi.
FYI : https://issues.apache.org/jira/browse/FLINK-25435
Qishang 于2022年4月27日周三 17:28写道:
> Hi .
> 代码好像是没有设置
> 用这个手动设置一下
> set $internal.deployment.config-dir=/opt/flink-1.14.3/conf
>
> 调用链路的话,凑合看一下吧。下面就是获取到 YarnJobClusterExecutorFactory ->
> YarnJobClusterExecutor -&g
Descriptor的调用路径。
> [image: image.png]
> 但是我没有找到从sql-client.sh的启动类(org.apache.flink.table.client.SqlClient)到
> YarnClusterDescriptor的调用路径。这两者不在同一个包。
>
> Qishang 于2022年4月27日周三 13:46写道:
>
>> Hi.
>> 确认下 conf 下,是否有 log4j.properties
>>
>> 应该是在这个地放生成的,
>>
>
Hi.
确认下 conf 下,是否有 log4j.properties
应该是在这个地放生成的,
https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
ruiyun wan 于2022年4月26日周二 14:41写道:
> Flink版本:1.13
> 问题描述:使用sql-client.sh启动yarn-per-job(execution.target =
>
>
Hi
暂时还不支持,你看到的应该是未来规划的内容。
casel.chen 于2021年12月24日周五 20:50写道:
> 看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec
> 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。
> 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!
afka]], fields=[tz])
>
>
>
>
> Stage 2 : Operator
>
> content : Calc(select=[tt1(tz) AS tz], where=[tt1(tz) IS NOT NULL])
>
> ship_strategy : FORWARD
>
>
>
> 从执行计划中看出,在select与where中这个tt1(tz)的udf确实调用了两次,看issuse,目前还没有被分配,是否有什么办法可以规避
>
>
>
>
>
>
>
&g
Hi.
应该是这个问题,https://issues.apache.org/jira/browse/FLINK-21573
打印一下执行计划和code gen
黄志高 于2021年8月15日周日 下午10:06写道:
> hi all,
> 各位大佬,有没有遇到过eval方法被调用两次问题,在我使用select * from (select tt1(tz) from
> test_kafka) as t where tz is not null
> 时,在eval方法进行入参打印时,发现eval方法被调用了两次,当我使用select * from (select
Hi Jun Zou.
tEnv.createTemporaryView("`Word-Count`", input, $("word"), $("frequency"));
加上 ` 试一下。
Jun Zou 于2021年5月26日周三 下午4:33写道:
> Hi,all:
> 我使用flink1.11.2进行作业开发,由于要兼容内部历史代码,需要把source手动注册成一张表,调用为:
>
> > tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray:
> > _*)
> >
>
Hi.
会生成 `${project.basedir}/target/generated-sources/`
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97
r pp 于2021年5月25日周二 上午9:58写道:
> 各位好,请问下,
>
>
Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。
https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
casel.chen 于2021年4月20日周二 下午6:18写道:
> 目标是用flink作业实现类似canal server的功能
>
>
> CREATE TABLE `binlog_table` (
>
> `id` INT,
>
>
Hi todd.
-C 不会上传对应路径下的 jar,最终会被添加到集群的 classpath 中,需要运行的机器对应的路径下要有同样的Jar包才可以。
可以放在私服或者oss的服务,通过 http 的方式加载的 udf jar
-C "http://host:port/xxx.jar;
希望能帮到你。
todd 于2021年4月19日周一 下午10:22写道:
> 执行指令:flink run \
> -m yarn-cluster \
> -C file:////flink-demo-1.0.jar \
> x
>
>
Hi Luna Wong.
RichOutputFormat 实现的最终是由 Flink 提供的 OutputFormatSinkFunction 再包装成
SinkFunction。 OutputFormatSinkFunction 很早就 Deprecated 了,没有实现
CheckpointedFunction 。
jdbc 的是实现了 RichOutputFormat ,但是最后用 GenericJdbcSinkFunction 包装了一次,
GenericJdbcSinkFunction 实现了 CheckpointedFunction,
刚好最近遇到
Hi guoxb.
没有全部代码,我猜你 addSink() 走了两次,调试看下。
guoxb__...@sina.com 于2021年4月9日周五 下午2:36写道:
> hi:
>情景:
> 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承
> RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法
> 个人理解:
> 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接
>
Hi chen310.
Canal 和 debezium 已经实现,Maxwell 还没有完成,可以关注下
https://issues.apache.org/jira/browse/FLINK-20926
chen310 <1...@163.com> 于2021年4月8日周四 下午5:35写道:
> 请教下,看了这篇文章https://developer.aliyun.com/article/771438,flink-cdc 读取mysql
> Maxwell 格式binlog,怎么在flink 源表上获取mysql表名,通过这样的方式并没有生效
Hi Jimmy.
FlinkKafkaProducer 里面是没有的,可以试着从 KafkaDynamicSink 里面传到 FlinkKafkaProducer
中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType
这个里面可以拿到
Jimmy Zhang <13669299...@163.com> 于2021年3月18日周四 上午10:40写道:
> Hi!大家好。
> 目前有一个需求,需要获取Kafka
>
Hi Jark.
对于 upsert-kafka connector 有两个疑问:
1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?
Qishang 于2021年3月5日周五 上午11:14写道:
>
> 某些原因导致上游 kafka partition
数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
> forward。
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
>
> On Thu, 4 Mar 2021 at 15:30, Qis
Hi shougou.
你要找的是不是这个[1]
// **// BLINK BATCH QUERY// **import
org.apache.flink.table.api.EnvironmentSettings;import
org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings =
Hi 社区。
Flink 1.12.1
现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。
insert into table_a select id,udf(a),b,c from table_b;
发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
2. 这个可以改变默认
Hi Benchao.
现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。
这个 feature 社区有规划了吗?
Benchao Li 于2021年3月3日周三 上午10:23写道:
> 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
>
> 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> plan的过程中会将表达式完全展开,比如下面的SQL:
> ```SQL
>
2. 是我搞错了,是四次,没问题
Qishang 于2021年3月2日周二 下午6:50写道:
> Hi 社区。
> 版本 : Flink 1.12.1
> 在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
> e.g.
> INSERT INTO table_a
> SELECT
> update_time,
> MD5(p_key) AS id,
> p_key
> FROM
> (
> SELECT
>
Hi 社区。
版本 : Flink 1.12.1
在Flink SQL 中调用自定义 UDF,UDF中调用了SDK的方法,复用的字段,发现SDK被重复调用了。
e.g.
INSERT INTO table_a
SELECT
update_time,
MD5(p_key) AS id,
p_key
FROM
(
SELECT
LOCALTIMESTAMP AS update_time ,
findkeyudf(p_name) AS p_key
FROM table_b
) T
WHERE COALESCE(p_key,
Hi .
我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 .
flink2021 于2021年2月19日周五 下午12:39写道:
> 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢?
> JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
> -XX:MaxDirectMemorySize=8192m"
> 其它也就是写常规的配置:
> og.segment.bytes=1073741824
>
HI.
环境 :2.6.0+cdh5.15.2+ islon
FlinkX (基于Flink 1.8.1) 提交任务报错。这个问题卡了好长时间了。提交任务的地方Kerberos是正常通过的,
Yarn资源本地化的地方报权限错误,很不理解,各位大佬能不能帮忙提供一点排除思路。
1. Flinkx的任务是正常提交的;
2. 还有一个测试环境也是CDH + Kerberos , Flinkx 提交也是正常的;
3. 升级到FlinkX 1.10.1 + Flink 1.10.1 也是同样的问题。
提交命令 :
/opt/flinkx/bin/flinkx -mode yarnPer -job
Flink 1.11.2
Per Job On Yarn
Job manager 报异常 :
这个大概是什么问题。重启之后就没有了。
2020-10-21 16:34:39,657 INFO org.apache.flink.runtime.checkpoint.
CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @
1603269279616 for job 3acdf1d32591165ddbf5ef277bfbe260.
2020-10-21 16:34:40,370 INFO
Flink 1.11.1
CDH 5.15.2
提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm
2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive
/tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar
flink-conf.yaml 重启策策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
Hi. 大罗
试一下这个方法 org.apache.flink.table.api.StatementSet#execute
ss.execute();
大罗 于2020年9月9日周三 下午3:13写道:
> Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:
>
> 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
> :2},
> {"pid":"a", "val":1, "data_type": 1, "app_type"
Hi.
我们也遇到一样的场景,现在您是否有一些具体实施和思考可以交流一下吗?
USERNAME 于2020年8月13日周四 下午3:27写道:
>
>
> 任务流程:
> OGG->KAFKA->FLINK->HIVE
>
>
> KAFKA数据样例:
> 其中会有多个
> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
> {
> "table": "SCOOT.TABLENAME",
> "op_type": "U",
> "op_ts":
Hi Rui Li.
> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了
这个实现有啥思路,能稍微详细说一下嘛? 是不是需要自己开发一个 Sink 来适配?
Rui Li 于2020年8月28日周五 下午1:47写道:
> Hi,
>
> 我理解也是如果多路输出需要能动态适配新表的话只能用DataStream自己实现了,具体写表的时候可以试试看能不能复用hive
> connector里现有的sink。
>
> On Fri, Aug 28, 2020 at 12:15 PM Leonard Xu wrote:
>
> > Hi
> >
> >
可以调整的?如果可以的话,是否有类似的案例可以参考。
Leonard Xu 于2020年8月28日周五 上午9:30写道:
> Hi, qishang
>
> > 1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗
> ?
> > 我看了一些邮件好像说不可以的,在问一下。
> 在一个SQL作业中是不行的,因为SQL是强依赖Schema的,schema需要事先声明。
>
> > 2. 或者有什么好的解决方式吗?因为数据量都不是很大
大家好 .
我现在有一个场景需要调研。
背景:对整库 Tidb binlog 做实时落
Hive,好几个库的binlog发送到一个Topic或者几个Topic里面,一个Topic里面有复数个表的binlog。
1. 好像 Flink SQL 里面只能一个数据源 kafka 到一张Hive表,一个Topic是否可以根据解析的表不同插入到不同的Hive表吗 ?
我看了一些邮件好像说不可以的,在问一下。
2. 或者有什么好的解决方式吗?因为数据量都不是很大,表比较多,每个表都要维护一个任务的话,代价比较大。
感谢!
我从 flink-quickstart-java 1.11.1 建的工程,IDEA console log 默认是没有打印的。
pom 里面的 jar 换成 1.10 的2个就可以打印了。
是有问题还是我的姿势不对。。
1.10的 jar :
org.slf4j
slf4j-log4j12
1.7.7
runtime
log4j
log4j
1.2.17
HI,大家好。
咨询一个问题,flink-training-exercises练习的工程里面
com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution
Pattern completedRides =
Pattern.begin("start")
.where(new SimpleCondition() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return ride.isStart;
}
})
32 matches
Mail list logo