JDBC异步lookup join有FLIP或者迭代计划吗?
如果没有我用VertX和Druid连接池贡献下代码。这个要在dev邮件列表里讨论是吗
SQL DDL选项的值需要单引号应该如何解决?
create table source ( id int ) with ( type='jdbc', username='us', password='ab'c' ); 例如上面DDL密码的值需要 ' 单引号。这种应该怎么解决?
Flink Jdbc Connector close方法线程同步
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java 为什么上诉代码中的close方法要进行线程同步,这个是什么考虑。
GenericRowData与BinaryRowData的转换
我看Kafka Connector源码生成的是GenericRowData,到Jdbc Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么?
SinkFunction与OutputFormat选择哪个?
自定义Connector时,RichSinkFunction和RichOutputFormat可以选择一个进行实现,那么作为开发者应该如何选择呢?
blink planner里的Scala代码,未来会由Java改写吗?
目前blink planner中有大量Scala代码,Scala在这方面写起来确实简单不少。未来不需要用Java重写是吗?
Re: Flink 1.12.2 sql api 使用parquet格式报错
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html Parquet你要下这个Jar包放在你flink/lib目录的。 Luna Wong 于2021年4月1日周四 上午10:45写道: > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html > > 太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道: > > > > 使用 parquet 还需要手段添加其他相关的依赖吗? > > > > > > 环境和报错信息如下: > > > > > > Flink 版本: 1.12.2 > > 部署方式: standalone kubernetes session > > 添加的相关依赖 > > > > > > > > > > > > > > > > > 错误信息: > > Caused by: org.apache.flink.table.api.ValidationException: Could not find > > any format factory for identifier 'parquet' in the classpath. at > > org.apache.flink.table.filesystem.FileSystemTableSink. > at > > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)
Re: Flink 1.12.2 sql api 使用parquet格式报错
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html 太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道: > > 使用 parquet 还需要手段添加其他相关的依赖吗? > > > 环境和报错信息如下: > > > Flink 版本: 1.12.2 > 部署方式: standalone kubernetes session > 添加的相关依赖 > > > > > > 错误信息: > Caused by: org.apache.flink.table.api.ValidationException: Could not find any > format factory for identifier 'parquet' in the classpath. at > org.apache.flink.table.filesystem.FileSystemTableSink. at > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)
Re: FLIP-146中TableSource并行度设置预计哪个版本做?
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。 问题1:ScanTable 并行度在FLIP-146中有提及, LookupTable的并行度设置有FLIP或者issue吗? 问题2:这两类Table的并行度设置,预计在Flink哪个版本推出。 Luna Wong 于2021年3月31日周三 下午9:46写道: > > DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。 > > Luna Wong 于2021年3月31日周三 下午9:45写道: > > > > DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
Re: FLIP-146中TableSource并行度设置预计哪个版本做?
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。 Luna Wong 于2021年3月31日周三 下午9:45写道: > > DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
FLIP-146中TableSource并行度设置预计哪个版本做?
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
Re: Flink SQL一个Job使用多个Catalog的例子
我已经解决了。USE其他Catalog再建表即可。 Luna Wong 于2021年1月12日周二 下午9:41写道: > > 大家好。 > 我没有在官网找到个Job使用多个Catalog的例子。 > 我想在一个Job里使用普通的Catalog注册个Kafka Source然后,将数据发送Iceberg Sink表。这个Sink表 > 注册在另一个Iceberg + Hive 的Catalog 中。 > 注册代码如下。 > CREATE CATALOG hive_catalog WITH ( > 'type'='iceberg', > 'catalog-type'='hive', > 'uri'='thrift://kudu1:9083', > 'clients'='2', > 'property-version'='1', > 'warehouse'='hdfs://ns1//user/hive/warehouse' > ); > 之所以使用两个Catalog是因为我发现Kafka Source无法注册到这种类型为Iceberg的Hive > Catalog中。我必须得换一个Catalog。 > 目前在IDEA下我还没跑起来。 > > 可爱的木兰。
Flink SQL一个Job使用多个Catalog的例子
大家好。 我没有在官网找到个Job使用多个Catalog的例子。 我想在一个Job里使用普通的Catalog注册个Kafka Source然后,将数据发送Iceberg Sink表。这个Sink表 注册在另一个Iceberg + Hive 的Catalog 中。 注册代码如下。 CREATE CATALOG hive_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://kudu1:9083', 'clients'='2', 'property-version'='1', 'warehouse'='hdfs://ns1//user/hive/warehouse' ); 之所以使用两个Catalog是因为我发现Kafka Source无法注册到这种类型为Iceberg的Hive Catalog中。我必须得换一个Catalog。 目前在IDEA下我还没跑起来。 可爱的木兰。
Flink IDEA中使用Iceberg
代码如下,Flink metastore报错日志: AlreadyExistsException(message:Database default already exists) 但是我已经 USE了 luna db啊?很奇怪 tEnv.executeSql("CREATE CATALOG iceberg_hive WITH (\n" + " 'type'='iceberg',\n" + " 'catalog-type'='hive',\n" + " 'uri'='thrift://kudu1:9083',\n" + " 'clients'='2',\n" + " 'property-version'='1',\n" + " 'warehouse'='hdfs://ns1//user/hive/warehouse'\n" + ")"); tEnv.executeSql("USE CATALOG iceberg_hive"); //tEnv.executeSql("CREATE DATABASE luna"); tEnv.executeSql("USE luna"); tEnv.executeSql("CREATE TABLE iceberg_hive.luna.dwd (\n" + "id BIGINT COMMENT 'unique id',\n" + "name STRING\n" + ")"); tEnv.executeSql("CREATE TABLE iceberg_hive.luna.ads (\n" + "id BIGINT COMMENT 'unique id',\n" + "name STRING\n" + ")");
Flink Stream SQL 何时支持拉取 Iceberg表
我看Iceberg官方文档,Stream目前只支持Iceberg Sink但是没有Source。未来在哪个版本会支持Iceberg Source Stream。有issue吗?
Flink CVE补丁会打在1.10版本吗
Flink 那个两个CVE的Patch 会打在1.10.4这个版本吗。目前修复版本是 1.11.3 1.12.0 ,很多生成还都在1.10 不好直接升级的。
Job结束blob目录及文件没有删除
https://issues.apache.org/jira/browse/FLINK-20696 有一定概率发生,提交很多Job,Job结束后,会有个别Blob目录没有清理。我还没Debug出原因。
Re: FlinkSQL导致Prometheus内存暴涨
我看了源码了。operator name截断了。但是task name没截断。task name是那些operator name拼起来的 所以特别长。现在我只是魔改源码临时截断了一下,咱还是在issue里讨论吧 Jark Wu 于2020年11月26日周四 下午8:53写道: > > IIRC, runtime will truncate the operator name to max 80 characters, see > `TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH`. > You can search the log if there are "The operator name {} exceeded the {} > characters length limit and was truncated.". > > On Thu, 26 Nov 2020 at 18:18, hailongwang <18868816...@163.com> wrote: > > > > > > > > > Hi, > > 是的,个人觉得可以提供一个配置项来控制 task Name。 > > 完整的 task name 有助于排查问题等,简短的 task name 有助于在生产环境中 metric > > 的采集,可以极大较少发送的网络开销,存储空间等。 > > 已建立个了 issue :https://issues.apache.org/jira/browse/FLINK-20375 > > > > > > Best, > > Hailong > > > > 在 2020-11-24 14:19:40,"Luna Wong" 写道: > > >FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。 > > >下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。 > > > > > > > >task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed" > >
FlinkSQL导致Prometheus内存暴涨
FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。 下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。 task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
Re: Flink未来会弃用TableSourceFactory吗
1.11版本 FactoryUtil类中有如下方法。 public static DynamicTableSink createTableSink(); 我拿到DynamicTableSink之后调用什么API可以把表注册进Catalog啊。 老版本API有 tEnv.registerTableSink() 注册TableSink。新版我是没办法这么注册DynamicTableSink是吧。 Leonard Xu 于2020年11月16日周一 下午10:59写道: > > Hi, > 据我了解会弃用的,新的connector都会用DynamicTableSourceFactory,一般稳定一两个版本后社区会弃用, > 另外这个是比较底层的实现,sql用户应该感知不到,如果有自定义connector的开发都建议用DynamicTableSourceFactory。 > > 祝好 > Leonard Xu > > > 在 2020年11月16日,19:54,Luna Wong 写道: > > > > FLIP-95都实现后有了DynamicTableSourceFactory那么TableSourceFactory会弃用吗? >
Flink未来会弃用TableSourceFactory吗
FLIP-95都实现后有了DynamicTableSourceFactory那么TableSourceFactory会弃用吗?
Flink JSON反序列化DECIMAL精度丢失
https://issues.apache.org/jira/browse/FLINK-20170 这是我今天提的issue。 Jackson这样反序列化会把BigDECIMAL转成Double。超过12位的小数精度丢失。这种情况我得怎么办。只能先当做STRING类型处理或修改下JSON这个包的源码重新变一下。 还有其他的最佳实践吗
Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?
Table API 转 DataStream为啥会出现性能损耗 hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道: > > 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了? > 而不是再实现一个 Connector。 > > > > > 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道: > >明白了,多谢。 > > > >是 Canal-Json 格式的 Kafka Connector. > > > >我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table > >api 对接 flink。 > > > >现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector. > > > > > > > > > >-- > >Sent from: http://apache-flink.147419.n8.nabble.com/
ElasticsearchApiCallBridge相关类构造函数问题
为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。 我还想继承Elasticsearch6ApiCallBridge类。在new RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。 不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?