Re: 关于keyby()如何保留原并行度的问题

2022-09-08 文章 r pp
keyby() 时,还没有选好分组呢,这个只是告诉flink 要根据什么分组,所有也没有Runtime...

junjie.m...@goupwith.com  于2022年9月8日周四 14:17写道:

> hi:
> flink keyby()时能否获取到subTask的编号,根据编号分组,让上游数据可以继续保持原有的数据依然在同一个subTask中进行后续计算。
>
> 在AbstractRichFunction类中this.getRuntimeContext().getIndexOfThisSubtask()可以获取编号,但是keyby()的KeySelector类中没有getRuntimeContext()方法。
>
>

-- 
Best,
  pp


Re: 关于flink table store的疑问

2022-09-08 文章 r pp
应该是为了 流批一体 。不丢数据

Kyle Zhang  于2022年9月8日周四 08:37写道:

> Hi all,
>   看table
> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
>
> Best.
>


-- 
Best,
  pp


Re: Re: flink jdbc source oom

2022-04-02 文章 r pp
我觉得 流处理中,无论是一个一个处理,还是一批一批处理,强调了 连续性,自定义sql 在连续性的保证上,想到的比较好的方式是自增 id
的方式(这就意味着只接受 insert 操作),而在一批数据中 排序、去重,其实对于整体而言 收效不好说, 除非
每一批数据都严格的分区(如不同日期),不过过滤是有好处的。

Michael Ran  于2022年4月1日周五 11:00写道:

> 这个当初提过自定义SQL 数据集,但是社区否定了这种做法- -,但是从功能上来说,我们也是实现的自定义SQL结果集,进行join
> 之类的操作,在大数据集,以及一些数据排序、剔除重复等场景有一定优势
> 在 2022-04-01 10:12:55,"Lincoln Lee"  写道:
> >@Peihui  当前社区的 jdbc table source 实现了这些接口:
> >ScanTableSource,
> >LookupTableSource,
> >SupportsProjectionPushDown,
> >SupportsLimitPushDown
> >
> >其中 lookup table source 用于维表的 kv lookup 查询,  scan table source 支持了
> >projection 和 limit 下推, 如果有需求做其他 pushdown.可以尝试自行扩展 connector 来实现比如
> >filter/aggregate pushdown 满足前置过滤需求
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
> >r pp  于2022年3月31日周四 18:40写道:
> >
> >> hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
> >>
>


-- 
Best,
  pp


Re: Re: flink jdbc source oom

2022-04-02 文章 r pp
我觉得 流处理中,无论是一个一个处理,还是一批一批处理,强调了 连续性,自定义sql 在连续性的保证上,想到的比较好的方式是自增 id
的方式(这就意味着只接受 insert 操作),而在一批数据中 排序、去重,其实对于整体而言 收效不好说, 除非
每一批数据都严格的分区(如不同日期),不过过滤是有好处的。

Michael Ran  于2022年4月1日周五 11:00写道:

> 这个当初提过自定义SQL 数据集,但是社区否定了这种做法- -,但是从功能上来说,我们也是实现的自定义SQL结果集,进行join
> 之类的操作,在大数据集,以及一些数据排序、剔除重复等场景有一定优势
> 在 2022-04-01 10:12:55,"Lincoln Lee"  写道:
> >@Peihui  当前社区的 jdbc table source 实现了这些接口:
> >ScanTableSource,
> >LookupTableSource,
> >SupportsProjectionPushDown,
> >SupportsLimitPushDown
> >
> >其中 lookup table source 用于维表的 kv lookup 查询,  scan table source 支持了
> >projection 和 limit 下推, 如果有需求做其他 pushdown.可以尝试自行扩展 connector 来实现比如
> >filter/aggregate pushdown 满足前置过滤需求
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
> >r pp  于2022年3月31日周四 18:40写道:
> >
> >> hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
> >>
>


-- 
Best,
  pp


Re: flink jdbc source oom

2022-03-31 文章 r pp
hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了


Re: 计算UV时使用了PurgingTrigger仍旧发生taskManger OOM的问题

2022-03-27 文章 r pp
purgingTrigger 是触发后清除windowstate,3min的触发,windowstate 会保存3min  的数据,这个看gc也可以。

yj h  于 2022年3月28日周一 上午12:08写道:

> hi,thank you, 是你说的TM内存不够的问题
>  我9点左右分下了下gc的日志,Gcviewr看显示fullgc后最大剩下300M左右,我看帖子提到的经验就把TM
> heap提升到了1GB,测试过,在总共500M的数据下没有发生OOM 了
>
> 另外咨询一下
> 我要怎么测试3min来了多少数据?
> 调整TM应该基于哪些考虑 ,在这个场景下 只要符合3min内能放下的数据是不是就可以了
>
> best regards!
>
> r pp  于2022年3月27日周日 23:46写道:
>
> > hi~ 因为3min 的Trigger 触发 ,所以,内存里会保存3min内的数据,然后,删除又新增。所以你这边 3min
> > 内总数据量是多少?内存大概多大?可以试着调整TM 的内存量
> >
>


Re: 计算UV时使用了PurgingTrigger仍旧发生taskManger OOM的问题

2022-03-27 文章 r pp
hi~ 因为3min 的Trigger 触发 ,所以,内存里会保存3min内的数据,然后,删除又新增。所以你这边 3min
内总数据量是多少?内存大概多大?可以试着调整TM 的内存量


Re: 在本地环境IDEA远程调试Flink报错

2021-07-09 文章 r pp
先编译正确后,再debug

tangzhi8...@gmail.com  于2021年6月28日周一 下午3:02写道:

> 目的:想在本地环境IDEA远程调试Flink
> 步骤:
> 1.这是Debug的配置项
> 2.报错堆栈信息:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job 'Streaming WordCount'.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:374)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:120)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:817)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:249)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1148)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1148)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'Streaming WordCount'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1984)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1845)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:357)
> ... 8 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be
> initialized
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:166)
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83)
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:146)
> ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: Response was
> neither of the expected type([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobDetailsInfo]) nor an error.

Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-14 文章 r pp
如果你找到正确的JobManager 和TaskManager 的启动命令,如: Java -Dlog.file =...
 找到log4j中 类似:${sys:log.file} ,  ${env:MAX_LOG_FILE_NUMBER:-10}  ,一个是系统参数
-Dlog.file ,一个环境变量。

方法有:

1.直接在 运行命令中 加入新参数,再从 log4j 读取相应参数即可,相对直接一些,有可能需要改动源码

