Re: Printing streaming data to the console
This approach prints right in your local IDEA console; give it a try.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: 小昌同学
Date: 2023-04-19 16:01
To: user-zh
Cc: user-zh
Subject: Re: Printing streaming data to the console

Doesn't print() write the data to Flink's stdout? I am debugging locally right now and want the results printed in my local console.

小昌同学
ccc0606fight...@163.com

---- Replied Message ----
From: Jason_H
Date: 2023-04-19 15:58
To: flink中文邮件组
Subject: Re: Printing streaming data to the console

Hi, you should use stream.print() to print the records in the stream, not System.out.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: 小昌同学
Date: 2023-04-19 15:51
To: user-zh
Subject: Printing streaming data to the console

Hello everyone, a question: the upstream source is Kafka and the IDE is IDEA. After new FlinkKafkaConsumer I get a stream, and I wanted to look at the data in it, so I called System.out.println(stream.toString); but judging from the console output, what gets printed is still an address value. Could anyone advise?

小昌同学
ccc0606fight...@163.com
Re: Printing streaming data to the console
Hi, you should use stream.print() to print the records in the stream, not System.out.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: 小昌同学
Date: 2023-04-19 15:51
To: user-zh
Subject: Printing streaming data to the console

Hello everyone, a question: the upstream source is Kafka and the IDE is IDEA. After new FlinkKafkaConsumer I get a stream, and I wanted to look at the data in it, so I called System.out.println(stream.toString); but judging from the console output, what gets printed is still an address value. Could anyone advise?

小昌同学
ccc0606fight...@163.com
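For readers following along: the "address value" in the question is just java.lang.Object's default toString (class name plus hex hash code). A DataStream does not override toString, so System.out.println can never show its records; only the print() sink does. A plain-Java illustration (FakeStream is a made-up stand-in, not a Flink class):

```java
// Shows why System.out.println(stream) prints an "address":
// Object#toString yields ClassName@hexHashCode unless overridden.
public class ToStringDemo {
    // Stand-in for a DataStream, which also does not override toString
    static class FakeStream {}

    public static void main(String[] args) {
        FakeStream s = new FakeStream();
        // e.g. "ToStringDemo$FakeStream@1b6d3586" -- not the stream's data
        System.out.println(s.toString());
    }
}
```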
Flink CLI job submission cannot read the properties config file
Hi all,
When I submit a job from the command line, it fails right after it starts. From the error, the properties file under the jar's resources directory is not being read, so Redis initialization fails. The submit command is:

flink run -c com.test..etl.OdsChangeApplication /opt/dobrain/app/etl/test-etl-0.0.2-SNAPSHOT.jar \
-p 4 \
-job-name test-etl \

No Redis parameters are added here, but the config file already contains defaults. After submitting, the job fails with:

java.lang.IllegalArgumentException: template not initialized; call afterPropertiesSet() before using it
at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.2.14.RELEASE.jar:5.2.14.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:204) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at com.test.etl.client.RedisService.getStringValue(RedisService.java:30) ~[classes/:?]
at com.test.etl.manager.impl.RedisChangeManager.getCustId(RedisChangeManager.java:53) ~[classes/:?]
at com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:46) ~[classes/:?]
at com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:21) ~[classes/:?]
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.2.jar:1.15.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]

I also tried adding the Redis parameters on the command line, and the job still fails with the same error. So the question is: for a job submitted via the command line, how can it read a properties file defined inside the jar?

Jason_H
hyb_he...@163.com
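A resource packaged inside a jar has to be read from the classpath, not from a filesystem path that only existed at development time; that is one common cause of "config not found after flink run". A minimal, self-contained sketch (class and key names such as ConfigLoader and redis.host are hypothetical, not from the thread): load packaged defaults via the classloader and let command-line values override them.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: packaged defaults from the classpath, CLI values on top.
public class ConfigLoader {

    public static Properties load(String resource, Properties overrides) throws IOException {
        Properties props = new Properties();
        // Inside a jar, the file must be resolved through the classloader;
        // new File("src/main/resources/...") will not work after packaging.
        try (InputStream in = ConfigLoader.class.getClassLoader().getResourceAsStream(resource)) {
            if (in != null) {
                props.load(in);
            }
        }
        if (overrides != null) {
            props.putAll(overrides); // command-line values win over packaged defaults
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties cli = new Properties();
        cli.setProperty("redis.host", "10.0.0.5"); // pretend this came from args
        Properties merged = load("application.properties", cli);
        System.out.println(merged.getProperty("redis.host"));
    }
}
```

Note this only covers the client side; if the file is read inside operators, it must also be on the TaskManager classpath (i.e. shaded into the fat jar), which is worth checking in the failing setup.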
Re: Flink on K8s startup error
Hi,
I have found my HA directory. How do I determine which data is dirty and safe to delete? Is there any way to tell? Everything I see looks like system data.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: Weihua Hu
Date: 2023-03-14 10:39
Subject: Re: Flink on K8s startup error

Hi,
From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is no longer complete and cannot be read. See the documentation [1] to check the HA path and clean up the corrupted data.
One more question: was a Flink cluster started earlier with the same cluster-id?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua

On Tue, Mar 14, 2023 at 9:58 AM Jason_H wrote:
Hi all,
A question: my Flink cluster deployed on K8s fails to start with the error below. Has anyone run into this?

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more

Jason_H
hyb_he...@163.com
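For reference, a sketch (not from this thread) of how one might inspect the Job Result Store path. The paths below are placeholders; the real location comes from job-result-store.storage-path in flink-conf.yaml (by default a path under high-availability.storageDir). The "No content to map due to end-of-input" error suggests a zero-length dirty entry; dirty entries carry a _DIRTY suffix and can be removed after backing them up:

```shell
# List the job result store directory (placeholder path)
hdfs dfs -ls /flink/ha/job-result-store/<cluster-id>/

# Back up, then delete the unreadable (zero-byte) dirty entry
hdfs dfs -cp /flink/ha/job-result-store/<cluster-id>/<job-id>_DIRTY.json /tmp/jrs-backup/
hdfs dfs -rm /flink/ha/job-result-store/<cluster-id>/<job-id>_DIRTY.json
```

Clean (non-dirty) entries under the same path correspond to properly completed jobs and should be left alone.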
Re: Flink on K8s startup error
Hi,
Yes. It started normally before, then suddenly failed; I restarted the pod directly, and it has failed with this error ever since.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: Weihua Hu
Date: 2023-03-14 10:39
Subject: Re: Flink on K8s startup error

Hi,
From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is no longer complete and cannot be read. See the documentation [1] to check the HA path and clean up the corrupted data.
One more question: was a Flink cluster started earlier with the same cluster-id?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua

On Tue, Mar 14, 2023 at 9:58 AM Jason_H wrote:
Hi all,
A question: my Flink cluster deployed on K8s fails to start with the error below. Has anyone run into this?

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]

Jason_H
hyb_he...@163.com
Flink on K8s startup error
Hi all,
A question: my Flink cluster deployed on K8s fails to start with the error below. Has anyone run into this?

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more

Jason_H
hyb_he...@163.com
Setting a separate TTL per stream in a regular join
Hi all,
I have run into a problem. When doing a dual-stream join in Flink SQL with a regular join, one input is a fact stream and the other a dimension stream. I would like to set a TTL on the fact stream alone. How can I do that? The Flink version is 1.15.2.

Jason_H
hyb_he...@163.com
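For context beyond this thread: in Flink 1.15.x the SQL state TTL can only be set for the whole pipeline via table.exec.state.ttl; a per-input TTL for regular joins arrived later, with the STATE_TTL join hint in Flink 1.18. A sketch of both (table and column names are assumed, not from the thread):

```sql
-- Flink 1.15.x: a single TTL shared by all stateful SQL operators
SET 'table.exec.state.ttl' = '1 h';

-- Flink 1.18+ only: different TTLs per join input via a hint
-- ('0' means no expiration, keeping the dimension side indefinitely)
SELECT /*+ STATE_TTL('fact_stream' = '1h', 'dim_stream' = '0') */
  f.acct_code, f.trade_amt, d.corp_id
FROM fact_stream f
LEFT JOIN dim_stream d ON f.acct_code = d.acct_code;
```

So on 1.15.2 the realistic options are the global TTL or restructuring the join (e.g. a temporal/lookup join for the dimension side, which keeps no join state for it).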
Clearing the state of a running job
The problem: in Flink, every place that uses state already has a TTL set, but after upstream and downstream data was wiped for a re-test, I found that Flink's results do not accumulate from 0; they keep accumulating on top of the previous results. Is there a way to handle this without restarting the job?
Concretely: without stopping the Flink job, how can I clear the state data of the running job so that the accumulated results start again from 0? And would clearing the state without a restart have the same effect as restarting the job, i.e. keep the accumulated state from affecting the results?

Jason_H
hyb_he...@163.com
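For reference (not from this thread): Flink offers no way to drop the keyed state of a running job in place; TTL expiry is the only in-flight mechanism. The usual workaround is effectively a restart: stop the job with a savepoint, then resubmit without restoring from it, so accumulation starts from zero. A sketch, with the job id, paths, and main class as placeholders:

```shell
# Stop the job, writing a final savepoint (kept only as a backup)
flink stop --savepointPath hdfs:///flink/savepoints <job-id>

# Resubmit WITHOUT -s/--fromSavepoint, so the job starts with empty state
flink run -c com.example.MainJob /path/to/job.jar
```

Restoring from the savepoint later would bring the old accumulated state back, so the resubmit must deliberately omit it.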
Re: flink-gelly official documentation
OK, thanks.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: guozhi mang
Date: 2022-12-13 17:45
Subject: Re: flink-gelly official documentation

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/gelly/overview/

Jason_H wrote on Tue, Dec 13, 2022 at 17:39:
Where in the official Flink docs is the flink-gelly module? Could someone point me to it? I have searched for ages without finding it; everything online is for very old versions.

Jason_H
hyb_he...@163.com

--
Best regards
flink-gelly official documentation
Where in the official Flink docs is the flink-gelly module? Could someone point me to it? I have searched for ages without finding it; everything online is for very old versions.

Jason_H
hyb_he...@163.com
Re: flinksql join
Hi,
That approach requires CDC, but our lead will not consider CDC in the current solution; we want to solve this with Flink SQL alone.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: 任召金
Date: 11/15/2022 09:52
To: user-zh
Subject: Re: flinksql join

Hello, you could try turning the MySQL data into a stream via CDC, then inner-joining it with the main stream; mind the state TTL.

--Original--
From: "Jason_H"
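For readers who can use CDC, the suggestion above would look roughly like the sketch below. The connection values are placeholders, the flink-connector-mysql-cdc dependency is required, and the table reuses column names from this thread:

```sql
-- Dimension table as a changelog stream via MySQL CDC (placeholder connection values)
CREATE TABLE dob_dim_account_cdc (
  ACCT_CODE STRING,
  CORP_ID   STRING,
  PRIMARY KEY (ACCT_CODE) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'mysql-host',
  'port'          = '3306',
  'username'      = 'root',
  'password'      = '123456',
  'database-name' = 'dim_db',
  'table-name'    = 'dob_dim_account'
);

-- Regular join against the changelog: a late-arriving account row
-- retroactively updates earlier joined results (watch the state TTL)
SELECT ta.outAcctCode, tb.CORP_ID, ta.tradeAmt
FROM dws_a2a_trade_year_index ta
LEFT JOIN dob_dim_account_cdc tb ON ta.outAcctCode = tb.ACCT_CODE;
```

This is precisely what a lookup join cannot do: the changelog join re-emits corrected results when the dimension row finally arrives, which matches the "300 / 3" expectation from the original question.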
Re: flinksql join
Hi,
I would like to handle this with the existing Flink SQL joins: when the dimension table is updated late, the fact rows should wait in state.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: RS
Date: 11/15/2022 09:07
To: user-zh@flink.apache.org
Subject: Re: flinksql join

Hi,
My understanding is that it is expected behavior for dimension rows inserted later not to join. To get the = 3 result you would have to re-run the historical data manually and update the existing results.

Thanks

On 2022-11-11 11:10:03, "Jason_H" wrote:
Hi all,
I am implementing a dimension-table join in Flink SQL. The source is Kafka (transactions) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and no account is found in the dimension table, I insert the account manually; the next record then matches the account, but the accumulated output is missing the records that did not match. For example:

Kafka input:
account  amount  count
         100     1    -> not matched
         100     1    -> not matched
         100     1    -> matched

Dimension table:
account  corp    -> account row inserted later

Actual output:
corp  amount  count
      100     1

Expected output:
corp  amount  count
      300     3

The SQL:

String sql2 = "insert into dws_b2b_trade_year_index\n" +
    "WITH temp AS (\n" +
    "select \n" +
    "  ta.gmtStatistical as gmtStatistical,\n" +
    "  ta.paymentMethod as paymentMethod,\n" +
    "  tb.CORP_ID as outCorpId,\n" +
    "  tc.CORP_ID as inCorpId,\n" +
    "  sum(ta.tradeAmt) as tranAmount,\n" +
    "  sum(ta.tradeCnt) as tranNum \n" +
    "from dws_a2a_trade_year_index ta \n" +
    "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
    "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
    "group by \n" +
    "  ta.gmtStatistical, \n" +
    "  ta.paymentMethod, \n" +
    "  tb.CORP_ID, \n" +
    "  tc.CORP_ID \n" +
    ") \n" +
    "SELECT \n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
    "  gmtStatistical, \n" +
    "  paymentMethod, \n" +
    "  outCorpId, \n" +
    "  inCorpId, \n" +
    "  tranAmount, \n" +
    "  tranNum \n" +
    "FROM temp";

Jason_H
hyb_he...@163.com
Re: flinksql join
I tried a plain regular join:

String sql2 = "insert into dws_b2b_trade_year_index\n" +
    "WITH temp AS (\n" +
    "select \n" +
    "  ta.gmtStatistical as gmtStatistical,\n" +
    "  ta.paymentMethod as paymentMethod,\n" +
    "  tb.CORP_ID as outCorpId,\n" +
    "  tc.CORP_ID as inCorpId,\n" +
    "  sum(ta.tradeAmt) as tranAmount,\n" +
    "  sum(ta.tradeCnt) as tranNum \n" +
    "from dws_a2a_trade_year_index ta \n" +
    "left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
    "left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
    "group by \n" +
    "  ta.gmtStatistical, \n" +
    "  ta.paymentMethod, \n" +
    "  tb.CORP_ID, \n" +
    "  tc.CORP_ID \n" +
    ") \n" +
    "SELECT \n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
    "  gmtStatistical, \n" +
    "  paymentMethod, \n" +
    "  outCorpId, \n" +
    "  inCorpId, \n" +
    "  tranAmount, \n" +
    "  tranNum \n" +
    "FROM temp";

This is how I create the MySQL table:

String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
    "DIM_ACCOUNT_ID string ,\n" +
    "GMT_CREATE string ,\n" +
    "ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
    "GMT_UPDATE string ,\n" +
    "ACCT_CODE string ,\n" +
    "CUST_ID string ,\n" +
    "CUST_NAME string ,\n" +
    "CORP_ID string ,\n" +
    "CORP_CERT_CODE string ,\n" +
    "CORP_CERT_TYPE string ,\n" +
    "CUST_MANAGER_JOB_CODE string ,\n" +
    "TEAM_CODE string ,\n" +
    "ORG_ID string, \n" +
    "SUPER_ORG_ID string, \n" +
    "IS_OUTSIDE BIGINT \n" +
    ") \n" +
    "WITH (\n" +
    " 'connector' = 'jdbc',\n" +
    " 'url' = '***',\n" +
    " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
    " 'username' = 'root',\n" +
    " 'password' = '123456',\n" +
    //" 'lookup.cache.ttl' = '1s', \n" +
    " 'table-name' = 'dob_dim_account' \n" +
    //" 'lookup.cache.max-rows' = '1000' \n" +
    //" 'lookup.cache.ttl' = '1 minute',\n" +
    //" 'lookup.max-retries' = '3' \n" +
    " )";

But this approach does not seem to solve my earlier problem either, and when a new account is inserted, even newly arriving records fail to join.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: Zhiwen Sun
Date: 11/11/2022 14:08
Subject: Re: flinksql join

Use a plain regular join, not a lookup join.

Zhiwen Sun

On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote:
Hi all,
I am implementing a dimension-table join in Flink SQL. The source is Kafka (transactions) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and no account is found in the dimension table, I insert the account manually; the next record then matches the account, but the accumulated output is missing the records that did not match. For example:

Kafka input:
account  amount  count
         100     1    -> not matched
         100     1    -> not matched
         100     1    -> matched

Dimension table:
account  corp    -> account row inserted later

Actual output:
corp  amount  count
      100     1

Expected output:
corp  amount  count
      300     3

The SQL:

String sql2 = "insert into dws_b2b_trade_year_index\n" +
    "WITH temp AS (\n" +
    "select \n" +
    "  ta.gmtStatistical as gmtStatistical,\n" +
    "  ta.paymentMethod as paymentMethod,\n" +
    "  tb.CORP_ID as outCorpId,\n" +
    "  tc.CORP_ID as inCorpId,\n" +
    "  sum(ta.tradeAmt) as tranAmount,\n" +
    "  sum(ta.tradeCnt) as tranNum \n" +
    "from dws_a2a_trade_year_index ta \n" +
    "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
    "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
    "group by \n" +
    "  ta.gmtStatistical, \n" +
    "  ta.paymentMethod, \n" +
    "  tb.CORP_ID, \n" +
    "  tc.CORP_ID \n" +
    ") \n" +
    "SELECT \n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
    "  gmtStatistical, \n" +
    "  paymentMethod, \n" +
    "  outCorpId, \n" +
    "  inCorpId, \n" +
    "  tranAmount, \n" +
    "  tranNum \n" +
    "FROM temp";

Jason_H
hyb_he...@163.com
flinksql join
Hi all,
I am implementing a dimension-table join in Flink SQL. The source is Kafka (transactions) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and no account is found in the dimension table, I insert the account manually; the next record then matches the account, but the accumulated output is missing the records that did not match. For example:

Kafka input:
account  amount  count
         100     1    -> not matched
         100     1    -> not matched
         100     1    -> matched

Dimension table:
account  corp    -> account row inserted later

Actual output:
corp  amount  count
      100     1

Expected output:
corp  amount  count
      300     3

The SQL:

String sql2 = "insert into dws_b2b_trade_year_index\n" +
    "WITH temp AS (\n" +
    "select \n" +
    "  ta.gmtStatistical as gmtStatistical,\n" +
    "  ta.paymentMethod as paymentMethod,\n" +
    "  tb.CORP_ID as outCorpId,\n" +
    "  tc.CORP_ID as inCorpId,\n" +
    "  sum(ta.tradeAmt) as tranAmount,\n" +
    "  sum(ta.tradeCnt) as tranNum \n" +
    "from dws_a2a_trade_year_index ta \n" +
    "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
    "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
    "group by \n" +
    "  ta.gmtStatistical, \n" +
    "  ta.paymentMethod, \n" +
    "  tb.CORP_ID, \n" +
    "  tc.CORP_ID \n" +
    ") \n" +
    "SELECT \n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
    "  gmtStatistical, \n" +
    "  paymentMethod, \n" +
    "  outCorpId, \n" +
    "  inCorpId, \n" +
    "  tranAmount, \n" +
    "  tranNum \n" +
    "FROM temp";

Jason_H
hyb_he...@163.com
flinksql-redis-connector
Hi,
Is there a mature official connector for reading and writing Redis from Flink SQL? If not, what plugins do people use to read and write Redis for dimension-table joins? Thanks.

Jason_H
hyb_he...@163.com
Re: Is the taskmanager.numOfSlots value in the flink-operator config wrong?
Hi Liting Liu,
From your settings, the parallelism is set to 2, so the job uses 2 slots when it runs. Try changing the parallelism to verify whether this parameter determines the number of task slots.
For reference: https://blog.csdn.net/sinat_38079265/article/details/108535909

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: Liting Liu (litiliu)
Date: 2022-10-26 13:19
To: user-zh
Subject: Is the taskmanager.numOfSlots value in the flink-operator config wrong?

Hi:
I created a job with flink-operator 1.2.0 using the YAML below (the YAML does not set taskmanager.numberOfTaskSlots) and ran into a problem that reproduces reliably.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

But in the generated ConfigMap (flink-config-basic-example) I found "taskmanager.numberOfTaskSlots: 2". I do not understand how taskmanager.numberOfTaskSlots=2 got set (and why 2?). It feels like the ConfigMap should not contain this entry at all, or its value should be 1.
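If the derived value is unwanted, one option is to pin the setting explicitly in flinkConfiguration; this is a sketch under the assumption (consistent with the reply above, but not verified here) that the operator only fills in a parallelism-derived default when the key is absent:

```yaml
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
```

With one slot per TaskManager, a parallelism-2 job would then request two TaskManager pods instead of packing both subtasks into one.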
Re: Flink job fails on automatic restart
Hi, the detailed error is as follows:

Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
    at com.myouchai.mishap.transformation.process.AsyncDecisionUpgrade.open(AsyncDecisionUpgrade.java:61)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@b3dd31ab) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@35a76e76).
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:211)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:276)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:177)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
    ... 15 more

| | Jason_H | | hyb_he...@163.com | Original message | From | yue ma | | Date | 2022-08-25 13:56 | | To | | | Subject | Re: Flink job fails on automatic restart | Hi, could you paste the full JM/TM logs? This doesn't look expected; an automatic failover shouldn't cause a StateMigrationException. Jason_H wrote on Thu, Aug 25, 2022 at 09:36: Hi, the error is: cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer | | Jason_H | | hyb_he...@163.com | Original message | From | yue ma | | Date | 2022-08-24 16:48 | | To | | | Subject | Re: Flink job fails on automatic restart | Hi, could you paste a more detailed error? Hangxiang Yu wrote on Wed, Aug 24, 2022 at 13:10: Is this a DataStream job? Could you share the part that uses state? On Sat, Aug 20, 2022 at 3:35 PM Jason_H wrote: Hi, the job was changed, but it was started as a new job; the changes are substantial and it does not depend on the previous job. | | Jason_H | | hyb_he...@163.com | Original message | From | Michael Ran | | Date | 2022-08-20 15:31 | | To | tsreape...@gmail.com | | Subject | Re: Flink job fails on automatic restart | Have you changed the job?
| | greemqq...@163.com | | Email: greemqq...@163.com | Original message | From | Jason_H | | Date | 2022-08-19 11:52 | | To | flink Chinese mailing list | | Cc | | | Subject | Flink job fails on automatic restart | cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com | -- Best, Hangxiang.
Re: Flink job fails on automatic restart
Hi, the error is: cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer | | Jason_H | | hyb_he...@163.com | Original message | From | yue ma | | Date | 2022-08-24 16:48 | | To | | | Subject | Re: Flink job fails on automatic restart | Hi, could you paste a more detailed error? Hangxiang Yu wrote on Wed, Aug 24, 2022 at 13:10: Is this a DataStream job? Could you share the part that uses state? On Sat, Aug 20, 2022 at 3:35 PM Jason_H wrote: Hi, the job was changed, but it was started as a new job; the changes are substantial and it does not depend on the previous job. | | Jason_H | | hyb_he...@163.com | Original message | From | Michael Ran | | Date | 2022-08-20 15:31 | | To | tsreape...@gmail.com | | Subject | Re: Flink job fails on automatic restart | Have you changed the job? | | greemqq...@163.com | | Email: greemqq...@163.com | Original message | From | Jason_H | | Date | 2022-08-19 11:52 | | To | flink Chinese mailing list | | Cc | | | Subject | Flink job fails on automatic restart | cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com | -- Best, Hangxiang.
Re: Flink job fails on automatic restart
Hi, the exact setup is as follows:

public static final MapStateDescriptor<String, Integer> quantityJudgeStateDescriptor =
        new MapStateDescriptor<>("quantityJudgeMapState", String.class, Integer.class);

mistakeUpgradeRuleState = getRuntimeContext().getMapState(Descriptors.quantityJudgeStateDescriptor);
// set the state TTL to 1 day
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
Descriptors.quantityJudgeStateDescriptor.enableTimeToLive(stateTtlConfig);

| | Jason_H | | hyb_he...@163.com | Original message | From | Hangxiang Yu | | Date | 2022-08-24 13:12 | | To | | | Subject | Re: Flink job fails on automatic restart | Is this a DataStream job? Could you share the part that uses state? On Sat, Aug 20, 2022 at 3:35 PM Jason_H wrote: Hi, the job was changed, but it was started as a new job; the changes are substantial and it does not depend on the previous job. | | Jason_H | | hyb_he...@163.com | Original message | From | Michael Ran | | Date | 2022-08-20 15:31 | | To | tsreape...@gmail.com | | Subject | Re: Flink job fails on automatic restart | Have you changed the job? | | greemqq...@163.com | | Email: greemqq...@163.com | Original message | From | Jason_H | | Date | 2022-08-19 11:52 | | To | flink Chinese mailing list | | Cc | | | Subject | Flink job fails on automatic restart | cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com | -- Best, Hangxiang.
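For what it's worth, an untested sketch of one possible fix (an assumption, not a confirmed diagnosis from this thread): `enableTimeToLive()` wraps the state serializer in a TTL serializer, so a shared static descriptor that is first passed to `getMapState()` without TTL and only afterwards has TTL enabled yields an incompatible serializer the next time the operator opens, e.g. when restoring from the job's own checkpoint after a failover. Configuring TTL before the first `getMapState()` call keeps the serializer consistent:

```java
// Sketch against the Flink 1.14 API (not compiled here): enable TTL on the
// descriptor BEFORE it is ever passed to getMapState(), so every open(),
// including restores after an automatic failover, sees the same serializer.
MapStateDescriptor<String, Integer> quantityJudgeStateDescriptor =
        new MapStateDescriptor<>("quantityJudgeMapState", String.class, Integer.class);
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
quantityJudgeStateDescriptor.enableTimeToLive(stateTtlConfig);  // TTL first
mistakeUpgradeRuleState = getRuntimeContext().getMapState(quantityJudgeStateDescriptor);  // then create the state
```

Making the descriptor a non-static field of the function (instead of a shared `Descriptors` constant) also avoids one operator instance mutating the descriptor another instance already used.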
Re: Flink job fails on automatic restart
Hi, the job was changed, but it was started as a new job; the changes are substantial and it does not depend on the previous job. | | Jason_H | | hyb_he...@163.com | Original message | From | Michael Ran | | Date | 2022-08-20 15:31 | | To | tsreape...@gmail.com | | Subject | Re: Flink job fails on automatic restart | Have you changed the job? | | greemqq...@163.com | | Email: greemqq...@163.com | Original message | From | Jason_H | | Date | 2022-08-19 11:52 | | To | flink Chinese mailing list | | Cc | | | Subject | Flink job fails on automatic restart | cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com |
Re: Flink job fails on automatic restart
Hi, my version is 1.14.3. The job logic uses state to hold data, with a TTL of one hour. Once a new job starts, if it is not restored from a previous checkpoint it should have no connection to the previous job at all; the strange part is that a brand-new job throws this error. I reproduced it locally: an old job without TTL, then a new job with TTL restored from the old job's checkpoint, throws this error. But my job was started completely fresh. Thanks for your reply. | | Jason_H | | hyb_he...@163.com | Original message | From | | | Date | 2022-08-19 18:59 | | To | flink Chinese mailing list | | Subject | Re: Flink job fails on automatic restart | Could you tell us which Flink version you are using and what the job logic looks like? Older versions had similar problems caused by the state comparator implementation: https://issues.apache.org/jira/browse/FLINK-18452, fixed in 1.12. Also, if checkpointing is enabled, even a brand-new job will restore from its own checkpoint after it fails for some reason, which can trigger this kind of problem. Best, Zhanghao Chen From: Jason_H Sent: Friday, August 19, 2022 11:52 To: flink Chinese mailing list Subject: Flink job fails on automatic restart | cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com |
Flink job fails on automatic restart
cause by: java.lang.RuntimeException: Error while getting state org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer. Hi all, I've recently run into a strange problem: my job throws this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job, not restored from a previous checkpoint. It runs for a while and then throws this error during an automatic restart. Has anyone seen this? Is there a fix? To emphasize: the job is new and was not restarted from a previous job's checkpoint. | | Jason_H | | hyb_he...@163.com |