Re: About the MongoDB splitVector permission issue

2024-05-23 Thread Jiabao Sun
Hi,

splitVector is an internal MongoDB command for computing chunk splits; in a replica-set deployment it can also be used to calculate chunk ranges.
If the splitVector permission is not granted, the connector automatically falls back to the sample splitting strategy.

Best,
Jiabao

evio12...@gmail.com wrote on Thursday, May 23, 2024 at 16:57:

>
> hello~
>
>
> I am using flink-cdc mongodb connector 2.3.0
> 
>  (
> https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md)
> and the documentation says the MongoDB account needs these permissions: 'splitVector', 'listDatabases', 'listCollections',
> 'collStats', 'find', and 'changeStream'.
>
>
> The MongoDB I am using is a replica-set, but I understand the splitVector permission is mainly for sharded clusters.
> If the DBA does not grant splitVector, what impact will there be?
>
> --
> evio12...@gmail.com
>


Re: How can a Flink cluster write its logs directly into Elasticsearch?

2024-03-13 Thread Jiabao Sun
A relatively simple approach is to start a Filebeat process to collect jobmanager.log and taskmanager.log.

Best,
Jiabao

kellygeorg...@163.com wrote on Wednesday, March 13, 2024 at 15:30:

> Is there a convenient and quick solution?
>
>
>


RE: Unsubscribe

2024-02-08 Thread Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from user@flink.apache.org, and you can refer [1][2]
for more details.

Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from emails of the
user@flink.apache.org mailing list; you can refer to [1][2] to manage your mailing list subscriptions.

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/02/06 04:15:48 杨作青 wrote:
>   
> 
> 

RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
Sorry, I didn't notice the version information.
This feature was completed in FLINK-31049[1] and will be released in version
3.1.0 of the Kafka connector (flink-connector-kafka).
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit[3] 
to the v3.0 branch and package it for your own use.

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] https://github.com/apache/flink-connector-kafka/pull/18


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
> 
> Thanks for reply.
> 
> Currently I am using Flink 1.16.1 and I am not able to find any 
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> Although on github I found this support here: 
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem released yet. Can you please point me towards correct 
> Flink version?
> 
> Also, any help on question 1 regarding Schema Registry?
> 
> Regards,
> Kirti Dhar
> 
> -Original Message-
> From: Jiabao Sun  
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
> 
> Hi Kirti,
> 
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from input element.
> 
> 
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(brokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic("topic-name")
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .setHeaderProvider(new HeaderProvider<String>() {
>                     @Override
>                     public Headers getHeaders(String input) {
>                         // TODO: implement it
>                         return null;
>                     }
>                 })
>                 .build()
>         )
>         .build();
> 
> Best,
> Jiabao
> 
> 
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> > 
> > I have below queries regarding Flink Kafka Sink.
> > 
> > 
> >   1.  Does Kafka Sink support schema registry? If yes, is there any 
> > documentations to configure the same?
> >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with 
> > headers?
> > 
> > 
> > Regards,
> > Kirti Dhar
> > 
> > 
> 

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Jiabao Sun
Hi Kirti,

Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from input element.


KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .setHeaderProvider(new HeaderProvider<String>() {
                    @Override
                    public Headers getHeaders(String input) {
                        // TODO: implement it
                        return null;
                    }
                })
                .build()
        )
        .build();
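
As a concrete illustration, a HeaderProvider implementation might look like the sketch below. This assumes HeaderProvider lives in org.apache.flink.connector.kafka.sink, as in the builder linked above, and the header key and value are made up for the example:

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.HeaderProvider;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

/** Minimal sketch: attaches a constant "source" header to every record. */
class StaticHeaderProvider implements HeaderProvider<String> {
    @Override
    public Headers getHeaders(String input) {
        RecordHeaders headers = new RecordHeaders();
        // "source" and its value are illustrative; derive real headers from the input element as needed.
        headers.add("source", "flink-job".getBytes(StandardCharsets.UTF_8));
        return headers;
    }
}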

Best,
Jiabao


On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
> 
> I have below queries regarding Flink Kafka Sink.
> 
> 
>   1.  Does Kafka Sink support schema registry? If yes, is there any 
> documentations to configure the same?
>   2.  Does Kafka Sink support sending  messages (ProducerRecord) with headers?
> 
> 
> Regards,
> Kirti Dhar
> 
> 

RE: Re: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea,

When specifying an ID, an IndexRequest[1] performs a complete overwrite.
If a partial update is needed, an UpdateRequest[2] can be used.

@Override
public void process(
        Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
    UpdateRequest updateRequest = new UpdateRequest("index-name", "id-123");
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("updated", new Date());
    jsonMap.put("reason", "daily update");
    updateRequest.doc(jsonMap);
    indexer.add(updateRequest);
}
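
If the target document may not exist yet, doc-as-upsert is one option. This is a sketch against the Elasticsearch high-level REST client API; the index name, id, and fields are made up:

UpdateRequest upsertRequest = new UpdateRequest("index-name", "id-123");
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
// If "id-123" does not exist yet, the partial document is indexed as the full document.
upsertRequest.doc(jsonMap).docAsUpsert(true);
indexer.add(upsertRequest);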


Best,
Jiabao

[1] 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html
[2] 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-update.html


On 2024/01/29 16:14:26 Fidea Lidea wrote:
> Hi  Jiabao & Jiadong,
> 
> Could you please share examples on how to "*update*" data using
> ElasticsearchSink?
> 
> Thanks
> 
> On Mon, Jan 29, 2024 at 9:07 PM Jiabao Sun  wrote:
> 
> > Hi Fidea,
> >
> > I found some examples in the Java documentation, and I hope they can be
> > helpful.
> >
> > private static class TestElasticSearchSinkFunction
> >         implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
> >     public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
> >         Map<String, Object> json = new HashMap<>();
> >         json.put("data", element.f1);
> >         return Requests
> >                 .indexRequest()
> >                 .index("my-index")
> >                 .type("my-type")
> >                 .id(element.f0.toString())
> >                 .source(json);
> >     }
> >
> >     public void process(
> >             Tuple2<Integer, String> element,
> >             RuntimeContext ctx,
> >             RequestIndexer indexer) {
> >         indexer.add(createIndexRequest(element));
> >     }
> > }
> >
> > But as Jiadong mentioned, ElasticsearchSinkFunction is no longer
> > recommended for use.
> >
> > Best,
> > Jiabao
> >
> >
> > On 2024/01/29 11:15:43 Fidea Lidea wrote:
> > > Hi Team,
> > >
> > > Could you please share with me a few example codes on  how to perform
> > > "updaterequest on elasticsearch using apache flink"
> > > I.want to use  ElasticsearchSinkFunction to perform updaterequest.
> > >
> > > Thanks
> > > Nida Shaikh
> > > lideafidea...@gmail.com
> > >
> 

RE: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea,

I found some examples in the Java documentation, and I hope they can be 
helpful. 

private static class TestElasticSearchSinkFunction
        implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
    public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element.f1);
        return Requests
                .indexRequest()
                .index("my-index")
                .type("my-type")
                .id(element.f0.toString())
                .source(json);
    }

    public void process(
            Tuple2<Integer, String> element,
            RuntimeContext ctx,
            RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

But as Jiadong mentioned, ElasticsearchSinkFunction is no longer recommended
for use.

Best,
Jiabao


On 2024/01/29 11:15:43 Fidea Lidea wrote:
> Hi Team,
> 
> Could you please share with me a few example codes on  how to perform
> "updaterequest on elasticsearch using apache flink"
> I.want to use  ElasticsearchSinkFunction to perform updaterequest.
> 
> Thanks
> Nida Shaikh
> lideafidea...@gmail.com
> 

RE: Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Jiabao Sun
Hi Tauseef,

We cannot directly write POJO types into Elasticsearch.
You can try serializing the TopologyDTO into a JSON string (for example with Jackson)
before writing it.

public static void main(String[] args) throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(HttpHost.create("http://127.0.0.1:9200")))) {
        TopologyDTO data = new TopologyDTO();

        IndexRequest request = Requests.indexRequest()
                .index("topology")
                .id(data.getUuid()) // here uuid is a String
                .source(new ObjectMapper().writeValueAsString(data),
                        XContentType.JSON);

        client.index(request);
    }
}
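
Applied to the sink in your job, the same idea would look roughly like the sketch below inside createIndexRequest (the ObjectMapper should be created once and reused; the index name and uuid accessor are taken from your snippet):

private static final ObjectMapper MAPPER = new ObjectMapper();

private static IndexRequest createIndexRequest(TopologyDTO data) {
    try {
        // Serialize the POJO to a JSON string instead of putting the POJO into a map.
        String json = MAPPER.writeValueAsString(data);
        return Requests.indexRequest()
                .index("topology")
                .id(data.getUuid())
                .source(json, XContentType.JSON);
    } catch (JsonProcessingException e) {
        throw new RuntimeException("Failed to serialize TopologyDTO", e);
    }
}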

Best,
Jiabao


On 2024/01/25 13:00:58 Tauseef Janvekar wrote:
> Hi Team,
> 
> We get the below error message when we try to add an elastick sink
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
> unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
> 
> The code written for the same is here
> 
> workflow(filterItems(openTelSrc)).sinkTo(new
> Elasticsearch7SinkBuilder().setBulkFlushMaxActions(1)
> 
> .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
> 
> .setConnectionPassword("password").setConnectionUsername("elastic")
> 
> .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(
> element))).build())
> 
> .name("topology_sink");
> 
> 
> private static IndexRequest createIndexRequest(TopologyDTO data) {
> 
> Map json = new HashMap<>();
> 
> json.put("data", data);
> 
> return Requests.indexRequest()
> 
> .index("topology")
> 
> .id(data.getUuid()) //here uuid is String
> 
> .source(json);
> 
> }
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Tauseef
> 

RE: Re: RE: how to get flink accumulated sink record count

2024-01-25 Thread Jiabao Sun
Hi Enric,

Could you kindly provide more specific details about where you would like to
capture the metric?
Additionally, if it's convenient for you, could you please share some code
examples?
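
In the meantime, if the goal is simply to count records inside the job itself, one generic option is to register your own Counter on an operator placed just before the sink. A minimal sketch (the metric and class names are made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

/** Counts records flowing into the sink and exposes the count as a task metric. */
public class CountingMap<T> extends RichMapFunction<T, T> {
    private transient Counter recordsOut;

    @Override
    public void open(Configuration parameters) {
        recordsOut = getRuntimeContext().getMetricGroup().counter("myRecordsOut");
    }

    @Override
    public T map(T value) {
        recordsOut.inc(); // visible through metric reporters and the REST API
        return value;
    }
}

// usage: stream.map(new CountingMap<>()).addSink(mySink);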

Best,
Jiabao


On 2024/01/25 10:43:30 Enric Ott wrote:
> Thanks, Jiabao, but what I mean is capturing the metric in Flink tasks.
> 
> 
> 
> 
> -- Original message --
> From: "Jiabao Sun"  Sent: Thursday, January 25, 2024, 3:11 PM
> To: "user"  Subject: RE: how to get flink accumulated sink record count
> 
> 
> 
> 
> 
> I guess getting the metrics[1] might be helpful for you. 
> You can query the numRecordsOut metric by Metrics Reporter[2] or REST API[3].
> 
> Best,
> Jiabao
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> [3] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics
> 
> On 2024/01/25 06:54:36 Enric Ott wrote:
>  Hi, Team:
>  I was wondering how to get the accumulated sink record count in Flink (just like
> the Flink UI displays). Any help would be appreciated.

RE: how to get flink accumulated sink record count

2024-01-24 Thread Jiabao Sun
Hi Enric,

I guess the metrics system[1] might be helpful for you.
You can query the numRecordsOut metric via a metrics reporter[2] or the REST API[3].

Best,
Jiabao

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics

On 2024/01/25 06:54:36 Enric Ott wrote:
> Hi, Team:
> I was wondering how to get the accumulated sink record count in Flink (just like the
> Flink UI displays). Any help would be appreciated.

RE: Re: RE: Re: RE: How to build a custom unified full + incremental source connector on Flink CDC 3.x without Debezium?

2024-01-22 Thread Jiabao Sun
Hi,

ResumeToken[1] can be considered globally unique[2].

Best,
Jiabao

[1] https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
[2] 
https://img.alicdn.com/imgextra/i4/O1CN010e81SP1vkgoyL0nhd_!!66211-0-tps-2468-1360.jpg

On 2024/01/22 09:36:42 "casel.chen" wrote:
> 
> 
> 
> The V1 version depends on DebeziumSourceFunction, which relies on DebeziumEngine to produce the changelog.
> The V2 version depends on flink-connector-debezium but does not use Debezium's internal classes.
> 
> 
> Another question: is the resumeToken used by MongoDB change streams for resuming globally unique, like a MySQL binlog offset?
> If the data source has a per-shard offset, like Kafka,
> should the corresponding xxxOffset class define a Map-typed offsetField
> (similar to the resumeTokenField in MongoDB's ChangeStreamOffset)?
> MongoDB currently defines it as a JSON String.
> 
> On 2024-01-22 11:03:55, "Jiabao Sun" wrote:
> >Hi,
> >
> >Flink CDC MongoDB connector V1 is implemented on top of mongo-kafka, not on Debezium.
> >Flink CDC MongoDB connector V2 is implemented on the incremental snapshot framework and depends on neither mongo-kafka nor Debezium.
> >
> >Best,
> >Jiabao
> >
> >[1] https://github.com/mongodb/mongo-kafka
> >
> >
> >On 2024/01/22 02:57:38 "casel.chen" wrote:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> But the Flink CDC MongoDB connector is still implemented based on Debezium.
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> On 2024-01-22 10:14:32, "Jiabao Sun" wrote:
> >> >Hi,
> >> >
> >> >You can refer to the implementation of the Flink CDC MongoDB connector.
> >> >
> >> >Best,
> >> >Jiabao
> >> >
> >> >
> >> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> >> We have a data source that is not supported by Debezium and need to consume it with Flink SQL
> >> >> in a unified full + incremental way, so we plan to build it ourselves on Flink CDC 3.x.
> >> >> Most existing Flink CDC source connectors are built on the Debezium library; only OceanBase and TiDB
> >> >> are not, but those two still use the V1 source interface rather than the latest V2 incremental snapshot,
> >> >> which means the snapshot phase does not support checkpoints and the job has to re-consume from scratch on failure.
> >> >> Is there any example of a V2 source connector that is not based on Debezium? Thanks!
> >> 
> 

RE: Re: RE: How to build a custom unified full + incremental source connector on Flink CDC 3.x without Debezium?

2024-01-21 Thread Jiabao Sun
Hi,

Flink CDC MongoDB connector V1 is implemented on top of mongo-kafka, not on Debezium.
Flink CDC MongoDB connector V2 is implemented on the incremental snapshot framework and depends on neither mongo-kafka nor Debezium.

Best,
Jiabao

[1] https://github.com/mongodb/mongo-kafka


On 2024/01/22 02:57:38 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> But the Flink CDC MongoDB connector is still implemented based on Debezium.
> 
> 
> 
> 
> 
> 
> 
> 
> On 2024-01-22 10:14:32, "Jiabao Sun" wrote:
> >Hi,
> >
> >You can refer to the implementation of the Flink CDC MongoDB connector.
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> We have a data source that is not supported by Debezium and need to consume it with Flink SQL
> >> in a unified full + incremental way, so we plan to build it ourselves on Flink CDC 3.x.
> >> Most existing Flink CDC source connectors are built on the Debezium library; only OceanBase and TiDB
> >> are not, but those two still use the V1 source interface rather than the latest V2 incremental snapshot,
> >> which means the snapshot phase does not support checkpoints and the job has to re-consume from scratch on failure.
> >> Is there any example of a V2 source connector that is not based on Debezium? Thanks!
> 

RE: How to build a custom unified full + incremental source connector on Flink CDC 3.x without Debezium?

2024-01-21 Thread Jiabao Sun
Hi,

You can refer to the implementation of the Flink CDC MongoDB connector.

Best,
Jiabao


On 2024/01/22 02:06:37 "casel.chen" wrote:
> We have a data source that is not supported by Debezium and need to consume it with Flink SQL
> in a unified full + incremental way, so we plan to build it ourselves on Flink CDC 3.x.
> Most existing Flink CDC source connectors are built on the Debezium library; only OceanBase and TiDB
> are not, but those two still use the V1 source interface rather than the latest V2 incremental snapshot,
> which means the snapshot phase does not support checkpoints and the job has to re-consume from scratch on failure.
> Is there any example of a V2 source connector that is not based on Debezium? Thanks!

RE: Re: RE: Missing binlog file problem

2024-01-19 Thread Jiabao Sun
Hi,

Does the log contain any GTID-related content?
Can you run SHOW VARIABLES LIKE 'gtid_mode'; to confirm whether GTID is enabled?

Best,
Jiabao


On 2024/01/19 09:36:38 wyk wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Sorry, the specific error and code are as follows:
> 
> 
> Error:
> Caused by: java.lang.IllegalStateException: The connector is trying to read 
> binlog starting at 
> Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1705645599953,db=,server_id=0,file=mysql_bin.007132,pos=729790304,row=0},
>  but this is no longer available on the server. Reconfigure the connector to 
> use a snapshot when needed.
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:112)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:170)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:75)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> 
> 
> 
> 
> Code:
> if (!isBinlogAvailable(mySqlOffsetContext)) {
>     throw new IllegalStateException(
>             "The connector is trying to read binlog starting at "
>                     + mySqlOffsetContext.getSourceInfo()
>                     + ", but this is no longer "
>                     + "available on the server. Reconfigure the connector to "
>                     + "use a snapshot when needed.");
> }
> 
> On 2024-01-19 17:33:03, "Jiabao Sun" wrote:
> >Hi,
> >
> >Your image didn't come through; please paste an image-host link or the code directly.
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/19 09:16:55 wyk wrote:
> >> 
> >> 
> >> Hi all:
> >> We have a missing-binlog-file problem and would like your advice. The details are as follows:
> >> 
> >> 
> >> Problem description:
> >> Scenario: our company's MySQL has two replicas: replica 1 and replica 2.
> >> 1. Replica 1 needs to be decommissioned, so the job has to be migrated to replica 2.
> >> 2. After saving a savepoint as usual, I changed the connection info to replica 2 and restarted from the savepoint, at which point it reported an error that the binlog file does not exist (see figure 1 below).
> >> 3. Following the error I found the corresponding code (figure 2 below); it is logic that verifies whether the binlog file exists. My understanding is that when starting from a GTID we do not need to touch the binlog file, so I commented out that code; the job then starts normally from the savepoint and data ingestion works fine.
> >> 
> >> 
> >> 
> >> 
> >> Question: is the logic that checks whether the binlog file exists necessary, or should it be changed to check whether the GTID exists? Looking forward to your reply, thanks.
> >> 
> >> 
> >> Note: the GTIDs of replica 1 and replica 2 are kept consistent.
> >> 
> >> 
> >> 
> >> 
> >> Figure 1:
> >> 
> >> 
> >> Figure 2:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> 

RE: Re: Python flink statefun

2024-01-19 Thread Jiabao Sun
Hi Alex,

I think that logic is in IngressWebServer[1] and EgressWebServer[2].

Best,
Jiabao


[1] 
https://github.com/apache/flink-statefun-playground/blob/5b52061784626c8685ab33e172e4471840ce5ee1/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java#L18
[2] 
https://github.com/apache/flink-statefun-playground/blob/5b52061784626c8685ab33e172e4471840ce5ee1/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java#L30

On 2024/01/19 09:50:21 Alexandre LANGUILLAT wrote:
> Thanks Sun I use now the 3.2 version and it works as described in the
> README tutorial! I don't see in the code where the port redirection is
> handled tho, eg 8090 for PUT and 8091 for GET (they are in the module.yaml
> but dont see where in Python it's handled).
> 
> Bests,
> 
> Alex
> 
> On Fri, Jan 19, 2024 at 02:44, Jiabao Sun
> wrote:
> 
> > Hi Alexandre,
> >
> > I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in
> > Docker Hub.
> > You can temporarily use the release-3.2 version.
> >
> > Hi Martijn, did we ignore pushing it to the docker registry?
> >
> > Best,
> > Jiabao
> >
> > [1] https://hub.docker.com/r/apache/flink-statefun-playground/tags
> >
> > On 2024/01/18 17:09:20 Alexandre LANGUILLAT wrote:
> > > Hi,
> > >
> > > I am trying to run the example provided here:
> > >
> > https://github.com/apache/flink-statefun-playground/tree/release-3.3/python/greeter
> > >
> > > 1 - Following the read.me, with docker (that I installed):
> > >
> > > "docker-compose build" works well. But "docker-compose up" returns an
> > error:
> > >
> > > [image: image.png]
> > >
> > > 2 - Without docker, having a virtual env with apache-flink-statefun and
> > > aiohttp installed, I ran "python functions.py" but I the server runs on
> > > port 8000 according to the script and I dont know how the request in curl
> > > (or postman) would work since it calls port 8090... :
> > >
> > > curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d
> > > '{"name": "Bob"}' localhost:8090/example/person/Bob
> > >
> > >
> > > I wonder what I have to configure additionaly? I owuld be keen to run it
> > > without docker actually, to understand how it works under the hood.
> > >
> > > Bests
> > >
> > > --
> > > Alexandre
> > >
> 
> 
> 
> -- 
> Alexandre Languillat
> 

RE: Missing binlog file problem

2024-01-19 Thread Jiabao Sun
Hi,

Your image didn't come through; please paste an image-host link or the code directly.

Best,
Jiabao


On 2024/01/19 09:16:55 wyk wrote:
> 
> 
> Hi all:
> We have a missing-binlog-file problem and would like your advice. The details are as follows:
> 
> 
> Problem description:
> Scenario: our company's MySQL has two replicas: replica 1 and replica 2.
> 1. Replica 1 needs to be decommissioned, so the job has to be migrated to replica 2.
> 2. After saving a savepoint as usual, I changed the connection info to replica 2 and restarted from the savepoint, at which point it reported an error that the binlog file does not exist (see figure 1 below).
> 3. Following the error I found the corresponding code (figure 2 below); it is logic that verifies whether the binlog file exists. My understanding is that when starting from a GTID we do not need to touch the binlog file, so I commented out that code; the job then starts normally from the savepoint and data ingestion works fine.
> 
> 
> 
> 
> Question: is the logic that checks whether the binlog file exists necessary, or should it be changed to check whether the GTID exists? Looking forward to your reply, thanks.
> 
> 
> Note: the GTIDs of replica 1 and replica 2 are kept consistent.
> 
> 
> 
> 
> Figure 1:
> 
> 
> Figure 2:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 

RE: Re: RE: RE: Dynamically adding tables in Flink CDC does not take effect

2024-01-18 Thread Jiabao Sun
Hi,

The Oracle CDC connector has already been integrated with the incremental snapshot framework, so dynamically adding tables could be implemented there in the same unified way.
Feel free to create an issue in the community; direct contributions are also welcome.

Best,
Jiabao


On 2024/01/19 04:46:21 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> I'd like to know why the Oracle CDC connector does not support dynamically adding tables. Could I extend it and implement this myself?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2024-01-19 11:53:49, "Jiabao Sun" wrote:
> >Hi,
> >
> >The Oracle CDC connector[1] does not currently support dynamically adding tables.
> >
> >Best,
> >Jiabao
> >
> >[1] 
> >https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
> >
> >
> >On 2024/01/19 03:37:41 Jiabao Sun wrote:
> >> Hi,
> >> 
> >> Please provide the Flink CDC version and which connector you are using.
> >> If convenient, please also provide the logs.
> >> Also, does the table regular expression match the newly added tables?
> >> 
> >> Best,
> >> Jiabao
> >> 
> >> [1] 
> >> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> >> 
> >> On 2024/01/19 03:27:22 王凯 wrote:
> >> > When using Flink CDC for data synchronization with the --scan.newly-added-table.enabled=true
> >> > option, data from newly added tables is not synchronized after restarting from a savepoint.
> >> > 
> >> > 
> >> > 王凯
> >> > 2813732...@qq.com
> >> > 
> >> > 
> >> > 
> >> > 
> 

RE: Unsubscribe

2024-01-18 Thread Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from u...@flink.apache.org, and you can refer [1][2]
for more details.

Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from emails of the
u...@flink.apache.org mailing list; you can refer to [1][2] to manage your mailing list subscriptions.

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/19 03:39:52 李乐 wrote:
> Unsubscribe

RE: RE: Dynamically adding tables in Flink CDC does not take effect

2024-01-18 Thread Jiabao Sun
Hi,

The Oracle CDC connector[1] does not currently support dynamically adding tables.

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html


On 2024/01/19 03:37:41 Jiabao Sun wrote:
> Hi,
> 
> Please provide the Flink CDC version and which connector you are using.
> If convenient, please also provide the logs.
> Also, does the table regular expression match the newly added tables?
> 
> Best,
> Jiabao
> 
> [1] 
> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> 
> On 2024/01/19 03:27:22 王凯 wrote:
> > When using Flink CDC for data synchronization with the --scan.newly-added-table.enabled=true
> > option, data from newly added tables is not synchronized after restarting from a savepoint.
> > 
> > 
> > 王凯
> > 2813732...@qq.com
> > 
> > 
> > 
> > 

RE: Dynamically adding tables in Flink CDC does not take effect

2024-01-18 Thread Jiabao Sun
Hi,

Please provide the Flink CDC version and which connector you are using.
If convenient, please also provide the logs.
Also, does the table regular expression match the newly added tables?

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15

On 2024/01/19 03:27:22 王凯 wrote:
> When using Flink CDC for data synchronization with the --scan.newly-added-table.enabled=true
> option, data from newly added tables is not synchronized after restarting from a savepoint.
> 
> 
> 王凯
> 2813732...@qq.com
> 
> 
> 
> 

RE: Python flink statefun

2024-01-18 Thread Jiabao Sun
Hi Alexandre,

I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in Docker 
Hub. 
You can temporarily use the release-3.2 version.

Hi Martijn, did we ignore pushing it to the docker registry?

Best,
Jiabao

[1] https://hub.docker.com/r/apache/flink-statefun-playground/tags

On 2024/01/18 17:09:20 Alexandre LANGUILLAT wrote:
> Hi,
> 
> I am trying to run the example provided here:
> https://github.com/apache/flink-statefun-playground/tree/release-3.3/python/greeter
> 
> 1 - Following the read.me, with docker (that I installed):
> 
> "docker-compose build" works well. But "docker-compose up" returns an error:
> 
> [image: image.png]
> 
> 2 - Without docker, having a virtual env with apache-flink-statefun and
> aiohttp installed, I ran "python functions.py", but the server runs on
> port 8000 according to the script and I don't know how the request in curl
> (or postman) would work since it calls port 8090... :
> 
> curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d
> '{"name": "Bob"}' localhost:8090/example/person/Bob
> 
> 
> I wonder what I have to configure additionally? I would be keen to run it
> without docker actually, to understand how it works under the hood.
> 
> Bests
> 
> -- 
> Alexandre
> 

RE: Flink Slow Execution

2024-01-17 Thread Jiabao Sun
Hi Dulce,

MiniCluster is generally used for local testing and is limited by the resources 
of a single machine. 
When more tasks are executed, it may not be able to immediately acquire the 
resources needed to start the MiniCluster, resulting in slower startup times. 

If running Flink tasks in a production environment, it is recommended to use 
cluster deployment mode[1]. 
You can also use resource providers like Yarn[2] or Kubernetes[3].

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/
[3] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

On 2024/01/17 21:11:56 Dulce Morim wrote:
> Hello,
> 
> In a single JVM, I'm running multiple flink batch jobs locally using the 
> MiniCluster (Java 17 and Flink 1.18).
> 
> At the beginning of the process, the Mini Cluster starts pretty much 
> instantly. However, I'm running into an issue where the more jobs I execute 
> the longer the MiniCluster takes to start.
> 
> Here's an example:
> 
> 2024-01-17 17:07:26.989 [INFO ] MiniCluster - Starting Flink Mini Cluster
> 2024-01-17 17:07:27.165 [INFO ] MiniCluster - Starting Metrics Registry
> 2024-01-17 17:07:33.801 [INFO ] MetricRegistryImpl - No metrics reporter 
> configured, no metrics will be exposed/reported.
> 2024-01-17 17:07:33.801 [INFO ] MiniCluster - Starting RPC Service(s)
> 2024-01-17 17:07:34.646 [INFO ] MiniCluster - Flink Mini Cluster started 
> successfully
> 
> Has anyone faced this issue?
> 
> Thanks.
> 

RE: Questions about putting a real-time data warehouse into production

2024-01-14 Thread Jiabao Sun
Hi,

You can try building a real-time data warehouse with Flink CDC + Apache Paimon.
Paimon already supports ingesting entire databases into the lake with Flink CDC, so real-time ingestion can be done at relatively low cost.
In addition, Paimon's partial-update feature lets you build wide ADS-layer tables at a relatively low computation cost.
Paimon also supports both batch and streaming computation, so you can flexibly balance freshness against computation cost.

Best,
Jiabao


On 2024/01/14 12:57:29 海风 wrote:
> Hello. In our company the business side often takes a T+1 offline warehouse table, usually from the ADS layer, and asks whether it can be turned into a real-time table. Has anyone run into this kind of requirement? My understanding is that even though real-time warehouses and stream-batch unification are being explored, we are far from being able to make a deeply layered T+1 offline ADS table real-time at a reasonably low cost.
> A follow-up question: are there already large-scale production deployments of real-time data warehouses? Which scenarios have been implemented, and roughly what are the costs and results?
> 
> 
> 

RE: Unsubscribe

2024-01-14 Thread Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from u...@flink.apache.org, and you can refer [1][2]
for more details.

Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from emails of the
u...@flink.apache.org mailing list; you can refer to [1][2] to manage your mailing list subscriptions.

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/14 03:17:44 王春顺 wrote:
> 
> Unsubscribe

Re: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang,
 
1. Maybe the filters are not being pushed down correctly, or it is the performance impact of
single-parallelism writes to Iceberg.
Can you please check the actual number of records written to Iceberg?
Additionally, could you provide the version of the Iceberg connector and the
SQL statement used for writing?
This will help us investigate any potential planner issues.

2. It is also a good way to use the maximum id from yesterday as the lower 
bound.

By the way, for scenarios that require continuous writing, you can also try 
using Flink CDC.

Best,
Jiabao


> On January 11, 2024 at 10:52, haifang luo wrote:
> 
> Hello JiaBao
> Thank you for your reply~
> This doesn't seem to solve my problem.
> My steps are:
> Read the oracle table (super large wide table) according to the timestamp or 
> auto-incremented primary key ID every day, and write it to the iceberg table. 
> Only timestamp or ID are filter conditions, there are no other filter 
> conditions, and they are all index fields of the Oracle table.
> 1. If I do not configure partition scanning, the job will always have only 
> one degree of parallelism operating. When I execute a select query, the job 
> is completed quickly.
> But when I write the results of the select query to the iceberg table, the 
> jdbc connector will scan the oracle table from scratch, and it is very slow. 
> Whether it is to enter the entire table or filter part of the data, it takes 
> more than 7 hours to execute. I have checked and found that the read and 
> write performance of the Oracle library is no problem.
> 2. If I add a partition scan and filter the same amount of data from Oracle 
> and write it to the Iceberg table, it will complete the scan very quickly and 
> end the execution.
> I can't figure out whether this is a problem with the flink connector or 
> iceberg.
> 
> Jiabao Sun (jiabao@xtransfer.cn)
> wrote on Wednesday, January 10, 2024 at 18:15:
>> Hi haifang,
>> 
>> lower-bound and upper-bound are defined as long types, and it seems 
>> difficult to fill in the value of timestamp. 
>> However, you may use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333', as JDBC 
>> supports filter pushdown.
>> 
>> Best,
>> Jiabao
>> 
>> On 2024/01/10 08:31:23 haifang luo wrote:
>> > Hello~~
>> > My Flink version: 1.15.4
>> > [image: image.png]
>> > 'scan.partition.column' type is timestamp, how should I fill in
>> > 'scan.partition.lower-bound' and  'scan.partition.upper-bound'?
>> > Thank you for your reply~~
>> >



RE: Unsubscribe this email address

2024-01-10 Thread Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from user@flink.apache.org, and you can refer [1][2]
for more details.

Please send an email with any content to user-unsubscr...@flink.apache.org to unsubscribe from emails of the
user@flink.apache.org mailing list; you can refer to [1][2] to manage your mailing list subscriptions.

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

RE: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang,

lower-bound and upper-bound are defined as long types, and it seems difficult 
to fill in the value of timestamp. 
However, you may use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333', as JDBC 
supports filter pushdown.

Best,
Jiabao

On 2024/01/10 08:31:23 haifang luo wrote:
> Hello~~
> My Flink version: 1.15.4
> [image: image.png]
> 'scan.partition.column' type is timestamp, how should I fill in
> 'scan.partition.lower-bound' and  'scan.partition.upper-bound'?
> Thank you for your reply~~
> 

RE: Rabbitmq connector for Flink v1.18

2024-01-09 Thread Jiabao Sun
Hi Charlotta,

The latest news about connector releases is here[1].
You can subscribe to the mailing list[2] or follow the JIRA issues to get the
latest updates.

Best,
Jiabao

[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
[2] https://flink.apache.org/what-is-flink/community/


On 2024/01/08 08:55:46 Charlotta Westberg via user wrote:
> Hi,
> 
> We are using rabbitmq sources and sinks, and wanted to upgrade to flink 1.18, 
> but noticed the documentation on RabbitMQ Connector mentioned
> 
> There is no connector (yet) available for Flink version 1.18
> 
> I tried to find a JIRA issue for the connector to support flink 1.18 but was 
> unable to. Is there a plan for the rabbitmq connector and flink 1.18?
> 
> Best regards
> Charlotta
> 

RE: How to debug a Flink CDC 3.0 job in the IDE?

2024-01-02 Thread Jiabao Sun
Hi,

You can refer to this document[1] to run a simple test.

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i


On 2024/01/02 08:02:10 "casel.chen" wrote:
> How do I debug a mysql-to-doris.yaml job in IntelliJ IDEA? Does the flink-cdc-cli module need to depend on the
> flink-cdc-pipeline-connector-mysql and flink-cdc-pipeline-connector-doris modules?

Re: How to throttle reads during the snapshot phase in Flink CDC?

2024-01-01 Thread Jiabao Sun
Hi,

GuavaFlinkConnectorRateLimiter is currently only used in flink-connector-gcp-pubsub[1].
Flink CDC does not support rate limiting yet[2]; for now you can try lowering the snapshot parallelism to reduce the pressure on the database.
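
As a stop-gap until the connector supports throttling natively, one generic workaround is to rate-limit in an operator chained right after the source and let backpressure slow the reads. This is plain DataStream code using Guava's RateLimiter, not a Flink CDC option, and the limit value is made up:

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

/** Throttles each subtask to roughly permitsPerSecond records per second. */
public class ThrottleMap<T> extends RichMapFunction<T, T> {
    private final double permitsPerSecond;
    private transient RateLimiter rateLimiter;

    public ThrottleMap(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}

// usage: sourceStream.map(new ThrottleMap<>(1000)).name("throttle");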

Best,
Jiabao

[1] 
https://github.com/apache/flink-connector-gcp-pubsub/blob/f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L22
[2] https://github.com/ververica/flink-cdc-connectors/issues/510


> On January 2, 2024 at 11:39, casel.chen wrote:
> 
> The business table has a very large amount of existing data. Reading the snapshot-phase data with Flink CDC without throttling puts pressure on the business database, triggers database alarms, and affects online traffic.
> How can reads be throttled during the snapshot phase in Flink CDC?
> 
> 
> I saw that someone proposed this in the community before, but the issue has remained open:
> https://issues.apache.org/jira/browse/FLINK-18740
> 
> 
> Also, I found GuavaFlinkConnectorRateLimiter in the latest master branch of Flink,
> but no example that calls it. How can rate limiting be used in a Flink job?



RE: How can the FileSystem connector elegantly support writing to multiple paths at the same time?

2023-12-29 Thread Jiabao Sun
Hi,

Writing to multiple paths is hard to achieve with SQL.
With the DataStream API you can consider implementing your own RichSinkFunction.
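
Another DataStream option, sketched below, is to split the stream with a side output and attach one FileSink per path; the paths, routing rule, and class name are made up for illustration:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MultiPathSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a,1", "b,2", "a,3");

        // Records starting with "a" stay on the main output; the rest go to a side output.
        final OutputTag<String> pathBTag = new OutputTag<String>("path-b") {};
        SingleOutputStreamOperator<String> main =
                source.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("a")) {
                            out.collect(value);
                        } else {
                            ctx.output(pathBTag, value);
                        }
                    }
                });

        // One FileSink per target path.
        main.sinkTo(FileSink.forRowFormat(
                new Path("file:///tmp/path-a"), new SimpleStringEncoder<String>("UTF-8")).build());
        main.getSideOutput(pathBTag).sinkTo(FileSink.forRowFormat(
                new Path("file:///tmp/path-b"), new SimpleStringEncoder<String>("UTF-8")).build());

        env.execute("multi-path sink sketch");
    }
}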

Best,
Jiabao

On 2023/12/29 08:37:34 jinzhuguang wrote:
> Flink version: 1.16.0
> 
> The example on the official website:
> CREATE TABLE MyUserTable (
>   column_name1 INT,
>   column_name2 STRING,
>   ...
>   part_name1 INT,
>   part_name2 STRING
> ) PARTITIONED BY (part_name1, part_name2) WITH (
>   'connector' = 'filesystem',   -- required: specify the connector type
>   'path' = 'file:///path/to/whatever',  -- required: specify the path
>   'format' = '...', -- required: the filesystem connector requires a format
> -- see Table Formats for more details
>   'partition.default-name' = '...', -- optional: default partition name used when the dynamic partition field value is null or an empty string
> 
>   -- optional: enables shuffling data by the dynamic partition fields in the sink stage, which can greatly reduce the number of files
> -- produced by the filesystem sink but may cause data skew; the default value is false
>   'sink.shuffle-by-partition.enable' = '...',
>   ...
> )
> Currently only a single path is supported. Does anyone have best practices for writing to multiple paths?

RE: Flink SQL Windowing TVFs

2023-12-28 Thread Jiabao Sun
Hi,

In version 1.14.0, the CUMULATE function must be used in a GROUP BY aggregation[1].
Does the SQL deployed to production include a GROUP BY expression?
Is the Flink version used in your local test also 1.14.0?

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate



On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote:
> Hi,
>  I am using Flink 1.14.0. I tested CUMULATE(TABLE kafka, DESCRIPTOR(rowtime),
> INTERVAL '60' SECOND, INTERVAL '1' DAYS) locally and it runs fine, but when I deployed it to the production environment it reported the following error:
>  org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: Currently Flink doesn't support individual window 
> table-valued function CUMULATE(time_col=[rowtime], max_size=[8640 ms], 
> step=[1 min]).
>  Please use window table-valued function with the following computations:
>  1. aggregate using window_start and window_end as group keys.
>  2. topN using window_start and window_end as partition key.
>  3. join with join condition contains window starts equality of input 
> tables and window ends equality of input tables.
>  Is this caused by the package versions in the production environment? If it is a version issue, which package exactly?
>  Many thanks
> 

Re: How does schema evolution work in Flink CDC 3.0?

2023-12-27 Thread Jiabao Sun
Hi,

Yes, at the moment it will block.
The flush + apply-schema-change step usually does not take very long,
and schema changes are generally low-frequency events, so even with blocking the performance impact is small.

Best,
Jiabao


> On December 28, 2023 at 12:57, casel.chen wrote:
> 
> 
> 
> 
> Thanks for the explanation!
> One more question: if a pipeline synchronizes data from multiple tables and only one table has a schema change, will data processing for the other tables also be blocked?
> 
> 
> 
> 
> 
> 
> 
> 
> On 2023-12-28 01:16:40, "Jiabao Sun" wrote:
>> Hi,
>> 
>>> Why does output.collect(new StreamRecord<>(schemaChangeEvent));
>>> send the SchemaChangeEvent one more time at the end?
>> 
>> The sink also receives the SchemaChangeEvent, because the sink may need to adjust its serializer or
>> writer according to the schema change; see DorisEventSerializer.
>> 
>>> Why is the last line requestReleaseUpstream() blocked, and how is the upstream held and then
>>> released?
>> It is blocked because the responseFuture has not
>> completed; SchemaOperator.sendRequestToCoordinator uses responseFuture.get(),
>> which blocks until completion.
>> Only when the FlushSuccessEvent is received is the schema change executed; once the schema change finishes,
>> the responseFuture of waitFlushSuccess is marked as complete.
>> See
>> SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105 and SchemaRegistryRequestHandler.flushSuccess:148~150.
>> 
>> Guaranteeing the ordering is a more complex topic; please refer to the source code and the design document [1].
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>> 
>>> On December 27, 2023 at 22:14, casel.chen wrote:
>>> 
>>> I read the InfoQ article introducing Flink CDC 3.0,
>>> https://xie.infoq.cn/article/a80608df71c5291186153600b, but I cannot figure out the design of its schema
>>> evolution: how does the framework guarantee the ordering of schema changes? The article does not go into detail.
>>> Seen from the MySQL binlog producing change events, all changes are linear in time, e.g. s1, d1, d2, s2, d3, d4, d5, s3,
>>> d6, where d stands for a data change and s for a schema change.
>>> This means d1 and d2 use schema s1, d3~d5 use schema s2, and finally d6 uses schema s3.
>>> If Flink processes this with multiple parallel instances, the change sequence is distributed to different tasks; e.g. with parallelism 2, Task1 handles s1, d1, d2, s2
>>> while Task2 handles d3, d4, d5, s3, d6.
>>> How is the ordering of schema versions guaranteed in this case? Could a data change be processed with the wrong schema version?
>>> 
>>> 
>>> In the SchemaOperator code:
>>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
>>>     // The request will need to send a FlushEvent or block until flushing finished
>>>     SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
>>>     if (response.isShouldSendFlushEvent()) {
>>>         LOG.info(
>>>                 "Sending the FlushEvent for table {} in subtask {}.",
>>>                 tableId,
>>>                 getRuntimeContext().getIndexOfThisSubtask());
>>>         output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>>         output.collect(new StreamRecord<>(schemaChangeEvent));
>>>         // The request will block until flushing finished in each sink writer
>>>         requestReleaseUpstream();
>>>     }
>>> }
>>> Why does output.collect(new StreamRecord<>(schemaChangeEvent));
>>> send the SchemaChangeEvent one more time at the end?
>>> When the FlushSuccessEvent is received, hasn't SchemaRegistryRequestHandler already invoked the MetadataApplier to apply the schema change?
>>> Why is the last line requestReleaseUpstream() blocked, and how is the upstream held and then
>>> released?
>>> Please advise, thanks!
>>> 



Re: How does schema evolution work in Flink CDC 3.0?

2023-12-27 Thread Jiabao Sun
Hi,

> Why does output.collect(new StreamRecord<>(schemaChangeEvent));
> send the SchemaChangeEvent one more time at the end?

The sink also receives the SchemaChangeEvent, because the sink may need to adjust its serializer or
writer according to the schema change; see DorisEventSerializer.

> Why is the last line requestReleaseUpstream() blocked, and how is the upstream held and then
> released?
It is blocked because the responseFuture has not completed; SchemaOperator.sendRequestToCoordinator
uses responseFuture.get(), which blocks until completion.
Only when the FlushSuccessEvent is received is the schema change executed; once the schema change finishes,
the responseFuture of waitFlushSuccess is marked as complete.
See
SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105 and SchemaRegistryRequestHandler.flushSuccess:148~150.

Guaranteeing the ordering is a more complex topic; please refer to the source code and the design document [1].

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit

> On December 27, 2023 at 22:14, casel.chen wrote:
> 
> I read the InfoQ article introducing Flink CDC 3.0,
> https://xie.infoq.cn/article/a80608df71c5291186153600b, but I cannot figure out the design of its schema
> evolution: how does the framework guarantee the ordering of schema changes? The article does not go into detail.
> Seen from the MySQL binlog producing change events, all changes are linear in time, e.g. s1, d1, d2, s2, d3, d4, d5, s3,
> d6, where d stands for a data change and s for a schema change.
> This means d1 and d2 use schema s1, d3~d5 use schema s2, and finally d6 uses schema s3.
> If Flink processes this with multiple parallel instances, the change sequence is distributed to different tasks; e.g. with parallelism 2, Task1 handles s1, d1, d2, s2
> while Task2 handles d3, d4, d5, s3, d6.
> How is the ordering of schema versions guaranteed in this case? Could a data change be processed with the wrong schema version?
> 
> 
> In the SchemaOperator code:
> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
>     // The request will need to send a FlushEvent or block until flushing finished
>     SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
>     if (response.isShouldSendFlushEvent()) {
>         LOG.info(
>                 "Sending the FlushEvent for table {} in subtask {}.",
>                 tableId,
>                 getRuntimeContext().getIndexOfThisSubtask());
>         output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>         output.collect(new StreamRecord<>(schemaChangeEvent));
>         // The request will block until flushing finished in each sink writer
>         requestReleaseUpstream();
>     }
> }
> Why does output.collect(new StreamRecord<>(schemaChangeEvent));
> send the SchemaChangeEvent one more time at the end?
> When the FlushSuccessEvent is received, hasn't SchemaRegistryRequestHandler already invoked the MetadataApplier to apply the schema change?
> Why is the last line requestReleaseUpstream() blocked, and how is the upstream held and then
> released?
> Please advise, thanks!
> 



RE: Bug caused by filter-condition pushdown on a lookup table

2023-12-25 Thread Jiabao Sun
Hi,

The image in your email did not come through; please paste the SQL directly.

Best,
Jiabao


On 2023/12/25 12:22:41 杨光跃 wrote:
> My SQL is as follows:
> (the SQL was attached as an image and did not come through)
> 
> 
> t_purch_apply_sent_route is created via flink cdc
> t_purch_apply_sent_route_goods is a plain jdbc table
> I expect only rows matching the filter condition to be returned, but the current execution returns all data from the t_purch_apply_sent_route table.
> This is clearly not what I expect; the cause is probably that the filter condition is pushed down too early.
> Should this be considered a bug? Or how should I write the SQL to get the expected result?
> 
> 
> 
> 

RE: Re: Handling dirty data in Flink

2023-12-21 Thread Jiabao Sun
Hi,

If you need precise control over bad records, Flink SQL is not really recommended.
Consider using the DataStream API to route the bad records to a side output[1] and then compensate for them.
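
A minimal sketch of that side-output routing (the MyRow type and its parse method are made up for illustration):

// Dirty records that cannot be parsed are routed to a side output instead of failing the job.
final OutputTag<String> dirtyTag = new OutputTag<String>("dirty-records") {};

SingleOutputStreamOperator<MyRow> parsed = rawStream.process(
        new ProcessFunction<String, MyRow>() {
            @Override
            public void processElement(String value, Context ctx, Collector<MyRow> out) {
                try {
                    out.collect(MyRow.parse(value));   // normal path
                } catch (Exception e) {
                    ctx.output(dirtyTag, value);       // dirty path
                }
            }
        });

DataStream<String> dirty = parsed.getSideOutput(dirtyTag);
// dirty.sinkTo(...);  // persist the dirty records for later compensation or alerting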

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/


On 2023/12/06 08:45:20 Xuyang wrote:
> Hi, 
> Regarding Flink SQL proactively collecting dirty data, there are two feasible approaches:
> 1. If you know the format of the dirty data, tag the dirty records so they skip the normal processing logic and are only collected, and let a UDAF cancel the job once a certain amount is reached.
> 2. If you do not know the format of the dirty data, use a UDX at the node that processes the data to handle both normal and dirty records, count the dirty records at the same time, and throw an exception when a certain threshold is reached.
> 
> 
> However, throwing an exception inside the UDX should only cause the job to fail over, not reach the failed state.
> 
> 
> To make the job reach the failed state, if the dirty data can already be recognized at the source side, you would need to modify the source
> connector so that it stops emitting data downstream once a certain amount of dirty data has been seen. See [1] for details.
> 
> 
> [1] 
> https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program
> 
> 
> 
> --
> 
> Best!
> Xuyang
> 
> 
> 
> 
> 
> On 2023-12-06 15:26:56, "刘建" wrote:
> >Hi: I want to use Flink SQL for data synchronization, e.g. reading data from MySQL and writing it to MySQL. If there is dirty data along the way, the downstream cannot write it.
> >How can I collect this dirty data, and make the job fail once the dirty data reaches a certain amount, etc.?
> 

RE: Re: Re: Does Flink SQL support batch lookup join?

2023-12-21 Thread Jiabao Sun
Hi, casel.

This can be implemented with three lookup joins; with caching enabled, the performance should not be bad.

WITH users AS (
SELECT *
  FROM (VALUES(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name)
)
SELECT orders.id, 
   u1.name as creator_name,
   u2.name as approver_name,
   u3.name as deployer_name
FROM (
   SELECT *
  FROM (VALUES(1, 1, 2, 3)) T (id, creator_id, approver_id, deployer_id)
) AS orders
LEFT JOIN users AS u1 ON orders.creator_id = u1.id
LEFT JOIN users AS u2 ON orders.approver_id = u2.id
LEFT JOIN users AS u3 ON orders.deployer_id = u3.id;

Best,
Jiabao

On 2023/11/22 12:44:47 "casel.chen" wrote:
> There is a dimension table user with id and name columns:
> id  | name
> -
> 1 | zhangsan
> 2 | lisi
> 3 | wangwu
> 
> 
> Now a transaction record arrives in real time:
> id  | creator_id  | approver_id  | deployer_id
> -
> 1   | 1 | 2 | 3
> 
> 
> I want to look up the user dimension table and return each user's name:
> id   |  creator_name   |  approver_name  |  deployer_name
> 
> 1 | zhangsan  |  lisi | wangwu
> 
> 
> 
> How can this scenario be implemented with Flink SQL?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2023-11-22 12:37:10, "Xuyang" wrote:
> >Hi, casel.
> >Could you describe "batch lookup join" in more detail? It looks like this fits the usage of a single lookup join
> >with k1=v1 and k2=v2 and k3=v3 directly in the condition.
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >On 2023-11-22 11:55:11, "casel.chen" wrote:
> >>One row of data carries three keys to look up: key1, key2 and key3.
> >>
> >>
> >>id key1 key2 key3
> >>I want a batch lookup that returns one row: id value1 value2 value3
> >>
> >>
> >>From what I checked, lookups (including the jdbc connector) do not support batch queries, so the only way is to first convert the columns into rows, look them up separately, and then convert the rows back into columns, as shown below:
> >>id key1 key2 key3
> >>first convert the columns into rows
> >>id key1
> >>id key2
> >>id key3
> >>
> >>after doing the lookup joins separately we get
> >>id value1
> >>id value2
> >>id value3
> >>finally convert the rows back into columns to return one row
> >>
> >>id value1 value2 value3
> >>
> >>
> >>The only way I can think of to implement this is with a UDTF + UDAF, but the drawback is that it is not generic. Does the Flink community plan to support this natively?
> 

RE: Pending records

2023-12-21 Thread Jiabao Sun
Hi rania,

Does "pending records" specifically refer to the records that have been read 
from the source but have not been processed yet?

If this is the case, FLIP-33[1] introduces some standard metrics for Source, 
including "pendingRecords," which can be helpful. 
However, not all Sources support this metric. For specific information, please 
refer to the documentation of the specific Source.

Best,
Jiabao

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


On 2023/11/26 16:09:54 rania duni wrote:
> Hello! 
> I want to get the pending records of a task. What is the best approach to get 
> the unprocessed records of a task?

RE: Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-20 Thread Jiabao Sun
Hi Chosen,

Whether the Kafka appender is supported has nothing to do with the
flink-kubernetes-operator.
It only depends on whether Log4j 2 supports the Kafka appender.

From the error message, it appears that the error is caused by the absence of
the log4j-layout-template-json[1] plugin.
For custom JARs, you can either customize the base image or, following the
pod-template example[2],
use initContainers to download the JAR files and place them in the
flink/lib directory.

Hope it helps.

Best,
Jiabao

[1] https://logging.apache.org/log4j/2.x/manual/json-template-layout.html
[2] 
https://github.com/apache/flink-kubernetes-operator/blob/808edfd156dc12932b6dd03146ccd2bec49963fb/examples/pod-template.yaml

On 2023/12/05 14:42:44 秋成 王 wrote:
> Hi,
> 
> I am recently working on syncing my Flink log to Kafka via log4j2 Kafka 
> appender. I have a log4j2.properties file which works fine locally, say run 
> my flink fat jar form terminal via following command:
>   PS D:\repo>>java -cp .\reconciliation-1.0-SNAPSHOT.jar 
> The log can be synced to Kafka successfully when run locally.
> 
> The contents of log4j2.properties file are pasted below:
> rootLogger.level = INFO
> rootLogger.appenderRef.kafka.ref = KafkaLog
> appender.kafka.type = Kafka
> appender.kafka.name = KafkaLog
> 
> appender.kafka.topic = topicName
> appender.kafka.properties[0].type=Property
> appender.kafka.properties[0].name=bootstrap.servers
> appender.kafka.properties[0].value=
> appender.kafka.properties[1].type=Property
> appender.kafka.properties[1].name=sasl.mechanism
> appender.kafka.properties[1].value=PLAIN
> appender.kafka.properties[2].type=Property
> appender.kafka.properties[2].name=sasl.jaas.config
> appender.kafka.properties[2].value=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="$ConnectionString" 
> password="${env:log_event_hub_connection_string}";
> appender.kafka.properties[3].type=Property
> appender.kafka.properties[3].name=security.protocol
> appender.kafka.properties[3].value=SASL_SSL
> 
> appender.kafka.layout.type = JsonTemplateLayout
> appender.kafka.layout.eventTemplateUri = classpath:kusto-applogv2-layout.json
> appender.kafka.layout.eventTemplateAdditionalField[0].type = 
> EventTemplateAdditionalField
> appender.kafka.layout.eventTemplateAdditionalField[0].key = Application
> appender.kafka.layout.eventTemplateAdditionalField[0].value = reconciliation
> appender.kafka.layout.eventTemplateAdditionalField[0].format = String
> appender.kafka.layout.eventTemplateAdditionalField[1].type = 
> EventTemplateAdditionalField
> appender.kafka.layout.eventTemplateAdditionalField[1].key = Language
> appender.kafka.layout.eventTemplateAdditionalField[1].value = Java
> appender.kafka.layout.eventTemplateAdditionalField[1].format = String
> 
> 
> I am now deploying Flink via Flink Kubernetes operator. However, after I 
> copied the contents in log4j2.properties file to log4j-console.properties 
> under section of logConfiguration in FlinkDeployment yaml, the kafka Appender 
> failed to init with error message:
> 
>   2023-12-05 10:12:36,903 main ERROR Unable to locate plugin type for 
> JsonTemplateLayout
> 
>   2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for 
> EventTemplateAdditionalField
> 
>   2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for 
> EventTemplateAdditionalField
> 
>   2023-12-05 10:12:37,047 main ERROR Unable to locate plugin for 
> JsonTemplateLayout
> 
> 
> My question is that Does Flink Kubernetes operator support Kafka appender 
> configuration in log4j-console.properties? If so can anyone provide me with 
> an example?
> 
> 
> PS: similar error message once showed up when run locally, I fixed the issue 
> with sulotion posted here. via adding
> 
> com.github.edwgiz.mavenShadePlugin.log4j2CacheTransformer.PluginsCacheFileTransformer
>  to pom file.
> 
> java - Console contains an invalid element or attribute "JsonTemplateLayout" 
> even after adding dependency - Stack 
> Overflow
> 
> 
> Thanks,
> 
> Chosen
> 

RE: Feature flag functionality on flink

2023-12-18 Thread Jiabao Sun
Hi,

If it is for simplicity, you can also try writing the flag into an external
system, such as Redis, ZooKeeper, or MySQL,
and query the flag from that external system when performing data processing.

However, Broadcast State is still the approach that I recommend.
Perhaps we only need to encapsulate the repetitive logic (reading data from a
topic) by defining a ConfigSource, for example,
that handles reading from the topic and converting the records into the
configuration items of interest,
so that the logic can be reused across independent operators.
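
As a rough sketch of the broadcast-state pattern (FlagEvent, MyEvent, and the flag name are made up; flagStream is the stream read from the configuration topic and events is the business stream):

MapStateDescriptor<String, Boolean> flagsDescriptor =
        new MapStateDescriptor<>("feature-flags", Types.STRING, Types.BOOLEAN);

// The same BroadcastStream can be connect()-ed to several keyed streams,
// so the configuration topic only needs to be read once.
BroadcastStream<FlagEvent> flagBroadcast = flagStream.broadcast(flagsDescriptor);

DataStream<MyEvent> result = events
        .keyBy(MyEvent::getKey)
        .connect(flagBroadcast)
        .process(new KeyedBroadcastProcessFunction<String, MyEvent, FlagEvent, MyEvent>() {
            @Override
            public void processElement(MyEvent value, ReadOnlyContext ctx, Collector<MyEvent> out)
                    throws Exception {
                Boolean enabled = ctx.getBroadcastState(flagsDescriptor).get("my-feature");
                if (Boolean.TRUE.equals(enabled)) {
                    out.collect(value); // feature-flagged logic
                }
            }

            @Override
            public void processBroadcastElement(FlagEvent flag, Context ctx, Collector<MyEvent> out)
                    throws Exception {
                // Update the shared flag state whenever a new config record arrives.
                ctx.getBroadcastState(flagsDescriptor).put(flag.getName(), flag.isEnabled());
            }
        });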

Additionally, I have attached some articles about the usage of Broadcast 
State[1][2].
Hope it helps.

Best,
Jiabao

[1] 
https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/
[2] 
https://flink.apache.org/2020/03/24/advanced-flink-application-patterns-vol.2-dynamic-updates-of-application-logic/


On 2023/12/07 16:18:42 Oscar Perez via user wrote:
> Hi,
> We would like to enable sort of a feature flag functionality for flink jobs.
> 
> The idea would be to use broadcast state reading from a configuration topic
> and then ALL operators with logic would listen to this state.
> 
> This documentation:
> 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/broadcast_state/
> 
> explains how a certain operator can use this broadcast state but the
> problem we are having is understanding how we can share the same state
> across many different operators. One way is to create multiple streams, one
> per operator reading from the same topic and then connect to the multiple
> operators in a keyedbroadcastprocessfunction but this seems overkill
> 
> Is there an elegant solution to this problem?
> regards,
> Oscar
> 

RE: The official Elasticsearch sink interface in Flink 1.15–1.18 throws a serialization exception

2023-12-18 Thread Jiabao Sun
Hi,

Is createIndexRequest non-static? In Scala you can declare the method in an object.
If a lambda accesses a non-static method and the enclosing class is not serializable, the lambda itself may not be serializable.
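
For illustration, the same idea expressed as a standalone (and therefore serializable) emitter class, here in Java; package names are assumed from the flink-connector-elasticsearch sink API, and the index/id logic is made up — in Scala the equivalent is a method in a companion object:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

/** A top-level emitter avoids capturing a non-serializable enclosing class. */
public class StringIndexEmitter implements ElasticsearchEmitter<String> {
    @Override
    public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        indexer.add(Requests.indexRequest().index("my-index").source(json));
    }
}

// usage: .setEmitter(new StringIndexEmitter())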

Best,
Jiabao


On 2023/12/12 07:53:53 李世钰 wrote:
> val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String]
>   // This instructs the sink to emit after every element, otherwise they would
>   // be buffered
>   .setBulkFlushMaxActions(1)
>   .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
>   .setEmitter(
>     (element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
>       indexer.add(createIndexRequest(element)))
>   .build()
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: The elasticsearch emitter must be serializable. 
> 
> Caused by: java.lang.IllegalStateException: The elasticsearch emitter must be 
> serializable.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> at 
> org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase.setEmitter(ElasticsearchSinkBuilderBase.java:77)
> at 
> org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder.setEmitter(Elasticsearch7SinkBuilder.java:63)

RE: Control who can manage Flink jobs

2023-12-17 Thread Jiabao Sun
Hi,

I don't have much experience with Beam. 
If you only need to submit Flink tasks, I would recommend StreamPark[1].

Best,
Jiabao

[1] https://streampark.apache.org/docs/user-guide/Team

On 2023/11/30 09:21:50 Поротиков Станислав Вячеславович via user wrote:
> Hello!
> Is there any way to control who can manage (submit/cancel) jobs to Flink 
> cluster. We have multiple teams and I am looking for decision how can we use 
> Beam+Flink safely.
> 
> Best regards,
> Stanislav Porotikov
> 
> 
> Best regards,
> Stanislav Porotikov
> Web Services Operations Engineer
> SRS Team
> 
> 

RE: Socket timeout when report metrics to pushgateway

2023-12-17 Thread Jiabao Sun
Hi,

The pushgateway uses push mode to report metrics. When deployed on a single 
machine under high load, there may be some performance issues. 
A simple solution is to set up multiple pushgateways and push the metrics to 
different pushgateways based on different task groups.

There are other metrics reporters available based on the push model, such as 
InfluxDB[1]. In a clustered mode, InfluxDB may offer better performance than 
pushgateway. 
You can try using InfluxDB as an alternative and evaluate its performance.

I speculate that the reason for using pushgateway is that when running Flink
in YARN application or per-job mode, the task ports are randomized,
making it difficult for Prometheus to determine which targets to scrape.

By the way, if you deploy tasks using the flink kubernetes operator,  you can 
directly use the prometheus metrics reporter without the need for 
pushgateway[2].

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/metric_reporters/#influxdb
[2] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#how-to-enable-prometheus-example


On 2023/12/12 08:23:22 李琳 wrote:
> hello,
>   we build flink report metrics to prometheus pushgateway, the program has 
> been running for a period of time, with a amount of data reported to 
> pushgateway, pushgateway response socket timeout exception, and much of 
> metrics data reported failed. following is the exception:
> 
> 
> 2023-12-12 04:13:07,812 WARN
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter [] - Failed
> to push metrics to PushGateway with jobName
> 00034937_20231211200917_54ede15602bb8704c3a98ec481bea96, groupingKey {}.
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_281]
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_281]
> at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_281]
> at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_281]
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_281]
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_281]
> at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_281]
> at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) ~[?:1.8.0_281]
> at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) ~[?:1.8.0_281]
> at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593) ~[?:1.8.0_281]
> at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498) ~[?:1.8.0_281]
> at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) ~[?:1.8.0_281]
> at io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:315) ~[flink-metrics-prometheus-1.13.5.jar:1.13.5]
> at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:138) ~[flink-metrics-prometheus-1.13.5.jar:1.13.5]
> at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:63) [flink-metrics-prometheus-1.13.5.jar:1.13.5]
> at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:494) [flink-dist_2.11-1.13.5.jar:1.13.5]
> 
> After testing, we confirmed it was caused by the amount of data reported to
> pushgateway. We restarted the pushgateway server and the exception
> disappeared, but after several hours it re-emerged.
> 
> So I want to know how to configure Flink or pushgateway to avoid this exception.
> 
> best regards.
> leilinee 

Re: Flink CDC 2.0: huge historical data causes binlog backlog, how to solve it

2023-09-20 Thread Jiabao Sun
Hi,
In production it is still recommended to retain the binlog for at least 7 days, which gives more tolerance on recovery time after a failure.
In addition, you can try increasing the parallelism and resources of the snapshot phase to speed it up; once the snapshot is finished, you can restore from a savepoint with reduced resources.
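
For example, on the MySQL side the retention can be checked and raised to 7 days
roughly as below. This is a generic MySQL 8.0 sketch, not something taken from
the thread; MySQL 5.7 uses expire_logs_days instead:

    SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';
    SET GLOBAL binlog_expire_logs_seconds = 604800;  -- 7 days = 604800 seconds
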
Best,
Jiabao
--
From:jinzhuguang 
Send Time: Wednesday, Sep 20, 2023 20:56
To: user-zh 
Subject: Flink CDC 2.0: huge historical data causes binlog backlog, how to solve it
Take MySQL CDC as an example: the current overall flow is to synchronize the full snapshot first and then start the incremental sync. From the code, the
initial offset of the incremental phase is chosen as the minimum high watermark across all snapshot splits. If the full data set is very large, at the TB
level, the full sync can take a long time, yet the binlog cannot be deleted in the meantime, so it piles up and takes a lot of disk space. Is there a
common solution to this problem at the moment?


RE: Help request: LEAST function inputs are normal but the return value is abnormal

2023-08-20 Thread Jiabao Sun
Hi,

Could you provide a test case that reproduces the issue?

Best,
Jiabao


On 2023/08/21 02:19:53 guifeng huang wrote:
> (Flink 1.15)
> Help request: the LEAST function's input parameters (Double type) are normal, and the function works fine when tested in the Flink shell, with results matching expectations.
> However, when it is used in the actual production stream, the returned results are abnormal. There are 3 kinds of cases:
> - The result is correct, as expected
> - The result is 0, which is not expected, for an unknown reason
> - The result has a tiny gap from the theoretically correct value; in the cases we found, the difference is within a single digit.
> Has anyone else run into the same problem?

Re: Consuming MySQL data with Flink

2023-08-07 Thread Jiabao Sun
Hi,

You can try flink-cdc-connectors to do the join in real time.
A regular join has to keep the complete state of both tables; if the tables are large, the rocksdb state backend is recommended.
If the table being joined rarely changes, you can consider a lookup join.
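
As a rough sketch (connection details and table names below are made up for
illustration, and flink-cdc-connectors 2.x is assumed to be on the classpath),
reading a MySQL table as a changelog stream could look like this; a second
source can be built the same way and the two streams keyed and joined downstream:

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MySqlCdcDemo {
        public static void main(String[] args) throws Exception {
            MySqlSource<String> orders = MySqlSource.<String>builder()
                    .hostname("mysql-host")
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.orders")
                    .username("flink")
                    .password("******")
                    // emit each change record as a JSON string
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // checkpointing is required for the CDC source to commit offsets
            env.enableCheckpointing(3000);
            env.fromSource(orders, WatermarkStrategy.noWatermarks(), "mysql-cdc-orders")
               .print();
            env.execute("mysql-cdc-demo");
        }
    }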

Best,
Jiabao


> On Aug 8, 2023 at 11:10 AM, 小昌同学  wrote:
> 
> Thanks for the guidance;
> My current requirement is to read the data of two MySQL tables and join them in real time. So far all I can think of is either to read them in real time with CDC, or to write a loop that keeps polling MySQL.
> Do you have a better suggestion for this?
> 
> 
> 小昌同学
> ccc0606fight...@163.com
>  Original message 
> | From | Shammon FY |
> | Date | Aug 8, 2023 10:37 |
> | To |  |
> | Subject | Re: Consuming MySQL data with Flink |
> Hi,
> 
> In your code, the ResultSet needs to be closed after it has been fully read, to avoid a resource leak.
> 
> As for the program ending once the MySQL data has been read, which part do you mean exactly? MySQL here is a bounded
> source: once the data has been consumed and the whole job has finished its computation, the job ends. That is expected behavior.
> 
> Best,
> Shammon FY
> 
> On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> 
> Hi everyone,
> I am reading data from MySQL locally by extending the RichSourceFunction class, but why does the program stop automatically once it has fetched all the data from MySQL?
> Here is my code:
> public class MysqlSource2 extends RichSourceFunction<ActionType> {
>     PreparedStatement ps;
>     private Connection connection;
> 
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         connection = getConnection();
>         String sql = "select * from actiontype;";
>         ps = connection.prepareStatement(sql);
>     }
> 
>     private static Connection getConnection() {
>         Connection con = null;
>         String driverClass = FlinkConfig.config.getProperty("driverClass");
>         String url = FlinkConfig.config.getProperty("jdbcUrl");
>         String user = FlinkConfig.config.getProperty("jdbcUser");
>         String passWord = FlinkConfig.config.getProperty("passWord");
> 
>         try {
>             Class.forName(driverClass);
>             con = DriverManager.getConnection(url, user, passWord);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>         return con;
>     }
> 
>     @Override
>     public void run(SourceContext<ActionType> ctx) throws Exception {
>         ResultSet resultSet = ps.executeQuery();
>         while (resultSet.next()) {
>             ActionType actionType = new ActionType(
>                     resultSet.getString("action"),
>                     resultSet.getString("action_name")
>             );
>             ctx.collect(actionType);
>         }
>     }
> 
>     @Override
>     public void close() throws Exception {
>         super.close();
>         if (null != connection) {
>             connection.close();
>         }
>         if (null != ps) {
>             ps.close();
>         }
>     }
> 
>     @Override
>     public void cancel() {
>     }
> }
> 
> 
> 小昌同学
> ccc0606fight...@163.com



RE: Questions about java enum when convert DataStream to Table

2023-08-02 Thread Jiabao Sun
Hi haishui,

The enum type cannot be mapped to a Flink table type directly.

I think the easiest way is to convert the enum to a string type first:

DataStreamSource<Tuple2<String, String>> source = env.fromElements(
        new Tuple2<>("1", TestEnum.A.name()),
        new Tuple2<>("2", TestEnum.B.name())
);

Or add a map transformation:

DataStream<Tuple2<String, String>> source1 =
        env.fromElements(
                new TestData("1", TestEnum.A),
                new TestData("2", TestEnum.B))
           .map(t -> new Tuple2<>(t.s, t.en.name()))
           .returns(new TypeHint<Tuple2<String, String>>() {});

Hope it helps.

Best,
Jiabao


On 2023/08/02 06:43:30 haishui wrote:
> I want to convert a DataStream to a Table. The type of the DataStream is a POJO
> which contains an enum field.
> 
> 
> 1. The enum field is RAW('classname', '...') in the table. When I execute `SELECT
> * FROM t_test` and print the result, it throws an EOFException.
> 2. If I declare the field as STRING in the schema, it throws: cannot cast
> "TestEnum" to "java.lang.String".
> 
> 
> Is there any way to define the enum field as STRING in the table?
> 
> 
> My code is as follows:
> Flink 1.17.1
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStreamSource<TestData> source = env.fromElements(
> new TestData("1", TestEnum.A),
> new TestData("2", TestEnum.B)
> );
> Schema schema = Schema
>  .newBuilder()
>  .column("s", DataTypes.STRING())
>  .column("en", DataTypes.STRING())
>  .build();
> Table table = tableEnv.fromDataStream(source);
> tableEnv.createTemporaryView("t_test", table);
> tableEnv.executeSql("DESC t_test").print();
> tableEnv.executeSql("select * from t_test").print();
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class TestData {
> private String s;
> private TestEnum en;
> }
> 
> public enum TestEnum {
> A, B, C
> }
> +----+-----+----------------+
> | op |  s  | en             |
> +----+-----+----------------+
> | +I |  1  | SqlRawValue{?} |
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: 
> java.io.EOFException
>   at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66)
>   at GeneratedCastExecutor$1.cast(Unknown Source)
>   at 
> org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74)
>   at 
> org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87)
>   at 
> org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167)
>   at 
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> Caused by: java.io.EOFException
>   at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202)
>   at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
>   at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96)
>   at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505)
>   at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>   ... 7 more

Re: How to deploy a SNAPSHOT version of a custom connector to a private repository

2023-08-02 Thread Jiabao Sun
Hi,

You do not need to change every dependency to a snapshot version; only the versions inside our own project need the -SNAPSHOT suffix.
You can replace the version number 1.x.x -> 1.x.x-SNAPSHOT throughout the project, or set it with
mvn versions:set -DnewVersion=1.x.x-SNAPSHOT.


> On Aug 2, 2023 at 2:25 PM, jinzhuguang  wrote:
> 
> Thanks a lot for the reminder. I have now used the Maven tooling to change all version numbers to snapshots, but flink-connectors (the parent module of the connectors) also became a snapshot and can no longer be found in the repository when packaging. Unlike flink-runtime and similar packages, I cannot simply change its version by hand either. What should I do in this case?
> 
>> On Jul 27, 2023 at 11:05, Jiabao Sun  wrote:
>> 
>> Hi,
>> 
>> Usually you add the maven-deploy-plugin to the pom, declare the private repository address via <distributionManagement>, and deploy to the
>> Nexus private repository with the mvn clean deploy command.
>> Deploying to the SNAPSHOT repository requires the project version to contain the -SNAPSHOT suffix; you can replace it globally in the IDE or
>> set it uniformly with the versions-maven-plugin.
>> 
>> 
>> <build>
>>   <plugins>
>>     <plugin>
>>       <groupId>org.apache.maven.plugins</groupId>
>>       <artifactId>maven-deploy-plugin</artifactId>
>>       <version>2.8.2</version>
>>       <configuration>
>>         <skip>${maven.deploy.skip}</skip>
>>       </configuration>
>>     </plugin>
>>   </plugins>
>> </build>
>> 
>> <distributionManagement>
>>   <snapshotRepository>
>>     <id>private-snapshots</id>
>>     <url>https://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
>>   </snapshotRepository>
>>   <repository>
>>     <id>private-releases</id>
>>     <url>https://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
>>   </repository>
>> </distributionManagement>
>> 
>> 
>>> On Jul 27, 2023 at 10:48 AM, jinzhuguang  wrote:
>>> 
>>> My work is based on Flink 1.16.0. Since that version number has no snapshot, and I cannot publish a release version either, what should I do?



Re: How to deploy a SNAPSHOT version of a custom connector to a private repository

2023-07-26 Thread Jiabao Sun
Hi,

Usually you add the maven-deploy-plugin to the pom, declare the private repository address via <distributionManagement>, and deploy to the Nexus
private repository with the mvn clean deploy command.
Deploying to the SNAPSHOT repository requires the project version to contain the -SNAPSHOT suffix; you can replace it globally in the IDE or set it
uniformly with the versions-maven-plugin.





<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-deploy-plugin</artifactId>
      <version>2.8.2</version>
      <configuration>
        <skip>${maven.deploy.skip}</skip>
      </configuration>
    </plugin>
  </plugins>
</build>

<distributionManagement>
  <snapshotRepository>
    <id>private-snapshots</id>
    <url>https://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
  </snapshotRepository>
  <repository>
    <id>private-releases</id>
    <url>https://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
  </repository>
</distributionManagement>

> On Jul 27, 2023 at 10:48 AM, jinzhuguang  wrote:
> 
> My work is based on Flink 1.16.0. Since that version number has no snapshot, and I cannot publish a release version either, what should I do?



RE: Suggestions for Open Source FLINK SQL editor

2023-07-26 Thread Jiabao Sun
Hi Rajat,

I think Apache StreamPark (Incubating) or Apache Zeppelin would be a good choice.

https://streampark.apache.org/ 
https://zeppelin.apache.org/ 


Best,
Jiabao


On 2023/07/19 16:47:43 Rajat Ahuja wrote:
> Hi team,
> 
> I have set up a session cluster on k8s with the SQL gateway. I am looking for
> an open source Flink SQL editor that can submit SQL queries on top of the
> k8s session cluster. Any suggestions for a SQL editor to submit queries?
> 
> 
> Thanks
> 

RE: How to use MyBatis correctly in Flink

2023-07-26 Thread Jiabao Sun
The SqlSession needs to be closed. I recommend using SqlSessionManager, which saves you from having to close the SqlSession manually.
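
A minimal sketch of the utility class rewritten around SqlSessionManager might
look like the following (class and file names simply mirror the quoted code and
are not taken from an official example):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.ibatis.io.Resources;
    import org.apache.ibatis.session.SqlSessionManager;

    public final class MybatisManagerUtil {
        private static final SqlSessionManager MANAGER = build();

        private static SqlSessionManager build() {
            try (InputStream in = Resources.getResourceAsStream("batis.xml")) {
                return SqlSessionManager.newInstance(in);
            } catch (IOException e) {
                throw new IllegalStateException("Failed to load MyBatis configuration", e);
            }
        }

        public static <T> T getMapper(Class<T> type) {
            // When no managed session has been started, each mapper call runs in its
            // own short-lived session that is committed and closed automatically.
            return MANAGER.getMapper(type);
        }
    }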


On 2023/07/18 02:13:16 lxk wrote:
> Inside Flink I need to use MyBatis to simplify some query work. My current usage is as follows:
> 
> public class MybatisUtil {
> 
>     private static final Logger LOGGER = LogFactory.createNewLogger("MybatisUtil");
>     private static ThreadLocal<SqlSession> tl = new ThreadLocal<>();
>     private static SqlSessionFactory factory = null;
>     // private static SqlSession sqlSession = null;
> 
>     static {
>         // 1. Read the configuration file config.xml
>         InputStream in = null;
>         try {
>             in = Resources.getResourceAsStream("batis.xml");
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>             throw new RuntimeException(e);
>         }
>         // 2. Create the SqlSessionFactory
>         factory = new SqlSessionFactoryBuilder().build(in);
>     }
> 
>     public static SqlSession getSqlSession() {
>         SqlSession sqlSession = tl.get();
>         if (sqlSession == null) {
>             sqlSession = factory.openSession();
>             tl.set(sqlSession);
>             LOGGER.info("sqlSession created successfully, connection: {}, time: {}", sqlSession, LocalTimeUtil.now());
>         }
>         return sqlSession;
>     }
> }
> The above is the utility class.
> I obtain the SqlSession in the open method and then use the mapper in the invoke (map) method:
> public void open(Configuration parameters) throws Exception {
>     sqlSession = MybatisUtil.getSqlSession();
> }
> 
> public List map(HeaderFullWithPreOrder headerFullWithPreOrder) throws Exception {
>     SelectAddCartMapper mapper = sqlSession.getMapper(SelectAddCartMapper.class);
>     ... (rest omitted)
> }
> 
> I would like to ask whether this usage is correct, and whether the SqlSession needs to be closed. I have seen posts saying that if the SqlSession is not closed, the connections will eventually be exhausted.
> 

RE: Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian,

I think we can use the podTemplate to mount Kubernetes secrets as files or
environment variables.
Then we can access the secrets in our Flink program.

Please refer to

https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
 

https://kubernetes.io/docs/concepts/configuration/secret/#using-a-secret 
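
For illustration only (the secret and key names below are placeholders), a
podTemplate in the FlinkDeployment spec could expose a password from a Secret
as an environment variable roughly like this:

    spec:
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              env:
                - name: MYSQL_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: mysql-credentials
                      key: password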


On 2023/07/21 10:53:10 tian tian wrote:
> For options like s3.secret-key, the plaintext password cannot be written
> directly in the configuration. Is there a template language like Jinja whose
> placeholders can be replaced after mounting into the pod?
> 
> >
>