Re: About the splitVector permission issue in MongoDB

2024-05-23 Thread Jiabao Sun
Hi, splitVector is an internal MongoDB command for computing chunk splits; it can also be used to compute chunk ranges under a replica-set deployment. If the splitVector permission is missing, the connector automatically falls back to the sample split strategy. Best, Jiabao  evio12...@gmail.com wrote on Thu, 23 May 2024, 16:57: > > hello~ > > > I am using the flink-cdc mongodb connector 2.3.0 >

Re: How can a Flink cluster write its logs directly into Elasticsearch?

2024-03-13 Thread Jiabao Sun
A fairly simple approach is to start a filebeat process that collects jobmanager.log and taskmanager.log. Best, Jiabao  kellygeorg...@163.com wrote on Wed, 13 Mar 2024, 15:30: > Is there a convenient and quick solution? > > >

RE: Unsubscribe

2024-02-08 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to unsubscribe from the user@flink.apache.org mailing list, and you can refer to [1][2] for more details on managing your subscription. Best, Jiabao [1]

RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
ink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java > But this doesn't seem released yet. Can you please point me towards correct > Flink version? > > Also, any help on question 1 regarding Schema Registry? > > Regards, > Kirti Dhar > > -Original Message

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Jiabao Sun
Hi Kirti, Kafka Sink supports sending messages with headers. You should implement a HeaderProvider to extract headers from the input elements. KafkaSink sink = KafkaSink.builder() .setBootstrapServers(brokers) .setRecordSerializer(KafkaRecordSerializationSchema.builder()
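The Flink builder call above is truncated, so here is only a sketch of the extraction logic a HeaderProvider would implement, written as plain Java without the Flink/Kafka dependencies; the header names and the input record shape are made-up examples, not part of the Flink API.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: derive Kafka-style headers (name -> raw bytes) from an input
// element, the kind of logic a HeaderProvider implementation would hold.
// The "source" and "length" header names are hypothetical.
public class HeaderExtraction {
    static Map<String, byte[]> headersFor(String element) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("source", "orders-topic".getBytes(StandardCharsets.UTF_8));
        headers.put("length", String.valueOf(element.length()).getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, byte[]> h = headersFor("hello");
        // prints 5, the length of "hello"
        System.out.println(new String(h.get("length"), StandardCharsets.UTF_8));
    }
}
```

In the real sink this map would be built inside the provider passed to the record serializer builder, with one entry per Kafka record header.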

RE: Re: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
> Could you please share examples on how to "*update*" data using > ElasticsearchSink? > > Thanks > > On Mon, Jan 29, 2024 at 9:07 PM Jiabao Sun wrote: > > > Hi Fidea, > > > > I found some examples in the Java documentation, and I hope they can

RE: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea, I found some examples in the Java documentation, and I hope they can be helpful. private static class TestElasticSearchSinkFunction implements ElasticsearchSinkFunction> { public IndexRequest createIndexRequest(Tuple2 element) { Map json = new HashMap<>();

RE: Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Jiabao Sun
Hi Tauseef, We cannot directly write POJO types into Elasticsearch. You can try serializing the TopologyDTO into a JSON string with a library such as Jackson before writing it. public static void main(String[] args) throws IOException { try (RestHighLevelClient client = new RestHighLevelClient(
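To make the idea concrete without pulling in Jackson or the Elasticsearch client, here is a stdlib-only sketch of flattening an object's fields into a JSON string before handing it to the sink; in a real job you would use Jackson as the reply suggests, and the TopologyDTO-like fields below are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: turn a map of field names/values into a JSON string.
// Numbers are emitted bare, everything else is quoted. A real job should
// use a proper serializer (e.g. Jackson) instead of hand-rolled escaping.
public class JsonFlatten {
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) sb.append(v);
            else sb.append('"').append(v).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> dto = new LinkedHashMap<>();
        dto.put("id", 1);          // hypothetical DTO fields
        dto.put("name", "topo-a");
        System.out.println(toJson(dto)); // {"id":1,"name":"topo-a"}
    }
}
```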

RE: Reply: RE: how to get flink accumulated sink record count

2024-01-25 Thread Jiabao Sun
ing the metric in Flink tasks. > > > > > ----- Original Message ----- > From: "Jiabao Sun" Sent: Thursday, 25 Jan 2024, 3:11 PM > To: "user" Subject: RE: how to get flink accumulated sink record count > > > > > > I guess get

RE: how to get flink accumulated sink record count

2024-01-24 Thread Jiabao Sun
Hi Enric, I guess getting the metrics[1] might be helpful for you. You can query the numRecordsOut metric by Metrics Reporter[2] or REST API[3]. Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/ [2]

RE: Re: RE: Re: RE: How to build a full+incremental source connector not based on debezium on top of flink cdc 3.x?

2024-01-22 Thread Jiabao Sun
Does the class need to define an offsetField of Map type > (similar to the resumeTokenField in MongoDB's ChangeStreamOffset)? > Currently MongoDB defines it as a JSON String type > > On 2024-01-22 11:03:55, "Jiabao Sun" wrote: > >Hi, > > > >The Flink CDC MongoDB connector V1 is implemented on top of mongo-kafka, not on debezium. > >The Flink CDC MongoDB c

RE: Re: RE: How to build a full+incremental source connector not based on debezium on top of flink cdc 3.x?

2024-01-21 Thread Jiabao Sun
> > > > The Flink CDC MongoDB connector is still implemented on top of debezium > > > > > > > > > On 2024-01-22 10:14:32, "Jiabao Sun" wrote: > >Hi, > > > >You can refer to the implementation of the Flink CDC MongoDB connector. > > > >Best, > >Jiabao > > > >

RE: How to build a full+incremental source connector not based on debezium on top of flink cdc 3.x?

2024-01-21 Thread Jiabao Sun
Hi, you can refer to the implementation of the Flink CDC MongoDB connector. Best, Jiabao On 2024/01/22 02:06:37 "casel.chen" wrote: > We have a data source that debezium does not support, and we need to consume it with flink sql in a unified full+incremental way, so I am considering building on flink cdc 3.x. I checked and most existing flink cdc source connectors are built on the debezium library; only oceanbase and tidb are not, but those two still use the V1 source interface rather than the latest V2 incremental

RE: Re: RE: Lost binlog file issue

2024-01-19 Thread Jiabao Sun
read binlog starting at " > + mySqlOffsetContext.getSourceInfo() > + ", but this is no longer " > + "available on the server. Reconfigure the connector to > use a snapshot when needed."); > } > > On 2024-01-19 17:33:03, "J

RE: Re: Python flink statefun

2024-01-19 Thread Jiabao Sun
re in Python it's handled). > > Bests, > > Alex > > On Fri, 19 Jan 2024 at 02:44, Jiabao Sun wrote: > > > Hi Alexandre, > > > > I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in > > Docker Hub. > > You can temporar

RE: Lost binlog file issue

2024-01-19 Thread Jiabao Sun
Hi, your images didn't come through; please paste image-host links or the code directly. Best, Jiabao On 2024/01/19 09:16:55 wyk wrote: > > > Hello everyone: > We have a lost-binlog-file problem and need your advice. The details are as follows: > > > Description: > Scenario: our MySQL has two standby databases: standby 1 and standby 2. > 1. Standby 1 needs to be decommissioned, so the job must be migrated to standby 2 > 2. After taking a savepoint, I changed the connection info to standby 2 and started from the savepoint, at which point it reports that the binlog file does not exist; the error screenshot is figure 1 below >

RE: Re: RE: RE: flink cdc dynamic table addition does not take effect

2024-01-18 Thread Jiabao Sun
Hi, the oracle cdc connector has been integrated into the incremental snapshot framework, so dynamic table addition can be implemented there in a unified way. Feel free to create an issue in the community; direct contributions are also welcome. Best, Jiabao On 2024/01/19 04:46:21 "casel.chen" wrote: > > > > > > > I'd like to know why the oracle cdc connector doesn't support adding tables dynamically. Could I extend it myself? > > > > > > > > > > >

RE: Unsubscribe

2024-01-18 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to unsubscribe from the u...@flink.apache.org mailing list, and you can refer to [1][2] for more details on managing your subscription. Best, Jiabao [1]

RE: RE: flink cdc dynamic table addition does not take effect

2024-01-18 Thread Jiabao Sun
Hi, the Oracle CDC connector[1] currently does not support adding tables dynamically. Best, Jiabao [1] https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html On 2024/01/19 03:37:41 Jiabao Sun wrote: > Hi, > > Please tell us the flink cdc version and which connector you are using. > If convenient, please also provide the logs. > Also, the table

RE: flink cdc dynamic table addition does not take effect

2024-01-18 Thread Jiabao Sun
Hi, please tell us the flink cdc version and which connector you are using. If convenient, please also provide the logs. Also, does the table regular expression actually match the newly added tables? Best, Jiabao [1] https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15 On 2024/01/19 03:27:22 王凯 wrote: > While using flink
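One quick check suggested above is whether the table pattern matches the new tables at all. A small sketch of that check (the database and table names are made up):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch: verify which table names a CDC table regex would actually match,
// before blaming dynamic table addition. Names below are hypothetical.
public class TableMatch {
    static List<String> matching(String regex, List<String> tables) {
        Pattern p = Pattern.compile(regex);
        return tables.stream()
                .filter(t -> p.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> matched = matching("app_db\\.orders_\\d+",
                List.of("app_db.orders_01", "app_db.orders_new", "app_db.users"));
        System.out.println(matched); // [app_db.orders_01]
    }
}
```

If a newly added table (say `app_db.orders_new`) is not in the matched list, the regex, not the connector, is the problem.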

RE: Python flink statefun

2024-01-18 Thread Jiabao Sun
Hi Alexandre, I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 on Docker Hub. You can temporarily use the release-3.2 version. Hi Martijn, did we forget to push it to the docker registry? Best, Jiabao [1] https://hub.docker.com/r/apache/flink-statefun-playground/tags On

RE: Flink Slow Execution

2024-01-17 Thread Jiabao Sun
Hi Dulce, MiniCluster is generally used for local testing and is limited by the resources of a single machine. When more tasks are executed, it may not be able to immediately acquire the resources needed to start the MiniCluster, resulting in slower startup times. If running Flink tasks in a

RE: Questions about implementing a real-time data warehouse

2024-01-14 Thread Jiabao Sun
Hi, you can try building the real-time warehouse with Flink CDC + Apache Paimon. Paimon already supports whole-database ingestion into the lake via Flink CDC, so real-time ingestion comes at a fairly low cost. In addition, Paimon's partial update feature lets you build ADS-layer wide tables with little compute cost. Paimon also supports both batch and streaming execution, so you can flexibly trade off freshness against compute cost. Best, Jiabao On 2024/01/14 12:57:29 海风 wrote: >
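As a rough illustration of the partial-update idea mentioned above, here is a hedged sketch of a Paimon primary-key table using the partial-update merge engine; the table and column names are invented, only the `'merge-engine' = 'partial-update'` property comes from Paimon:

```sql
-- Hypothetical ADS-layer wide table. With the partial-update merge engine,
-- records sharing a primary key are merged field by field (non-null wins),
-- so several upstream jobs can each fill part of the same row.
CREATE TABLE ads_user_profile (
    user_id     BIGINT,
    order_cnt   BIGINT,
    last_login  TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'
);
```

Check the Paimon documentation for the exact options supported by your Paimon version.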

RE: Unsubscribe

2024-01-14 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to unsubscribe from the u...@flink.apache.org mailing list, and you can refer to [1][2] for more details on managing your subscription.

Re: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
d the execution. > I can't figure out whether this is a problem with the flink connector or > iceberg. > > Jiabao Sun mailto:jiabao@xtransfer.cn>> > wrote on Wed, 10 Jan 2024, 18:15: >> Hi haifang, >> >> lower-bound and upper-bound are defined as long types, and it seems

RE: Unsubscribe this mailbox

2024-01-10 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to unsubscribe from the user@flink.apache.org mailing list, and you can refer to [1][2] for more details on managing your subscription. Best, Jiabao [1]

RE: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang, lower-bound and upper-bound are defined as long types, so it is difficult to fill in a timestamp value directly. However, you may use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333' instead, as JDBC supports filter pushdown. Best, Jiabao On 2024/01/10 08:31:23 haifang luo wrote: > Hello~~
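If you do want to feed a timestamp into the long-typed bounds, one option is to convert it to epoch milliseconds yourself. A small sketch (the time zone is an assumption; pick whatever your data uses):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// scan.partition.lower-bound / upper-bound take long values, so a timestamp
// bound has to be supplied as a number, e.g. epoch milliseconds.
// UTC is assumed here; adjust the offset to your setup.
public class PartitionBound {
    static long toEpochMillis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022-01-01T07:00:01.333")); // 1641020401333
    }
}
```

Whether a plain epoch-millis bound partitions your timestamp column correctly depends on the column type and driver, so the WHERE-clause pushdown from the reply is usually the safer route.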

RE: Rabbitmq connector for Flink v1.18

2024-01-09 Thread Jiabao Sun
Hi Charlotta, The latest news about connector releases is here[1]. You can subscribe to the mailing list or follow the jira issue to get the latest updates. Best, Jiabao [1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2 [2] https://flink.apache.org/what-is-flink/community/

RE: How to debug a flink cdc 3.0 job in the IDE?

2024-01-02 Thread Jiabao Sun
Hi, you can refer to this document[1] for a simple test setup. Best, Jiabao [1] https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i On 2024/01/02 08:02:10 "casel.chen" wrote: > How do I debug the mysql-to-doris.yaml job in Intellij Idea? The flink-cdc-cli module needs to depend on >

Re: How to throttle reads during the snapshot phase in Flink CDC?

2024-01-01 Thread Jiabao Sun
Hi, GuavaFlinkConnectorRateLimiter is currently only used by flink-connector-gcp-pubsub[1]. Flink CDC does not support rate limiting yet[2]; for now you can try lowering the snapshot parallelism to reduce pressure on the database. Best, Jiabao [1]

RE: How can the FileSystem Connector elegantly write to multiple paths at once?

2023-12-29 Thread Jiabao Sun
Hi, writing to multiple paths is hard to achieve with SQL; with the DataStream API you could implement your own RichSinkFunction. Best, Jiabao On 2023/12/29 08:37:34 jinzhuguang wrote: > Flink version: 1.16.0 > > Example from the website: > CREATE TABLE MyUserTable ( > column_name1 INT, > column_name2 STRING, > ... > part_name1 INT, > part_name2 STRING > )

RE: Flink SQL Windowing TVFs

2023-12-28 Thread Jiabao Sun
Hi, in 1.14.0 the CUMULATE function must be used in a GROUP BY aggregation[1]. Does the SQL deployed to production include a GROUP BY expression? And is the locally tested Flink version also 1.14.0? Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote: > Hi,
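A minimal sketch of the required shape, closely following the linked documentation's Bid example (the table and column names are from that example, not from this thread):

```sql
-- CUMULATE is a windowing TVF; in 1.14 its output must be aggregated with
-- GROUP BY over window_start / window_end.
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```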

Re: How does schema evolution work in flink cdc 3.0?

2023-12-27 Thread Jiabao Sun
> > > > > On 2023-12-28 01:16:40, "Jiabao Sun" wrote: >> Hi, >> >>> Why does the final output.collect(new StreamRecord<>(schemaChangeEvent)); >>> send the SchemaChangeEvent once more? >> >> The sink also receives the SchemaChangeEvent, because the sink may need to adjust

Re: How does schema evolution work in flink cdc 3.0?

2023-12-27 Thread Jiabao Sun
Hi, > Why does the final output.collect(new StreamRecord<>(schemaChangeEvent)); > send the SchemaChangeEvent once more? The sink also receives the SchemaChangeEvent, because it may need to adjust its serializer or writer according to the schema change; see DorisEventSerializer. > Why is the final requestReleaseUpstream() call blocked? How is the upstream held and then released? It is blocked

RE: Bug caused by filter pushdown on a lookup table

2023-12-25 Thread Jiabao Sun
Hi, the images in the email didn't display; please paste the SQL. Best, Jiabao On 2023/12/25 12:22:41 杨光跃 wrote: > My sql is as follows: > > > t_purch_apply_sent_route was created via flink cdc > t_purch_apply_sent_route_goods is a plain jdbc table > I expect only the rows matching the filter condition to be returned; but the current result returns all rows of t_purch_apply_sent_route > That is clearly not what I expect, and the cause is probably that the filter condition was pushed down too early >

RE: Re: Handling dirty data in Flink

2023-12-21 Thread Jiabao Sun
Hi, if you need precise control over bad records, flink sql is not really recommended. Consider using DataStream and emitting the bad records to a side output[1], then compensating for them. Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/ On 2023/12/06 08:45:20 Xuyang wrote: > Hi, > regarding flink sql's behavior of collecting dirty data, there are two feasible approaches: > 1.

RE: Re: Re: Does flink sql support batched lookup join?

2023-12-21 Thread Jiabao Sun
Hi, casel. It can be done with three lookup joins, and with caching the performance should be decent. WITH users AS ( SELECT * FROM (VALUES(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name) ) SELECT orders.id, u1.name as creator_name, u2.name as approver_name, u3.name as deployer_name FROM ( SELECT *
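The quoted query is cut off, so here is a hedged completion of the same pattern: the one dimension table is joined three times under different aliases. The `orders` column names are assumptions; in a real lookup join against an external table you would also add `FOR SYSTEM_TIME AS OF` on each join, which the inline VALUES example does not need.

```sql
-- Sketch: join the users dimension three times, once per role column.
WITH users AS (
    SELECT * FROM (VALUES (1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name)
)
SELECT o.id,
       u1.name AS creator_name,
       u2.name AS approver_name,
       u3.name AS deployer_name
FROM orders o
LEFT JOIN users u1 ON o.creator_id  = u1.id   -- assumed column names
LEFT JOIN users u2 ON o.approver_id = u2.id
LEFT JOIN users u3 ON o.deployer_id = u3.id;
```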

RE: Pending records

2023-12-21 Thread Jiabao Sun
Hi rania, Does "pending records" specifically refer to the records that have been read from the source but have not been processed yet? If this is the case, FLIP-33[1] introduces some standard metrics for Source, including "pendingRecords," which can be helpful. However, not all Sources

RE: Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-20 Thread Jiabao Sun
Hi Chosen, Whether kafka appender is supported or not has no relation to the flink-kubernetes-operator. It only depends on whether log4j2 supports kafka appender. From the error message, it appears that the error is caused by the absence of the log4j-layout-template-json[1] plugin. For the

RE: Feature flag functionality on flink

2023-12-18 Thread Jiabao Sun
Hi, If it is for simplicity, you can also try writing the flag into an external system, such as Redis, Zookeeper or MySQL, and query the flag from the external system when performing data processing. However, Broadcast State is still the mode that I recommend. Perhaps we only need to encapsulate
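The external-store variant above can be sketched in a few lines of plain Java; a ConcurrentHashMap stands in for the external system (Redis/Zookeeper/MySQL), and the flag name and processing logic are invented for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: keep feature flags in a store and consult them per record.
// In a real job the store lookup would hit Redis/ZooKeeper/MySQL
// (ideally cached), not an in-memory map.
public class FeatureFlags {
    private final Map<String, Boolean> store = new ConcurrentHashMap<>();

    void set(String flag, boolean enabled) { store.put(flag, enabled); }

    boolean isEnabled(String flag) { return store.getOrDefault(flag, false); }

    String process(String record) {
        // Hypothetical flag-gated transformation.
        return isEnabled("uppercase") ? record.toUpperCase() : record;
    }

    public static void main(String[] args) {
        FeatureFlags flags = new FeatureFlags();
        System.out.println(flags.process("hello")); // hello
        flags.set("uppercase", true);
        System.out.println(flags.process("hello")); // HELLO
    }
}
```

With Broadcast State the flag updates would instead arrive as a broadcast stream, which avoids per-record external lookups.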

RE: The official Elasticsearch sink interface for flink 1.15-1.18 throws a serialization exception

2023-12-18 Thread Jiabao Sun
Hi, is createIndexRequest perhaps not static? In scala you can declare the method in an object. Accessing a non-static method from a lambda whose enclosing class is not serializable can make the lambda unserializable. Best, Jiabao On 2023/12/12 07:53:53 李世钰 wrote: > val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String] > // This instructs the sink to emit after every element,

RE: Control who can manage Flink jobs

2023-12-17 Thread Jiabao Sun
Hi, I don't have much experience with Beam. If you only need to submit Flink tasks, I would recommend StreamPark[1]. Best, Jiabao [1] https://streampark.apache.org/docs/user-guide/Team On 2023/11/30 09:21:50 Поротиков Станислав Вячеславович via user wrote: > Hello! > Is there any way to

RE: Socket timeout when report metrics to pushgateway

2023-12-17 Thread Jiabao Sun
Hi, The pushgateway uses push mode to report metrics. When deployed on a single machine under high load, there may be some performance issues. A simple solution is to set up multiple pushgateways and push the metrics to different pushgateways based on different task groups. There are other
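The multi-pushgateway workaround can be reduced to a small routing decision: hash the task group to pick a gateway deterministically. A sketch with made-up host names:

```java
// Sketch: spread metric pushes across several pushgateways by hashing the
// task-group name, so each group always reports to the same gateway.
// Gateway addresses below are placeholders.
public class GatewayShard {
    static String pick(String taskGroup, String[] gateways) {
        // floorMod keeps the index non-negative for negative hash codes.
        int idx = Math.floorMod(taskGroup.hashCode(), gateways.length);
        return gateways[idx];
    }

    public static void main(String[] args) {
        String[] gateways = {"pushgw-0:9091", "pushgw-1:9091", "pushgw-2:9091"};
        // Same group, same gateway, every time:
        System.out.println(pick("etl-jobs", gateways).equals(pick("etl-jobs", gateways))); // true
    }
}
```

In Flink this choice would feed the PrometheusPushGateway reporter's host/port configuration per job, rather than being computed at runtime per record.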

Re: Flink cdc 2.0: how to handle log backlog caused by very large historical data

2023-09-20 Thread Jiabao Sun
Hi, in production it is recommended to retain binlogs for at least 7 days, which gives you more tolerance on recovery time. You can also try increasing the snapshot parallelism and resources to speed up the snapshot; once the snapshot finishes, you can restore from a savepoint with reduced resources. Best, Jiabao -- From: jinzhuguang Send Time: Wed, 20 Sep 2023, 20:56 To: user-zh Subject: Flink cdc 2.0: how to handle log backlog caused by very large historical data

RE: Asking for help: the Least function's inputs are normal but its return value is abnormal

2023-08-20 Thread Jiabao Sun
Hi, could you provide an example that reproduces it? Best, Jiabao On 2023/08/21 02:19:53 guifeng huang wrote: > (Flink 1.15) > Asking for help: the Least function's input arguments (Double type) are normal, and testing the function in the Flink shell shows no problem; the results are as expected. > But when it is used in the actual production stream the returned results are off. There are 3 kinds of cases: > - the result is correct, as expected > - it returns 0, unexpected, cause unknown > - the result has a tiny gap from the theoretically correct value; the few cases I found all differ within a single digit. > Wondering whether anyone else has run into the same problem

Re: Flink consuming MySQL

2023-08-07 Thread Jiabao Sun
Hi, you can try flink-cdc-connectors for the real-time join. A regular join has to keep the complete state of both tables; if the tables are large, the rocksdb state backend is recommended. If the joined table rarely changes, consider a lookup join. Best, Jiabao > On 2023-08-08 11:10, 小昌同学 wrote: > > Thanks for the guidance; > What I need is to read two MySQL tables and join them in real time. What I can think of is either reading them in real time with cdc, or writing a loop that polls the data from MySQL. > Do you have any better suggestions for this part? > > | | > 小昌同学

RE: Questions about java enum when convert DataStream to Table

2023-08-02 Thread Jiabao Sun
Hi haishui, The enum type cannot be mapped to a Flink table type directly. I think the easiest way is to convert the enum to a string type first: DataStreamSource> source = env.fromElements( new Tuple2<>("1", TestEnum.A.name()), new Tuple2<>("2", TestEnum.B.name()) ); Or add a map
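The enum-to-string conversion in the reply boils down to `name()` on the way in and `Enum.valueOf` on the way back. A stdlib-only sketch (TestEnum mirrors the one in the question; its constants are assumed):

```java
// Sketch of the suggested workaround: store the enum as a String column
// and map it back to the enum when reading the table output.
public class EnumMapping {
    enum TestEnum { A, B }

    // Enum -> table column value
    static String toColumn(TestEnum e) { return e.name(); }

    // Table column value -> enum (throws IllegalArgumentException on unknown names)
    static TestEnum fromColumn(String s) { return TestEnum.valueOf(s); }

    public static void main(String[] args) {
        System.out.println(toColumn(TestEnum.A));          // A
        System.out.println(fromColumn("B") == TestEnum.B); // true
    }
}
```

The same two calls can live in a `map` function before `fromDataStream` and after `toDataStream`, which is presumably what the truncated "Or add a map" sentence goes on to describe.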

Re: How to deploy a snapshot version of my custom connector to a private repository

2023-08-02 Thread Jiabao Sun
can't find it anymore, and I also can't simply change the version by hand the way I would for packages like flink-runtime. What should I do in this case? > >> On 27 Jul 2023, at 11:05, Jiabao Sun wrote: >> >> Hello, >> >> Usually you add the maven-deploy-plugin to the pom, declare the private repository address, and deploy to the nexus private repository with mvn clean deploy. >> Deploying to the SNAPSHOT repository requires the project version to contain the -SNA

Re: How to deploy a snapshot version of my custom connector to a private repository

2023-07-26 Thread Jiabao Sun
Hello, usually you add the maven-deploy-plugin to the pom and declare the private repository address via <distributionManagement>, then deploy to the nexus private repository with mvn clean deploy. Deploying to the SNAPSHOT repository requires the project version to contain the -SNAPSHOT suffix; you can replace it globally in the IDE, or set it uniformly with the versions-maven-plugin. org.apache.maven.plugins maven-deploy-plugin 2.8.2
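The flattened plugin coordinates at the end of the reply can be restored into a pom fragment. The repository id and URL below are placeholders for your own Nexus instance; only the plugin coordinates come from the original message:

```xml
<!-- Sketch: deploy target plus the maven-deploy-plugin from the reply. -->
<distributionManagement>
  <snapshotRepository>
    <id>nexus-snapshots</id>
    <url>https://nexus.example.com/repository/maven-snapshots/</url>
  </snapshotRepository>
</distributionManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-deploy-plugin</artifactId>
      <version>2.8.2</version>
    </plugin>
  </plugins>
</build>
```

With a `-SNAPSHOT` project version and matching credentials in `settings.xml` (`<server>` id `nexus-snapshots`), `mvn clean deploy` then publishes to the snapshot repository.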

RE: Suggestions for Open Source FLINK SQL editor

2023-07-26 Thread Jiabao Sun
Hi Rajat, I think Apache StreamPark(Incubating) or Apache Zeppelin is a good choice. https://streampark.apache.org/ https://zeppelin.apache.org/ Best, Jiabao On 2023/07/19 16:47:43 Rajat Ahuja wrote: > Hi team, > > I have set

RE: How to use mybatis correctly in flink

2023-07-26 Thread Jiabao Sun
The SqlSession needs to be closed; SqlSessionManager is recommended, as it does not require closing the SqlSession manually. On 2023/07/18 02:13:16 lxk wrote: > I need mybatis inside flink for some simplified query work; my current usage is as follows > > public class MybatisUtil { > > private static final Logger LOGGER = > LogFactory.createNewLogger("MybatisUtil"); > private static ThreadLocal tl = new

RE: Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian, I think we can use podTemplate to mount kubernetes secrets as files or environment variables. Then we can access the secrets in our flink program. Please refer to https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml