请问如何贡献Flink Hologres连接器?

2024-05-13 文章 casel.chen
我们有使用阿里云商业版Hologres数据库,同时我们有自研的Flink实时计算平台,为了实现在Hologres上实时建仓,我们基于开源Apache 
Flink 1.17.1结合阿里云maven仓库的ververica-connector-hologres包[1]和开源的holo 
client[2]开发了hologres 
connector,修复了一些jar依赖问题。目前我们已经在生产环境使用了一段时间,暂时没有发现问题,现在想将它贡献给社区。


请问:
1. 贡献Flink Hologres连接器是否合规?
2. 如果合规的话,PR应该提到哪个项目代码仓库?
3. 还是说要像 https://flink-packages.org/categories/connectors 
这样链接到自己的github仓库?如果是的话要怎么在flink-packages.org上面注册呢?


[1] 
https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-hologres/1.17-vvr-8.0.4-1/
[2] 
https://github.com/aliyun/alibabacloud-hologres-connectors/tree/master/holo-client

Re:Flink流批一体应用在实时数仓数据核对场景下有哪些注意事项?

2024-04-17 文章 casel.chen
有人尝试这么实践过么?可以给一些建议么?谢谢!

















在 2024-04-15 11:15:34,"casel.chen"  写道:
>我最近在调研Flink实时数仓数据质量保障,需要定期(每10/20/30分钟)跑批核对实时数仓产生的数据,传统方式是通过spark作业跑批,如Apache 
>DolphinScheduler的数据质量模块。
>但这种方式的最大缺点是需要使用spark sql重写flink 
>sql业务逻辑,难以确保二者一致性。所以我在考虑能否使用Flink流批一体特性,复用flink 
>sql,只需要将数据源从cdc或kafka换成hologres或starrocks表,再新建跑批结果表,最后只需要比较相同时间段内实时结果表和跑批结果表的数据即可。不过有几点疑问:
>1. 原实时flink sql表定义中包含的watermark, process_time和event_time这些字段可以复用在batch mode下么?
>2. 实时双流关联例如interval join和temporal join能够用于batch mode下么?
>3. 实时流作业中的窗口函数能够复用于batch mode下么?
>4. 其他需要关注的事项有哪些?


Flink流批一体应用在实时数仓数据核对场景下有哪些注意事项?

2024-04-14 文章 casel.chen
我最近在调研Flink实时数仓数据质量保障,需要定期(每10/20/30分钟)跑批核对实时数仓产生的数据,传统方式是通过spark作业跑批,如Apache 
DolphinScheduler的数据质量模块。
但这种方式的最大缺点是需要使用spark sql重写flink sql业务逻辑,难以确保二者一致性。所以我在考虑能否使用Flink流批一体特性,复用flink 
sql,只需要将数据源从cdc或kafka换成hologres或starrocks表,再新建跑批结果表,最后只需要比较相同时间段内实时结果表和跑批结果表的数据即可。不过有几点疑问:
1. 原实时flink sql表定义中包含的watermark, process_time和event_time这些字段可以复用在batch mode下么?
2. 实时双流关联例如interval join和temporal join能够用于batch mode下么?
3. 实时流作业中的窗口函数能够复用于batch mode下么?
4. 其他需要关注的事项有哪些?

flink cdc metrics 问题

2024-04-07 文章 casel.chen
请问flink cdc对外有暴露一些监控metrics么?
我希望能够监控到使用flink cdc的实时作业当前未消费的binlog数据条数,类似于kafka topic消费积压监控。
想通过这个监控防止flink cdc实时作业消费慢而被套圈(最大binlog条数如何获取?)

急 [FLINK-34170] 何时能够修复?

2024-03-14 文章 casel.chen
我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major 
issue什么时候在哪个版本后能够修复呢?谢谢!


select xxx from kafka_table as kt 
left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt
on kt.trans_id=pt.trans_id and pt.trans_date = 
DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd');


phoenix表主键是 trans_id + trans_date 
复合主键,实际作业运行发现flink只会带trans_id字段对phoenix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤


https://issues.apache.org/jira/browse/FLINK-34170

使用avro schema注册confluent schema registry失败

2024-03-07 文章 casel.chen
我使用注册kafka topic对应的schema到confluent schema registry时报错,想知道问题的原因是什么?如何fix?




