这个方法就可以打印在你本地的idea控制台里面,你试一下
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | 小昌同学 |
| 发送日期 | 2023年4月19日 16:01 |
| 收件人 | user-zh |
| 抄送人 | user-zh |
| 主题 | 回复:控制台打印出流式数据 |
这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果
| |
小昌同学
|
|
ccc0606fight...@163.com
|
回复的原邮件
hi,你好
你应该使用 stream.print() 来打印流中的数据 不要system out 输出
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | 小昌同学 |
| 发送日期 | 2023年4月19日 15:51 |
| 收件人 | user-zh |
| 主题 | 控制台打印出流式数据 |
各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new
FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接
-1.15.2.jar:1.15.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
我尝试在命令行添加了redis的参数,启动任务测试发现也会报如下的错误
请问大佬们,这个怎么解决,就是在命令行提交任务,怎么可以读取到jar包中定义的properties配置文件呢
| |
Jason_H
|
|
hyb_he...@163.com
|
您好,
我找到了我的ha目录,请教一下,怎么确定哪些数据是脏数据,可以允许删除的,这个有什么办法可以确定吗,我看到的都是些系统数据
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | Weihua Hu |
| 发送日期 | 2023年3月14日 10:39 |
| 收件人 | |
| 主题 | Re: flink k8s 部署启动报错 |
Hi,
看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。
可以参考文档[1],检查相关的
您好,
对的,之前是正常启动的,突然失败了,然后我直接重启pod,就一直报这个错了。
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | Weihua Hu |
| 发送日期 | 2023年3月14日 10:39 |
| 收件人 | |
| 主题 | Re: flink k8s 部署启动报错 |
Hi,
看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。
可以参考文档[1],检查相关的 HA 路径,清理下异常数据
另外问一下,之前
]
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
| |
Jason_H
|
|
hyb_he...@163.com
|
大家好,
我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular
join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
| |
Jason_H
|
|
hyb_he...@163.com
|
遇到的问题:在flink中,使用状态的地方已经设置了ttl, 但是上下游把数据清空了,然后想重新测试发现,
flink计算的结果不是从0开始累计的,是基于之前的结果继续累计的,这种情况在不重启作业的情况下 有什么办法处理吗?
具体来说就是,在不停止flink作业的情况下,怎么清楚运行中的flink作业的状态数据,以达到计算结果归0开始累计。
在不重启作业的情况下,清空状态数据是不是和重启作业运行是一样的效果,避免状态累计数据对结果产生影响呢。
| |
Jason_H
|
|
hyb_he...@163.com
|
好的,谢谢
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | guozhi mang |
| 发送日期 | 2022年12月13日 17:45 |
| 收件人 | |
| 主题 | Re: flink-gelly官方文档 |
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/gelly/overview/
Jason_H 于2022年12月13日周二 17:39写道:
请问一下,flink官网对应flink-gelly模
请问一下,flink官网对应flink-gelly模块在哪里,请教大佬给个指引,半天找不到,网上搜到的是很老的版本。
| |
Jason_H
|
|
hyb_he...@163.com
|
hi,你好
这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | 任召金 |
| Date | 11/15/2022 09:52 |
| To | user-zh |
| Subject | Re: flinksql join |
hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL
hi,你好
我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
| |
Jason_H
|
|
hyb_he...@163.com
|
Replied Message
| From | RS |
| Date | 11/15/2022 09:07 |
| To | user-zh@flink.apache.org |
| Subject | Re:flinksql join |
Hi,
我的理解是后插入的维表数据,关联不到是正常现象,
如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据
,
Thanks
在 2022-11-11 11:10:03,"Jason_H" 写道:
hi,大家好
我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
kakfa输入:
账号 金额 笔数
100 1 -> 未匹配
100 1 -> 未匹配
100 1 ->
+
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n&quo
+
"SUPER_ORG_ID string, \n" +
"IS_OUTSIDE BIGINT \n" +
") \n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '***',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'passwor
ver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
//" 'lookup.cache.ttl' = '1s', \n" +
" 'table-name' = 'dob_dim_account' \n" +
//"
"SELECT \n" +
" DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
gmtUpdate, \n" +
" gmtStatistical, \n" +
" paymentMethod, \n" +
" outCorpId, \n" +
" inCorpId, \n" +
" tranAmount, \n" +
" tranNum \n" +
"FROM temp";
| |
Jason_H
|
|
hyb_he...@163.com
|
hi,
请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。
| |
Jason_H
|
|
hyb_he...@163.com
|
hi,Liting Liu
看你的设置,是将并行度设置为了2,那么作业在运行的时候,所用到的slot为2个,你可以尝试修改并行度,来验证是否这个参数决定了你的taskslot的数量。
可以参考一下:
https://blog.csdn.net/sinat_38079265/article/details/108535909
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | Liting Liu (litiliu) |
| 发送日期 | 2022年10月26日 13:19 |
| 收件人 | user-zh
hi,
请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。
| |
Jason_H
|
|
hyb_he...@163.com
|
)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
... 15 more
| |
Jason_H
|
|
hyb_he
您好,报错如下:
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state
serializer must not be incompatible with the old state serializer
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | yue ma |
| 发送日期
or);
// 设置状态TTL为 1天
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
Descriptors.quantityJudgeStateDescriptor.enableTimeToLive(stateTtlConfig);
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | Hangxiang Yu |
| 发送日期 | 2022年8月24日 13:12 |
| 收件人 | |
| 主题 |
您好,改过任务,但是 是以新的任务启动的,改动很大,并不依赖之前老的任务
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | Michael Ran |
| 发送日期 | 2022年8月20日 15:31 |
| 收件人 | tsreape...@gmail.com |
| 主题 | 回复:flink自动重启出错 |
改过任务吗?
| |
greemqq...@163.com
|
|
邮箱:greemqq...@163.com
|
回复的原邮件
| 发件人 | Jason_H
您好,我的版本是
1.14.3的,运行的逻辑就是使用状态存放数据,然后设置了ttl,时间是一个小时。新的作业启动后,如果不是基于之前的检查点恢复,那应该不会和之前的作业还有任何关联吧,很奇怪的点就是新的作业,会报这个错误,我在本地复现了一下,是旧的作业不设置ttl,然后新的作业设置ttl,以就作业的ck去恢复,报了这个错误,但是我的是全新的作业启动。
感谢您的回复,谢谢。
| |
Jason_H
|
|
hyb_he...@163.com
|
回复的原邮件
| 发件人 | |
| 发送日期 | 2022年8月19日 18:59 |
| 收件人 | flink中文
这个错,它是自动重启的时候,就报这个错了,这个有遇到过吗?有没有什么解决办法?
强调:作业是新的,没有基于之前的作业的ck进行重启。
| |
Jason_H
|
|
hyb_he...@163.com
|
26 matches
Mail list logo