Re: flink sql如何处理脏数据问题?

2023-10-29 文章 ying lin
还有一种做法就是使用datastream,datastream支持sideoutput,但 flink
sql不支持,不过有一种迂回的做法就是flinksql -> datastream -> flink
sql,可以查一下官网资料,flinksql和datastream可以互相转换。

Xuyang  于2023年10月30日周一 10:17写道:

> Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2023-10-29 10:23:38,"casel.chen"  写道:
> >场景:使用flink
> sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
> topic或者写入一个文件便于事后审查。这个目前有办法做到吗?
>


Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-29 文章 Benchao Li
Hi casel,

Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway
的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。

casel.chen  于2023年10月29日周日 17:24写道:
>
> 1. 启动flink集群
> bin/start-cluster.sh
>
>
> 2. 启动sql gateway
> bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2
>
>
> 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下
>
>
> 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
> $ bin/beeline
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
> Beeline version 3.1.2 by Apache Hive
> beeline> !connect jdbc:flink://localhost:8083
> Connecting to jdbc:flink://localhost:8083
> Enter username for jdbc:flink://localhost:8083:
> Enter password for jdbc:flink://localhost:8083:
> Failed to create the executor.
> 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
> . . . . . . . . . . . . . . . . . . . .>   a INT,
> . . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
> . . . . . . . . . . . . . . . . . . . .> ) WITH (
> . . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
> . . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
> . . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
> . . . . . . . . . . . . . . . . . . . .> );
> Failed to create the executor.
> Connection is already closed.
>


-- 

Best,
Benchao Li


Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 Xuyang
Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1].
我的理解是倒不是说“引入CatalogStore后才可以提供show create 
catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations




--

Best!
Xuyang





在 2023-10-29 20:34:52,"casel.chen"  写道:
>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
>catalog语法支持?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-10-20 17:03:46,"李宇彬"  写道:
>>Hi Feng
>>
>>
>>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>>| |
>> 回复的原邮件 
>>| 发件人 | Feng Jin |
>>| 发送日期 | 2023年10月20日 13:18 |
>>| 收件人 |  |
>>| 主题 | Re: flink sql不支持show create catalog 吗? |
>>hi casel
>>
>>
>>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>>
>>
>>Best,
>>Feng
>>
>>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>>
>>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>>sql不支持show create catalog 。
>>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


Re:flink sql如何处理脏数据问题?

2023-10-29 文章 Xuyang
Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。







--

Best!
Xuyang





在 2023-10-29 10:23:38,"casel.chen"  写道:
>场景:使用flink 
>sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
> topic或者写入一个文件便于事后审查。这个目前有办法做到吗?


Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 casel.chen
请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
catalog语法支持?

















在 2023-10-20 17:03:46,"李宇彬"  写道:
>Hi Feng
>
>
>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>| |
> 回复的原邮件 
>| 发件人 | Feng Jin |
>| 发送日期 | 2023年10月20日 13:18 |
>| 收件人 |  |
>| 主题 | Re: flink sql不支持show create catalog 吗? |
>hi casel
>
>
>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>
>
>Best,
>Feng
>
>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>
>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>sql不支持show create catalog 。
>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-29 文章 casel.chen
1. 启动flink集群
bin/start-cluster.sh


2. 启动sql gateway
bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2


3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下


4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
$ bin/beeline 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
Beeline version 3.1.2 by Apache Hive
beeline> !connect jdbc:flink://localhost:8083
Connecting to jdbc:flink://localhost:8083
Enter username for jdbc:flink://localhost:8083: 
Enter password for jdbc:flink://localhost:8083: 
Failed to create the executor.
0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
. . . . . . . . . . . . . . . . . . . .>   a INT,
. . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
. . . . . . . . . . . . . . . . . . . .> ) WITH (
. . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
. . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
. . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
. . . . . . . . . . . . . . . . . . . .> );
Failed to create the executor.
Connection is already closed.