Re: Question about using timers on a non-keyedStream

2021-02-08 Posted by yidan zhao
Of course, with something like randomKey % 30 the end result is roughly the same, but the 30 sink batches may all end up concentrated on just a few parallel instances.

yidan zhao  wrote on Tue, Feb 9, 2021 at 3:22 PM:

> The biggest problem with introducing a random key is that I want the sink to batch, and if the key is too random, batching is simply impossible.
> Bucketing into 1024 buckets with randomKey % 1024 doesn't work either: each bucket then holds too little data, so almost every flush is triggered by timeout rather than by reaching batchSize; in other words, every flush round produces 1024 concurrent sink calls.
> The backend storage probably cannot take that much concurrency; the whole point of batching is to reduce the number of sink calls.
>
> What I want is that, in line with the parallelism (say 30), each round results in about 30 sink calls (or more than 30, because a bucket may exceed batchSize; if all
> yidan zhao  wrote on Tue, Feb 9, 2021 at 3:04 PM:
>
>> As the subject says, Flink currently does not support timers on a non-keyedStream. Is there any workaround?
>>
>> I am implementing a sink with a timeout and would like to use the TimerService, but it is not supported there.
>> At the same time I don't want to use a keyedStream, because it causes data skew.
>>
>> Apart from introducing a random key, is there any other approach?
>>
>
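One possible workaround that stays off keyed streams, sketched below: a RichSinkFunction that buffers records per parallel instance and flushes either when batchSize is reached or when a flush interval elapses, driven by a plain ScheduledExecutorService instead of Flink's keyed TimerService. This is only a minimal sketch under assumptions not stated in the thread: the class name MyBatchSink, the parameters batchSize/flushIntervalMs and the doFlush() call are made up for illustration, and the buffered records are not checkpointed (a CheckpointedFunction would be needed to avoid losing the buffer on failure).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Each parallel sink instance keeps its own buffer, so with parallelism 30
// a flush round costs roughly 30 sink calls instead of 1024.
public class MyBatchSink extends RichSinkFunction<String> {

    private final int batchSize;
    private final long flushIntervalMs;

    private transient List<String> buffer;
    private transient ScheduledExecutorService scheduler;

    public MyBatchSink(int batchSize, long flushIntervalMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>(batchSize);
        scheduler = Executors.newSingleThreadScheduledExecutor();
        // Timeout-driven flush, independent of the (keyed-only) TimerService.
        scheduler.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void invoke(String value, Context context) {
        synchronized (this) {
            buffer.add(value);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }
    }

    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        doFlush(new ArrayList<>(buffer)); // hypothetical write to the real backend
        buffer.clear();
    }

    private void doFlush(List<String> batch) {
        // write the batch to the external storage here
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdown();
        }
        flush();
    }
}

Used roughly as stream.addSink(new MyBatchSink(500, 5000)); records then batch per subtask rather than per key.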


Re: Question about using timers on a non-keyedStream

2021-02-08 Posted by yidan zhao
The biggest problem with introducing a random key is that I want the sink to batch, and if the key is too random, batching is simply impossible.
Bucketing into 1024 buckets with randomKey % 1024 doesn't work either: each bucket then holds too little data, so almost every flush is triggered by timeout rather than by reaching batchSize; in other words, every flush round produces 1024 concurrent sink calls.
The backend storage probably cannot take that much concurrency; the whole point of batching is to reduce the number of sink calls.
What I want is that, in line with the parallelism (say 30), each round results in about 30 sink calls (or more than 30, because a bucket may exceed batchSize; if all  wrote on Tue, Feb 9, 2021 at 3:04 PM:

> As the subject says, Flink currently does not support timers on a non-keyedStream. Is there any workaround?
>
> I am implementing a sink with a timeout and would like to use the TimerService, but it is not supported there.
> At the same time I don't want to use a keyedStream, because it causes data skew.
>
> Apart from introducing a random key, is there any other approach?
>


Question about using timers on a non-keyedStream

2021-02-08 Posted by yidan zhao
As the subject says, Flink currently does not support timers on a non-keyedStream. Is there any workaround?

I am implementing a sink with a timeout and would like to use the TimerService, but it is not supported there.
At the same time I don't want to use a keyedStream, because it causes data skew.

Apart from introducing a random key, is there any other approach?


Re: flink on yarn per-job container killed

2021-02-08 Posted by key lou
Thanks. I have read the related article and similar questions on the mailing list; the central idea is always to increase off-heap memory. A few remaining questions:
1. In Flink 1.10, with state that keeps growing, is there really no way to bound RocksDB's memory, leaving containers at risk of being killed? Doesn't RocksDB free memory and flush to disk when memory runs low?
2. When the RocksDBStateBackend is configured with an HDFS path, does RocksDB still create local files? I have never been able to find any such files on the TM nodes.
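One direction to try, purely as a sketch of the Flink memory options involved rather than a verified fix for this job: keep RocksDB inside managed memory and leave more container headroom for memory the JVM cannot account for, e.g. in flink-conf.yaml (values are illustrative):

state.backend.rocksdb.memory.managed: true
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.jvm-overhead.min: 512mb
taskmanager.memory.jvm-overhead.max: 2gb

Note that the RocksDB cap is best-effort (allocator fragmentation and native overhead can still push the process past it). On question 2, as far as I know RocksDB always works against local disk regardless of the HDFS checkpoint path, so its working files live under the TM's tmp dirs (see state.backend.rocksdb.localdir).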


zhiyezou <1530130...@qq.com> wrote on Sun, Feb 7, 2021 at 9:41 AM:

> Hi
> Sorry, I didn't check the link beforehand.
> https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95
> First check whether state TTL is configured correctly; if it is and this still happens, it is probably a managed memory issue. See the linked article for the relevant configuration.
>
>
>
>
> -- Original Message --
> From:
>   "user-zh"
> <
> louke...@gmail.com>;
> Sent: Friday, February 5, 2021, 2:03 PM
> To: "user-zh"
> Subject: Re: flink on yarn per-job container killed
>
>
>
> Thanks for the answer.
>     Do you mean tuning this parameter, taskmanager.memory.jvm-overhead (the WeChat link seems broken)?
> Most of the answers I have seen on the mailing list boil down to increasing off-heap memory.
> Is there really no upper bound on the memory RocksDB allocates? My state keeps growing, but I want RocksDB memory to stay within the limit I configure so the container is not
> killed by YARN. Can that be achieved?
>
zhiyezou <1530130...@qq.com> wrote on Fri, Feb 5, 2021 at 1:25 PM:
>
> > Hi
> >    You can start with the jvm-overhead related configuration; for the underlying principles and parameters see this article:
> >
> https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95
> >
> >
> >
> >
> >
> > -- Original Message --
> > From:
> >  
> "user-zh"
> >
> <
> > louke...@gmail.com>;
> > Sent: Thursday, February 4, 2021, 5:46 PM
> > To: "user-zh" >
> > Subject: flink on yarn per-job container killed
> >
> >
> >
> > Hi all,
> >  With RocksDB state, the state grows too large and the container gets killed. Is there a good solution? Why doesn't
> > taskmanager.memory.managed.size in Flink 1.10.1 bound RocksDB's memory allocation? How should the upper limit be controlled?
> > java.lang.Exception: Container
> > [pid=137231,containerID=container_e118_1611713951789_92045_01_03]
> > is running beyond physical memory limits. Current usage: 4.1 GB of 4
> GB
> > physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing
> > container.
> > Dump of the process-tree for
> container_e118_1611713951789_92045_01_03 :
> >     |- PID PPID PGRPID SESSID CMD_NAME
> > USER_MODE_TIME(MILLIS) SYSTEM_TIME(
> > MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >     |- 137259 137231 137231 137231 (java)
> 3935 488
> > 8764928000 1086082
> > /app/jdk/bin/java -Xmx2029372037 -Xms2029372037
> -XX:MaxDirectMemorySize=
> > 493921243 -XX:MaxMetaspaceSize=268435456
> -XX:+HeapDumpOnOutOfMemoryError -
> >
> >
> Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
> > -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
> > YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
> > 134217728b -D taskmanager.memory.network.max=359703515b -D
> > taskmanager.memory.network.min=359703515b -D
> > taskmanager.memory.framework.heap.size=134217728b -D
> > taskmanager.memory.managed.size=1073741824b -D
> taskmanager.cpu.cores=1.0 -D
> > taskmanager.memory.task.heap.size=1895154309b -D
> > taskmanager.memory.task.off-heap.size=0b --configDir .
> > -Djobmanager.rpc.address=cnsz22pl377
> > -Dweb.port=0
> > -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc
> > -Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377
> >     |- 137231 137229 137231 137231 (bash)
> 0 0 115855360 356
> > /bin/bash -c
> > /app/jdk/bin/java -Xmx2029372037 -Xms2029372037
> -XX:MaxDirectMemorySize=
> > 493921243 -XX:MaxMetaspaceSize=268435456
> -XX:+HeapDumpOnOutOfMemoryError -
> >
> >
> Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
> > -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
> > YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
> > 134217728b -D taskmanager.memory.network.max=359703515b -D
> > taskmanager.memory.network.min=359703515b -D
> > taskmanager.memory.framework.heap.size=134217728b -D
> > taskmanager.memory.managed.size=1073741824b -D
> taskmanager.cpu.cores=1.0 -D
> > taskmanager.memory.task.heap.size=1895154309b -D
> > taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager
> > .rpc.address='cnsz22pl377' -Dweb.port='0' -Dweb.tmpdir=
> > '/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc'
> > -Djobmanager.rpc.port=
> > '18918' -Drest.address='CNSZ22PL377' 1>
> >
> >
> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out
> > 2>
> >
> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err
> >
> >
> > *state.backend.rocksdb.memory.fixed-per-slot* 1024M
> > *state.backend.rocksdb.memory.managed* true
> > *taskmanager.memory

Re: Flink standalone on k8s HA exception

2021-02-08 Posted by Yang Wang
Once HA is enabled, you need to create a service account that has permission to create/watch ConfigMaps,
and then mount it into the JobManager and TaskManager.
Judging from your error, the service account has not been configured.

Best,
Yang
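For reference, a minimal way to do that (the names flink-service-account and flink-role-binding are placeholders, the default namespace and the broad built-in edit role are assumed; a Role scoped to ConfigMaps only would be tighter), followed by referencing the account from the JobManager/TaskManager pod specs:

kubectl create serviceaccount flink-service-account -n default
kubectl create clusterrolebinding flink-role-binding \
  --clusterrole=edit \
  --serviceaccount=default:flink-service-account

# in the JobManager / TaskManager Deployment templates:
#   spec:
#     template:
#       spec:
#         serviceAccountName: flink-service-account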


casel.chen  wrote on Tue, Feb 9, 2021 at 12:10 AM:

> I am trying to deploy a Flink standalone cluster on k8s. Before enabling HA the cluster worked fine, but when enabling HA
> I found that after adding the following two HA settings to the configmap, the JM throws an exception. Why is that?
>
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery
>
>
> 2021-02-09 00:03:04,421 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
> start cluster entrypoint StandaloneSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> Caused by: org.apache.flink.util.FlinkException: Could not create the ha
> services from the instantiated HighAvailabilityServicesFactory
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> ... 2 more
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.<init>(Fabric8FlinkKubeClient.java:84)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory.fromConfiguration(DefaultKubeClientFactory.java:88)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:38)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-

Re: Flink SQL temporal table join with Hive error

2021-02-08 Posted by macia kk
SELECT *
FROM
(
    SELECT tt.*
    FROM input_tabe_01 tt
    FULL OUTER JOIN input_tabe_02 mt
    ON (mt.transaction_sn = tt.reference_id)
    and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
    and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
    WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
) lt
LEFT JOIN exchange_rate ex
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'all') */
FOR SYSTEM_TIME AS OF lt.event_time ex
ON DATE_FORMAT(lt.event_time, 'yyyy-MM-dd') = cast(ex.date_id as String)
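For comparison, the general shape of an event-time temporal join as I read the documented syntax, with the version alias introduced via AS and the equality key guarded against NULLs. This is only a sketch (my_fact_table is a hypothetical left input with an event_time attribute) and may or may not resolve this particular "join key can not be empty" error:

SELECT lt.*, ex.exchange_rate
FROM my_fact_table lt
LEFT JOIN exchange_rate
  /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all') */
  FOR SYSTEM_TIME AS OF lt.event_time AS ex
  ON DATE_FORMAT(lt.event_time, 'yyyy-MM-dd') = CAST(ex.date_id AS STRING)
WHERE lt.event_time IS NOT NULL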


Rui Li  wrote on Tue, Feb 9, 2021 at 10:20 AM:

> Hi,
>
> How is the join statement written, then?
>
> On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:
>
> > The image is exactly that error.
> >
> > The DDL is below; it is a shared table, and I don't have permission to change it.
> >
> > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> > 'country', `currency` string COMMENT 'currency', `exchange_rate`
> > decimal(25,10) COMMENT 'exchange rate')
> > PARTITIONED BY (`grass_date` date COMMENT 'partition key, yyyy-MM-dd')
> > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> > WITH SERDEPROPERTIES (
> >   'serialization.format' = '1'
> > )
> >
> >
Rui Li  wrote on Mon, Feb 8, 2021 at 2:17 PM:
> >
> > > Hi, the image didn't come through. Could you paste the Hive DDL and the join statement?
> > >
> > > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> > >
> > > > Currently the join key in Temporal Table Join can not be empty.
> > > >
> > > > My Hive table join DDL does not declare is not null, but all the values are present, yet this error is still reported.
> > > >
> > > > [image: image.png]
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Flink SQL temporal table join with Hive error

2021-02-08 Posted by Rui Li
Hi,

How is the join statement written, then?

On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:

> The image is exactly that error.
>
> The DDL is below; it is a shared table, and I don't have permission to change it.
>
> CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> 'country', `currency` string COMMENT 'currency', `exchange_rate`
> decimal(25,10) COMMENT 'exchange rate')
> PARTITIONED BY (`grass_date` date COMMENT 'partition key, yyyy-MM-dd')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
>
>
> Rui Li  wrote on Mon, Feb 8, 2021 at 2:17 PM:
>
> > Hi, the image didn't come through. Could you paste the Hive DDL and the join statement?
> >
> > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> >
> > > Currently the join key in Temporal Table Join can not be empty.
> > >
> > > My Hive table join DDL does not declare is not null, but all the values are present, yet this error is still reported.
> > >
> > > [image: image.png]
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Posted by Dian Fu
This question has probably been asked before; try searching the archives.

Also, if GC is frequent, try increasing the memory.
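For example (made-up numbers, to be tuned to the machine; raising the heartbeat timeout only masks long GC pauses), in flink-conf.yaml:

taskmanager.memory.process.size: 2048m
heartbeat.timeout: 120000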


> On Feb 8, 2021, at 5:14 PM, 陈康 <844256...@qq.com> wrote:
> 
> Thanks for the reply... I switched the version... now it fails with the error below.
> [hadoop@hadoop01 bin]$ pip list | grep flink
> apache-flink   1.11.1
> 
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
> with id 397c590a9c19b173a83a4476f8eeaca0 timed out.
>   ... 26 more
> 
>   I observed that the TM's
> young-generation GC is frequent.
> ==
> [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
> localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f
> Traceback (most recent call last):
>  File "udf.py", line 63, in <module>
>t_env.execute("job")
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 1057, in execute
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: a27f139f01ef951d832cfa8382523e4f)
>   at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: a27f139f01ef951d832cfa8382523e4f)
>   at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
>   at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
>   at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
>   at
> java.util.co

Flink standalone on k8s HA exception

2021-02-08 Posted by casel.chen
I am trying to deploy a Flink standalone cluster on k8s. Before enabling HA the cluster worked fine, but when enabling HA
I found that after adding the following two HA settings to the configmap, the JM throws an exception. Why is that?


high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery


2021-02-09 00:03:04,421 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
start cluster entrypoint StandaloneSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
 [flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
 [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 2 more
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.<init>(Fabric8FlinkKubeClient.java:84)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory.fromConfiguration(DefaultKubeClientFactory.java:88)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:38)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 2 more

Re: How to ensure no data loss in a Flink dual-stream join

2021-02-08 Posted by Mailbox service
-- Original Message --
From: "lxk7...@163.com"

How to ensure no data loss in a Flink dual-stream join

2021-02-08 Posted by lxk7...@163.com

I am currently doing a dual-stream join in Flink, mostly with interval join, using an empirically chosen time interval. How can I make sure no data is lost?
If data arrives later than that interval it is simply dropped, and since I am processing order data, that is not acceptable.


lxk7...@163.com
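One pattern that avoids silently dropping late order data, sketched below under assumptions not in the original mail: instead of the built-in interval join, connect the two keyed streams with a KeyedCoProcessFunction, buffer unmatched records in state, and when a retention timer fires send the still-unmatched side to a side output for later reconciliation instead of discarding it. Order, Payment, the 10-minute retention and the key fields are all hypothetical, and event-time timestamps/watermarks are assumed to be assigned upstream.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// hypothetical minimal POJOs
class Order   { public String orderId; public String payload; }
class Payment { public String orderId; public String payload; }

public class JoinWithLateOutput extends KeyedCoProcessFunction<String, Order, Payment, String> {

    // unmatched orders end up here instead of being dropped
    public static final OutputTag<Order> UNMATCHED_ORDERS = new OutputTag<Order>("unmatched-orders") {};

    private static final long RETENTION_MS = 10 * 60 * 1000L; // made-up retention

    private transient ValueState<Order> orderState;
    private transient ValueState<Payment> paymentState;

    @Override
    public void open(Configuration parameters) {
        orderState = getRuntimeContext().getState(new ValueStateDescriptor<>("order", Order.class));
        paymentState = getRuntimeContext().getState(new ValueStateDescriptor<>("payment", Payment.class));
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<String> out) throws Exception {
        Payment payment = paymentState.value();
        if (payment != null) {
            out.collect(order.orderId + " matched");
            paymentState.clear();
        } else {
            orderState.update(order);
            // keep the order for RETENTION_MS, then hand it to the side output
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
        }
    }

    @Override
    public void processElement2(Payment payment, Context ctx, Collector<String> out) throws Exception {
        Order order = orderState.value();
        if (order != null) {
            out.collect(order.orderId + " matched");
            orderState.clear();
        } else {
            paymentState.update(payment);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Order order = orderState.value();
        if (order != null) {
            ctx.output(UNMATCHED_ORDERS, order); // not dropped: visible downstream for reconciliation
        }
        // unmatched payments are simply cleared here; add a second tag if they matter too
        orderState.clear();
        paymentState.clear();
    }
}

Wired up roughly as orders.keyBy(o -> o.orderId).connect(payments.keyBy(p -> p.orderId)).process(new JoinWithLateOutput()), with the unmatched records read back via getSideOutput(JoinWithLateOutput.UNMATCHED_ORDERS).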


Re: Upgrading Flink to Hadoop 3

2021-02-08 Posted by lxk7...@163.com

I'm not sure exactly what you mean by your question.
If it is about upgrading Hadoop, just point the Hadoop jars that Flink uses in its configuration at the Hadoop 3 ones.


lxk7...@163.com
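Another route that is commonly used for Flink 1.11+, where no shaded Hadoop jar is bundled, is to not copy jars at all and instead put the Hadoop 3 client on the classpath via the environment (a sketch; paths depend on the installation):

export HADOOP_CLASSPATH=`hadoop classpath`
bin/flink run ...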
 
From: kandy.wang
Sent: 2021-02-07 10:27
To: user-zh
Subject: Upgrading Flink to Hadoop 3
How do I upgrade Flink to Hadoop 3?


Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Posted by 陈康
Thanks for the reply... I switched the version... now it fails with the error below.
[hadoop@hadoop01 bin]$ pip list | grep flink
apache-flink   1.11.1

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id 397c590a9c19b173a83a4476f8eeaca0 timed out.
... 26 more

  I observed that the TM's
young-generation GC is frequent.
==
[hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f
Traceback (most recent call last):
  File "udf.py", line 63, in 
t_env.execute("job")
  File
"/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1057, in execute
  File
"/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: a27f139f01ef951d832cfa8382523e4f)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: a27f139f01ef951d832cfa8382523e4f)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.post

Yarn application state vs. Flink Job status

2021-02-08 Posted by ??????

     The Yarn App states are:
NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED.
     The Flink Job states are:
CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED, RECONCILING.


    What I see is that even when the Flink Job has failed, the Yarn app can still show RUNNING.


    Question: to monitor a job, should I use the Yarn Rest API to read the Yarn
app state, or the Flink Rest API to read the Job status?
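Roughly, both can be polled over HTTP (hosts, ports and IDs below are placeholders); since a failed or restarting job can leave the Yarn app in RUNNING, the Flink REST API is the one that reflects the job status itself:

# YARN ResourceManager REST API: application state
curl http://<rm-host>:8088/ws/v1/cluster/apps/<application_id>

# Flink JobManager REST API: job status
curl http://<jm-host>:8081/jobs/<job_id>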





Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Posted by Dian Fu
It looks like the Flink cluster version and the PyFlink version don't match: the cluster runs Flink 1.11.1, while PyFlink is 1.12.0?

Align the versions first, then try again.
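For example, assuming the cluster stays on 1.11.1, aligning the client side (adjust to the Python environment actually used by the job):

pip install apache-flink==1.11.1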

> On Feb 8, 2021, at 10:28 AM, 陈康 <844256...@qq.com> wrote:
> 
> Hi all: I am running the simplest pyflink UDF and it reports Failed to create stage bundle factory!
> INFO:root:Initializing python harness: It runs fine in IntelliJ IDEA. Has anyone hit this? Thanks~
> 
> /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec
> /opt/python36/bin/python3  -py udf.py
> 
> [hadoop@hadoop01 pyflink]$ /opt/python36/bin/python3  -V
> Python 3.6.5
> [hadoop@hadoop01 pyflink]$ pip list | grep flink
> apache-flink   1.12.0
> 
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka,
> Json
> from pyflink.table.udf import udf, TableFunction, ScalarFunction
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
> 
> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
> DataTypes.BIGINT())
> t_env.register_function("add", add)
> 
> t_env.sql_update("""
>   CREATE TABLE mySource (   
>   a bigint,
>   b bigint
>   ) WITH ( 
>   'connector' = 'kafka',
>   'topic' = 'udf',
>   'properties.bootstrap.servers' = 'hadoop01:9092',
>   'properties.group.id' = 'pyflinkUDF',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
>   ) 
> """)
> t_env.sql_update("""
>   CREATE TABLE mySink (   
>   a bigint,
>   b bigint
>   ) WITH ( 
>   'connector' = 'print'   
>   ) 
> """)
> t_env.sql_update("insert into mySink select a, add(a,b) from mySource")
> t_env.execute("job")
> 
> 
> 
> 
> [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
> localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> Job has been submitted with JobID 67fc6d3f4f3f97339345202f4de53366
> Traceback (most recent call last):
>  File "udf.py", line 63, in <module>
>t_env.execute("job")
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 1057, in execute
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 67fc6d3f4f3f97339345202f4de53366)
>   at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(Metho