Flink TaskManager heartbeat timeout

2022-07-21 Thread ??????

Flink version: 1.14.5
Error log:
2022-07-22 10:07:51
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


On Flink's adaptation to NeoKylin/Kylin (中标/银河麒麟) and UOS (统信)

2022-07-21 Thread 张 兴博
Dear Flink contributors,

Due to policy requirements, many state-owned and central enterprises will be adopting Kylin (银河麒麟), NeoKylin (中标麒麟), or UOS (统信). Has Flink/PyFlink been adapted to and verified on these three systems? Are there any known issues? And how does deployment differ from deployment on CentOS?

   Looking forward to your reply. Many thanks!


Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Teoh, Hong
Hi,

I’d say it seems you are trying to identify bottlenecks in your job, and are 
currently looking at RocksDB Disk I/O as one of the bottlenecks. However, there 
are also other bottlenecks (e.g. CPU/memory/network/sink throttling), and from 
what you described, it’s possible that the HDFS sink is the bottleneck. Are you 
using Flink >= 1.13? If so you can use Flamegraphs on the Flink dashboard to 
debug what the busy operator is doing.
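
If flamegraphs are not enabled in your setup yet (they are off by default, as far as I know), the following flink-conf.yaml entry should expose them in the web UI after a restart:

rest.flamegraph.enabled: true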

Regards,
Hong



From: Jing Ge 
Date: Thursday, 21 July 2022 at 21:14
To: Yaroslav Tkachenko 
Cc: vtygoss , "user@flink.apache.org" 
Subject: RE: [EXTERNAL]Using RocksDBStateBackend and SSD to store states, 
application runs slower..


Hi,

Using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This 
optimization can improve the source throughput and reduce the delayed write 
rate.

If this optimization didn't fix the back pressure, could you share more 
information about your job? Could you check the metric of the back pressured 
operator, e.g. check if it is caused by write-heavy or read-heavy tasks? You 
could try tuning rocksdb.writebuffer for write-heavy tasks.

On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko 
mailto:yaros...@goldsky.io>> wrote:
Hi!

I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED


On Thu, Jul 21, 2022 at 4:11 AM vtygoss 
mailto:vtyg...@126.com>> wrote:

Hi, community!



I am doing some performance tests based on my scenario.



1. Environment

- Flink: 1.13.5

- StateBackend: RocksDB, incremental

- use case: a complex SQL job containing 7 joins and 2 aggregations; input of 
30,000,000 records and output of 60,000,000 records (about 80 GB).

- resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3 slots 
per TM

- only difference: different config 'state.backend.rocksdb.localdir', one SATA 
disk or one SSD disk.



2. Random-write performance difference between SATA and SSD

   4.8 MB/s is achieved using SATA, while 48.2 MB/s using SSD.

   ```

   fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync  -fsync=1 
-runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 
-bs=8k  -numjobs=64 -filename=/mnt/disk11/xx

   ```



3. In my use case, the Flink SQL application finished in 41 minutes using SATA, 
while it took 45 minutes using SSD.



Does this comparison suggest that using an SSD is not an effective way to 
improve RocksDB performance?

The direct downstream of the back-pressured operator is the HdfsSink; does that mean 
the best target for improving application performance is HDFS?



Thanks for any replies or suggestions.



Best Regards!














Re: Decompressing RMQ streaming messages

2022-07-21 Thread Francis Conroy
Hi Venkat,

there's nothing that I know of, but I've written a zlib decompressor for
our payloads which was pretty straightforward.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) throws IOException {
        // Inflate the whole zlib-compressed payload into an in-memory buffer.
        Inflater decompressor = new Inflater();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        decompressor.setInput(message);
        byte[] buffer = new byte[1024];
        try {
            while (!decompressor.finished()) {
                int len = decompressor.inflate(buffer);
                if (len == 0) {
                    // Inflater needs more input or a dictionary: payload is truncated/malformed.
                    break;
                }
                bos.write(buffer, 0, len);
            }
        } catch (DataFormatException e) {
            // Surface corrupt payloads instead of silently swallowing them.
            throw new IOException("Failed to inflate zlib-compressed message", e);
        } finally {
            decompressor.end();
        }
        return bos.toByteArray();
    }
}
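
For reference, a minimal sketch of plugging the schema into the RabbitMQ connector (host, credentials and queue name are placeholders, not from your setup):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .build();

// The source then emits the inflated payloads as byte[] records.
DataStream<byte[]> inflated = env.addSource(
        new RMQSource<>(connectionConfig, "compressed-queue", new ZlibDeserializationSchema()));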

hope that helps.

On Thu, 21 Jul 2022 at 21:13, Ramana  wrote:

> Hi - We have a requirement to read the compressed messages coming out of
> RabbitMQ and to have them processed using PyFlink. However, I am not
> finding any out of the box functionality in PyFlink which can help
> decompress the messages.
>
> Could anybody help me with an example of how to go about this?
>
> Appreciate any help here.
>
> Thanks
>
> ~Venkat
>
>



Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Jing Ge
Hi,

Using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This
optimization can improve the source throughput and reduce the delayed write
rate.

If this optimization didn't fix the back pressure, could you share more
information about your job? Could you check the metric of the back
pressured operator, e.g. check if it is caused by write-heavy or read-heavy
tasks? You could try tuning rocksdb.writebuffer for write-heavy tasks.
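
For reference, the relevant knobs live under state.backend.rocksdb.writebuffer.* in flink-conf.yaml. The values below are purely illustrative placeholders, not a recommendation for your job:

state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.writebuffer.number-to-merge: 2

Note that when managed memory for RocksDB is enabled (the default), the total write-buffer memory is still bounded by the configured memory budget.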

On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko 
wrote:

> Hi!
>
> I'd try re-running the SSD test with the following config options:
>
> state.backend.rocksdb.thread.num: 4
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
>
>
> On Thu, Jul 21, 2022 at 4:11 AM vtygoss  wrote:
>
>> Hi, community!
>>
>>
>> I am doing some performance tests based on my scenario.
>>
>>
>> 1. Environment
>>
>> - Flink: 1.13.5
>>
>> - StateBackend: RocksDB, incremental
>>
>> - use case: a complex SQL job containing 7 joins and 2 aggregations; input of
>> 30,000,000 records and output of 60,000,000 records (about 80 GB).
>>
>> - resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3
>> slots per TM
>>
>> - only difference: different config 'state.backend.rocksdb.localdir', one
>> SATA disk or one SSD disk.
>>
>>
>> 2. Random-write performance difference between SATA and SSD
>>
>>    4.8 MB/s is achieved using SATA, while 48.2 MB/s using SSD.
>>
>>```
>>
>>fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync
>>  -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G
>> --allow_mounted_write=1 -bs=8k  -numjobs=64 -filename=/mnt/disk11/xx
>>
>>```
>>
>>
>> 3. In my use case, the Flink SQL application finished in 41 minutes using
>> SATA, while it took 45 minutes using SSD.
>>
>>
>> Does this comparison suggest that using an SSD is not an effective way to
>> improve RocksDB performance?
>>
>> The direct downstream of the back-pressured operator is the HdfsSink; does that
>> mean the best target for improving application performance is HDFS?
>>
>>
>> Thanks for any replies or suggestions.
>>
>>
>> Best Regards!
>>
>>
>>
>>
>>
>>
>>
>>
>>


Re: Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Soumen Choudhury
Hi Yaroslav,

Thanks for your reply.
How is the performance?

Could I have some sample code for the same?

On Thu, Jul 21, 2022, 9:31 PM Yaroslav Tkachenko 
wrote:

> Hi Soumen,
>
> I'd try parsing the input using the DataStream API (with a fast JSON
> library) and then converting it to a Table.
>
> On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury 
> wrote:
>
>> We have a requirement to parse a very complex JSON event (around 25 kB per
>> event) against a predefined schema (nested, with multiple schema files),
>> create a temporary table, and from that temp table apply some case statements
>> based on some fields (e.g. to find out success/failure counts and status
>> codes) and do an aggregation over a 1-second interval.
>>
>> We have tried the built-in *JSON_VALUE* function to retrieve some field
>> values and then apply the case statements, but as I am using JSON_VALUE more
>> than 5/6 times, the application is performing very slowly.
>>
>> For some other filtering use cases we are able to receive more than 1600
>> events/sec, but for this case we are only receiving around 300 events/sec
>> per core.
>>
>> Below is the query example:
>>
>> *Query 1:*
>>
>> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint)
>> AS result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' )
>> AS errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
>> subCause, JSON_QUERY(message,
>> '$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
>> JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
>> subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
>> '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
>> proctime() as proctime from kafkaJsonSource",
>>
>> *Query 2:*
>>
>> select count(case when result1=1 then 1 else null end)
>> failed_result,count(case when result1=0 then 1 else null end)
>> successful_result,count(case when errorCode like '4%' then 1 else null end)
>> err_starts_4,count(case when errorCode like '5%' then 1 else null end)
>> err_starts_5,count(case when errorCode like '6%' then 1 else null end)
>> err_starts_6,count(case when subCause is not null then 1 else null end)
>> has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
>> protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
>> protocol_diameter, count(case when (subProtocol='Diameter' and subError
>> like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
>> when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
>> protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
>> subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
>> FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
>> '1' SECOND)) GROUP BY window_start, window_end;
>>
>> Please let us know if there is a better way to do this.
>>
>> --
>> Regards
>> Soumen Choudhury
>> Cell : +91865316168
>> mail to : sou@gmail.com
>>
>


Re: Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Yaroslav Tkachenko
Hi Soumen,

I'd try parsing the input using the DataStream API (with a fast JSON
library) and then converting it to a Table.
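
A rough sketch of that approach (untested; the POJO, its field names, and the stand-in source are assumptions derived from the queries quoted below):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JsonParseSketch {

    // Flink POJO: public class, implicit no-arg constructor, public fields.
    public static class ParsedEvent {
        public Long result1;
        public String errorCode;
        public String subProtocol;
        public String subError;
        public Long startTime;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in for the real Kafka source that delivers the raw JSON strings.
        DataStream<String> raw = env.fromElements(
                "{\"eventRecordHeader\":{\"Result\":1,\"StartTime\":1658397600000}}");

        DataStream<ParsedEvent> parsed = raw.map(new RichMapFunction<String, ParsedEvent>() {
            private transient ObjectMapper mapper;

            @Override
            public void open(Configuration parameters) {
                mapper = new ObjectMapper();
            }

            @Override
            public ParsedEvent map(String message) throws Exception {
                // Parse the JSON once per record instead of once per JSON_VALUE/JSON_QUERY call.
                JsonNode header = mapper.readTree(message).path("eventRecordHeader");
                ParsedEvent e = new ParsedEvent();
                e.result1 = header.path("Result").asLong();
                e.errorCode = header.path("Cause").path("ErrorCode").asText(null);
                e.subProtocol = header.path("Cause").path("SubCause").path("SubProtocol").asText(null);
                e.subError = header.path("Cause").path("SubCause").path("SubError").asText(null);
                e.startTime = header.path("StartTime").asLong();
                return e;
            }
        });

        // Register the typed stream as a view; the windowed CASE/COUNT aggregation can stay in SQL.
        // (A processing-time column for the TUMBLE window can be added via the Schema builder
        // when calling fromDataStream.)
        tEnv.createTemporaryView("filter_transformed", tEnv.fromDataStream(parsed));

        // Running a query on the view triggers execution of the pipeline.
        tEnv.sqlQuery("SELECT result1, startTime FROM filter_transformed").execute().print();
    }
}

Whether this beats repeated JSON_QUERY calls depends on the job, so it is worth benchmarking against your current pipeline.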

On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury  wrote:

> We have a requirement to parse a very complex JSON event (around 25 kB per
> event) against a predefined schema (nested, with multiple schema files),
> create a temporary table, and from that temp table apply some case statements
> based on some fields (e.g. to find out success/failure counts and status
> codes) and do an aggregation over a 1-second interval.
>
> We have tried the built-in *JSON_VALUE* function to retrieve some field
> values and then apply the case statements, but as I am using JSON_VALUE more
> than 5/6 times, the application is performing very slowly.
>
> For some other filtering use cases we are able to receive more than 1600
> events/sec, but for this case we are only receiving around 300 events/sec
> per core.
>
> Below is the query example:
>
> *Query 1:*
>
> "select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS
> result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' ) AS
> errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
> subCause, JSON_QUERY(message,
> '$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
> JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
> subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
> '$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
> proctime() as proctime from kafkaJsonSource",
>
> *Query 2:*
>
> select count(case when result1=1 then 1 else null end)
> failed_result,count(case when result1=0 then 1 else null end)
> successful_result,count(case when errorCode like '4%' then 1 else null end)
> err_starts_4,count(case when errorCode like '5%' then 1 else null end)
> err_starts_5,count(case when errorCode like '6%' then 1 else null end)
> err_starts_6,count(case when subCause is not null then 1 else null end)
> has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
> protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
> protocol_diameter, count(case when (subProtocol='Diameter' and subError
> like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
> when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
> protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
> subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
> FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
> '1' SECOND)) GROUP BY window_start, window_end;
>
> Please let us know if there is a better way to do this.
>
> --
> Regards
> Soumen Choudhury
> Cell : +91865316168
> mail to : sou@gmail.com
>


Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Yaroslav Tkachenko
Hi!

I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
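
If you prefer setting this in code rather than flink-conf.yaml, a rough equivalent sketch (double-check the API against your Flink version) would be:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental checkpoints
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);          // also raises the background thread count
env.setStateBackend(backend);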


On Thu, Jul 21, 2022 at 4:11 AM vtygoss  wrote:

> Hi, community!
>
>
> I am doing some performance tests based on my scenario.
>
>
> 1. Environment
>
> - Flink: 1.13.5
>
> - StateBackend: RocksDB, incremental
>
> - use case: a complex SQL job containing 7 joins and 2 aggregations; input of
> 30,000,000 records and output of 60,000,000 records (about 80 GB).
>
> - resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3
> slots per TM
>
> - only difference: different config 'state.backend.rocksdb.localdir', one
> SATA disk or one SSD disk.
>
>
> 2. Random-write performance difference between SATA and SSD
>
>    4.8 MB/s is achieved using SATA, while 48.2 MB/s using SSD.
>
>```
>
>fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync
>  -fsync=1 -runtime=300 -group_reporting -name=xxx -size=100G
> --allow_mounted_write=1 -bs=8k  -numjobs=64 -filename=/mnt/disk11/xx
>
>```
>
>
> 3. In my use case, the Flink SQL application finished in 41 minutes using SATA,
> while it took 45 minutes using SSD.
>
>
> Does this comparison suggest that using an SSD is not an effective way to
> improve RocksDB performance?
>
> The direct downstream of the back-pressured operator is the HdfsSink; does that
> mean the best target for improving application performance is HDFS?
>
>
> Thanks for any replies or suggestions.
>
>
> Best Regards!
>
>
>
>
>
>
>
>
>


Flink nested complex json parsing with multiple schema file

2022-07-21 Thread Soumen Choudhury
We have a requirement to parse a very complex JSON event (around 25 kB per
event) against a predefined schema (nested, with multiple schema files),
create a temporary table, and from that temp table apply some case statements
based on some fields (e.g. to find out success/failure counts and status
codes) and do an aggregation over a 1-second interval.

We have tried the built-in *JSON_VALUE* function to retrieve some field
values and then apply the case statements, but as I am using JSON_VALUE more
than 5/6 times, the application is performing very slowly.

For some other filtering use cases we are able to receive more than 1600
events/sec, but for this case we are only receiving around 300 events/sec
per core.

Below is the query example:

*Query 1:*

"select cast(JSON_QUERY(message, '$.eventRecordHeader.Result'), bigint) AS
result1, JSON_QUERY(message, '$.eventRecordHeader.Cause.ErrorCode' ) AS
errorCode, JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause' ) AS
subCause, JSON_QUERY(message,
'$.eventRecordHeader.Cause.SubCause.SubProtocol' ) AS subProtocol,
JSON_QUERY(message, '$.eventRecordHeader.Cause.SubCause.SubError' ) AS
subError, TO_TIMESTAMP_LTZ(cast(JSON_QUERY(message,
'$.eventRecordHeader.StartTime') as bigint)/1000, 3) AS eventTime,
proctime() as proctime from kafkaJsonSource",

*Query 2:*

select count(case when result1=1 then 1 else null end)
failed_result,count(case when result1=0 then 1 else null end)
successful_result,count(case when errorCode like '4%' then 1 else null end)
err_starts_4,count(case when errorCode like '5%' then 1 else null end)
err_starts_5,count(case when errorCode like '6%' then 1 else null end)
err_starts_6,count(case when subCause is not null then 1 else null end)
has_sub_cause,count(case when subProtocol='DNS' then 1 else null end)
protocol_dns, count(case when subProtocol='Diameter' then 1 else null end)
protocol_diameter, count(case when (subProtocol='Diameter' and subError
like '3%') then 1 else null end) protocol_diameter_err_starts_3,count(case
when (subProtocol='Diameter' and subError like '4%') then 1 else null end)
protocol_diameter_err_starts_4,count(case when (subProtocol='Diameter' and
subError like '5%') then 1 else null end) protocol_diameter_err_starts_5
FROM TABLE(TUMBLE(TABLE filter_transformed, DESCRIPTOR(proctime), INTERVAL
'1' SECOND)) GROUP BY window_start, window_end;

Please let us know if there is a better way to do this.

-- 
Regards
Soumen Choudhury
Cell : +91865316168
mail to : sou@gmail.com


Re: Why does the official flink image not even have ps, top, jps, ip, route...

2022-07-21 Thread 谭家良
Actually, it is quite normal that these commands are missing, since these auxiliary commands are not needed for it to run.
Docker images give you a lot of flexibility: you can build your own image on top of the flink image and install whatever commands you want.

best,
tanjialiang.




---- Original message ----
| From | yidan zhao |
| Date | 2022-07-21 17:59 |
| To | user-zh |
| Cc | |
| Subject | Why does the official flink image not even have ps, top, jps, ip, route... |
~


Re: Why does the official flink image not even have ps, top, jps, ip, route...

2022-07-21 Thread yuxia
Probably to keep the flink image lightweight.

Best regards,
Yuxia

- Original Message -
From: "yidan zhao" 
To: "user-zh" 
Sent: Thursday, 21 July 2022, 17:59:30
Subject: Why does the official flink image not even have ps, top, jps, ip, route...

~


Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
This is somewhat implied in 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing.


> Note that Kafka source does *NOT* rely on committed offsets for
> fault tolerance. Committing offset is only for exposing the progress of
> consumer and consuming group for monitoring.

and
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version

> Set setStartFromGroupOffsets(true) on the consumer so that we get
> read offsets from Kafka. This will only take effect when there is no
> read offset in Flink state, which is why the next step is very important.

Dynamic partition discovery shouldn't have an effect because you are not
creating partitions/topics.
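
For completeness, a minimal sketch of the new KafkaSource configured to start from the committed group offsets (broker address, topic and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("our-topic")
        .setGroupId("our-group")
        // Start from the committed group offsets, falling back to LATEST if none exist.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// As noted above, this only applies to a fresh start; once the job restores from a
// checkpoint or savepoint, the offsets stored in Flink state take precedence.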

On 21/07/2022 12:14, Alexis Sarda-Espinosa wrote:

I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler 
:


Flink only reads the offsets from Kafka when the job is initially
started from a clear slate.
Once checkpoints are involved it only relies on offsets stored in
the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:

Hello again,

I just performed a test
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST).
I did a few tests in the following order, and I noticed a few
weird things. Note that our job uses Processing Time windows, so
watermarks are irrelevant.

1. After the job had been running for a while, we manually moved
the consumer group's offset to 12 hours in the past [1] (without
restarting the job).
  - After this, the consumer simply stopped reading messages -
the consumer lag in Kafka stayed at around 150k (no new data arrived)

2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was
emitted from the windows.

3. We stopped the job, moved the offset again, and restarted
Without any checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and
emitted events from the windows.

This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between
Flink's state's offset and Kafka's offset, will the job be unable
to run?



[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa
:

Hi Yaroslav,

The test I did was just using earliest, I'll test with
committed offset again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,
 wrote:

Hi Alexis,

Do you use OffsetsInitializer.committedOffsets() to
specify your Kafka consumer offsets? In this case, it
should get the offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa
 wrote:

Hello,

Regarding the new Kafka source (configure with a
consumer group), I found out that if I manually
change the group's offset with Kafka's admin API
independently of Flink (while the job is running),
the Flink source will ignore that and reset it to
whatever it stored internally. Is there any way to
prevent this?

Regards,
Alexis.





Decompressing RMQ streaming messages

2022-07-21 Thread Ramana
Hi - We have a requirement to read the compressed messages coming out of
RabbitMQ and to have them processed using PyFlink. However, I am not
finding any out of the box functionality in PyFlink which can help
decompress the messages.

Could anybody help me with an example of how to go about this?

Appreciate any help here.

Thanks

~Venkat


Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread vtygoss
Hi, community!


I am doing some performance tests based on my scenario.


1. Environment
- Flink: 1.13.5
- StateBackend: RocksDB, incremental
- use case: a complex SQL job containing 7 joins and 2 aggregations; input of 
30,000,000 records and output of 60,000,000 records (about 80 GB).
- resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3 slots 
per TM
- only difference: different config 'state.backend.rocksdb.localdir', one SATA 
disk or one SSD disk.


2. Random-write performance difference between SATA and SSD
   4.8 MB/s is achieved using SATA, while 48.2 MB/s using SSD.
   ```
   fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync  -fsync=1 
-runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 
-bs=8k  -numjobs=64 -filename=/mnt/disk11/xx
   ``` 


3. In my use case, the Flink SQL application finished in 41 minutes using SATA, 
while it took 45 minutes using SSD.


Does this comparison suggest that using an SSD is not an effective way to improve 
RocksDB performance?
The direct downstream of the back-pressured operator is the HdfsSink; does that mean 
the best target for improving application performance is HDFS?


Thanks for any replies or suggestions.


Best Regards!

Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Alexis Sarda-Espinosa
I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler <
ches...@apache.org>:

> Flink only reads the offsets from Kafka when the job is initially started
> from a clear slate.
> Once checkpoints are involved it only relies on offsets stored in the
> state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
>
> Hello again,
>
> I just performed a test
> using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I
> did a few tests in the following order, and I noticed a few weird things.
> Note that our job uses Processing Time windows, so watermarks are
> irrelevant.
>
> 1. After the job had been running for a while, we manually moved the
> consumer group's offset to 12 hours in the past [1] (without restarting the
> job).
>   - After this, the consumer simply stopped reading messages - the
> consumer lag in Kafka stayed at around 150k (no new data arrived)
>
> 2. We restarted the job with a checkpoint.
>   - The consumer lag in Kafka dropped down to 0, but no data was
> emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted Without any
> checkpoint/savepoint.
>   - This time the consumer correctly processed the backlog and emitted
> events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between Flink's
> state's offset and Kafka's offset, will the job be unable to run?
>
>
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
>   --bootstrap-server ... \
>   --topic our-topic \
>   --group our-group \
>   --command-config kafka-preprod.properties \
>   --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
>   --execute
>
> Regards,
> Alexis.
>
> Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
>> Hi Yaroslav,
>>
>> The test I did was just using earliest, I'll test with committed offset
>> again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, 
>> wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>>> consumer offsets? In this case, it should get the offsets from Kafka and
>>> not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
 Hello,

 Regarding the new Kafka source (configure with a consumer group), I
 found out that if I manually change the group's offset with Kafka's admin
 API independently of Flink (while the job is running), the Flink source
 will ignore that and reset it to whatever it stored internally. Is there
 any way to prevent this?

 Regards,
 Alexis.


>


Why does the official flink image not even have ps, top, jps, ip, route...

2022-07-21 Thread yidan zhao
~


Re: Job id in logs

2022-07-21 Thread Chesnay Schepler
No, that is not possible. There are too many shared components (many of 
which are not aware of jobs at all) for this to be feasible.


On 21/07/2022 10:49, Lior Liviev wrote:
Hello, is there a way to add job Id to logs to distinguish between 
different jobs?




Job id in logs

2022-07-21 Thread Lior Liviev
Hello, is there a way to add job Id to logs to distinguish between different 
jobs?


Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Chesnay Schepler
Flink only reads the offsets from Kafka when the job is initially 
started from a clear slate.

Once checkpoints are involved it only relies on offsets stored in the state.

On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:

Hello again,

I just performed a test 
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). 
I did a few tests in the following order, and I noticed a few weird 
things. Note that our job uses Processing Time windows, so watermarks 
are irrelevant.


1. After the job had been running for a while, we manually moved the 
consumer group's offset to 12 hours in the past [1] (without 
restarting the job).
  - After this, the consumer simply stopped reading messages - the 
consumer lag in Kafka stayed at around 150k (no new data arrived)


2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was 
emitted from the windows.


3. We stopped the job, moved the offset again, and restarted Without 
any checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and emitted 
events from the windows.


This was done with Flink 1.15.0.

Is this expected? In other words, if there's a mismatch between 
Flink's state's offset and Kafka's offset, will the job be unable to run?




[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa 
:


Hi Yaroslav,

The test I did was just using earliest, I'll test with committed
offset again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,
 wrote:

Hi Alexis,

Do you use OffsetsInitializer.committedOffsets() to specify
your Kafka consumer offsets? In this case, it should get the
offsets from Kafka and not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa
 wrote:

Hello,

Regarding the new Kafka source (configure with a consumer
group), I found out that if I manually change the group's
offset with Kafka's admin API independently of Flink
(while the job is running), the Flink source will ignore
that and reset it to whatever it stored internally. Is
there any way to prevent this?

Regards,
Alexis.



Re: A question about the naming of Flink source branches and tags

2022-07-21 Thread Lijie Wang
Hi,
1.15.1 should correspond to the tag release-1.15.1.
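
If it helps, you can then base internal changes on that tag by branching from it, e.g. git checkout -b <your-branch> release-1.15.1 (the branch name is just a placeholder).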

yidan zhao  wrote on Thursday, 21 July 2022 at 12:53:

> I have taken a look; there is some pattern to it, but I still do not fully understand it.
> For example, we have some changes used internally at our company that we would like to add on top of the 1.15.1 release. Which branch should I base them on, or should I make the changes on a tag?
> Which branch or tag corresponds exactly to the source code in the package linked on the official download page, i.e. the version that does not include newly developed but unreleased code?
>