io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema for subject 
"rtdp_test-test_schema-value", details: 
[{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/10/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/11/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{oldSchemaVersion: 4}, {oldSchema: 
'{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'},
 {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)




maven依赖:



io.confluent

kafka-schema-registry-client

7.3.1






java代码:

Schema decimalSchema = LogicalTypes.decimal(precision, 
scale).addToSchema(SchemaBuilder.builder().bytesType());

data = 
data.name(columnName).type().unionOf().nullType().and().type(decimalSchema).endUnion().nullDefault();




salary字段是decimal类型,报错是说之前有一个不带salary字段版本的schema,而新版本schema里该salary字段定义中缺少default
 value,可我明明设置了nullDefault呀,这一点从生成的avro schema json string也可验证:




{

"type": "record",

"name": "Envelope",

"namespace": "rtdp_test-test_schema",

"fields": [

{

"name": "database",

"type": "string"

},

{

"name": "es",

"type": "long"

},

{

"name": "id",

"type": "int"

},

{

"name": "isDdl",

"type": "boolean"

},

{

"name": "sql",

"type": "string"

},

{

"name": "table",

"type": "string"

},

{

"name": "ts",

"type": "long"

},

{

"name": "type",

"type": "string"

},

{

"name": "pkNames",

"type": {

"type": "array",

"items": "string"

}

},

{

"name": "data",

"type": [

{

"type": "array",

"items": {

"type": "record",

"name": "Value",

"fields": [

{

"name": "id",

"type": [

"null",

"long"

],

"default": null

},

{

"name": "create_time",

"type": [

"null",

{

"type": "long",

"logicalType": "timestamp-millis"

}

],

"default": null

},

{


根据flink job web url可以获取到JobGraph信息么?

2024-03-01 文章 casel.chen
正在运行的flink作业能够通过其对外暴露的web url获取到JobGraph信息么?

flink cdc底层的debezium是如何注册schema到confluent schema registry的?

2024-02-29 文章 casel.chen
搜索了debezium源码但没有发现哪里有调用 
SchemaRegistryClient.register方法的地方,请问它是如何注册schema到confluent schema registry的?

Flink DataStream 作业如何获取到作业血缘?

2024-02-26 文章 casel.chen
一个Flink DataStream 作业从mysql cdc消费处理后写入apache 
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?

Flink Prometheus Connector问题

2024-02-23 文章 casel.chen
场景:使用Flink实时生成指标写入Prometheus进行监控告警
网上搜索到 https://github.com/apache/flink-connector-prometheus 项目,但内容是空的 
另外找到FLIP-312 是关于flink prometheus 
connector的,https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
请问Flink官方有没有出flink prometheus connector?
如果现在要实时写入prometheus的话,推荐的方式是什么?谢谢!

flink sql作业如何统计端到端延迟

2024-02-20 文章 casel.chen
flink sql作业从kafka消费mysql过来的canal 
json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time - 
update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?

Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
我看新的sink 
v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
 - context.timestamp()得到sink延迟呢?














在 2024-02-21 09:41:37,"Xuyang"  写道:
>Hi, chen. 
>可以试一下在sink function的invoke函数中使用:
>
>
>@Override
>public void invoke(RowData row, Context context) throws Exception {
>context.currentProcessingTime(); 
>context.currentWatermark(); 
>...
>}
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>
>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>> @Override
>>> public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>> }
>>> }
>>>
>>>
>>> 例如source table如下定义
>>>
>>>
>>> CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;


flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?


public class XxxSinkFunction extends RichSinkFunction implements 
CheckpointedFunction, CheckpointListener {


@Override
public synchronized void invoke(RowData rowData, Context context) throws 
IOException {
   //  这里想从rowData中获取event time和watermark值,如何实现呢?
}
}


例如source table如下定义


CREATE TEMPORARY TABLE source_table(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
) with (
  'connector'='kafka',
  ...

);


CREATE TEMPORARY TABLE sink_table(
  username varchar,
  click_url varchar,
  eventtime varchar
) with (
  'connector'='xxx',
  ...
);
insert into sink_table select username,click_url,eventtime from source_table;

flink作业实时数据质量监控告警要如何实现?

2024-02-08 文章 casel.chen
我们在使用flink搭建实时数仓,想知道flink作业是如何做数据质量监控告警的?包括数据及时性、完整性、一致性、准确性等
调研了spark streaming有amazon deequ和apache 
griffin框架来实现,想知道flink作业有没有类似的DQC框架?最好是对原有作业无侵入或者少侵入。
如果没有的话,实时数据质量这块一般是如何实现的呢?
如果每个生产作业都要单独配置一个DQC作业是不是代价太高了?有没有通过metrics暴露数据质量信息的呢?


下面是deequ使用的示例,检查每个微批数据是否满足规则要求。我们也有类似的数据质量检查需求


VerificationSuite().onData(df)
  .addCheck(Check(CheckLevel.Error, "this a unit test")
.hasSize(_ == 5) // 判断数据量是否是5条
.isComplete("id") // 判断该列是否全部不为空
.isUnique("id") // 判断该字段是否是唯一
.isComplete("productName") // 判断该字段全部不为空
.isContainedIn("priority", Array("high", "low")) // 该字段仅仅包含这两个字段
.isNonNegative("numViews") //该字段不包含负数
.containsURL("description", _ >= 0.5) // 包含url的记录是否超过0.5
.hasApproxQuantile("numViews", 0.5, _ <= 10)
  )
  .run()

flink cdc整库同步大小表造成数据倾斜问题

2024-02-06 文章 casel.chen
使用flink cdc 3.0 
yaml作业进行mysql到doris整库同步时发现有数据倾斜发生,大的TM要处理180G数据,小的TM只有30G数据,上游有的大表流量很大,而小表几乎没有流量,有什么办法可以避免发生数据倾斜问题么?

Re:RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-22 文章 casel.chen



V1版本依赖于DebeziumSourceFunction,后者依赖于DebeziumEngine产生changelog
V2版本虽然依赖了 flink-connector-debezium 但没有用到debezium内部类


另外有一个问题:mongodb change stream断点续传用的resumeToken是像mysql binlog offset一样全局唯一么?
如果数据源是像kafka一样每个分片有binlog offset的话,
是不是要在对应xxxOffset类中要定义一个Map类型的offsetField 
(类似mongodb对应ChangeStreamOffset中的resumeTokenField)? 
当前mongodb中定义的是Json String类型

在 2024-01-22 11:03:55,"Jiabao Sun"  写道:
>Hi,
>
>Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
>Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。
>
>Best,
>Jiabao
>
>[1] https://github.com/mongodb/mongo-kafka
>
>
>On 2024/01/22 02:57:38 "casel.chen" wrote:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Flink CDC MongoDB connector 还是基于debezium实现的
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
>> >Hi,
>> >
>> >可以参考 Flink CDC MongoDB connector 的实现。
>> >
>> >Best,
>> >Jiabao
>> >
>> >
>> >On 2024/01/22 02:06:37 "casel.chen" wrote:
>> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
>> >> 3.x自行开发,查了一下现有大部分flink cdc source 
>> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
>> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
>> 


Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 casel.chen









Flink CDC MongoDB connector 还是基于debezium实现的








在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
>Hi,
>
>可以参考 Flink CDC MongoDB connector 的实现。
>
>Best,
>Jiabao
>
>
>On 2024/01/22 02:06:37 "casel.chen" wrote:
>> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
>> 3.x自行开发,查了一下现有大部分flink cdc source 
>> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
>> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!


如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 casel.chen
现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
3.x自行开发,查了一下现有大部分flink cdc source 
connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
 snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!

Re:RE: RE: flink cdc动态加表不生效

2024-01-18 文章 casel.chen






想知道oracle cdc connector不支持动态加表的原因是什么?可否自己扩展实现呢?











在 2024-01-19 11:53:49,"Jiabao Sun"  写道:
>Hi,
>
>Oracle CDC connector[1] 目前是不支持动态加表的。
>
>Best,
>Jiabao
>
>[1] 
>https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
>
>
>On 2024/01/19 03:37:41 Jiabao Sun wrote:
>> Hi,
>> 
>> 请提供一下 flink cdc 的版本,使用的什么连接器。
>> 如果方便的话,也请提供一下日志。
>> 另外,table 的正则表达式可以匹配到新增的表吗?
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
>> 
>> On 2024/01/19 03:27:22 王凯 wrote:
>> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
>> > 参数,当从savepoint重启时,新添加的表的数据不能同步
>> > 
>> > 
>> > 王凯
>> > 2813732...@qq.com
>> > 
>> > 
>> > 
>> > 


如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 casel.chen
我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?

Flink CDC中如何在Snapshot阶段读取数据时进行限流?

2024-01-01 文章 casel.chen
业务表存量数据很大,如果不加限流直接使用flink cdc读取snapshot阶段数据的话会造成业务库压力,触发数据库告警,影响在线业务。
请问Flink CDC中如何在Snapshot阶段读取数据时进行限流?


我看到社区之前有人提议过,但issue一直是open状态
https://issues.apache.org/jira/browse/FLINK-18740


另外,我在flink最新master分支源码中有找到 
GuavaFlinkConnectorRateLimiter,但没有找到调用它的例子,请问如何在flink作业中使用限流呢?

Re:Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen



感谢解惑!
还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?








在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>Hi,
>
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>
>Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>writer,参考 DorisEventSerializer
>
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>被 block 的原因是 responseFuture没有 
>complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>在没有完成时会 block 住。 
>只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>waitFlushSuccess的responseFuture 标记为 complete。
>参考 
>SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>
>保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>
>Best,
>Jiabao
>
>[1] 
>https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>
>> 2023年12月27日 22:14,casel.chen  写道:
>> 
>> 看了infoq介绍flink cdc 3.0文章 
>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>> d6 其中d代表数据变更,s代表schema变更
>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>> 而Task2处理 d3, d4, d5, s3, d6
>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>> 
>> 
>> SchemaOperator代码中
>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>> schemaChangeEvent) {
>>// The request will need to send a FlushEvent or block until flushing 
>> finished
>>SchemaChangeResponse response = requestSchemaChange(tableId, 
>> schemaChangeEvent);
>>if (response.isShouldSendFlushEvent()) {
>>LOG.info(
>>"Sending the FlushEvent for table {} in subtask {}.",
>>tableId,
>>getRuntimeContext().getIndexOfThisSubtask());
>>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>output.collect(new StreamRecord<>(schemaChangeEvent));
>>// The request will block until flushing finished in each sink 
>> writer
>>requestReleaseUpstream();
>>}
>>}
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>> 求指教,谢谢!
>> 


flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen
看了infoq介绍flink cdc 3.0文章 
https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6 
其中d代表数据变更,s代表schema变更
这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
而Task2处理 d3, d4, d5, s3, d6
这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?


SchemaOperator代码中
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
// The request will need to send a FlushEvent or block until flushing 
finished
SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
output.collect(new StreamRecord<>(schemaChangeEvent));
// The request will block until flushing finished in each sink 
writer
requestReleaseUpstream();
}
}
为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
还要发送一次SchemaChangeEvent呢?
当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
upstream的呢?
求指教,谢谢!



Re:Re: Flink CDC MySqlSplitReader问题

2023-12-21 文章 casel.chen
那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么?

















在 2023-12-20 21:40:05,"Hang Ruan"  写道:
>Hi,casel
>
>这段逻辑应该只有在处理到新增表的时候才会用到。
>CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。
>但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。
>
>Best,
>Hang
>
>
>key lou  于2023年12月20日周三 16:24写道:
>
>> 意思是当 有 binlog  就意味着 已经读完了 snapshot
>>
>> casel.chen  于2023年12月19日周二 16:45写道:
>>
>> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!
>> >
>> >
>> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read
>> > snapshot split”这一句话我不理解。
>> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot
>> > split再读增量的binlog split么?
>> >
>> >
>> > private MySqlRecords pollSplitRecords() throws InterruptedException {
>> > Iterator dataIt;
>> > if (currentReader == null) {
>> > // (1) Reads binlog split firstly and then read snapshot
>> split
>> > if (binlogSplits.size() > 0) {
>> > // the binlog split may come from:
>> > // (a) the initial binlog split
>> > // (b) added back binlog-split in newly added table
>> process
>> > MySqlSplit nextSplit = binlogSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getBinlogSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else if (snapshotSplits.size() > 0) {
>> > MySqlSplit nextSplit = snapshotSplits.poll();
>> > currentSplitId = nextSplit.splitId();
>> > currentReader = getSnapshotSplitReader();
>> > currentReader.submitSplit(nextSplit);
>> > } else {
>> > LOG.info("No available split to read.");
>> > }
>> > dataIt = currentReader.pollSplitRecords();
>> > return dataIt == null ? finishedSplit() : forRecords(dataIt);
>> > } else if (currentReader instanceof SnapshotSplitReader) {
>> >   
>> > }
>> > ...
>> > }
>>


Flink CDC MySqlSplitReader问题

2023-12-19 文章 casel.chen
我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢!


MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read 
snapshot split”这一句话我不理解。
为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot split再读增量的binlog 
split么?


private MySqlRecords pollSplitRecords() throws InterruptedException {
Iterator dataIt;
if (currentReader == null) {
// (1) Reads binlog split firstly and then read snapshot split
if (binlogSplits.size() > 0) {
// the binlog split may come from:
// (a) the initial binlog split
// (b) added back binlog-split in newly added table process
MySqlSplit nextSplit = binlogSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getBinlogSplitReader();
currentReader.submitSplit(nextSplit);
} else if (snapshotSplits.size() > 0) {
MySqlSplit nextSplit = snapshotSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
} else {
LOG.info("No available split to read.");
}
dataIt = currentReader.pollSplitRecords();
return dataIt == null ? finishedSplit() : forRecords(dataIt);
} else if (currentReader instanceof SnapshotSplitReader) {
  
}
...
}

Re:Flink SQL作业配置'table.exec.sink.upsert-materialize'参数会影响TIMESTAMP类型精度?

2023-11-30 文章 casel.chen
补充一下,flink版本是 1.17.1

















在 2023-12-01 15:49:48,"casel.chen"  写道:
>线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3) 
>类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd 
>HH:mm:ss.SSS`格式,
>然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd 
>HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么?
>
>
>-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>+U[2023-11-30T12:43:04.676821, 2023-11-30, 000143554006, 20231130, 
>23113012430450887882, F, 000143718775, 10.00, 44, 07, 2, 
>23113012430450887895, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>+I[2023-11-29T17:37:01.556478, 2023-11-29, 000141180318, 20231129, 
>2f1edf1e3337642d, P, 000141538175, 246.00, 999, 01, 2, 
>23112917370147645164, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>-U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
>2023112516024553495256400285, P, , 1200.00, 81, 02, 1, 23112516024525664244, 
>2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>+U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>+U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
>2023112516024553495256400285, F, 000141586078, 1200.00, 81, 02, 1, 
>23112516024525664244, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>-U[2023-11-28T14:53:21.349043, 2023-11-28, 000137842973, 20231128, 
>HFPWALLET23112814532140921335, P, 000142774221, 62.98, 86, 06, 4, 
>538014532140921373, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]


Flink SQL作业配置'table.exec.sink.upsert-materialize'参数会影响TIMESTAMP类型精度?

2023-11-30 文章 casel.chen
线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3) 
类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd 
HH:mm:ss.SSS`格式,
然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd 
HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么?


-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-30T12:43:04.676821, 2023-11-30, 000143554006, 20231130, 
23113012430450887882, F, 000143718775, 10.00, 44, 07, 2, 
23113012430450887895, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+I[2023-11-29T17:37:01.556478, 2023-11-29, 000141180318, 20231129, 
2f1edf1e3337642d, P, 000141538175, 246.00, 999, 01, 2, 
23112917370147645164, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
2023112516024553495256400285, P, , 1200.00, 81, 02, 1, 23112516024525664244, 
2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
2023112516024553495256400285, F, 000141586078, 1200.00, 81, 02, 1, 
23112516024525664244, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-28T14:53:21.349043, 2023-11-28, 000137842973, 20231128, 
HFPWALLET23112814532140921335, P, 000142774221, 62.98, 86, 06, 4, 
538014532140921373, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]

Re:回复: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen
社区Flink自带的那些json函数都没有解析一串json string返回一行或多行ROW的

















在 2023-11-23 15:24:33,"junjie.m...@goupwith.com"  写道:
>可以看下JSON函数
>https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/functions/systemfunctions/#json-functions
>
>
>
>Junjie.M
> 
>发件人: casel.chen
>发送时间: 2023-11-22 20:54
>收件人: user-zh@flink.apache.org
>主题: flink sql如何实现json字符数据解析?
>输入:
> 
>{
> 
>  "uuid":"",
> 
>  "body_data": 
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
> 
>}
> 
> 
> 
> 
>输出:
> 
>[
> 
>  {
> 
>"uuid": "",
> 
>"body_data: null,
> 
>"body_data.fild1": "123”,
> 
>"body_data.fild2": "234"
> 
>  },
> 
>  {
> 
>"uuid": "",
> 
>"body_data": null,
> 
>"body_data.fild1": "abc",
> 
>"body_data.fild2": "cdf"
> 
>  }
> 
>]
> 
> 
> 
> 
>当格式错误时
> 
> 
> 
> 
>输入:
> 
>{
> 
>"uuid": "”,
> 
>"body_data": "abc"
> 
>}
> 
>输出:
> 
>{
> 
>"uuid": "",
> 
>"body_data": "abc",
> 
>"body_data.fild1": null,
> 
>"body_data.fild2": null
> 
>}


Re:Re: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen



filed字段数量是固定的,但body_data数组包含的元素个数不固定,所以

Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result




这种写死body_data[X]的sql语句应该不work








在 2023-11-23 15:10:00,"jinzhuguang"  写道:
>Flink 
>SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
>比如:
>
>SourceT: (
>   uuid String,
>   body_data ARRAY>
>)
>
>SinkT (
>   result ARRAY String, body_data.fild2  String>>
>)
>
>Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 
>as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
>null,body_data[2]. field, body_data[2]. field2)] as result
>
>希望对你有帮助
>
>> 2023年11月22日 20:54,casel.chen  写道:
>> 
>> 输入:
>> 
>> {
>> 
>>  "uuid":"",
>> 
>>  "body_data": 
>> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
>> 
>> }
>> 
>> 
>> 
>> 
>> 输出:
>> 
>> [
>> 
>>  {
>> 
>> "uuid": "",
>> 
>> "body_data: null,
>> 
>> "body_data.fild1": "123”,
>> 
>> "body_data.fild2": "234"
>> 
>>  },
>> 
>>  {
>> 
>> "uuid": "",
>> 
>> "body_data": null,
>> 
>> "body_data.fild1": "abc",
>> 
>> "body_data.fild2": "cdf"
>> 
>>  }
>> 
>> ]
>> 
>> 
>> 
>> 
>> 当格式错误时
>> 
>> 
>> 
>> 
>> 输入:
>> 
>> {
>> 
>> "uuid": "”,
>> 
>> "body_data": "abc"
>> 
>> }
>> 
>> 输出:
>> 
>> {
>> 
>> "uuid": "",
>> 
>> "body_data": "abc",
>> 
>> "body_data.fild1": null,
>> 
>> "body_data.fild2": null
>> 
>> }


flink sql如何实现json字符数据解析?

2023-11-22 文章 casel.chen
输入:

{

  "uuid":"",

  "body_data": 
"[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"

}




输出:

[

  {

"uuid": "",

"body_data: null,

"body_data.fild1": "123”,

"body_data.fild2": "234"

  },

  {

"uuid": "",

"body_data": null,

"body_data.fild1": "abc",

"body_data.fild2": "cdf"

  }

]




当格式错误时




输入:

{

"uuid": "”,

"body_data": "abc"

}

输出:

{

"uuid": "",

"body_data": "abc",

"body_data.fild1": null,

"body_data.fild2": null

}

Re:Re:flink sql支持批量lookup join

2023-11-22 文章 casel.chen
有一张维表 user,包含id和name字段
id  | name
-
1 | zhangsan
2 | lisi
3 | wangwu


现在实时来了一条交易数据 
id  | creator_id  | approver_id  | deployer_id
-
1   | 1| 2   | 3


希望lookup维表user返回各用户名称
id   |  creator_name   |  approver_name  |  deployer_name

1| zhangsan  |  lisi|. wangwu



以上场景用flink sql要如何实现?














在 2023-11-22 12:37:10,"Xuyang"  写道:
>Hi, casel.
>可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and 
>k3=v3的用法的。
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>在 2023-11-22 11:55:11,"casel.chen"  写道:
>>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
>>
>>
>>id key1 key2 key3
>>想实现批量lookup查询返回一行数据 id value1 value2 value3
>>
>>
>>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
>>id key1 key2 key3
>>先将多列转成多行
>>id key1
>>id key2
>>id key3
>>
>>分别进行lookup join后得到
>>id value1
>>id value2
>>id value3
>>最后多行转多列返回一行数据
>>
>>id value1 value2 value3
>>
>>
>>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?


flink sql支持批量lookup join

2023-11-21 文章 casel.chen
一行数据带了三个待lookup查询的key,分别是key1,key2和key3


id key1 key2 key3
想实现批量lookup查询返回一行数据 id value1 value2 value3


查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
id key1 key2 key3
先将多列转成多行
id key1
id key2
id key3

分别进行lookup join后得到
id value1
id value2
id value3
最后多行转多列返回一行数据

id value1 value2 value3


上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?

flink sql作业如何支持配置流?

2023-11-20 文章 casel.chen
我有一个flink 
sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。


create table customer_conf_tbl (
  customer_id STRING
) with (
  'connector' = 'apollo',
  '其他属性' 
);
select * from biz_table where customer_id in (select string_split(customer_id, 
',') from customer_conf_tbl)


如果要做成配置实时更新作用于sql作业的话又该如何实现呢?

FLINK-33365 - Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-07 文章 casel.chen
这个critical issue有人fix吗?我们线上使用flink 1.17.1版本有使用jdbc维表查询on带and过滤条件,发现and过滤条件不起作用


例如
select xxx from a left join b on a.id = b.id and b.type = 'xxx'
发现b.type='xxx'这个过滤条件不起作用

Re:Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-30 文章 casel.chen
谢谢解答,我查了一下目前有两种CatalogStore实现,一个是基于内存的,另一个是基于文件系统的。
请问要如何配置基于文件系统的CatalogStore?这个文件可以在对象存储上吗?flink sql client要如何使用这个CatalogStore? 
谢谢!

















在 2023-10-30 10:28:34,"Xuyang"  写道:
>Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1].
>我的理解是倒不是说“引入CatalogStore后才可以提供show create 
>catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。
>
>
>
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-10-29 20:34:52,"casel.chen"  写道:
>>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
>>catalog语法支持?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2023-10-20 17:03:46,"李宇彬"  写道:
>>>Hi Feng
>>>
>>>
>>>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>>>| |
>>> 回复的原邮件 
>>>| 发件人 | Feng Jin |
>>>| 发送日期 | 2023年10月20日 13:18 |
>>>| 收件人 |  |
>>>| 主题 | Re: flink sql不支持show create catalog 吗? |
>>>hi casel
>>>
>>>
>>>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>>>
>>>
>>>Best,
>>>Feng
>>>
>>>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>>>
>>>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>>>sql不支持show create catalog 。
>>>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


flink 1.18.0的sql gateway支持提交作业到k8s上运行吗?

2023-10-30 文章 casel.chen
想问一下目前flink 1.18.0的sql gateway只支持提交作业到本地运行吗?能否支持提交作业到yarn或k8s上运行呢?

Re:Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-30 文章 casel.chen
果然不指定endpoint为hiveserver2类型后使用hive beeline工具连接上了。感谢!
不过我仍然有个疑问,看官网文档上有写提供 hiveserver2 endpoint 
是为了兼容hive方言,按理也应该可以使用beeline连接上,因为原本beeline支持连接hiveserver2
以下是原文:
HiveServer2 Endpoint is compatible with HiveServer2 wire protocol and allows 
users to interact (e.g. submit Hive SQL) with Flink SQL Gateway with existing 
Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on.
这里有提到Beeline工具,难道不是 beeline> !connect jdbc:flink://localhost:8083 这样的连接方式了?



















在 2023-10-30 11:27:15,"Benchao Li"  写道:
>Hi casel,
>
>Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway
>的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。
>
>casel.chen  于2023年10月29日周日 17:24写道:
>>
>> 1. 启动flink集群
>> bin/start-cluster.sh
>>
>>
>> 2. 启动sql gateway
>> bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2
>>
>>
>> 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下
>>
>>
>> 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
>> $ bin/beeline
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in 
>> [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in 
>> [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
>> Beeline version 3.1.2 by Apache Hive
>> beeline> !connect jdbc:flink://localhost:8083
>> Connecting to jdbc:flink://localhost:8083
>> Enter username for jdbc:flink://localhost:8083:
>> Enter password for jdbc:flink://localhost:8083:
>> Failed to create the executor.
>> 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
>> . . . . . . . . . . . . . . . . . . . .>   a INT,
>> . . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
>> . . . . . . . . . . . . . . . . . . . .> ) WITH (
>> . . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
>> . . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
>> . . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
>> . . . . . . . . . . . . . . . . . . . .> );
>> Failed to create the executor.
>> Connection is already closed.
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 casel.chen
请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
catalog语法支持?

















在 2023-10-20 17:03:46,"李宇彬"  写道:
>Hi Feng
>
>
>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>| |
> 回复的原邮件 
>| 发件人 | Feng Jin |
>| 发送日期 | 2023年10月20日 13:18 |
>| 收件人 |  |
>| 主题 | Re: flink sql不支持show create catalog 吗? |
>hi casel
>
>
>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>
>
>Best,
>Feng
>
>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>
>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>sql不支持show create catalog 。
>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-29 文章 casel.chen
1. 启动flink集群
bin/start-cluster.sh


2. 启动sql gateway
bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2


3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下


4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
$ bin/beeline 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
Beeline version 3.1.2 by Apache Hive
beeline> !connect jdbc:flink://localhost:8083
Connecting to jdbc:flink://localhost:8083
Enter username for jdbc:flink://localhost:8083: 
Enter password for jdbc:flink://localhost:8083: 
Failed to create the executor.
0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
. . . . . . . . . . . . . . . . . . . .>   a INT,
. . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
. . . . . . . . . . . . . . . . . . . .> ) WITH (
. . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
. . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
. . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
. . . . . . . . . . . . . . . . . . . .> );
Failed to create the executor.
Connection is already closed.



flink sql如何处理脏数据问题?

2023-10-28 文章 casel.chen
场景:使用flink 
sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
 topic或者写入一个文件便于事后审查。这个目前有办法做到吗?

flink sql不支持show create catalog 吗?

2023-10-19 文章 casel.chen
之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink 
sql不支持show create catalog 。
而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?

Flink CDC消费Apache Paimon表

2023-09-29 文章 casel.chen
目前想要通过Flink全量+增量消费Apache Paimon表需要分别起离线和增量消费两个作业,比较麻烦,而且无法无缝衔接,能否通过类似Flink 
CDC消费mysql表的方式消费Apache Paimon表?

Re:Re: flink作业如何从yarn平滑迁移到k8s?

2023-08-07 文章 casel.chen






如果不是用的flink kubernetes operator或者hdfs和oss系统网络无法直接连通怎么办?
有没有办法读取hdfs的checkpoint/savepoint然后再另存为oss的checkpoint/savepoint呢?谢谢!











在 2023-08-07 10:33:25,"Ruibin Xing"  写道:
>你好,
>
>如果你们也使用的是官方的Flink Kubernetes
>Operator,可以参考我们迁移的经验:迁移的时候设置FlinkDeployment的initalSavepoint为HDFS上Savepoint的地址,同时配置savepoint/checkpoint目录为OSS。这样Flink启动的时候会从HDFS中的状态恢复,并将新的checkpoint保存在oss中。
>
>On Sun, Aug 6, 2023 at 10:03 PM casel.chen  wrote:
>
>> flink on yarn作业checkpoint/savepoint保存在hdfs上面,现在想将其迁移到on
>> k8s上运行,使用的是对象存储oss,请问如何无感地进行作业状态迁移呢?使用的flink版本是1.15.2,谢谢!


flink作业如何从yarn平滑迁移到k8s?

2023-08-06 文章 casel.chen
flink on yarn作业checkpoint/savepoint保存在hdfs上面,现在想将其迁移到on 
k8s上运行,使用的是对象存储oss,请问如何无感地进行作业状态迁移呢?使用的flink版本是1.15.2,谢谢!

flink sql作业状态跨存储系统迁移问题

2023-07-28 文章 casel.chen
我们要将当前在Hadoop Yarn上运行的flink 
sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。


查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink 
sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state 
processor api更适合stream api写的作业,sql作业几乎无法处理。是这样么?

Re: 建议Flink ROWKIND做成元数据metadata

2023-07-18 文章 casel.chen
社区无人响应吗?

















在 2023-07-15 12:19:46,"casel.chen"  写道:
>Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka 
>connector中的offset和partition等,用户可以使用这些ROWKIND 
>metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段


建议Flink ROWKIND做成元数据metadata

2023-07-14 文章 casel.chen
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka 
connector中的offset和partition等,用户可以使用这些ROWKIND 
metadata进行处理,例如对特定ROWKIND过滤或者答输出数据中添加ROWKIND字段

Re:flink on k8s 任务状态监控问题

2023-07-14 文章 casel.chen
可以查看history server














在 2023-07-14 18:36:42,"阿华田"  写道:
>
>
>hello  各位大佬, flink on K8S  ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 
>无法判断flink任务是正常Finished  还是异常失败了,各位大佬有什么建议吗
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>


Re:Re: Flink connector 是否支持忽略delete message

2023-07-11 文章 casel.chen
Flink社区能否考虑将ROWKIND做成元数据metadata,类似于kafka connector中的offset, 
partition,用户可以引用这些metadata进行过滤操作?














在 2023-07-10 23:39:00,"yh z"  写道:
>Hi,  shi franke. 你可以尝试自己实现一个 DynamicTableSink,在里面添加参数 “sink.ignore-delete”。
>你可以参考 github 上的一些实现,例如 clickhouse:
>https://github.com/liekkassmile/flink-connector-clickhouse-1.13
>
>shi franke  于2023年7月7日周五 19:24写道:
>
>>
>> 感谢您的回复,这样自定义是可以实现的,我们目前使用的是1.15的flink版本。想看一下社区是不是有在框架层面实现这个配置的支持,理解这应该也是一个相对common的配置
>> junjie.m...@goupwith.com  于2023年7月7日周五 17:57写道:
>>
>> > 可以自己用DataStream API通过RowKind进行过滤。
>> > 如下示例代码:import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> > import org.apache.flink.types.Row;
>> > import org.apache.flink.types.RowKind;
>> > import org.apache.flink.util.Collector;
>> >
>> > /**
>> >  * 增量数据过滤函数
>> >  */
>> > public class AppendOnlyFilterFunction extends RichFlatMapFunction> > Row> {
>> >
>> > private boolean includedUpdateAfter = false;
>> >
>> > public AppendOnlyFilterFunction() {
>> > }
>> >
>> > public AppendOnlyFilterFunction(boolean includedUpdateAfter) {
>> > this.includedUpdateAfter = includedUpdateAfter;
>> > }
>> >
>> > @Override
>> > public void flatMap(Row row, Collector collector) throws
>> > Exception {
>> > if (RowKind.INSERT == row.getKind()) {
>> > collector.collect(row);
>> > } else if (includedUpdateAfter && RowKind.UPDATE_AFTER ==
>> > row.getKind()) {
>> > row.setKind(RowKind.INSERT);
>> > collector.collect(row);
>> > }
>> > }
>> >
>> > }
>> >
>> > 发件人: shi franke
>> > 发送时间: 2023-07-07 17:33
>> > 收件人: user-zh
>> > 主题: Flink connector 是否支持忽略delete message
>> > 咨询下各位大佬,请问下connector现在有支持忽略delete消息的选项配置
>> >
>> > 吗?比如上游的数据是一个upsert/retract流,在connector这里是否有选项去忽略delete
>> > message,当作append流只去戳里insert消息。我看现在代码没有类似的功能,不确定是否有相关的jira或者实现
>> >
>>


Flink CDC消费MySQL Binlog出现中文乱码问题

2023-06-27 文章 casel.chen
mysql库中设置的是utf8mb4编码,单独sql查询mysql表没有出现中文乱码
使用flink datastream作业通过cdc消费mysql 
binlog并写到下游doris表时遇到字符串长度超长问题,我们是按mysql表schema创建的doris 
schema,就很奇怪为什么总是报字符串超长错误。于是将异常时的原始数据打印出来,才发现数据中只要包含了中文字符都会显示成乱码,要么都是???,要么都是其他莫名字符。
我按照网上搜索解答在flink-conf.yaml中添加了env.java.options=UTF-8,结果测下来还是会报同样的异常。请问这个问题要怎么彻底解决?谢谢!

Re:Flink CDC消费mysql `0000-00-00 00:00:00` 时间值问题

2023-06-23 文章 casel.chen
没有人遇到过这个问题吗?




在 2023-06-19 10:41:30,"casel.chen"  写道:
>Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink 
>CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00` 
>(mysql中是timestamp类型)。
>问题1:可否给个Flink CDC选项,遇到这种dummy时间转成NULL?存量query和增量消费binlog处理这种dummy时间结果一致么?
>问题2:如果是mysql -> mysql同步场景,使用Flink CDC在timestamp类型下不能够同步`-00-00 
>00:00:00`这个dummy date,原因是Flink CDC转成了`1970-01-01 08:00:00` 
>CST,对应到UTC时区是`1970-01-01 00:00:00`,而mysql官方文档[1]定义timestamp类型取值范围是'1970-01-01 
>00:00:01' UTC to '2038-01-19 03:14:07' UTC,因此会认为`1970-01-01 08:00:00` CST是非法数据。
>
>
>[1] https://dev.mysql.com/doc/refman/5.7/en/datetime.html


Flink CDC消费mysql `0000-00-00 00:00:00` 时间值问题

2023-06-18 文章 casel.chen
Flink作业消费数据源来自mysql业务表,后者使用了`-00-00 00:00:00`这个dummy date来存时间,直接用Flink 
CDC消费的话会被解析成`1970-01-01 00:00:00` (mysql中是datetime类型)或者`1970-01-01 08:00:00` 
(mysql中是timestamp类型)。
问题1:可否给个Flink CDC选项,遇到这种dummy时间转成NULL?存量query和增量消费binlog处理这种dummy时间结果一致么?
问题2:如果是mysql -> mysql同步场景,使用Flink CDC在timestamp类型下不能够同步`-00-00 
00:00:00`这个dummy date,原因是Flink CDC转成了`1970-01-01 08:00:00` 
CST,对应到UTC时区是`1970-01-01 00:00:00`,而mysql官方文档[1]定义timestamp类型取值范围是'1970-01-01 
00:00:01' UTC to '2038-01-19 03:14:07' UTC,因此会认为`1970-01-01 08:00:00` CST是非法数据。


[1] https://dev.mysql.com/doc/refman/5.7/en/datetime.html

Re:Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-13 文章 casel.chen
__TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org___AS_huifuThdOrg__CASE__huifu_for_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_for_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_for_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_for_org___AS_huifuForOrg__CASE__huifu_sales_sub_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub___AS_huifuSales__DATE_FORMAT_CAST_LOCALTIMESTAMP__UTF_16LE_MMddHHmmssSSS___AS_synModifyTime__CAST_CURRENT_TIMESTAMPAS_synTtlDate__NotNullEnforcer_fields__serviceId__Sink:_Sink_table__hive_default_mongodb_active_channel_sink___fields__transDate__serviceId__huifuFstOrg__huifuSecOrg__huifuThdOrg__huifuForOrg__huifuSales__synModifyTime__synTtlDate__",task_attempt_num="1",job_name="tb_top_top_trans_order_binlog2mongo",tm_id="tb_top_top_trans_order_binlog2mongo_taskmanager_1_112",subtask_index="35",}
 73144.0





在 2023-06-13 10:13:17,"Feng Jin"  写道:
>hi casel
>
>1. 可以考虑使用 Flink1.15, 使用精简的 operator name
>
>https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled
>
>2.  Flink 也提供了 restful 接口直接获取瞬时的 metric,如果不需要历史的 metric
>
>https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics
>
>
>Best,
>Feng
>
>On Tue, Jun 13, 2023 at 8:51 AM casel.chen  wrote:
>
>> 线上跑了200多个flink
>> sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
>> flink
>> sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
>> 请问这个问题有什么好的办法解决吗?


flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-12 文章 casel.chen
线上跑了200多个flink 
sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
flink 
sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
请问这个问题有什么好的办法解决吗?

Re:Re: 求flink作业各个算子的延迟指标

2023-06-12 文章 casel.chen
谢谢解答,如果是flink sql作业要如何获取到作业中每个算子的latency指标呢?
而且通过prometheus获取作业指标的话,因为flink sql作业中每个算子的名称是按sql内容拼出来的,会出现名称很长,
这样的算子指标直接打到prometheus的话会直接将prometheus内存打爆,这个问题有什么好的办法解决吗?

















在 2023-06-12 18:01:11,"Hangxiang Yu"  写道:
>[.[.]]..latency
>这个应该可以满足需求?也可以设置不同的粒度。
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
>
>On Mon, Jun 12, 2023 at 5:05 PM casel.chen  wrote:
>
>> 想统计数据经过flink作业各个算子的延迟指标,目前社区开源版能实现吗?
>
>
>
>-- 
>Best,
>Hangxiang.


求flink作业各个算子的延迟指标

2023-06-12 文章 casel.chen
想统计数据经过flink作业各个算子的延迟指标,目前社区开源版能实现吗?

Re:回复:flink作业延迟时效指标

2023-06-12 文章 casel.chen









我的是flink sql作业,要如何实现你说的方案呢?我看到阿里云实时计算平台VVR是支持展示作业时延指标的,想知道它是如何实现的








在 2023-06-08 16:46:34,"17610775726" <17610775...@163.com> 写道:
>Hi
>
>
>你提到的所有的监控都是可以通过 metric 来监控报警的,至于你提到的 LatencyMarker 因为它不参与算子内部的计算逻辑的时间,所以这个 
>metric 并不是准确的,但是如果任务有反压的情况下 LatencyMarker 
>也会被阻塞,所以大体上还是可以反应出任务的延迟情况,如果想要准确的计算出端到端的延迟,可以在 消费 kafka 的时候获取一个 start time 时间戳 
>在 sink 的时候获取一个 end time 时间戳,然后自定义一个 metric 把这个结果上报 基于这个 metric 做端到端的延迟监控。
>
>
>Best
>JasonLee
>
>
> 回复的原邮件 
>| 发件人 | casel.chen |
>| 发送日期 | 2023年06月8日 16:39 |
>| 收件人 | user-zh@flink.apache.org |
>| 主题 | flink作业延迟时效指标 |
>我想知道当前flink作业延迟了多久现在能通过什么指标可以获取到吗?想通过设置作业延迟告警来反馈作业健康状况,是否产生背压,是否需要增加资源等。
>以mysql表实时同步到doris表为例:mysql binlog -> kafka -> flink -> doris
>延迟指标包括:
>1. 业务延迟:业务延迟=当前系统时间 - 当前系统处理的最后一条数据的事件时间(Event time)
>例如:kafka消息写入doris的时间 - kafka消息数据本身产生时间(例如更新mysql记录的时间)
>2. 数据滞留延迟:数据滞留时间=数据进入实时计算的时间 - 数据事件时间(Event time)
>例如:flink消费到kafka消息时间 - 消费到的kafka消息数据本身产生时间(例如更新mysql记录的时间)
>
>
>当前我们是用kafka消费组积压告警来替代的,但这个数据不准,一是因为flink 
>checkpoint才会更新offset,二是因为生产流量在不同时段是不同的,在流量低的时候告警不及时。
>查了官网有一个LatencyMarker可以开启使用,请问这个开启后要怎么观察延迟呢?这个metric需要上报到prometheus才可以读到吗?
>
>
>我们遇到另一个问题是使用flink 
>sql提交作业生成的metric名称很长,因为operatorId是根据sql内容来生成的,所以动不动就把prometheus给打爆了,这个有什么办法解决么?


flink作业延迟时效指标

2023-06-08 文章 casel.chen
我想知道当前flink作业延迟了多久现在能通过什么指标可以获取到吗?想通过设置作业延迟告警来反馈作业健康状况,是否产生背压,是否需要增加资源等。
以mysql表实时同步到doris表为例:mysql binlog -> kafka -> flink -> doris
延迟指标包括:
1. 业务延迟:业务延迟=当前系统时间 - 当前系统处理的最后一条数据的事件时间(Event time)
   例如:kafka消息写入doris的时间 - kafka消息数据本身产生时间(例如更新mysql记录的时间)
2. 数据滞留延迟:数据滞留时间=数据进入实时计算的时间 - 数据事件时间(Event time)
例如:flink消费到kafka消息时间 - 消费到的kafka消息数据本身产生时间(例如更新mysql记录的时间)


当前我们是用kafka消费组积压告警来替代的,但这个数据不准,一是因为flink 
checkpoint才会更新offset,二是因为生产流量在不同时段是不同的,在流量低的时候告警不及时。
查了官网有一个LatencyMarker可以开启使用,请问这个开启后要怎么观察延迟呢?这个metric需要上报到prometheus才可以读到吗?


我们遇到另一个问题是使用flink 
sql提交作业生成的metric名称很长,因为operatorId是根据sql内容来生成的,所以动不动就把prometheus给打爆了,这个有什么办法解决么?

Flink RocketMQ Connector

2023-05-26 文章 casel.chen
有没有Flink RocketMQ官方连接器? 需要自己开发吗?Flink生态组件网址(用户上传自己开发的连接器格式什么的)是什么?

用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 文章 casel.chen
用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

table.exec.source.cdc-events-duplicate参数问题

2023-05-18 文章 casel.chen
mysql binlog 操作记录发到 kafka topic 中,消息格式是canal json,现通过flink 
sql实时同步写入另一个mysql库。今天发现实时作业抛错说写入mysql时遇到duplicate key error,查了一下发现是kafka 
topic中存在两条相同的消息,即相同主键且都是INSERT操作的消息。请问这种情况有什么办法可以避免作业出错吗?


查了官方文档说要在作业中添加参数 table.exec.source.cdc-events-duplicate 
,相当于是在作业中添加了一个状态算子用于去重,如果这张表不同主键的记录非常多的话,岂不是让其状态很占内存?而作业本身如果配置了状态过期参数,会不会造成无法精准去重?谢谢!


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/#duplicate-change-events

flink sql case when 中文数据写入doris出现乱码

2023-05-17 文章 casel.chen
使用flink sql写mysql表数据到doris表,发现case 
when语句判断交易类型使用了中文,写入后在doris查出是乱码,而mysql其他中文字段写入是正确的,想问一下这个sql中出现的乱码问题要解决?

Re:Re: Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 casel.chen
Application Mode没有这个问题,现在是Session Mode提交作业会遇到这个问题
./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar




后面这个作业TopSpeedWindowing.jar包可以使用hdfs/oss路径指定吗?如果是分布式文件路径的话是不是就不用上传作业jar包到jobManager了,而是由jobManager自行下载?





在 2023-05-15 19:27:21,"shimin huang"  写道:
>可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java
> 
>
>> 2023年5月15日 19:21,casel.chen  写道:
>> 
>> 我们开发了一个实时计算平台提交flink 
>> sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
>>  
>> jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
>>  sql作业提交的速度。
>


Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 casel.chen
我们开发了一个实时计算平台提交flink 
sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
 
jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
 sql作业提交的速度。

Flink SQL CEP如何处理双(多)流输入?

2023-05-11 文章 casel.chen
请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!

使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 文章 casel.chen
需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。

flink sql canal json格式侧输出parse error记录问题

2023-05-06 文章 casel.chen
线上使用flink sql消费kafka topic canal json格式数据,发现有一些数据中有的时间字段值为-00-00 
00:00:00无法被解析,于是加了'canal-json.ignore-parse-errors = true' 
参数,作业是能够正常运行了,但同时我们也希望知道哪些数据解析失败以便发给上游业务系统去自查。想问一下除了ignore外,有办法将这些parse 
error数据输出到另外一个kafka topic吗?谢谢!

kafka实例重启对flink作业的影响

2023-04-20 文章 casel.chen
实际工作中会遇到kafka版本升级或者kafka扩容(横向或纵向),数据重平衡等情况,想问一下发生这些情况下对线上运行的flink作业会有什么影响?flink作业能感知topic分区发生变化吗?要如何应对以减少对flink作业消费端的影响?

ValueError: unsupported pickle protocol: 5

2023-04-15 文章 casel.chen
我在尝试提交pyflink作业到k8s,按照这篇文章[1]介绍操作的,pyflink镜像文件[2],flink版本是1.15.2,执行wordcount 
jar作业没遇到问题,而在提交pyflink作业时发现作业失败了,日志显示如下。我本地安装的python 3.7.9和pyflink镜像中的版本是一致的,
请问是不是pickle包版本有问题?
怎样查看当前pickle包版本号是多少?
期望用的pickle包版本号是多少?
如何将当前pickle包安装成期望的版本?


./bin/flink run \
-m localhost:8081 \
-py ./examples/python/table/word_count.py


2023-04-1516:52:27
org.apache.flink.runtime.taskmanager.AsynchronousException: 
Caughtexceptionwhile processing timer.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(UnknownSource)
Causedby: TimerException{java.lang.RuntimeException: Errorwhile waiting 
forBeamPythonFunctionRunner flush}
... 14 more
Causedby: java.lang.RuntimeException: Errorwhile waiting 
forBeamPythonFunctionRunner flush
at 
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
... 13 more
Causedby: java.lang.RuntimeException: Failed to close remote bundle
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
at 
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(UnknownSource)
at java.base/java.util.concurrent.FutureTask.run(UnknownSource)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(UnknownSource)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(UnknownSource)
... 1 more
Causedby: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error received fromSDK harness for instruction 1: Traceback (most recent call 
last):
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 289, in _execute
response = task()
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 362, in 
lambda: self.create_worker().do_instruction(request), request)
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 467, inget
self.data_channel_factory)
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 924, in 

ogg json格式不支持database include和table include参数

2023-04-13 文章 casel.chen
多张oracle表变更同步到同一个kafka topic,现在实时flinlk作业需要消费其中一张oracle表,查了一下没看到类似canal json格式中 

canal-json.database.include 和 canal-json.table.include 参数,只在available 
metadata中看到 table 字段,这意味着我需要在select语句中按table字段进行过滤吗?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/ogg/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/canal/

找到多个default类型的ExecutorFactory导致提交flink sql作业失败

2023-03-28 文章 casel.chen
我的实时作业项目想解析sql获取到TableIdentifier做sql血缘,使用的版本是flink 1.15.2,同时引入了 
flink-table-planner_2.12 和 flink-table-planner-loader 依赖,debug时发现

 TableEnvironmentImpl create(EnvironmentSettings settings) 方法会调用

 FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER)方法
去寻找带有default标识的ExecutorFactory,结果找到了两个,一个是DelegateExcutorFactory,另一个是DefaultExecutorFactory。
于是抛了异常 "Multiple factories for identifier 'default' that implement 
ExecutorFactory found in the classpath."

 进一步查看到这个DelegateExcutorFactory其实代理的是就是DefaultExecutorFactory




 请问:

 1. 这个DelegateExcutorFactory起什么作用?

 2. 这两个module依赖的有什么区别和联系?

 3. 项目中只能依赖这两个当中的其中一个jar吗?正确的应该依赖哪个module呢?

项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动报解析配置异常

2023-03-25 文章 casel.chen
项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 
后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFactory类,有什么办法解决么?


ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException 
setting feature http://xml.org/sax/features/external-general-entities to false 
on DocumentBuilderFactory oracle.xml.jaxp.JXDocumentBuilderFactory@68dc098b: 
javax.xml.parsers.ParserConfigurationException
 javax.xml.parsers.ParserConfigurationException
at 
oracle.xml.jaxp.JXDocumentBuilderFactory.setFeature(JXDocumentBuilderFactory.java:374)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.setFeature(XmlConfiguration.java:204)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.disableDtdProcessing(XmlConfiguration.java:197)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.newDocumentBuilder(XmlConfiguration.java:186)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.(XmlConfiguration.java:89)
at 
org.apache.logging.log4j.core.config.xml.XmlConfigurationFactory.getConfiguration(XmlConfigurationFactory.java:46)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:558)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:482)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory.getConfiguration(ConfigurationFactory.java:322)
at 
org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:695)



flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 casel.chen
使用prometheus监控flink 
sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink sql作业这种operator 
name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。
请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。
要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!

Re:监控flink的prometheus经常OOM

2023-03-21 文章 casel.chen
更正一下,监控flink的方式从pushgateway方式改成了直接prometheus定期来抓取,周期设置的是1分钟,之前用pushgateway方式也总是把pushgateway打挂,现在改成pull方式还是照样把prometheus打挂。flink作业侧有什么参数可以配置吗?prometheus侧能否配置只抓取那些grafana
 dashboard展示需要的metrics?

















在 2023-03-22 12:08:29,"casel.chen"  写道:
>我们通过pushgateway上报metrics到prometheus,设置的上报周期是30秒,整个实时平台有200多个作业,启了一台50GB的prometheus还是撑不住,指标保留1天,设置了指标在内存中驻留2小时后写入磁盘。最大的一个metric已经有37万条。请问有什么解决办法么?能选择哪些指标进行上报不?


监控flink的prometheus经常OOM

2023-03-21 文章 casel.chen
我们通过pushgateway上报metrics到prometheus,设置的上报周期是30秒,整个实时平台有200多个作业,启了一台50GB的prometheus还是撑不住,指标保留1天,设置了指标在内存中驻留2小时后写入磁盘。最大的一个metric已经有37万条。请问有什么解决办法么?能选择哪些指标进行上报不?

Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?

2023-03-21 文章 casel.chen
Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?作业一旦crash失败就会被k8s回收到相关的pod,没法通过web
 url去获取作业状态,有什么别的办法吗?通过metrics? 如果是的话具体是哪一个metric值呢?

Re:Re: flink作业保存的状态文件目录在aliyun oss上打不开

2023-03-21 文章 casel.chen
检查过了,当前`state.checkpoints.num-retained`参数值是3


在 2023-03-21 20:05:35,"Shammon FY"  写道:
>Hi
>
>你可以检查一下checkpoint配置`state.checkpoints.num-retained`,是否保存的checkpoint数量太多了?
>
>Best,
>Shammon FY
>
>
>On Tue, Mar 21, 2023 at 11:55 AM casel.chen  wrote:
>
>> 有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun
>> oss。今天作业运行失败打算手动从checkpoint恢复时发现保存作业状态的checkpoint目录(share目录)无法通过浏览器打开,后来使用命令行list了一下该目录下的文件有多达上万个文件。该flink作业用的是rocksdb
>> state
>> backend并开启了增量checkpoint。请问有什么办法可以解决这个问题吗?share目录下这么多文件是因为增量checkpoint遗留下来的吗?


flink作业保存的状态文件目录在aliyun oss上打不开

2023-03-20 文章 casel.chen
有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun 
oss。今天作业运行失败打算手动从checkpoint恢复时发现保存作业状态的checkpoint目录(share目录)无法通过浏览器打开,后来使用命令行list了一下该目录下的文件有多达上万个文件。该flink作业用的是rocksdb
 state 
backend并开启了增量checkpoint。请问有什么办法可以解决这个问题吗?share目录下这么多文件是因为增量checkpoint遗留下来的吗?

prometheus监控flink作业经常OOM

2023-03-20 文章 casel.chen
线上用prometheus监控几百个flink作业,使用的是pushgateway方式,设置采样作业metrics周期是30秒,prometheus服务本身给了将近50GB内存,还是会经常发生OOM,请问有什么调优办法吗?

Re:Re: 实时数据同步对比监控有什么好的工具和方案吗?

2023-03-18 文章 casel.chen
站在业务角度,监控指标包括数据的一致性(不多不少)和 数据的时效性(同步延迟时长在合理区间)。这2块有什么工具和方案吗?

















在 2023-03-17 15:23:30,"Shammon FY"  写道:
>Hi
>
>具体是要监控哪些信息?不同的信息会有不同的工具和方案,比如资源使用率、failover情况、同步数据延时等
>
>Best,
>Shammon FY
>
>
>On Fri, Mar 17, 2023 at 10:52 AM casel.chen  wrote:
>
>> 业务上利用flink作业做实时数据同步,请问实时数据同步对比监控有什么好的工具和方案吗?
>> 实时同步链路:mysql -> kafka canal -> flink -> doris
>>
>>
>> 欢迎大家提供思路


业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

2023-03-16 文章 casel.chen
使用flink cdc消费mysql binlog遇到业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

实时数据同步对比监控有什么好的工具和方案吗?

2023-03-16 文章 casel.chen
业务上利用flink作业做实时数据同步,请问实时数据同步对比监控有什么好的工具和方案吗?
实时同步链路:mysql -> kafka canal -> flink -> doris


欢迎大家提供思路

flink sql多条cdc数据流实时regular join如何减少作业状态?

2023-03-11 文章 casel.chen
当前flink实时作业接的kafka canal json格式的cdc数据,mysql表会有新增和更新数据,但不会有物理删除。
如果直接多条cdc数据流实时关联会导致作业状态很大,请教:
1. 有没有什么办法可以减少作业状态?
2. cdc格式的retract流可以加去重变成append流吗?
3. 使用append流多流关联是不是能减少作业状态?

flink cdc connector计划支持hudi change data capture吗?

2023-03-06 文章 casel.chen
flink cdc connector计划支持hudi change data capture吗?

flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 casel.chen
flink sql jdbc connector是否支持多流拼接?
业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
每条流更新大宽表的一部分字段。

flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-01 文章 casel.chen
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。
请问:
1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生?
2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/window-deduplication/

Re:Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 casel.chen
你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force? 


Because of the disorder of ChangeLog data caused by Shuffle in distributed 
system, the data received by Sink may not be the order of global upsert. So add 
upsert materialize operator before upsert sink. It receives the upstream 
changelog records and generate an upsert view for the downstream.
By default, the materialize operator will be added when a distributed disorder 
occurs on unique keys. You can also choose no materialization(NONE) or force 
materialization(FORCE).

Possible values:
"NONE"
"AUTO"
"FORCE"


public static final ConfigOption 
TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
key("table.exec.sink.upsert-materialize")
.enumType(UpsertMaterialize.class)
.defaultValue(UpsertMaterialize.AUTO)
.withDescription(
Description.builder()
.text(
"Because of the disorder of 
ChangeLog data caused by Shuffle in distributed system, "
+ "the data received by 
Sink may not be the order of global upsert. "
+ "So add upsert 
materialize operator before upsert sink. It receives the "
+ "upstream changelog 
records and generate an upsert view for the downstream.")
.linebreak()
.text(
"By default, the materialize 
operator will be added when a distributed disorder "
+ "occurs on unique keys. 
You can also choose no materialization(NONE) "
+ "or force 
materialization(FORCE).")
.build());





在 2023-02-22 15:34:27,"Shuo Cheng"  写道:
>Hi,
>
>Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
>ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
>solution of disordering problems in KeyBy shuffling.
>
>Best,
>Shuo
>
>On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
>
>>
>> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>>
>>
>> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
>> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>> >
>> >Best,
>> >Shengkai
>> >
>> >[1]
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>> >
>> >Shammon FY  于2023年2月20日周一 08:41写道:
>> >
>> >> Hi
>> >>
>> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>> >>
>> >> Best,
>> >> Shammon
>> >>
>> >>
>> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>> >>
>> >> > Hi,
>> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
>> into
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> >
>> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >> > >
>> >> > >
>> >> > >请问:
>> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >> > >我理解flink
>> >> >
>> >>
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >> > >
>> >> >
>> >>
>>


Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 casel.chen
如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?


在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
>我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>
>Best,
>Shengkai
>
>[1]
>https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>
>Shammon FY  于2023年2月20日周一 08:41写道:
>
>> Hi
>>
>> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>>
>> Best,
>> Shammon
>>
>>
>> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>>
>> > Hi,
>> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> > >
>> > >
>> > >请问:
>> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> > >我理解flink
>> >
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> > >
>> >
>>


Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-19 文章 casel.chen
Flink SQL作业示意如下:


create table user_source_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_id BIGINT NOT NULL,
  proctime AS PROCTIME()
) with (
 'connector' = 'kafka', 
 'format' = 'canal-json',
 ...
);


create table department_dim_table (
   id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
   name STRING
) with (
 'connector' = 'jdbc',
 ...
);


create table user_rich_sink_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_name STRING
) with (
 'connector' = 'jdbc'
 ...
);


insert into user_rich_sink_table 
select id, name, d.name as dept_name 
from user_source_table u
  left join department_dim_table for system_time as of u.proctime as d 
  on u.dept_id = d.id;


用户id是主键,按你所说需要在最后insert into语句之前自己显示加group by用户id再insert?
现在是发现当作业并行度大于1时,相同用户id的记录会落到不同TaskManager上,造成数据更新状态不一致。





在 2023-02-20 08:41:20,"Shammon FY"  写道:
>Hi
>
>如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>
>Best,
>Shammon
>
>
>On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>
>> Hi,
>> connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>>
>>
>> Thanks
>>
>>
>>
>> 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >
>> >
>> >请问:
>> >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >我理解flink
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >
>>


Re:Re: flink canal json格式忽略不识别的type

2023-02-19 文章 casel.chen
日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了

















在 2023-02-20 09:58:56,"Shengkai Fang"  写道:
>Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
>
>Best,
>Shengkai
>
>casel.chen  于2023年2月9日周四 12:03写道:
>
>> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
>> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
>> json格式解析时直接忽略不识别的type,例如
>> 例1:
>> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
>> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
>> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
>> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT
>> NULL DEFAULT '-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
>> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT
>> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT
>> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
>> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT
>> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
>> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
>> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
>> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>>
>>
>> 例2:
>> {
>> "action":"ALTER",
>> "before":[],
>> "bid":0,
>> "data":[],
>> "db":"db_test",
>> "dbValType":{
>> "col1":"varchar(22)",
>> "col2":"varchar(22)",
>> "col_pk":"varchar(22)"
>> },
>> "ddl":true,
>> "entryType":"ROWDATA",
>> "execTs":1669789188000,
>> "jdbcType":{
>> "col1":12,
>> "col2":12,
>> "col_pk":12
>> },
>> "pks":[],
>> "schema":"db_test",
>> "sendTs":1669789189533,
>> "sql":"alter table table_test add col2 varchar(22) null",
>> "table":"table_test",
>> "tableChanges":{
>> "table":{
>> "columns":[
>> {
>> "jdbcType":12, // jdbc 类型。
>> "name":"col1",// 字段名称。
>> "position":0,  // 字段的顺序。
>> "typeExpression":"varchar(22)", // 类型描述。
>> "typeName":"varchar" // 类型名称。
>> },
>> {
>> "jdbcType":12,
>> "name":"col2",
>> "position":1,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> },
>> {
>> "jdbcType":12,
>> "name":"col_pk",
>> "position":2,
>> "typeExpression":"varchar(22)",
>> "typeName":"varchar"
>> }
>> ],
>> "primaryKeyColumnNames":["col_pk"] // 主键名列表。
>> },
>> "type":"ALTER"
>> }
>> }


Re:Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-19 文章 casel.chen






你说的这个在写入之前进行shuffle(先执行一个group by主键)这个操作我认为应该是Flink框架层面的事情,不应该在作业层面显式添加。
Flink框架应该在执行sink的时候判断目标表是否有主键,如果有主键的话应该插入一个group by算子将相同主键的记录发到同一个TaskManager处理。
我听说 Flink新版本1.15还是1.16不记得了已经改进了这个问题,有谁知道吗?有相关issue或PR链接没?











在 2023-02-19 13:43:29,"RS"  写道:
>Hi,
>connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>
>
>Thanks
>
>
>
>在 2023-02-17 15:56:51,"casel.chen"  写道:
>>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner 
>>join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>>测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink 
>>Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>>
>>
>>请问:
>>flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>>是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>>我理解flink 
>>sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>>


[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-16 文章 casel.chen
作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner 
join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink 
Function的invoke方法打的日志),该行为导致最终结果表数据不正确。


请问:
flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
我理解flink 
sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。



Re:Flink SQL 实现数组元素变换的UDF

2023-02-16 文章 casel.chen
目前应该是不支持,一个替代方案是利用concat函数将数组转成string作为输入,再在你的UDF中拆成数组进行处理。

















在 2023-02-15 16:29:19,"723849736" <723849...@qq.com.INVALID> 写道:
>大家好,
>
>我在用flink sql的时候有一个场景,就是需要对数组中的某一列做变换,类似于spark sql中的tranform函数
>
>
>https://spark.apache.org/docs/latest/api/sql/index.html#transform
>
>
>目前flink sql好像不支持类似的功能,这个功能用UDF能实现吗?
>
>
>因为这个函数需要传入一个函数作为输入,函数类型的参数不是flink的data type,validate阶段会抛异常, 这个有办法解决吗?
>
>
>class ArrayTransformFunction extends ScalarFunction {
>
>  def eval(a: Array[Long], function: Long = Long): Array[Long] = {
>a.map(e = function(e))
>  }}
>异常信息如下
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
>validation failed. An error occurred in the type inference logic of function 
>'transform'.
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>   at SQLTest$.main(SQLTest.scala:44)
>   at SQLTest.main(SQLTest.scala)
>Caused by: org.apache.flink.table.api.ValidationException: An error occurred 
>in the type inference logic of function 'transform'.
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>   at java.util.Optional.flatMap(Optional.java:241)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>   at 
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>   ... 6 more
>Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
>valid type inference for function class 'udf.ArrayTransformFunction'. Please 
>check for implementation mistakes and/or provide a corresponding hint.
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
>   at 
> org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
>   at 
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>   ... 17 more
>Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
>a signature to output mapping.
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
>   at 
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>   ... 20 more
>Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a 
>type inference from method:
>public long[] udf.ArrayTransformFunction.eval(long[],scala.Function1)
>   at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>   at 
> 

flink canal json格式忽略不识别的type

2023-02-08 文章 casel.chen
不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如
例1:
{"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
 TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby` varchar(255) 
DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT NULL DEFAULT '-00-00 
00:00:00',  `updatedby` varchar(255) DEFAULT NULL,  `account` varchar(255) 
DEFAULT NULL,  `batch` varchar(255) DEFAULT NULL,  `client` varchar(255) 
DEFAULT NULL,  `command` varchar(255) DEFAULT NULL,  `container` varchar(255) 
DEFAULT NULL,  `items` mediumtext,  `trackingnumber` varchar(255) NOT NULL,  
`transporter` varchar(255) DEFAULT NULL,  `weight` decimal(19,2) NOT NULL,  
`zipcode` varchar(255) DEFAULT NULL,  `ld3` varchar(255) DEFAULT NULL,  
`destination_code` varchar(255) DEFAULT NULL,  PRIMARY KEY 
(`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}


例2:
{
"action":"ALTER",
"before":[],
"bid":0,
"data":[],
"db":"db_test",
"dbValType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"ddl":true,
"entryType":"ROWDATA",
"execTs":1669789188000,
"jdbcType":{
"col1":12,
"col2":12,
"col_pk":12
},
"pks":[],
"schema":"db_test",
"sendTs":1669789189533,
"sql":"alter table table_test add col2 varchar(22) null",
"table":"table_test",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1",// 字段名称。
"position":0,  // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1, 
"typeExpression":"varchar(22)", 
"typeName":"varchar" 
},
{
"jdbcType":12, 
"name":"col_pk",   
"position":2,  
"typeExpression":"varchar(22)", 
"typeName":"varchar" 
}
],
"primaryKeyColumnNames":["col_pk"] // 主键名列表。
},
"type":"ALTER"
}
}

如何监控flink sql on native k8s作业是否过度申请资源?

2023-01-16 文章 casel.chen
我们flink sql作业跑在k8s上,但发现k8s集群整体资源使用率并不高,例如请求内存占总内存89.28%,但实际使用内存占总内存只有66.38%。
现在想排查出哪些作业过度申请资源,有什么办法或直接的metrics可以监控flink sql作业实现k8s资源使用率么?谢谢!

flink自带的web ui为什么不能提供一个触发生成保存点的按钮?

2023-01-09 文章 casel.chen
flink自带的web ui为什么不能提供一个触发生成保存点的按钮?

Re:Re:Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-28 文章 casel.chen



需求是要支持任何http header透传,不管是标准http header还是用户自定义http header














在 2022-12-28 09:11:25,"RS"  写道:
>Hi,
>这个看你的需求啊,用户想自定义哪些Header,怎么定义?
>
>
>比如用户想在Header中添加上报的时间戳,那么这种是随时间变化的,就无法在options里面定义了
>比如用户想在Header中添加上报数据的元信息,数据大小,数据字段个数等,那么这个也是和数据强相关的,无法在options里面定义
>
>
>所以要看用户想要什么,你们想给用户开放到哪个程度?
>至于是不是可以像flink sql kafka connector定义 `properties.*` 
>,这个是具体实现的方式,现在都不清楚你要做什么,先确定目标,再考虑实现。
>
>
>Thanks
>
>在 2022-12-27 13:24:38,"casel.chen"  写道:
>>
>>
>>遇到用户添加自定义请求头Headers问题
>>
>>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>>>> 是不是可以像flink sql kafka connector定义 `properties.*` 那样定义 `headers.*` 呢?
>>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>>>> 是说有一些公用headers吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 
>>>> properties.group.id 和 properties.bootstrap.servers
>>
>>在 2022-12-26 11:12:57,"RS"  写道:
>>>Hi,
>>>
>>>
>>>> 遇到用户添加自定义请求头Headers问题
>>>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>>>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>>>
>>>
>>>> 如何在connector options中支持Map数据类型呢?
>>>options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
>>>
>>>
>>>
>>>
>>>Thanks
>>>
>>>在 2022-12-17 10:20:29,"casel.chen"  写道:
>>>>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector 
>>>>options中支持Map数据类型呢?


Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-26 文章 casel.chen












遇到用户添加自定义请求头Headers问题

如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>> 是不是可以像flink sql kafka connector定义 `properties.*` 那样定义 `headers.*` 呢?
如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>> 是说有一些公用headers吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 
>> properties.group.id 和 properties.bootstrap.servers

在 2022-12-26 11:12:57,"RS"  写道:
>Hi,
>
>
>> 遇到用户添加自定义请求头Headers问题
>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述
>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了
>
>
>> 如何在connector options中支持Map数据类型呢?
>options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map
>
>
>
>
>Thanks
>
>在 2022-12-17 10:20:29,"casel.chen"  写道:
>>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector 
>>options中支持Map数据类型呢?


Re:Re: flink sql connector options如何支持Map数据类型?

2022-12-19 文章 casel.chen
看过了,不支持http source table,而且即使http lookup table也不支持map数据类型

















在 2022-12-19 14:51:42,"Weihua Hu"  写道:
>Hi, 你可以尝试使用独立开源的 http connector
>
>https://github.com/getindata/flink-http-connector
>
>Best,
>Weihua
>
>
>On Sat, Dec 17, 2022 at 10:21 AM casel.chen  wrote:
>
>> 我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector
>> options中支持Map数据类型呢?


flink sql connector options如何支持Map数据类型?

2022-12-16 文章 casel.chen
我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector 
options中支持Map数据类型呢?

  1   2   3   4   >