回复:控制台打印出流式数据

2023-04-19 文章 Jason_H
这个方法就可以打印在你本地的idea控制台里面,你试一下 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年4月19日 16:01 | | 收件人 | user-zh | | 抄送人 | user-zh | | 主题 | 回复:控制台打印出流式数据 | 这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件

回复:控制台打印出流式数据

2023-04-19 文章 Jason_H
hi,你好 你应该使用 stream.print() 来打印流中的数据 不要system out 输出 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年4月19日 15:51 | | 收件人 | user-zh | | 主题 | 控制台打印出流式数据 | 各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接

flink命令行提交作业读取不到properties配置文件

2023-04-19 文章 Jason_H
-1.15.2.jar:1.15.2] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201] 我尝试在命令行添加了redis的参数,启动任务测试发现也会报如下的错误 请问大佬们,这个怎么解决,就是在命令行提交任务,怎么可以读取到jar包中定义的properties配置文件呢 | | Jason_H | | hyb_he...@163.com |

回复: flink k8s 部署启动报错

2023-03-13 文章 Jason_H
您好, 我找到了我的ha目录,请教一下,怎么确定哪些数据是脏数据,可以允许删除的,这个有什么办法可以确定吗,我看到的都是些系统数据 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月14日 10:39 | | 收件人 | | | 主题 | Re: flink k8s 部署启动报错 | Hi, 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。 可以参考文档[1],检查相关的

回复: flink k8s 部署启动报错

2023-03-13 文章 Jason_H
您好, 对的,之前是正常启动的,突然失败了,然后我直接重启pod,就一直报这个错了。 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月14日 10:39 | | 收件人 | | | 主题 | Re: flink k8s 部署启动报错 | Hi, 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。 可以参考文档[1],检查相关的 HA 路径,清理下异常数据 另外问一下,之前

flink k8s 部署启动报错

2023-03-13 文章 Jason_H
] at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2] ... 4 more | | Jason_H | | hyb_he...@163.com |

regular join每条流单独设置ttl

2023-02-14 文章 Jason_H
大家好, 我遇到一个问题,在使用flinksql做双流join时,我用的是常规的regular join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2 | | Jason_H | | hyb_he...@163.com |

运行中的作业状态清除操作

2023-02-13 文章 Jason_H
遇到的问题:在flink中,使用状态的地方已经设置了ttl, 但是上下游把数据清空了,然后想重新测试发现, flink计算的结果不是从0开始累计的,是基于之前的结果继续累计的,这种情况在不重启作业的情况下 有什么办法处理吗? 具体来说就是,在不停止flink作业的情况下,怎么清楚运行中的flink作业的状态数据,以达到计算结果归0开始累计。 在不重启作业的情况下,清空状态数据是不是和重启作业运行是一样的效果,避免状态累计数据对结果产生影响呢。 | | Jason_H | | hyb_he...@163.com |

回复: flink-gelly官方文档

2022-12-13 文章 Jason_H
好的,谢谢 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | guozhi mang | | 发送日期 | 2022年12月13日 17:45 | | 收件人 | | | 主题 | Re: flink-gelly官方文档 | https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/gelly/overview/ Jason_H 于2022年12月13日周二 17:39写道: 请问一下,flink官网对应flink-gelly模

flink-gelly官方文档

2022-12-13 文章 Jason_H
请问一下,flink官网对应flink-gelly模块在哪里,请教大佬给个指引,半天找不到,网上搜到的是很老的版本。 | | Jason_H | | hyb_he...@163.com |

Re: flinksql join

2022-11-16 文章 Jason_H
hi,你好 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 | | Jason_H | | hyb_he...@163.com | Replied Message | From | 任召金 | | Date | 11/15/2022 09:52 | | To | user-zh | | Subject | Re: flinksql join | hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL

Re: flinksql join

2022-11-14 文章 Jason_H
hi,你好 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。 | | Jason_H | | hyb_he...@163.com | Replied Message | From | RS | | Date | 11/15/2022 09:07 | | To | user-zh@flink.apache.org | | Subject | Re:flinksql join | Hi, 我的理解是后插入的维表数据,关联不到是正常现象, 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据

Re: flinksql join

