Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 Post by RS
Hi,


I ran the test again today; the log is shown below:


2022-07-12 13:55:38,674 INFO  org.apache.flink.client.cli.CliFrontend   
   [] - Loading FallbackYarnSessionCli
2022-07-12 13:55:38,707 INFO  org.apache.flink.core.fs.FileSystem   
   [] - Hadoop is not in the classpath/dependencies. The extended set 
of supported File Systems via Hadoop is not available.
2022-07-12 13:55:38,752 INFO  
org.apache.flink.table.client.gateway.context.DefaultContext [] - Executor 
config: {execution.savepoint.ignore-unclaimed-state=false, 
execution.attached=true, execution.savepoint-restore-mode=NO_CLAIM, 
execution.shutdown-on-attached-exit=false, 
pipeline.jars=[file:/home/flink/flink-1.15.1/opt/flink-sql-client-1.15.1.jar, 
file:/home/flink/flink-1.15.1/opt/flink-python_2.12-1.15.1.jar], 
pipeline.classpaths=[], execution.target=remote}
2022-07-12 13:55:40,311 INFO  org.apache.flink.table.client.cli.CliClient   
   [] - Command history file path: /home/flink/.flink-sql-history
2022-07-12 13:55:48,839 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse 
statement: select * from t1;
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71)
 ~[flink-sql-client-1.15.1.jar:1.15.1]
at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2731) 
~[flink-sql-client-1.15.1.jar:1.15.1]
at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:585) 
~[flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:296)
 [flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:281)
 [flink-sql-client-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:229)
 [flink-sql-client-1.15.1.jar:1.15.1]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
[flink-sql-client-1.15.1.jar:1.15.1]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
[flink-sql-client-1.15.1.jar:1.15.1]
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
[flink-sql-client-1.15.1.jar:1.15.1]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client-1.15.1.jar:1.15.1]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
source for reading table 'default_catalog.default_database.t1'.


Table options are:


'connector'='filesystem'
'csv.allow-comments'='true'
'csv.ignore-parse-errors'='true'
'format'='csv'
'path'='/tmp/qwe'
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159)
 ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
 ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
 ~[?:?]
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[?:?]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[?:?]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
 ~[?:?]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
 ~[?:?]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
 ~[?:?]
...

Re:Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 Post by RS
Hi,


With only flink-table-planner-loader-1.15.1.jar and without flink-table-planner_2.12-1.15.1.jar
it still fails, so it is not simply a matter of removing one of the two.


Only flink-table-planner_2.12-1.15.1.jar works, which is what puzzles me.


Thanks


On 2022-07-11 20:19:01, "jiangjiguang719" wrote:
>hi,
>Your problem is that
>
>flink-table-planner-loader-1.15.1.jar and flink-table-planner_2.12-1.15.1.jar
>are in conflict; removing one of them should fix it.
>
>
>
>
>
>
>
>On 2022-07-11 19:45:04, "Weihua Hu" wrote:
>>Hi,
>>
>>Do you have more detailed logs? This looks like a class-loading conflict; we need to identify exactly which class is conflicting.
>>
>>Best,
>>Weihua
>>
>>
>>On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:
>>
>>> Hi,
>>>
>>>
>>> When running Flink SQL through sql-client with the filesystem connector, the following error occurs:
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.table.planner.delegation.ParserFactory
>>>
>>>
>>> Flink SQL> CREATE TABLE t1 (
>>> > a STRING,
>>> > b INT
>>> > )WITH(
>>> > 'connector'='filesystem',
>>> > 'path'='/tmp/qwe',
>>> > 'format'='csv',
>>> > 'csv.ignore-parse-errors' = 'true',
>>> > 'csv.allow-comments' = 'true'
>>> > );
>>> [INFO] Execute statement succeed.
>>> Flink SQL> select * from t1;
>>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.ClassNotFoundException:
>>> org.apache.flink.table.planner.delegation.ParserFactory
>>>
>>>
>>> I tested this: it happens because my lib directory contains
>>> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar (it is in lib because other jobs need to read and write Hive).
>>> Without flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar in lib, this error does not occur.
>>>
>>>
>>> How can this problem be solved?
>>>
>>>
>>> Thanks
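One way to confirm this kind of clash is to check which jars actually bundle the missing class. A minimal, self-contained sketch that scans a directory of jars for a class file (the lib path in main() is a placeholder to adapt to your own installation):

```java
import java.io.File;
import java.util.zip.ZipFile;

public class FindClassInJars {
    /** Prints every jar in libDir that bundles the given class; returns the hit count. */
    public static int scan(File libDir, String className) throws Exception {
        String entry = className.replace('.', '/') + ".class";
        int hits = 0;
        File[] jars = libDir.listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars == null) return 0;
        for (File jar : jars) {
            try (ZipFile zf = new ZipFile(jar)) {   // a jar is just a zip archive
                if (zf.getEntry(entry) != null) {
                    System.out.println(jar.getName() + " contains " + entry);
                    hits++;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder path: point this at your own Flink lib directory.
        scan(new File("/home/flink/flink-1.15.1/lib"),
             "org.apache.flink.table.planner.delegation.ParserFactory");
    }
}
```

If two or more jars print for the same class, that is usually the quickest confirmation of a classpath clash like the one discussed above.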


Question: how to release resources held by an object in a Flink Job

2022-07-11 Post by Bruce Zu
 Hi Flink team,
 I have a fairly general question about how to release resources held by an object in a Flink job.

 I am a new Flink user. I have searched a lot but found no relevant documentation, though I am sure there is a standard way to solve this.

My Flink application needs to access an Elasticsearch server. We use a class EsClient, extended from
org.elasticsearch.client.RestHighLevelClient, to do the query work, and its `close` method
must be called to release resources once it is no longer used.

So I need to find the right place to make sure the resources are always released, even if an exception occurs somewhere along the call chain.

What I can think of for now is to use a `ThreadLocal`: keep the EsClient object created at the beginning of the main method of the main class, and
release the resource at the end of the main method.
Pseudocode along these lines:
```java
public class EsClientHolder {
  private static final ThreadLocal<EsClient> local = new InheritableThreadLocal<>();

  public static void createAndSetEsClient(EsClient esClient) {
    local.set(esClient);
  }

  public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
    EsClient instance = new EsClient(esClientConfig);
    createAndSetEsClient(instance);
  }

  public static EsClient get() {
    EsClient c = local.get();
    if (c == null) {
      throw new RuntimeException("Make sure an EsClient instance is created and set before use");
    }
    return c;
  }

  public static void close() throws IOException {
    EsClient o = local.get();
    if (o != null) {
      o.close();
    }
  }
}

// Usage in the Flink application code
public class MainClass {
  public static void main(String[] args) throws IOException {
    try {
      Properties prop = null;
      EsClientConfig config = getEsClientConfig(prop);
      EsClientHolder.createAndSetEsClientBy(config);
      // ...
      new SomeClass().method1();
      new OtherClass().method2();
      // ...
    } finally {
      EsClientHolder.close();
    }
  }
}

class SomeClass {
  public void method1() {
    // 1. Use EsClient in any calling method of any other class:
    EsClient esClient = EsClientHolder.get();
    // ...
  }
}

class OtherClass {
  public void method2() {
    // 2. Use EsClient in any calling method of any forked child thread
    new Thread(
        () -> {
          EsClient client = EsClientHolder.get();
          // ...
        })
        .start();
    // ...
  }
}
```

I know that a TaskManager is a Java JVM process and that a Task is executed by a Java thread.

But I do not know how Flink builds the job graph, how Flink eventually assigns tasks to threads, or how those threads relate to one another.

For example, if Flink assigns SomeClass.method1 and OtherClass.method2 to a thread different from the one
running MainClass,
then the threads running method1 and method2 have no way to get the EsClient.
Here I am assuming that the main method in MainClass executes in one thread. If not, for example if set() and close() end up running in different threads, then
there is no way to release the resource.

Thanks!
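A note on placement: main() runs on the client and only builds the job graph, while user functions run later in task threads on the TaskManagers, so a ThreadLocal populated in main() will generally not be visible there. The usual Flink answer is the open()/close() lifecycle of rich functions (e.g. RichMapFunction), which the runtime calls once per parallel subtask, including on shutdown after failures. Below is a self-contained sketch of that shape only; StubEsClient stands in for the real client, and neither class here is actual Flink API:

```java
// Stand-in for the real Elasticsearch client; only the lifecycle matters here.
class StubEsClient implements AutoCloseable {
    private boolean closed = false;

    String search(String q) {
        if (closed) throw new IllegalStateException("client already closed");
        return "hits-for:" + q;
    }

    @Override
    public void close() { closed = true; }
}

// Mirrors the shape of a Flink rich function: the runtime creates one instance
// per parallel subtask, calls open() before the first element and close() when
// the task shuts down (also after failures), so the client lives and dies with
// the task rather than with the client-side main() method.
class EsLookupFunction {
    private transient StubEsClient client;

    public void open() { client = new StubEsClient(); }      // like RichFunction#open

    public String map(String value) { return client.search(value); }

    public void close() {                                     // like RichFunction#close
        if (client != null) client.close();
    }
}
```

The holder-in-main pattern above still works for code that runs on the client itself; it just cannot reach code that executes inside the job's operators.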


Re: flink native k8s: job submission following the docs cannot find the cluster

2022-07-11 Post by yidan zhao
Specifying the clusterIp with flink run -m does let me submit jobs.
So with --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster, why can't it be smart enough to resolve the corresponding
cluster's svc clusterIp and submit through that?

yidan zhao wrote on Tue, Jul 12, 2022 at 12:50:
>
> If running on a k8s master node, can the ClusterIp be used directly?
>
>
> Also, I roughly understand NodePort, but I have never quite understood how the LoadBalancer approach works.
>
> yidan zhao wrote on Tue, Jul 12, 2022 at 12:48:
> >
> > My understanding of "inside the k8s cluster" was the set of machines that make up k8s. Must it be inside a pod? So it won't work even on a k8s node, right?
> >
> > Yang Wang wrote on Tue, Jul 12, 2022 at 12:07:
> > >
> > > The log already states it fairly clearly: with the ClusterIP approach, your Flink
> > > client must be inside the k8s cluster to submit successfully. For example: start a Pod, then run flink run inside that pod.
> > > Otherwise you need the NodePort or LoadBalancer approach.
> > >
> > > 2022-07-12 10:23:23,021 WARN
> > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > yidan zhao wrote on Tue, Jul 12, 2022 at 10:40:
> > >
> > > > The steps below follow this documentation:
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > Version: 1.15
> > > >
> > > > (1) Create the cluster:
> > > >
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > > (2) Submit the job:
> > > > ./bin/flink run \
> > > > --target kubernetes-session \
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster \
> > > > ./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > The svc is of type ClusterIp
> > > >
> > > > The job submission in step 2 prints the following:
> > > > Executing example with default input data.
> > > > Use --input to specify file input.
> > > > Printing result to stdout. Use --output to specify output path.
> > > > 2022-07-12 10:23:23,021 WARN
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > > 2022-07-12 10:23:23,027 INFO
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Retrieve flink cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://my-first-flink-cluster-rest.test:8081
> > > > 2022-07-12 10:23:23,044 WARN
> > > > org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
> > > > Please note that Flink client operations(e.g. cancel, list, stop,
> > > > savepoint, etc.) won't work from outside the Kubernetes cluster since
> > > > 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
> > > >
> > > > 
> > > >  The program finished with the following exception:
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > method caused an error: Failed to execute job
> > > > 'CarTopSpeedWindowingExample'.
> > > > ...
> > > > Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> > > > 'CarTopSpeedWindowingExample'.
> > > > ...
> > > > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> > > > Failed to submit JobGraph.
> > > > ...
> > > > Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
> > > > Could not complete the operation. Number of retries has been
> > > > exhausted.
> > > > ...
> > > > Caused by: java.util.concurrent.CompletionException:
> > > > java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
> > > > or service not known
> > > > ...
> > > > Caused by: java.net.UnknownHostException:
> > > > my-first-flink-cluster-rest.test: Name or service not known
> > > >
> > > >
> > > > As above, with --target kubernetes-session
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster the resolved submission endpoint is
> > > > my-first-flink-cluster-rest.test. This looks like a DNS name generated by k8s, where test is the Flink namespace.
> > > >
> > > > And indeed my local machine cannot resolve my-first-flink-cluster-rest.test.
> > > >
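For reference, Yang Wang's NodePort suggestion above would look roughly like the following when creating the session (kubernetes.rest-service.exposed.type is a real Flink option; the cluster id is the one from this thread):

```shell
# Start the session with the REST endpoint exposed as a NodePort so that a
# client outside the Kubernetes cluster can reach the JobManager:
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.rest-service.exposed.type=NodePort

# Then submit as before:
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar
```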


Re: on k8s: the taskmanager never starts

2022-07-11 Post by Lijie Wang
Check whether the TM pods actually started, and whether there are exceptions in the TM logs. It looks like the TM never registered.

Best,
Lijie

陈卓宇 <2572805...@qq.com.invalid> wrote on Tue, Jul 12, 2022 at 10:53:

> flink: 1.14.5
> Deployed on k8s, the taskmanager never starts, and there are no logs.
> jobmanager log:
> 2022-07-12 02:08:22,271 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] -
> Creating new TaskManager pod with name iii5-taskmanager-1-1 and resource
> <1728,1.0.
> 2022-07-12 02:08:22,286 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'key.deserializer' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,286 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'value.deserializer' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,286 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'enable.auto.commit' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'group.id' was supplied but isn't a known config.
> 2022-07-12 02:08:22,287 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'client.id.prefix' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'partition.discovery.interval.ms' was supplied but
> isn't a known config.
> 2022-07-12 02:08:22,287 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig
> [] - The configuration 'auto.offset.reset' was supplied but isn't a known
> config.
> 2022-07-12 02:08:22,287 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka version: unknown
> 2022-07-12 02:08:22,287 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka commitId: unknown
> 2022-07-12 02:08:22,287 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> [] - Kafka startTimeMs: 1657591702287
> 2022-07-12 02:08:22,354 INFO
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator []
> - Starting the KafkaSourceEnumerator for consumer group
> hire_sign_contract_prod without periodic partition discovery.
> 2022-07-12 02:08:23,464 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Pod
> iii5-taskmanager-1-1 is created.
> 2022-07-12 02:08:23,467 INFO
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator []
> - Discovered new partitions: [canal_hire_sign_v2-11, canal_hire_sign_v2-9,
> canal_hire_sign_v2-10, canal_hire_sign_v2-0, canal_hire_sign_v2-3,
> canal_hire_sign_v2-4, canal_hire_sign_v2-1, canal_hire_sign_v2-2,
> canal_hire_sign_v2-7, canal_hire_sign_v2-8, canal_hire_sign_v2-5,
> canal_hire_sign_v2-6]
> 2022-07-12 02:08:23,576 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] -
> Received new TaskManager pod: iii5-taskmanager-1-1
> 2022-07-12 02:08:23,578 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker iii5-taskmanager-1-1 with resource spec WorkerResourceSpec
> {cpuCores=1.0, taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0
> bytes, networkMemSize=128.000mb (134217730 bytes), managedMemSize=512.000mb
> (536870920 bytes), numSlots=1}.
>
> It gets stuck at this point.
> After a while, a slot allocation exception is reported, even though the machine has enough resources; it used to start fine before.
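To check what Lijie suggests, the standard kubectl commands are enough (the pod name is taken from the JobManager log above; add -n <namespace> if the job runs outside the default namespace):

```shell
# Was the TaskManager pod actually scheduled, and what state is it in?
kubectl get pods | grep iii5-taskmanager

# Events often show the real cause (image pull failures, insufficient resources, ...):
kubectl describe pod iii5-taskmanager-1-1

# TaskManager log, if the container got far enough to start:
kubectl logs iii5-taskmanager-1-1
```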



flink native k8s 按照文档提交任务找不到对应的集群

2022-07-11 文章 yidan zhao
如下步骤参考的文档 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes

版本:1.15

(1)创建集群:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
(2)提交任务:
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar

svc是ClusterIp类型

第二步提交任务环节,显示如下:
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2022-07-12 10:23:23,021 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-07-12 10:23:23,027 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Retrieve flink cluster my-first-flink-cluster successfully, JobManager
Web Interface: http://my-first-flink-cluster-rest.test:8081
2022-07-12 10:23:23,044 WARN
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Please note that Flink client operations(e.g. cancel, list, stop,
savepoint, etc.) won't work from outside the Kubernetes cluster since
'kubernetes.rest-service.exposed.type' has been set to ClusterIP.


 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Failed to execute job
'CarTopSpeedWindowingExample'.
...
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'CarTopSpeedWindowingExample'.
...
Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph.
...
Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been
exhausted.
...
Caused by: java.util.concurrent.CompletionException:
java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name
or service not known
...
Caused by: java.net.UnknownHostException:
my-first-flink-cluster-rest.test: Name or service not known


As shown above, the submission endpoint resolved from --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster is
my-first-flink-cluster-rest.test. This should be the DNS name generated by Kubernetes, with test being the Flink namespace.

And indeed, my local machine cannot resolve my-first-flink-cluster-rest.test.
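As the WARN lines in the output state, with 'kubernetes.rest-service.exposed.type' left at ClusterIP the rest endpoint is only resolvable from inside the cluster. Two commonly used workarounds, sketched here assuming namespace test, the cluster id from the commands above, and kubectl access (none of these options are from the original post):

```shell
# Option 1: recreate the session with the rest service exposed as NodePort,
# so a client outside the cluster can reach it directly.
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.namespace=test \
  -Dkubernetes.rest-service.exposed.type=NodePort

# Option 2: keep ClusterIP, forward the rest service to localhost, and map the
# generated DNS name to 127.0.0.1 in /etc/hosts so the client can resolve it.
kubectl -n test port-forward svc/my-first-flink-cluster-rest 8081:8081 &
echo "127.0.0.1 my-first-flink-cluster-rest.test" | sudo tee -a /etc/hosts
```

With NodePort (or LoadBalancer) exposure, the client-side operations mentioned in the warning (cancel, list, stop, savepoint) also work from outside the cluster.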


Re: Re: flink-hudi-hive

2022-07-11 post by ynz...@163.com
Here is the complete JobManager log:
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.shutdown-on-attached-exit, false
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: pipeline.jars, 
file:/home/dataxc/opt/flink-1.14.4/opt/flink-python_2.11-1.14.4.jar
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.checkpointing.min-pause, 8min
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: restart-strategy, failure-rate
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.memory.jvm-metaspace.size, 128m
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.checkpoints.dir, hdfs:///flink/checkpoints
2022-07-12 09:33:02,382 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,383 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]
2022-07-12 09:33:02,399 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at 
akka://flink/user/rpc/resourcemanager_1 .
2022-07-12 09:33:02,405 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Starting the resource manager.
2022-07-12 09:33:02,479 INFO  
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing 
over to rm2
2022-07-12 09:33:02,509 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Recovered 0 containers from previous attempts ([]).
2022-07-12 09:33:02,509 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.
2022-07-12 09:33:02,514 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,515 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]
2022-07-12 09:33:02,528 INFO  org.apache.hadoop.conf.Configuration  
   [] - resource-types.xml not found
2022-07-12 09:33:02,528 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
find 'resource-types.xml'.
2022-07-12 09:33:02,538 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2022-07-12 09:33:02,541 INFO  
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound 
of the thread pool size is 500
2022-07-12 09:33:02,584 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,585 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]



best,
ynz...@163.com
 
From: Weihua Hu
Date: 2022-07-11 19:46
To: user-zh
Subject: Re: flink-hudi-hive
Hi,
By "repeatedly initializing", do you mean the job keeps failing over? The failover cause can be found in the JobManager.log; search for the keyword "to
FAILED"
 
Best,
Weihua
 
 
On Mon, Jul 11, 2022 at 2:46 PM ynz...@163.com  wrote:
 
> Hi,
> I am using Flink to write data into Hudi and sync it to Hive. After submitting the job to YARN, I can see in the Flink web
> UI that the tasks are repeatedly initializing, and the task managers show no information at all. The logs contain no clear error message either;
> When I remove the sync_hive-related configuration from the code, leaving all other configuration unchanged, the data is written to Hudi normally;
> I am using hudi-0.11.1, flink-1.14.4, hadoop-3.3.1, hive-3.1.3;
>
>
>
> best,
> ynz...@163.com
>


Re: sql-client java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat

2022-07-11 post by Geng Biao
It looks like you are running Flink on a cluster; is this a session cluster started via Flink on YARN? If so, was the flink-sql-parquet jar missing from flink/lib
when the session cluster was started? If that is the case, restart your session cluster and try again. I suspect this because your DDL uses parquet without error,
which generally means the client-side dependencies are fine. The select statement, however, fails, and a select is translated into a Flink job executed on the
cluster. With a session cluster, the cluster's own dependencies may be incomplete, which would cause this error.
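If a jar missing from the session cluster's classpath is indeed the cause, a fix along these lines may work (the jar file name, FLINK_HOME, and the YARN application id below are placeholders of mine, not from the original thread):

```shell
# Put the parquet format jar onto the session cluster's classpath by adding it
# to flink/lib before (re)starting the session. The jar version should match
# the Flink release in use.
cp flink-sql-parquet-1.15.1.jar "$FLINK_HOME/lib/"

# Stop the old YARN session (its classpath was fixed at startup) ...
yarn application -kill application_XXXX_YYYY

# ... and start a fresh one, which will pick up the updated lib/ directory.
"$FLINK_HOME/bin/yarn-session.sh" -d
```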

Best,
Biao Geng

On 7/11/22, 8:17 PM, "jiangjiguang719"  wrote:

hi,
The Flink SQL client is started as root. I have already chmod'ed flink-sql-parquet-1.15.0.jar to
777, and decompiled the jar to confirm that the class org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
is present.


But the problem remains the same.
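A quicker sanity check than decompiling is to list the jar's entries, a minimal sketch assuming a JDK on the PATH and the lib/ path from the listing in this thread:

```shell
# List the jar's entries and check that the class is actually present.
jar tf lib/flink-sql-parquet-1.15.0.jar | grep ParquetColumnarRowInputFormat
```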


On 2022-07-11 19:50:17, "Weihua Hu" wrote:
>Hi,
>
>The file permissions in the lib directory look inconsistent: flink-sql-parquet-1.15.0.jar is owned by root. Which user is the flink
>process started as?
>
>Best,
>Weihua
>
>
>On Mon, Jul 11, 2022 at 7:36 PM jiangjiguang719 
>wrote:
>
>> hi,
>>  I am reading parquet files with the sql-client and get the error:
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
>>
>>
>> Flink version: 1.15.0/1.15.1
>>
>>
>> SQL statements:
>> Flink SQL> CREATE TABLE orders_parquet (
>> >   int32_field    INT,
>> >   int32_field1   INT,
>> >   int32_field2   INT
>> > ) WITH (
>> >  'connector' = 'filesystem',
>> >  'path' = '/data/testdata/PARQUET-1_4',
>> >  'format' = 'parquet'
>> > );
>> [INFO] Execute statement succeed.
>>
>>
>> Flink SQL> select * from orders_parquet where int32_field > 100;
>>
>>
>> The lib directory contains:
>>
>>
>> [root@icx20 flink-1.15.1]# ll lib/
>> total 212528
>> -rw-r--r--. 1 root root 62050 Jul 11 19:27 commons-logging-1.1.3.jar
>> -rw-r--r--. 1 sae  sae 194416 Jun 22 02:51 flink-cep-1.15.1.jar
>> -rw-r--r--. 1 sae  sae 484728 Jun 22 02:54
>> flink-connector-files-1.15.1.jar
>> -rw-r--r--. 1 sae  sae  95184 Jun 22 03:03 flink-csv-1.15.1.jar
>> -rw-r--r--. 1 sae  sae  115818049 Jun 22 03:13 flink-dist-1.15.1.jar
>> -rw-r--r--. 1 sae  sae 175487 Jun 22 03:05 flink-json-1.15.1.jar
>> -rw-r--r--. 1 sae  sae   21041716 Jun 22 03:10 
flink-scala_2.12-1.15.1.jar
>> -rw-rw-r--. 1 sae  sae   10737871 May 12 22:45
>> flink-shaded-zookeeper-3.5.9.jar
>> -rw-r--r--. 1 root root   5381644 Jul 11 19:25 
flink-sql-parquet-1.15.0.jar
>> -rw-r--r--. 1 sae  sae   15262738 Jun 22 03:10
>> flink-table-api-java-uber-1.15.1.jar
>> -rw-r--r--. 1 sae  sae   36236261 Jun 22 03:10
>> flink-table-planner-loader-1.15.1.jar
>> -rw-r--r--. 1 sae  sae2996565 Jun 22 02:51
>> flink-table-runtime-1.15.1.jar
>> -rw-r--r--. 1 root root   2792264 Jul 11 19:28 guava-29.0-jre.jar
>> -rw-r--r--. 1 root root   3990042 Jul 11 19:26 hadoop-common-2.8.5.jar
>> -rw-rw-r--. 1 sae  sae 208006 May 12 22:15 log4j-1.2-api-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae 301872 May 12 22:15 log4j-api-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae1790452 May 12 22:15 log4j-core-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae  24279 May 12 22:15 
log4j-slf4j-impl-2.17.1.jar


Re:Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 post by jiangjiguang719
hi,
Your problem is that

flink-table-planner-loader-1.15.1.jar and flink-table-planner_2.12-1.15.1.jar
conflict. Remove one of them and it will work.
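For reference, in Flink 1.15 lib/ should contain exactly one of the two planner jars: flink-table-planner-loader (the default) or flink-table-planner_2.12 (which the hive connector needs). A sketch of that swap, assuming the Flink home as the working directory and version 1.15.1:

```shell
# Keep exactly one planner jar in lib/: move the loader out and bring in the
# Scala planner, which the hive connector requires.
mv lib/flink-table-planner-loader-1.15.1.jar opt/
cp opt/flink-table-planner_2.12-1.15.1.jar lib/
```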


On 2022-07-11 19:45:04, "Weihua Hu" wrote:
>Hi,
>
>Do you have more detailed logs? This looks like a class-loading conflict; we need to identify which class is conflicting.
>
>Best,
>Weihua
>
>
>On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:
>
>> Hi,
>>
>>
>> Executing Flink SQL via the sql-client with the filesystem connector produces the following error:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.table.planner.delegation.ParserFactory
>>
>>
>> Flink SQL> CREATE TABLE t1 (
>> > a STRING,
>> > b INT
>> > )WITH(
>> > 'connector'='filesystem',
>> > 'path'='/tmp/qwe',
>> > 'format'='csv',
>> > 'csv.ignore-parse-errors' = 'true',
>> > 'csv.allow-comments' = 'true'
>> > );
>> [INFO] Execute statement succeed.
>> Flink SQL> select * from t1;
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.table.planner.delegation.ParserFactory
>>
>>
>> I tested it: the cause is that my lib directory contains
>> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar, which is there because other jobs need to read and write Hive.
>> If flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar is not in lib, this error does not occur.
>>
>>
>> How can this problem be solved?
>>
>>
>> Thanks


Re:Re: sql-client java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat

2022-07-11 post by jiangjiguang719
hi,
The Flink SQL client is started as root. I have already chmod'ed flink-sql-parquet-1.15.0.jar to
777, and decompiled the jar to confirm that the class org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
is present.


But the problem remains the same.



On 2022-07-11 19:50:17, "Weihua Hu" wrote:
>Hi,
>
>The file permissions in the lib directory look inconsistent: flink-sql-parquet-1.15.0.jar is owned by root. Which user is the flink
>process started as?
>
>Best,
>Weihua
>
>
>On Mon, Jul 11, 2022 at 7:36 PM jiangjiguang719 
>wrote:
>
>> hi,
>>  I am reading parquet files with the sql-client and get the error:
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException:
>> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
>>
>>
>> Flink version: 1.15.0/1.15.1
>>
>>
>> SQL statements:
>> Flink SQL> CREATE TABLE orders_parquet (
>> >   int32_field    INT,
>> >   int32_field1   INT,
>> >   int32_field2   INT
>> > ) WITH (
>> >  'connector' = 'filesystem',
>> >  'path' = '/data/testdata/PARQUET-1_4',
>> >  'format' = 'parquet'
>> > );
>> [INFO] Execute statement succeed.
>>
>>
>> Flink SQL> select * from orders_parquet where int32_field > 100;
>>
>>
>> The lib directory contains:
>>
>>
>> [root@icx20 flink-1.15.1]# ll lib/
>> total 212528
>> -rw-r--r--. 1 root root 62050 Jul 11 19:27 commons-logging-1.1.3.jar
>> -rw-r--r--. 1 sae  sae 194416 Jun 22 02:51 flink-cep-1.15.1.jar
>> -rw-r--r--. 1 sae  sae 484728 Jun 22 02:54
>> flink-connector-files-1.15.1.jar
>> -rw-r--r--. 1 sae  sae  95184 Jun 22 03:03 flink-csv-1.15.1.jar
>> -rw-r--r--. 1 sae  sae  115818049 Jun 22 03:13 flink-dist-1.15.1.jar
>> -rw-r--r--. 1 sae  sae 175487 Jun 22 03:05 flink-json-1.15.1.jar
>> -rw-r--r--. 1 sae  sae   21041716 Jun 22 03:10 flink-scala_2.12-1.15.1.jar
>> -rw-rw-r--. 1 sae  sae   10737871 May 12 22:45
>> flink-shaded-zookeeper-3.5.9.jar
>> -rw-r--r--. 1 root root   5381644 Jul 11 19:25 flink-sql-parquet-1.15.0.jar
>> -rw-r--r--. 1 sae  sae   15262738 Jun 22 03:10
>> flink-table-api-java-uber-1.15.1.jar
>> -rw-r--r--. 1 sae  sae   36236261 Jun 22 03:10
>> flink-table-planner-loader-1.15.1.jar
>> -rw-r--r--. 1 sae  sae2996565 Jun 22 02:51
>> flink-table-runtime-1.15.1.jar
>> -rw-r--r--. 1 root root   2792264 Jul 11 19:28 guava-29.0-jre.jar
>> -rw-r--r--. 1 root root   3990042 Jul 11 19:26 hadoop-common-2.8.5.jar
>> -rw-rw-r--. 1 sae  sae 208006 May 12 22:15 log4j-1.2-api-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae 301872 May 12 22:15 log4j-api-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae1790452 May 12 22:15 log4j-core-2.17.1.jar
>> -rw-rw-r--. 1 sae  sae  24279 May 12 22:15 log4j-slf4j-impl-2.17.1.jar


Re: sql-client java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat

2022-07-11 post by Weihua Hu
Hi,

The file permissions in the lib directory look inconsistent: flink-sql-parquet-1.15.0.jar is owned by root. Which user is the flink
process started as?

Best,
Weihua


On Mon, Jul 11, 2022 at 7:36 PM jiangjiguang719 
wrote:

> hi,
>  I am reading parquet files with the sql-client and get the error:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
>
>
> Flink version: 1.15.0/1.15.1
>
>
> SQL statements:
> Flink SQL> CREATE TABLE orders_parquet (
> >   int32_field    INT,
> >   int32_field1   INT,
> >   int32_field2   INT
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = '/data/testdata/PARQUET-1_4',
> >  'format' = 'parquet'
> > );
> [INFO] Execute statement succeed.
>
>
> Flink SQL> select * from orders_parquet where int32_field > 100;
>
>
> The lib directory contains:
>
>
> [root@icx20 flink-1.15.1]# ll lib/
> total 212528
> -rw-r--r--. 1 root root 62050 Jul 11 19:27 commons-logging-1.1.3.jar
> -rw-r--r--. 1 sae  sae 194416 Jun 22 02:51 flink-cep-1.15.1.jar
> -rw-r--r--. 1 sae  sae 484728 Jun 22 02:54
> flink-connector-files-1.15.1.jar
> -rw-r--r--. 1 sae  sae  95184 Jun 22 03:03 flink-csv-1.15.1.jar
> -rw-r--r--. 1 sae  sae  115818049 Jun 22 03:13 flink-dist-1.15.1.jar
> -rw-r--r--. 1 sae  sae 175487 Jun 22 03:05 flink-json-1.15.1.jar
> -rw-r--r--. 1 sae  sae   21041716 Jun 22 03:10 flink-scala_2.12-1.15.1.jar
> -rw-rw-r--. 1 sae  sae   10737871 May 12 22:45
> flink-shaded-zookeeper-3.5.9.jar
> -rw-r--r--. 1 root root   5381644 Jul 11 19:25 flink-sql-parquet-1.15.0.jar
> -rw-r--r--. 1 sae  sae   15262738 Jun 22 03:10
> flink-table-api-java-uber-1.15.1.jar
> -rw-r--r--. 1 sae  sae   36236261 Jun 22 03:10
> flink-table-planner-loader-1.15.1.jar
> -rw-r--r--. 1 sae  sae2996565 Jun 22 02:51
> flink-table-runtime-1.15.1.jar
> -rw-r--r--. 1 root root   2792264 Jul 11 19:28 guava-29.0-jre.jar
> -rw-r--r--. 1 root root   3990042 Jul 11 19:26 hadoop-common-2.8.5.jar
> -rw-rw-r--. 1 sae  sae 208006 May 12 22:15 log4j-1.2-api-2.17.1.jar
> -rw-rw-r--. 1 sae  sae 301872 May 12 22:15 log4j-api-2.17.1.jar
> -rw-rw-r--. 1 sae  sae1790452 May 12 22:15 log4j-core-2.17.1.jar
> -rw-rw-r--. 1 sae  sae  24279 May 12 22:15 log4j-slf4j-impl-2.17.1.jar


Re: flink-hudi-hive

2022-07-11 post by Weihua Hu
Hi,
By "repeatedly initializing", do you mean the job keeps failing over? The failover cause can be found in the JobManager.log; search for the keyword "to
FAILED"
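The suggested search can be run directly against the JobManager log; a minimal sketch (the log path and sample line are hypothetical):

```shell
# Write one sample failover line (content hypothetical) so this sketch is
# self-contained, then search for the marker as suggested above.
printf 'Job example (id) switched from state RUNNING to FAILED.\n' > /tmp/jobmanager.log
grep -n "to FAILED" /tmp/jobmanager.log
```

On a real deployment, point grep at the JobManager log collected by YARN (e.g. via `yarn logs`) instead of /tmp/jobmanager.log.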

Best,
Weihua


On Mon, Jul 11, 2022 at 2:46 PM ynz...@163.com  wrote:

> Hi,
> I am using Flink to write data into Hudi and sync it to Hive. After submitting the job to YARN, I can see in the Flink web
> UI that the tasks are repeatedly initializing, and the task managers show no information at all. The logs contain no clear error message either;
> When I remove the sync_hive-related configuration from the code, leaving all other configuration unchanged, the data is written to Hudi normally;
> I am using hudi-0.11.1, flink-1.14.4, hadoop-3.3.1, hive-3.1.3;
>
>
>
> best,
> ynz...@163.com
>


Re: filesystem java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory

2022-07-11 post by Weihua Hu
Hi,

Do you have more detailed logs? This looks like a class-loading conflict; we need to identify which class is conflicting.

Best,
Weihua


On Wed, Jul 6, 2022 at 1:53 PM RS  wrote:

> Hi,
>
>
> Executing Flink SQL via the sql-client with the filesystem connector produces the following error:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.planner.delegation.ParserFactory
>
>
> Flink SQL> CREATE TABLE t1 (
> > a STRING,
> > b INT
> > )WITH(
> > 'connector'='filesystem',
> > 'path'='/tmp/qwe',
> > 'format'='csv',
> > 'csv.ignore-parse-errors' = 'true',
> > 'csv.allow-comments' = 'true'
> > );
> [INFO] Execute statement succeed.
> Flink SQL> select * from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.planner.delegation.ParserFactory
>
>
> I tested it: the cause is that my lib directory contains
> flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar, which is there because other jobs need to read and write Hive.
> If flink-sql-connector-hive-3.1.2_2.12-1.15.0.jar is not in lib, this error does not occur.
>
>
> How can this problem be solved?
>
>
> Thanks


sql-client java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat

2022-07-11 post by jiangjiguang719
hi,
 I am reading parquet files with the sql-client and get the error:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: 
org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat


Flink version: 1.15.0/1.15.1


SQL statements:
Flink SQL> CREATE TABLE orders_parquet (
>   int32_field    INT,
>   int32_field1   INT,
>   int32_field2   INT
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = '/data/testdata/PARQUET-1_4',
>  'format' = 'parquet'
> );
[INFO] Execute statement succeed.


Flink SQL> select * from orders_parquet where int32_field > 100;


The lib directory contains:


[root@icx20 flink-1.15.1]# ll lib/
total 212528
-rw-r--r--. 1 root root 62050 Jul 11 19:27 commons-logging-1.1.3.jar
-rw-r--r--. 1 sae  sae 194416 Jun 22 02:51 flink-cep-1.15.1.jar
-rw-r--r--. 1 sae  sae 484728 Jun 22 02:54 flink-connector-files-1.15.1.jar
-rw-r--r--. 1 sae  sae  95184 Jun 22 03:03 flink-csv-1.15.1.jar
-rw-r--r--. 1 sae  sae  115818049 Jun 22 03:13 flink-dist-1.15.1.jar
-rw-r--r--. 1 sae  sae 175487 Jun 22 03:05 flink-json-1.15.1.jar
-rw-r--r--. 1 sae  sae   21041716 Jun 22 03:10 flink-scala_2.12-1.15.1.jar
-rw-rw-r--. 1 sae  sae   10737871 May 12 22:45 flink-shaded-zookeeper-3.5.9.jar
-rw-r--r--. 1 root root   5381644 Jul 11 19:25 flink-sql-parquet-1.15.0.jar
-rw-r--r--. 1 sae  sae   15262738 Jun 22 03:10 
flink-table-api-java-uber-1.15.1.jar
-rw-r--r--. 1 sae  sae   36236261 Jun 22 03:10 
flink-table-planner-loader-1.15.1.jar
-rw-r--r--. 1 sae  sae2996565 Jun 22 02:51 flink-table-runtime-1.15.1.jar
-rw-r--r--. 1 root root   2792264 Jul 11 19:28 guava-29.0-jre.jar
-rw-r--r--. 1 root root   3990042 Jul 11 19:26 hadoop-common-2.8.5.jar
-rw-rw-r--. 1 sae  sae 208006 May 12 22:15 log4j-1.2-api-2.17.1.jar
-rw-rw-r--. 1 sae  sae 301872 May 12 22:15 log4j-api-2.17.1.jar
-rw-rw-r--. 1 sae  sae1790452 May 12 22:15 log4j-core-2.17.1.jar
-rw-rw-r--. 1 sae  sae  24279 May 12 22:15 log4j-slf4j-impl-2.17.1.jar

flink-hudi-hive

2022-07-11 post by ynz...@163.com
Hi,
I am using Flink to write data into Hudi and sync it to Hive. After submitting the job to YARN, I can see in the Flink web UI that the tasks are repeatedly initializing, and the task
managers show no information at all. The logs contain no clear error message either;
When I remove the sync_hive-related configuration from the code, leaving all other configuration unchanged, the data is written to Hudi normally;
I am using hudi-0.11.1, flink-1.14.4, hadoop-3.3.1, hive-3.1.3;



best,
ynz...@163.com