Re: [SQL] parse table name from sql statement
thx silence 于2020年9月22日周二 上午11:54写道: > 写过一个类似的可以参考一下 > > private static List lookupSelectTable(SqlNode sqlNode) { > List list = new ArrayList<>(); > if (sqlNode instanceof SqlSelect) { > SqlNode from = ((SqlSelect) sqlNode).getFrom(); > list.addAll(lookupSelectTable(from)); > } else if (sqlNode instanceof SqlJoin) { > SqlJoin sqlJoin = (SqlJoin) sqlNode; > list.addAll(lookupSelectTable(sqlJoin.getLeft())); > list.addAll(lookupSelectTable(sqlJoin.getRight())); > } else if (sqlNode instanceof SqlBasicCall) { > SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; > SqlOperator operator = sqlBasicCall.getOperator(); > if (SqlKind.AS.equals(operator.getKind())) { > > list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0])); > } else if (SqlKind.UNION.equals(operator.getKind())) { > for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) { > list.addAll(lookupSelectTable(operandSqlNode)); > } > } else { > throw new RuntimeException("operator " + operator.getKind() > + " not support"); > } > } else if (sqlNode instanceof SqlIdentifier) { > list.add(((SqlIdentifier) sqlNode).getSimple()); > } else { > throw new RuntimeException("operator " + sqlNode.getClass() + " > not support"); > } > return list; > } > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best Regards, Harold Miao
Re: Re: [SQL] parse table name from sql statement
大佬 能不能给点示例 Benchao Li 于2020年9月21日周一 下午4:38写道: > 我感觉可以先把SQL转成RelNode,然后用Calcite的visitor模式的RelShuttle来获取? > > Harold.Miao 于2020年9月21日周一 下午1:58写道: > > > 主要是我没有完整的所有单元case, 总是感觉写的不完整。 > > > > 郭士榕 于2020年9月21日周一 上午11:08写道: > > > > > > > > > > > > > > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-09-21 10:50:31,"Harold.Miao" 写道: > > > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。 > > > > > > > >郭士榕 于2020年9月21日周一 上午10:21写道: > > > > > > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。 > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> 在 2020-09-21 10:12:13,"Harold.Miao" 写道: > > > >> >hi all > > > >> > > > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。 > > > >> > > > > >> >谢谢 > > > >> > > > > >> >-- > > > >> > > > > >> >Best Regards, > > > >> >Harold Miao > > > >> > > > > > > > > > > > >-- > > > > > > > >Best Regards, > > > >Harold Miao > > > > > > > > > -- > > > > Best Regards, > > Harold Miao > > > > > -- > > Best, > Benchao Li > -- Best Regards, Harold Miao
Re: Re: [SQL] parse table name from sql statement
主要是我没有完整的所有单元case, 总是感觉写的不完整。 郭士榕 于2020年9月21日周一 上午11:08写道: > > > > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。 > > > > > > > > > > > > > > > 在 2020-09-21 10:50:31,"Harold.Miao" 写道: > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。 > > > >郭士榕 于2020年9月21日周一 上午10:21写道: > > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。 > >> > >> > >> > >> > >> > >> 在 2020-09-21 10:12:13,"Harold.Miao" 写道: > >> >hi all > >> > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。 > >> > > >> >谢谢 > >> > > >> >-- > >> > > >> >Best Regards, > >> >Harold Miao > >> > > > > > >-- > > > >Best Regards, > >Harold Miao > -- Best Regards, Harold Miao
Re: [SQL] parse table name from sql statement
主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。 郭士榕 于2020年9月21日周一 上午10:21写道: > 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。 > > > > > > 在 2020-09-21 10:12:13,"Harold.Miao" 写道: > >hi all > > > >请教大家在复杂sql语句中parse所有的table name是怎么实现的。 > > > >谢谢 > > > >-- > > > >Best Regards, > >Harold Miao > -- Best Regards, Harold Miao
[SQL] parse table name from sql statement
hi all 请教大家在复杂sql语句中parse所有的table name是怎么实现的。 谢谢 -- Best Regards, Harold Miao
Re: 关于官方的k8s operator
谢谢 Yang Wang 于2020年9月17日周四 上午11:20写道: > Flink官方其实是没有开发K8s Operator的,目前使用比较多的有lyft[1]和google[2]开发的两个 > 都已经在生产环境使用了,支持的都是Standalone job/application on K8s,还不支持native[3]的集成 > > 如果你想自己实现一个K8s Operator支持native模式的话,我之前做过一个POC,你可以参考一下[4] > > > [1]. https://github.com/lyft/flinkk8soperator > [2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator > [3]. > > https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html > [4]. https://github.com/wangyang0918/flink-native-k8s-operator > > > Best, > Yang > > Harold.Miao 于2020年9月17日周四 上午10:14写道: > > > hi flink > > > > 请教一下官方在支持k8s operator部署这块有什么计划吗? > > > > 谢谢 > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
关于官方的k8s operator
hi flink 请教一下官方在支持k8s operator部署这块有什么计划吗? 谢谢 -- Best Regards, Harold Miao
Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs
是我的代码问题,我set sp的时候streamGraph里面的算子还没有构建出来,正确的做法是在生成jobGraph的时候set 进去。 感谢 Jark Wu 于2020年9月15日周二 上午11:31写道: > 是不是你的 cp 恢复的代码,没有执行任何的 insert into 语句? > > On Mon, 14 Sep 2020 at 20:15, Harold.Miao wrote: > > > 还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下 > > > > private StreamExecutionEnvironment createStreamExecutionEnvironment() { > >final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > > > > > > > > > > > > > * LOG.info("restore cp exist: {}", > > environment.getExecution().getRestoreSp().isPresent()); if > > (environment.getExecution().getRestoreSp().isPresent()) { > > LOG.info("restore cp path: {}", > > environment.getExecution().getRestoreSp().get()); if > > (!environment.getExecution().getRestoreSp().get().contains("none")) { > >SavepointRestoreSettings savepointRestoreSettings = > > > > > SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), > > true); > > > env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings); > > } }* > >// for TimeCharacteristic validation in StreamTableEnvironmentImpl > > > > > env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic()); > >if (env.getStreamTimeCharacteristic() == > TimeCharacteristic.EventTime) { > > > > > env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval()); > >} > >return env; > > } > > > > > > 传入上面那个只有meta文件地址的时候报错如下: > > > > Exception in thread "main" > > org.apache.flink.table.client.SqlClientException: Unexpected > > exception. This is a bug. Please consider filing an issue. > > at > org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) > > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > > Could not create execution context. > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) > > at > > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) > > at > org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > > Caused by: java.lang.IllegalStateException: No operators defined in > > streaming topology. Cannot execute. > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868) > > ... 3 more > > > > > > 错误很明显的显示没有算子的state > > > > > > > > > > > > > > > > > > Congxian Qiu 于2020年9月14日周一 下午7:53写道: > > > > > Hi > > >如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata > > > 这一个文件的。具体逻辑可以看一下这里[1] > > > > > > [1] > > > > > > > > > https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442 > > > Best, > > > Congxian > > > > > > > > > Harold.Miao 于2020年9月14日周一 下午6:44写
Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs
是同一个insert任务, 只是重启任务的时候 ,我加了这些代码,构造一个 SavepointRestoreSettings 来恢复cp 请教 我如何判断cp真正写入了hdfs呢,meta文件有什么工具可以解析吗 谢谢 Jark Wu 于2020年9月15日周二 上午11:31写道: > 是不是你的 cp 恢复的代码,没有执行任何的 insert into 语句? > > On Mon, 14 Sep 2020 at 20:15, Harold.Miao wrote: > > > 还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下 > > > > private StreamExecutionEnvironment createStreamExecutionEnvironment() { > >final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > > > > > > > > > > > > > * LOG.info("restore cp exist: {}", > > environment.getExecution().getRestoreSp().isPresent()); if > > (environment.getExecution().getRestoreSp().isPresent()) { > > LOG.info("restore cp path: {}", > > environment.getExecution().getRestoreSp().get()); if > > (!environment.getExecution().getRestoreSp().get().contains("none")) { > >SavepointRestoreSettings savepointRestoreSettings = > > > > > SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), > > true); > > > env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings); > > } }* > >// for TimeCharacteristic validation in StreamTableEnvironmentImpl > > > > > env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic()); > >if (env.getStreamTimeCharacteristic() == > TimeCharacteristic.EventTime) { > > > > > env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval()); > >} > >return env; > > } > > > > > > 传入上面那个只有meta文件地址的时候报错如下: > > > > Exception in thread "main" > > org.apache.flink.table.client.SqlClientException: Unexpected > > exception. This is a bug. Please consider filing an issue. > > at > org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) > > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > > Could not create execution context. > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879) > > at > > > org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) > > at > > org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) > > at > org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > > Caused by: java.lang.IllegalStateException: No operators defined in > > streaming topology. Cannot execute. > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137) > > at > > > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868) > > ... 3 more > > > > > > 错误很明显的显示没有算子的state > > > > > > > > > > > > > > > > > > Congxian Qiu 于2020年9月14日周一 下午7:53写道: > > > > > Hi > > >如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata > > > 这一个文件的。具体逻辑可以看一下这里[1] > > > > > > [1] > > > > > > > > > https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442 > > > Best, > > > Congxian > > > > > > > > &
Re: [sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs
还有一点是 我们修改了sql-client代码, 让任务从cp恢复,修改如下 private StreamExecutionEnvironment createStreamExecutionEnvironment() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); * LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent()); if (environment.getExecution().getRestoreSp().isPresent()) { LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get()); if (!environment.getExecution().getRestoreSp().get().contains("none")) { SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true); env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings); } }* // for TimeCharacteristic validation in StreamTableEnvironmentImpl env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic()); if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) { env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval()); } return env; } 传入上面那个只有meta文件地址的时候报错如下: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834) at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:184) at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:137) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868) ... 3 more 错误很明显的显示没有算子的state Congxian Qiu 于2020年9月14日周一 下午7:53写道: > Hi >如果你的 state 都非常小的话,可能就会保存在 meta 文件中了,这样的话就只有 _metadata > 这一个文件的。具体逻辑可以看一下这里[1] > > [1] > > https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442 > Best, > Congxian > > > Harold.Miao 于2020年9月14日周一 下午6:44写道: > > > hi all > > > > flink 版本: 1.11.1 > > > > 我们利用sql-client提交任务, flink-conf.yaml配置如下 > > > > state.backend: filesystem > > state.backend.fs.checkpointdir: > > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252 > > state.checkpoints.dir: > > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252 > > state.savepoints.dir: > > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252 > > > > execution.checkpointing.externalized-checkpoint-retention: > > RETAIN_ON_CANCELLATION > > execution.checkpointing.interval: 60s > > execution.checkpointing.mode: EXACTLY_ONCE > > jobmanager.execution.failover-strategy: full > > state.backend.incremental: true > > > > > > 任务运行后,在UI界面上看checkpoint都成功了。 但是hdfs上面却一直只有一个meta文件 > > > > 类似下面: > > > > hdfs:// > > > 10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata > > > > 除了这个文件,其他什么都没有。 > > > > 我们的源是kafka,kafka肯定会保存state的。 > > > > > > 请教大家这是什么原因导致的呢 > > > > > > 谢谢 > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
[sql-client][checkpoint] sql-client提交任务,checkpoint没有写入hdfs
hi all flink 版本: 1.11.1 我们利用sql-client提交任务, flink-conf.yaml配置如下 state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252 state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252 state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE jobmanager.execution.failover-strategy: full state.backend.incremental: true 任务运行后,在UI界面上看checkpoint都成功了。 但是hdfs上面却一直只有一个meta文件 类似下面: hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata 除了这个文件,其他什么都没有。 我们的源是kafka,kafka肯定会保存state的。 请教大家这是什么原因导致的呢 谢谢 -- Best Regards, Harold Miao
Re: sql-client checkpoint sql-client
从checkpoint恢复 官方还不支持 我司是修改了sql-client来支持这个需求的 引领 于2020年9月4日周五 下午6:13写道: > > > 想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点: > ① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group > by或者是count等操作时该如何办? > ② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费 > | | > 引领 > | > | > yrx73...@163.com > | > 签名由网易邮箱大师定制 > > -- Best Regards, Harold Miao
Re: 关于hive的一个疑问
hi hive catlog只存储元数据,元数据信息可以通过hive client获取Hive Table,然后通过table.getParameters()可以获取到。 至于具体数据,是跟你的元数据对应的存储系统相关的。要去对应的存储里面去查。 Bruce 于2020年8月20日周四 下午7:52写道: > hi,all. > > hive仅作为元数据管理,具体数据不存储在hdfs上,通过hivecatalog可以读取到具体的数据吗? > > > > > 比如hive里面存储了MySQL,Oracle的表元数据信息,可以用hivecatalog读取到具体的表数据吗? > > > > > 发自我的iPhone -- Best Regards, Harold Miao
Re: TableColumn为啥不包含comment
谢谢 我想提交这个patch Shengkai Fang 于2020年8月14日周五 下午4:33写道: > hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。 > > [1] https://issues.apache.org/jira/browse/FLINK-18958 > > Harold.Miao 于2020年8月13日周四 上午11:08写道: > > > hi all > > 我发现TableColumn class不包含column comment , 给开发带来了一点麻烦,请教大家一下,谢谢 > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
TableColumn为啥不包含comment
hi all 我发现TableColumn class不包含column comment , 给开发带来了一点麻烦,请教大家一下,谢谢 -- Best Regards, Harold Miao
Re: 关于 sql-client
这个呢 https://github.com/ververica/flink-sql-gateway 杨荣 于2020年7月24日周五 下午3:19写道: > 你们可以 pr 到官方啊。我觉得这个功能很刚需啊,并且 basic 版的 1.5 就 release 了,不知道为什么相关 gate way 或者 > submit with sql file 的 feature 到现在都还没实现呢。 > > Harold.Miao 于2020年7月24日周五 上午11:42写道: > > > 1 应该是可以的 主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address > > 源码里面有加载主配置文件的逻辑 > > > > public LocalExecutor(URL defaultEnv, List jars, List > libraries) { > >// discover configuration > >final String flinkConfigDir; > >try { > > // find the configuration directory > > flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); > > > > // load the global configuration > > this.flinkConfig = > > GlobalConfiguration.loadConfiguration(flinkConfigDir); > > > > // initialize default file system > > FileSystem.initialize(flinkConfig, > > PluginUtils.createPluginManagerFromRootFolder(flinkConfig)); > > > > // load command lines for deployment > > this.commandLines = > > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir); > > this.commandLineOptions = collectCommandLineOptions(commandLines); > >} catch (Exception e) { > > throw new SqlClientException("Could not load Flink configuration.", > > e); > >} > > > > > > 2 因为等不及官方的 我们自己wrapper实现了一个 > > > > > > > > > > 杨荣 于2020年7月24日周五 上午10:53写道: > > > > > Hi all, > > > > > > 请问: > > > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job > > > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。 > > > > > > 2. GateWay mode 预计在那个版本 release? > > > > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
Re: 关于 sql-client
1 应该是可以的 主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address 源码里面有加载主配置文件的逻辑 public LocalExecutor(URL defaultEnv, List jars, List libraries) { // discover configuration final String flinkConfigDir; try { // find the configuration directory flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); // load the global configuration this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir); // initialize default file system FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig)); // load command lines for deployment this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir); this.commandLineOptions = collectCommandLineOptions(commandLines); } catch (Exception e) { throw new SqlClientException("Could not load Flink configuration.", e); } 2 因为等不及官方的 我们自己wrapper实现了一个 杨荣 于2020年7月24日周五 上午10:53写道: > Hi all, > > 请问: > 1. 在 Embedded mode 下,支持 ClusterClient 进行 job > 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。 > > 2. GateWay mode 预计在那个版本 release? > -- Best Regards, Harold Miao
Re: [sql-client] 如何绕过交互式模式去做ddl
谢谢 我暂时这样改了一下 public boolean submitUpdate(String statement) { terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi()); terminal.writer().println(new AttributedString(statement).toString()); terminal.flush(); final Optional parsedStatement = parseCommand(statement); // only support INSERT INTO/OVERWRITE return parsedStatement.map(cmdCall -> { switch (cmdCall.command) { case INSERT_INTO: case INSERT_OVERWRITE: return callInsert(cmdCall); * case CREATE_TABLE:callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);return true;* default: printError(CliStrings.MESSAGE_UNSUPPORTED_SQL); return false; } }).orElse(false); } Jark Wu 于2020年7月21日周二 下午6:41写道: > Hi, > > 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。 > 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。 > 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828. > > Best, > Jark > > On Thu, 16 Jul 2020 at 19:43, Harold.Miao wrote: > > > hi flink users > > > > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl, > > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢! > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
[sql-client] 如何绕过交互式模式去做ddl
hi flink users 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl, 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢! -- Best Regards, Harold Miao
Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 private static T findSingleInternal( Class factoryClass, Map properties, Optional classLoader) { List tableFactories = discoverFactories(classLoader); List filtered = filter(tableFactories, factoryClass, properties); if (filtered.size() > 1) { throw new AmbiguousTableFactoryException( filtered, factoryClass, tableFactories, properties); } else { return filtered.get(0); } } private static List discoverFactories(Optional classLoader) { try { List result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } wangl...@geekplus.com.cn 于2020年7月16日周四 下午7:04写道: > > 我在 > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > 找到了 SPI 的配置: > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > org.apache.flink.formats.json.JsonFormatFactory > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > 代码没找到类似的关系映射配置。 > > > 谢谢, > 王磊 > > > > wangl...@geekplus.com.cn > > > Sender: godfrey he > Send Time: 2020-07-16 16:38 > Receiver: user-zh > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > Best, > Godfrey > > wangl...@geekplus.com.cn 于2020年7月16日周四 > 下午4:02写道: > > > 比如: > > > > CREATE TABLE my_table ( > > id BIGINT, > > first_name STRING, > > last_name STRING, > > email STRING > > ) WITH ( > > 'connector'='kafka', > > 'topic'='user_topic', > > 'properties.bootstrap.servers'='localhost:9092', > > 'scan.startup.mode'='earliest-offset', > > 'format'='debezium-json' > > ); > > > > 最终解析 debezium-json 应该是 > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > 下面的代码 > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > 谢谢, > > 王磊 > > > > > > wangl...@geekplus.com.cn > > > > > -- Best Regards, Harold Miao
Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
是在flink-conf.yaml里面配置这个参数吗 execution.checkpointing.interval <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-interval> godfrey he 于2020年7月16日周四 下午1:37写道: > 现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval, > 你可以配置在flink-conf.yaml里 > > Harold.Miao 于2020年7月16日周四 下午1:27写道: > > > hi flink users > > > > 通过sql-client提交sql怎么设置checkpointing.interval? > > 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。 > > 谢谢 > > > > > > > > -- > > > > Best Regards, > > Harold Miao > > > -- Best Regards, Harold Miao
[sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
hi flink users 通过sql-client提交sql怎么设置checkpointing.interval? 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。 谢谢 -- Best Regards, Harold Miao
[flink-sql] 如何在sql运行时动态修改kafka的scan.startup.mode
hi all 现在有个需求,就是一段用sql-client提交的sql任务需要动态修改kafka的scan.startup.mode,以支持不同的消费需求。请问有什么好的办法吗? 谢谢 -- Best Regards, Harold Miao