Large Hive dimension tables in Flink

2022-09-21 Post by macia kk
Hi
  Flink's Hive dimension tables are kept in memory. Could they be put into State instead, so that using RocksDB would reduce the memory usage?


Re: Is Flink's data transfer pushed by upstream operators or pulled by downstream operators, and what were the design considerations?

2022-09-21 Post by yanfei lei
Hi,
The Flink community has a blog post about Credit-based Flow Control that covers the principles and trade-offs of the backpressure mechanism; hope it helps.

Shammon FY  wrote on Wed, Sep 21, 2022 at 11:43:

> Hi
> Simply saying that Flink's data transfer is a pull model can be misleading, I think. The two models are generally understood to work as follows:
> 1. Push model
> After the upstream and downstream tasks initialize the network connection, the upstream task keeps "pushing" data to the downstream over that connection.
> 2. Pull model
> After the connection is initialized, the downstream task polls the upstream with "pull" requests according to its own computing progress, then runs the next round of computation.
>
> In Flink, the upstream/downstream interaction has these main steps:
> 1. The TM hosting the upstream task creates a Netty server.
> 2. When the downstream task starts, it connects to the upstream through a Netty client.
> 3. The downstream task sends the upstream a partition request; the upstream creates a data reader for the request and keeps reading and sending data over the connection.
> 4. The upstream and downstream tasks each have their own memory pool for flow control; roughly:
> a) the downstream task, based on the state of its consumption memory pool, sends credit updates to the upstream from time to time;
> b) the upstream task updates its locally managed credit from the credit messages it receives;
> c) the upstream task keeps sending data to the downstream according to its local credit.
>
> This way, given sufficient resources, the data transfer is fully streaming. That is unlike the traditional pull model; it is closer to a push model with credit-based flow control.
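A minimal sketch of the credit loop described above, with illustrative names (this is not Flink's actual network stack):

import java.util.ArrayDeque;
import java.util.Queue;

// Sender side of a credit-based channel: buffers move only while the
// receiver has granted credit; at zero credit the sender stalls, which
// is how backpressure propagates upstream.
public class CreditBasedChannel {
    private final Queue<byte[]> pending = new ArrayDeque<>();
    private int credit = 0; // buffers the receiver can still accept

    // Steps 4a/4b: the receiver announces additional credit.
    public synchronized void addCredit(int delta) {
        credit += delta;
        flush();
    }

    // Step 4c: enqueue a buffer; it is transmitted only while credit remains.
    public synchronized void send(byte[] buffer) {
        pending.add(buffer);
        flush();
    }

    private void flush() {
        while (credit > 0 && !pending.isEmpty()) {
            transmit(pending.poll());
            credit--; // each transmitted buffer consumes one credit
        }
    }

    private void transmit(byte[] buffer) {
        System.out.println("sent " + buffer.length + " bytes");
    }
}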
>
> On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:
>
> > Hello. Flink uses a pull model. The advantages of pull: 1. better scalability (downstream consumers can be added on demand; they only need the upstream consumption offset); 2. downstream consumers can adjust their consumption rate as needed. 3. On network transfer: Flink also tried a push model in the past, and since TCP connections were multiplexed between processes to save overhead, a performance bottleneck in one task thread would stop all task threads on the whole link from receiving data, hurting the overall consumption rate. The advantage of push: lower overhead, since no mechanism is needed to keep polling the upstream for data.
> >
Xuyang  wrote on Fri, Sep 9, 2022 at 20:35:
> >
> > > Hi, it is mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, the backpressure mechanism lets the upstream produce less data.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best!
> > > Xuyang
> > >
> > >
> > >
> > >
> > >
At 2022-09-09 19:04:27, "郑 致远"  wrote:
> > > >Hi all,
> > > >A quick question:
> > > >Is Flink's data transfer pushed from upstream operators to downstream, or pulled by the downstream from the upstream? What was the reasoning behind this design?
> > >
> >
>


Re: Cannot connect to an HA HDFS cluster under native k8s deployment mode

2022-09-21 Post by Yang Wang
Set the HADOOP_CONF_DIR environment variable on the Flink client before submitting the job,

then run the flink run-application command.
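
For example (the directory is illustrative; point it at wherever the client machine keeps your core-site.xml and hdfs-site.xml):

export HADOOP_CONF_DIR=/etc/hadoop/conf
./bin/flink run-application --target kubernetes-application ... local:///opt/flink/usrlib/test.jar

(The remaining -D options are the same as in the original command.)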

Best,
Yang

yanfei lei  wrote on Thu, Sep 22, 2022 at 11:04:

> Hi Tino,
> Judging from org.apache.flink.core.fs.FileSystem.java
> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L361-L371>,
> Flink parses fs.default-scheme directly as a URI and does not read the related xml configs, so it looks like Flink does not currently support an HA HDFS cluster here.
>
> Best,
> Yanfei
>
Xuyang  wrote on Wed, Sep 21, 2022 at 23:28:
>
> > Hi, I'm not very familiar with HA HDFS deployments, but the stack trace shows the failure comes from an unresolvable hostname:
> > Caused by: java.lang.IllegalArgumentException:
> > java.net.UnknownHostException: datacluster
> > My guess: could you try changing it to one of these two forms:
> > 1. hdfs://datacluster:port (e.g. hdfs://datacluster:8080)
> >
> > 2. hdfs:///datacluster (three slashes)
> >
> >
> >
> >
> > Hope this helps
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
At 2022-09-21 18:24:46, "Tino Hean"  wrote:
> > >*Hi all, *
> > >*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submission command parameters*
> > >./bin/flink run-application \
> > >--detached \
> > >--target kubernetes-application \
> > >-Dkubernetes.cluster-id=test \
> > >-Dkubernetes.container.image=flink-java11 \
> > >-Dfs.default-scheme=hdfs://datacluster \
> > >-Dkubernetes.rest-service.exposed.type=LoadBalancer \
> > >-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
> > >-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
> > >-Dkubernetes.namespace=flink \
> > >-Dkubernetes.service-account=flink-sa \
> > >-Denv.hadoop.conf.dir=/opt/flink/conf \
> > >-Dkubernetes.container.image.pull-policy=Always \
> > >local:///opt/flink/usrlib/test.jar
> > >
> > >*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory layout is as follows*
> > >flink@e3187a41a139:~$ ls conf
> > >core-site.xml hdfs-site.xml log4j-console.properties
> > >log4j-session.properties logback-session.xml masters zoo.cfg
> > >flink-conf.yaml log4j-cli.properties log4j.properties
> logback-console.xml
> > >logback.xml workers
> > >
> > >*But I ran into the following error:*
> > >
> > >2022-09-21 10:17:40,156 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint KubernetesApplicationClusterEntrypoint.
> > >org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711) [flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) [flink-dist-1.15.2.jar:1.15.2]
> > >Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> > >at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
> > >at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
> > >at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
> > >at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist-1.15.2.jar:1.15.2]
> > >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224) ~[flink-dist-1.15.2.jar:1.15.2]
> > >... 2 more
> > >Caused by: java.io.IOException: Could not create FileSystem for highly available 

Re: Cannot connect to an HA HDFS cluster under native k8s deployment mode

2022-09-21 Post by yanfei lei
Hi Tino,
Judging from org.apache.flink.core.fs.FileSystem.java, Flink parses fs.default-scheme directly as a URI and does not read the related xml configs, so it looks like Flink does not currently support an HA HDFS cluster here.
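
A small self-contained probe of that parsing behavior (illustrative; plain java.net.URI, assuming no Hadoop configuration is visible on the classpath):

import java.net.URI;

// Without hdfs-site.xml, the HA nameservice id "datacluster" is parsed as
// an ordinary hostname; resolving it via DNS then fails with
// java.net.UnknownHostException: datacluster.
public class SchemeProbe {
    public static void main(String[] args) throws Exception {
        URI uri = new URI("hdfs://datacluster/flink/recovery");
        System.out.println("authority parsed as host: " + uri.getHost());
    }
}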

Best,
Yanfei

Xuyang  wrote on Wed, Sep 21, 2022 at 23:28:

> Hi, I'm not very familiar with HA HDFS deployments, but the stack trace shows the failure comes from an unresolvable hostname:
> Caused by: java.lang.IllegalArgumentException:
> java.net.UnknownHostException: datacluster
> My guess: could you try changing it to one of these two forms:
> 1. hdfs://datacluster:port (e.g. hdfs://datacluster:8080)
>
> 2. hdfs:///datacluster (three slashes)
>
>
>
>
> Hope this helps
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
At 2022-09-21 18:24:46, "Tino Hean"  wrote:
> >*Hi all, *
> >*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submission command parameters*
> >./bin/flink run-application \
> >--detached \
> >--target kubernetes-application \
> >-Dkubernetes.cluster-id=test \
> >-Dkubernetes.container.image=flink-java11 \
> >-Dfs.default-scheme=hdfs://datacluster \
> >-Dkubernetes.rest-service.exposed.type=LoadBalancer \
> >-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
> >-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
> >-Dkubernetes.namespace=flink \
> >-Dkubernetes.service-account=flink-sa \
> >-Denv.hadoop.conf.dir=/opt/flink/conf \
> >-Dkubernetes.container.image.pull-policy=Always \
> >local:///opt/flink/usrlib/test.jar
> >
> >*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory layout is as follows*
> >flink@e3187a41a139:~$ ls conf
> >core-site.xml hdfs-site.xml log4j-console.properties
> >log4j-session.properties logback-session.xml masters zoo.cfg
> >flink-conf.yaml log4j-cli.properties log4j.properties logback-console.xml
> >logback.xml workers
> >
> >*But I ran into the following error:*
> >
> >2022-09-21 10:17:40,156 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint KubernetesApplicationClusterEntrypoint.
> >org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711) [flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) [flink-dist-1.15.2.jar:1.15.2]
> >Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> >at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227) ~[flink-dist-1.15.2.jar:1.15.2]
> >at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
> >at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
> >at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
> >at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224) ~[flink-dist-1.15.2.jar:1.15.2]
> >... 2 more
> >Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs://datacluster/flink/recovery/cruiser)
> >at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) ~[flink-dist-1.15.2.jar:1.15.2]
> >at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) ~[flink-dist-1.15.2.jar:1.15.2]
> >at

Re: How to set Hive parameters in flink sql

2022-09-21 Post by yuxia
Setting parameters in HiveConf is not supported yet.
You can only change them in the hive-site.xml used to create the HiveCatalog.

The upcoming Flink 1.16 will support setting parameters in HiveConf.
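
For the Parquet memory error in the original mail below, a sketch of the hive-site.xml workaround (the property name comes from Parquet's MemoryManager; the value is an assumption to tune, and whether it helps depends on your setup):

<property>
  <!-- Parquet rejects per-writer allocations below this threshold; the
       default 1048576 (1 MB) is what triggers "New Memory allocation ...
       is smaller than the minimum allocation size". -->
  <name>parquet.memory.min.chunk.size</name>
  <value>524288</value>
</property>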

Best regards,
Yuxia

- Original Message -
From: "junjie miao" 
To: "user-zh" 
Sent: Thursday, September 22, 2022 10:19:06 AM
Subject: How to set Hive parameters in flink sql

When using flink sql to insert into a hive parquet table, the job fails with the following error:
Caused by: org.apache.flink.util.SerializedThrowable: New Memory allocation 1033596 bytes is smaller than the minimum allocation size of 1048576 bytes.
at org.apache.hive.shaded.parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:133)
at org.apache.hive.shaded.parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:87)
at org.apache.hive.shaded.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:153)
at org.apache.hive.shaded.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:408)
at org.apache.hive.shaded.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:351)
at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:70)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:135)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:124)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
... 44 common frames omitted

I don't know how to set Hive parameters for Hive in flink sql.


Re: Flink+Hudi:java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2022-09-21 Post by Summer

I added the following to the pom.xml of the hudi-flink-bundle module:

<relocation>
  <pattern>com.google.common.</pattern>
  <shadedPattern>${flink.bundle.shade.prefix}com.google.common.</shadedPattern>
</relocation>

After repackaging, this did not solve the problem.

 Replied Message 
From: Xuyang
Date: 9/20/2022 00:01
To: 
Subject: Re: Flink+Hudi: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
Hi,
It looks like the versions of these projects do not match, causing a version conflict on the com.google.common.base.Preconditions class. You could try shading this package in both flink and hudi.













--

 Best!
 Xuyang





At 2022-09-14 09:27:45, "Summer"  wrote:

Versions: Flink 1.13.3, Hudi 0.10.1, Hive 3.1.2, Hadoop 3.2.1


Build:
Hudi: mvn clean package -DskipITs -Dmaven.test.skip=true -Dhadoop.version=3.2.1 -Pflink-bundle-shade-hive3

Flink-SQL-HIVE: mvn clean install -Dfast -Dhadoop.version=3.2.1 -Dscala-2.11 -DskipTests -Dfast -T 4 -Dmaven.compile.fork=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true



Startup: ./sql-client.sh embedded -j ../lib/hudi-flink-bundle_2.11-0.10.1-rc1.jar

Lib directory:


[root@rhy-t-bd-java lib]# ll
total 271504
-rw-r--r-- 1 root  root      92313 Oct 12  2021 flink-csv-1.13.3.jar
-rw-r--r-- 1 root  root  106535831 Oct 12  2021 flink-dist_2.12-1.13.3.jar
-rw-r--r-- 1 root  root     148127 Oct 12  2021 flink-json-1.13.3.jar
-rwxrwxrwx 1 root  root    7709740 Jun  8  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 stack wheel  48845196 Sep 13 18:43 flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar
-rw-r--r-- 1 root  root   35051553 Oct 12  2021 flink-table_2.12-1.13.3.jar
-rw-r--r-- 1 root  root   38613339 Oct 12  2021 flink-table-blink_2.12-1.13.3.jar
-rw-r--r-- 1 root  root   38955252 Sep 13 17:20 hudi-flink-bundle_2.11-0.10.1-rc1.jar
-rwxrwxrwx 1 root  root      67114 Mar 31  2021 log4j-1.2-api-2.12.1.jar
-rwxrwxrwx 1 root  root     276771 Mar 31  2021 log4j-api-2.12.1.jar
-rwxrwxrwx 1 root  root    1674433 Mar 31  2021 log4j-core-2.12.1.jar
-rwxrwxrwx 1 root  root      23518 Mar 31  2021 log4j-slf4j-impl-2.12.1.jar


Flink-SQL:
CREATE TABLE paat_hudi_flink_test (
  id bigint,
  name string,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  primary key(id) not enforced -- the uuid primary key must be specified
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://emr-cluster/user/hive/hudi/warehouse/ods_hudi.hudi_flink_test/',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '2000',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enable' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.max_memory' = '1024',
  'changelog.enable' = 'true',
  'read.streaming.enable' = 'true',
  'read.streaming.check-interval' = '4',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://:1',
  'hive_sync.table' = 'hudi_flink_test',
  'hive_sync.db' = 'ods_hudi',
  'hive_sync.username' = '',
  'hive_sync.password' = '*^',
  'hive_sync.support_timestamp' = 'true'
);

Query: select * from paat_hudi_flink_test;




The error:
2022-09-13 18:45:29,203 INFO  org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor [] - The RpcEndpoint jobmanager_2 failed.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_2.
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
 at 

Re: Flink-1.15 using HiveDynamicTableFactory without a Catalog

2022-09-21 Post by yuxia
HiveDynamicTableFactory cannot be used directly as a connector yet.
You would need to adapt HiveDynamicTableFactory a bit:
factoryIdentifier must no longer throw the UnsupportedOperationException("Hive factory is only work for catalog.") exception.
The change is not much work.
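
A sketch of what that adaptation could look like (hypothetical, not committed Flink code; the identifier string is an assumption):

// In the adapted HiveDynamicTableFactory: return a real identifier instead
// of throwing, so the factory can be matched via 'connector' = 'hive-table'
// in the WITH options.
@Override
public String factoryIdentifier() {
    return "hive-table"; // assumed identifier; pick one that does not clash
}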

Best regards,
Yuxia

- Original Message -
From: "yanggang_it_job" 
To: "user-zh" 
Sent: Monday, September 19, 2022 5:04:00 PM
Subject: Flink-1.15 using HiveDynamicTableFactory without a Catalog

Currently you must declare a HiveCatalog before HiveDynamicTableFactory can be used.
Can HiveDynamicTableFactory be used directly as a connector, or what adjustments would be needed to use it directly?


Re: Flink-1.15 using HiveDynamicTableFactory without a Catalog

2022-09-21 Post by Xuyang
Hi, you can look at HiveDynamicTableFactory's implementation: a connector has to implement factoryIdentifier to declare the 'connector'='xxx' used in the WITH options, but since this class is only meant to be used inside a hive catalog, that method is not implemented (there are a few other methods as well, e.g. options, which passes through the remaining WITH parameters).
If you need HiveDynamicTableFactory's functionality, I think you could copy it into a new connector class, trim it down and implement the necessary methods by following a normal connector, and then package it with mvn.




--

Best!
Xuyang





At 2022-09-19 17:04:00, "yanggang_it_job"  wrote:
>Currently you must declare a HiveCatalog before HiveDynamicTableFactory can be used.
>Can HiveDynamicTableFactory be used directly as a connector, or what adjustments would be needed to use it directly?


Re: Cannot connect to an HA HDFS cluster under native k8s deployment mode

2022-09-21 Post by Xuyang
Hi, I'm not very familiar with HA HDFS deployments, but the stack trace shows the failure comes from an unresolvable hostname:
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: datacluster
My guess: could you try changing it to one of these two forms:
1. hdfs://datacluster:port (e.g. hdfs://datacluster:8080)

2. hdfs:///datacluster (three slashes)




Hope this helps

--

Best!
Xuyang





At 2022-09-21 18:24:46, "Tino Hean"  wrote:
>*Hi all, *
>*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submission command parameters*
>./bin/flink run-application \
>--detached \
>--target kubernetes-application \
>-Dkubernetes.cluster-id=test \
>-Dkubernetes.container.image=flink-java11 \
>-Dfs.default-scheme=hdfs://datacluster \
>-Dkubernetes.rest-service.exposed.type=LoadBalancer \
>-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
>-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
>-Dkubernetes.namespace=flink \
>-Dkubernetes.service-account=flink-sa \
>-Denv.hadoop.conf.dir=/opt/flink/conf \
>-Dkubernetes.container.image.pull-policy=Always \
>local:///opt/flink/usrlib/test.jar
>
>*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory layout is as follows*
>flink@e3187a41a139:~$ ls conf
>core-site.xml hdfs-site.xml log4j-console.properties
>log4j-session.properties logback-session.xml masters zoo.cfg
>flink-conf.yaml log4j-cli.properties log4j.properties logback-console.xml
>logback.xml workers
>
>*But I ran into the following error:*
>
>2022-09-21 10:17:40,156 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint KubernetesApplicationClusterEntrypoint.
>org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711) [flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) [flink-dist-1.15.2.jar:1.15.2]
>Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
>at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227) ~[flink-dist-1.15.2.jar:1.15.2]
>at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
>at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
>at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
>at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224) ~[flink-dist-1.15.2.jar:1.15.2]
>... 2 more
>Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs://datacluster/flink/recovery/cruiser)
>at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:53) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:284) ~[flink-dist-1.15.2.jar:1.15.2]
>at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
>at

Re: Re: A job's compute operators stay in busy state

2022-09-21 Post by Xuyang
Hi, you could try Arthas plus jmap to locate the likely cause of the memory leak.
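
For example (PID is the TaskManager process id; the arthas-boot.jar path is illustrative):

# Live-object class histogram; diff two snapshots taken hours apart
# to see which classes keep growing after GC.
jmap -histo:live PID | head -n 30

# Full heap dump for offline analysis (e.g. in Eclipse MAT).
jmap -dump:live,format=b,file=/tmp/heap.bin PID

# Or attach Arthas and take the dump from its console.
java -jar arthas-boot.jar PID
heapdump /tmp/heap.hprof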







--

Best!
Xuyang





At 2022-09-21 13:40:32, "杨扬"  wrote:
>Are there any metrics or tools for troubleshooting a flink memory leak?
>For example, something to roughly locate where the leak is.
>
>
>
>
>
>> On Sep 19, 2022, at 5:41 PM, yidan zhao  wrote:
>> 
>> Then check your code for a memory leak.
>> 
>>> 杨扬  wrote on Mon, Sep 19, 2022 at 11:21:
>>> 
>>> Another observation: taskHeap usage keeps climbing. Right after the job starts it is around 10%; after one week it rises to about 25%, and after two weeks to about 50% (these are the values observed after GC). After two weeks the compute operators are almost constantly 100% busy, end-to-end latency reaches about 10s, and the job is unusable and has to be restarted.
>>> 
>>> 
>>> 
>>> 
 On Sep 15, 2022, at 8:58 PM, yidan zhao  wrote:
 
 Low latency is, to some extent, achieved precisely through "low resource utilization". High resource utilization means running as close to full load as possible while still keeping up.
 
 yidan zhao  wrote on Thu, Sep 15, 2022 at 20:57:
> 
> With sufficient resources, 50%+ busy is not really a problem if the latency is acceptable too. A 2s delay is not high.
> 
> 杨扬  wrote on Thu, Sep 15, 2022 at 20:02:
>> 
>> Parallelism is already set to 25 with 4G of memory per slot, so 100G of memory is in use; peak traffic is around 1 TPS. The resources should be sufficient, right?
>> 
>> 
>> 
>> 
>>> On Sep 15, 2022, at 7:27 PM, yidan zhao  wrote:
>>> 
>>> If it's busy, try raising the parallelism and see how it goes?
>>> 
>>> 杨扬 <mailto:yangya...@cupdata.com> wrote on Thu, Sep 15, 2022 at 14:51:
>>> Hi all!
>>> We have a flink job that consists of roughly 3 stages:
>>> read data from kafka (1 source, parallelism 3) -> data filtering and conditional checks (no window operations, parallelism 25) -> write results to kafka (20+ sinks, each with parallelism 3). See the attached picture.
>>>
>>> The problem: after the job runs for a while, the chain of compute operators at parallelism 25 turns busy (above 50%), end-to-end latency increases, and occasionally exceeds 2 seconds. At that point the job log has no errors, exceptions or warnings.
>>>
>>> With no errors or warnings in the logs, I hardly know where to start. My guess: with so many sinks, each at parallelism 3, could the interaction between the 25-parallelism operators and the sinks produce heavy shuffle? Hoping someone can help analyze this.
>>> 
>>> 
>>> 
>> 
 
>>> 
>> 
>


Cannot connect to an HA HDFS cluster under native k8s deployment mode

2022-09-21 Post by Tino Hean
*Hi all, *
*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submission command parameters*
./bin/flink run-application \
--detached \
--target kubernetes-application \
-Dkubernetes.cluster-id=test \
-Dkubernetes.container.image=flink-java11 \
-Dfs.default-scheme=hdfs://datacluster \
-Dkubernetes.rest-service.exposed.type=LoadBalancer \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
-Dkubernetes.namespace=flink \
-Dkubernetes.service-account=flink-sa \
-Denv.hadoop.conf.dir=/opt/flink/conf \
-Dkubernetes.container.image.pull-policy=Always \
local:///opt/flink/usrlib/test.jar

*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory layout is as follows*
flink@e3187a41a139:~$ ls conf
core-site.xml hdfs-site.xml log4j-console.properties
log4j-session.properties logback-session.xml masters zoo.cfg
flink-conf.yaml log4j-cli.properties log4j.properties logback-console.xml
logback.xml workers

*But I ran into the following error:*

2022-09-21 10:17:40,156 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint KubernetesApplicationClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711) [flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) [flink-dist-1.15.2.jar:1.15.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227) ~[flink-dist-1.15.2.jar:1.15.2]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224) ~[flink-dist-1.15.2.jar:1.15.2]
... 2 more
Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (hdfs://datacluster/flink/recovery/cruiser)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:53) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:284) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277) ~[flink-dist-1.15.2.jar:1.15.2]
at