2.从日志名入手,你看无论TM 还是 JM ,看日志就区别好了,只要获取
日志名,再正则(在log4j配置中如何正则呢?)获取你想要的日志关键字,取为Kafka的topic name,基本上就可解了


王刚  于2021年6月14日周一 下午3:48写道:

> 用这两个配置 定制不同的启动参数传到log4j配置文件是不是就可以
> env.java.opts.jobmanager
> env.java.opts.taskmanager
>
>  原始邮件
> 发件人: DanielGu<610493...@qq.com>
> 收件人: user-zh
> 发送时间: 2021年6月11日(周五) 18:07
> 主题: Re: Flink1.12
> 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?
>
>
> 同求一波配置
> 谢谢大佬们
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 
Best,
  pp


Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-07 文章 r pp
可能和配置文件有关吧,我用的都是默认配置

smq <374060...@qq.com> 于2021年6月8日周二 上午7:10写道:

>
> 图里边可以看到,这个/jobmanager.log /jobmanager.out 
> /jobmanager.err中的LOG_DIR应该是一样的,也就是说这三个日志应该是放在一个目录下。至于什么原因少了这个. log 
> 确实是不清楚。
>
>
>
> -- 原始邮件 --
> *发件人:* r pp 
> *发送时间:* 2021年6月7日 23:56
> *收件人:* smq <374060...@qq.com>, user-zh 
> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
> flink on yarn 模式的 Per-Job  Cluster mode  在启动container ,产生jobmanager.log 前
> 都不会运行 Jar包的东西,所以,两个Jar
> ,一个正常,一个不正常,只能说明,不正常的要么没有读取到日志配置文件,不产生日志,要么就是有魔法把它原本生产的日志挪走或者
> 删除,(这是我的个人理解,如果,什么都一样,同样的提交命令、同样的提交环境,就是一个有log,一个没有log,差别就是Jar包不一样,真的无法分析了,没有集群,也干不了远程调试)
> 但这些 都已经和flink 是否有代码缺陷 没有关系了。附上 yarn 启动container(JobManager)的 不全
> 提交命令,日志路径部分是Yarn 提供的,图中可以看出,这里运行的是
> org.apache.flink.yarn.entrypoint.YarnJobclusterEntrypoint ,
> 和Jar没有啥关系,这启动JobManager 前,Jar包 只是一个String、一个运行路径、一个待运行的classJar包。
>
>
> smq <374060...@qq.com> 于2021年6月7日周一 下午5:48写道:
>
>> 这个配置读到了,可以在webui 里看到这个配置internal .yarn.log-configure-file
>>
>>
>>
>> -- 原始邮件 --
>> *发件人:* r pp 
>> *发送时间:* 2021年6月7日 17:24
>> *收件人:* smq <374060...@qq.com>, user-zh 
>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>
>> 好的,如果log 文件就没有产生,是真的没有产生,还有一个flink 的原因是,没有读取到日志的配置文件。
>>
>>
>>
>>
>> smq <374060...@qq.com> 于2021年6月7日周一 下午3:31写道:
>>
>>>
>>> 判断出确失这个.log是因为从yarn 上的logs中就看不到jobmanager .log 
>>> 这个文件,正常的是会有jobmanager.log文件,然后根据这个application id 去搜索containnerid 
>>> 也是找不到这个jobmanager .log 。正常的程序这三个文件是在一个containerid 
>>> 目录下在一起的,不正常的就是同一个containerID目录下只有这两个文件,这是对此很多次之后发现的。还有就是我说的完全一样的两个程序是打成两个jar
>>>  
>>> 包的,这两个程序是在不同模块下,为了找到原因,已经改成了完全一样的程序,但是结果还是之前正常的每次运行都正常,不正常的这个改成跟正常的程序一摸一样还是运行不正常。经过这一周多的测试发现,这个结果不是随机出现的。所以就感觉很奇怪。
>>>
>>>
>>>
>>> -- 原始邮件 --
>>> *发件人:* r pp 
>>> *发送时间:* 2021年6月7日 13:55
>>> *收件人:* smq <374060...@qq.com>
>>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>>
>>> 补充下,我刚刚的疑惑, 你说的缺失 ,是没有log 文件,还是说在同一个文件目录下没有看到 log,只看到了 out err,其实,不确定log
>>> 是不是在别的地方?
>>>
>>> r pp  于2021年6月7日周一 下午1:42写道:
>>>
>>>> 你说的缺失是指?是指在本地机器上的文件就缺失log 么?如果,本地机器上文件都有(3个,全的)。为什么 flink-web ui 只显示两个呢?
>>>> 如果,本地机器上 只有两个(缺失了log),那缺失的log 文件路径有是哪个呢?
>>>>
>>>> 同一个Jar 运行,日志一会ok ,一会儿 不ok, 是不是 有别的程序 在挪动文件路径呢?
>>>>
>>>> smq <374060...@qq.com> 于2021年6月7日周一 上午10:49写道:
>>>>
>>>>>
>>>>> 你好,正常情况下flink on yarn 的container 日志中应该有.err .out .log 
>>>>> 这三个日志,你贴的这个-slog.file 应该就是jobmanager .log 的位置,但是我这个程序是没有.log日志这个文件的,只有err 
>>>>> 和out ,我试着在程序containner 里查看日志,发现能正常显示日志的container 
>>>>> 中有以上三种日志,不能正常显示日志的程序中,只有两个日志,没有.log日志文件。所以也看不到运行时的一些info信息。我觉得这个是yarn创建的日志文件,目前还没找到原因。我尝试过运行两个一模一样的程序,打成两个jar
>>>>>  包,但是其中一个正常,另一个不正常,这种情况不影响程序运行,只是缺一些日志。
>>>>>
>>>>>
>>>>>
>>>>> -- 原始邮件 ------
>>>>> *发件人:* r pp 
>>>>> *发送时间:* 2021年6月4日 18:17
>>>>> *收件人:* user-zh , smq <374060...@qq.com>
>>>>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>>>>
>>>>> 上封,复制错了,更正下,看时间顺序。 日志启动
>>>>> 2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
>>>>> ClusterEntrypoint [] - -Dlog
>>>>>
>>>>> .file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
>>>>> cluster 启动
>>>>> 2021-06-04 17:38:15,425 INFO org.apache.flink.runtime.entrypoint.
>>>>> ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
>>>>> obtainer: hdfs
>>>>>
>>>>> r pp  于2021年6月4日周五 下午6:11写道:
>>>>>
>>>>> > 嗨~
>>>>> >  我这边是  per-job on yarn 的mode
>>>>> >
>>>>> >  我查看的 yarn 的container 的日志信息,一般在程序初始化的时候,就会生产日志名,日志路径,变成环境env,
>>>>> >  然后 形成config ,再启动 cluster。
>>>>> >
>>>>> > 而日志路径 是yarn 的配置模式取好的,之后 flink 的获取这个信息,用于web 展示。
>>>>> >
>>>>> > 所以,你可能需要定位 你的日志文件路径 是否有改变,我做test,改变日志名,flink-web 还是可以正常显示
>>>>> > ,但是改变路径,flink-web 就无法显示了
>>>>> >
>>>>> > 但是具体的差异可能无法细化了,所提供的信息太少
>>>>> > env 环境信息
>>>>> > 2021-06-04 17:38:15,417 INFO org.apache.flink.run

Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-07 文章 r pp
flink on yarn 模式的 Per-Job  Cluster mode  在启动container ,产生jobmanager.log 前
都不会运行 Jar包的东西,所以,两个Jar
,一个正常,一个不正常,只能说明,不正常的要么没有读取到日志配置文件,不产生日志,要么就是有魔法把它原本生产的日志挪走或者
删除,(这是我的个人理解,如果,什么都一样,同样的提交命令、同样的提交环境,就是一个有log,一个没有log,差别就是Jar包不一样,真的无法分析了,没有集群,也干不了远程调试)
但这些 都已经和flink 是否有代码缺陷 没有关系了。附上 yarn 启动container(JobManager)的 不全
提交命令,日志路径部分是Yarn 提供的,图中可以看出,这里运行的是
org.apache.flink.yarn.entrypoint.YarnJobclusterEntrypoint ,
和Jar没有啥关系,这启动JobManager 前,Jar包 只是一个String、一个运行路径、一个待运行的classJar包。
[image: image.png]

