flink hudi写oss文件失败报No FileSystem for scheme "oss"

2021-06-18 文章 casel.chen
hadoop 2.9.2, flink 1.12.2, hudi 0.9.0-SNAPSHOT
core-site.xml里面配置了oss相关信息,本地启flink cluster,执行flink sql client创建表,写数据和查询都没问题。


改成在项目中flink sql作业,打包成fat jar以local方式运行,项目中引用了 flink-oss-fs-hadoop,但程序报了如下错误
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem 
for scheme "oss"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:97)
... 41 more
查看了dependency tree是有flink-oss-fs-hadoop jar包的,并且解析fat jar也能看到 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
 这个shaded类
但是使用arthas查找运行时的类却没有找到这个类。


以fat jar运行flink sql作业的启动命令如下:
/Users/admin/.sdkman/candidates/java/current/bin/java -cp 
/Users/admin/gitrepo/rtdp/streamsql/dist/lib/streamsql-submit-jar-with-dependencies.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-shaded-zookeeper-3.4.14.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-oss-fs-hadoop-1.12.2.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-csv-1.12.2.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-table_2.11-1.12.2.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-json-1.12.2.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:/Users/admin/dev/hudi-demo/flink-1.12.2/lib/flink-table-blink_2.11-1.12.2.jar
 com.huifu.streamsql.launcher.SubmitJobMain -command start -jobPath 
/Users/admin/gitrepo/rtdp/streamsql/dist/jobs/test/hudi/1-hudi-insert


使用arthas查询类


[INFO] arthas home: /Users/admin/.arthas/lib/3.5.1/arthas
[INFO] Try to attach process 25267
[INFO] Attach process 25267 success.
[INFO] arthas-client connect 127.0.0.1 3658
  ,---.  ,--. ,.,--.  ,--.  ,---.   ,---.   
 /  O  \ |  .--. ''--.  .--'|  '--'  | /  O  \ '   .-'  
|  .-.  ||  '--'.'   |  |   |  .--.  ||  .-.  |`.  `-.  
|  | |  ||  |\  \|  |   |  |  |  ||  | |  |.-'| 
`--' `--'`--' '--'   `--'   `--'  `--'`--' `--'`-'  



wiki   https://arthas.aliyun.com/doc
tutorials  https://arthas.aliyun.com/doc/arthas-tutorials.html  
version3.5.1
main_class com.huifu.streamsql.launcher.SubmitJobMain   
pid25267
time   2021-06-19 11:57:22  


[arthas@25267]$ sc org.apache.flink.fs.*
Affect(row-cnt:0) cost in 19 ms.
[arthas@25267]$ sc org.apache.hadoop.fs.*
Affect(row-cnt:0) cost in 5 ms.


另有一个疑问:flink hudi写oss一定要依赖hadoop home吗?需要安装hadoop目录?可不可以只将core-site.xml打包进fat 
jar运行呢?

Flink1.12的RocksDB占用的内存大于Flink1.9

2021-06-18 文章 Haihang Jing
请教个问题,同样的业务逻辑,同样的资源配置,作业在Flink1.9和Flink1.12的内存使用相差很大,使用jemalloc分析
发现主要是rocksdb的UncompressBlockContentsForCompressionType方法占用的内存较多
,运行相同的时间,该方法在Flink1.9 占用内存200MB,在Flink1.12占用内存约4G,大家有遇到过这个现象么?
 
 




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 提交job后 task 一直是 schedule 状态

2021-06-18 文章 Lei Wang
flink-conf.yaml 中加入下面的配置就可以了,但我不知道为什么。

taskmanager.host: localhost



On Fri, Jun 18, 2021 at 1:43 PM Lei Wang  wrote:

> flink-1.11.2
> ./bin/start-cluster.sh 启动然后
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
> localhost --port 
>
> 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误
>
> 2021-06-18 13:34:26,683 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2
> 28577f2) switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure tha
> t the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.11-1.11.2.jar:1
> .11.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2
>
> 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。
>
> 有大神帮助解释一下吗?
>
> 谢谢,
> 王磊
>


Re: Flink 提交到yarn失败

