Re: Configuring rolling for taskmanager.out

2020-12-21 Thread 李杰
Hi,
We implemented this feature before; you can take a look here.
https://issues.apache.org/jira/browse/FLINK-20713

zilong xiao wrote on Thu, Dec 3, 2020 at 7:50 PM:

> Could the community experts tell me whether the standard-output file taskmanager.out can be configured to roll over?
>


Re: Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi nick,

   Sorry, I initially thought that the data was also written into Kafka with Flink, 
so it could be ensured that there is no delay on the write side, right? Does 
the delay on the read side still exist?

Best,
 Yun




 --Original Mail --
Sender:nick toker 
Send Date:Tue Dec 22 01:43:50 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: checkpoint delay consume message

hi

i am confused

the delay is in the source, when reading messages, not in the sink

nick

On Mon, Dec 21, 2020 at 18:12 Yun Gao <yungao...@aliyun.com> wrote:

 Hi Nick,

Are you using EXACTLY_ONCE semantics ? If so the sink would use 
transactions, and only commit the transaction on checkpoint complete to ensure 
end-to-end exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick
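For context, a minimal sketch of the interaction described above, assuming the data is produced by a Flink job with an EXACTLY_ONCE Kafka sink and read back with a read_committed consumer (topic names, broker address and group id are placeholders): the sink writes inside a Kafka transaction that is only committed when a checkpoint completes, so a read_committed consumer sees the records only after that commit, which is why the observed read-side delay tracks checkpointInterval and minPauseBetweenCheckpoints.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CheckpointDelaySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The maximum read-side delay reported above tracks these two settings.
        env.enableCheckpointing(10_000);                                 // checkpointInterval
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);  // minPauseBetweenCheckpoints

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        consumerProps.setProperty("group.id", "demo-group");             // placeholder
        // With read_committed, records written in a Kafka transaction become visible
        // only after the producing job commits, i.e. on checkpoint completion.
        consumerProps.setProperty("isolation.level", "read_committed");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        // usually needs tuning against the broker's transaction.max.timeout.ms
        producerProps.setProperty("transaction.timeout.ms", "600000");

        KafkaSerializationSchema<String> outSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("out-topic", element.getBytes(StandardCharsets.UTF_8));
            }
        };

        env.addSource(new FlinkKafkaConsumer<>("in-topic", new SimpleStringSchema(), consumerProps))
           .addSink(new FlinkKafkaProducer<>(
                   "out-topic",
                   outSchema,
                   producerProps,
                   // EXACTLY_ONCE: transactional writes, committed when a checkpoint completes
                   FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("checkpoint delay sketch");
    }
}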



Problem creating a Hive table with Flink 1.11.2

2020-12-21 Thread 曹武
Hi, when I use CREATE TABLE IF NOT EXISTS to create a Hive table, for a Hive table that
already exists the Hive log throws an AlreadyExistsException (message: Table
bm_tsk_001 already exists). Looking at the source code, IF NOT EXISTS seems to be used only
to decide whether the caught exception is re-thrown. Is there a recommended solution for this?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-21 Thread liujian
Thanks. [The rest of this message and its quoted history were mangled by the archive's encoding; the readable fragments mention configuring the history server in flink-conf with an HDFS archive directory but not seeing the expected result in the web UI, and they reference the links below.]

https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html
https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Flink 1.10: broadcast-stream updates get stuck

2020-12-21 Thread 洪雪芬
Hi!
While using a Flink broadcast stream for periodically updated configuration, when the downstream operator's parallelism is greater than 1, the downstream operator gets stuck on the broadcast updates: the broadcast-side operator keeps sending data, but the downstream operator only receives a small initial portion and then stops receiving new data, with no error in the logs.
The problem cannot be reproduced locally in IDEA, but it appears when the job is submitted to the cluster in yarn-cluster mode.
Has anyone run into something similar? What causes this, and is there a solution?


java.lang.IllegalStateException: Trying to access closed classloader.

2020-12-21 Thread jy l
Hi:
When I run my Flink program in IDEA, it reports the following exception:
Exception in thread "Thread-22" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly
or indirectly in static fields. If the stacktrace suggests that the leak
occurs in a third party library and cannot be fixed immediately, you can
disable this check with the configuration
'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:161)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:179)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
at
org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

What causes this exception, and how should I handle it?

If anyone knows, please let me know. Thanks.
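If the exception appears only during JVM shutdown of a job run from the IDE (Hadoop shutdown hooks touching an already-closed user classloader, as in the trace above), one workaround is the option quoted in the message itself. A minimal sketch, assuming a local environment created explicitly in the IDE; note this only silences the check, it does not fix the underlying classloader leak:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableClassloaderCheckSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the option named in the exception message above
        conf.setBoolean("classloader.check-leaked-classloader", false);

        // the configuration only takes effect for environments created locally with it
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("classloader check sketch");
    }
}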


pyflink 1.12: the result of a multi-table join is a TableResult, how can it be converted to a Table?

2020-12-21 Thread 肖越
I run a left-join query via SQL; the SQL statement is:
sql = ''' Insert into print_sink select a.id, a.pf_id, b.symbol_id from  a \
 left join b on b.day_id = a.biz_date where a.ccy_type = 'AC' and \
 a.pf_id = '1030100122' and b.symbol_id = '2030004042' and a.biz_date 
between '20160701' and '20170307' '''


table_result = env.execute_sql(sql)
The result of executing via env.execute_sql() is a TableResult; how can it be converted to a Table?
Or is there another way to run the table join directly so that the result comes back as a Table?
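For reference, a minimal sketch of the distinction, shown in the Java Table API (PyFlink exposes the same pair of methods as sql_query and execute_sql): sqlQuery() returns a Table that can be transformed or registered further, while executeSql()/Table.execute() submits the job and returns a TableResult. The toy tables below are stand-ins for the real sources.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class SqlQueryVsExecuteSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // toy stand-ins for the real source tables "a" and "b"
        tEnv.createTemporaryView("a", tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.STRING()),
                        DataTypes.FIELD("pf_id", DataTypes.STRING()),
                        DataTypes.FIELD("biz_date", DataTypes.STRING())),
                row("1", "1030100122", "20160701")));
        tEnv.createTemporaryView("b", tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("day_id", DataTypes.STRING()),
                        DataTypes.FIELD("symbol_id", DataTypes.STRING())),
                row("20160701", "2030004042")));

        // sqlQuery() returns a Table, which can be joined, filtered or registered further
        Table joined = tEnv.sqlQuery(
                "SELECT a.id, a.pf_id, b.symbol_id FROM a LEFT JOIN b ON b.day_id = a.biz_date");

        // executeSql()/Table.execute() actually submits the job and returns a TableResult
        joined.execute().print();
    }
}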



Re: Job submission fails in yarn application mode

2020-12-21 Thread Yang Wang
silence's answer is correct.
With the -t parameter, options are passed with plain -D, without a prefix; the documentation shows the same [1].
This differs from the old -m yarn-cluster mode, where options had to be passed with -yD.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode

Best,
Yang

silence wrote on Mon, Dec 21, 2020 at 10:53 AM:

> It should be -D, not -yD
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11.1 Application Mode job on a K8s cluster: 'too old resource version' problem

2020-12-21 Thread Yang Wang
I replied to this in another email; copying it over here.

I have created a JIRA to track the 'too old resource version' problem [1].

Flink uses a Watcher to monitor Pod status changes; when the Watcher is closed abnormally, a fatal
error is triggered, which leads to the JobManager restarting.

I have run concrete tests on minikube, a self-built K8s cluster, and an Alibaba Cloud ACK cluster, and in all of them the job ran stably for more than a week. I could only reproduce the problem by restarting the
K8s APIServer. So I suspect the network between your Pods and the APIServer is unstable, which would explain why the problem shows up so often.


[1]. https://issues.apache.org/jira/browse/FLINK-20417

Best,
Yang

lichunguang wrote on Mon, Dec 21, 2020 at 3:51 PM:

> A Flink 1.11.1 job running in Application Mode on a K8s cluster: the jobmanager restarts
> once every hour with the error [Fatal error
> occurred in
> ResourceManager. io.fabric8.kubernetes.client.KubernetesClientException: too
> old resource version]
>
> Pod restart:
> 
>
> Reason for the restart:
> 2020-12-10 07:21:19,290 ERROR
> org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
> error occurred in ResourceManager.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
> 2020-12-10 07:21:19,291 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
>
>
> According to what I found online, the cause is line 212 of
> the class org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient:
>
> @Override
> public KubernetesWatch watchPodsAndDoCallback(Map labels,
> PodCallbackHandler podCallbackHandler) {
> return new KubernetesWatch(
> this.internalClient.pods()
> .withLabels(labels)
> .watch(new
> KubernetesPodsWatcher(podCallbackHandler)));
> }
>
> whereas ETCD only keeps resource version information for a limited time
> 【 I think it's standard behavior of Kubernetes to give 410 after some time
> during watch. It's usually client's responsibility to handle it. In the
> context of a watch, it will return HTTP_GONE when you ask to see changes
> for
> a resourceVersion that is too old - i.e. when it can no longer tell you
> what
> 

Re: a question about KubernetesConfigOptions

2020-12-21 Thread Yang Wang
Hi Debasish Ghosh,

Thanks for the attention on native K8s integration of Flink.

1. Volumes and volume mounts are not supported yet. We are trying to get this
done via pod templates. See [1] for more information.

2. Currently, Flink has different CPU config options for different deployments,
but for memory all the deployments share the same config options. You can find
more information in [2].
* yarn.appmaster.vcores
* yarn.containers.vcores
* kubernetes.jobmanager.cpu
* kubernetes.taskmanager.cpu

3. You are right. This class is PublicEvolving and we may introduce more
config options in the future (e.g. pod-template related).


[1].
https://lists.apache.org/thread.html/rf2e7b9be96f2bd5106d08ffb573d55f70a8acfb0b814a21d8b50d747%40%3Cdev.flink.apache.org%3E
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup.html

Debasish Ghosh wrote on Mon, Dec 21, 2020 at 3:21 PM:

> Hello -
>
> In
> https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
> the various supported options are declared as constants.
>
> I see that there is no support for options like Volumes and VolumeMounts.
> Also I see entries for JOB_MANAGER_CPU and TASK_MANAGER_CPU but not for
> JOB_MANAGER_MEMORY and TASK_MANAGER_MEMORY. How do we accommodate these if
> we want to pass them as well? I see that the class is annotated
> with @PublicEvolving - just wanted to clarify whether these are planned to be
> added in the future.
>
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
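On the memory part of the question, a small sketch of the keys involved; whether they go into flink-conf.yaml, are passed with -D, or are set on a programmatic Configuration as below depends on how the deployment is driven, and the values are placeholders:

import org.apache.flink.configuration.Configuration;

public class K8sResourceOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // CPU has Kubernetes-specific options (the JOB_MANAGER_CPU / TASK_MANAGER_CPU constants)
        conf.setString("kubernetes.jobmanager.cpu", "1.0");
        conf.setString("kubernetes.taskmanager.cpu", "2.0");

        // Memory uses the deployment-agnostic options shared by all targets, which is why
        // KubernetesConfigOptions has no JOB_MANAGER_MEMORY / TASK_MANAGER_MEMORY constants.
        conf.setString("jobmanager.memory.process.size", "2048m");
        conf.setString("taskmanager.memory.process.size", "4096m");

        System.out.println(conf);
    }
}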


Re: How to choose the flink-shaded-hadoop-2-uber version

2020-12-21 Thread Yang Wang
The history-server has nothing to do with native
K8s. If you want to use it, you need a separate deployment that runs the history-server inside the K8s cluster.

What native K8s covers is how Flink jobs are natively submitted into the K8s cluster.

Best,
yang

liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 8:16 PM:

> Thanks. With the docker command you gave below it does indeed work for me, but I don't know how to do this with native K8s; could you explain in more detail?
> My current Dockerfile; I have tried both of the following variants:
>
>
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh"]
> EXPOSE 6123 8081 8082
> CMD ["help","history-server"]
>
> 
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
> EXPOSE 6123 8081 8082
> CMD ["help"]
>
>
>
> I have tried both of these... please advise.
>
>
>
>
>
>
> ------ Original message ------
> From: "user-zh" <danrtsey...@gmail.com>
> Sent: Monday, Dec 21, 2020, 3:08 PM
> To: "user-zh"
> Subject: Re: How to choose the flink-shaded-hadoop-2-uber version
>
>
>
> Yes, your understanding is correct: after the history-server starts, it listens on a port.
>
> It works without problems on my side; you can start it with the following command:
> docker run -p 8082:8082 --env
> FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs"
> flink:latest history-server
>
> For more configuration options, see the documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> wrote on Mon, Dec 21, 2020 at 1:35 PM:
>
>  My understanding is that starting a history-server launches a process that exposes the specified port, but I don't seem to see that happening. Is my understanding wrong?
> 
> 
> 
> 
>  ------ Original message ------
>  From: "user-zh" <danrtsey...@gmail.com>
>  Sent: Monday, Dec 21, 2020, 10:15 AM
>  To: "user-zh"
>  Subject: Re: How to choose the flink-shaded-hadoop-2-uber version
> 
> 
> 
> 
> 
> You don't need to modify CMD; the default entrypoint is docker-entrypoint.sh [1], which
> supports the history-server; you only need to pass a history-server argument.
>
>  [1].
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
>
>  Best,
>  Yang
> 
> 
>  liujian <13597820...@qq.com> wrote on Sun, Dec 20, 2020 at 12:45 PM:
>
>  > Thanks. But I need to access the historyServer, so what should I do? I changed the
>  > Flink 1.12.0 Dockerfile to CMD ["history-server"]
>  > and exposed port 8082, but that does not seem to achieve the effect.
>  >
>  > ------ Original message ------
>  > From: "user-zh" <danrtsey...@gmail.com>
>  > Sent: Saturday, Dec 19, 2020, 9:35 PM
>  > To: "user-zh"
>  > Subject: Re: How to choose the flink-shaded-hadoop-2-uber version
>  >
>  > You only need to set the HADOOP_CONF_DIR environment variable on the Flink client side.
>  > The Flink client will automatically ship hdfs-site.xml and core-site.xml by creating a
>  > separate ConfigMap and mounting it into the JobManager and TaskManager.
>  > These two config files are also loaded onto the classpath automatically; as long as
>  > flink-shaded-hadoop is under lib, nothing else is needed and HDFS can be accessed directly.
>  >
>  > Best,
>  > Yang
>  >
>  > liujian <13597820...@qq.com> wrote on Sat, Dec 19, 2020 at 8:29 PM:
>  >
>  > > HDFS is in HA mode, so hdfs-site.xml needs to be specified. How should this be handled:
>  > > with a ConfigMap, or by putting hdfs-site.xml into the $FLINK_HOME/conf directory?
>
> 
> 

Re: Native Kubernetes needs to access HDFS

2020-12-21 Thread Yang Wang
Yes, that's right: automatically shipping the Hadoop configuration is supported starting from 1.11.

On 1.10 you need to bake the configuration into the image.

Best,
Yang

Akisaya wrote on Mon, Dec 21, 2020 at 5:02 PM:

> It doesn't seem to be supported in 1.10 yet; looking at the 1.10 code, the Hadoop configuration is not read when the ConfigMap is created.
>
> Yang Wang wrote on Sat, Dec 19, 2020 at 12:18 AM:
>
> > You can simply set the HADOOP_CONF_DIR environment variable on the Flink client side; the
> > Hadoop configuration will then be shipped automatically and mounted into the JobManager and TaskManager.
> >
> > Best,
> > Yang
> >
> > liujian <13597820...@qq.com> wrote on Fri, Dec 18, 2020 at 5:26 PM:
> >
> > > Hi:
> > >  When using Native Kubernetes I need to access HDFS, and I have already put
> > > flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into the lib directory.
> > >  But HDFS is in HA mode, so hdfs-site.xml and similar files are needed. How do I specify these files?
> >
>


Re: Application Mode job on a K8s cluster: nodes cannot be scaled in

2020-12-21 Thread Yang Wang
Yes, if the CA cannot release the Pod directly, that will indeed prevent the node from being evicted.

The lifecycle of Flink TaskManager Pods is managed entirely by the JobManager, and they are not restarted; when one dies, a new one is requested.
Combined with the CA this does lead to the limitation you describe. However, if the Pods carried an annotation marking them as evictable, I'm not sure whether the CA could work with that.

Best,
Yang

lichunguang wrote on Mon, Dec 21, 2020 at 4:16 PM:

> Hi Yang Wang:
> What I meant is:
>   Native Flink on K8s requests resources as individual Pods, which conflicts somewhat with the K8s autoscaling mechanism.
>
> Reason:
> For example, when there are many jobs, every node is heavily loaded; when only a few jobs remain, each node holds only a few Pods, but because of "Pods that are not backed
> by a controller object" the node with the lowest utilization cannot be evicted, so overall utilization stays low.
>
> What types of pods can prevent CA from removing a node?
>
> https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/FAQ.md#what-types-of-pods-can-prevent-ca-from-removing-a-node
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11.2 writes to a Hive partitioned table but Hive does not recognize the partitions

2020-12-21 Thread Rui Li
How exactly are you writing to Hive?

On Mon, Dec 21, 2020 at 11:28 PM 赵一旦  wrote:

> Even if the data is not written by Flink, writing it any other way also requires doing this.
>
> r pp wrote on Mon, Dec 21, 2020 at 9:28 PM:
>
> > In your program, execute the command after creating the table.
> >
> > kingdomad wrote on Mon, Dec 21, 2020 at 4:55 PM:
> >
> > > Flink 1.11.2 writes to a Hive 3.1.2 partitioned table; Hive cannot recognize the partitions newly created by Flink. The files can be seen on HDFS, but Hive cannot read the partitions.
> > > Only after running msck repair table to repair the partitioned table can Hive read the data.
> > > How can this be solved?
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > kingdomad
> > >
> > >
> >
>


-- 
Best regards!
Rui Li
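One detail that may help the diagnosis: when the table is written with Flink's streaming Hive/filesystem sink, new partitions only become visible to Hive once a partition commit policy that includes 'metastore' is configured; otherwise the files land on HDFS but the metastore is never updated, which matches the msck repair symptom. A hedged sketch (hypothetical table name and columns; assumes a HiveCatalog has already been registered and selected, which is omitted here):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HivePartitionCommitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // assumes tEnv.registerCatalog(...) / tEnv.useCatalog(...) for a HiveCatalog was done already
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Without a 'metastore' commit policy, Flink writes the files but never adds the
        // partitions to the Hive metastore, so Hive only sees them after `msck repair table`.
        tEnv.executeSql(
                "CREATE TABLE demo_sink (id STRING, v DOUBLE) PARTITIONED BY (dt STRING) "
                        + "STORED AS parquet TBLPROPERTIES ("
                        + " 'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',"
                        + " 'sink.partition-commit.trigger'='partition-time',"
                        + " 'sink.partition-commit.delay'='1 d',"
                        + " 'sink.partition-commit.policy.kind'='metastore,success-file'"
                        + ")");
    }
}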


Re: Flink on yarn: how to pin a fixed set of yarn nodes as the nodes that run Flink jobs

2020-12-21 Thread xiao cai
Hi
You can consider using yarn's node label feature to submit Flink jobs to specific nodes.


 Original Message 
Sender: r pp
Recipient: user-zh
Date: Monday, Dec 21, 2020 21:25
Subject: Re: Flink on yarn: how to pin a fixed set of yarn nodes as the nodes that run Flink jobs


Hmm, you want to dedicate a few machines to running Flink; why not set up a dedicated queue for Flink on yarn instead? Do you need network isolation...? How fast is the internal network?

 wrote on Mon, Dec 21, 2020 at 5:48 PM:

> This can be done with yarn labels.
>
> -----Original message-----
> From: user-zh-return-10095-afweijian=163@flink.apache.org on behalf of yujianbo
> Sent: Dec 21, 2020 16:44
> To: user-zh@flink.apache.org
> Subject: Flink on yarn: how to pin a fixed set of yarn nodes as the nodes that run Flink jobs
>
> Hi all:
>  How can Flink on yarn pin a fixed set of yarn nodes as the nodes that run Flink jobs?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Danny Chan
Hi Yuval Itzchakov ~

The thread you pasted has a different stack trace from your case.

In the pasted thread, the JaninoRelMetadataProvider was missing because we
only set it once in a thread-local variable; when the RelMetadataQuery was
used in a different worker thread, the missing JaninoRelMetadataProvider caused an
NPE.

For your case, based on the stack trace, this line throws ~

RelMetadataQuery line 114:

super(null);

But actually this line allows an empty argument and it should not throw.

Can you give a reproducible case here so that we can debug and find more
evidence?

Yuval Itzchakov wrote on Tue, Dec 22, 2020 at 1:52 AM:

> Hi,
>
> While trying to execute a query via TableEnvironment.sqlQuery in Flink
> 1.12, I receive the following exception:
>
> java.lang.NullPointerException
> :114, RelMetadataQuery (org.apache.calcite.rel.metadata)
> :76, RelMetadataQuery (org.apache.calcite.rel.metadata)
> get:39, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> get:38, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
> create:108, LogicalFilter (org.apache.calcite.rel.logical)
> createFilter:344, RelFactories$FilterFactoryImpl
> (org.apache.calcite.rel.core)
> convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
> org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
> FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> toQueryOperation:823, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convertSqlQuery:795, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convert:250, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
> sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
> $anonfun$translateTemplate$2:476, Foo$ (Foo)
> apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
> evaluateNow:361, FiberContext (zio.internal)
> $anonfun$evaluateLater$1:778, FiberContext (zio.internal)
> run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
> runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
> run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
> run:748, Thread (java.lang)
>
> This seems to be coming from the FlinkRelMetadataQuery class attempting to
> initialize all handlers:
>
> [image: image.png]
>
> This seems to be coming from the calcite shaded JAR
> inside "flink-table-planner-blink-1.12"
>
> Has anyone run into this issue? I saw a thread in the Chinese user group
> but I don't understand what's been said there (
> https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
> --
> Best Regards,
> Yuval Itzchakov.
>


numRecordsOutPerSecond metric and side outputs

2020-12-21 Thread Alexey Trenikhun
Hello,
Does the numRecordsOutPerSecond metric take into account the number of records sent to 
side outputs, or does it provide the rate only for the main output?

Thanks,
Alexey


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
I don't know how to reproduce it but what I've observed are three kinds of
termination when connectivity with zookeeper is somehow disrupted. I don't
think it's an issue with zookeeper, as it has supported a much bigger kafka cluster
for a few years now. 

1. The first kind is exactly this -
https://github.com/apache/flink/pull/11338. Basically temporary loss of
connectivity or rolling upgrade of zookeeper will cause job to terminate. It
will restart eventually from where it left off.
2. The second kind is when job terminates and restarts for the same reason
but is unable to recover from checkpoint. I think its similar to this -
https://issues.apache.org/jira/browse/FLINK-19154. If upgrading to 1.12.0
(from 1.11.2) will fix the second issue then I'll upgrade. 
3. The third kind is where it repeatedly restarts as its unable to establish
a session with Zookeeper. I don't know if reducing session timeout will help
here but in this case, I'm forced to disable zookeeper HA entirely as the
job cannot even restart here. 

I could create a JIRA ticket for discussing zookeeper itself if you suggest,
but the zookeeper issue and the savepoint question are related, as I'm not sure what
will happen in each of the above.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Yuval Itzchakov
Hi,

While trying to execute a query via TableEnvironment.sqlQuery in Flink
1.12, I receive the following exception:

java.lang.NullPointerException
:114, RelMetadataQuery (org.apache.calcite.rel.metadata)
:76, RelMetadataQuery (org.apache.calcite.rel.metadata)
get:39, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
get:38, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
create:108, LogicalFilter (org.apache.calcite.rel.logical)
createFilter:344, RelFactories$FilterFactoryImpl
(org.apache.calcite.rel.core)
convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:823, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convertSqlQuery:795, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convert:250, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
$anonfun$translateTemplate$2:476, Foo$ (Foo)
apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
evaluateNow:361, FiberContext (zio.internal)
$anonfun$evaluateLater$1:778, FiberContext (zio.internal)
run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

This seems to be coming from the FlinkRelMetadataQuery class attempting to
initialize all handlers:

[image: image.png]

This seems to be coming from the calcite shaded JAR
inside "flink-table-planner-blink-1.12"

Has anyone run into this issue? I saw a thread in the Chinese user group
but I don't understand what's been said there (
https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
-- 
Best Regards,
Yuval Itzchakov.


Re: checkpoint delay consume message

2020-12-21 Thread nick toker
hi

i am confused

the delay is in the source, when reading messages, not in the sink

nick

On Mon, Dec 21, 2020 at 18:12 Yun Gao <yungao...@aliyun.com> wrote:

>  Hi Nick,
>
> Are you using EXACTLY_ONCE semantics ? If so the sink would use
> transactions, and only commit the transaction on checkpoint complete to
> ensure end-to-end exactly-once. A detailed description can be found in [1].
>
>
> Best,
>  Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> --
> Sender:nick toker
> Date:2020/12/21 23:52:34
> Recipient:user
> Theme:checkpoint delay consume message
>
> Hello,
>
> We noticed the following behavior:
> If we enable the flink checkpoints, we saw that there is a delay between
> the time we write a message to the KAFKA topic and the time the flink kafka
> connector consumes this message.
> The delay is closely related to checkpointInterval and/or
> minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a
> message from KAFKA will be one of these parameters.
>
> Could you please advise how we can remove/control this delay?
>
> we use flink 1.11.2
>
> BR
> nick
>
>


RE: RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao 
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

   Are you setting the FsStateBackend via code or flink-conf.yaml? If via code, 
it requires a path parameter, and that path serves as the state.checkpoints.dir. If 
via flink-conf.yaml, I tried on 1.12 setting state.backend: filesystem in the 
config file and enabling checkpointing, and it indeed threw an exception saying

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there is no backpressure, I think it might be helpful 
to look at the time decomposition for the checkpoint on the checkpoint history page 
in the WEB UI, to see which phase takes too long.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao mailto:yungao...@aliyun.com>>, 
user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject:RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao mailto:yungao...@aliyun.com>>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org
Subject: Re: checkpointing 

Re: RE: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

   Are you setting the FsStateBackend via code or flink-conf.yaml? If via code, 
it requires a path parameter, and that path serves as the state.checkpoints.dir. If 
via flink-conf.yaml, I tried on 1.12 setting state.backend: filesystem in the 
config file and enabling checkpointing, and it indeed threw an exception saying 

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there is no backpressure, I think it might be helpful 
to look at the time decomposition for the checkpoint on the checkpoint history page 
in the WEB UI, to see which phase takes too long.


Best,
 Yun



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao , user@flink.apache.org 

Subject:RE: checkpointing seems to be throttled.

Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any. 
From: Yun Gao  
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.
This email is from an external source -exercise caution regarding links and 
attachments.
Hi Edward,

    For the second issue, have you also set the statebackend type? I'm asking 
because, except for the default heap statebackend, the other statebackends should 
throw an exception if state.checkpoints.dir is not set. Since the heap 
statebackend stores all snapshots in the JM's memory, it cannot be 
recovered after a JM failover, which makes it unsuitable for production usage. 
Therefore, if used in a production environment, it might be better to switch to 

Re: A question about the KeyedState restore mechanism

2020-12-21 Thread 赵一旦
For now, implementing it the way I described should not be hard. What I'm worried about is that Flink, when restoring keyed state, cannot cope with my custom partitioning mechanism.

With the existing mechanism, a restore is effectively a reassignment of keyGroups across the parallel window instances.

After switching to my partitioning mechanism, would a restore still work correctly?

赵一旦 wrote on Tue, Dec 22, 2020 at 12:03 AM:

> As in the subject: for OperatorState there are currently two API-level interfaces, CheckpointedFunction and ListCheckpointed,
> i.e. a UDF has entry points to customize the restore.
>
> Question (1): restoring KeyedState, by contrast, is rather a black box. I'd like to know where the relevant implementation lives.
>
> Follow-up question (2), my original goal: I want to implement aggregations of the form
> keyBy(...).timeWindow(x).xxx(). While keeping keyBy's keySelector mechanism (i.e. the window operator still aggregates per key and per window), I want to rewrite part of Flink's API-layer code to forcibly remove the
> KeyGroupStreamPartitioner that keyBy inserts and replace it with a pluggable custom Partitioner.
> The goal is to deal with data skew, but I don't want to solve it with a two-level keyBy, because the number of keys is small to begin with (say 100): even with two levels, the first level would, I feel, have to expand the keys by at least 1000x to become balanced enough. If I could instead expand them by only, say, 30x (a factor that could match the parallelism of the downstream window operator) and implement a rebalance-like distribution inside the partitioner...
> Of course, something smarter is also conceivable, e.g. expanding some keys and not others.
>
>
> The description is a bit messy. Put differently: on a non-keyed stream I could simply use dataStream.flatMap and do the aggregation with MapState inside the flatMap, to similar effect. But I would still prefer to do it by adapting window, because the window part also brings watermarks and the windowing mechanism, whereas with flatMap I would have to implement the windowing myself.
>
>


Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
 Hi Nick,

Are you using EXACTLY_ONCE semantics ? If so the sink would use 
transactions, and only commit the transaction on checkpoint complete to ensure 
end-to-end exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao 
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
because, except for the default heap statebackend, the other statebackends should 
throw an exception if state.checkpoints.dir is not set. Since the heap 
statebackend stores all snapshots in the JM's memory, it cannot be 
recovered after a JM failover, which makes it unsuitable for production usage. 
Therefore, if used in a production environment, it might be better to switch to a 
statebackend like rocksdb.

   For the checkpoint timeout, AFAIK there should be no large changes after 
1.9.2. There may be different issues causing checkpoint timeouts; one possible 
cause is back-pressure, where some operator cannot process its 
records in time, which blocks the checkpoints. I think you might check 
the back-pressure [1] first, and if there is indeed back-pressure, then you 
might try unaligned checkpoints or resolve the back-pressure by increasing the 
parallelism of the slow operators.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject:checkpointing seems to be throttled.
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.   This does not seem to be 
related to EFS limits/throttling .  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?

Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?

For now we plan on setting

execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true

and also explicitly set

state.checkpoints.dir



checkpoint delay consume message

2020-12-21 Thread nick toker
Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between
the time we write a message to the KAFKA topic and the time the flink kafka
connector consumes this message.
The delay is closely related to checkpointInterval and/or
minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a
message from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick


[Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Puneet Kinra
Hi All

Unable to submit job from REST API (Flink-Monitoring API),

*Steps followed:*

1) Load the jar using load api.
2) can see the jar in the /tmp/flink-web folder.
3) Try to run the jar using the following.

*Request*

http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json=com.orchestrator.flowExecution.GraphExecutor


Response:

{
"errors": [
"Not found."
]
}

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: [Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Yun Gao

Hi Puneet,

   From the docs, it seems submitting a job via the REST API should send a POST 
request to /jars/:jarid/run [1]. The response "Not Found" should mean the REST 
API server does not recognize the requested path.

Best,
 Yun





 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#jars-jarid-run--
Sender:Puneet Kinra
Date:2020/12/21 19:47:31
Recipient:user
Theme:[Help Required:]-Unable to submit job from REST API

Hi All

Unable to submit job from REST API (Flink-Monitoring API),

Steps followed:

1) Load the jar using load api.
2) can see the jar in the /tmp/flink-web folder.
3) Try to run the jar using the following.

Request

http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json=com.orchestrator.flowExecution.GraphExecutor
 

Response:

{
"errors": [
"Not found."
]
}

-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com




Re: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
because, except for the default heap statebackend, the other statebackends should 
throw an exception if state.checkpoints.dir is not set. Since the heap 
statebackend stores all snapshots in the JM's memory, it cannot be 
recovered after a JM failover, which makes it unsuitable for production usage. 
Therefore, if used in a production environment, it might be better to switch to a 
statebackend like rocksdb.

   For the checkpoint timeout, AFAIK there should be no large changes after 
1.9.2. There may be different issues causing checkpoint timeouts; one possible 
cause is back-pressure, where some operator cannot process its 
records in time, which blocks the checkpoints. I think you might check 
the back-pressure [1] first, and if there is indeed back-pressure, then you 
might try unaligned checkpoints or resolve the back-pressure by increasing the 
parallelism of the slow operators. 

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html
 



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org 
Subject:checkpointing seems to be throttled.

Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.

  
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.
We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.   This does not seem to be 
related to EFS limits/throttling .  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?
Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?
For now we plan on setting

execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true

and also explicitly set

state.checkpoints.dir
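To make the state-backend part concrete, a minimal sketch of setting the FsStateBackend in code with an explicit checkpoint directory (the EFS mount path is a placeholder), together with the checkpoint settings discussed in this thread:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsStateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // When the backend is constructed in code, the checkpoint directory is the constructor
        // argument, so state.checkpoints.dir from flink-conf.yaml is not strictly required.
        env.setStateBackend(new FsStateBackend("file:///mnt/efs/flink/checkpoints")); // placeholder path

        env.enableCheckpointing(60_000);                           // checkpoint every minute
        env.getCheckpointConfig().setCheckpointTimeout(3_600_000); // raise the 10 min default, as planned above

        env.fromElements(1, 2, 3).print();
        env.execute("fs state backend sketch");
    }
}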


Re: Flink 1.11.2 writes to a Hive partitioned table but Hive does not recognize the partitions

2020-12-21 Thread 赵一旦
Even if the data is not written by Flink, writing it any other way also requires doing this.

r pp wrote on Mon, Dec 21, 2020 at 9:28 PM:

> In your program, execute the command after creating the table.
>
> kingdomad wrote on Mon, Dec 21, 2020 at 4:55 PM:
>
> > Flink 1.11.2 writes to a Hive 3.1.2 partitioned table; Hive cannot recognize the partitions newly created by Flink. The files can be seen on HDFS, but Hive cannot read the partitions.
> > Only after running msck repair table to repair the partitioned table can Hive read the data.
> > How can this be solved?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > kingdomad
> >
> >
>


Re: Re: How to use wall-clock time to control event-time windows

2020-12-21 Thread 赵一旦
I roughly get it now, but it's still not entirely clear.
With tumbling windows, a 5s window is aligned as 0-5, 5-10, 10-15 and so on; it is not based on the first record.

If you want the window to start when the first record arrives, then don't use Flink's window mechanism.
Build it directly on the low-level functionality of a process function and implement it yourself with the timer service.
Alternatively, if you do need to use windows, use a global
window and define the window's trigger yourself (e.g. when the first record arrives, set a timer; when the timer fires, trigger the window computation and then clear the window state).

guoliubi...@foxmail.com wrote on Mon, Dec 21, 2020 at 9:15 AM:

> Since the table formatting got eaten and is hard to read, here is a picture instead.
> https://i.bmp.ovh/imgs/2020/12/78d7dee70d88ebc9.png
>
> I defined a 3-second tumbling window.
> The first message has eventTime 9:00:01 and is received at actual system time 9:00:01.
> The second message has eventTime 9:00:02, but is received at actual system time 9:00:11.
>
> The goal is to close this window and run the computation at system time 9:00:05, ignoring the late second message, rather than waiting until a third message pushes into the next window before this window is closed.
> I found that ProcessingTimeoutTrigger can achieve this, but I don't know whether there is more detailed documentation on how triggers are used.
>
>
> guoliubi...@foxmail.com
>
> From: 赵一旦
> Sent: 2020-12-20 23:30
> To: user-zh
> Subject: Re: How to use wall-clock time to control event-time windows
> The description is rather messy; I can't follow it.
>
> guoliubi...@foxmail.com wrote on Thu, Dec 17, 2020 at 2:16 PM:
>
> > Hi,
> > We currently use eventTime as the time basis and run a TumbleWindow every 3 seconds. Suppose the data is as follows:
> >
> > system time | gap to previous | event time | gap to previous
> > 9:00:01     |                 | 9:00:01    |
> > 9:00:11     | 10s             | 9:00:02    | 1s
> > 9:00:12     | 1s              | 9:00:12    | 10s
> >
> > Judging by event time, the first and second records fall into the same window.
> > However, the processing now requires that once the window has received its first record, the window is force-closed after more than 4s of wall-clock time, i.e. closed at system time 9:00:05, ignoring the second record.
> > How should the watermark be generated in this case?
> > I have tried
> > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> > as well as
> >
> > WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> > but both put the first and second records into the same window,
> > neither achieving the expected result.
> > How should this be set up so that the window contains only one record and the second record is ignored?
> >
> >
> > guoliubi...@foxmail.com
> >
>
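A minimal sketch of the process-function variant suggested above (the element type is a hypothetical (key, payload) tuple; the 4-second timeout comes from the example in the thread): keep only the first record per key, register a processing-time timer four seconds after it arrives, and emit and clear on the timer, so anything arriving later for that key in the meantime is ignored. It would be applied with something like stream.keyBy(t -> t.f0).process(new FirstRecordTimeoutFunction()).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits, per key, only the first record that arrives, 4 seconds of wall-clock time
// after it arrived; later records for that key in the meantime are dropped.
public class FirstRecordTimeoutFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Tuple2<String, Long>> firstRecord;

    @Override
    public void open(Configuration parameters) {
        firstRecord = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "first-record", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        if (firstRecord.value() == null) {
            firstRecord.update(value);
            // force-close the "window" 4s of processing time after the first record
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 4_000L);
        }
        // later records arriving before the timer fires are simply ignored
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Tuple2<String, Long> first = firstRecord.value();
        if (first != null) {
            out.collect(first);    // emit the result for this round
            firstRecord.clear();   // clear state so the next record opens a new round
        }
    }
}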


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's intermittent loss
of connectivity with zookeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from
checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
zookeeper that you've mentioned you intend to resolve soon. But until the
zookeeper based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on kafka offsets to resume
where it left off rather than savepoints.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's intermittent loss
of connectivity with zookeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from
checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint? 

I've gone through various tickets reporting stability issues due to
zookeeper that you've mentioned you intend to resolve soon. But until the
zookeeper based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on kafka offsets to resume
where it left off rather than savepoints.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can Flink 1.12 clear state at a specified time?

2020-12-21 Thread 赵一旦
Windows don't "repeat"; do you mean overlap? Whether they overlap depends on which window you use. Tumbling windows do not overlap.

三色堇 <25977...@qq.com> wrote on Mon, Dec 21, 2020 at 8:47 AM:

> With day-sized windows, would a sliding window repeat results? Tumbling doesn't seem to work.
>
>
>
>
> ------ Original message ------
> From: "user-zh" <hinobl...@gmail.com>
> Sent: Friday, Dec 18, 2020, 9:50 PM
> To: "user-zh"
> Subject: Re: Can Flink 1.12 clear state at a specified time?
>
> For your case you can simply window by day.
>
> 三色堇 <25977...@qq.com> wrote on Fri, Dec 18, 2020 at 3:20 PM:
>
> > Hi everyone in the community: I'm currently using Flink 1.12 in production. Per company requirements I compute a daily report, with one set of results per day. I already do GROUP BY current_date, userId. I found that across days the Flink state is not cleared and keeps accumulating on top of the previous day's results. I have tested
> > 1. the DataStream state TTL
> > 2. tabEnv.getConfig().setIdleStateRetention(Duration.ofDays(1))
> > and neither of these meets my need. Is there another way to implement this daily-report requirement?
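To illustrate the day-sized tumbling window suggested above, a minimal DataStream sketch (record type, values and the UTC+8 offset are assumptions for illustration): each window covers one calendar day, windows do not overlap, and the window state is cleaned up automatically when the window fires.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DailyWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                        Tuple3.of("user-1", 1L, 1608508800000L),  // hypothetical (userId, value, epoch millis)
                        Tuple3.of("user-1", 2L, 1608512400000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                                .withTimestampAssigner((event, ts) -> event.f2))
                .keyBy(event -> event.f0)
                // one non-overlapping window per calendar day; the -8h offset aligns window
                // boundaries with local midnight for UTC+8 (adjust to your timezone)
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .sum(1)
                .print();

        env.execute("daily window sketch");
    }
}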


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread Till Rohrmann
What are exactly the problems when the checkpoint recovery does not work?
Even if the ZooKeeper connection is temporarily disconnected which leads to
the JobMaster losing leadership and the job being suspended, the next
leader should continue where the first job left stopped because of the lost
ZooKeeper connection.

What happens under the hood when restoring from a savepoint is that it is
inserted into the CompletedCheckpointStore where also the other checkpoints
are stored. If now a failure happens, Flink will first try to recover from
a checkpoint/savepoint from the CompletedCheckpointStore and only if this
store does not contain any checkpoints/savepoints, it will use the
savepoint with which the job is started. The CompletedCheckpointStore
persists the checkpoint/savepoint information by writing the pointers to
ZooKeeper.

Cheers,
Till

On Mon, Dec 21, 2020 at 11:38 AM vishalovercome  wrote:

> Thanks for your reply!
>
> What I have seen is that the job terminates when there's intermittent loss
> of connectivity with zookeeper. This is in-fact the most common reason why
> our jobs are terminating at this point. Worse, it's unable to restore from
> checkpoint during some (not all) of these terminations. Under these
> scenarios, won't the job try to recover from a savepoint?
>
> I've gone through various tickets reporting stability issues due to
> zookeeper that you've mentioned you intend to resolve soon. But until the
> zookeeper based HA is stable, should we assume that it will repeatedly
> restore from savepoints? I would rather rely on kafka offsets to resume
> where it left off rather than savepoints.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: mvn build error; I hit the same problem compiling Flink 1.9, has it been solved? Compiling the latest code I don't have this problem

2020-12-21 Thread r pp
Build failures like this are mostly caused by dependencies that didn't download completely; just retry a few times.

mvn clean install -DskipTests -Drat.skip=true

Tested it myself; it works.


shaoshuai <762290...@qq.com> wrote on Mon, Dec 21, 2020 at 4:53 PM:

> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
> (default-testCompile) on project flink-parquet_2.11: Compilation failure:
> Compilation failure:
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[390,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[416,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[418,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[458,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[461,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[533,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[559,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[561,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[576,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[579,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[318,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[344,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[346,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[367,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[370,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[343,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[345,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[354,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[357,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[246,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[272,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> 

Re: flink1.11.2 writes to a Hive partitioned table, but Hive does not see the partitions

2020-12-21 Thread r pp
In your program, run that command after the table (partition) has been created.
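
For reference, the repair step mentioned in the question is plain Hive SQL (the table name is a placeholder), run from the Hive CLI / beeline or scheduled right after new partitions are written:

  MSCK REPAIR TABLE my_partitioned_table;

Alternatively, if I remember the 1.11 Hive streaming sink correctly, creating the table with a partition-commit policy such as TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore,success-file') lets Flink register new partitions in the metastore by itself; please verify the exact property names against the docs.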

kingdomad wrote on Mon, Dec 21, 2020 at 4:55 PM:

> flink1.11.2 writes to a Hive 3.12 partitioned table, but Hive cannot recognize the partitions Flink creates. The files are visible on HDFS, yet Hive cannot read the partitions.
> Only after running msck repair table to repair the partitioned table can Hive read the data.
> Any advice on how to solve this?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>


Re: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

2020-12-21 Thread r pp
Hmm, if the goal is to dedicate a few machines to running Flink, why not set up a dedicated YARN queue for Flink instead? Do you need network isolation... how fast is the internal network?

 wrote on Mon, Dec 21, 2020 at 5:48 PM:

> This can be done with YARN node labels.
>
> -----Original Message-----
> From: user-zh-return-10095-afweijian=163@flink.apache.org
>  on behalf of yujianbo
> Sent: Dec 21, 2020 16:44
> To: user-zh@flink.apache.org
> Subject: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes
>
> Hi all:
>  How can Flink on YARN be restricted to run its tasks on a fixed set of YARN nodes?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SQL execution mode

2020-12-21 Thread r pp
The whole point of SQL is that users should not have to care whether the job is stream or batch processing. For example, counting the total clicks on a video for the current day is an accumulating result whose changes can be queried in real time.
But Flink is not a storage system, so there is a question: what happens to the state behind the SQL?
The official docs cover this, including which operators work in streaming mode, batch mode, or both, and the caveats to be aware of:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/
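
As for the original question, for the Table/SQL API in 1.12 the mode is chosen when the TableEnvironment is created. A minimal Java sketch (assuming the Blink planner, which is the default in 1.12):

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  // batch execution for SQL
  EnvironmentSettings batchSettings =
          EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
  TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

  // streaming execution for SQL
  EnvironmentSettings streamSettings =
          EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  TableEnvironment streamEnv = TableEnvironment.create(streamSettings);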


jiangjiguang719 wrote on Mon, Dec 21, 2020 at 7:44 PM:

> In Flink 1.12, the DataStream API can choose batch or streaming execution via -Dexecution.runtime-mode;
> how is this specified for SQL?


Re: A question about Flink SQL

2020-12-21 Thread 占英华
Learned a lot, thanks for the reply!

> On Dec 21, 2020, at 20:39, hailongwang <18868816...@163.com> wrote:
> 
> 
> 
> No. Once the job is submitted and running, if the two inserts select from the same table, the data is split and sent to both table1 and table2; there is no ordering between them.
>> On 2020-12-21 10:45:25, "占英华" wrote:
>> Doesn't that mean the results of the first select and the second select could differ, since executing the first one takes time and the second query only runs after that delay?
>> 
>>>> On Dec 21, 2020, at 11:14, hailongwang <18868816...@163.com> wrote:
>>> 
>>> 
>>> 
>>> Yes, you can; for example, write the results into table1, table2 ...
>>> Insert into table1 ...;
>>> Insert into table2 ...;
>>> 
>>> 
>>> 
>>> Best,
>>> Hailong
>>>> On 2020-12-19 08:30:23, "占英华" wrote:
>>>> Can Flink SQL DML statements write results into different sink tables? If so, how should it be done?


Re: Re: A question about Flink SQL

2020-12-21 Thread hailongwang



No. Once the job is submitted and running, if the two inserts select from the same table, the data is split and sent to both table1 and table2; there is no ordering between them.
On 2020-12-21 10:45:25, "占英华" wrote:
>Doesn't that mean the results of the first select and the second select could differ, since executing the first one takes time and the second query only runs after that delay?
>
>> On Dec 21, 2020, at 11:14, hailongwang <18868816...@163.com> wrote:
>> 
>> 
>> 
>> Yes, you can; for example, write the results into table1, table2 ...
>> Insert into table1 ...;
>> Insert into table2 ...;
>> 
>> 
>> 
>> Best,
>> Hailong
>>> On 2020-12-19 08:30:23, "占英华" wrote:
>>> Can Flink SQL DML statements write results into different sink tables? If so, how should it be done?
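
For completeness, a small Java sketch of submitting both inserts together as one job (table names are placeholders; assuming Flink 1.11+, where a StatementSet is available, and a TableEnvironment with the tables already registered):

  import org.apache.flink.table.api.StatementSet;

  // tableEnv is an existing TableEnvironment; table1, table2 and source_table are registered
  StatementSet stmtSet = tableEnv.createStatementSet();
  stmtSet.addInsertSql("INSERT INTO table1 SELECT * FROM source_table");
  stmtSet.addInsertSql("INSERT INTO table2 SELECT * FROM source_table");
  stmtSet.execute();  // both INSERTs run in a single job and share the source scan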


Re: flink-shaded-hadoop-2-uber [...]

2020-12-21 Thread liujian
Thanks. For the Native K8s Docker image, I tried the following two Dockerfile variants:


COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 
/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081 8082
CMD ["help","history-server"]


COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 
/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
EXPOSE 6123 8081 8082
CMD ["help"]









------------------ Original ------------------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang

liujian <13597820...@qq.com> wrote on 2020-12-21, 1:35 PM:

> [message garbled in the archive; it asked how to run the history-server]
>
> ------------------ Original ------------------
> From: "user-zh" <danrtsey...@gmail.com>
> Date: 2020-12-21 10:15
>
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> wrote on 2020-12-20, 12:45 PM:
>
> > Thanks. [partly garbled] About the historyServer: with the flink 1.12.0 Dockerfile and CMD ["history-server"], port 8082 [rest garbled]
> >
> > ------------------ Original ------------------
> > From: "user-zh" <danrtsey...@gmail.com>
> > Date: 2020-12-19 9:35
> >
> > https://github.com/apache/flink-shaded

SQL execution mode

2020-12-21 Thread jiangjiguang719
In Flink 1.12, the DataStream API can choose batch or streaming execution via -Dexecution.runtime-mode; how is this specified for SQL?

Re: pyflink1.12 error connecting to MySQL: Missing required options

2020-12-21 Thread Wei Zhong
Hi,

As the error message indicates, the WITH options need a "url" option; you can try changing connector.url to url and see whether the error goes away.
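
For reference, a corrected WITH clause would look roughly like this (option names as used by the Flink 1.12 JDBC connector; host, port and credentials are placeholders):

  CREATE TABLE ts_pf_sec_yldrate ( ... ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://ip:port/db_base',
      'table-name' = 'ts_pf_sec_yldrate',
      'driver' = 'com.mysql.jdbc.Driver',
      'username' = 'xxx',
      'password' = 'xxx'
  )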

> On Dec 21, 2020, at 13:44, 肖越 <18242988...@163.com> wrote:
> 
> I defined two source DDLs in the script, but the second one always reports a missing-option problem. I am new to pyflink; could someone please advise?
> # DDL definition
> source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
> 
>symbol_id VARCHAR,biz_date VARCHAR,\
> 
>ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
> 
>is_valid DECIMAL,time_mark TIMESTAMP) WITH (
> 
>'connector' = 'jdbc',
> 
>'connector.url' = 'jdbc:mysql://ip:port/db_base',
> 
>'connector.table' = 'ts_pf_sec_yldrate',
> 
>'table-name' = 'ts_pf_sec_yldrate',
> 
>'connector.driver' = 'com.mysql.jdbc.Driver',
> 
>'connector.username' = 'xxx',
> 
>'connector.password' = 'xxx')
> 
> """
> Error message:
> Traceback (most recent call last):
>  File 
> "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
>  line 67, in 
>print(join.to_pandas().head(6))
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
>  line 807, in to_pandas
>.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 147, in deco
>return f(*a, **kw)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>  line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> : org.apache.flink.table.api.ValidationException: Unable to create a source 
> for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'connector.driver'='com.mysql.jdbc.Driver'
> 'connector.password'='xxx'
> 'connector.table'='ts_pf_sec_yldrate'
> 'connector.url'='jdbc:mysql://ip:port/db_base'
> 'connector.username'='xxx'
> 'table-name'='ts_pf_sec_yldrate'
> at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
> at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
> at 
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
> at 
> org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
> at 
> 

Re: A question about how Flink distributes data when consuming multiple Kafka topics

2020-12-21 Thread Shuai Xia
Hi, take a look at the assign method of the KafkaTopicPartitionAssigner class.
It hashes the topic name and takes it modulo the parallelism, then adds the partition id and takes it modulo the parallelism again.
The resulting assignment can indeed be uneven.
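
A rough Java sketch of that assignment logic, paraphrased from memory of KafkaTopicPartitionAssigner (check the actual source for the authoritative version):

  // which subtask a given topic partition ends up on
  static int assign(String topic, int partition, int numParallelSubtasks) {
      // hash the topic name and take it modulo the parallelism to get a start index
      int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
      // then add the partition id and take the result modulo the parallelism again
      return (startIndex + partition) % numParallelSubtasks;
  }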



--
From: bradyMk
Sent: Monday, Dec 21, 2020 17:40
To: user-zh
Subject: A question about how Flink distributes data when consuming multiple Kafka topics

Hi, I would like to ask:

I am using Flink to consume 5 different Kafka topics, each with 12 partitions, so I set the parallelism to 60:

env.setParallelism(60)

I assumed the configured parallelism maps one-to-one to the total number of topic partitions.

However, after the job started I found that only 14 tasks were actually consuming data from the topics; the rest consumed nothing, and among the active ones some read a few thousand records per second and some only a few hundred. So I am confused: how does Flink actually
distribute data when consuming multiple Kafka topics?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Queryable state on task managers that are not running the job

2020-12-21 Thread Martin Boyanov
Hi,
I'm running a long-running flink job in cluster mode and I'm interested in
using the queryable state functionality.
I have the following problem: when I query the flink task managers (i.e.
the queryable state proxy), it is possible to hit a task manager which
doesn't have the requested state, because the job is not running on that
task manager.
For example, I might have a cluster with 5 task managers, but the job is
deployed only on 3 of those. If my query hits any of the two idle task
managers, I naturally get an error message that the job does not exist.
My current solution is to size the cluster appropriately so that there are
no idle task managers. I was wondering if there was a better solution or if
this could be handled better in the future?
Thanks in advance.
Kind regards,
Martin
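
For context, a minimal Java sketch of how such a query is issued from a client (host, port, job id, key, state name and the descriptor are placeholders and must match what the job registers as queryable state):

  import org.apache.flink.api.common.JobID;
  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  import org.apache.flink.queryablestate.client.QueryableStateClient;
  import java.util.concurrent.CompletableFuture;

  // connect to the queryable state proxy of one task manager (default proxy port 9069)
  QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
  ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my-state", Long.class);
  CompletableFuture<ValueState<Long>> future = client.getKvState(
          JobID.fromHexString("00000000000000000000000000000000"),
          "my-queryable-state-name",
          "some-key",
          BasicTypeInfo.STRING_TYPE_INFO,
          descriptor);
  System.out.println(future.get().value());   // fails if the proxy cannot locate the state
  client.shutdownAndWait();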


checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts. The
checkpoint timeout is the default 10 minutes. This does not seem to be
related to EFS limits/throttling. We started experiencing these timeouts
after upgrading from Flink 1.9.2/Java 8. Are there any known issues which
cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using
high-availability.storageDir. Is that correct?

For now we plan on setting

execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true

and also explicitly set state.checkpoints.dir.
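
For instance, the last entry in flink-conf.yaml might look like this (the path is a placeholder for a directory on the EFS mount):

  state.checkpoints.dir: file:///mnt/efs/flink/checkpoints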



Re: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

2020-12-21 Thread afweijian
This can be done with YARN node labels.
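
If I remember correctly, Flink exposes a configuration option for exactly this, so a minimal sketch would be (assuming a node label named "flink" has already been created in YARN, assigned to the target nodes, and allowed on the queue you submit to; verify the option name against your Flink version's docs):

  # flink-conf.yaml
  yarn.application.node-label: flink

The application's containers are then only scheduled onto nodes carrying that label.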

-----Original Message-----
From: user-zh-return-10095-afweijian=163@flink.apache.org
 on behalf of yujianbo
Sent: Dec 21, 2020 16:44
To: user-zh@flink.apache.org
Subject: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

Hi all:
 How can Flink on YARN be restricted to run its tasks on a fixed set of YARN nodes?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


A question about how Flink distributes data when consuming multiple Kafka topics

2020-12-21 Thread bradyMk
Hi, I would like to ask:

I am using Flink to consume 5 different Kafka topics, each with 12 partitions, so I set the parallelism to 60:

env.setParallelism(60)

I assumed the configured parallelism maps one-to-one to the total number of topic partitions.

However, after the job started I found that only 14 tasks were actually consuming data from the topics; the rest consumed nothing, and among the active ones some read a few thousand records per second and some only a few hundred. So I am confused: how does Flink actually
distribute data when consuming multiple Kafka topics?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Native Kubernetes needs to access HDFS

2020-12-21 Thread Akisaya
It seems 1.10 does not support this yet; I looked at the 1.10 code and it does not read the Hadoop configuration when creating the ConfigMap.

Yang Wang wrote on Sat, Dec 19, 2020 at 12:18 AM:

> You can simply set the HADOOP_CONF_DIR environment variable on the Flink client side; the Hadoop
> configuration is then shipped automatically and mounted into the JobManager and TaskManager.
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> wrote on Fri, Dec 18, 2020 at 5:26 PM:
>
> > Hi:
> >  When using Native Kubernetes
> > I need to access HDFS, and I have already put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into the lib directory.
> >  But HDFS is HA, so files such as hdfs-site.xml are needed as well; how do I point Flink at those files?
>
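
In practice the HADOOP_CONF_DIR approach suggested above looks roughly like this on the machine that submits the job (paths and names are placeholders):

  # directory containing core-site.xml / hdfs-site.xml with the HA nameservice settings
  export HADOOP_CONF_DIR=/etc/hadoop/conf
  ./bin/flink run-application -t kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-cluster \
    local:///opt/flink/usrlib/my-job.jar

The client then ships these files and mounts them into the JobManager and TaskManager pods, as described in the reply above.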


Re: No execution.target specified in your configuration file

2020-12-21 Thread Kostas Kloudas
Glad I could help!

On Mon, Dec 21, 2020 at 3:42 AM Ben Beasley  wrote:
>
> That worked. Thank you, Kostas.
>
>
>
> From: Kostas Kloudas 
> Date: Sunday, December 20, 2020 at 7:21 AM
> To: Ben Beasley 
> Cc: user@flink.apache.org 
> Subject: Re: No execution.target specified in your configuration file
>
> Hi Ben,
>
> You can try using StreamExecutionEnvironment
> streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> instead of directly creating a new one. This will allow to pick up the
> configuration parameters you pass through the command line.
>
> I hope this helps,
> Kostas
>
> On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
> >
> > I was wondering if I could get help with the issue described in this 
> > stackoverflow post.


flink1.11.2 writes to a Hive partitioned table, but Hive does not see the partitions

2020-12-21 Thread kingdomad
flink1.11.2 writes to a Hive 3.12 partitioned table, but Hive cannot recognize the partitions Flink creates. The files are visible on HDFS, yet Hive cannot read the partitions.
Only after running msck repair table to repair the partitioned table can Hive read the data.
Any advice on how to solve this?
















--

kingdomad



Re: Error executing the mvn build: I hit the same problem compiling flink1.9, has it been solved? Building the latest code works fine for me

2020-12-21 Thread shaoshuai
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
(default-testCompile) on project flink-parquet_2.11: Compilation failure:
Compilation failure: 
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[390,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[416,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[418,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[458,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[461,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[533,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[559,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[561,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[576,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[579,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[318,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[344,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[346,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[367,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[370,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[343,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[345,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[354,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[357,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[246,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[272,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[274,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[287,3]
方法不会覆盖或实现超类型的方法
[ERROR]

Blob directories and files are not deleted after the job finishes

2020-12-21 Thread Luna Wong
https://issues.apache.org/jira/browse/FLINK-20696

This happens with some probability: when many jobs are submitted, after they finish a few blob directories are occasionally left uncleaned. I have not yet debugged the root cause.


Re: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

2020-12-21 Thread felixzh
I don't think that is possible. If the goal is resource isolation, splitting into separate queues should be enough.

















On 2020-12-21 16:43:35, "yujianbo" <15205029...@163.com> wrote:
>Hi all:
> How can Flink on YARN be restricted to run its tasks on a fixed set of YARN nodes?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

2020-12-21 Thread amen...@163.com
That is really a question for YARN...



 
From: yujianbo
Sent: 2020-12-21 16:43
To: user-zh
Subject: Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes
Hi all:
 How can Flink on YARN be restricted to run its tasks on a fixed set of YARN nodes?
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink on yarn: how to run Flink jobs only on a fixed set of YARN nodes

2020-12-21 Thread yujianbo
Hi all:
 How can Flink on YARN be restricted to run its tasks on a fixed set of YARN nodes?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Application Mode job on a K8S cluster: the cluster cannot scale down

2020-12-21 Thread lichunguang
Hi Yang Wang:
What I meant is:
  Native Flink on K8s requests resources as individual Pods, which conflicts somewhat with the Kubernetes cluster autoscaler.

Reason:
When there are many jobs, every node is heavily loaded; when only a few jobs remain, each node holds just a small number of pods, but because these are "Pods that are not backed
by a controller object", the node with the lowest utilization cannot be drained, so overall utilization stays low.

What types of pods can prevent CA from removing a node?
https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/FAQ.md#what-types-of-pods-can-prevent-ca-from-removing-a-node
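
For what it is worth, the FAQ linked above also describes an opt-out for this case: pods can be marked as safe to evict via an annotation. A sketch of how that could be wired into the native K8s integration (the Flink options for pod annotations are an assumption here; verify that they exist in your version before relying on them):

  # flink-conf.yaml
  kubernetes.jobmanager.annotations: cluster-autoscaler.kubernetes.io/safe-to-evict:true
  kubernetes.taskmanager.annotations: cluster-autoscaler.kubernetes.io/safe-to-evict:true

Whether evicting TaskManager pods this way is acceptable then depends on the job's checkpoint and restart settings.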



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink sink never executes

2020-12-21 Thread Ben Beasley
First off I want to thank the folks in this email list for their help thus far.

I'm facing another strange issue: if I add a window to my stream, the sink
no longer executes. However, the sink executes without the windowing. I
described my problem on stackoverflow so that the code is easy to read.

I wonder if anyone can help me once more; I believe the solution could be
simple for someone familiar with the code. I believe I've followed the
tutorials and articles on the flink website correctly.