smq <374060...@qq.com> 于2021年6月7日周一 下午5:48写道:

> 这个配置读到了,可以在webui 里看到这个配置internal .yarn.log-configure-file
>
>
>
> -- 原始邮件 ----------
> *发件人:* r pp 
> *发送时间:* 2021年6月7日 17:24
> *收件人:* smq <374060...@qq.com>, user-zh 
> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
> 好的,如果log 文件就没有产生,是真的没有产生,还有一个flink 的原因是,没有读取到日志的配置文件。
>
>
>
>
> smq <374060...@qq.com> 于2021年6月7日周一 下午3:31写道:
>
>>
>> 判断出确失这个.log是因为从yarn 上的logs中就看不到jobmanager .log 
>> 这个文件,正常的是会有jobmanager.log文件,然后根据这个application id 去搜索containnerid 
>> 也是找不到这个jobmanager .log 。正常的程序这三个文件是在一个containerid 
>> 目录下在一起的,不正常的就是同一个containerID目录下只有这两个文件,这是对此很多次之后发现的。还有就是我说的完全一样的两个程序是打成两个jar 
>> 包的,这两个程序是在不同模块下,为了找到原因,已经改成了完全一样的程序,但是结果还是之前正常的每次运行都正常,不正常的这个改成跟正常的程序一摸一样还是运行不正常。经过这一周多的测试发现,这个结果不是随机出现的。所以就感觉很奇怪。
>>
>>
>>
>> -- 原始邮件 --
>> *发件人:* r pp 
>> *发送时间:* 2021年6月7日 13:55
>> *收件人:* smq <374060...@qq.com>
>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>
>> 补充下,我刚刚的疑惑, 你说的缺失 ,是没有log 文件,还是说在同一个文件目录下没有看到 log,只看到了 out err,其实,不确定log
>> 是不是在别的地方?
>>
>> r pp  于2021年6月7日周一 下午1:42写道:
>>
>>> 你说的缺失是指?是指在本地机器上的文件就缺失log 么?如果,本地机器上文件都有(3个,全的)。为什么 flink-web ui 只显示两个呢?
>>> 如果,本地机器上 只有两个(缺失了log),那缺失的log 文件路径有是哪个呢?
>>>
>>> 同一个Jar 运行,日志一会ok ,一会儿 不ok, 是不是 有别的程序 在挪动文件路径呢?
>>>
>>> smq <374060...@qq.com> 于2021年6月7日周一 上午10:49写道:
>>>
>>>>
>>>> 你好,正常情况下flink on yarn 的container 日志中应该有.err .out .log 
>>>> 这三个日志,你贴的这个-slog.file 应该就是jobmanager .log 的位置,但是我这个程序是没有.log日志这个文件的,只有err 
>>>> 和out ,我试着在程序containner 里查看日志,发现能正常显示日志的container 
>>>> 中有以上三种日志,不能正常显示日志的程序中,只有两个日志,没有.log日志文件。所以也看不到运行时的一些info信息。我觉得这个是yarn创建的日志文件,目前还没找到原因。我尝试过运行两个一模一样的程序,打成两个jar
>>>>  包,但是其中一个正常,另一个不正常,这种情况不影响程序运行,只是缺一些日志。
>>>>
>>>>
>>>>
>>>> -- 原始邮件 --
>>>> *发件人:* r pp 
>>>> *发送时间:* 2021年6月4日 18:17
>>>> *收件人:* user-zh , smq <374060...@qq.com>
>>>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>>>
>>>> 上封,复制错了,更正下,看时间顺序。 日志启动
>>>> 2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
>>>> ClusterEntrypoint [] - -Dlog
>>>>
>>>> .file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
>>>> cluster 启动
>>>> 2021-06-04 17:38:15,425 INFO org.apache.flink.runtime.entrypoint.
>>>> ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
>>>> obtainer: hdfs
>>>>
>>>> r pp  于2021年6月4日周五 下午6:11写道:
>>>>
>>>> > 嗨~
>>>> >  我这边是  per-job on yarn 的mode
>>>> >
>>>> >  我查看的 yarn 的container 的日志信息,一般在程序初始化的时候,就会生产日志名,日志路径,变成环境env,
>>>> >  然后 形成config ,再启动 cluster。
>>>> >
>>>> > 而日志路径 是yarn 的配置模式取好的,之后 flink 的获取这个信息,用于web 展示。
>>>> >
>>>> > 所以,你可能需要定位 你的日志文件路径 是否有改变,我做test,改变日志名,flink-web 还是可以正常显示
>>>> > ,但是改变路径,flink-web 就无法显示了
>>>> >
>>>> > 但是具体的差异可能无法细化了,所提供的信息太少
>>>> > env 环境信息
>>>> > 2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
>>>> > ClusterEntrypoint [] - -Dlog
>>>> >
>>>> .file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
>>>> > cluster 启动信息
>>>> > 2021-06-04 16:47:01,429 INFO org.apache.flink.runtime.entrypoint.
>>>> > ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client
>>>> user
>>>> > obtainer: hdfs
>>>> >
>>>> > zilong xiao  于2021年6月3日周四 下午2:17写道:
>>>> >
>>>> >> 1.10默认用的log4j1,1.12用log4j2
>>>> >>
>>>> >> smq <374060...@qq.com> 于2021年6月2日周三 下午3:26写道:
>>>> >>
>>>> >> >
>>>> >> >
>>>

Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-07 文章 r pp
好的,如果log 文件就没有产生,是真的没有产生,还有一个flink 的原因是,没有读取到日志的配置文件。
[image: image.png]



