Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
"file:\\d:\\abc")); > > > 发件人: jinzhuguang > 发送时间: 2024-02-02 17:16 > 收件人: user-zh > 主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据 > 你是在batch模式下手动开启了checkpoint吗 > >> 2024年2月2日 17:11,ha.fen...@aisino.com 写道: >> >> 今天正好测试了这

Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
你是在batch模式下手动开启了checkpoint吗 > 2024年2月2日 17:11,ha.fen...@aisino.com 写道: > > 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。 > > 发件人: jinzhuguang > 发送时间: 2024-02-02 16:47 > 收件人: user-zh > 主题: Batch模式下,StatefulSinkWr

Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 文章 jinzhuguang
Flink 1.16.0 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。 interface StatefulSinkWriter extends SinkWriter { /** * @return The writer's state. * @throws IOException if fail to snapshot writer's state. */ List

FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 文章 jinzhuguang
Flink版本:1.16.0 看官网上的案例: CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, ... part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- 必选:指定连接器类型 'path' = 'file:///path/to/whatever', -- 必选:指定路径

Re: 关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 jinzhuguang
t; > > -- > >Best! >Xuyang > > > > > > 在 2023-11-24 15:15:04,"jinzhuguang" 写道: >> flink 1.18.0 >> >> >> 例如我写下一条SQL: >> select * from KafkaTable where id is not null; >>

关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-23 文章 jinzhuguang
flink 1.18.0 例如我写下一条SQL: select * from KafkaTable where id is not null; IS NOT NULL应该属于系统内建函数,于是我找到相关代码: public static final BuiltInFunctionDefinition IS_NOT_NULL = BuiltInFunctionDefinition.newBuilder() .name("isNotNull") .kind(SCALAR)

Re: flink sql如何实现json字符数据解析?

2023-11-22 文章 jinzhuguang
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。 比如: SourceT: ( uuid String, body_data ARRAY> ) SinkT ( result ARRAY> ) Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as body_data.fild1, body_data[1]. Field2

如何在Flink Connector Source退出时清理资源

2023-10-23 文章 jinzhuguang
版本:Flink 1.16.0 需求:在某个source结束退出时清理相关的资源。 问题:目前没有找到Source退出时相关的hook函数,不知道在哪里编写清理资源的代码。 恳请大佬们指教。

Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
感谢大佬!!! > 2023年10月13日 10:44,tanjialiang 写道: > > Hi, > 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922 > > > best wishes, > tanjialiang. > > > 回复的原邮件 > | 发件人 | jinzhuguang | > | 发送日期 | 2023年10月13日 10:39 | > | 收件人 | u

关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
首先,我的Flink版本为1.16.0 为了方便理解,我以Kafka作为案例来描述: 我有以下两个表: CREATE TABLE orders( user_id BIGINT, name STRING, timestamp TIMESTAMP(3) METADATA VIRTUAL )WITH( 'connector'='kafka', 'topic'='orders', 'properties.group.id' = 'test_join_tempral', 'scan.startup.mode'='earliest-offset',

Re: Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 文章 jinzhuguang
你好,除了这些运维手段外,flink cdc本身有什么解法吗,比如说增量阶段不用从头开始读binlog,因为其实很多都是重复读到的数据 > 2023年9月20日 21:00,Jiabao Sun 写道: > > Hi, > 生产环境的binlog还是建议至少保留7天,可以提高故障恢复时间容忍度。 > 另外,可以尝试增加snapshot的并行度和资源来提升snapshot速度,snapshot完成后可以从savepoint恢复并减少资源。 > Best, > Jiabao >

Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 文章 jinzhuguang
以mysql cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?

Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 文章 jinzhuguang
以mysql cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?

Re: 如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-08-02 文章 jinzhuguang
> ${maven.deploy.skip} > > > > > > > >private-snapshots > > https://xxx.xxx.xxx/nexus/content/repositories/snapshots/ > > >private-releases >https://xxx.xxx.xxx

如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-07-26 文章 jinzhuguang
我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?

Re: ASF jira account

2023-07-20 文章 jinzhuguang
我也遇到类似的问题,我是链接失效了,最后没办法再注册了 > 2023年7月20日 14:54,李天龙 写道: > > 您好! > 我想注册一个flink jira的账号,但由于提出的里有不充分给拒掉了,想再次申请,却提示邮箱已申请过,还未处理: > > > There is already a pending Jira account request associated with this email > address. Please wait for it to be processed > > > 请问怎么解决这个问题,并且成功申请一个账号 > > > >

Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-12 文章 jinzhuguang
嗨你好,用于sort的中间数据是存储在状态后端当中吗,数据量很大的情况下。 > 2023年7月12日 19:48,weijie guo 写道: > > 你好, > 首先,Batch Shuffle 的中间数据都是会落盘的。其次,对于 Sort 这个操作来说,上面给出的解法和Dataset一致,都不会落盘。 > > Best regards, > > Weijie > > > jinzhuguang 于2023年7月12日周三 17:28写道: > >> 如果我的数据量很大,内存装不下,fl

Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-12 文章 jinzhuguang
如果我的数据量很大,内存装不下,flink在batch mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。 > 2023年7月12日 17:05,weijie guo 写道: > > 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。 > 以mapPartition为例,可以通过以下三个步骤实现相同的功能: > 1.

如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-11 文章 jinzhuguang
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?

如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-11 文章 jinzhuguang
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?