Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 Thread jinzhuguang
我试了下,当我显示的设置env.setRuntimeMode(RuntimeExecutionMode.BATCH); 
就不会进行checkpoint了,否则是可以。

> 2024年2月2日 17:20,ha.fen...@aisino.com 写道:
> 
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>env.setStateBackend(new FsStateBackend("file:\\d:\\abc"));
> 
> 
> 发件人: jinzhuguang
> 发送时间: 2024-02-02 17:16
> 收件人: user-zh
> 主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
> 你是在batch模式下手动开启了checkpoint吗
> 
>> 2024年2月2日 17:11,ha.fen...@aisino.com 写道:
>> 
>> 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。
>> 
>> 发件人: jinzhuguang
>> 发送时间: 2024-02-02 16:47
>> 收件人: user-zh
>> 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
>> Flink 1.16.0
>> 
>> 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。
>> 
>> interface StatefulSinkWriter extends 
>> SinkWriter {
>>   /**
>>* @return The writer's state.
>>* @throws IOException if fail to snapshot writer's state.
>>*/
>>   List snapshotState(long checkpointId) throws IOException;
>>   }
>> 
>> 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。
>> 
>> 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?
>> 
>> 恳请各位大佬赐教
> 



Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 Thread jinzhuguang
你是在batch模式下手动开启了checkpoint吗

> 2024年2月2日 17:11,ha.fen...@aisino.com 写道:
> 
> 今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。
> 
> 发件人: jinzhuguang
> 发送时间: 2024-02-02 16:47
> 收件人: user-zh
> 主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
> Flink 1.16.0
> 
> 我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。
> 
> interface StatefulSinkWriter extends SinkWriter 
> {
>/**
> * @return The writer's state.
> * @throws IOException if fail to snapshot writer's state.
> */
>List snapshotState(long checkpointId) throws IOException;
>}
> 
> 然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。
> 
> 如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?
> 
> 恳请各位大佬赐教



Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据

2024-02-02 Thread jinzhuguang
Flink 1.16.0

我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。

interface StatefulSinkWriter extends SinkWriter {
/**
 * @return The writer's state.
 * @throws IOException if fail to snapshot writer's state.
 */
List snapshotState(long checkpointId) throws IOException;
}

然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。

如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?

恳请各位大佬赐教


FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 Thread jinzhuguang
Flink版本:1.16.0

看官网上的案例:
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',   -- 必选:指定连接器类型
  'path' = 'file:///path/to/whatever',  -- 必选:指定路径
  'format' = '...', -- 必选:文件系统连接器指定 format
-- 有关更多详情,请参考 Table Formats
  'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null 或空字符串

  -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 
的文件数,但是可能会导致数据倾斜,默认值是 false
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。

Re: 关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 Thread jinzhuguang
感谢大佬,我找到了。
所以说SQL类的内建函数实际上使用的是calcite的能力,而flink自己的内建函数是在table api中使用

> 2023年11月24日 17:07,Xuyang  写道:
> 
> Hi, 
> 关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS
>  NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> 在 2023-11-24 15:15:04,"jinzhuguang"  写道:
>> flink 1.18.0
>> 
>> 
>> 例如我写下一条SQL:
>> select * from KafkaTable where id is not null;
>> 
>> IS NOT NULL应该属于系统内建函数,于是我找到相关代码:
>> 
>> public static final BuiltInFunctionDefinition IS_NOT_NULL =
>>   BuiltInFunctionDefinition.newBuilder()
>>   .name("isNotNull")
>>   .kind(SCALAR)
>>   
>> .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
>>   
>> .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
>>   .build();
>> 
>> 发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:
>> 
>> DEBUG org.apache.flink.table.module.ModuleManager  [] - 
>> Cannot find FunctionDefinition 'is not null' from any loaded modules.
>> 
>> 
>> 所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?
>> 
>> 以下是调用栈:
>> @org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
>>   at 
>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
>>   at 
>> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
>>   at 
>> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
>>   at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
>>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
>>   at 
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
>>   at 
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
>>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>>   at 
>> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
>>   at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
>>   at 
>> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>>   at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:750)



关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-23 Thread jinzhuguang
flink 1.18.0


例如我写下一条SQL:
 select * from KafkaTable where id is not null;

IS NOT NULL应该属于系统内建函数,于是我找到相关代码:

public static final BuiltInFunctionDefinition IS_NOT_NULL =
BuiltInFunctionDefinition.newBuilder()
.name("isNotNull")
.kind(SCALAR)

.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();

发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:

DEBUG org.apache.flink.table.module.ModuleManager  [] - Cannot 
find FunctionDefinition 'is not null' from any loaded modules.


所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?

以下是调用栈:
@org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
at 
org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
at 
org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)


Re: flink sql如何实现json字符数据解析?

2023-11-22 Thread jinzhuguang
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
比如:

SourceT: (
uuid String,
   body_data ARRAY>
)