2022-11-14 文章 Jason_H
, Thanks 在 2022-11-11 11:10:03,"Jason_H" 写道: hi,大家好 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: kakfa输入: 账号 金额 笔数 100 1 -> 未匹配 100 1 -> 未匹配 100 1 ->

Re: flinksql join

2022-11-10 文章 Jason_H
+ "SUPER_ORG_ID string, \n" + "IS_OUTSIDE BIGINT \n" + ") \n" + "WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = '***',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = 'root',\n&quo

Re: flinksql join

2022-11-10 文章 Jason_H
+ "SUPER_ORG_ID string, \n" + "IS_OUTSIDE BIGINT \n" + ") \n" + "WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = '***',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'passwor

Re: flinksql join

2022-11-10 文章 Jason_H
ver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + //" 'lookup.cache.ttl' = '1s', \n" + " 'table-name' = 'dob_dim_account' \n" + //"

flinksql join

2022-11-10 文章 Jason_H
"SELECT \n" + " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as gmtUpdate, \n" + " gmtStatistical, \n" + " paymentMethod, \n" + " outCorpId, \n" + " inCorpId, \n" + " tranAmount, \n" + " tranNum \n" + "FROM temp"; | | Jason_H | | hyb_he...@163.com |

flinksql-redis-connector

2022-10-27 文章 Jason_H
hi, 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。 | | Jason_H | | hyb_he...@163.com |

回复:使用flink-operator 配置中的 taskmanager.numOfSlots 取值存在问题?

2022-10-26 文章 Jason_H
hi,Liting Liu 看你的设置,是将并行度设置为了2,那么作业在运行的时候,所用到的slot为2个,你可以尝试修改并行度,来验证是否这个参数决定了你的taskslot的数量。 可以参考一下: https://blog.csdn.net/sinat_38079265/article/details/108535909 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Liting Liu (litiliu) | | 发送日期 | 2022年10月26日 13:19 | | 收件人 | user-zh

flinksql-redis插件

2022-10-26 文章 Jason_H
hi, 请问一下,flinksql读写redis,官方有没有成熟的插件。如果没有,请教一下大家用的什么插件去读写redis,来做维表join的,谢谢。 | | Jason_H | | hyb_he...@163.com |

回复: flink自动重启出错

2022-08-25 文章 Jason_H
) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106) ... 15 more | | Jason_H | | hyb_he

回复: flink自动重启出错

2022-08-24 文章 Jason_H
您好,报错如下: cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | yue ma | | 发送日期

回复: flink自动重启出错

2022-08-24 文章 Jason_H
or); // 设置状态TTL为 1天 StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build(); Descriptors.quantityJudgeStateDescriptor.enableTimeToLive(stateTtlConfig); | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Hangxiang Yu | | 发送日期 | 2022年8月24日 13:12 | | 收件人 | | | 主题 |

回复:flink自动重启出错

2022-08-20 文章 Jason_H
您好,改过任务,但是 是以新的任务启动的,改动很大,并不依赖之前老的任务 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Michael Ran | | 发送日期 | 2022年8月20日 15:31 | | 收件人 | tsreape...@gmail.com | | 主题 | 回复:flink自动重启出错 | 改过任务吗? | | greemqq...@163.com | | 邮箱:greemqq...@163.com | 回复的原邮件 | 发件人 | Jason_H

回复: flink自动重启出错

2022-08-19 文章 Jason_H
您好,我的版本是 1.14.3的,运行的逻辑就是使用状态存放数据,然后设置了ttl,时间是一个小时。新的作业启动后,如果不是基于之前的检查点恢复,那应该不会和之前的作业还有任何关联吧,很奇怪的点就是新的作业,会报这个错误,我在本地复现了一下,是旧的作业不设置ttl,然后新的作业设置ttl,以就作业的ck去恢复,报了这个错误,但是我的是全新的作业启动。 感谢您的回复,谢谢。 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | | | 发送日期 | 2022年8月19日 18:59 | | 收件人 | flink中文

flink自动重启出错

2022-08-18 文章 Jason_H
这个错,它是自动重启的时候,就报这个错了,这个有遇到过吗?有没有什么解决办法? 强调:作业是新的,没有基于之前的作业的ck进行重启。 | | Jason_H | | hyb_he...@163.com |