Re: [SQL] parse table name from sql statement

2020-09-22 文章 Harold.Miao
thx

silence  于2020年9月22日周二 上午11:54写道:

> 写过一个类似的可以参考一下
>
> private static List lookupSelectTable(SqlNode sqlNode) {
> List list = new ArrayList<>();
> if (sqlNode instanceof SqlSelect) {
> SqlNode from = ((SqlSelect) sqlNode).getFrom();
> list.addAll(lookupSelectTable(from));
> } else if (sqlNode instanceof SqlJoin) {
> SqlJoin sqlJoin = (SqlJoin) sqlNode;
> list.addAll(lookupSelectTable(sqlJoin.getLeft()));
> list.addAll(lookupSelectTable(sqlJoin.getRight()));
> } else if (sqlNode instanceof SqlBasicCall) {
> SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
> SqlOperator operator = sqlBasicCall.getOperator();
> if (SqlKind.AS.equals(operator.getKind())) {
>
> list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
> } else if (SqlKind.UNION.equals(operator.getKind())) {
> for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
> list.addAll(lookupSelectTable(operandSqlNode));
> }
> } else {
> throw new RuntimeException("operator " + operator.getKind()
> + " not support");
> }
> } else if (sqlNode instanceof SqlIdentifier) {
> list.add(((SqlIdentifier) sqlNode).getSimple());
> } else {
> throw new RuntimeException("operator " + sqlNode.getClass() + "
> not support");
> }
> return list;
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best Regards,
Harold Miao


Re: Re: [SQL] parse table name from sql statement

2020-09-21 文章 Harold.Miao
大佬  能不能给点示例

Benchao Li  于2020年9月21日周一 下午4:38写道:

> 我感觉可以先把SQL转成RelNode,然后用Calcite的visitor模式的RelShuttle来获取?
>
> Harold.Miao  于2020年9月21日周一 下午1:58写道:
>
> > 主要是我没有完整的所有单元case, 总是感觉写的不完整。
> >
> > 郭士榕  于2020年9月21日周一 上午11:08写道:
> >
> > >
> > >
> > >
> > > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-09-21 10:50:31,"Harold.Miao"  写道:
> > > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。
> > > >
> > > >郭士榕  于2020年9月21日周一 上午10:21写道:
> > > >
> > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> > > >> >hi all
> > > >> >
> > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> > > >> >
> > > >> >谢谢
> > > >> >
> > > >> >--
> > > >> >
> > > >> >Best Regards,
> > > >> >Harold Miao
> > > >>
> > > >
> > > >
> > > >--
> > > >
> > > >Best Regards,
> > > >Harold Miao
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best Regards,
Harold Miao


Re: Re: [SQL] parse table name from sql statement

2020-09-20 文章 Harold.Miao
主要是我没有完整的所有单元case, 总是感觉写的不完整。

郭士榕  于2020年9月21日周一 上午11:08写道:

>
>
>
> 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-21 10:50:31,"Harold.Miao"  写道:
> >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。
> >
> >郭士榕  于2020年9月21日周一 上午10:21写道:
> >
> >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> >> >hi all
> >> >
> >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> >> >
> >> >谢谢
> >> >
> >> >--
> >> >
> >> >Best Regards,
> >> >Harold Miao
> >>
> >
> >
> >--
> >
> >Best Regards,
> >Harold Miao
>


-- 

Best Regards,
Harold Miao


Re: [SQL] parse table name from sql statement

2020-09-20 文章 Harold.Miao
主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。

郭士榕  于2020年9月21日周一 上午10:21写道:

> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
>
>
>
>
>
> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> >hi all
> >
> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> >
> >谢谢
> >
> >--
> >
> >Best Regards,
> >Harold Miao
>


-- 

Best Regards,
Harold Miao


[SQL] parse table name from sql statement

2020-09-20 文章 Harold.Miao
hi all

请教大家在复杂sql语句中parse所有的table name是怎么实现的。

谢谢

-- 

Best Regards,
Harold Miao


Re: 关于官方的k8s operator

2020-09-17 文章 Harold.Miao
谢谢

Yang Wang  于2020年9月17日周四 上午11:20写道:

> Flink官方其实是没有开发K8s Operator的,目前使用比较多的有lyft[1]和google[2]开发的两个
> 都已经在生产环境使用了,支持的都是Standalone job/application on K8s,还不支持native[3]的集成
>
> 如果你想自己实现一个K8s Operator支持native模式的话,我之前做过一个POC,你可以参考一下[4]
>
>
> [1]. https://github.com/lyft/flinkk8soperator
> [2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [3].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
> [4]. https://github.com/wangyang0918/flink-native-k8s-operator
>
>
> Best,
> Yang
>
> Harold.Miao  于2020年9月17日周四 上午10:14写道:
>
> > hi flink
> >
> > 请教一下官方在支持k8s operator部署这块有什么计划吗?
> >
> > 谢谢
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


关于官方的k8s operator

2020-09-16 文章 Harold.Miao
hi flink

请教一下官方在支持k8s operator部署这块有什么计划吗?

谢谢


-- 

Best Regards,
Harold Miao


Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-15 文章 Harold.Miao
是我的代码问题,我set sp的时候streamGraph里面的算子还没有构建出来,正确的做法是在生成jobGraph的时候set 进去。  感谢

Jark Wu  于2020年9月15日周二 上午11:31写道:

> 是不是你的 cp 恢复的代码,没有执行任何的 insert into 语句?
>
> On Mon, 14 Sep 2020 at 20:15, Harold.Miao  wrote:
>
> > 还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下
> >
> > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> >final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >
> >
> >
> >
> >
> >
> > *   LOG.info("restore cp exist: {}",
> > environment.getExecution().getRestoreSp().isPresent());   if
> > (environment.getExecution().getRestoreSp().isPresent()) {
> > LOG.info("restore cp path: {}",
> > environment.getExecution().getRestoreSp().get());  if
> > (!environment.getExecution().getRestoreSp().get().contains("none")) {
> >SavepointRestoreSettings savepointRestoreSettings =
> >
> >
> SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(),
> > true);
> >
> env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> >  }   }*
> >// for TimeCharacteristic validation in StreamTableEnvironmentImpl
> >
> >
> env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
> >if (env.getStreamTimeCharacteristic() ==
> TimeCharacteristic.EventTime) {
> >
> >
> env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
> >}
> >return env;
> > }
> >
> >
> > 传入上面那个只有meta文件地址的时候报错如下:
> >
> > Exception in thread "main"
> > org.apache.flink.table.client.SqlClientException: Unexpected
> > exception. This is a bug. Please consider filing an issue.
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> > Could not create execution context.
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> > at
> > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot execute.
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> > ... 3 more
> >
> >
> > 错误很明显的显示没有算子的state
> >
> >
> >
> >
> >
> >
> >
> >
> > Congxian Qiu  于2020年9月14日周一 下午7:53写道:
> >
> > > Hi
> > >如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata
> > > 这一个文件的。具体逻辑可以看一下这里[1]
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> > > Best,
> > > Congxian
> > >
> > >
> > > Harold.Miao  于2020年9月14日周一 下午6:44写

Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-14 文章 Harold.Miao
是同一个insert任务, 只是重启任务的时候 ,我加了这些代码,构造一个 SavepointRestoreSettings  来恢复cp