SinkT (
result ARRAY>
)

Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result

希望对你有帮助

> 2023年11月22日 20:54,casel.chen  写道:
> 
> 输入:
> 
> {
> 
>  "uuid":"",
> 
>  "body_data": 
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
> 
> }
> 
> 
> 
> 
> 输出:
> 
> [
> 
>  {
> 
> "uuid": "",
> 
> "body_data: null,
> 
> "body_data.fild1": "123”,
> 
> "body_data.fild2": "234"
> 
>  },
> 
>  {
> 
> "uuid": "",
> 
> "body_data": null,
> 
> "body_data.fild1": "abc",
> 
> "body_data.fild2": "cdf"
> 
>  }
> 
> ]
> 
> 
> 
> 
> 当格式错误时
> 
> 
> 
> 
> 输入:
> 
> {
> 
> "uuid": "”,
> 
> "body_data": "abc"
> 
> }
> 
> 输出:
> 
> {
> 
> "uuid": "",
> 
> "body_data": "abc",
> 
> "body_data.fild1": null,
> 
> "body_data.fild2": null
> 
> }



如何在Flink Connector Source退出时清理资源

2023-10-23 Thread jinzhuguang
版本:Flink 1.16.0

需求:在某个source结束退出时清理相关的资源。

问题:目前没有找到Source退出时相关的hook函数,不知道在哪里编写清理资源的代码。

恳请大佬们指教。

Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 Thread jinzhuguang
感谢大佬!!!