2021-06-18 文章 yangpengyi
设置了,从堆栈来看根源是调用client的initialze方法有问题,这里应该是获取到了hadoop classpath中的正确版本的客户端
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:167)~[hadoop-hdfs-client-3.0.0-cdh6.1.1.jar:?]



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 提交到yarn失败

2021-06-18 文章 Zhiwen Sun
HADOOP_CLASSPATH 设置了吗?

Zhiwen Sun



On Fri, Jun 18, 2021 at 9:47 AM yangpengyi <963087...@qq.com.invalid> wrote:

> 环境: FLINK 1.12 & CDH6.1.1
> 问题:
>
> 利用yarn-per-job提交时,在初始化hdfs客户端时出错。看起来应该是hadoop版本的兼容问题,不过从堆栈看应该使用到了正确的客户端jar包。
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath.
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:117)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:309)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:272)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:212)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:173)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_181]
> at javax.security.auth.Subject.doAs(Subject.java:422)
> ~[?:1.8.0_181]
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> ~[cloud-flinkAppCrashAnalysis-1.0.0-encodetest-RELEASE.jar:?]
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:172)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> ... 2 more
> Caused by: java.lang.VerifyError: Bad return type
> Exception Details:
>   Location:
>
>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> @157: areturn
>   Reason:
> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is
> not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature)
>   Current Frame:
> bci: @157
> flags: { }
> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>   Bytecode:
> 0x000: 2ab6 00b5 2a13 01f4 2bb6 00b7 4d01 4e2a
> 0x010: b400 422b b901 f502 003a 042c c600 1d2d
> 0x020: c600 152c b600 b9a7 0012 3a05 2d19 05b6
> 0x030: 00bb a700 072c b600 b919 04b0 3a04 1904
> 0x040: 4e19 04bf 3a06 2cc6 001d 2dc6 0015 2cb6
> 0x050: 00b9 a700 123a 072d 1907 b600 bba7 0007
> 0x060: 2cb6 00b9 1906 bf4d 2c07 bd00 d459 0312
> 0x070: d653 5904 12e0 5359 0512 e153 5906 1301
> 0x080: f653 b600 d74e 2dc1 01f6 9900 14b2 0023
> 0x090: 1301 f7b9 002b 0200 2a2b b601 f8b0 2dbf
> 0x0a0:
>   Exception Handler Table:
> bci [35, 39] => handler: 42
> bci [15, 27] => handler: 60
> bci [15, 27] => handler: 68
> bci [78, 82] => handler: 85
> bci [60, 70] => handler: 68
> bci [4, 57] => handler: 103
> bci [60, 103] => handler: 103
>   Stackmap Table:
>
>
> full_frame(@42,{Object[#751],Object[#774],Object[#829],Object[#799],Object[#1221]},{Object[#799]})
> same_frame(@53)
> same_frame(@57)
>
>
> full_frame(@60,{Object[#751],Object[#774],Object[#829],Object[#799]},{Object[#799]})
> same_locals_1_stack_item_frame(@68,Object[#799])
>
>
> full_frame(@85,{Object[#751],Object[#774],Object[#829],Object[#799],Top,Top,Object[#799]},{Object[#799]})
> same_frame(@96)
> same_frame(@100)
> full_frame(@103,{Object[#751],Object[#774]},{Object[#854]})
> append_frame(@158,Object[#854],Object[#814])
>
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:167)
> ~[hadoop-hdfs-client-3.0.0-cdh6.1.1.jar:?]
> at
>
> 

Re: flink写es和hbase反压

2021-06-18 文章 yidan zhao
注意需要结合flink的异步算子哈,不是随意直接改造成异步请求。

田磊  于2021年6月18日周五 下午2:33写道:
>
> 好的,我试试,谢谢解答!
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年06月18日 14:16,yidan zhao 写道:
> 推荐使用批量+异步方式写。
>
> 田磊  于2021年6月18日周五 下午1:12写道:
>
> >
> > 现在用flink自定义source读取hbase的其中一张表的数据,表中这张表的总数据有三千万条,处理完之后的数据写入es和hbase,但是每次写的时候到一千多万条就出现反压,之前怀疑是es的问题,最后单独写hbase也出现相同的问题,出问题后就一条都不写了,大佬指点一下。日志也没有异常。详见附件。es和hbase都是批量写。source和sink的并行度都是1,中间map算子并行度16。
> >
> >
> >
> > *totorobabyfans*邮箱:totorobabyf...@163.com
> >
> > 签名由
> > 网易邮箱大师  >


回复:flink写es和hbase反压

2021-06-18 文章 田磊
好的,我试试,谢谢解答!


| |
totorobabyfans
|
|
邮箱:totorobabyf...@163.com
|

签名由 网易邮箱大师 定制

在2021年06月18日 14:16,yidan zhao 写道:
推荐使用批量+异步方式写。

田磊  于2021年6月18日周五 下午1:12写道:

>
> 现在用flink自定义source读取hbase的其中一张表的数据,表中这张表的总数据有三千万条,处理完之后的数据写入es和hbase,但是每次写的时候到一千多万条就出现反压,之前怀疑是es的问题,最后单独写hbase也出现相同的问题,出问题后就一条都不写了,大佬指点一下。日志也没有异常。详见附件。es和hbase都是批量写。source和sink的并行度都是1,中间map算子并行度16。
>
>
>
> *totorobabyfans*邮箱:totorobabyf...@163.com
>
> 签名由
> 网易邮箱大师 


standalone K8S 如何查看 TaskMananger 的 gc.log ?

2021-06-18 文章 WeiXubin
请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置
TaskMananger 的 *gc.log* 日志? 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭

2021-06-18 文章 yidan zhao
感觉应该是水印时间没到的原因,和session window本身关系不大。

raofang <295070...@qq.com.invalid> 于2021年6月18日周五 下午12:54写道:
>
> hi,请教大家一个问题:
> flink1.12.2 sql BlinkPlanner
> 使用基于routime的session窗口时,在设置的间隔时间10分钟内没有接收到新数据,窗口没有关闭输出计算结果;但是接收到10分钟之后的新数据时上一次的session窗口才关闭输出结果。不知道是什么原因导致超过间隔时间没有新数据窗口没有关闭的问题呢?
>  谢谢~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink写es和hbase反压

2021-06-18 文章 yidan zhao
推荐使用批量+异步方式写。

田磊  于2021年6月18日周五 下午1:12写道:

>
> 现在用flink自定义source读取hbase的其中一张表的数据,表中这张表的总数据有三千万条,处理完之后的数据写入es和hbase,但是每次写的时候到一千多万条就出现反压,之前怀疑是es的问题,最后单独写hbase也出现相同的问题,出问题后就一条都不写了,大佬指点一下。日志也没有异常。详见附件。es和hbase都是批量写。source和sink的并行度都是1,中间map算子并行度16。
>
>
>
> *totorobabyfans*邮箱:totorobabyf...@163.com
>
> 签名由
> 网易邮箱大师  定制
>


Re: flink 提交job后 task 一直是 schedule 状态

2021-06-18 文章 yidan zhao
mark。遇到过,但不清楚啥问题也。

Lei Wang  于2021年6月18日周五 下午1:43写道:
>
> flink-1.11.2
> ./bin/start-cluster.sh 启动然后
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
> localhost --port 
>
> 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误
>
> 2021-06-18 13:34:26,683 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2
> 28577f2) switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure tha
> t the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.11-1.11.2.jar:1
> .11.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2
>
> 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。
>
> 有大神帮助解释一下吗?
>
> 谢谢,
> 王磊


Re: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭

2021-06-18 文章 lpengdr...@163.com
你用EventTimeSession窗口的超时也是按照你的事件时间来判断的,要有超过超时时间边界的数据输入了才能触发



lpengdr...@163.com
 
发件人: raofang
发送时间: 2021-06-18 12:20
收件人: user-zh
主题: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭
hi,请教大家一个问题:
flink1.12.2 sql BlinkPlanner
使用基于routime的session窗口时,在设置的间隔时间10分钟内没有接收到新数据,窗口没有关闭输出计算结果;但是接收到10分钟之后的新数据时上一次的session窗口才关闭输出结果。不知道是什么原因导致超过间隔时间没有新数据窗口没有关闭的问题呢?
 谢谢~
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/