请教   我如何判断cp真正写入了hdfs呢,meta文件有什么工具可以解析吗

谢谢

Jark Wu  于2020年9月15日周二 上午11:31写道:

> 是不是你的 cp 恢复的代码,没有执行任何的 insert into 语句?
>
> On Mon, 14 Sep 2020 at 20:15, Harold.Miao  wrote:
>
> > 还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下
> >
> > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> >final StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >
> >
> >
> >
> >
> >
> > *   LOG.info("restore cp exist: {}",
> > environment.getExecution().getRestoreSp().isPresent());   if
> > (environment.getExecution().getRestoreSp().isPresent()) {
> > LOG.info("restore cp path: {}",
> > environment.getExecution().getRestoreSp().get());  if
> > (!environment.getExecution().getRestoreSp().get().contains("none")) {
> >SavepointRestoreSettings savepointRestoreSettings =
> >
> >
> SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(),
> > true);
> >
> env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> >  }   }*
> >// for TimeCharacteristic validation in StreamTableEnvironmentImpl
> >
> >
> env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
> >if (env.getStreamTimeCharacteristic() ==
> TimeCharacteristic.EventTime) {
> >
> >
> env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
> >}
> >return env;
> > }
> >
> >
> > 传入上面那个只有meta文件地址的时候报错如下:
> >
> > Exception in thread "main"
> > org.apache.flink.table.client.SqlClientException: Unexpected
> > exception. This is a bug. Please consider filing an issue.
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> > Could not create execution context.
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> > at
> >
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> > at
> > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> > at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot execute.
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
> > at
> >
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> > ... 3 more
> >
> >
> > 错误很明显的显示没有算子的state
> >
> >
> >
> >
> >
> >
> >
> >
> > Congxian Qiu  于2020年9月14日周一 下午7:53写道:
> >
> > > Hi
> > >如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata
> > > 这一个文件的。具体逻辑可以看一下这里[1]
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> > > Best,
> > > Congxian
> > >
> > >
> > &

Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-14 文章 Harold.Miao
还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
   final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();







