Flink TaskManager heartbeat timeout
Flink 1.14.5 failed at 2022-07-22 10:07:51 with the following exception:

```
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

What could be causing this?
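A heartbeat timeout like this often points to a TaskManager that was briefly unresponsive (e.g. long GC pauses or an overloaded host) rather than a real network failure. One common first mitigation, sketched below with illustrative values (not a definitive fix), is to give the heartbeat more slack in flink-conf.yaml:

```yaml
# Defaults are heartbeat.interval: 10000 and heartbeat.timeout: 50000 (milliseconds).
# Raising the timeout tolerates longer pauses before the JobMaster declares
# the TaskManager dead; it does not address the underlying pause itself.
heartbeat.interval: 10000
heartbeat.timeout: 180000
```

If the timeouts persist, checking the TaskManager's GC logs and memory configuration is usually the next step.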
Regarding Flink compatibility with NeoKylin (中标麒麟), Kylin (银河麒麟), and UOS (统信)
Dear Flink contributors: For policy reasons, many state-owned and centrally-administered enterprises will be adopting Kylin (银河麒麟), NeoKylin (中标麒麟), or UOS (统信). Has Flink/PyFlink been adapted and verified on these three operating systems? Are there any known issues? And how does deployment differ from deployment on CentOS? Looking forward to your reply, many thanks!
Re: Using RocksDBStateBackend and SSD to store states, application runs slower..
Hi, I'd say it seems you are trying to identify bottlenecks in your job, and are currently looking at RocksDB disk I/O as one of them. However, there are other possible bottlenecks (e.g. CPU/memory/network/sink throttling), and from what you described, it's possible that the HDFS sink is the bottleneck. Are you using Flink >= 1.13? If so, you can use the flame graphs in the Flink dashboard to see what the busy operator is doing.

Regards,
Hong

From: Jing Ge
Date: Thursday, 21 July 2022 at 21:14
To: Yaroslav Tkachenko
Cc: vtygoss, "user@flink.apache.org"
Subject: RE: [EXTERNAL] Using RocksDBStateBackend and SSD to store states, application runs slower..

Hi, using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This optimization can improve the source throughput and reduce the delayed write rate. If this optimization didn't fix the back pressure, could you share more information about your job? Could you check the metrics of the back-pressured operator, e.g. check whether it is caused by write-heavy or read-heavy tasks? You could try tuning rocksdb.writebuffer for write-heavy tasks.

On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko wrote: Hi! I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

On Thu, Jul 21, 2022 at 4:11 AM vtygoss wrote: Hi, community! I am doing some performance tests based on my scenario.

1. Environment
- Flink: 1.13.5
- StateBackend: RocksDB, incremental
- use case: complex SQL containing 7 joins and 2 aggregations; input 30,000,000 records, output 60,000,000 records, about 80 GB.
- resource: Flink on YARN. JM 2G, one TM 24G (8G on-heap, 16G off-heap), 3 slots per TM.
- only difference: the config 'state.backend.rocksdb.localdir' points at either one SATA disk or one SSD disk.

2. Random-write performance difference between SATA and SSD: 4.8M/s is achieved using SATA, while 48.2M/s using SSD.
```
fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 -bs=8k -numjobs=64 -filename=/mnt/disk11/xx
```

3. In my use case, the Flink SQL application finished in 41 minutes using SATA, but 45 minutes using SSD.

Does this comparison suggest that using an SSD is not an effective way to improve RocksDB performance? The direct downstream of the back-pressured operator is HdfsSink; does that mean the best target for improving application performance is HDFS? Thanks for any replies or suggestions. Best Regards!
Re: Decompressing RMQ streaming messages
Hi Venkat, there's nothing that I know of, but I've written a zlib decompressor for our payloads which was pretty straightforward.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) throws IOException {
        Inflater decompressor = new Inflater();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        decompressor.setInput(message);
        byte[] buffer = new byte[1024];
        int len = 0;
        do {
            try {
                // inflate() returns the number of decompressed bytes written into buffer
                len = decompressor.inflate(buffer);
            } catch (DataFormatException e) {
                // Surface malformed payloads instead of silently swallowing them
                throw new IOException("Malformed zlib payload", e);
            }
            bos.write(buffer, 0, len);
        } while (len > 0);
        decompressor.end();
        bos.close();
        return bos.toByteArray();
    }
}
```

hope that helps.

On Thu, 21 Jul 2022 at 21:13, Ramana wrote:
> Hi - We have a requirement to read the compressed messages emitted by RabbitMQ and to have them processed using PyFlink. However, I am not finding any out-of-the-box functionality in PyFlink which can help decompress the messages.
>
> Could anybody help me with an example of how to go about this?
>
> Appreciate any help here.
>
> Thanks
> ~Venkat

--
This email and any attachments are proprietary and confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily reflect or represent those of SwitchDin Pty Ltd. If you have received this email in error, please let us know immediately by reply email and delete it from your system. You may not use, disseminate, distribute or copy this message nor disclose its contents to anyone. SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia
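For the PyFlink side of the original question, the same decompression can be sketched in plain Python with the standard zlib module (a minimal sketch; how to wire it into a PyFlink deserialization schema depends on your PyFlink version and connector, so that part is left out):

```python
import zlib

def decompress_message(message: bytes) -> bytes:
    """Inflate a zlib-compressed payload, mirroring the Java Inflater loop above."""
    return zlib.decompress(message)

# Usage: round-trip a payload the way a producer would compress it.
payload = b'{"sensor": "s1", "value": 42}'
compressed = zlib.compress(payload)
assert decompress_message(compressed) == payload
```

Note that zlib.decompress handles the buffer loop internally, so the Python version is a one-liner compared to the Java Inflater loop.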
Re: Using RocksDBStateBackend and SSD to store states, application runs slower..
Hi, using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This optimization can improve the source throughput and reduce the delayed write rate. If this optimization didn't fix the back pressure, could you share more information about your job? Could you check the metrics of the back-pressured operator, e.g. check whether it is caused by write-heavy or read-heavy tasks? You could try tuning rocksdb.writebuffer for write-heavy tasks.

On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko wrote:
> Hi!
>
> I'd try re-running the SSD test with the following config options:
>
> state.backend.rocksdb.thread.num: 4
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
>
> On Thu, Jul 21, 2022 at 4:11 AM vtygoss wrote:
>
>> Hi, community!
>>
>> I am doing some performance tests based on my scenario.
>>
>> 1. Environment
>> - Flink: 1.13.5
>> - StateBackend: RocksDB, incremental
>> - use case: complex SQL containing 7 joins and 2 aggregations; input 30,000,000 records, output 60,000,000 records, about 80 GB.
>> - resource: Flink on YARN. JM 2G, one TM 24G (8G on-heap, 16G off-heap), 3 slots per TM.
>> - only difference: the config 'state.backend.rocksdb.localdir' points at either one SATA disk or one SSD disk.
>>
>> 2. Random-write performance difference between SATA and SSD
>> 4.8M/s is achieved using SATA, while 48.2M/s using SSD.
>> ```
>> fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 -bs=8k -numjobs=64 -filename=/mnt/disk11/xx
>> ```
>>
>> 3. In my use case, the Flink SQL application finished in 41 minutes using SATA, but 45 minutes using SSD.
>>
>> Does this comparison suggest that using an SSD is not an effective way to improve RocksDB performance?
>> The direct downstream of the back-pressured operator is HdfsSink; does that mean the best target for improving application performance is HDFS?
>>
>> Thanks for any replies or suggestions.
>>
>> Best Regards!
Re: Flink nested complex json parsing with multiple schema file
Hi Yaroslav, thanks for your reply. How is the performance? Could I have sample code for the same?

On Thu, Jul 21, 2022, 9:31 PM Yaroslav Tkachenko wrote:
> Hi Soumen,
>
> I'd try parsing the input using the DataStream API (with a fast JSON library) and then converting it to a Table.
>
> On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury wrote:
>
>> We have a requirement to parse a very complex JSON event (around 25 KB per event) with a predefined schema (nested schema, with multiple schema files), create a temporary table, and from the temp table apply some case statements based on some fields (e.g. to find out success/failure counts, status code) and do an aggregation in a 1-second interval.
>>
>> We have tried the built-in *JSON_VALUE* function to retrieve some field values and then apply the case statements, but since I am using JSON_VALUE more than 5-6 times, the application performs very slowly.
>>
>> For some other filtering use case we are able to process more than 1600 events/sec, but for this case we only get around 300 events/sec per core.
>>
>> Below is the query example:
>>
>> *Query 1:*
>>
>> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode') AS errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause') AS subCause, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubProtocol') AS subProtocol, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError') AS subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message, '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime, proctime() as proctime from kafkaJsonSource"
>>
>> *Query 2:*
>>
>> select count(case when result1=1 then 1 else null end) failed_result, count(case when result1=0 then 1 else null end) successful_result, count(case when errorCode like '4%' then 1 else null end) err_starts_4, count(case when errorCode like '5%' then 1 else null end) err_starts_5, count(case when errorCode like '6%' then 1 else null end) err_starts_6, count(case when subCause is not null then 1 else null end) has_sub_cause, count(case when subProtocol='DNS' then 1 else null end) protocol_dns, count(case when subProtocol='Diameter' then 1 else null end) protocol_diameter, count(case when (subProtocol='Diameter' and subError like '3%') then 1 else null end) protocol_diameter_err_starts_3, count(case when (subProtocol='Diameter' and subError like '4%') then 1 else null end) protocol_diameter_err_starts_4, count(case when (subProtocol='Diameter' and subError like '5%') then 1 else null end) protocol_diameter_err_starts_5 FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL '1' SECOND)) GROUP BY window_start, window_end;
>>
>> Please let us know if there is a better way to do this.
>>
>> --
>> Regards
>> Soumen Choudhury
>> Cell : +91865316168
>> mail to : sou@gmail.com
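Since sample code was requested: the suggestion to parse once in the DataStream API rather than calling JSON_VALUE/JSON_QUERY per field can be sketched in plain Python (a hypothetical map function; the field names follow the schema quoted above, and the PyFlink wiring into a DataStream-to-Table pipeline is omitted):

```python
import json

def extract_fields(message: str) -> dict:
    """Parse the event once and pull all needed fields in a single pass,
    instead of re-parsing the 25 KB payload for every JSON_VALUE call."""
    event = json.loads(message)
    header = event.get("eventRecordHeader", {})
    cause = header.get("Cause", {})
    sub = cause.get("SubCause") or {}
    return {
        "result1": int(header.get("Result", 0)),
        "errorCode": cause.get("ErrorCode"),
        "subProtocol": sub.get("SubProtocol"),
        "subError": sub.get("SubError"),
        # StartTime is in milliseconds in the quoted query; keep the same /1000 scaling
        "eventTime": int(header.get("StartTime", 0)) // 1000,
    }

# Usage with a small synthetic event shaped like the schema in the query:
msg = ('{"eventRecordHeader": {"Result": 1, "StartTime": 1658400000000, '
       '"Cause": {"ErrorCode": "4001", '
       '"SubCause": {"SubProtocol": "Diameter", "SubError": "3002"}}}}')
row = extract_fields(msg)
assert row["result1"] == 1 and row["subProtocol"] == "Diameter"
```

The design point is that the expensive step (parsing the large nested document) happens exactly once per event; the per-field lookups afterwards are cheap dictionary accesses. A faster JSON library than the standard json module can be dropped in behind the same function.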
Re: Flink nested complex json parsing with multiple schema file
Hi Soumen,

I'd try parsing the input using the DataStream API (with a fast JSON library) and then converting it to a Table.

On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury wrote:
> We have a requirement to parse a very complex JSON event (around 25 KB per event) with a predefined schema (nested schema, with multiple schema files), create a temporary table, and from the temp table apply some case statements based on some fields (e.g. to find out success/failure counts, status code) and do an aggregation in a 1-second interval.
>
> We have tried the built-in *JSON_VALUE* function to retrieve some field values and then apply the case statements, but since I am using JSON_VALUE more than 5-6 times, the application performs very slowly.
>
> For some other filtering use case we are able to process more than 1600 events/sec, but for this case we only get around 300 events/sec per core.
>
> Below is the query example:
>
> *Query 1:*
>
> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode') AS errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause') AS subCause, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubProtocol') AS subProtocol, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError') AS subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message, '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime, proctime() as proctime from kafkaJsonSource"
>
> *Query 2:*
>
> select count(case when result1=1 then 1 else null end) failed_result, count(case when result1=0 then 1 else null end) successful_result, count(case when errorCode like '4%' then 1 else null end) err_starts_4, count(case when errorCode like '5%' then 1 else null end) err_starts_5, count(case when errorCode like '6%' then 1 else null end) err_starts_6, count(case when subCause is not null then 1 else null end) has_sub_cause, count(case when subProtocol='DNS' then 1 else null end) protocol_dns, count(case when subProtocol='Diameter' then 1 else null end) protocol_diameter, count(case when (subProtocol='Diameter' and subError like '3%') then 1 else null end) protocol_diameter_err_starts_3, count(case when (subProtocol='Diameter' and subError like '4%') then 1 else null end) protocol_diameter_err_starts_4, count(case when (subProtocol='Diameter' and subError like '5%') then 1 else null end) protocol_diameter_err_starts_5 FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL '1' SECOND)) GROUP BY window_start, window_end;
>
> Please let us know if there is a better way to do this.
>
> --
> Regards
> Soumen Choudhury
> Cell : +91865316168
> mail to : sou@gmail.com
Re: Using RocksDBStateBackend and SSD to store states, application runs slower..
Hi! I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

On Thu, Jul 21, 2022 at 4:11 AM vtygoss wrote:
> Hi, community!
>
> I am doing some performance tests based on my scenario.
>
> 1. Environment
> - Flink: 1.13.5
> - StateBackend: RocksDB, incremental
> - use case: complex SQL containing 7 joins and 2 aggregations; input 30,000,000 records, output 60,000,000 records, about 80 GB.
> - resource: Flink on YARN. JM 2G, one TM 24G (8G on-heap, 16G off-heap), 3 slots per TM.
> - only difference: the config 'state.backend.rocksdb.localdir' points at either one SATA disk or one SSD disk.
>
> 2. Random-write performance difference between SATA and SSD
> 4.8M/s is achieved using SATA, while 48.2M/s using SSD.
> ```
> fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 -bs=8k -numjobs=64 -filename=/mnt/disk11/xx
> ```
>
> 3. In my use case, the Flink SQL application finished in 41 minutes using SATA, but 45 minutes using SSD.
>
> Does this comparison suggest that using an SSD is not an effective way to improve RocksDB performance?
> The direct downstream of the back-pressured operator is HdfsSink; does that mean the best target for improving application performance is HDFS?
>
> Thanks for any replies or suggestions.
>
> Best Regards!
Flink nested complex json parsing with multiple schema file
We have a requirement to parse a very complex JSON event (around 25 KB per event) with a predefined schema (nested schema, with multiple schema files), create a temporary table, and from the temp table apply some case statements based on some fields (e.g. to find out success/failure counts, status code) and do an aggregation in a 1-second interval.

We have tried the built-in *JSON_VALUE* function to retrieve some field values and then apply the case statements, but since I am using JSON_VALUE more than 5-6 times, the application performs very slowly.

For some other filtering use case we are able to process more than 1600 events/sec, but for this case we only get around 300 events/sec per core.

Below is the query example:

*Query 1:*

"select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode') AS errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause') AS subCause, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubProtocol') AS subProtocol, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError') AS subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message, '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime, proctime() as proctime from kafkaJsonSource"

*Query 2:*

select count(case when result1=1 then 1 else null end) failed_result, count(case when result1=0 then 1 else null end) successful_result, count(case when errorCode like '4%' then 1 else null end) err_starts_4, count(case when errorCode like '5%' then 1 else null end) err_starts_5, count(case when errorCode like '6%' then 1 else null end) err_starts_6, count(case when subCause is not null then 1 else null end) has_sub_cause, count(case when subProtocol='DNS' then 1 else null end) protocol_dns, count(case when subProtocol='Diameter' then 1 else null end) protocol_diameter, count(case when (subProtocol='Diameter' and subError like '3%') then 1 else null end) protocol_diameter_err_starts_3, count(case when (subProtocol='Diameter' and subError like '4%') then 1 else null end) protocol_diameter_err_starts_4, count(case when (subProtocol='Diameter' and subError like '5%') then 1 else null end) protocol_diameter_err_starts_5 FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL '1' SECOND)) GROUP BY window_start, window_end;

Please let us know if there is a better way to do this.

--
Regards
Soumen Choudhury
Cell : +91865316168
mail to : sou@gmail.com
Re: Why does the official flink image not even include the ps, top, jps, ip, or route commands...
Actually it's quite normal for these commands to be missing, since those auxiliary commands don't prevent Flink from running. Docker images are flexible: you can build your own image on top of the flink image and install whatever commands you want.

best,
tanjialiang.

---- Original message ----
From: yidan zhao
Date: 2022-07-21 17:59
To: user-zh
Subject: Why does the official flink image not even include the ps, top, jps, ip, or route commands...
~
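Building a derived image as suggested above can be sketched as follows. This is a minimal sketch with assumptions: it assumes the official Debian-based flink image and that `procps` (for ps/top) and `iproute2` (for ip/route) are the packages you want; jps ships with a full JDK, so its availability depends on the base image's Java distribution.

```dockerfile
FROM flink:1.14.5
# Add the diagnostic tools the official image omits to stay slim.
RUN apt-get update && \
    apt-get install -y --no-install-recommends procps iproute2 && \
    rm -rf /var/lib/apt/lists/*
```

Build with `docker build -t flink-with-tools .` and point your deployment at the new image.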
Re: Why does the official flink image not even include the ps, top, jps, ip, or route commands...
My guess is they wanted to keep the flink image lightweight.

Best regards,
Yuxia

----- Original message -----
From: "yidan zhao"
To: "user-zh"
Sent: Thursday, 21 July 2022, 17:59:30
Subject: Why does the official flink image not even include the ps, top, jps, ip, or route commands...
~
Re: Making Kafka source respect offset changed externally
This is somewhat implied in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing:

> Note that Kafka source does *NOT* rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

and in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version:

> Set setStartFromGroupOffsets(true) on the consumer so that we get read offsets from Kafka. This will only take effect when there is no read offset in Flink state, which is why the next step is very important.

Dynamic partition discovery shouldn't have an effect because you are not creating partitions/topics.

On 21/07/2022 12:14, Alexis Sarda-Espinosa wrote:
I would suggest updating the documentation to include that statement. I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler:
Flink only reads the offsets from Kafka when the job is initially started from a clear slate. Once checkpoints are involved, it only relies on offsets stored in the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
Hello again,

I just performed a test using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I did a few tests in the following order, and I noticed a few weird things. Note that our job uses processing-time windows, so watermarks are irrelevant.

1. After the job had been running for a while, we manually moved the consumer group's offset to 12 hours in the past [1] (without restarting the job).
- After this, the consumer simply stopped reading messages; the consumer lag in Kafka stayed at around 150k (no new data arrived).

2. We restarted the job with a checkpoint.
- The consumer lag in Kafka dropped down to 0, but no data was emitted from the windows.

3. We stopped the job, moved the offset again, and restarted without any checkpoint/savepoint.
- This time the consumer correctly processed the backlog and emitted events from the windows.

This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between Flink's state's offset and Kafka's offset, will the job be unable to run?

[1] The command to move the offset was:

```
kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute
```

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa:
Hi Yaroslav, the test I did was just using earliest; I'll test with committed offsets again, thanks.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko wrote:
Hi Alexis, do you use OffsetsInitializer.committedOffsets() to specify your Kafka consumer offsets? In this case, it should get the offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa wrote:
Hello, regarding the new Kafka source (configured with a consumer group), I found out that if I manually change the group's offset with Kafka's admin API independently of Flink (while the job is running), the Flink source will ignore that and reset it to whatever it stored internally. Is there any way to prevent this?

Regards,
Alexis.
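The offset precedence discussed in this thread can be summarized with a small sketch (plain Python, not a Flink API; the function name and signature are illustrative): the source prefers the offset restored from checkpointed state, and only consults the externally committed group offset on a fresh start with no state.

```python
def starting_offset(state_offset, committed_offset, default=0):
    """Mimic the Kafka source's starting-offset decision as described above:
    checkpointed state wins over the externally committed group offset;
    the committed offset only matters on a fresh start (no state)."""
    if state_offset is not None:
        return state_offset          # restored from checkpoint/savepoint
    if committed_offset is not None:
        return committed_offset      # fresh start: honor the group offset
    return default                   # otherwise fall back to the reset strategy

# Job restarted from a checkpoint: the externally moved group offset is ignored.
assert starting_offset(state_offset=150_000, committed_offset=10_000) == 150_000
# Fresh start with no state: the externally moved group offset takes effect.
assert starting_offset(state_offset=None, committed_offset=10_000) == 10_000
```

This matches the observed behavior: restarting from a checkpoint resumed from the state's offset, while restarting without state honored the manually moved offset.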
Decompressing RMQ streaming messages
Hi - We have a requirement to read the compressed messages emitted by RabbitMQ and to have them processed using PyFlink. However, I am not finding any out-of-the-box functionality in PyFlink which can help decompress the messages. Could anybody help me with an example of how to go about this? Appreciate any help here. Thanks ~Venkat
Using RocksDBStateBackend and SSD to store states, application runs slower..
Hi, community!

I am doing some performance tests based on my scenario.

1. Environment
- Flink: 1.13.5
- StateBackend: RocksDB, incremental
- use case: complex SQL containing 7 joins and 2 aggregations; input 30,000,000 records, output 60,000,000 records, about 80 GB.
- resource: Flink on YARN. JM 2G, one TM 24G (8G on-heap, 16G off-heap), 3 slots per TM.
- only difference: the config 'state.backend.rocksdb.localdir' points at either one SATA disk or one SSD disk.

2. Random-write performance difference between SATA and SSD

4.8M/s is achieved using SATA, while 48.2M/s using SSD.

```
fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 -bs=8k -numjobs=64 -filename=/mnt/disk11/xx
```

3. In my use case, the Flink SQL application finished in 41 minutes using SATA, but 45 minutes using SSD.

Does this comparison suggest that using an SSD is not an effective way to improve RocksDB performance?

The direct downstream of the back-pressured operator is HdfsSink; does that mean the best target for improving application performance is HDFS?

Thanks for any replies or suggestions.

Best Regards!
Re: Making Kafka source respect offset changed externally
I would suggest updating the documentation to include that statement. I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler <ches...@apache.org>:
> Flink only reads the offsets from Kafka when the job is initially started from a clear slate.
> Once checkpoints are involved, it only relies on offsets stored in the state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
> Hello again,
>
> I just performed a test using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I did a few tests in the following order, and I noticed a few weird things. Note that our job uses processing-time windows, so watermarks are irrelevant.
>
> 1. After the job had been running for a while, we manually moved the consumer group's offset to 12 hours in the past [1] (without restarting the job).
> - After this, the consumer simply stopped reading messages; the consumer lag in Kafka stayed at around 150k (no new data arrived).
>
> 2. We restarted the job with a checkpoint.
> - The consumer lag in Kafka dropped down to 0, but no data was emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted without any checkpoint/savepoint.
> - This time the consumer correctly processed the backlog and emitted events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between Flink's state's offset and Kafka's offset, will the job be unable to run?
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
> --bootstrap-server ... \
> --topic our-topic \
> --group our-group \
> --command-config kafka-preprod.properties \
> --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
> --execute
>
> Regards,
> Alexis.
>
> Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa <sarda.espin...@gmail.com>:
>> Hi Yaroslav,
>>
>> The test I did was just using earliest; I'll test with committed offsets again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka consumer offsets? In this case, it should get the offsets from Kafka and not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>> Hello,
>>>>
>>>> Regarding the new Kafka source (configured with a consumer group), I found out that if I manually change the group's offset with Kafka's admin API independently of Flink (while the job is running), the Flink source will ignore that and reset it to whatever it stored internally. Is there any way to prevent this?
>>>>
>>>> Regards,
>>>> Alexis.
Why does the official flink image not even include the ps, top, jps, ip, or route commands...
~
Re: Job id in logs
No, that is not possible. There are too many shared components (many of which are not aware of jobs at all) for this to be feasible.

On 21/07/2022 10:49, Lior Liviev wrote:
Hello, is there a way to add the job ID to logs to distinguish between different jobs?
Job id in logs
Hello, is there a way to add the job ID to logs to distinguish between different jobs?
Re: Making Kafka source respect offset changed externally
Flink only reads the offsets from Kafka when the job is initially started from a clear slate. Once checkpoints are involved, it only relies on offsets stored in the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
Hello again,

I just performed a test using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I did a few tests in the following order, and I noticed a few weird things. Note that our job uses processing-time windows, so watermarks are irrelevant.

1. After the job had been running for a while, we manually moved the consumer group's offset to 12 hours in the past [1] (without restarting the job).
- After this, the consumer simply stopped reading messages; the consumer lag in Kafka stayed at around 150k (no new data arrived).

2. We restarted the job with a checkpoint.
- The consumer lag in Kafka dropped down to 0, but no data was emitted from the windows.

3. We stopped the job, moved the offset again, and restarted without any checkpoint/savepoint.
- This time the consumer correctly processed the backlog and emitted events from the windows.

This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between Flink's state's offset and Kafka's offset, will the job be unable to run?

[1] The command to move the offset was:

```
kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute
```

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa:
Hi Yaroslav, the test I did was just using earliest; I'll test with committed offsets again, thanks.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko wrote:
Hi Alexis, do you use OffsetsInitializer.committedOffsets() to specify your Kafka consumer offsets? In this case, it should get the offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa wrote:
Hello, regarding the new Kafka source (configured with a consumer group), I found out that if I manually change the group's offset with Kafka's admin API independently of Flink (while the job is running), the Flink source will ignore that and reset it to whatever it stored internally. Is there any way to prevent this?

Regards,
Alexis.
Re: A question about the naming of Flink source branches and tags
Hi, 1.15.1 should correspond to the tag release-1.15.1.

yidan zhao wrote on Thu, Jul 21, 2022 at 12:53:
> I've looked at it; there is some pattern, but I still don't fully understand it.
> For example, if I want to build some company-internal changes on top of the 1.15.1 release, which branch should I base them on? Or should I branch from a tag?
> Which branch or tag corresponds exactly to the source of the packages on the official download page, i.e. the version that does not include newly developed but unreleased code?