Re: Printing streaming data to the console

2023-04-19 Thread Jason_H
This method prints right in your local IDEA console; give it a try.


Jason_H
hyb_he...@163.com
---- Original message ----
From: 小昌同学
Date: 2023-04-19 16:01
To: user-zh
Cc: user-zh
Subject: Re: Printing streaming data to the console
Doesn't print() write the data to Flink's stdout? I'm debugging locally right now and want the results printed on my local machine.


小昌同学
ccc0606fight...@163.com
---- Original message ----
From: Jason_H
Date: 2023-04-19 15:58
To: flink中文邮件组
Subject: Re: Printing streaming data to the console
Hi,
You should use stream.print() to print the data in the stream, not System.out.


Jason_H
hyb_he...@163.com
---- Original message ----
From: 小昌同学
Date: 2023-04-19 15:51
To: user-zh
Subject: Printing streaming data to the console


Hello everyone, a quick question: the upstream data source is Kafka and my IDE is IDEA. After new FlinkKafkaConsumer(...) I get a stream, and to inspect the data in it I wrote
System.out.println(stream.toString());
directly. But judging from the console output, what gets printed is just the object reference. Could anyone point me in the right direction?




小昌同学
ccc0606fight...@163.com
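For illustration, a minimal sketch of the suggested approach (topic name and Kafka settings are placeholders, not from the thread): DataStream#print() registers a sink that writes each record to standard output, which is the IDEA Run console when the job executes locally.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class PrintStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "debug-group");             // placeholder

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

        // print() adds a sink that writes every record to stdout;
        // running this main method inside IDEA shows the records in the console.
        stream.print();

        env.execute("print-demo");
    }
}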

Re: Printing streaming data to the console

2023-04-19 Thread Jason_H
Hi,
You should use stream.print() to print the data in the stream, not System.out.


Jason_H
hyb_he...@163.com
---- Original message ----
From: 小昌同学
Date: 2023-04-19 15:51
To: user-zh
Subject: Printing streaming data to the console


Hello everyone, a quick question: the upstream data source is Kafka and my IDE is IDEA. After new FlinkKafkaConsumer(...) I get a stream, and to inspect the data in it I wrote
System.out.println(stream.toString());
directly. But judging from the console output, what gets printed is just the object reference. Could anyone point me in the right direction?




小昌同学
ccc0606fight...@163.com

flink CLI job submission cannot read the properties configuration file

2023-04-19 Thread Jason_H
Hi everyone,
When I submit a job from the command line, it fails right after startup. From the error, the properties configuration file under the jar's resources directory is not being read, so initialization fails when Redis is used.
The submit command is:
flink run -c com.test..etl.OdsChangeApplication \
  /opt/dobrain/app/etl/test-etl-0.0.2-SNAPSHOT.jar \
  -p 4 \
  -job-name test-etl \


No Redis parameters are added here, but the configuration file already has defaults. After submitting, the run fails with:
java.lang.IllegalArgumentException: template not initialized; call 
afterPropertiesSet() before using it
 at org.springframework.util.Assert.isTrue(Assert.java:121) 
~[spring-core-5.2.14.RELEASE.jar:5.2.14.RELEASE]
 at 
org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:204)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
 at 
org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
 at 
org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
 at 
org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
 at com.test.etl.client.RedisService.getStringValue(RedisService.java:30) 
~[classes/:?]
 at 
com.test.etl.manager.impl.RedisChangeManager.getCustId(RedisChangeManager.java:53)
 ~[classes/:?]
 at 
com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:46)
 ~[classes/:?]
 at 
com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:21)
 ~[classes/:?]
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-streaming-java-1.15.2.jar:1.15.2]
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-runtime-1.15.2.jar:1.15.2]
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-runtime-1.15.2.jar:1.15.2]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-runtime-1.15.2.jar:1.15.2]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-runtime-1.15.2.jar:1.15.2]
 at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]


I tried adding the Redis parameters on the command line as well, and a test run of the job still reported the following error


Could anyone advise how to solve this? When submitting a job from the command line, how can the properties configuration file defined inside the jar be read?


Jason_H
hyb_he...@163.com
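For context, a hedged sketch of one common pattern (file name, keys, and the function are placeholders, not from the thread): load the properties file from the user jar's classpath inside a rich function's open(), so it is read wherever the task actually runs rather than only on the submitting client.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.io.InputStream;
import java.util.Properties;

public class RedisConfigProcessFunction extends ProcessFunction<String, String> {
    private transient Properties redisProps;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load from the classpath of the user jar, not from the local filesystem;
        // this also works when the task runs on a remote TaskManager.
        redisProps = new Properties();
        try (InputStream in = getClass().getClassLoader()
                .getResourceAsStream("redis.properties")) { // placeholder file name
            if (in == null) {
                throw new IllegalStateException("redis.properties not found on classpath");
            }
            redisProps.load(in);
        }
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value + " host=" + redisProps.getProperty("redis.host")); // placeholder key
    }
}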

Re: flink k8s deployment fails to start

2023-03-13 Thread Jason_H
Hello,
I found my HA directory. How do I determine which data is dirty and safe to delete? Is there any way to be sure? Everything I see looks like system data.


Jason_H
hyb_he...@163.com
---- Original message ----
From: Weihua Hu
Date: 2023-03-14 10:39
To:
Subject: Re: flink k8s deployment fails to start
Hi,

From the exception, the Flink cluster found DirtyResults data on the HA path at startup, but the data is incomplete and cannot be read.
You can follow the documentation [1], check the relevant HA path, and clean up the corrupt data.

One more question: was a Flink cluster started with the same cluster-id before?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path

Best,
Weihua


On Tue, Mar 14, 2023 at 9:58 AM Jason_H  wrote:

Hi everyone,
A question: my Flink cluster deployed on k8s won't start and reports the error below. Has anyone run into this?
[quoted stack trace omitted; it is identical to the one in the original message below]


Jason_H
hyb_he...@163.com


Re: flink k8s deployment fails to start

2023-03-13 Thread Jason_H
Hello,
Right. It used to start normally, then it suddenly failed; I simply restarted the pod, and it has been reporting this error ever since.


Jason_H
hyb_he...@163.com
---- Original message ----
From: Weihua Hu
Date: 2023-03-14 10:39
To:
Subject: Re: flink k8s deployment fails to start
[quoted reply and stack trace omitted; they are identical to those in the previous message]


Jason_H
hyb_he...@163.com


flink k8s deployment fails to start

2023-03-13 Thread Jason_H
Hi everyone,
A question: my Flink cluster deployed on k8s won't start and reports the error below. Has anyone run into this?
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of 
globally-terminated jobs from JobResultStore
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve 
JobResults of globally-terminated jobs from JobResultStore
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:192)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
 No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, 
column: 0]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-1.15.2.jar:1.15.2]
... 4 more


Jason_H
hyb_he...@163.com

Setting a per-stream TTL for a regular join

2023-02-14 Thread Jason_H
Hi everyone,
I've hit a problem. When doing a two-stream join in Flink SQL I use an ordinary regular join between a fact stream and a dimension-table stream. Now I'd like to set a TTL for the fact stream alone. How can this be done? The Flink version is 1.15.2.


Jason_H
hyb_he...@163.com
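For reference, a hedged sketch of the TTL knob available to SQL jobs on Flink 1.15: the idle-state retention (the table.exec.state.ttl option) applies globally to all stateful SQL operators, so it is a global setting rather than the per-stream TTL the question asks for.

import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GlobalSqlTtl {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Global idle-state retention (equivalent to 'table.exec.state.ttl').
        // In Flink 1.15 this applies to every stateful SQL operator at once;
        // it cannot be scoped to just one input of a regular join.
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
    }
}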

Clearing state in a running job

2023-02-13 Thread Jason_H
The problem: TTL is already configured everywhere state is used in Flink, but the upstream and downstream data were wiped for a fresh test, and it turned out that Flink's results did not start accumulating from zero; they kept accumulating on top of the previous results. Is there any way to handle this without restarting the job?


Concretely: without stopping the Flink job, how can the state data of the running job be cleared so that the results reset to zero and accumulate from scratch?


And is clearing the state data without a restart equivalent in effect to restarting the job, i.e. does it keep the accumulated state from influencing the results?
Jason_H
hyb_he...@163.com
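Purely as an illustration (not from this thread, and all names are hypothetical), one pattern for resetting keyed state without a restart is a connected control stream whose messages clear the state for their key; a minimal hedged sketch:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Keyed running sum; amounts arrive on input 1, "reset" signals on input 2.
public class ResettableCounter extends CoProcessFunction<Long, String, Long> {
    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement1(Long amount, Context ctx, Collector<Long> out) throws Exception {
        long updated = (sum.value() == null ? 0L : sum.value()) + amount;
        sum.update(updated);
        out.collect(updated);
    }

    @Override
    public void processElement2(String control, Context ctx, Collector<Long> out) throws Exception {
        // Clearing the state for this key has, for this key, the same effect
        // as starting the job fresh: accumulation restarts from zero.
        sum.clear();
    }
}

Wiring it up would look like dataStream.keyBy(...).connect(controlStream.keyBy(...)).process(new ResettableCounter()); the reset affects only the keys that actually receive a control message.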

Re: flink-gelly official documentation

2022-12-13 Thread Jason_H
OK, thanks.


Jason_H
hyb_he...@163.com
---- Original message ----
From: guozhi mang
Date: 2022-12-13 17:45
To:
Subject: Re: flink-gelly official documentation
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/gelly/overview/

Jason_H  wrote on Tue, Dec 13, 2022, 17:39:

Where on the Flink website is the documentation for the flink-gelly module? Could someone point me to it? I've searched for ages and can't find it; everything online is for very old versions.


Jason_H
hyb_he...@163.com



--
Best regards


flink-gelly official documentation

2022-12-13 Thread Jason_H
Where on the Flink website is the documentation for the flink-gelly module? Could someone point me to it? I've searched for ages and can't find it; everything online is for very old versions.


Jason_H
hyb_he...@163.com

Re: flinksql join

2022-11-16 Thread Jason_H
Hi,
That approach requires CDC, but management won't consider CDC in our current plan; they only want to solve this with Flink SQL.


Jason_H
hyb_he...@163.com
---- Replied Message ----
From: 任召金
Date: 11/15/2022 09:52
To: user-zh
Subject: Re: flinksql join
Hello, you could try turning the MySQL data into a stream via CDC and then inner-joining it with the main stream; mind the state TTL.


--Original--
From: "Jason_H"
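For illustration, a hedged sketch of the CDC approach suggested above (connector options are placeholders and assume the flink-connector-mysql-cdc dependency; table and column names follow the thread). With the dimension table as a changelog stream, a regular join retracts and re-emits previously unmatched rows once the account row arrives, so the aggregate can converge to the desired totals.

// Hedged sketch, not from the thread: dimension table as a CDC changelog stream.
String accountCdcSource = "CREATE TABLE dob_dim_account (\n" +
        "  ACCT_CODE STRING,\n" +
        "  CORP_ID   STRING,\n" +
        "  PRIMARY KEY (ACCT_CODE) NOT ENFORCED\n" +
        ") WITH (\n" +
        // requires the flink-connector-mysql-cdc dependency
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = '...',\n" +
        "  'port' = '3306',\n" +
        "  'username' = 'root',\n" +
        "  'password' = '...',\n" +
        "  'database-name' = '...',\n" +
        "  'table-name' = 'dob_dim_account'\n" +
        ")";

// Regular (non-temporal) join against the changelog: late-arriving account rows
// retract and re-emit earlier unmatched results. Mind table.exec.state.ttl.
String joinSql = "SELECT tb.CORP_ID, SUM(ta.tradeAmt) AS tranAmount, SUM(ta.tradeCnt) AS tranNum\n" +
        "FROM dws_a2a_trade_year_index ta\n" +
        "LEFT JOIN dob_dim_account tb ON ta.outAcctCode = tb.ACCT_CODE\n" +
        "GROUP BY tb.CORP_ID";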

Re: flinksql join

2022-11-14 Thread Jason_H
Hi,
I'd like to handle this case with the existing Flink SQL join: when the dimension table is updated late, the fact data should wait in state.


Jason_H
hyb_he...@163.com
---- Replied Message ----
From: RS
Date: 11/15/2022 09:07
To: user-zh@flink.apache.org
Subject: Re: flinksql join
Hi,
My understanding is that it's expected behavior that dimension rows inserted later cannot be matched.
To get the total of 3, you'd have to manually re-run the historical data and update the existing results.


Thanks






On 2022-11-11 11:10:03, "Jason_H"  wrote:


Hi everyone,
I'm implementing a dimension-table join with Flink SQL. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and its account is not found in the dimension table, I insert the account manually; the next incoming record then matches the account information, but the accumulated output is missing the records that failed to match. For example:
Kafka input:
account  amount  count
  100  1   -> not matched
  100  1   -> not matched
  100  1   -> matched

Dimension table:
account  corp
  
     -> account row inserted later
Actual output:
corp  amount  count
  100  1


Desired output:
corp  amount  count
  300  3





The SQL is as follows:
String sql2 =  "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account for system_time as of ta.proc as tb on 
ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode 
= tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";

Jason_H
hyb_he...@163.com


Re: flinksql join

2022-11-10 Thread Jason_H
I tried an ordinary join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
"DIM_ACCOUNT_ID  string ,\n" +
"GMT_CREATE  string ,\n" +
"ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
"GMT_UPDATE  string ,\n" +
"ACCT_CODE  string ,\n" +
"CUST_ID  string ,\n" +
"CUST_NAME  string ,\n" +
"CORP_ID  string ,\n" +
"CORP_CERT_CODE  string ,\n" +
"CORP_CERT_TYPE  string ,\n" +
"CUST_MANAGER_JOB_CODE  string ,\n" +
"TEAM_CODE  string ,\n" +
"ORG_ID  string, \n" +
"SUPER_ORG_ID  string, \n" +
"IS_OUTSIDE  BIGINT \n" +
") \n" +
"WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = '***',\n" +
"  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"  'username' = 'root',\n" +
"  'password' = '123456',\n" +
//"  'lookup.cache.ttl' = '1s', \n" +
"  'table-name' = 'dob_dim_account' \n" +
//"  'lookup.cache.max-rows' = '1000' \n" +
//"  'lookup.cache.ttl' = '1 minute',\n" +
//"  'lookup.max-retries' = '3' \n" +
" )";


But this approach doesn't seem to solve my earlier problem, and when a new account is added, newly arriving records still fail to match it.



Jason_H
hyb_he...@163.com
---- Replied Message ----
From: Jason_H
Date: 11/11/2022 14:42
To: flink中文邮件组
Subject: Re: flinksql join
[quoted message repeats the text above and is truncated in the archive]

Re: flinksql join

2022-11-10 Thread Jason_H
I tried an ordinary join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
"DIM_ACCOUNT_ID  string ,\n" +
"GMT_CREATE  string ,\n" +
"ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
"GMT_UPDATE  string ,\n" +
"ACCT_CODE  string ,\n" +
"CUST_ID  string ,\n" +
"CUST_NAME  string ,\n" +
"CORP_ID  string ,\n" +
"CORP_CERT_CODE  string ,\n" +
"CORP_CERT_TYPE  string ,\n" +
"CUST_MANAGER_JOB_CODE  string ,\n" +
"TEAM_CODE  string ,\n" +
"ORG_ID  string, \n" +
"SUPER_ORG_ID  string, \n" +
"IS_OUTSIDE  BIGINT \n" +
") \n" +
"WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = '***',\n" +
"  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"  'username' = 'root',\n" +
"  'password' = '123456',\n" +
//"  'lookup.cache.ttl' = '1s', \n" +
"  'table-name' = 'dob_dim_account' \n" +
//"  'lookup.cache.max-rows' = '1000' \n" +
//"  'lookup.cache.ttl' = '1 minute',\n" +
//"  'lookup.max-retries' = '3' \n" +
" )";


But this approach doesn't seem to solve my earlier problem, and when a new account is added, newly arriving records still fail to match it.



Jason_H
hyb_he...@163.com
---- Replied Message ----
From: Zhiwen Sun
Date: 11/11/2022 14:08
To:
Subject: Re: flinksql join
Use an ordinary join, not a lookup join.

Zhiwen Sun



On Fri, Nov 11, 2022 at 11:10 AM Jason_H  wrote:



Hi everyone,

I'm implementing a dimension-table join with Flink SQL. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and its account is not found in the dimension table, I insert the account manually; the next incoming record then matches the account information, but the accumulated output is missing the records that failed to match. For example:
Kafka input:
account  amount  count
  100  1   -> not matched
  100  1   -> not matched
  100  1   -> matched

Dimension table:
account  corp
  
     -> account row inserted later
Actual output:
corp  amount  count
  100  1


Desired output:
corp  amount  count
  300  3





The SQL is as follows:
String sql2 =  "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account for system_time as of ta.proc as
tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account for system_time as of ta.proc as
tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";

Jason_H
hyb_he...@163.com


flinksql join

2022-11-10 Thread Jason_H


Hi everyone,
I'm implementing a dimension-table join with Flink SQL. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). The problem: when a Kafka record arrives and its account is not found in the dimension table, I insert the account manually; the next incoming record then matches the account information, but the accumulated output is missing the records that failed to match. For example:
Kafka input:
account  amount  count
  100  1   -> not matched
  100  1   -> not matched
  100  1   -> matched

Dimension table:
account  corp
  
     -> account row inserted later
Actual output:
corp  amount  count
  100  1


Desired output:
corp  amount  count
  300  3





The SQL is as follows:
String sql2 =  "insert into dws_b2b_trade_year_index\n" +
   "WITH temp AS (\n" +
   "select \n" +
   "  ta.gmtStatistical as gmtStatistical,\n" +
   "  ta.paymentMethod as paymentMethod,\n" +
   "  tb.CORP_ID as outCorpId,\n" +
   "  tc.CORP_ID as inCorpId,\n" +
   "  sum(ta.tradeAmt) as tranAmount,\n" +
   "  sum(ta.tradeCnt) as tranNum \n" +
   "from dws_a2a_trade_year_index ta \n" +
   "left join dob_dim_account for system_time as of ta.proc as tb 
on ta.outAcctCode = tb.ACCT_CODE \n" +
   "left join dob_dim_account for system_time as of ta.proc as tc 
on ta.inAcctCode = tc.ACCT_CODE \n" +
   "group by \n" +
   " ta.gmtStatistical, \n" +
   " ta.paymentMethod, \n" +
   " tb.CORP_ID, \n" +
   " tc.CORP_ID \n" +
   ") \n" +
   "SELECT \n" +
   "   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as 
gmtUpdate, \n" +
   "   gmtStatistical, \n" +
   "   paymentMethod, \n" +
   "   outCorpId, \n" +
   "   inCorpId, \n" +
   "   tranAmount, \n" +
   "   tranNum \n" +
   "FROM temp";

Jason_H
hyb_he...@163.com
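For reference, a hedged sketch of the lookup-join variant discussed above: with FOR SYSTEM_TIME AS OF ta.proc, the dimension row is looked up at each record's processing time and the emitted row is never retracted, which is why records arriving before the account exists stay unmatched for good. The commented-out JDBC cache options only control how stale lookups may be (connection settings are placeholders):

// Hedged sketch (connection settings are placeholders); mirrors the DDL in the
// thread with the JDBC lookup-cache options enabled instead of commented out.
String accountDimLookup = "CREATE TABLE dob_dim_account (\n" +
        "  ACCT_CODE STRING,\n" +
        "  CORP_ID   STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://...',\n" +
        "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
        "  'username' = 'root',\n" +
        "  'password' = '...',\n" +
        "  'table-name' = 'dob_dim_account',\n" +
        // Each lookup result is cached for at most one minute, so a newly
        // inserted account becomes visible to later records within that window,
        // but rows already emitted without a match are never corrected.
        "  'lookup.cache.max-rows' = '1000',\n" +
        "  'lookup.cache.ttl' = '1 min',\n" +
        "  'lookup.max-retries' = '3'\n" +
        ")";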

flinksql-redis-connector

2022-10-27 Thread Jason_H
Hi,
Is there a mature official plugin for reading and writing Redis from Flink SQL? If not, what plugin do people use to read/write Redis for dimension-table joins? Thanks.


Jason_H
hyb_he...@163.com
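Flink itself does not bundle a Redis connector; Apache Bahir's flink-connector-redis is one community option for the DataStream API. Purely as an illustration (host, port, and key scheme are placeholders, not an official API), a minimal hand-rolled sink using the Jedis client:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

// Hedged sketch of a hand-rolled DataStream Redis sink, not an official connector.
public class SimpleRedisSink extends RichSinkFunction<String> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("localhost", 6379); // placeholder connection settings
    }

    @Override
    public void invoke(String value, Context context) {
        jedis.set("last-value", value); // placeholder key scheme
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}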

Re: Is the taskmanager.numOfSlots value in the flink-operator configuration wrong?

2022-10-26 Thread Jason_H
Hi, Liting Liu,
Your spec sets the parallelism to 2, so the job uses 2 slots when it runs. Try changing the parallelism to verify whether that parameter determines your number of task slots.
For reference:
https://blog.csdn.net/sinat_38079265/article/details/108535909



Jason_H
hyb_he...@163.com
---- Original message ----
From: Liting Liu (litiliu)
Date: 2022-10-26 13:19
To: user-zh
Subject: Is the taskmanager.numOfSlots value in the flink-operator configuration wrong?
Hi:
I tried to create a job with flink-operator 1.2.0 using the YAML below (the YAML does not set taskmanager.numberOfTaskSlots) and ran into a problem that reproduces reliably.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

But in the generated ConfigMap (flink-config-basic-example) I found "taskmanager.numberOfTaskSlots: 2".
I don't understand how taskmanager.numberOfTaskSlots=2 got set (and why 2?). It feels like the ConfigMap shouldn't contain this entry at all, or its value should be 1.






flinksql redis plugin

2022-10-26 Thread Jason_H
Hi,
Is there a mature official plugin for reading and writing Redis from Flink SQL? If not, what plugin do people use to read/write Redis for dimension-table joins? Thanks.


Jason_H
hyb_he...@163.com

Re: Error on automatic flink job restart

2022-08-25 Thread Jason_H
Hello, the detailed error is as follows:
Caused by: java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
at 
com.myouchai.mishap.transformation.process.AsyncDecisionUpgrade.open(AsyncDecisionUpgrade.java:61)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:91)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, 
the new state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@b3dd31ab) must not be 
incompatible with the old state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@35a76e76).
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:211)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:276)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:225)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:177)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:132)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
... 15 more


Jason_H
hyb_he...@163.com
---- Original message ----
From: yue ma
Date: 2022-08-25 13:56
To:
Subject: Re: Error on automatic flink job restart
Hi, could you paste the complete JM/TM logs? This doesn't look expected; an automatic failover shouldn't produce a StateMigrationException.

Jason_H  wrote on Thu, Aug 25, 2022, 09:36:

Hello, the error is:
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer


Jason_H
hyb_he...@163.com
---- Original message ----
From: yue ma
Date: 2022-08-24 16:48
To:
Subject: Re: Error on automatic flink job restart
Hi, could you paste a more detailed error?

Hangxiang Yu  wrote on Wed, Aug 24, 2022, 13:10:

Is this a DataStream job? Could you share the part that uses state?

On Sat, Aug 20, 2022 at 3:35 PM Jason_H  wrote:

Hello, the job had been modified, but it was started as a new job; the changes were substantial and it doesn't depend on the old job.


Jason_H
hyb_he...@163.com
---- Original message ----
From: Michael Ran
Date: 2022-08-20 15:31
To: tsreape...@gmail.com
Subject: Re: Error on automatic flink job restart
Had the job been modified?



greemqq...@163.com




---- Original message ----
From: Jason_H
Date: 2022-08-19 11:52
To: flink中文邮件组
Cc:
Subject: Error on automatic flink job restart
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer

Hi everyone, I've recently hit a strange problem: my job reports this error when it restarts automatically. Searching online suggests state incompatibility, but my jobs are all started as new jobs, not restored from a previous checkpoint. After running for a while the error shows up, exactly at the moment of the automatic restart. Has anyone seen this? Is there any solution?
To emphasize: the job is new and was not restarted from a previous job's checkpoint.


Jason_H
hyb_he...@163.com



--
Best,
Hangxiang.




Re: Error on automatic flink job restart

2022-08-24 Thread Jason_H
Hello, the error is:
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer


Jason_H
hyb_he...@163.com
---- Original message ----
From: yue ma
Date: 2022-08-24 16:48
To:
Subject: Re: Error on automatic flink job restart
Hi, could you paste a more detailed error?

[earlier quoted messages omitted; they are identical to those quoted in the previous message]



--
Best,
Hangxiang.



Re: Error on automatic flink job restart

2022-08-24 Thread Jason_H
Hello, the exact setup is as follows:
public static final MapStateDescriptor<String, Integer> quantityJudgeStateDescriptor =
        new MapStateDescriptor<>(
                "quantityJudgeMapState",
                String.class,
                Integer.class);

mistakeUpgradeRuleState =
        getRuntimeContext().getMapState(Descriptors.quantityJudgeStateDescriptor);
// Set the state TTL to one day
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
Descriptors.quantityJudgeStateDescriptor.enableTimeToLive(stateTtlConfig);


Jason_H
hyb_he...@163.com
---- Original message ----
From: Hangxiang Yu
Date: 2022-08-24 13:12
To:
Subject: Re: Error on automatic flink job restart
Is this a DataStream job? Could you share the part that uses state?

On Sat, Aug 20, 2022 at 3:35 PM Jason_H  wrote:

[earlier quoted messages omitted; they are identical to those quoted in the previous messages]



--
Best,
Hangxiang.
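As context for the setup quoted above: it enables TTL on the descriptor after getMapState() has already been called. A hedged sketch of the conventional ordering follows (class and field names are illustrative): enable TTL on the descriptor before obtaining the state handle, since TTL wraps the state serializer, and toggling it between runs makes a restored checkpoint incompatible, which is exactly the StateMigrationException seen in this thread.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class QuantityJudgeFunction extends KeyedProcessFunction<String, Integer, Integer> {
    private transient MapState<String, Integer> quantityJudgeMapState;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>(
                "quantityJudgeMapState", String.class, Integer.class);

        // Enable TTL on the descriptor BEFORE obtaining the state handle.
        // TTL wraps the state serializer, so enabling it in one run and not in
        // another makes a restored checkpoint incompatible.
        descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());

        quantityJudgeMapState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        Integer seen = quantityJudgeMapState.get(ctx.getCurrentKey());
        quantityJudgeMapState.put(ctx.getCurrentKey(), seen == null ? 1 : seen + 1);
        out.collect(value);
    }
}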


Re: Error on automatic flink job restart

2022-08-20 Thread Jason_H
Hello, the job had been modified, but it was started as a new job; the changes were substantial and it doesn't depend on the old job.


Jason_H
hyb_he...@163.com
---- Original message ----
From: Michael Ran
Date: 2022-08-20 15:31
To: tsreape...@gmail.com
Subject: Re: Error on automatic flink job restart
Had the job been modified?



greemqq...@163.com




---- Original message ----
From: Jason_H
Date: 2022-08-19 11:52
To: flink中文邮件组
Cc:
Subject: Error on automatic flink job restart
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer
Hi everyone, I've recently hit a strange problem: my job reports this error when it restarts automatically. Searching online suggests state incompatibility, but my jobs are all started as new jobs, not restored from a previous checkpoint. After running for a while the error shows up, exactly at the moment of the automatic restart. Has anyone seen this? Is there any solution?
To emphasize: the job is new and was not restarted from a previous job's checkpoint.


Jason_H
hyb_he...@163.com

Re: Error on automatic flink job restart

2022-08-19 Thread Jason_H
Hello, my version is 1.14.3. The job logic stores data in state, with a TTL of one hour. Once a new job starts without restoring from a previous checkpoint, it shouldn't have any connection to the previous job, right? The strange part is that a brand-new job reports this error. I reproduced it locally: an old job without TTL, then a new job with TTL restored from the old job's checkpoint, produces exactly this error; but in my case the job was started completely fresh.
Thanks for your reply.


Jason_H
hyb_he...@163.com
---- Original message ----
From:
Date: 2022-08-19 18:59
To: flink中文邮件组
Subject: Re: Error on automatic flink job restart
Could you tell us which Flink version you are using and what the job logic looks like? Older versions had a similar problem caused by a state comparator implementation bug: https://issues.apache.org/jira/browse/FLINK-18452, fixed in 1.12.

If checkpointing is enabled, a freshly started job that fails for some reason will also recover from its own checkpoints, which can trigger this kind of problem.

Best,
Zhanghao Chen

From: Jason_H 
Sent: Friday, August 19, 2022 11:52
To: flink中文邮件组 
Subject: Error on automatic flink job restart

cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer
Hi everyone, I've recently hit a strange problem: my job reports this error when it restarts automatically. Searching online suggests state incompatibility, but my jobs are all started as new jobs, not restored from a previous checkpoint. After running for a while the error shows up, exactly at the moment of the automatic restart. Has anyone seen this? Is there any solution?
To emphasize: the job is new and was not restarted from a previous job's checkpoint.


Jason_H
hyb_he...@163.com


Error on automatic flink job restart

2022-08-18 Thread Jason_H
cause by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible with the old state serializer
Hi everyone, I've recently hit a strange problem: my job reports this error when it restarts automatically. Searching online suggests state incompatibility, but my jobs are all started as new jobs, not restored from a previous checkpoint. After running for a while the error shows up, exactly at the moment of the automatic restart. Has anyone seen this? Is there any solution?
To emphasize: the job is new and was not restarted from a previous job's checkpoint.


Jason_H
hyb_he...@163.com