smq <374060...@qq.com> 于2021年6月7日周一 下午3:31写道:

>
> 判断出确失这个.log是因为从yarn 上的logs中就看不到jobmanager .log 
> 这个文件,正常的是会有jobmanager.log文件,然后根据这个application id 去搜索containnerid 
> 也是找不到这个jobmanager .log 。正常的程序这三个文件是在一个containerid 
> 目录下在一起的,不正常的就是同一个containerID目录下只有这两个文件,这是对此很多次之后发现的。还有就是我说的完全一样的两个程序是打成两个jar 
> 包的,这两个程序是在不同模块下,为了找到原因,已经改成了完全一样的程序,但是结果还是之前正常的每次运行都正常,不正常的这个改成跟正常的程序一摸一样还是运行不正常。经过这一周多的测试发现,这个结果不是随机出现的。所以就感觉很奇怪。
>
>
>
> -- 原始邮件 --
> *发件人:* r pp 
> *发送时间:* 2021年6月7日 13:55
> *收件人:* smq <374060...@qq.com>
> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
> 补充下,我刚刚的疑惑, 你说的缺失 ,是没有log 文件,还是说在同一个文件目录下没有看到 log,只看到了 out err,其实,不确定log
> 是不是在别的地方?
>
> r pp  于2021年6月7日周一 下午1:42写道:
>
>> 你说的缺失是指?是指在本地机器上的文件就缺失log 么?如果,本地机器上文件都有(3个,全的)。为什么 flink-web ui 只显示两个呢?
>> 如果,本地机器上 只有两个(缺失了log),那缺失的log 文件路径有是哪个呢?
>>
>> 同一个Jar 运行,日志一会ok ,一会儿 不ok, 是不是 有别的程序 在挪动文件路径呢?
>>
>> smq <374060...@qq.com> 于2021年6月7日周一 上午10:49写道:
>>
>>>
>>> 你好,正常情况下flink on yarn 的container 日志中应该有.err .out .log 这三个日志,你贴的这个-slog.file 
>>> 应该就是jobmanager .log 的位置,但是我这个程序是没有.log日志这个文件的,只有err 和out ,我试着在程序containner 
>>> 里查看日志,发现能正常显示日志的container 
>>> 中有以上三种日志,不能正常显示日志的程序中,只有两个日志,没有.log日志文件。所以也看不到运行时的一些info信息。我觉得这个是yarn创建的日志文件,目前还没找到原因。我尝试过运行两个一模一样的程序,打成两个jar
>>>  包,但是其中一个正常,另一个不正常,这种情况不影响程序运行,只是缺一些日志。
>>>
>>>
>>>
>>> -- 原始邮件 --
>>> *发件人:* r pp 
>>> *发送时间:* 2021年6月4日 18:17
>>> *收件人:* user-zh , smq <374060...@qq.com>
>>> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>>
>>> 上封,复制错了,更正下,看时间顺序。 日志启动
>>> 2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
>>> ClusterEntrypoint [] - -Dlog
>>>
>>> .file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
>>> cluster 启动
>>> 2021-06-04 17:38:15,425 INFO org.apache.flink.runtime.entrypoint.
>>> ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
>>> obtainer: hdfs
>>>
>>> r pp  于2021年6月4日周五 下午6:11写道:
>>>
>>> > 嗨~
>>> >  我这边是  per-job on yarn 的mode
>>> >
>>> >  我查看的 yarn 的container 的日志信息,一般在程序初始化的时候,就会生产日志名,日志路径,变成环境env,
>>> >  然后 形成config ,再启动 cluster。
>>> >
>>> > 而日志路径 是yarn 的配置模式取好的,之后 flink 的获取这个信息,用于web 展示。
>>> >
>>> > 所以,你可能需要定位 你的日志文件路径 是否有改变,我做test,改变日志名,flink-web 还是可以正常显示
>>> > ,但是改变路径,flink-web 就无法显示了
>>> >
>>> > 但是具体的差异可能无法细化了,所提供的信息太少
>>> > env 环境信息
>>> > 2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
>>> > ClusterEntrypoint [] - -Dlog
>>> >
>>> .file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
>>> > cluster 启动信息
>>> > 2021-06-04 16:47:01,429 INFO org.apache.flink.runtime.entrypoint.
>>> > ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
>>> > obtainer: hdfs
>>> >
>>> > zilong xiao  于2021年6月3日周四 下午2:17写道:
>>> >
>>> >> 1.10默认用的log4j1,1.12用log4j2
>>> >>
>>> >> smq <374060...@qq.com> 于2021年6月2日周三 下午3:26写道:
>>> >>
>>> >> >
>>> >> >
>>> >>
>>> 你的意思是在log4j.properties中的配置吗,我门在这个里边配置了生成日志文件的格式,是在安装节点里加的,不过这个应该不是在webui里显示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> > -- 原始邮件 --
>>> >> > 发件人: r pp >> >> > 发送时间: 2021年6月2日 15:08
>>> >> > 收件人: user-zh >> >> > 主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>> >> >
>>> >> >
>>> >> >
>>> >> > 嗨~ 你们有没有改日志文件的名字
>>> >> >
>>> >> > smq <374060...@qq.com 于2021年6月2日周三 下午12:24写道:
>>> >> >
>>> >> >  你这个解决了吗,我也遇到了同样的问题
>>> >> > 
>>> >> > 
>>> >> > 
>>> >> > 
>>> >> > 
>>> >> >  -- 原始邮件 --
>>> >> >  发件人: todd >> >> >  发送时间: 2021年4月14日 19:11
>>> >> >  收件人: user-zh >> >> >  主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>>> >> > 
>>> >> > 
>>> >> > 
>>> >> >  yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。
>>> >> > 
>>> >> > 
>>> >> > 
>>> >> >  --
>>> >> >  Sent from: http://apache-flink.147419.n8.nabble.com/
>>> >> >
>>> >> >
>>> >> >
>>> >> > --
>>> >> > Best,
>>> >> >  pp
>>> >>
>>> >
>>> >
>>> > --
>>> > Best,
>>> >   pp
>>> >
>>>
>>>
>>> --
>>> Best,
>>>   pp
>>>
>>
>>
>> --
>> Best,
>>   pp
>>
>
>
> --
> Best,
>   pp
>


