Re: flink sql如何处理脏数据问题?
还有一种做法就是使用datastream,datastream支持sideoutput,但 flink sql不支持,不过有一种迂回的做法就是flinksql -> datastream -> flink sql,可以查一下官网资料,flinksql和datastream可以互相转换。 Xuyang 于2023年10月30日周一 10:17写道: > Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。 > > > > > > > > -- > > Best! > Xuyang > > > > > > 在 2023-10-29 10:23:38,"casel.chen" 写道: > >场景:使用flink > sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka > topic或者写入一个文件便于事后审查。这个目前有办法做到吗? >
Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败
Hi casel, Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway 的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。 casel.chen 于2023年10月29日周日 17:24写道: > > 1. 启动flink集群 > bin/start-cluster.sh > > > 2. 启动sql gateway > bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 > > > 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下 > > > 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车 > $ bin/beeline > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] > Beeline version 3.1.2 by Apache Hive > beeline> !connect jdbc:flink://localhost:8083 > Connecting to jdbc:flink://localhost:8083 > Enter username for jdbc:flink://localhost:8083: > Enter password for jdbc:flink://localhost:8083: > Failed to create the executor. > 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T( > . . . . . . . . . . . . . . . . . . . .> a INT, > . . . . . . . . . . . . . . . . . . . .> b VARCHAR(10) > . . . . . . . . . . . . . . . . . . . .> ) WITH ( > . . . . . . . . . . . . . . . . . . . .> 'connector' = 'filesystem', > . . . . . . . . . . . . . . . . . . . .> 'path' = 'file:///tmp/T.csv', > . . . . . . . . . . . . . . . . . . . .> 'format' = 'csv' > . . . . . . . . . . . . . . . . . . . .> ); > Failed to create the executor. > Connection is already closed. > -- Best, Benchao Li
Re:Re:回复: flink sql不支持show create catalog 吗?
Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1]. 我的理解是倒不是说“引入CatalogStore后才可以提供show create catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。 [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations -- Best! Xuyang 在 2023-10-29 20:34:52,"casel.chen" 写道: >请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create >catalog语法支持? > > > > > > > > > > > > > > > > > >在 2023-10-20 17:03:46,"李宇彬" 写道: >>Hi Feng >> >> >>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。 >>| | >> 回复的原邮件 >>| 发件人 | Feng Jin | >>| 发送日期 | 2023年10月20日 13:18 | >>| 收件人 | | >>| 主题 | Re: flink sql不支持show create catalog 吗? | >>hi casel >> >> >>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。 >> >> >>Best, >>Feng >> >>On Fri, Oct 20, 2023 at 11:55 AM casel.chen wrote: >> >>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink >>sql不支持show create catalog 。 >>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?
Re:flink sql如何处理脏数据问题?
Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。 -- Best! Xuyang 在 2023-10-29 10:23:38,"casel.chen" 写道: >场景:使用flink >sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka > topic或者写入一个文件便于事后审查。这个目前有办法做到吗?
Re:回复: flink sql不支持show create catalog 吗?
请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create catalog语法支持? 在 2023-10-20 17:03:46,"李宇彬" 写道: >Hi Feng > > >我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。 >| | > 回复的原邮件 >| 发件人 | Feng Jin | >| 发送日期 | 2023年10月20日 13:18 | >| 收件人 | | >| 主题 | Re: flink sql不支持show create catalog 吗? | >hi casel > > >从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。 > > >Best, >Feng > >On Fri, Oct 20, 2023 at 11:55 AM casel.chen wrote: > >之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink >sql不支持show create catalog 。 >而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?
flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败
1. 启动flink集群 bin/start-cluster.sh 2. 启动sql gateway bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车 $ bin/beeline SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] Beeline version 3.1.2 by Apache Hive beeline> !connect jdbc:flink://localhost:8083 Connecting to jdbc:flink://localhost:8083 Enter username for jdbc:flink://localhost:8083: Enter password for jdbc:flink://localhost:8083: Failed to create the executor. 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T( . . . . . . . . . . . . . . . . . . . .> a INT, . . . . . . . . . . . . . . . . . . . .> b VARCHAR(10) . . . . . . . . . . . . . . . . . . . .> ) WITH ( . . . . . . . . . . . . . . . . . . . .> 'connector' = 'filesystem', . . . . . . . . . . . . . . . . . . . .> 'path' = 'file:///tmp/T.csv', . . . . . . . . . . . . . . . . . . . .> 'format' = 'csv' . . . . . . . . . . . . . . . . . . . .> ); Failed to create the executor. Connection is already closed.