*   LOG.info("restore cp exist: {}",
environment.getExecution().getRestoreSp().isPresent());   if
(environment.getExecution().getRestoreSp().isPresent()) {
LOG.info("restore cp path: {}",
environment.getExecution().getRestoreSp().get());  if
(!environment.getExecution().getRestoreSp().get().contains("none")) {
   SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(),
true); 
env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
 }   }*
   // for TimeCharacteristic validation in StreamTableEnvironmentImpl
   
env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
   if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
  
env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
   }
   return env;
}


传入上面那个只有meta文件地址的时候报错如下:

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected
exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
... 3 more


错误很明显的显示没有算子的state








Congxian Qiu  于2020年9月14日周一 下午7:53写道:

> Hi
>如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata
> 这一个文件的。具体逻辑可以看一下这里[1]
>
> [1]
>
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> Best,
> Congxian
>
>
> Harold.Miao  于2020年9月14日周一 下午6:44写道:
>
> > hi  all
> >
> > flink 版本: 1.11.1
> >
> > 我们利用sql-client提交任务, flink-conf.yaml配置如下
> >
> > state.backend: filesystem
> > state.backend.fs.checkpointdir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > state.checkpoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > state.savepoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> > execution.checkpointing.interval: 60s
> > execution.checkpointing.mode: EXACTLY_ONCE
> > jobmanager.execution.failover-strategy: full
> > state.backend.incremental: true
> >
> >
> > 任务运行后,在UI界面上看checkpoint都成功了。 但是hdfs上面却一直只有一个meta文件
> >
> > 类似下面:
> >
> > hdfs://
> >
> 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> >
> > 除了这个文件,其他什么都没有。
> >
> > 我们的源是kafka,kafka肯定会保存state的。
> >
> >
> > 请教大家这是什么原因导致的呢
> >
> >
> > 谢谢
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


[sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs

2020-09-14 文章 Harold.Miao
hi  all

flink 版本: 1.11.1

我们利用sql-client提交任务, flink-conf.yaml配置如下

state.backend: filesystem
state.backend.fs.checkpointdir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
state.checkpoints.dir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
state.savepoints.dir:
hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252

execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
jobmanager.execution.failover-strategy: full
state.backend.incremental: true


任务运行后,在UI界面上看checkpoint都成功了。 但是hdfs上面却一直只有一个meta文件

类似下面:

hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata

除了这个文件,其他什么都没有。

我们的源是kafka,kafka肯定会保存state的。


请教大家这是什么原因导致的呢


谢谢










-- 

Best Regards,
Harold Miao


Re: sql-client checkpoint sql-client

2020-09-13 文章 Harold.Miao
从checkpoint恢复  官方还不支持   我司是修改了sql-client来支持这个需求的

引领  于2020年9月4日周五 下午6:13写道:

>
>
> 想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点:
> ① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group
> by或者是count等操作时该如何办?
> ② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费
> | |
> 引领
> |
> |
> yrx73...@163.com
> |
> 签名由网易邮箱大师定制
>
>

-- 

Best Regards,
Harold Miao


Re: 关于hive的一个疑问

2020-08-20 文章 Harold.Miao
hi

hive catlog只存储元数据,元数据信息可以通过hive client获取Hive
Table,然后通过table.getParameters()可以获取到。
至于具体数据,是跟你的元数据对应的存储系统相关的。要去对应的存储里面去查。

Bruce  于2020年8月20日周四 下午7:52写道:

> hi,all.
>
> hive仅作为元数据管理,具体数据不存储在hdfs上,通过hivecatalog可以读取到具体的数据吗?
>
>
>
>
> 比如hive里面存储了MySQL,Oracle的表元数据信息,可以用hivecatalog读取到具体的表数据吗?
>
>
>
>
> 发自我的iPhone



-- 

Best Regards,
Harold Miao


Re: TableColumn为啥不包含comment

2020-08-16 文章 Harold.Miao
谢谢   我想提交这个patch

Shengkai Fang  于2020年8月14日周五 下午4:33写道:

> hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。
>
> [1] https://issues.apache.org/jira/browse/FLINK-18958
>
> Harold.Miao  于2020年8月13日周四 上午11:08写道:
>
> > hi all
> > 我发现TableColumn class不包含column comment  , 给开发带来了一点麻烦,请教大家一下,谢谢
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


TableColumn为啥不包含comment

2020-08-12 文章 Harold.Miao
hi all
我发现TableColumn class不包含column comment  , 给开发带来了一点麻烦,请教大家一下,谢谢


-- 

Best Regards,
Harold Miao


Re: 关于 sql-client

2020-07-24 文章 Harold.Miao
这个呢  https://github.com/ververica/flink-sql-gateway

杨荣  于2020年7月24日周五 下午3:19写道:

> 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者
> submit with sql file 的 feature 到现在都还没实现呢。
>
> Harold.Miao  于2020年7月24日周五 上午11:42写道:
>
> > 1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
> > 源码里面有加载主配置文件的逻辑
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2  因为等不及官方的  我们自己wrapper实现了一个
> >
> >
> >
> >
> > 杨荣  于2020年7月24日周五 上午10:53写道:
> >
> > > Hi all,
> > >
> > > 请问:
> > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
> > >
> > > 2. GateWay mode 预计在那个版本 release?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: 关于 sql-client

2020-07-23 文章 Harold.Miao
1 应该是可以的   主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address
源码里面有加载主配置文件的逻辑

public LocalExecutor(URL defaultEnv, List jars, List libraries) {
   // discover configuration
   final String flinkConfigDir;
   try {
  // find the configuration directory
  flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();

  // load the global configuration
  this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);

  // initialize default file system
  FileSystem.initialize(flinkConfig,
PluginUtils.createPluginManagerFromRootFolder(flinkConfig));

  // load command lines for deployment
  this.commandLines =
CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
  this.commandLineOptions = collectCommandLineOptions(commandLines);
   } catch (Exception e) {
  throw new SqlClientException("Could not load Flink configuration.", e);
   }


2  因为等不及官方的  我们自己wrapper实现了一个




杨荣  于2020年7月24日周五 上午10:53写道:

> Hi all,
>
> 请问:
> 1. 在 Embedded mode 下,支持 ClusterClient 进行 job
> 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。
>
> 2. GateWay mode 预计在那个版本 release?
>


-- 

Best Regards,
Harold Miao


Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 Harold.Miao
谢谢  我暂时这样改了一下

public boolean submitUpdate(String statement) {
   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
   terminal.writer().println(new AttributedString(statement).toString());
   terminal.flush();

   final Optional parsedStatement = parseCommand(statement);
   // only support INSERT INTO/OVERWRITE
   return parsedStatement.map(cmdCall -> {
  switch (cmdCall.command) {
 case INSERT_INTO:
 case INSERT_OVERWRITE:
return callInsert(cmdCall);




* case CREATE_TABLE:callDdl(cmdCall.operands[0],
CliStrings.MESSAGE_TABLE_CREATED);return true;*
default:
printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
return false;
  }
   }).orElse(false);
}


Jark Wu  于2020年7月21日周二 下午6:41写道:

> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>
> > hi flink users
> >
> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


[sql-client] 如何绕过交互式模式去做ddl

2020-07-16 文章 Harold.Miao
hi flink users

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!


-- 

Best Regards,
Harold Miao


Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 文章 Harold.Miao
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码

private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {

   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);

   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}

private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }

}


wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:

>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


-- 

Best Regards,
Harold Miao


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-15 文章 Harold.Miao
是在flink-conf.yaml里面配置这个参数吗
execution.checkpointing.interval
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-interval>

godfrey he  于2020年7月16日周四 下午1:37写道:

> 现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
> 你可以配置在flink-conf.yaml里
>
> Harold.Miao  于2020年7月16日周四 下午1:27写道:
>
> > hi flink users
> >
> > 通过sql-client提交sql怎么设置checkpointing.interval?
> > 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
> > 谢谢
> >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


[sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-15 文章 Harold.Miao
hi flink users

通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
谢谢



-- 

Best Regards,
Harold Miao


[flink-sql] 如何在sql运行时动态修改kafka的scan.startup.mode

2020-07-14 文章 Harold.Miao
hi  all
现在有个需求,就是一段用sql-client提交的sql任务需要动态修改kafka的scan.startup.mode,以支持不同的消费需求。请问有什么好的办法吗?
谢谢


-- 

Best Regards,
Harold Miao