-- 
Best,
  pp


Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-04 文章 r pp
嗨~
 我这边是  per-job on yarn 的mode

 我查看的 yarn 的container 的日志信息,一般在程序初始化的时候,就会生产日志名,日志路径,变成环境env,
 然后 形成config ,再启动 cluster。

而日志路径 是yarn 的配置模式取好的,之后 flink 的获取这个信息,用于web 展示。

所以,你可能需要定位 你的日志文件路径 是否有改变,我做test,改变日志名,flink-web 还是可以正常显示 ,但是改变路径,flink-web
就无法显示了

但是具体的差异可能无法细化了,所提供的信息太少
env 环境信息
2021-06-04 17:38:15,417 INFO org.apache.flink.runtime.entrypoint.
ClusterEntrypoint [] - -Dlog
.file=/yarn/container-logs/application_1622784975053_0013/container_1622784975053_0013_01_01/jobmanager.log
cluster 启动信息
2021-06-04 16:47:01,429 INFO org.apache.flink.runtime.entrypoint.
ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
obtainer: hdfs

zilong xiao  于2021年6月3日周四 下午2:17写道:

> 1.10默认用的log4j1,1.12用log4j2
>
> smq <374060...@qq.com> 于2021年6月2日周三 下午3:26写道:
>
> >
> >
> 你的意思是在log4j.properties中的配置吗,我门在这个里边配置了生成日志文件的格式,是在安装节点里加的,不过这个应该不是在webui里显示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: r pp  > 发送时间: 2021年6月2日 15:08
> > 收件人: user-zh  > 主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
> >
> >
> >
> > 嗨~ 你们有没有改日志文件的名字
> >
> > smq <374060...@qq.com 于2021年6月2日周三 下午12:24写道:
> >
> >  你这个解决了吗,我也遇到了同样的问题
> > 
> > 
> > 
> > 
> > 
> >  -- 原始邮件 --
> >  发件人: todd  >  发送时间: 2021年4月14日 19:11
> >  收件人: user-zh  >  主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
> > 
> > 
> > 
> >  yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。
> > 
> > 
> > 
> >  --
> >  Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> > Best,
> >  pp
>


-- 
Best,
  pp


Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-02 文章 r pp
嗨~  你们有没有改日志文件的名字

smq <374060...@qq.com> 于2021年6月2日周三 下午12:24写道:

> 你这个解决了吗,我也遇到了同样的问题
>
>
>
>
>
> -- 原始邮件 --
> 发件人: todd  发送时间: 2021年4月14日 19:11
> 收件人: user-zh  主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
>
>
>  yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


Re: kafka exactly-once语义下,从svaepoint恢复报错

2021-06-01 文章 r pp
 'properties.transaction.timeout.ms' = '3'  配置的太短了,30s
transactionalId   就过期了。 估计 都来不去启动吧
 官网的原文
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).
Having this in mind, please configure your transaction timeout
appropriately to your expected down times.

周瑞  于2021年6月1日周二 下午3:45写道:

>
> 您好:kafka在exactly-once语义下,从svaepoint恢复报错。初步排查认为是kafka事务使用了旧的epoch。请问这个问题怎么处理?
> //todo 通过配置传进来
> env.setParallelism(1);
> env.enableCheckpointing(60L, CheckpointingMode.EXACTLY_ONCE);
>
> // checkpoint的清除策略(即使任务被显示地取消也会保留checkpoint)
> env.getCheckpointConfig()
>
> .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
>
> //TODO 生产中必须使用 HDFS
> env.setStateBackend(new FsStateBackend("hdfs://
> 10.10.98.226:8020/tmp/checkpoint66"));
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> public static final  String TABLE_NAME = "KafkaTable";
> public static final  String COLUMN_NAME = "source_value";
>
> public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' = '3',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
> an operation with an old epoch. Either there is a newer producer with the
> same transactionalId,
>  or the producer's transaction has been expired by the broker. while
> recovering transaction KafkaTransactionState [transactionalId=Source:
> TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) - Sink:
> Sink(table=[default_catalog.default_database.KafkaTable],
> fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009,
> epoch=216]. Presumably this transaction has been already committed before



-- 
Best,
  pp


Re: Flink是否支持自定义的限流功能

2021-06-01 文章 r pp
flink 的反压机制 不就是在限流么?

suisuimu <726400...@qq.com> 于2021年6月1日周二 下午5:37写道:

> Flink从Kafka读取数据时,是否支持用户自定义的限流策略。
> 例如根据消息中的某个字段的名称,设置流控规则。
> 请问是否支持呢?还是需要自己借助第三方组件(例如sentinel)来实现?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


Re: 不同的程序在同一时间段报同一个异常

2021-06-01 文章 r pp
你的网络环境是怎么样? 是在docker 上跑么?还是怎么?
从报错上看,netty 无法解码导致的,但是为什么会出现这样的现象?
或许 你可以把问题贴的在详细一点

5599 <673313...@qq.com> 于2021年6月1日周二 下午2:32写道:

> 退订
>
>
>
>
> --原始邮件----------
> 发件人: "r pp" 发送时间: 2021年6月1日(星期二) 下午2:07
> 收件人: "user-zh" 主题: Re: 不同的程序在同一时间段报同一个异常
>
>
>
> 你的程序有挂掉么?
>
> mq sun 
>  大家好:
>  最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
>  ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error
> while
>  excuting Blob connection
>  .
>  .
>  .
> 
> 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
>  :Adjusted frame length exceeds 10485760: 1347375960 -discarded
> 
> 
> 
> 上面的报错信息,刚开始知道其中一个报错的时候,怀疑是状态过大导致的。但是后来又有另外一个程序报同样的错误,现在不确定是什么问题导致,请问大佬这个可能是什么原因
> 
>
>
> --
> Best,
>  pp



-- 
Best,
  pp


Re: 不同的程序在同一时间段报同一个异常

2021-06-01 文章 r pp
你的程序有挂掉么?

mq sun  于2021年5月31日周一 下午7:23写道:

> 大家好:
>   最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
> ERROR org.apache.flink.runtime.blob.BlobServerConnection  -Error while
> excuting Blob connection
> .
> .
> .
>
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> :Adjusted frame length exceeds 10485760: 1347375960 -discarded
>
>
> 上面的报错信息,刚开始知道其中一个报错的时候,怀疑是状态过大导致的。但是后来又有另外一个程序报同样的错误,现在不确定是什么问题导致,请问大佬这个可能是什么原因
>


-- 
Best,
  pp


Re: avro.ComplexPayloadAvro

2021-05-26 文章 r pp
谢谢,好奇为什么要这么做,动态编译么?

Qishang  于2021年5月26日周三 下午1:57写道:

> Hi.
>
> 会生成 `${project.basedir}/target/generated-sources/`
>
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97
>
> r pp  于2021年5月25日周二 上午9:58写道:
>
> > 各位好,请问下,
> >
> >
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> >
> > 在该类下的
> >
> >
> >
> /flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> > 下面两个类,在代码哪里?
> > import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
> > import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
> > --
> > Best,
> >   pp
> >
>


-- 
Best,
  pp


avro.ComplexPayloadAvro

2021-05-24 文章 r pp
各位好,请问下,
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java

在该类下的

/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
下面两个类,在代码哪里?
import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
-- 
Best,
  pp


Re: flink-cdc报错

2021-01-13 文章 r pp
hi~ io.debezium 包版本 是你自己配置的? 还是 官方的? 尝试的解决下,所以问的


guoxb__...@sina.com  于2021年1月13日周三 下午4:32写道:

> HI:
> 大家好,我现在遇到了一个问题,flink在通过cdc的方式读取binlog的方式进行读取日志的时候报错,具体报错如下:
>
> 
>   2021-01-13 15:45:21,920 INFO
> org.apache.kafka.connect.runtime.WorkerConfig [] - Worker configuration
> property 'internal.key.converter' is deprecated and may be removed in an
> upcoming release. The specified value
> 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this
> property can be safely removed from the worker configuration.
> 2021-01-13 15:45:21,920 INFO org.apache.kafka.connect.runtime.WorkerConfig
> [] - Worker configuration property 'internal.value.converter' is deprecated
> and may be removed in an upcoming release. The specified value
> 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this
> property can be safely removed from the worker configuration.
> 2021-01-13 15:45:28,936 ERROR io.debezium.connector.mysql.SnapshotReader
> [] - Failed due to error: Aborting snapshot due to error when last running
> 'UNLOCK TABLES': io.debezium.ddl.parser.mysql.generated.MySqlParser and
> io.debezium.ddl.parser.mysql.generated.MySqlParser$StringDataTypeContext
> disagree on InnerClasses attribute
> org.apache.kafka.connect.errors.ConnectException:
> io.debezium.ddl.parser.mysql.generated.MySqlParser and
> io.debezium.ddl.parser.mysql.generated.MySqlParser$StringDataTypeContext
> disagree on InnerClasses attribute
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[yd-mysql-mysql.jar:?]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[yd-mysql-mysql.jar:?]
> at
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
> ~[yd-mysql-mysql.jar:?]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_192]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_192]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
>
>
> 
>
> 不知有没有热心的朋友遇到类似的问题的,希望能够获取到一点建议,非常感谢
>
>
> guoxb__...@sina.com
>


-- 
Best,
  pp


Re: Flink1.12触发保存点时失败

2021-01-13 文章 r pp
hi~ Java 语法不支持,Long 可以设置

赵一旦  于2021年1月7日周四 下午8:13写道:

