Re: Re: flink on yarn fails to start

2020-12-23 Posted by magichuang
Thank you so much!!!

I see, I had assumed -s was the abbreviation for slot. Thanks for the explanation; the job now submits successfully.
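For the record, the corrected command changes only the slot flag, everything else as in the original post:

flink run -m yarn-cluster -ynm traffic -ys 2 -p 2 -ytm 1024 -py traffic.py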


> -- Original Message --
> Sender: "Yang Wang"
> Date: 2020-12-24 11:01:46
> Recipient: user-zh
> Cc:
> Subject: Re: flink on yarn fails to start
>
> There is a problem with your command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> It should be -ys, not -s.
> -s restores from a savepoint, which is why the error complains that the savepoint directory cannot be found.
>
>
> Best,
> Yang
>
> magichuang wrote on Wed, Dec 23, 2020 at 8:29 PM:
>
> > Machine specs: three nodes, 32C/64G, CentOS 7.8, with a CDH cluster already deployed on them
> > Flink version: 1.11.2, set up as a cluster across the three machines
> >
> > The Hadoop cluster was built with CDH.
> >
> >
> > Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> > traffic.py
> >
> > The program is written with PyFlink: it reads from Kafka, aggregates each minute's data with a tumbling window, and writes the result back to Kafka.
> >
> >
> >
> >
> > The program runs fine in local mode, but submitting it in per-job mode always fails.
> >
> > The official example flink run -m yarn-cluster examples/batch/WordCount.jar
> > does produce output, so is this a YARN problem or a problem with my program?
> >
> >
> >
> >
> > The main error message is below:
> >
> > Caused by: java.util.concurrent.CompletionException:
> > org.apache.flink.runtime.client.JobExecutionException: Could not
> > instantiate JobManager.
> >
> > at
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > ~[?:1.8.0_202]
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > ... 4 more
> >
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> > not instantiate JobManager.
> >
> > at
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > ~[?:1.8.0_202]
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > ... 4 more
> >
> > Caused by: java.io.FileNotFoundException: Cannot find checkpoint or
> > savepoint file/directory '2' on file system 'file'.
> >
> > at
> > org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> > ~[?:1.8.0_202]
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> >
> > ... 4 more

Re: Flink catalog+hive issue

2020-12-23 Posted by 19916726683
You can refer to this:
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html
The pasted code is the setFullFileStatus method of org.apache.hadoop.hive.io.HdfsUtils.
Original Message
Sender: Rui Li <lirui.fu...@gmail.com>
Recipient: user-zh <user...@flink.apache.org>
Date: Thursday, Dec 24, 2020 11:33
Subject: Re: Flink catalog+hive issue


Hello,

The image you pasted doesn't come through. Could you post the link to the doc you were referring to? Hive supports at least three different authorization modes; when Flink integrates with Hive, only storage based authorization takes effect.

On Thu, Dec 24, 2020 at 10:51 AM 19916726683 <19916726...@163.com> wrote:

> The Hive docs describe the ACL and how permissions are inherited. The source is in Hive's HDFSUtils class; the core code should be the part above.
>
> Original Message
> *Sender:* Rui Li <lirui.fu...@gmail.com>
> *Recipient:* user-zh <user...@flink.apache.org>
> *Date:* Wednesday, Dec 23, 2020 19:41
> *Subject:* Re: Flink catalog+hive issue
>
> Which kind of ACL is your Hive using? Flink currently has no dedicated ACL integration; only storage based authorization [1] on the HMS side takes effect.
>
> [1] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer
>
> On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:
>
> > Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.
> >
> > Original Message
> > Sender: guaishushu1...@163.com
> > Recipient: user-zh <user...@flink.apache.org>
> > Date: Wednesday, Dec 23, 2020 15:53
> > Subject: Flink catalog+hive issue
> >
> > When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
> > guaishushu1...@163.com
>
> --
> Best regards!
> Rui Li

--
Best regards!
Rui Li

pyflink 1.12: error when using the connector read.query option

2020-12-23 Posted by 肖越
I define a connector to a MySQL database with DDL and want to fetch data directly by sending SQL:
source_ddl = """
CREATE TABLE source_table(
yldrate DECIMAL,
pf_id VARCHAR,
symbol_id VARCHAR) WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip/db',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'table-name' = 'TS_PF_SEC_YLDRATE'
'read.query' = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM 
TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = 
'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042' AND BIZ_DATE between 
'20160701' AND '20170307'"
)
"""
Error message:
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 766, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o6.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "=" at line 12, column 30.
Was expecting one of:
"UESCAPE" ...
<QUOTED_STRING> ...
")" ...
"," ...

The parser lists the syntax it expects, but I don't understand it. Why is "=" not allowed to appear here? Hoping someone passing by can point me in the right direction. Thanks!
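Two syntax problems are visible in the DDL itself, independent of whether this connector version supports a read.query option at all: a comma is missing after the 'table-name' line, and the read.query value is wrapped in double quotes, while SQL string literals must use single quotes (inner quotes are escaped by doubling them). A sketch with only those two issues fixed:

source_ddl = """
CREATE TABLE source_table(
    yldrate DECIMAL,
    pf_id VARCHAR,
    symbol_id VARCHAR) WITH(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip/db',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'TS_PF_SEC_YLDRATE',
    'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = ''AC'' AND PF_ID = ''1030100122'' AND SYMBOL_ID = ''2030004042'' AND BIZ_DATE BETWEEN ''20160701'' AND ''20170307'''
)
"""

This also explains where the parser stops: after the string 'TS_PF_SEC_YLDRATE' it expects a comma or a closing parenthesis, so the "=" of the next option is the unexpected token.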

flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler

2020-12-23 Posted by bigdata
flink 1.10.1 fails with the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Unable to instantiate java compiler
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at 
org.apache.flink.client.cli.CliFrontend$$Lambda$38/1165303897.call(Unknown 
Source)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/887750041.run(Unknown
 Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider$$Lambda$225/1113414982.apply(Unknown
 Source)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
at 
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
at 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$FilterReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:175)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at 
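A failure at JaninoRelMetadataProvider.compile with "Unable to instantiate java compiler" is most often a classpath conflict: a second copy of Janino (pulled in, for example, by the job's fat jar or a Hive dependency) shadows the version the table planner ships with. A hedged way to check, assuming a Maven build:

mvn dependency:tree -Dincludes=org.codehaus.janino

If more than one version shows up, excluding the extra copy or aligning it with the version bundled in flink-table-planner-blink is the usual fix.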

proctime in YAML: sql-client start throws exception

2020-12-23 Posted by su_...@cjspd.com
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
Unsupported property keys:
schema.#.proctime




Using PROCTIME() directly in the SQL CLI works, but with the proctime attribute declared in the YAML config file, starting the SQL client throws the exception above.
schema:
  - name: id
    data-type: VARCHAR
  - name: relation_id
    data-type: VARCHAR
  - name: send_to
    data-type: VARCHAR
  - name: status
    data-type: VARCHAR
  - name: create_by
    data-type: VARCHAR
  - name: send_to_name
    data-type: VARCHAR
  - name: proc_time
    data-type: TIMESTAMP(3)
    proctime: true
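The unsupported key suggests the YAML-defined JDBC source simply has no proctime support. A hedged workaround sketch, assuming Flink 1.11+: declare the table through DDL with a computed PROCTIME() column instead (connector options and names below are placeholders; the same DDL can also be run from the SQL CLI):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Hypothetical DDL equivalent of the YAML schema above; PROCTIME() declares the
# processing-time attribute that the YAML-based JDBC source factory rejects.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
t_env.execute_sql("""
    CREATE TABLE my_jdbc_table (
        id VARCHAR,
        relation_id VARCHAR,
        send_to VARCHAR,
        status VARCHAR,
        create_by VARCHAR,
        send_to_name VARCHAR,
        proc_time AS PROCTIME()
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://host:3306/db',
        'table-name' = 't_message',
        'username' = 'xxx',
        'password' = 'xxx'
    )
""")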


Re: Flink catalog+hive issue

2020-12-23 Posted by Rui Li
Hello,

The image you pasted doesn't come through. Could you post the link to the doc you were referring to? Hive supports at least three different authorization modes; when Flink integrates with Hive, only storage
based authorization takes effect.

On Thu, Dec 24, 2020 at 10:51 AM 19916726683 <19916726...@163.com> wrote:

> The Hive docs describe the ACL and how permissions are inherited. The source is in Hive's HDFSUtils class; the core code should be the part above.
>
>  Original Message
> *Sender:* Rui Li
> *Recipient:* user-zh
> *Date:* Wednesday, Dec 23, 2020 19:41
> *Subject:* Re: Flink catalog+hive issue
>
> Which kind of ACL is your Hive using? Flink currently has no dedicated ACL integration; only storage based authorization [1] on the HMS side takes effect.
>
> [1]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer
>
> On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:
>
> > Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.
> >
> >
> > Original Message
> > Sender: guaishushu1...@163.com
> > Recipient: user-zh <user...@flink.apache.org>
> > Date: Wednesday, Dec 23, 2020 15:53
> > Subject: Flink catalog+hive issue
> >
> >
> > When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
> > guaishushu1...@163.com
>
>
>
> --
> Best regards!
> Rui Li
>
>

-- 
Best regards!
Rui Li
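For reference, the metastore-side storage based authorization that [1] points to is enabled in hive-site.xml with settings along these lines (taken from the linked Hive wiki page; verify the class names against your Hive version):

hive.metastore.pre.event.listeners = org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener
hive.security.metastore.authorization.manager = org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
hive.security.metastore.authenticator.manager = org.apache.hadoop.hive.ql.security.HadoopDefaultMetastoreAuthenticator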


Re: Flink catalog+hive issue

2020-12-23 Posted by r pp
Gmail may have some compatibility issues; the screenshot doesn't display.

19916726683 <19916726...@163.com> wrote on Thu, Dec 24, 2020 at 10:51 AM:

> The Hive docs describe the ACL and how permissions are inherited. The source is in Hive's HDFSUtils class; the core code should be the part above.
>
>  Original Message
> *Sender:* Rui Li
> *Recipient:* user-zh
> *Date:* Wednesday, Dec 23, 2020 19:41
> *Subject:* Re: Flink catalog+hive issue
>
> Which kind of ACL is your Hive using? Flink currently has no dedicated ACL integration; only storage based authorization [1] on the HMS side takes effect.
>
> [1]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer
>
> On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:
>
> > Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.
> >
> >
> > Original Message
> > Sender: guaishushu1...@163.com
> > Recipient: user-zh <user...@flink.apache.org>
> > Date: Wednesday, Dec 23, 2020 15:53
> > Subject: Flink catalog+hive issue
> >
> >
> > When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
> > guaishushu1...@163.com
>
>
>
> --
> Best regards!
> Rui Li
>
>


Re: pyflink query executes very slowly; doesn't the WHERE clause filter the data?

2020-12-23 Posted by r pp
Where does table a appear in the SQL statement?
Is filtering really the concern here? If you know your business well and have established that flink 1.11 does not push the filter down, why not filter the data yourself as an optimization?
If it is not a filtering problem but a big-table-joins-small-table or big-table-joins-big-table problem, could broadcast or the degree of parallelism be the direction to optimize instead?

It may be better to analyze the business problem first, and then look at whether flink 1.12 can solve it.

肖越 <18242988...@163.com> wrote on Thu, Dec 24, 2020 at 11:16 AM:

> The connector reads the whole table from the database, then executes:
> env.sql_query("select a , b, c from table1 left join table2 on a = d where
> b = '103' and c = '203' and e = 'AC' and a between 20160701 and 20170307
> order by a")
> Table a is very large, around 10 million rows, but only 250 rows match, and running this locally takes 10 minutes.
> I understand flink 1.11 does not filter data with the WHERE clause first. Does flink 1.12 still have this problem, and how can it be optimized?
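One way to act on the advice above, assuming columns a, b, c, and e all come from table1 (a sketch, not a guaranteed planner fix): reduce table1 in a subquery before the join, so the filter is applied regardless of whether pushdown happens.

filtered = env.sql_query("""
    SELECT t1.a, t1.b, t1.c
    FROM (
        SELECT a, b, c
        FROM table1
        WHERE b = '103' AND c = '203' AND e = 'AC'
          AND a BETWEEN 20160701 AND 20170307
    ) t1
    LEFT JOIN table2 ON t1.a = table2.d
    ORDER BY t1.a
""")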


pyflink query executes very slowly; doesn't the WHERE clause filter the data?

2020-12-23 Posted by 肖越
The connector reads the whole table from the database, then executes:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b =
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order by a")
Table a is very large, around 10 million rows, but only 250 rows match, and running this locally takes 10 minutes.
I understand flink 1.11 does not filter data with the WHERE clause first. Does flink 1.12 still have this problem, and how can it be optimized?

Re: flink on yarn fails to start

2020-12-23 Posted by Yang Wang
There is a problem with your command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
traffic.py

It should be -ys, not -s.
-s restores from a savepoint, which is why the error complains that the savepoint directory cannot be found.
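(For contrast, -s expects a savepoint path rather than a number, e.g. flink run -m yarn-cluster -s hdfs:///flink/savepoints/savepoint-xxxx -py traffic.py, where the path here is a placeholder.)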


Best,
Yang

magichuang wrote on Wed, Dec 23, 2020 at 8:29 PM:

> Machine specs: three nodes, 32C/64G, CentOS 7.8, with a CDH cluster already deployed on them
> Flink version: 1.11.2, set up as a cluster across the three machines
>
> The Hadoop cluster was built with CDH.
>
>
> Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> The program is written with PyFlink: it reads from Kafka, aggregates each minute's data with a tumbling window, and writes the result back to Kafka.
>
>
>
>
> The program runs fine in local mode, but submitting it in per-job mode always fails.
>
> The official example flink run -m yarn-cluster examples/batch/WordCount.jar
> does produce output, so is this a YARN problem or a problem with my program?
>
>
>
>
> The main error message is below:
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: java.io.FileNotFoundException: Cannot find checkpoint or
> savepoint file/directory '2' on file system 'file'.
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> 2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] -
> Stopped BLOB server at 0.0.0.0:16109
>
>
>
>
>
>
>
> The full log can be opened at the link below:
> https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note
> 

Re: Flink catalog+hive issue

2020-12-23 Posted by 19916726683
The Hive docs describe the ACL and how permissions are inherited. The source is in Hive's HDFSUtils class; the core code should be the part above.


 Original Message 
Sender: Rui Li
Recipient: user-zh
Date: Wednesday, Dec 23, 2020 19:41
Subject: Re: Flink catalog+hive issue


Which kind of ACL is your Hive using? Flink currently has no dedicated ACL integration; only storage based authorization [1] on the HMS side takes effect.

[1] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer

On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:

> Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.
>
> Original Message
> Sender: guaishushu1...@163.com
> Recipient: user-zh <user...@flink.apache.org>
> Date: Wednesday, Dec 23, 2020 15:53
> Subject: Flink catalog+hive issue
>
> When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
> guaishushu1...@163.com

--
Best regards!
Rui Li

Re: flink 1.10.1/1.11.1: state keeps growing after SQL group and time-window operations

2020-12-23 Posted by Yun Tang
Hi @Storm

Incremental checkpoints currently only take effect for RocksDB; "incremental" here means uploading the newly generated DB sst files. RocksDB's full checkpoint mode serializes and writes out the DB's live key-value pairs, so unless a large amount of data is never cleaned up by compaction, an incremental checkpoint should not grow without bound while the full checkpoint stays normal. How large does the endlessly growing size get in your case?

Best,
Yun Tang

From: Storm☀️ 
Sent: Tuesday, December 22, 2020 19:52
To: user-zh@flink.apache.org 
Subject: Re: flink 1.10.1/1.11.1: state keeps growing after SQL group and time-window operations

Hi Yun Tang,
After I turned off incremental checkpointing, the checkpoint state indeed stopped growing without bound. Is this a misconfiguration on my side, or a known issue?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
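For anyone reproducing this: the switch under discussion is the RocksDB backend's incremental-checkpoint option. A minimal flink-conf.yaml sketch (the checkpoint path is a placeholder):

state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.backend.incremental: true   # set to false for full checkpoints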


Re: flink 1.11.2 writes to a Hive partitioned table but Hive does not see the partitions

2020-12-23 Posted by admin
Hi,
Automatic partition discovery in Hive relies on the 'metastore' partition commit policy, so the policy configuration has to be added before it takes effect; see the sketch below.
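A minimal sketch with placeholder table and column names, following the Flink 1.11 Hive streaming docs and using the three properties quoted later in this thread (requires a registered and active HiveCatalog, and the Hive dialect via the real config key table.sql-dialect):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
# assumes a HiveCatalog is already registered and in use
t_env.get_config().get_configuration().set_string("table.sql-dialect", "hive")
t_env.execute_sql("""
    CREATE TABLE hive_logs (
        msg STRING
    ) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
        'sink.partition-commit.trigger'='process-time',
        'sink.partition-commit.delay'='0s',
        'sink.partition-commit.policy.kind'='metastore,success-file'
    )
""")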

> On Dec 23, 2020, at 9:27 AM, kingdomad wrote:
> 
> Yes, checkpointing is enabled.
> It consumes Kafka and registers the stream as a TemporaryView with tableEnv.
> Then SQL is executed to write into the Hive table.
> 
> 
> --
> 
> kingdomad
> 
> 
> 
> At 2020-12-23 09:22:48, "范瑞" <836961...@qq.com> wrote:
>> Hello
>> 
>> 
>> You are using SQL, right? Is checkpointing enabled?
>> 
>> 
>> 
>> --- Original Message ---
>> Sender: "kingdomad"
>> Date: Wednesday, Dec 23, 2020 9:17 AM
>> Recipient: "user-zh"
>> Subject: Re: Re: flink 1.11.2 writes to a Hive partitioned table but Hive does not see the partitions
>> 
>> 
>> The partitions use fields from the records; no processing time or event time is involved in generating them.
>> I found that as soon as the following three properties are added to the Hive table, partitions are committed to the metastore immediately:
>> 'sink.partition-commit.trigger'='process-time'
>> 'sink.partition-commit.delay'='0s'
>> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> 
>> 
>> --
>> 
>> kingdomad
>> 
>> 
>> 
>> At 2020-12-21 23:27:49, "赵一旦" wrote:
>>> Even if the data is not written by Flink, writes done any other way need the same treatment.
>>> 
>>> r pp wrote:
>>> 
>>>> In the program, after creating the table, execute the command.
>>>> 
>>>> kingdomad wrote:
>>>> 
>>>>> flink 1.11.2 writes to a hive 3.12 partitioned table; hive cannot see the partitions newly created by flink. The written files are visible on hdfs, but hive cannot read the partitions.
>>>>> Only after running msck repair table on the partitioned table can hive read the data.
>>>>> How should this be solved?
>>>>> 
>>>>> 
>>>>> --
>>>>> 
>>>>> kingdomad



flink on yarn fails to start

2020-12-23 Posted by magichuang
Machine specs: three nodes, 32C/64G, CentOS 7.8, with a CDH cluster already deployed on them
Flink version: 1.11.2, set up as a cluster across the three machines

The Hadoop cluster was built with CDH.


Launch command: flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py traffic.py

The program is written with PyFlink: it reads from Kafka, aggregates each minute's data with a tumbling window, and writes the result back to Kafka.




The program runs fine in local mode, but submitting it in per-job mode always fails.

The official example flink run -m yarn-cluster examples/batch/WordCount.jar
does produce output, so is this a YARN problem or a problem with my program?




The main error message is below:

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate 
JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint 
file/directory '2' on file system 'file'.

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] - 
Stopped BLOB server at 0.0.0.0:16109







The full log can be opened at the link below: https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note
It may load a little slowly; please give it a moment.













Best,

MagicHuang







Re: Some questions about operating Hive from Flink

2020-12-23 Posted by Rui Li
Hi,

Do you mean the job that writes the data is a streaming job, and the job that reads it is a batch job?

On Tue, Dec 22, 2020 at 5:51 PM Jacob <17691150...@163.com> wrote:

> Dear all,
>
> I currently have a Flink job that, after all its business logic runs, produces some business data, writes it to HDFS in ORC format, and then calls the Hive API to load the ORC files into a Hive table. At that point the Flink job's work is done.
>
> Afterwards, a separate scheduled Java program runs MapReduce to do the follow-up processing on the data written into Hive in the previous step.
>
> We have now upgraded Flink, and Flink can operate on Hive directly, no longer depending on MapReduce.
>
> But doesn't that mean two Flink jobs are needed, one to generate the business data and one to operate on Hive to process it?
>
> The two jobs have different execution environments. Without Hive, the environment looks like this:
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> env.execute("my job");
>
> To operate on Hive, an environment like this has to be constructed instead:
>
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> ..
> tableEnv.executeSql(hql);
>
> Is there a general way to merge the two jobs into one? The effect I want: once the business data is generated, operate on Hive directly, replacing the MapReduce step.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li
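One direction often suggested for this kind of merge, though not confirmed for this particular use case: a StreamTableEnvironment lets the DataStream pipeline and the Hive SQL share a single job. A sketch in PyFlink under that assumption (catalog name, database, and hive-conf path are placeholders; the Java API mirrors this via StreamTableEnvironment.create(env, settings)):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.catalog import HiveCatalog

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Register a HiveCatalog so SQL statements can address Hive tables directly.
t_env.register_catalog("myhive", HiveCatalog("myhive", "default", "/opt/hive/conf"))
t_env.use_catalog("myhive")

# Build the DataStream business logic on `env`, expose the result as a view,
# then write it to Hive with SQL in the same job, replacing the MapReduce step:
# t_env.create_temporary_view("business_view", ...)
# t_env.execute_sql("INSERT INTO hive_table SELECT ... FROM business_view")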


Re: Re: flink 1.11.2 writes to a Hive partitioned table but Hive does not see the partitions

2020-12-23 Posted by Rui Li
Writing streaming data into a Hive partitioned table requires extra configuration. For Flink 1.11 the specific parameters are covered in these two docs:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink

On Wed, Dec 23, 2020 at 9:17 AM kingdomad  wrote:

> The partitions use fields from the records; no processing time or event time is involved in generating them.
> I found that as soon as the following three properties are added to the Hive table, partitions are committed to the metastore immediately:
> 'sink.partition-commit.trigger'='process-time'
> 'sink.partition-commit.delay'='0s'
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>
>
>
>
>
>
> At 2020-12-21 23:27:49, "赵一旦" wrote:
> > Even if the data is not written by Flink, writes done any other way need the same treatment.
> >
> > r pp wrote on Mon, Dec 21, 2020 at 9:28 PM:
> >
> >> In the program, after creating the table, execute the command.
> >>
> >> kingdomad wrote on Mon, Dec 21, 2020 at 4:55 PM:
> >>
> >> > flink 1.11.2 writes to a hive 3.12 partitioned table; hive cannot see the partitions newly created by flink. The written files are visible on hdfs, but hive cannot read the partitions.
> >> > Only after running msck repair table on the partitioned table can hive read the data.
> >> > How should this be solved?
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> >
> >> > kingdomad
> >> >
> >> >
> >>
>


-- 
Best regards!
Rui Li


flink 1.10.1 running DML in IDEA fails: Error during disposal of stream operator. java.lang.NullPointerException

2020-12-23 Posted by bigdata

2020-12-23 19:43:01,588 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.NullPointerException
at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.close(JDBCUpsertOutputFormat.java:202)
at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.close(JDBCUpsertSinkFunction.java:61)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)

Re: A question about creating Hive tables in flink 1.11.2

2020-12-23 Posted by Rui Li
Is the exception only present in the log, or does the DDL actually fail? Also, please paste the exception stack trace from the log so we can see where it is printed.

On Tue, Dec 22, 2020 at 2:41 PM 曹武 <14701319...@163.com> wrote:

> Hi, when I create a Hive table with create table if not
> exists and the table already exists, Hive's log throws an AlreadyExistsException(message:Table
> bm_tsk_001 already exists) exception. Looking at the source, if not
> exists seems to be used only to decide whether the caught exception is rethrown. Is there a recommended solution for this?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Re: Flink catalog+hive issue

2020-12-23 Posted by Rui Li
Which kind of ACL is your Hive using? Flink currently has no dedicated ACL integration; only storage based authorization [1] on the HMS side takes effect.

[1]
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-1StorageBasedAuthorizationintheMetastoreServer

On Wed, Dec 23, 2020 at 4:34 PM 19916726683 <19916726...@163.com> wrote:

> Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.
>
>
> Original Message
> Sender: guaishushu1...@163.com
> Recipient: user-zh <user...@flink.apache.org>
> Date: Wednesday, Dec 23, 2020 15:53
> Subject: Flink catalog+hive issue
>
>
> When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
> guaishushu1...@163.com



-- 
Best regards!
Rui Li


Running DML fails: Error during disposal of stream operator. java.lang.NullPointerException

2020-12-23 Posted by bigdata
Running the DDL alone works; the error appears once the DML is executed.

pyflink query executes very slowly; doesn't the WHERE clause filter the data?

2020-12-23 Posted by 肖越
The connector reads the whole table from the database, then executes:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b =
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order by
biz_date")
Table a is very large, around 10 million rows, but only 250 rows match, and running this locally takes 10 minutes!
I understand flink 1.11 does not filter data with the WHERE clause first. Does flink 1.12 still have this problem, and how can it be optimized?


flink sql CDC streams (1.11.2): after a two-stream join ran for a long time, I cancelled the job and tried to restore it from the checkpoint, and it fails with OutOfMemoryError. The TM settings were never changed; RocksDB backend. What could cause this? It never failed while running, so why does it fail at restore time?

2020-12-23 Posted by jindy_liu


java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:261)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:235)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:227)
at java.util.ArrayList.add(ArrayList.java:458)
at
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:212)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:199)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$345/1080469422.accept(Unknown
Source)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$199/62611056.runDefaultAction(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink catalog+hive issue

2020-12-23 Posted by 19916726683
Spark can be configured to use either Hive's ACL or its own; I'm not sure whether Flink follows the same model.


Original Message
Sender: guaishushu1...@163.com
Recipient: user-zh <user...@flink.apache.org>
Date: Wednesday, Dec 23, 2020 15:53
Subject: Flink catalog+hive issue


When using a Flink catalog backed by Hive for metadata persistence, I found that Hive's ACL permissions have no effect. Does Flink simply bypass Hive's ACL?
guaishushu1...@163.com