Re: How do you compare flink sql real-time results against the results of offline (batch) computation?

2021-03-08 Posted by jindy_liu
Right. The problem is that if we take the offline result as the baseline, the offline result is usually produced at day or hour granularity while the real-time side can be at second granularity. When the two results are compared in the production environment and they differ, it is hard to tell whether the real-time computation is actually at fault.

There are many possible causes of an inaccurate result, e.g. a flink sql bug, or messages lost in the stream, and so on.





How do you compare flink sql real-time results against the results of offline (batch) computation?

2021-03-07 Posted by jindy_liu
Does anyone have an approach they could share as a reference?





Is flink sql with the TwoStageOptimizedAggregateRule optimization enabled prone to jvm heap OutOfMemory?

2021-01-06 Posted by jindy_liu
As the subject says: same job, same data, and the only configuration difference is whether two-phase aggregation is enabled. After the production job runs for a while:
1. With two-phase aggregation enabled, the job crashes after running for some time, and it always crashes again when restoring from the checkpoint, so it cannot be restarted. Every time it reports insufficient jvm heap.
2. With two-phase aggregation disabled and all other memory settings unchanged, the job runs fine.

Looking at the heap dump taken when the job crashed, I found one place that looks like a leak.
Does the local agg operator use the java heap for its aggregation? If the amount of data being aggregated is not kept under control, is it easy to run into memory problems?
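For reference, this is roughly how I toggle the behaviour between the two runs described above (a minimal sketch, not the production job; the option values are only illustrative, and TWO_PHASE only takes effect when mini-batch is enabled):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AggPhaseToggle {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // mini-batch is a precondition for the two-phase aggregation rule
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");
        // "TWO_PHASE" = local agg + global agg, "ONE_PHASE" = global agg only
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        // ... then register the tables and run the same INSERT INTO statement for both runs
    }
}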


 







Re: Flink cdc connector: errors in the snapshot phase when the data volume is large

2021-01-06 Posted by jindy_liu
In the snapshot phase, if downstream processing is slow you easily get backpressure, and the backpressure makes debezium's "select * from xxx" take much longer.

This error is usually on the mysql side. The causes of such communication errors are quite varied and need to be looked at case by case. Mine was a nasty one and took a while to track down!!! Our company's DBAs monitor the execution time of sql statements and kill the long-running ones, which produced exactly the communication exception above.

Another cause is the connection sitting idle for too long, in which case the mysql server drops it as well. The common fix for these is to change the mysql configuration; jark wu from the community has shared some settings, and there are also some on the mysql-cdc-connector github page, e.g. wait_timeout and similar.
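As a rough illustration of the kind of check involved (not from the original reply): this is how one can inspect and raise the mysql timeouts mentioned above over a plain jdbc connection. Host, credentials and values are placeholders; raising the globals needs the right privileges, otherwise put them in my.cnf instead.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MysqlTimeoutCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/ai_audio_lyric_task", "root", "1");
             Statement stmt = conn.createStatement()) {
            // list the current timeout-related server variables
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '%timeout%'")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
            // illustrative values only; long snapshot reads need these to be generous
            stmt.execute("SET GLOBAL wait_timeout = 86400");
            stmt.execute("SET GLOBAL interactive_timeout = 86400");
            stmt.execute("SET GLOBAL net_write_timeout = 600");
        }
    }
}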





flink sql cdc streams (1.11.2): a two-stream join job ran fine for a long time; after cancelling it and restoring from the checkpoint it fails with OutOfMemory. The tm configuration was not changed, rocksdb backend. What could cause this? Why does it only happen on recovery and not while the job was running?

2020-12-23 Posted by jindy_liu


java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:261)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:235)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:227)
at java.util.ArrayList.add(ArrayList.java:458)
at
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:212)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:199)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:120)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:142)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:105)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$345/1080469422.accept(Unknown
Source)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$199/62611056.runDefaultAction(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)







How to measure the processing latency of a flink sql job?

2020-12-15 Posted by jindy_liu
In flink sql, what is the usual way to measure, for one job, the end-to-end latency of a mysql cdc stream from entering flink, through flink's internal operators, to the external sink system? Or the latency of the job as a whole?
We keep saying "real time", but right now I have no idea what order of magnitude the processing latency actually is.
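Two things I would try (only a sketch, not an authoritative answer; the table and column names reuse the `test` cdc table from my other threads and are illustrative): enable Flink's built-in latency markers, and additionally carry the source event timestamp through the query and diff it against the current time at the end for a rough end-to-end delay.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LatencyProbe {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // emit latency markers every 60s; they show up as histogram metrics per operator
        env.getConfig().setLatencyTrackingInterval(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // (the mysql-cdc DDL for `test` would be executed first)
        // `time` is the business timestamp column, so lag_seconds is only a rough proxy
        tEnv.executeSql(
                "SELECT id, TIMESTAMPDIFF(SECOND, `time`, LOCALTIMESTAMP) AS lag_seconds FROM test")
            .print();
    }
}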




Re: On choosing the state backend for N-way stream joins with flink cdc: FsStateBackend vs RocksDBStateBackend

2020-12-10 Posted by jindy_liu
Thanks for the correction! I verified what you said and my understanding was indeed wrong: I thought the write buffer count / max write buffer settings applied to all slots of a taskmanager. From the web ui, many operators were indeed showing is_stop_write. Your inference was right: with the old settings, is_stop_write=1 showed up frequently and the threads then blocked and slept.

Yesterday I adjusted the parameters again: slots set to 2, still 6 machines, job parallelism set to 12. The write stalls from before are gone. Running about 10 hours overnight it gets through roughly 210 million rows per table, with no write stalls now, and disk read/write iops and util are both very low, essentially idle. But this is still some distance from what we need for production: 6 machines only handle about 5000 rows/s, which is rather low.

taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  # the operators need roughly 3G of network buffers
taskmanager.memory.network.min: 128mb

#7G per slot writer + read
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 20
state.backend.rocksdb.writebuffer.size: 256M   #5G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 9

Looking at the machines whose cpu is busy (after the ~210 million rows), jstack shows the cost is now mostly on reads, largely in rocksdb.get. How can I tell whether those reads are served from memory or from disk? On the busy-cpu machines the disk read io is basically gone. The state files on the ssd disks used for the rocksdb local dir are about 7GB on 3 machines and about 3GB on the other 3 (I cannot see the checkpointed data size in the web ui, because checkpoints never succeed; the mysql-cdc-connector part keeps timing out).

@yuntang could you take a look at whether there is still headroom in the rocksdb settings and in the overall job performance?
(
One or two machines have clearly lower cpu load than the others, which makes me suspect there is also some skew somewhere???!!!
Because of some AGG operators I tweaked the code a bit and enabled minibatch:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "10 s");
configuration.setString("table.exec.mini-batch.size", "1");
)





Re: On choosing the state backend for N-way stream joins with flink cdc: FsStateBackend vs RocksDBStateBackend

2020-12-10 Posted by jindy_liu
One more data point: when I reduce state.backend.rocksdb.writebuffer.count from 48 down to 10,

judging from jstack and from the analysis at https://spotify.github.io/threaddump-analyzer/,

the top methods are basically all in rocksdb io, and many threads are just waiting.
 

 







Re: On choosing the state backend for N-way stream joins with flink cdc: FsStateBackend vs RocksDBStateBackend

2020-12-09 Posted by jindy_liu
The flink sql job involves 9 mysql tables (snapshot + cdc). The parsed plan has quite a few operators, roughly 60~70, mostly joins plus 4~5 GroupAggregate operators, then the sink; the sink has already been ruled out as the bottleneck.

Yes, I have been through several rounds of parameter tuning. The machines all have the same spec, 12 cores + 24G memory + 50G ssd, 6 machines in total (job parallelism is 60; apart from the flink mysql cdc sources, which run with parallelism 1, all other operators run with parallelism 60).
The main taskmanager flink-conf settings are below, everything else is default:
taskmanager.numberOfTaskSlots: 10
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  # the operators need roughly 3G of network buffers
taskmanager.memory.network.min: 128mb

#13G
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 48
state.backend.rocksdb.writebuffer.size: 256M   #12G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 9

这一板算是比较好的一版,但依然感觉比较慢。观察性能,6台的性能差异不大,都差不多,也没看到明显的数据倾斜。
起始阶段大部分数据都是写状态,启动阶段来看,是速率输入最快的时刻,后面越来越慢,慢慢的source算子就出现反压了。

观察一小时的数据如下,1小时后,每张表平均能跑个500w左右的数据,从观察来看,cpu在等待时间较大,但load又比较重,10核也就能跑个5核左右。硬盘的io也没到ssd的瓶颈,状态大小跑1个小时后,每台机器ssd硬盘上(state.backend.rocksdb.localdir)的状态大小也是2GB左右。
 
 
 
 

The main change in this round was increasing state.backend.rocksdb.writebuffer.count, because at job start the workload is almost pure writes; with state.backend.rocksdb.writebuffer.count set to 10 it is even slower and io util goes up to about 80%.
This parameter does not feel easy to tune.

As a comparison I tried running purely in memory: the same ~5 million rows per table finish in about 5 minutes, with basically no io, the cpus fully busy and no backpressure on the operators; of course the job gets oom-killed almost immediately because there is not enough memory.

Is there any way to optimize this? @yuntang







On choosing the state backend for N-way stream joins with flink cdc: FsStateBackend vs RocksDBStateBackend

2020-12-08 Posted by jindy_liu
Scenario:
We join several mysql tables with primary keys (hundreds of millions of rows) and then apply rule filtering, counting etc. on the resulting real-time wide table (view); the state of the join operators basically has no TTL.
All the mysql data has primary keys and the volume will not change much; foreseeably it may grow by only about 2 million rows a year, but the rows themselves change frequently. So we need high throughput and low latency.
In one test where the flink sql operators used Rocksdb as the backend, both throughput and latency were poor: a single data change can take about 10 seconds to take effect. With FsStateBackend it is fast and performs well; the difference is more than 10x.

So I am considering the following two ideas and wonder whether they are feasible (see the sketch below):
1. Use FsStateBackend: just add machines and keep all the state used by the job in memory with FsStateBackend (the number of primary keys per table is bounded, so the state should be bounded too).
2. Stay on rocksdb: make flink's managed memory (used by rocksdb) as large as possible, so that rocksdb reads and writes mostly stay in memory.
I am not very familiar with the state of the operators that flink sql generates; are these two approaches workable?
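For reference, a minimal sketch of the two options at the job level (Flink 1.11 API; the checkpoint path is a placeholder, and for sql-client jobs the equivalent knobs are state.backend / state.checkpoints.dir in flink-conf.yaml):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendChoice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        boolean useHeapBackend = false;  // flip to compare the two options
        if (useHeapBackend) {
            // option 1: keep all state on the JVM heap, checkpoint it to a filesystem
            env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
        } else {
            // option 2: stay on RocksDB with incremental checkpoints and a memory-heavy
            // profile, and give taskmanager.memory.managed.fraction most of the process memory
            RocksDBStateBackend rocksdb =
                    new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints", true);
            rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
            env.setStateBackend(rocksdb);
        }
    }
}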






After computing with flink sql cdc, sinking through the elastic search connector with multiple parallel sink subtasks has a problem, seen occasionally! The data volume is large and I have not found a good way to reproduce it yet. Has anyone run into something similar?

2020-11-30 Posted by jindy_liu
flink version: 1.11.2

*
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]*


Full error message:

2020-11-13 11:07:04
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:407)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:138)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1581)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1663)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:590)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:333)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:327)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at

Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-27 Posted by jindy_liu
Thanks jark! I have been doing performance tuning these past few days.
1. For this simple scenario I can work around it by extending the primary key of the sink table test_status with the join key, i.e. using both the id and status columns as the key, so that the data ends up eventually consistent; it is only a workaround, but it is consistent. For more complex statements this feels hard to pull off and I am a bit reluctant to rely on it, mainly because I am not sure how this out-of-orderness affects other operators; it is very easy to get wrong, and it really should be handled inside the flink framework. Do you have any advice on using flink sql cdc here, jark?

2. On performance: flink's default rocksdb parameters really do perform badly!
Following the article you shared, I tuned some parameters (write_buffer, buffer_size) and switched to ssd disks, which helped a lot. That explains why, when I raised the parallelism before, the cpu never got saturated: it was waiting on io. There still seems to be a lot of headroom here; I just have not found the knack for which parameters matter most.

3. Also, on performance monitoring: the metrics in flink's web ui are a bit hard to use. Are there any prometheus+grafana best practices? There are a lot of metrics, and building a dashboard is quite a hassle; what is really needed is a ready-made dashboard configuration!





Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-18 Posted by jindy_liu
Many thanks, jark!
1. Yesterday I turned the status table into a temporal table and tried the join that way. It does have the problem you mentioned: updates to the status table do not trigger any computation, and all real-time changes have to be driven by the test stream. There is also the temporal table TTL: too small and i/o becomes a problem, too large and the results are not timely, which does not match our requirements. In our scenario status is not really a dimension table; it is large and changes a lot.

2. With parallelism 1 there is an obvious performance problem; with large tables the join causes heavy backpressure.

3. In the tests with higher parallelism (10, 20, 40, 80) I made the data of the two joined tables (test, status) fully symmetric, so there is no skew (2 million rows each, join keys symmetric: test 1 -- status1, test 2 -- status2, ..., test 200 -- status200). The sources are still backpressured, it just shows up a bit later at higher parallelism. The flink state backend here is rocksdb + local files. Disk i/o is only around 20k blocks/s; could it be that the default rocksdb configuration simply is not fast enough?

Do you have any suggestions about this backpressure, jark?




Re: Flink cdc multi-table joins have very high processing latency

2020-11-18 Posted by jindy_liu
Piggybacking on this thread; our scenario is very similar: joining cdc streams with cdc streams into a wide table.

I have hit this kind of problem too. With cdc-to-cdc joins, things break easily once the data volume gets large (my checkpoint interval is set to 2 hours, the timeout was only set to 10 minutes, and the tolerable failure count is set very high; if the timeout is also set very long, say 2 hours, the data barely moves at all).
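(For reference only: this is roughly how those checkpoint knobs map to the API; the values mirror the ones I mentioned and are not a recommendation.)

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettings {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2 * 60 * 60 * 1000L);              // interval: 2 hours
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointTimeout(10 * 60 * 1000L);                  // timeout: 10 minutes
        cc.setTolerableCheckpointFailureNumber(Integer.MAX_VALUE); // "set very high"
    }
}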

1. During the snapshot there are constantly i/o problems. The log on the flink side looks like this:
./flink--taskexecutor-0-flink-taskmanager-v1-11-2-fcf8f675f-gn8q8.log.5:146619:2020-11-14
00:19:53,578 ERROR io.debezium.connector.mysql.SnapshotReader  
[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `low_db`.`t_low_media`': Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@3d208504 is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.

On the MySQL side, show processlist no longer shows the 'SELECT * FROM `low_db`.`t_low_media`' statement at that point.

My guess is that the backpressure is severe, no data flows on the link for a long time, the connection sits idle too long, and it dies.

2. When the joined view is sunk, because of the join, the output can be out of order in some cases (the retract stream output may land on different subtasks), so the sink result is wrong. Unless the parallelism is set to 1, it is very likely to be out of order!








Re: Flink mysqlCDC, then jdbc sink to mysql, out-of-order problem

2020-11-16 Posted by jindy_liu
I ran into this out-of-order problem as well. OP, did you manage to solve it at the sql level?




Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-16 Posted by jindy_liu
1. Tried it:

added a proctime column to the test table

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`proctime` AS PROCTIME(),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'no_lock',
  'password' = 'no_lock',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test',
  'debezium.snapshot.locking.mode' = 'none'
);

Then wrote the deduplication query:

INSERT into test_status_print 
SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as 
rowNum
FROM (
SELECT t.* , s.name as status_name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id
)
)r WHERE rowNum = 1;

But it fails with an error saying this is not supported:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Deduplicate doesn't support
consuming update and delete changes which is produced by node
Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])






Re: Flink sql cannot use !=

2020-11-15 Posted by jindy_liu
Use <> instead.





Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Posted by jindy_liu
I see.
1. How do I write the extra "deduplicate by sink key" node in sql?
2. Also, is there a keyword in sql to set the parallelism of a single sink statement separately?




Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Posted by jindy_liu
The image is a png screenshot; in case it does not show, here is the text:
1. The last few lines of the print output.
32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. It is also visible that most of the data is handled by join subtask 32: all 2 million rows are processed by a single subtask!

Also fixing a typo from before: ===>
after the snapshot the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu1
2, jindy2, 2020-07-03T18:04:22 , 2, statu2






Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Posted by jindy_liu
In case the image is unreadable,
here it is as text:
1. The last few lines of the print output.

32> +I(191,jindy191,2020-07-03T18:04:22,0,statu0)
32> +I(192,jindy192,2020-07-03T18:04:22,0,statu0)
32> +I(193,jindy193,2020-07-03T18:04:22,0,statu0)
32> +I(194,jindy194,2020-07-03T18:04:22,0,statu0)
32> +I(195,jindy195,2020-07-03T18:04:22,0,statu0)
32> +I(196,jindy196,2020-07-03T18:04:22,0,statu0)
32> +I(197,jindy197,2020-07-03T18:04:22,0,statu0)
32> +I(198,jindy198,2020-07-03T18:04:22,0,statu0)
32> +I(199,jindy199,2020-07-03T18:04:22,0,statu0)
36> +I(0,jindy0,2020-07-06T20:01:15,3,statu3)
32> -D(0,jindy0,2020-07-06T20:01:15,0,statu0)
30> -D(1,jindy1,2020-11-12T00:00:02,1,statu1)
36> +I(1,jindy1,2020-11-12T00:00:02,3,statu3)
36> +I(2,jindy2,2020-07-03T18:04:22,3,statu3)
30> -D(2,jindy2,2020-07-03T18:04:22,2,statu2)

2. It is also visible that most of the data is handled by join subtask 32: all 2 million rows are processed by a single subtask!







Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Posted by jindy_liu
The image was a screenshot in png format; I just forgot to add the file extension.





Re: flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-15 Posted by jindy_liu
I retried it: the problem also occurs without restarting the job at all; simply running with parallelism greater than 1 is enough to trigger it!

1. Started the sql client directly with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/sql-client.sh
embedded -d 
/data/home/jindyliu/flink-demo/flink-1.11.2//conf/sql-client-defaults.yaml
with the parallelism in sql-client-defaults.yaml set to 40.

Same data: the test table has 2 million rows, the status table 11 rows.

Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
Source table status:
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

// output
CREATE TABLE test_status_print (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print'
);

// join
INSERT into test_status_print 
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

Reproduction: after the mysql-cdc snapshot finishes, change the status column in the test table and the ordering problem appears. I printed the output with the print connector.
After the snapshot the data in test_status is correct:
0, jindy0, 2020-07-06T20:01:15 , 0, statu0
1, jindy2, 2020-11-12T00:00:02 , 1, statu2
2, jindy2, 2020-07-03T18:04:22 , 2, statu3

After the snapshot, change the status value of the rows with id=0,1,2 in the mysql table to 3. Expected result:
0, jindy0, 2020-07-06T20:01:15 , 3, statu3
1, jindy2, 2020-11-12T00:00:02 , 3, statu3
2, jindy2, 2020-07-03T18:04:22 , 3, statu3
But the output order is wrong, and as a result the two records with id=0 and id=2 are lost from the test_status table.

1. print output: (screenshot not included)


ps:
Another thing I observed: when the source data is sent into the join operator it does not seem to get hashed much; almost everything piles up on one node. Why is that? It feels like the join operator will become a bottleneck this way and cause backpressure very easily?!

@jark, could you help take a look? My build is Version: 1.11.2 Commit: fe36135 @
2020-09-09T16:19:03+02:00, downloaded from the official site.









flink 1.11.2 cdc: ordering of the cdc sql result table's sink output; restoring the job from a savepoint with a different parallelism leads to wrong sink results!!

2020-11-12 Posted by jindy_liu
Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
)
源表status
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

Output table:
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '10',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);


Insert statement:
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

The mysql tables already contain data:
test: 
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.

status
0, status0
1, status1
2, status2
.

Steps to reproduce:
1. Start the job with parallelism 40.
After the existing table data has been fully processed, take a savepoint with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
savepoint, then cancel the job in the web ui.
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1
2, name2, 2020-07-06 00:00:00 , 1, status1

2. In mysql, change the status row with id=1 to status1_modify.

3. Restart the job above with different parallelism, 1 versus greater than 1; with parallelism greater than 1 the result does not match the expectation.
With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
job:
  ==> the data in test_status is correct:
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1_modify
2, name2, 2020-07-06 00:00:00 , 1, status1_modify
With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
job:
  ==> the data in test_status is wrong, the two rows with id = 1, 2 are missing:
0, name0, 2020-07-06 00:00:00 , 0, status0


I suspect that when the cdc changelog stream is written by the sink with multiple parallel subtasks, the insert for id=1 is executed before the delete for id=1, which corrupts the data!!!

Is this a bug? Or is the operator state wrong when recovering from the savepoint?
If so, can the parallelism of just the sink be set to 1?? (A rough sketch of that workaround is below.)
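Not something I have verified, and as far as I know the 1.11 SQL connectors have no per-sink parallelism option, so the only workaround I can think of is dropping down to the DataStream API for the sink, roughly like this (print() stands in for the real elasticsearch sink, and the cdc DDLs above would be executed first):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SingleParallelismSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // (execute the mysql-cdc DDL for `test` and `status` from this mail here)
        Table joined = tEnv.sqlQuery(
                "SELECT t.*, s.name FROM test AS t LEFT JOIN status AS s ON t.status = s.id");
        // convert to a retract stream and force a single sink subtask;
        // a real job would addSink(...) the elasticsearch sink here instead of print()
        DataStream<Tuple2<Boolean, Row>> retract = tEnv.toRetractStream(joined, Row.class);
        retract.print().setParallelism(1);
        env.execute("single-parallelism sink sketch");
    }
}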









Re: flink 1.11 cdc: how do I convert a DataStream into a table usable with flink sql cdc?

2020-11-05 Posted by jindy_liu
Thanks, jark!
The data does include deletes, so it looks like I will have to go the custom source route. I originally only wanted to merge the streams with a mapfunction at a higher level and then convert them.
It seems there is no way around implementing a sql connector. The source is kafka, so I will have to find a way to adapt KafkaDynamicSource!!!




flink 1.11 cdc: how do I convert a DataStream into a table usable with flink sql cdc?

2020-11-04 Posted by jindy_liu
I currently have two DataStream streams which I merge into one DataStream with a mapfunction. How do I turn that DataStream into a table that I can operate on with flink sql?
*(Note: because the mapfunction imposes some ordering constraints on the streams, I currently cannot define the table directly with flink sql cdc!!!)*

*What I do at the moment fails with an error:*

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
fsSettings);

DataStreamSource json1 // canal json format
DataStreamSource json2  // canal json format
ConnectedStreams connect=
caliber_cdc_json.connect(caliber_snapshot_json); //connect
DataStream snapshot_cdc_stream = connect.flatMap(
new SnapshotCdcCoRichFlatMapFunction()
); // merge the two streams

//3, register the table and output its data directly
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" +
"`id` INT,\n" +
"`name` VARCHAR(255),\n" +
"`time` TIMESTAMP(3),\n" +
"PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
"  'connector' = 'print'\n" +
")";

 //4, app logic
String op = "INSERT into test_mirror SELECT * from test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);


*But the job submission fails with the following error:*
serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
++
| table name |
++
| UnnamedTable$0 |
|   test |
|test_mirror |
++
3 rows in set


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: A raw type backed by type information has no serializable
string representation. It needs to be resolved into a proper raw type.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
Source)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
*Caused by: org.apache.flink.table.api.TableException: A raw type backed by
type information has no serializable string representation. It needs to be
resolved into a proper raw type.*
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
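(Not the solution, just the direction I plan to try based on the error above: give the merged stream an explicit Row type before registering it, so the planner does not see a RAW type. This assumes snapshot_cdc_stream carries the canal-json records as Strings; parseToRow is a hypothetical String -> Row parser and the field list mirrors the `test` table.)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

// ...inside the same job setup as the code above...
DataStream<Row> rows = snapshot_cdc_stream
        .map(value -> parseToRow(value))        // parseToRow: hypothetical String -> Row
        .returns(Types.ROW_NAMED(
                new String[]{"id", "name", "time", "status"},
                Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.INT));
Table typedTable = fsTableEnv.fromDataStream(rows, $("id"), $("name"), $("time"), $("status"));
fsTableEnv.createTemporaryView("test", typedTable);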

flink sql client: cdc to elasticsearch has a problem (version 1.11.0): the job submission succeeds, but the job does not show up in the web dashboard! Is this a bug, or is my configuration wrong?

2020-08-11 Posted by jindy_liu
The tables are created as follows:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_es',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='latest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true'
);


# output table to es
CREATE TABLE test_mirror_es (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'test_mirror'
);

INSERT into test_mirror_es SELECT * from test where test.id >=0;

Log: Caused by: org.apache.flink.table.api.ValidationException: Unable to
create a source for reading table
'default_catalog.default_database.test_mirror_es'.

The full log is below:


2020-08-12 13:07:20,815 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 10
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: parallelism.default, 5
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: state.savepoints.dir,
hdfs://localhost:9000/flink-1.11.0/flink-savepoints
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-08-12 13:07:21,198 INFO 
org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2020-08-12 13:07:22,099 INFO 
org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor
config: {taskmanager.memory.process.size=1728m,
jobmanager.execution.failover-strategy=region,
jobmanager.rpc.address=localhost, execution.target=remote,
jobmanager.memory.process.size=1600m,
state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints,
jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar],
parallelism.default=5, taskmanager.numberOfTaskSlots=10,
pipeline.classpaths=[]}
2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient   
  
[] - Command history file path: /root/.flink-sql-history
2020-08-12 13:07:46,637 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration  
  
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:17,512 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration  
  
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient   
  
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL
statement.
at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
at 

flink 1.11 cdc: a kafka topic contains canal-json records for multiple tables; I need to parse and process them per table and sink them to different downstreams. How can this be supported?

2020-07-20 Posted by jindy_liu
For example:

mysql tables:
CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Data in kafka:
// insert event for table test
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}

// event for table status
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}

Since the json in kafka changes dynamically (for example a new table can appear), how can it be converted into the corresponding RowData?
It does not seem possible to handle this directly with JsonRowDeserializationSchema or CanalJsonDeserializationSchema.
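Not a full answer, just a sketch of the DataStream-level pre-routing I would try before any per-table parsing: read the topic as plain strings, look at the "table" field of each canal-json record with Jackson, and split/filter per table. Topic name, kafka properties and the target table name are taken from the examples above; the rest is illustrative.

import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RouteByTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "c_mysql_binlog_es");

        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props));

        // keep only events of table `test`; the same pattern gives one stream per table
        DataStream<String> testEvents = raw.filter(new RichFilterFunction<String>() {
            private transient ObjectMapper mapper;
            @Override
            public void open(Configuration parameters) {
                mapper = new ObjectMapper();
            }
            @Override
            public boolean filter(String json) throws Exception {
                return "test".equals(mapper.readTree(json).path("table").asText());
            }
        });

        testEvents.print();
        env.execute("route canal-json by table");
    }
}

Each per-table stream produced this way could then be handed to its own deserializer and sink.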








Re: flink 1.11: using JdbcRowDataOutputFormat inside a custom RichFlatMapFunction to write pgsql data; RuntimeContext initialization problem, null pointer or "RuntimeContext not initialized"; what am I doing wrong?

2020-07-15 Posted by jindy_liu
It is indeed that line that causes it.
If all of this has been refactored, what is the recommended way to use it now?
I need to know, for every row, whether it corresponds to an insert, update or delete event.
Or, to put the question differently: for this kind of api, what rules should I follow so that my code stays compatible across flink versions?




flink 1.11: using JdbcRowDataOutputFormat inside a custom RichFlatMapFunction to write pgsql data; RuntimeContext initialization problem, null pointer or "RuntimeContext not initialized"; what am I doing wrong?

2020-07-14 Posted by jindy_liu
The code below compiles, but at runtime, when the RichFlatMapFunction is opened, JdbcRowDataOutputFormat.open crashes saying the RuntimeContext is null; and if I remove outputFormatStatus.setRuntimeContext(this.getRuntimeContext()), it complains that it has not been initialized instead.

Could someone take a look at what is going on here, am I using it incorrectly?


at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
at
com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)



Code =>

public class PostgresSinkMapFunction extends RichFlatMapFunction {
private static String driverClass = "org.postgresql.Driver";
private static String dbUrl =
"jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
private static String userNmae = "postgres";
private static String passWord = "1";

// table status
private static JdbcRowDataOutputFormat outputFormatStatus;
private static String[] fieldNames = new String[] {"id", "name"};
private static DataType[] fieldDataTypes = new DataType[]{
DataTypes.INT(),
DataTypes.STRING()};

private static RowType rowType = RowType.of(
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
fieldNames);
private static RowDataTypeInfo rowDataTypeInfo =
RowDataTypeInfo.of(rowType);

@Override
public void flatMap(String s, Collector collector) throws
Exception {
GenericRowData row = new GenericRowData(2);

 row.setRowKind(INSERT);
 row.setField(0, count);
 row.setField(1, "jindy" + Integer.toString(count));

outputFormatStatus.writeRecord(row);

}

public void open(Configuration parameters) throws Exception {
super.open(parameters);

JdbcOptions jdbcOptions = JdbcOptions.builder()
.setDriverName(driverClass)
.setDBUrl(dbUrl)
.setTableName("status_mirror")
.setUsername(userNmae)
.setPassword(passWord)
.build();

JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
.withTableName(jdbcOptions.getTableName())
.withDialect(jdbcOptions.getDialect())
.withFieldNames(fieldNames)
.build();

outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
.setJdbcOptions(jdbcOptions)
.setFieldDataTypes(fieldDataTypes)
.setJdbcDmlOptions(dmlOptions)
   
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
.build();

// set context, this is where it breaks!!
outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
outputFormatStatus.open(0, 1);
}

public void close() throws Exception {
super.close();
outputFormatStatus.close();
}
}
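For comparison, and only as a sketch of a less internal route: instead of driving JdbcRowDataOutputFormat by hand, the public JdbcSink facade from flink-connector-jdbc 1.11 can be attached to the stream directly. The SQL, url and credentials below mirror the ones in my code; the DataStream of Rows is assumed to exist upstream.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class StatusMirrorJdbcSink {
    // assuming rows is a DataStream<Row> of (id, name) records for status_mirror
    public static void attach(DataStream<Row> rows) {
        rows.addSink(JdbcSink.sink(
                "INSERT INTO status_mirror (id, name) VALUES (?, ?)",
                (statement, row) -> {
                    statement.setInt(1, (int) row.getField(0));
                    statement.setString(2, (String) row.getField(1));
                },
                JdbcExecutionOptions.builder().withBatchSize(1000).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:5432/ai_audio_lyric_task")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("postgres")
                        .withPassword("1")
                        .build()));
    }
}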







Re: A single record from one stream needs to be sunk to mysql first and then to kafka, with the two sinks atomic and in that order; can this be done?

2020-07-13 Posted by jindy_liu
I originally wanted to reuse as much of the existing code as possible, but looking at the JdbcOutputFormat implementation, its batch size is a number of sql statements while kafka's batch size is in bytes, so the two cannot be coordinated, and the time thresholds of the two sinks cannot be synchronized either.

So I am going to follow your suggestion and implement a buffer inside a RichFlatMapFunction.
When the buffer reaches the threshold, or the timer condition is met, manually call writeRecord and flush on the JdbcOutputFormat (which can be given a larger buffer); when the condition is not met, the RichFlatMapFunction emits no elements.
That way the kafka sink's batching rhythm no longer matters, since the two batch conditions are independent of each other.
From a first look this should work?
I am a beginner, so I would appreciate any advice and other pitfalls to watch out for.







Re: A single record from one stream needs to be sunk to mysql first and then to kafka, with the two sinks atomic and in that order; can this be done?

2020-07-13 Posted by jindy_liu


If they can be chained together, that does guarantee the ordering; I will give it a try.

A follow-up question: in practice, if the data from the single source also has to be processed in order, can I just set the parallelism to 1?

I also need to consider the sink efficiency inside the mysql SinkFunction: it needs to accumulate some data before sinking. Can this be coordinated with the kafka sink's batch_size and linger.ms, e.g. sink both once 1000 records have accumulated or more than 1s has passed?

Thanks~





Re: Re: Re: Flink multi-sink data consistency guarantees

2020-07-12 Posted by jindy_liu
I roughly understand the principle now and want to implement one myself, e.g. for kafka plus mysql, reusing as much code as possible.
Looking at the source, it seems I would have to read through the existing connectors (kafka, jdbc), pull the relevant pieces out, and then reassemble them following the twophasecommitsinkfunction approach, implementing the methods one by one.
One more question: it looks like the jdbc connector in the current source is only an at-least-once implementation?




A single record from one stream needs to be sunk to mysql first and then to kafka, with the two sinks atomic and in that order; can this be done?

2020-07-10 Posted by jindy_liu
Has anyone done something like this before?

Also, about flink's side output feature: it can split one stream into several, but after the split, is there no way to guarantee the ordering between the two streams when they are sunk? (A side-output sketch is below for context.)
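For readers not familiar with it, this is the side output mechanism being referred to, as a minimal sketch; the tag name, elements and routing are made up for illustration.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final OutputTag<String> toKafka = new OutputTag<String>("to-kafka") {};

        DataStream<String> source = env.fromElements("a", "b", "c");
        SingleOutputStreamOperator<String> main = source.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(value);              // main output, e.g. for the mysql sink
                        ctx.output(toKafka, value);      // side output, e.g. for the kafka sink
                    }
                });
        DataStream<String> kafkaSide = main.getSideOutput(toKafka);

        main.print();        // stand-ins for the two real sinks
        kafkaSide.print();
        env.execute("side output sketch");
    }
}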




Re: Re: Flink multi-sink data consistency guarantees

2020-07-08 Posted by jindy_liu
How did you solve this in the end? Could you share the code?
Do the two sinks have to be merged into a single sink, with a two-phase commit implemented on top?
I also have a scenario that needs atomicity across multiple sinks.




Re: flink 1.11 cdc: using flink sql to operate on multiple tables in one kafka topic while keeping them ordered?

2020-07-08 Posted by jindy_liu
If ordering is based on the change timestamp, there may be a problem: if the data changes very fast, two consecutive changes can carry the same timestamp.

Could this be handled with the binlog's gtid instead? At least that is monotonically increasing.




Re: flink 1.11 cdc: using flink sql to operate on multiple tables in one kafka topic while keeping them ordered?

2020-07-08 Posted by jindy_liu
Right. The main reason is that the flink release notes say flink now supports cdc, which sounded like exactly the feature I need, and it felt like everything I want to do could be done with flink alone, without switching between and maintaining several open source platforms and components.

My original plan was: first use flink sql to do one full (batch) load of the existing data into heterogeneous storage (hbase, pgsql, etc.), then use flink cdc to move the mysql binlog changes into the same storage (incremental), and at the same time mirror the post-cdc json data in kafka to downstream consumers (as change notifications).

Downstream could then use the mirrored kafka data (the changes) plus the mirrored data in the heterogeneous storage to do real-time computation with flink (for example top-N over the last month), instead of everybody crowding onto the mysql replicas for analysis; some of the analysis, the olap-style queries, is not a good fit for mysql anyway.

But when I actually built a demo, even just the real-time data movement raised quite a few problems, and flink alone did not seem to be enough (possibly because I am not familiar with it; I have not used flink for long).
The problems:
1. How to combine the full load with the real-time changes. Semantically I can only achieve "at least once": first move the existing data, then migrate the binlog in real time, but it is hard to pinpoint the kafka offset that corresponds to the moment the full load finished. (If the business only needs "at least once" this is still usable, and most of our cases only need "at least once".)

2. Ordering across multiple tables in one db: there are both kafka throughput concerns and ordering-guarantee concerns. In our case the db tables do not change that fast, about 1 million row changes per day, which is manageable, and we only need ordering across the N tables we care about, not every table in the db instance. But this kind of multi-table ordering does not seem easy to express with flink sql cdc; it may require consuming the topic with custom code instead.

3. How to guarantee data consistency across multiple sinks: concretely, during incremental sync flink has to sink to the heterogeneous storage first and to kafka second. How do I guarantee both the order and the atomicity of the two sinks?

So my question is: can the order of flink's sinks be defined?
As above, after reading the canal-json data from kafka, can I first write to pgsql and, only after that succeeds, write the json unchanged to kafka? If this is not supported today, could I modify flink myself to support it?






Re: flink 1.11 cdc: using flink sql to operate on multiple tables in one kafka topic while keeping them ordered?

2020-07-07 Posted by jindy_liu
Correct. It is just that in production some schemas never declare the foreign keys explicitly; the update order across tables is guaranteed by the business process instead. So when consuming the change data, it also has to be consumed in order, otherwise whoever uses the mirrored data may run into problems.

A question: besides flink sql's cdc feature, can other flink features support this scenario well? Do I need to drop down to a lower-level api?




Re: flink 1.11 cdc: using flink sql to operate on multiple tables in one kafka topic while keeping them ordered?

2020-07-07 Posted by jindy_liu
I saw on github that the release-1.11.0 tag had been published (https://github.com/apache/flink/tree/release-1.11.0), so I built that tag. I have been doing some research on real-time computation lately, and our first step is real-time data movement into heterogeneous storage; I noticed flink 1.11 has cdc support, so I tried it as soon as it was released to see whether it can be used for real-time synchronization of change data.
1. From my testing, if the mysql binlog arrives in kafka ordered per single table, one topic, one partition, then flink cdc synchronization is really convenient; a few sql statements are enough.
2. If the mysql binlog arrives ordered per db instance, with multiple tables in one topic and one partition, I do not know how to define the ddl for that, nor how to guarantee the synchronization stays in order (for example when there are logical foreign key constraints between tables; concretely, the status field of the test table is such a foreign key, and if the related records are all updated, the update order matters and must strictly follow the binlog order). Looking at the source today, the canal-json parsing only seems to extract the fields and the operation type from the json. I feel this multi-table ordered scenario should be a fairly common requirement.




flink 1.11 cdc: using flink sql to operate on multiple tables in one kafka topic while keeping them ordered?

2020-07-07 Posted by jindy_liu
Scenario: after canal parses the binlog, the changes of several related tables in the mysql instance db1 are sent to a single kafka topic with a single partition, so the order is preserved. If I want to synchronize the data into another mysql instance db2, how do I operate on multiple tables with flink sql while keeping the order between the tables?

For example, the mysql instance db1 has tables test and status:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `status` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

With flink sql I can define the corresponding test table and sink it to the mirror table test in the mirror instance db2, and likewise for status, but how should the status table be handled and how is the ordering guaranteed? I can do a single table today, which is indeed convenient; please help, how do I do ordered synchronization for multiple tables?

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_postgres',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true');

CREATE TABLE status (
`status` INT,
`name` VARCHAR(255),
PRIMARY KEY(name) NOT ENFORCED
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_postgres',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true');