> 报错信息如下:
> java.lang.IllegalArgumentException: Can not set long field
> com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
> at
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
> UnsafeFieldAccessorImpl.java:167)
> at
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
> UnsafeFieldAccessorImpl.java:171)
> at sun.reflect.UnsafeLongFieldAccessorImpl.set(
> UnsafeLongFieldAccessorImpl.java:80)
> at java.lang.reflect.Field.set(Field.java:764)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:409)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
>
> 根据堆栈找到报错位置代码为:
>
> try {
>for (int i = 0; i < numFields; i++) {
>   boolean isNull = source.readBoolean();
>
>   if (fields[i] != null) {
>  if (isNull) {
> fields[i].set(target, null); //
> 此处报错,设置null,但这个字段是long基础数据类型,非包装类型。
>  } else {
> Object field = fieldSerializers[i].deserialize(source);
> fields[i].set(target, field);
>  }
>   } else if (!isNull) {
>  // read and dump a pre-existing field value
>  fieldSerializers[i].deserialize(source);
>   }
>}
> } catch (IllegalAccessException e) {
>throw new RuntimeException("Error during POJO copy, this should not
> happen since we check the fields before.", e);
> }
>


-- 
Best,
  pp


Re: flink-kafka-sink

2021-01-12 文章 r pp
hi,没有效果 具体是啥?

cxx <1156531...@qq.com> 于2021年1月7日周四 上午9:53写道:

>  我从kafka消费一条数据,然后将消息进行切分,再发送到下游的kafka中,但是这样不能保证在一个事务里面。
> 例如:我将一条数据切分成10条,然后再第五条的时候抛出一个异常,但是前四条已经发送到下游的kafka了。
> 我设置了事务id,隔离级别,client
> id,enable.idempotence,max.in.flight.requests.per.connection,retries
> 但是没有效果。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


Re: Flink TaskManager失败的日志关键词

2020-12-25 文章 r pp
  嗨~ 从flink 的启动 sh 文件里面可以看到,启动java 虚拟机的时候,就设置好 日志文件名了。改了名字,这次的JOB

https://github.com/apache/flink/pull/11839/files

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-
${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=fil 
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"  

在启动一个jvm 时,日志配置信息已经写好了
eg:
java  -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456
 -Dlog.file=/

root/flink-1.12.0/log/flink-root-standalonesession-0-iZ0jli08ce7m36qzwgalk4Z.log
。。。

zhuxiaoshang  于2020年12月25日周五 下午4:53写道:

> Hi,
> 一般搜索Exception、Error、Fail之类的吧,如果是TM因为内存超用被kill的话 可以搜索container、kill之类的关键字
>
> > 2020年12月25日 下午1:43,赵一旦  写道:
> >
> > 如题,有人知道关键词吗,每次失败日志太多哦。
> > 显示各种task的cancel等。
> > 最后突然就失败了。。。
> >
> > 目前感觉经常是因为cancel(180s)。导致Task did not exit gracefully within 180 +
> seconds。
> >
> >
> > 此外,大家生产中会修改日志格式和日志文件吗。我调整了之后WEB-UI上那个日志从来没能看过。现在虽然有个日志list,但点了也没效果。
> >
> > 我调整了日志文件名。
>
>


Re: Flink catalog+hive问题

2020-12-23 文章 r pp
gmail  可能有些不兼容,看不到截图

19916726683 <19916726...@163.com> 于2020年12月24日周四 上午10:51写道:

> hive的官网有介绍ACL,如何继承权限关系。源码在Hive-> HDFSUtils类中 核心代码应该是上面的这点。
>
>  Original Message
> *Sender:* Rui Li
> *Recipient:* user-zh
> *Date:* Wednesday, Dec 23, 2020 19:41
> *Subject:* Re: Flink catalog+hive问题
>
> hive的ACL用的是哪种呢?目前flink没有专门做ACL的对接,只有HMS端storage based authorization [1] 会生效
>
> [1]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer
>
> On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:
>
> > spark是可以通过配置来确定是用hive的acl还是用自己的acl,不清楚flink是不是也是这种模式
> >
> >
> > Original Message
> > Sender:guaishushu1103@163.comguaishushu1...@163.com
> > Recipient:user-zhuser...@flink.apache.org
> > Date:Wednesday, Dec 23, 2020 15:53
> > Subject:Flink catalog+hive问题
> >
> >
> > 在用flink
> > catalog+hive做元数据持久化的时候,发现hive的ACL权限没有起作用,麻烦问下知道的大佬,flink是会直接跳过hive的ACL权限吗?
> > guaishushu1...@163.com
>
>
>
> --
> Best regards!
> Rui Li
>
>


Re: pyflink query 语句执行获取数据速度很慢,where子句不过滤数据么?

2020-12-23 文章 r pp
表a 在 sql 语句的哪里呢?
关心的真的是过滤问题么? 如果你对你的业务十分熟悉,且了解到 flink1.11 不过 过滤,那为什么 不自行过滤 优化下呢?
如果,不是过滤问题,是大数 join 小数 问题,或者 大数 join 大数问题,是不是可以考虑 广播传播 或者 并行度 的优化方向?

是不是应该 先分析好业务问题,在去看 flink1.12 能否解决问题。

肖越 <18242988...@163.com> 于2020年12月24日周四 上午11:16写道:

> connector 从数据库读取整张表格,执行:
> env.sql_query("select a , b, c from table1 left join table2 on a = d where
> b = '103' and c = '203' and e = 'AC' and a between 20160701 and 20170307
> order a")
> 其中表 a 的数据量很大,能有1千万条,但匹配出来的数据只有250条,本机执行要10分钟~
> 了解到 flink 1.11存在where子句不会先过滤数据,请问flink1.12 仍存在这个问题么?怎么优化呢?


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-22 文章 r pp
flink 提交到特定的node ,可以保证 其它的任务 不能提交到flink特定的node 上么?

xiao cai  于2020年12月22日周二 上午10:28写道:

> Hi
> 可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上
>
>
>  Original Message
> Sender: r pp
> Recipient: user-zh
> Date: Monday, Dec 21, 2020 21:25
> Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
>
>
> 嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大? <
> afweij...@163.com> 于2020年12月21日周一 下午5:48写道: > 通过yarn label可以实现 > >
> -邮件原件- > 发件人: user-zh-return-10095-afweijian=
> 163@flink.apache.org >  163@flink.apache.org> 代表 yujianbo > 发送时间: 2020年12月21日 16:44 > 收件人:
> user-zh@flink.apache.org > 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
> > > 各位大佬好: > 请问Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点? > > > > -- >
> Sent from: http://apache-flink.147419.n8.nabble.com/ >


Re: 执行mvn构建错误 编译flink1.9遇到了相同的问题 请问解决了吗?我编译最新代码没这个问题

2020-12-21 文章 r pp
编译问题,大多包没下好,多来几次

mvn clean install -DskipTests -Drat.skip=true

亲测有效


shaoshuai <762290...@qq.com> 于2020年12月21日周一 下午4:53写道:

> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
> (default-testCompile) on project flink-parquet_2.11: Compilation failure:
> Compilation failure:
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[390,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[416,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[418,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[458,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[461,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[533,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[559,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[561,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[576,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[579,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[318,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[344,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[346,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[367,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[370,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[343,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[345,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[354,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[357,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[246,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[272,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> 

Re: flink1.11.2写hive分区表,hive识别不到分区

2020-12-21 文章 r pp
程序中,创建表后,执行命令。

kingdomad  于2020年12月21日周一 下午4:55写道:

> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
> 需要执行msck repair table修复分区表后,hive才能读取到数据。
> 求助大佬,要如何解决。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 r pp
嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大?

 于2020年12月21日周一 下午5:48写道:

> 通过yarn label可以实现
>
> -邮件原件-
> 发件人: user-zh-return-10095-afweijian=163@flink.apache.org
>  代表 yujianbo
> 发送时间: 2020年12月21日 16:44
> 收件人: user-zh@flink.apache.org
> 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
>
> 各位大佬好:
>  请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SQL执行模式

2020-12-21 文章 r pp
sql 的本质其实是 让用户不用关心 是流处理 还是 批处理,比如 ,计算  当天某个视频的点击总数。是一个累加结果,可以实时查询出变化。
但flink 不是一个存储系统,就会存在一个问题,使用sql 状态值 怎么办?
官博 都有说明,也说了哪些算子背后 适用于 Streaming or Batch or both。以及存在的使用注意事项
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/


jiangjiguang719  于2020年12月21日周一 下午7:44写道:

> flink1.12版本中,streamAPI 通过 -Dexecution.runtime-mode 指定是批还是流
> 的执行模式,那么在SQL中如何指定呢


Re: 对于kafka partition 设置时间戳及watermark

2020-12-19 文章 r pp
是的

张锴  于2020年12月19日周六 下午5:45写道:

> 我按官网操作,重写了序列化方式
>
> val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> props)kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor[MyType] {
> def extractAscendingTimestamp(element: MyType): Long =
> element.eventTimestamp})
> val stream: DataStream[MyType] = env.addSource(kafkaSource)
>
> *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
>


Re: flink1.11.2检查点失败

2020-12-18 文章 r pp
我觉得补充完整的 故障信息,以及你的资源配置信息,实例代码 可以更好的让别人回答你的问题

zhy  于2020年12月18日周五 下午4:07写道:

>
> 补充一下,状态后端选择的是rocksdb,检查点间隔为15分钟,超时时间为5分钟,感觉5分钟超时已经很大了,结果检查点线程还是会被中断,是需要继续调大超时时间吗
>
> zhy  于2020年12月18日周五 下午3:57写道:
>
> > hi、
> >
> >
> 我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行
> >
>


Re: flink 1.11 interval join场景下rocksdb内存超用问题

2020-12-18 文章 r pp
你好,能否把  promethus上metrics,   rocksdb_block_cache_usage的大小不断上升的
截图发一下,其它rocksdb 的内存图 如果有的话,也发一下

开始时间  到 结束时间  3个 小时的。




867127831 <867127...@qq.com> 于2020年12月18日周五 下午3:15写道:

> Hi,
>
>
> 我在flink 1.11 on k8s上运行了一个双流join的sql,使用rocksdb作为backend,flink
> managed部分的内存由flink托管(state.backend.rocksdb.memory.managed=true),但是发现k8s的pod的内存消耗一直在增加。具体情况如下:
>
>
> flink sql:
>
>
> insert into console_sink
> select t1.*, t2.*
> from t1 left join t2
> on t1.unique_id = t2.unique_id
> and t1.event_time BETWEEN t2.event_time - INTERVAL '1' HOUR AND
> t2.event_time + INTERVAL '1' HOUR
>
>
>
> 属性配置:
> state.backend=rocksdb;
> state.backend.incremental=false;
> state.backend.rocksdb.memory.managed=true
> state.idle.retention.mintime='10 min';
> state.idle.retention.maxtime='20 min';
> checkpoint.time.interval='15 min';
> source.idle-timeout='6 ms';
>
> taskmanager.memory.flink.size =55 gb
> taskmanager.memory.managed.fraction=0.85
>
>
>
>
>
>
> 运行现象:
> 1. checkpoint的size稳定在200G左右,说明state是有过期释放的。
> 2. k8s pod的使用内存不断增加,没有下降下来的趋势,最终整个pod的内存使用量超过pod内存上限,导致pod被杀掉。
> 3. 通过采集promethus上metrics,
> 发现rocksdb_block_cache_usage的大小不断上升,最终达到rocksdb_block_cache_capacity的上限。并且rocksdb_block_cache_usage的大小远远超过了flink
> managed部分内存的大小。
>
>
> 想知道,为什么在flink全托管rocksdb的情况下,为什么会出现rocksdb_block_cache_usage这个指标一直增长而不降低呢?


Re: Flink 1.12 job on yarn 集成hive时如何配置 hiveConf

2020-12-18 文章 r pp
这个问题 ,一个很朴素的思路 ,你集群里面的在哪里 ,就填哪里咯

Jacob <17691150...@163.com> 于2020年12月18日周五 下午4:13写道:

> Dear all,
>
> 请问在flink在集成hive时候,需要配置hive的conf目录,我的job是on yarn提交的,那么如何配置这个hive conf路径呢?
>
> String name = "myhive";
> String defaultDatabase = "mydatabase";
> String hiveConfDir = "";  // hive-site.xml路径
> String version = "1.1.0-cdh5.8.3";
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: jdbc sink无法插入数据

2020-12-18 文章 r pp
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。
再对比下 加入kafka 后的效果。

一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了

guoliubi...@foxmail.com  于2020年12月18日周五 下午2:01写道:

> Hi,
>
> 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> .process(new ProcessFunction() {
> @Override
> public void processElement(RatioValuevalue, Context ctx,
> Collector out) throws Exception {
> out.collect(value);
> ctx.output(ratioOutputTag, value);
> }
> });
> sideStream.addSink(new FlinkKafkaProducer<>(
> "ratio_value",
> new RatioValueSerializationSchema(suffix),
> PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> tool.get(SCHEMA_REGISTRY_URL)),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> DataStream ratioSideStream =
> sideStream.getSideOutput(ratioOutputTag);
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> 想问下这种情况是否有什么排查手段?
>
>
> guoliubi...@foxmail.com
>


Re: Flink 连接Hive hiveConf路径配置

2020-12-18 文章 r pp
按照我朴素的思路,你的yarn环境可以读取hiveConf 的信息吧。。。
on Yarn 的提交模式,和本地是不同的
另一种是提交的时候 添加配置项
–files $HIVE_HOME/conf/hive-site.xml

Jacob <17691150...@163.com> 于2020年12月19日周六 上午9:26写道:

> Dears,
>
> flink在连接hive时,需配置hiveConf所在路径
>
> 我已经下载了集群中hive-site.xml文件,不知道应该放在哪个目录
>
> Job部署模式是 on Yarn
> ,请问代码中hiveConf应该放在哪个目录下,应该不是我启动job所在的机器吧?因为job提交后运行在hadoop集群,是无法找到相关目录的。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式。

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11.2 读写Hive以及对hive的版本支持

2020-12-18 文章 r pp
嗨。提供一个解决的思路
   1.缺包
   2在yarn 的环境下缺包,可以把缺的包 放在集群统一的位置,在提交命令时,指名所在包的位置。

Jacob <17691150...@163.com> 于2020年12月18日周五 下午2:01写道:

> Dear All,
>
> Flink.11.2操作hive时,对hive的版本支持是怎样的
>
>
> 看官网介绍是支持1.0、1.1、1.2、2.0、2.1、2.2、2.3、3.1
> 我的执行环境:
>
> *Flink : 1.11.2*
> *Haoop : 2.6.0-cdh5.8.3*
> *Hive : 1.1.0-cdh5.8.3*
> *Job运行方式 : on yarn*
>
> 同时对读写hive的demo,我不知道我写的是否正确:
>
> public static void main(String[] args) throws Exception {
>
> EnvironmentSettings settings = EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String name = "myhive";
> String defaultDatabase = "datafeed";
> String hiveConfDir = "/opt/app/bigdata/hive-1.1.0-cdh5.8.3/conf";
> // hive-site.xml路径
> String version = "1.1.0-cdh5.8.3";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>
> tableEnv.registerCatalog("myhive", hive);
> tableEnv.useCatalog("myhive");
> String createDbSql = "INSERT INTO TABLE flink2hive_test VALUES
> ('55', \"333\", \"CHN\")";
> tableEnv.sqlUpdate(createDbSql);
> }
>
> 这样的job提交到yarn会报错:
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.mapreduce.TaskAttemptContext
>
> 是缺少MapReduce的相关包吗?
>
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>