> 2023年10月13日 10:44,tanjialiang  写道:
> 
> Hi, 
> 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922
> 
> 
> best wishes,
> tanjialiang.
> 
> 
>  回复的原邮件 
> | 发件人 | jinzhuguang |
> | 发送日期 | 2023年10月13日 10:39 |
> | 收件人 | user-zh |
> | 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 |
> 首先,我的Flink版本为1.16.0
> 为了方便理解,我以Kafka作为案例来描述:
> 我有以下两个表:
> CREATE TABLE orders(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA VIRTUAL
> )WITH(
> 'connector'='kafka',
> 'topic'='orders',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> CREATE TABLE kafka_sink(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
> )WITH(
> 'connector'='kafka',
> 'topic'='kafka_sink',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> 
> 正常情况:
> Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: e419ae9d2cad4c3c2a2c1150c1a86653
> 
> 
> 异常情况:
> Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
> user_id,name,`timestamp` from orders;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
> 'timestamp'
> 很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
> Flink SQL> describe kafka_sink;
> +---+--+--+-+---+---+
> |  name | type | null | key |extras | 
> watermark |
> +---+--+--+-+---+---+
> |   user_id |   BIGINT | TRUE | |   | 
>   |
> |  name |   STRING | TRUE | |   | 
>   |
> | timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | 
>   |
> +---+--+--+-+---+---+
> 
> 
> 
> 恳请解答!



关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 Thread jinzhuguang
首先,我的Flink版本为1.16.0
为了方便理解,我以Kafka作为案例来描述:
我有以下两个表:
CREATE TABLE orders(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA VIRTUAL
)WITH(
'connector'='kafka',
'topic'='orders',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE kafka_sink(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
)WITH(
'connector'='kafka',
'topic'='kafka_sink',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);

正常情况: 
Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653


异常情况:
Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
user_id,name,`timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
'timestamp'
很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
Flink SQL> describe kafka_sink;
+---+--+--+-+---+---+
|  name | type | null | key |extras | watermark 
|
+---+--+--+-+---+---+
|   user_id |   BIGINT | TRUE | |   |   
|
|  name |   STRING | TRUE | |   |   
|
| timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' |   
|
+---+--+--+-+---+---+



恳请解答!

Re: Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 Thread jinzhuguang
你好,除了这些运维手段外,flink cdc本身有什么解法吗,比如说增量阶段不用从头开始读binlog,因为其实很多都是重复读到的数据

> 2023年9月20日 21:00,Jiabao Sun  写道:
> 
> Hi,
> 生产环境的binlog还是建议至少保留7天,可以提高故障恢复时间容忍度。
> 另外,可以尝试增加snapshot的并行度和资源来提升snapshot速度,snapshot完成后可以从savepoint恢复并减少资源。
> Best,
> Jiabao
> --
> From:jinzhuguang 
> Send Time:2023年9月20日(星期三) 20:56
> To:user-zh 
> Subject:Flink cdc 2.0 历史数据太大,导致log积压怎么解决
> 以mysql 
> cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?



Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 Thread jinzhuguang
以mysql 
cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?

Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 Thread jinzhuguang
以mysql 
cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?

Re: 如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-08-02 Thread jinzhuguang
非常感谢你的提醒,我现在用maven工具修改了所有的版本号为snapshot,但是flink-connectors(connectors的父模块)也变成snapshot,打包的时候仓库里找不到他了,而且也没法想flink-runtime这些包手动改下版本好,这种该怎么办

> 2023年7月27日 11:05,Jiabao Sun  写道:
> 
> 你好,
> 
> 通常在 pom 中引入 maven-deploy-plugin,并且通过  声明私服地址,使用 mvn 
> clean deploy 命令部署到nexus私服。
> 部署到 SNAPSHOT 仓库需要项目版本号包含 -SNAPSHOT 后缀,可以在IDE中全局替换,也可以使用 versions-maven-plugin 
> 统一设置。
> 
> 
>
>
>
>   org.apache.maven.plugins
>   maven-deploy-plugin
>   2.8.2
>   
> ${maven.deploy.skip}
>   
> 
>
>
> 
>
>
>private-snapshots
>
> https://xxx.xxx.xxx/nexus/content/repositories/snapshots/
>
>
>private-releases
>https://xxx.xxx.xxx/nexus/content/repositories/releases/
>
>
> 
> 
> 
>> 2023年7月27日 上午10:48,jinzhuguang  写道:
>> 
>> 我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?



如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-07-26 Thread jinzhuguang
我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?

Re: ASF jira account

2023-07-20 Thread jinzhuguang
我也遇到类似的问题,我是链接失效了,最后没办法再注册了

> 2023年7月20日 14:54,李天龙  写道:
> 
> 您好!
> 我想注册一个flink jira的账号,但由于提出的里有不充分给拒掉了,想再次申请,却提示邮箱已申请过,还未处理:
> 
> 
> There is already a pending Jira account request associated with this email 
> address. Please wait for it to be processed
> 
> 
> 请问怎么解决这个问题,并且成功申请一个账号
> 
> 
> 
> 
> 
> 
> --
> 发自我的网易邮箱平板适配版



Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-12 Thread jinzhuguang
嗨你好,用于sort的中间数据是存储在状态后端当中吗,数据量很大的情况下。

> 2023年7月12日 19:48,weijie guo  写道:
> 
> 你好,
> 首先,Batch Shuffle 的中间数据都是会落盘的。其次,对于 Sort 这个操作来说,上面给出的解法和Dataset一致,都不会落盘。
> 
> Best regards,
> 
> Weijie
> 
> 
> jinzhuguang  于2023年7月12日周三 17:28写道:
> 
>> 如果我的数据量很大,内存装不下,flink在batch
>> mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。
>> 
>>> 2023年7月12日 17:05,weijie guo  写道:
>>> 
>>> 
>> 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。
>>> 以mapPartition为例,可以通过以下三个步骤实现相同的功能:
>>> 1. dataStream.map(record -> (subtaskIndex,
>>> record)),为每个Record增加处理该record时子任务编号。
>>> 2. dataStream.assignTimestampsAndWatermarks,为每个Record增加相同的时间戳。
>>> 3.
>>> 
>> dataStream.window(TumblingEventTimeWindows.of(Time.seconds(1))).apply(mapPartition
>>> udf),基于固定时间窗口开窗,收集全量数据进行并使用和mapPartition相同的逻辑进行处理。
>>> 
>>> 以下链接中是相关的示例代码,其中的第一步和第二步,在DataSet API被移除之前后续会考虑为DataStream API提供相关的工具方法:
>>> 
>>> https://netcut.cn/p/dc693599e9031cd7
>> 
>> 



Re: 如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-12 Thread jinzhuguang
如果我的数据量很大,内存装不下,flink在batch mode下的行为是否会像传统的批处理系统,例如hive那样,会进行shuffe、中间数据落盘等操作。

> 2023年7月12日 17:05,weijie guo  写道:
> 
> 你好,对于DataSet中不按照key进行全量聚合/排序的API(例如,sortPartition/mapPartition),DataStream上目前没有直接提供相同的API,但可以通过组合DataStream上现有的API实现相同的功能。
> 以mapPartition为例,可以通过以下三个步骤实现相同的功能:
> 1. dataStream.map(record -> (subtaskIndex,
> record)),为每个Record增加处理该record时子任务编号。
> 2. dataStream.assignTimestampsAndWatermarks,为每个Record增加相同的时间戳。
> 3.
> dataStream.window(TumblingEventTimeWindows.of(Time.seconds(1))).apply(mapPartition
> udf),基于固定时间窗口开窗,收集全量数据进行并使用和mapPartition相同的逻辑进行处理。
> 
> 以下链接中是相关的示例代码,其中的第一步和第二步,在DataSet API被移除之前后续会考虑为DataStream API提供相关的工具方法:
> 
> https://netcut.cn/p/dc693599e9031cd7



如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-11 Thread jinzhuguang
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?

如果DataSet API 被彻底废掉了,那我如何用DataStream实现分区、排序这个需求?

2023-07-11 Thread jinzhuguang
目前,我在flink 1.16还没有发现DataStream有实现分区排序的接口,现在是否有方法在有界数据集上实现分区、排序这个需求呢?