[jira] [Closed] (FLINK-9597) Flink application does not scale as expected

2018-06-16 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy closed FLINK-9597.
--
Resolution: Later

> Flink application does not scale as expected
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: JM.png, TM.png, flink_app_parser_git.zip, sample.png, 
> scaleNotWork.png
>
>
> Hi, we found that our Flink application, which has simple logic and uses a 
> process function, does not scale beyond a parallelism of 8, even with 
> sufficient resources. The results below are capped at ~250k TPS. No matter 
> how we tune the parallelism of the operators it does not scale, and 
> increasing the source parallelism does not help either.
> Please refer to "scaleNotWork.png":
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64; performance is 
> worse than with parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into objects and passes them to a Flink 
> process function with empty logic (a minimal sketch follows below). RocksDB 
> is in use, and the source data is generated by the program itself, so the 
> issue is easy to reproduce.
> We chose Flink for its scalability, but that is not what we observe now. Any 
> help would be appreciated, as this is impacting our projects. Thank you.
> To run the program, sample parameters:
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
> URL=do36.mycompany.com:8127"
> * aggrinterval: time in ms before the timer triggers
> * loop: how many rows of data to feed
> * statsd: whether to send results to statsd
> * psrc: source parallelism
> * pJ2R: parallelism of the map operator (JsonRecTranslator)
> * pAggr: parallelism of the process+timer operator (AggregationDuration)
> We are running on VMware with 5 TaskManagers, each with 32 slots.
> lscpu
> Architecture:  x86_64
> CPU op-mode(s):32-bit, 64-bit
> Byte Order:Little Endian
> CPU(s):32
> On-line CPU(s) list:   0-31
> Thread(s) per core:1
> Core(s) per socket:1
> Socket(s): 32
> NUMA node(s):  1
> Vendor ID: GenuineIntel
> CPU family:6
> Model: 63
> Model name:Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
> Stepping:  2
> CPU MHz:   2593.993
> BogoMIPS:  5187.98
> Hypervisor vendor: VMware
> Virtualization type:   full
> L1d cache: 32K
> L1i cache: 32K
> L2 cache:  256K
> L3 cache:  20480K
> NUMA node0 CPU(s): 0-31
> Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge 
> mca cmov pat pse36 clflush dts mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp 
> lm constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc 
> aperfmperf pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe 
> popcnt aes xsave avx f16c rdrand hypervisor lahf_lm epb fsgsbase smep dtherm 
> ida arat pln pts
>        total  used  free  shared  buff/cache  available   (values in GB)
> Mem:      98    24    72       0           1         72
> Swap:      3     0     3
> Please refer to TM.png and JM.png for further details.
> The test was run without checkpointing enabled.
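
The attached flink_app_parser_git.zip is not reproduced in this archive, so 
below is a minimal, hypothetical Java sketch of the kind of job the 
description outlines: key=value argument parsing, a self-generating source, a 
JSON-parsing map, and a keyed process function whose only work is registering 
a processing-time timer. The stand-ins for JsonRecTranslator and 
AggregationDuration, the record key, and the RocksDB path are all 
assumptions, not the reporter's code.

    // Hypothetical sketch, NOT the attached application. The parallelism
    // knobs map to the psrc / pJ2R / pAggr parameters described above.
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;

    public class ParserJobSketch {
        public static void main(String[] args) throws Exception {
            // The ticket passes arguments as key=value pairs, e.g. "psrc=4 pAggr=72".
            Map<String, String> p = new HashMap<>();
            for (String a : args) {
                String[] kv = a.split("=", 2);
                if (kv.length == 2) p.put(kv[0], kv[1]);
            }
            final int psrc  = Integer.parseInt(p.getOrDefault("psrc", "4"));
            final int pJ2R  = Integer.parseInt(p.getOrDefault("pJ2R", "32"));
            final int pAggr = Integer.parseInt(p.getOrDefault("pAggr", "72"));
            final long loop = Long.parseLong(p.getOrDefault("loop", "750"));
            final long aggrIntervalMs = Long.parseLong(p.getOrDefault("aggrinterval", "600"));

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // RocksDB is in use per the description; the state path is an assumption.
            env.setStateBackend(new RocksDBStateBackend("file:///tmp/rocksdb-state"));

            env.addSource(new SourceFunction<String>() {
                    private volatile boolean running = true;
                    public void run(SourceContext<String> ctx) {
                        // The source data is generated by the program itself.
                        for (long i = 0; i < loop && running; i++) {
                            ctx.collect("{\"id\":" + (i % 1000) + "}");
                        }
                    }
                    public void cancel() { running = false; }
                }).setParallelism(psrc)
                // Stand-in for JsonRecTranslator: turn the JSON string into a keyed record.
                .map(s -> Tuple2.of(Math.floorMod(s.hashCode(), 1000), s))
                .returns(Types.TUPLE(Types.INT, Types.STRING))
                .setParallelism(pJ2R)
                .keyBy(0)
                // Stand-in for AggregationDuration: "empty" logic plus a processing-time timer.
                .process(new ProcessFunction<Tuple2<Integer, String>, String>() {
                    public void processElement(Tuple2<Integer, String> in, Context ctx,
                                               Collector<String> out) {
                        ctx.timerService().registerProcessingTimeTimer(
                                ctx.timerService().currentProcessingTime() + aggrIntervalMs);
                    }
                    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
                        // A real job would emit its aggregates here.
                    }
                }).setParallelism(pAggr);

            env.execute("flink_app_parser sketch");
        }
    }

Note that the keyBy between the pJ2R and pAggr stages forces a full 
repartition, so every record is serialized across that edge no matter how 
empty the process function is.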



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9597) Flink application does not scale as expected

2018-06-16 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514727#comment-16514727
 ] 

swy edited comment on FLINK-9597 at 6/16/18 9:27 AM:
-

Hi [~fhueske], would you mind sharing, from your experience, any other reason 
the performance could be capped?
CPU and memory are not an issue, as we have 32 vCPUs + 100 GB RAM per TM. The 
network should be fine as well: total TX+RX peaks at around 800 Mbps while we 
have 1000 Mbps available. Would you mind sharing your thoughts, or testing 
the attached application in your lab? We are going to process 10 billion 
records per day, and the volume will keep growing, so scalability is very 
important to us. I will close the ticket, but hopefully we can still get your 
valuable input on this. Thank you.
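(For scale: 10 billion records/day averages 10^10 / 86,400 s ≈ 116k 
records/s, so the ~250k TPS ceiling reported above is only about 2x the 
required average rate, leaving little headroom for peaks or catch-up.)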

* There is an interesting finding: the reason low parallelism works much 
better is that all tasks run in the same TM. Once we scale further, the tasks 
are distributed across different TMs and performance is worse than in the 
low-parallelism case. Is this expected? The more I scale, the less I get?
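
The co-location effect described here matches Flink's default scheduling: all 
operators share one slot sharing group, so at low parallelism a whole 
pipeline can fit into the slots of a single TaskManager, whereas higher 
parallelism spreads subtasks across TMs and turns local hand-offs into 
network transfers. A minimal, hypothetical sketch of the related knobs (not 
from the attached application):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "c")
               // Own slot sharing group: this stage no longer co-locates with the rest.
               .map(s -> s.toUpperCase()).returns(Types.STRING).slotSharingGroup("parse")
               // rebalance() breaks operator chaining and redistributes records -- the
               // same kind of hand-off that becomes a network transfer between TMs.
               .rebalance()
               .map(s -> s + "!").returns(Types.STRING).slotSharingGroup("aggr")
               .print();

            env.execute("slot sharing sketch");
        }
    }

Once an exchange such as keyBy or rebalance() connects subtasks on different 
TMs, records pay full network serialization, which would be consistent with 
throughput dropping as tasks spread out.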




[jira] [Commented] (FLINK-9597) Flink application does not scale as expected

2018-06-16 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514727#comment-16514727
 ] 

swy commented on FLINK-9597:


Hi [~fhueske], would you mind sharing, from your experience, any other reason 
the performance could be capped?
CPU and memory are not an issue, as we have 32 vCPUs + 100 GB RAM per TM. The 
network should be fine as well: total TX+RX peaks at around 800 Mbps while we 
have 1000 Mbps available. Would you mind sharing your thoughts, or testing 
the attached application in your lab? We are going to process 10 billion 
records per day, and the volume will keep growing, so scalability is very 
important to us. I will close the ticket, but hopefully we can still get your 
valuable input on this. Thank you.



[jira] [Updated] (FLINK-9597) Flink application does not scale as expected

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: sample.png



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---

[jira] [Commented] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513963#comment-16513963
 ] 

swy commented on FLINK-9597:


Hi [~fhueske], information updated.



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: TM.png



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: JM.png



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: (was: TM.png)



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: TM.png



[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which using process 
function is not scale-able when scale from 8 parallelism onward even though 
with sufficient resources. Below it the result which is capped at ~250k TPS. No 
matter how we tune the parallelism of the operators it just not scale, same to 
increase source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64 performance worse 
than parallelism 32.

Sample source code attached(flink_app_parser_git.zip). It is a simple program, 
parsing json record into object, and pass it to a empty logic Flink's process 
function. Rocksdb is in used, and the source is generated by the program 
itself. This could be reproduce easily. 

We choose Flink because of it scalability, but this is not the case now, 
appreciated if anyone could help as this is impacting our projects! thank you.

To run the program, sample parameters,

"aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
URL=do36.comptel.com:8127"

* aggrinterval: time in ms for timer to trigger
* loop: how many row of data to feed
* statsd: to send result to statsd
* psrc: source parallelism
* pJ2R: parallelism of map operator(JsonRecTranslator)
* pAggr: parallelism of process+timer operator(AggregationDuration) !JM.png!  
!TM.png! 

  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at ~250k TPS. No matter how we tune the 
parallelism of the operators it just does not scale; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
worse than parallelism 32.

Sample source code is attached (flink_app_parser_git.zip). It is a simple 
program that parses JSON records into objects and passes them to a Flink 
process function with empty logic. RocksDB is in use, and the source is 
generated by the program itself. This can be reproduced easily. 

We chose Flink because of its scalability, but that is not what we are seeing 
now. We would appreciate any help, as this is impacting our projects. Thank you.

To run the program, use sample parameters such as:

"aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
URL=do36.comptel.com:8127"

* aggrinterval: time in ms for the timer to trigger
* loop: how many rows of data to feed
* statsd: whether to send results to statsd
* psrc: source parallelism
* pJ2R: parallelism of the map operator (JsonRecTranslator)
* pAggr: parallelism of the process+timer operator (AggregationDuration)


> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at ~250k TPS. No matter how 
> we tune the parallelism of the operators it just does not scale; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> worse than parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into 

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: (was: JM.png)

> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at ~250k TPS. No matter how 
> we tune the parallelism of the operators it just does not scale; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> worse than parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into objects and passes them to a Flink 
> process function with empty logic. RocksDB is in use, and the source is 
> generated by the program itself. This can be reproduced easily. 
> We chose Flink because of its scalability, but that is not what we are seeing 
> now. We would appreciate any help, as this is impacting our projects. Thank you.
> To run the program, use sample parameters such as:
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
> URL=do36.comptel.com:8127"
> * aggrinterval: time in ms for the timer to trigger
> * loop: how many rows of data to feed
> * statsd: whether to send results to statsd
> * psrc: source parallelism
> * pJ2R: parallelism of the map operator (JsonRecTranslator)
> * pAggr: parallelism of the process+timer operator (AggregationDuration) !JM.png! 
> !TM.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: JM.png

> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at ~250k TPS. No matter how 
> we tune the parallelism of the operators it just does not scale; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> worse than parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into objects and passes them to a Flink 
> process function with empty logic. RocksDB is in use, and the source is 
> generated by the program itself. This can be reproduced easily. 
> We chose Flink because of its scalability, but that is not what we are seeing 
> now. We would appreciate any help, as this is impacting our projects. Thank you.
> To run the program, use sample parameters such as:
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
> URL=do36.comptel.com:8127"
> * aggrinterval: time in ms for the timer to trigger
> * loop: how many rows of data to feed
> * statsd: whether to send results to statsd
> * psrc: source parallelism
> * pJ2R: parallelism of the map operator (JsonRecTranslator)
> * pAggr: parallelism of the process+timer operator (AggregationDuration)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at ~250k TPS. No matter how we tune the 
parallelism of the operators it just does not scale; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
worse than parallelism 32.

Sample source code is attached (flink_app_parser_git.zip). It is a simple 
program that parses JSON records into objects and passes them to a Flink 
process function with empty logic. RocksDB is in use, and the source is 
generated by the program itself. This can be reproduced easily. 

We chose Flink because of its scalability, but that is not what we are seeing 
now. We would appreciate any help, as this is impacting our projects. Thank you.

To run the program, use sample parameters such as:

"aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 
URL=do36.comptel.com:8127"

* aggrinterval: time in ms for the timer to trigger
* loop: how many rows of data to feed
* statsd: whether to send results to statsd
* psrc: source parallelism
* pJ2R: parallelism of the map operator (JsonRecTranslator)
* pAggr: parallelism of the process+timer operator (AggregationDuration)

  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at ~250k TPS. No matter how we tune the 
parallelism of the operators it just does not scale; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
worse than parallelism 32.

Sample source code is attached (flink_app_parser_git.zip). It is a simple 
program that parses JSON records into objects and passes them to a Flink 
process function with empty logic. RocksDB is in use, and the source is 
generated by the program itself. This can be reproduced easily. 

We chose Flink because of its scalability, but that is not what we are seeing 
now. We would appreciate any help, as this is impacting our projects. Thank you.



> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at ~250k TPS. No matter how 
> we tune the parallelism of the operators it just does not scale; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> worse than parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into objects and passes them to a Flink 
> process function with empty logic. RocksDB is in use, and the source is 
> generated by the program itself. This can be reproduced easily. 
> We chose Flink because of its scalability, but that is not what we are seeing 
> now. We would appreciate any help, as this is impacting our projects. Thank you.
> To run the program, use sample parameters such as:
> "aggrinterval=600 loop=750 statsd=1 psrc=4 

[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at ~250k TPS. No matter how we tune the 
parallelism of the operators it just does not scale; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
worse than parallelism 32.

Sample source code is attached (flink_app_parser_git.zip). It is a simple 
program that parses JSON records into objects and passes them to a Flink 
process function with empty logic. RocksDB is in use, and the source is 
generated by the program itself. This can be reproduced easily. 

We chose Flink because of its scalability, but that is not what we are seeing 
now. We would appreciate any help, as this is impacting our projects. Thank you.


  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
drops even more!

Sample source code is attached. It is a very simple program that just parses 
JSON records into objects and passes them to a Flink process function with 
almost empty logic. RocksDB is in use, and the source is generated by the 
program itself. This can be reproduced easily! We chose Flink because of its 
scalability, but that is not what we are seeing now. We would appreciate any 
help, as this is impacting our projects. Thank you.



> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at ~250k TPS. No matter how 
> we tune the parallelism of the operators it just does not scale; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> worse than parallelism 32.
> Sample source code is attached (flink_app_parser_git.zip). It is a simple 
> program that parses JSON records into objects and passes them to a Flink 
> process function with empty logic. RocksDB is in use, and the source is 
> generated by the program itself. This can be reproduced easily. 
> We chose Flink because of its scalability, but that is not what we are seeing 
> now. We would appreciate any help, as this is impacting our projects. Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64: performance 
drops even more!

Sample source code is attached. It is a very simple program that just parses 
JSON records into objects and passes them to a Flink process function with 
almost empty logic. RocksDB is in use, and the source is generated by the 
program itself. This can be reproduced easily! We chose Flink because of its 
scalability, but that is not what we are seeing now. We would appreciate any 
help, as this is impacting our projects. Thank you.


  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64

Sample source code is attached. It is a very simple program that just parses 
JSON records into objects and passes them to a Flink process function with 
almost empty logic. RocksDB is in use, and the source is generated by the 
program itself. This can be reproduced easily! We chose Flink because of its 
scalability, but that is not what we are seeing now. We would appreciate any 
help, as this is impacting our projects. Thank you.



> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at around 250k TPS. No matter 
> how we tune the parallelism of the operators it just does not work; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64: performance 
> drops even more!
> Sample source code is attached. It is a very simple program that just parses 
> JSON records into objects and passes them to a Flink process function with 
> almost empty logic. RocksDB is in use, and the source is generated by the 
> program itself. This can be reproduced easily! We chose Flink because of its 
> scalability, but that is not what we are seeing now. We would appreciate any 
> help, as this is impacting our projects. Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64

Sample source code is attached. It is a very simple program that just parses 
JSON records into objects and passes them to a Flink process function with 
almost empty logic. RocksDB is in use, and the source is generated by the 
program itself. This can be reproduced easily! We chose Flink because of its 
scalability, but that is not what we are seeing now. We would appreciate any 
help, as this is impacting our projects. Thank you.


  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64




> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at around 250k TPS. No matter 
> how we tune the parallelism of the operators it just does not work; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64
> Sample source code is attached. It is a very simple program that just parses 
> JSON records into objects and passes them to a Flink process function with 
> almost empty logic. RocksDB is in use, and the source is generated by the 
> program itself. This can be reproduced easily! We chose Flink because of its 
> scalability, but that is not what we are seeing now. We would appreciate any 
> help, as this is impacting our projects. Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: flink_app_parser_git.zip

> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: flink_app_parser_git.zip, scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at around 250k TPS. No matter 
> how we tune the parallelism of the operators it just does not work; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Description: 
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64



  was:
Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png"
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8




> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at around 250k TPS. No matter 
> how we tune the parallelism of the operators it just does not work; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9597:
---
Attachment: scaleNotWork.png

> Flink fail to scale!
> 
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: swy
>Priority: Major
> Attachments: scaleNotWork.png
>
>
> Hi, we found that our Flink application with simple logic, which uses a 
> process function, is not scalable beyond parallelism 8 even with sufficient 
> resources. Below is the result, which is capped at around 250k TPS. No matter 
> how we tune the parallelism of the operators it just does not work; the same 
> applies to increasing source parallelism.
> Please refer to "scaleNotWork.png",
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9597) Flink fail to scale!

2018-06-15 Thread swy (JIRA)
swy created FLINK-9597:
--

 Summary: Flink fail to scale!
 Key: FLINK-9597
 URL: https://issues.apache.org/jira/browse/FLINK-9597
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.5.0
Reporter: swy


Hi, we found that our Flink application with simple logic, which uses a process 
function, is not scalable beyond parallelism 8 even with sufficient resources. 
Below is the result, which is capped at around 250k TPS. No matter how we tune 
the parallelism of the operators it just does not work; the same applies to 
increasing source parallelism.

Please refer to "scaleNotWork.png"
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8
1. fixed source parallelism 4, other operators parallelism 8





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510872#comment-16510872
 ] 

swy commented on FLINK-9506:


No problem, an email has also been sent to the ML.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.
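
As an aside on the measurement described above ("how many records per second" 
in "JsonTranslator"): a minimal sketch of how such a counter could be exposed 
with Flink's metric API. The real JsonTranslator is in the attached app; 
Record.fromJson here is a hypothetical parse helper.

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class JsonTranslator extends RichMapFunction<String, Record> {
    private transient Meter recordsPerSecond;

    @Override
    public void open(Configuration parameters) {
        // register a records/second meter, averaged over a 1-second window
        recordsPerSecond = getRuntimeContext().getMetricGroup()
                .meter("recordsPerSecond", new MeterView(1));
    }

    @Override
    public Record map(String json) throws Exception {
        recordsPerSecond.markEvent();   // count every parsed record
        return Record.fromJson(json);   // hypothetical parse helper
    }
}
{code}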



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510864#comment-16510864
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], I just sent an email to your personal mailbox. Let's continue 
there, and please let me know if you have not received it. Thank you!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510789#comment-16510789
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], an email has been sent to "u...@flink.apache.org" with the 
title "Flink performance fluctuation with ListState.add()". It is actually 
still closely related to this ticket: when state is in use, performance drops 
because of fluctuation and does not scale. Let's keep this ticket open for some 
more time, but we can discuss further over there. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510752#comment-16510752
 ] 

swy commented on FLINK-9506:


Alright, I am OK with closing the ticket. Would you mind sharing where to 
continue the discussion?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510746#comment-16510746
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], it surprises me as well. Please advise if something is wrong 
in the code, especially in the RocksDB settings. Also, if you don't mind, you 
can run the test in your lab and share the performance. To run the application, 
just use the following sample parameters:
aggrinterval=60 loop=500 auditinterval=3 statsd=1 
URL=do36.mycompany.com:8127
Thank you!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510715#comment-16510715
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], please download the source from 
https://github.com/swyow/flink_app_parser_git_01
Could you please advise whether this is the right way? It should already be 
quite straightforward in this sample test app. Please let me know when you 
finish downloading, as I need to remove it from git. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510704#comment-16510704
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], the picture was captured without any line being commented out. 
Yes, the comment-out means no code in onTimer(). If we further comment out 
recordStore.add(), then everything works well and there is no more fluctuation; 
only, the input stops if we output something in processElement() instead (is 
this something expected?). Parallelism 16 is used, and the RocksDB backend is 
loaded successfully, as mentioned in the log. Could any optimization be done on 
RocksDB? Or any suggestion on hashCode? Is there a way to generate a unique 
hash? Thank you.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9506:
---
Attachment: input_stop_when_timer_run.png

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510632#comment-16510632
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], to your questions:
1. Checkpointing is disabled at the moment, but in flink_config.yaml incremental 
checkpointing is enabled for RocksDB.
2. No difference even with the onTimer content commented out.
3. Please refer below for the sample code in 'ProcessAggregation':

public void processElement(Record r, Context ctx, Collector<Record> out)
        throws Exception {
    recordStore.add(r);

    // emit an audit-only copy of the incoming record
    Record auditRec = new Record();
    auditRec.setAuditOnly(true);
    auditRec.setInput_id(r.getInput_id());
    auditRec.setInput_type(r.getInput_type());
    auditRec.setOutput_id(r.getOutput_id());
    auditRec.setOutput_type(r.getOutput_type());
    auditRec.setAddkey(r.getAddkey());
    auditRec.setSource_id(r.getSource_id());
    auditRec.setINPUT_LINK(r.getINPUT_LINK());
    auditRec.setFilename(r.getFilename());
    auditRec.setOUTPUT_LINK(r.getOUTPUT_LINK());

    auditRec.setEL_COUNTER_IN(1);
    auditRec.setEL_COUNTER_STORED(1);

    out.collect(auditRec);

    // round the fire time down to a whole second so repeated registrations coalesce
    ctx.timerService().registerProcessingTimeTimer(
            ((ctx.timerService().currentProcessingTime() + aggrWindowsIntervalMs) / 1000) * 1000);
    if (countMarker != null) countMarker.count();
}

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Record> out)
        throws Exception {
    // replay everything stored for this key and fold it into the last record
    Iterable<Record> records = recordStore.get();
    int primary_units = 0;
    int secondary_units = 0;
    int tertiary_units = 0;
    int numReduce = -1;
    Record lastRecord = null;
    for (Record rec : records) {
        primary_units += rec.getI_PRIMARY_UNITS();
        secondary_units += rec.getI_SECONDARY_UNITS();
        tertiary_units += rec.getI_TERTIARY_UNITS();
        lastRecord = rec;
        numReduce++;
    }
    if (lastRecord != null) {
        lastRecord.setI_PRIMARY_UNITS(primary_units);
        lastRecord.setI_SECONDARY_UNITS(secondary_units);
        lastRecord.setI_TERTIARY_UNITS(tertiary_units);
        lastRecord.setPARTIALS_COMBINED_(numReduce);

        lastRecord.setEL_COUNTER_RETRIEVED(1);
        lastRecord.setEL_COUNTER_REDUCED(numReduce);
        out.collect(lastRecord);
    }
    recordStore.clear();
}

There is a new observation: when the timer is running (it is used to flush 
records), the input from the source stops. Please refer to the new attachment 
'input_stop_when_timer_run.png'. Is this something expected?
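
One way to keep onTimer cheap, sketched below as a suggestion rather than the 
app's actual code: fold incoming records into a single running aggregate as 
they arrive, so the timer does one state read instead of replaying every stored 
record while holding the task's lock (a plausible reason the input stalls 
during onTimer). This assumes only the summed unit counters and the last record 
are needed at flush time.

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class RunningAggregation extends ProcessFunction<Record, Record> {
    private transient ValueState<Record> runningAggr;

    @Override
    public void open(Configuration conf) {
        runningAggr = getRuntimeContext().getState(
                new ValueStateDescriptor<>("runningAggr", Record.class));
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
        Record agg = runningAggr.value();
        if (agg == null) {
            agg = r;  // first record for this key in the window
        } else {
            agg.setI_PRIMARY_UNITS(agg.getI_PRIMARY_UNITS() + r.getI_PRIMARY_UNITS());
            agg.setI_SECONDARY_UNITS(agg.getI_SECONDARY_UNITS() + r.getI_SECONDARY_UNITS());
            agg.setI_TERTIARY_UNITS(agg.getI_TERTIARY_UNITS() + r.getI_TERTIARY_UNITS());
        }
        runningAggr.update(agg);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Record> out) throws Exception {
        Record agg = runningAggr.value();  // O(1) read instead of a full replay
        if (agg != null) {
            out.collect(agg);
        }
        runningAggr.clear();
    }
}
{code}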

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510575#comment-16510575
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], you are right. The fluctuation only happens when "keyBy" is 
used with state; "keyBy" without state is quite stable. Unfortunately 
hashCode() is not good enough for us, as we need to aggregate billing data, 
which requires high accuracy. We are now using ListState and keying by a 
50-character value, which seems a reasonable use case to me. Can I consider 
this a limitation in Flink? Otherwise, any advice on how to get through? 
RocksDB is already in use 
(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) as suggested. Thank 
you for your support [~sihuazhou].
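
For reference, a minimal sketch of how the predefined options mentioned above 
are wired in; the checkpoint URI is a placeholder and the rest follows the 
standard RocksDB state backend setup.

{code}
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints"); // placeholder URI
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(backend);
{code}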

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% (throughput 
> more than halves) when ReducingState.add is used in the source code. In the 
> test, checkpointing is disabled, and the filesystem (HDFS) is used as the 
> state backend.
> It can easily be reproduced with a simple app, without checkpointing, that 
> simply keeps storing records, with a simple reduce function (in fact an empty 
> function shows the same result). Any idea would be appreciated. What an 
> unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", which is shown in 
> the graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY()) // key selector elided in the original
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509808#comment-16509808
 ] 

swy edited comment on FLINK-9506 at 6/12/18 4:10 PM:
-

Hi [~sihuazhou] I hope to close the ticket too, but the problem still persists 
even though the reducing state is no longer in use, with ListState as the 
replacement you suggested. However, further investigation shows the problem is 
caused by "keyBy" instead. Please refer to keyby.png:

1. The first run, without keyBy:
DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run, with keyBy and with the ProcessAggregation logic (the logic 
uses ListState to store all records, which are summed up when the timer 
triggers):
DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
        .keyBy(new KeySelector<Record, Integer>() {
            @Override
            public Integer getKey(Record r) throws Exception {
                return r.getUNIQUE_KEY().hashCode() * 31;
            }
        })
        .process(new ProcessAggregation());

3. The third run, with keyBy and an empty ProcessAggregation logic.

The results show ProcessAggregation is not the root cause of the fluctuation: 
there is no difference between the empty logic and the logic with ListState. 
The fluctuation seems to be caused by "keyBy". Any idea why? Thank you.
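
For reproducibility, a minimal sketch of the "empty logic" variant from run 3, 
which isolates the cost of the keyed shuffle itself (the class body is an 
assumption; only its emptiness matters):

{code}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Run 3: keyBy is present, but the function touches no state and emits
// nothing, so remaining fluctuation is attributable to the keyBy shuffle.
public class ProcessAggregation extends ProcessFunction<Record, Record> {
    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out) {
        // intentionally empty
    }
}
{code}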



[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9506:
---
Attachment: keyby.png




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504820#comment-16504820
 ] 

swy commented on FLINK-9506:


Thanks for the response [~sihuazhou]; the key length is around 50 characters. 
We will change to hashCode as suggested and test again :)




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504675#comment-16504675
 ] 

swy commented on FLINK-9506:


[~sihuazhou] your idea is brilliant, but surprisingly the first test result 
does not show much change. Let us run more tests to confirm. Thank you! Would 
you mind answering my questions above regarding the RocksDB setup? I believe it 
is crucial in this performance test.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503426#comment-16503426
 ] 

swy commented on FLINK-9506:


[~kkrugler] please don't close the ticket yet, because the performance 
degradation still happens; the fluctuation is just a bit smaller after RocksDB 
was deployed. We need to get the RocksDB setup right; hopefully the performance 
drop will not occur in a bursty pattern once the state backend works properly. 
Advice would be much appreciated here.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503284#comment-16503284
 ] 

swy commented on FLINK-9506:


[~srichter] Thanks for the tips. After implementing RocksDB the performance 
seems much more scalable now, and fluctuates a little less. I have a few 
questions related to RocksDB:
1. Just to confirm, does RocksDB need to be set up on every TM machine? Any 
other option?
2. What is the recommendation for RocksDB's state backend? We are using tmpfs 
for checkpoints now, with savepoints persisted to HDFS.
3. In source code, RocksDB options like parallelism and certain predefined 
options can be configured; are there corresponding parameters in 
flink-conf.yaml?
4. Below is the configuration we are using; could you please comment if 
something is not right?

env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
RocksDBStateBackend rocksdb = new RocksDBStateBackend(
        new FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
env.setStateBackend(rocksdb);
//rocksdb.setOptions(new RocksdbOptions());
rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

Or in flink-conf.yaml:
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir: file:///tmp/rocksdb_simple_example/savepoints

Thank you in advance!
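
On question 3: a minimal sketch of what the commented-out 
rocksdb.setOptions(...) line could look like via Flink's OptionsFactory; the 
class name TunedRocksDbOptions and the concrete values are illustrative 
assumptions, not recommendations:

{code}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class TunedRocksDbOptions implements OptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // More background threads for flushes/compactions (value illustrative).
        return currentOptions.setIncreaseParallelism(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Larger write buffers reduce flush frequency for write-heavy state.
        return currentOptions.setWriteBufferSize(64 * 1024 * 1024);
    }
}
// usage: rocksdb.setOptions(new TunedRocksDbOptions());
{code}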




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500211#comment-16500211
 ] 

swy commented on FLINK-9506:


[~srichter] Really appreciate your help. To your points:

*[First, when using the FSStateBackend, try to set asynchronous checkpoints to 
true]*
We are using 1.4.2 now; is "state.backend.async" available? We just found this 
parameter documented for 1.5 on the Flink website.

*[your comparison does not consider that in case that you are using the 
reducing state, out.collect(output); in onTimer produces an output and not just 
forwards null]*
This again is a sample-app issue; in the real application we have a NULL check 
that prevents a NULL record from reaching the next operator.

*[you can think about the object reuse setting 
env.getConfig().enableObjectReuse()]*
This is a good idea indeed; we will try it out.

*[And you can also make your AggregationKey much more efficient]*
Agreed. For now there is only one key, from one field in the Record structure:

.keyBy(new KeySelector<Record, String>() {
    @Override
    public String getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY();
    }
})
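
A minimal sketch of one way to make that key cheaper: compute it once during 
parsing and reuse it, rather than re-deriving it per keyBy invocation. The 
keyCache field and its setter are hypothetical additions for illustration:

{code}
import org.apache.flink.api.common.functions.MapFunction;

// Sketch: precompute the key at parse time; keyBy then reads a ready field.
// Record.keyCache and its accessors are hypothetical, added for illustration.
public class JsonToRecordTranslator implements MapFunction<String, Record> {
    @Override
    public Record map(String json) throws Exception {
        Record r = parse(json);           // existing parsing logic (assumed)
        r.setKeyCache(r.getUNIQUE_KEY()); // derive the 50-char key once
        return r;
    }
}
{code}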







--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499972#comment-16499972
 ] 

swy commented on FLINK-9506:


[~srichter] Thanks for the good explanation. We have no choice but to store 
that many records (10 million to 1 billion, record size ~1 KB) for aggregation 
(multi-field aggregation) because of business requirements. Due to the large 
state, RocksDB will be set up for production. We know that passing objects 
to/from RocksDB is not cheap performance-wise, but hopefully at least the 
result is scalable. In our case now, not only does the result fluctuate, the 
scaling is also capped; this is the real pain. Would you mind sharing your 
thoughts on how to handle such a case? Or any similar use case?

Next step, we are going to tune memory-related configuration and also set up 
RocksDB, hopefully making the performance scalable. Thanks again.
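
In case it is useful here: if the timer ultimately emits per-key sums over a 
few numeric fields, a running aggregate can replace buffering whole ~1 KB 
records, keeping state O(1) per key. A minimal sketch, assuming a small 
hypothetical Sums POJO and a commutative aggregation:

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessAggregation extends ProcessFunction<Record, Record> {
    // Running per-key totals instead of a ListState full of 1 KB records.
    private transient ValueState<Sums> sums; // Sums is a hypothetical small POJO

    @Override
    public void open(Configuration parameters) {
        sums = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sums", Sums.class));
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out)
            throws Exception {
        Sums s = sums.value();
        if (s == null) {
            s = new Sums();
        }
        s.add(r);       // fold the record's numeric fields into the totals
        sums.update(s); // state stays O(1) per key instead of O(#records)
    }
}
{code}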




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499869#comment-16499869
 ] 

swy commented on FLINK-9506:


Any workaround would be appreciated. Do you think using RocksDB helps in such a 
case? Or is there any other magic to make it work? Thanks.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499859#comment-16499859
 ] 

swy edited comment on FLINK-9506 at 6/4/18 8:03 AM:


[~sihuazhou] Your trick looks quite promising, as the performance has improved 
very much, and in a more stable pattern. Please refer to the attached 
"KeyNoHash_VS_KeyHash.png"; the left-hand side is the fluctuating pattern 
"before the change", while the right-hand side is "after the change".

.keyBy(new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})

However, the change also affects the process timer: records cannot be flushed, 
or are only partially flushed, even when the scheduled time is reached. I guess 
it might be due to wrong key reduction. Any advice? Thanks.
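
A note on why this happens: keying by hashCode() % 128 collapses distinct 
UNIQUE_KEYs into at most 128 buckets, so unrelated records share one keyed 
state and one timer, which matches the partial-flush symptom. A small 
illustration with made-up key values:

{code}
// Two different billing keys can land in the same bucket:
String k1 = "CUST-000123-PLAN-A"; // illustrative values
String k2 = "CUST-000456-PLAN-B";
int b1 = k1.hashCode() % 128;
int b2 = k2.hashCode() % 128;
// With millions of distinct keys and only 128 buckets, b1 == b2 is routine
// (and % can even yield negative buckets for negative hash codes), so both
// records are appended to one ListState and flushed by one shared timer.
{code}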





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9506:
---
Attachment: KeyNoHash_VS_KeyHash.png




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499667#comment-16499667
 ] 

swy commented on FLINK-9506:


Thanks for pointing this out; it is a mistake in the sample. In the real 
application we return the record object directly without any 'new'. I have 
removed the 'new' from the sample and tested again, but the performance is 
still the same.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499445#comment-16499445
 ] 

swy edited comment on FLINK-9506 at 6/3/18 3:04 PM:


What we want to know is: is this something expected (a drop of more than 2x), 
or is something wrong in our code? We understand that the 'state' will impact 
performance, but we did not expect this much, nor the fluctuating pattern.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499412#comment-16499412
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou], the keyBy comes from one of the private members of the POJO:
.keyBy(new KeySelector<Record, String>() {
    @Override
    public String getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY();
    }
})

There is sample code at https://github.com/swyow/flink_tester. JsonObj is now 
replaced with the simple POJO 'Record', which contains 50 private String 
members. Thanks.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-9442) Flink Scaling not working

2018-06-03 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9442:
---
Comment: was deleted

(was: OK, we will post to the mailing list as well, thanks.
By the way, once we changed to RichParallelSourceFunction it works much better 
now.

But our real application, which uses FlinkKafkaConsumer011 (itself already 
based on RichParallelSourceFunction), is still not able to scale. Any tips to 
share?)

> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
>
> Hi,
>  
> We are in the middle of testing the scaling ability of Flink, but we found 
> that scaling is not working, no matter whether we add more slots or increase 
> the number of Task Managers. We would expect linear, if not close-to-linear, 
> scaling performance, but the results even show degradation. Any comments are 
> appreciated.
>  
> Test details:
>  
> - VMWare vSphere
> - Just a simple pass-through test:
>     - auto-generated source, 3 mil records, each 1 KB in size, parallelism=1
>     - the source passes into the next map operator, which just returns the 
> same record and sends a counter to statsD; parallelism per case is 2, 4, 6
> - 3 TMs, 6 slots in total (2/TM); each JM/TM has 32 vCPUs, 100 GB memory
> - Results:
>   - 2 slots: 26 seconds, 3 mil/26 = 115k TPS
>   - 4 slots: 23 seconds, 3 mil/23 = 130k TPS
>   - 6 slots: 22 seconds, 3 mil/22 = 136k TPS
>  
> As shown, the scaling is almost nothing, and capped at ~120k TPS. Any clue? 
> Thanks.
>  
> public class passthru extends RichMapFunction<String, String> {
>     public void open(Configuration configuration) throws Exception {
>         ... ...
>         stats = new NonBlockingStatsDClient();
>     }
>     public String map(String value) throws Exception {
>         ... ...
>         stats.increment();
>         return value;
>     }
> }
> public class datagen extends RichSourceFunction<String> {
>     ... ...
>     public void run(SourceContext<String> ctx) throws Exception {
>         int i = 0;
>         while (run) {
>             String idx = String.format("%09d", i);
>             ctx.collect("{ ... }"); // ~1 KB JSON record; payload elided in the original
>             i++;
>             if (i == loop)
>                 run = false;
>         }
>     }
>     ... ...
> }
> public class Job {
>     public static void main(String[] args) throws Exception {
>         ... ...
>         DataStream<String> stream = env.addSource(new 
> datagen(loop)).rebalance();
>         DataStream<String> convert = stream.map(new passthru(statsdUrl));
>         env.execute("Flink");
>     }
> }
> The reason for this sample test is that the Kafka source FlinkKafkaConsumer011 
> faces the same issue: it is not scalable, even though FlinkKafkaConsumer011 is 
> already based on RichParallelSourceFunction, and we always set Kafka 
> partitions = total TM slots. The result is still capped and does not improve 
> linearly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499393#comment-16499393
 ] 

swy commented on FLINK-9506:


Hi Fabian,
1. Yes.
2. Yes. The state is supposed to be stored in the Task Managers' memory, and we 
ensured they have sufficient memory. Also, checkpointing is disabled for this 
test.

For more information, please refer to
https://stackoverflow.com/questions/50587771/flink-reducingstate-impact-performance

Any alternative or advice would be appreciated, as this is impacting our 
project. Thank you.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9506:
---
Attachment: flink.png




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)
swy created FLINK-9506:
--

 Summary: Flink ReducingState.add causing more than 100% 
performance drop
 Key: FLINK-9506
 URL: https://issues.apache.org/jira/browse/FLINK-9506
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.4.2
Reporter: swy


Hi, we found that application throughput drops by more than half when 
ReducingState.add is used in the source code. In the test, checkpointing is 
disabled, and filesystem (HDFS) is the state backend.

It can be easily reproduced with a simple app: without checkpointing, just keep 
storing records, with a simple reduction function (in fact, an empty function 
shows the same result). Any idea would be appreciated. What an unbelievably 
obvious issue.

Basically the app just keeps storing records into the state, and we measure how 
many records per second pass "JsonTranslator", which is shown in the graph. The 
difference between the two runs is just one line: comment/un-comment 
"recStore.add(r)".

DataStream<String> stream = env.addSource(new GeneratorSource(loop));
DataStream<Record> convert = stream.map(new JsonTranslator())
        .keyBy(...)
        .process(new ProcessAggregation())
        .map(new PassthruFunction());

public class ProcessAggregation extends ProcessFunction<Record, Record> {
    private ReducingState<Record> recStore;

    public void processElement(Record r, Context ctx, Collector<Record> out) {
        recStore.add(r); // this line makes the difference
    }
}

Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9442) Flink Scaling not working

2018-05-26 Thread swy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491720#comment-16491720
 ] 

swy commented on FLINK-9442:


OK, we will post to the mailing list as well, thanks.
By the way, once we changed to RichParallelSourceFunction it works much better 
now.

But our real application, which uses FlinkKafkaConsumer011 (itself already 
based on RichParallelSourceFunction), is still not able to scale. Any tips to 
share?
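
For completeness, a minimal sketch of a parallel generator in the spirit of 
that change, where each subtask produces its own share of the records; the 
class name and the work-splitting scheme are illustrative assumptions:

{code}
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelDatagen extends RichParallelSourceFunction<String> {
    private final int loop;
    private volatile boolean run = true;

    public ParallelDatagen(int loop) { this.loop = loop; }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel subtask generates only its share of the records.
        int share = loop / getRuntimeContext().getNumberOfParallelSubtasks();
        for (int i = 0; i < share && run; i++) {
            ctx.collect("{ ... }"); // ~1 KB JSON payload, elided
        }
    }

    @Override
    public void cancel() { run = false; }
}
{code}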




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9442) Flink Scaling not working

2018-05-26 Thread swy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9442:
---
Affects Version/s: 1.4.2

> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
>
> Hi,
>  
> We are in the middle of testing scaling ability of Flink. But we found that 
> scaling not working, no matter increase more slot or increase number of Task 
> Manager. We would expect a linear, if not close-to-linear scaling performance 
> but the result even show degradation. Appreciated any comments.
>  
> Test Details,
>  
> -VMWare vsphere
> -Just a simple pass through test,
>     - auto gen source 3mil records, each 1kb in size, parallelism=1
>     - source pass into next map operator, which just return the same record, 
> and sent counter to statsD, parallelism is in cases = 2,4,6
>  - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory
>  - Result:
>   - 2 slots: 26 seconds, 3mil/26=115k TPS
>   - 4 slots: 23 seconds, 3mil/23=130k TPS
>   - 6 slots: 22 seconds, 3mil/22=136k TPS
>  
> As shown the scaling is almost nothing, and capped at ~120k TPS. Any clue? 
> Thanks.
>  
>  
>  
>  public class passthru extends RichMapFunction<String, String> {
> public void open(Configuration configuration) throws Exception {
> ... ...
> stats = new NonBlockingStatsDClient();
> }
> public String map(String value) throws Exception {
> ... ...
> stats.increment();
> return value;
> }
> }
> public class datagen extends RichSourceFunction<String> {
> ... ...
> public void run(SourceContext<String> ctx) throws Exception {
> int i = 0;
> while (run) {
> String idx = String.format("%09d", i);
> // the JSON payload was garbled in the original report; idx is presumably embedded in it
> ctx.collect("{\"...\"}");
> i++;
> if (i == loop)
> run = false;
> }
> }
> ... ...
> }
> public class Job {
> public static void main(String[] args) throws Exception {
> ... ...
> DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
> DataStream<String> convert = stream.map(new passthru(statsdUrl));
> env.execute("Flink");
> }
> }
> The reason for this sample test is that the Kafka source FlinkKafkaConsumer011
> faces the same issue and does not scale. FlinkKafkaConsumer011 already uses
> RichParallelSourceFunction, and we always set the number of Kafka partitions
> equal to the total number of TM slots, yet throughput is still capped and does
> not improve linearly.
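One more variable worth isolating: the map operator calls stats.increment() on
a statsD client for every record, so each subtask funnels every element through
the same metrics path, and any contention on that client's queue or socket
flattens throughput uniformly, which looks exactly like a scaling cap. Here is
a sketch that counts records through Flink's own metric system instead, keeping
the per-record path free of I/O (the class and metric names are illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    // Pass-through map that counts records via Flink's metric system instead
    // of talking to statsD on the hot path.
    public class PassthruWithMetrics extends RichMapFunction<String, String> {

        private transient Counter records;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Registered once per subtask; inc() is an in-memory update, no I/O.
            records = getRuntimeContext().getMetricGroup().counter("recordsMapped");
        }

        @Override
        public String map(String value) throws Exception {
            records.inc();
            return value;
        }
    }

Flink also ships a StatsD metrics reporter (flink-metrics-statsd) that can
forward such counters out of band, so the statsD dashboards can stay in place
while the hot path is ruled out as the bottleneck.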



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9442) Flink Scaling not working

2018-05-26 Thread swy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9442:
---
Description: 
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1kb in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing, and throughput is capped at
~120k TPS. Any clue? Thanks.

 public class passthru extends RichMapFunction<String, String> {
public void open(Configuration configuration) throws Exception {
... ...
stats = new NonBlockingStatsDClient();
}
public String map(String value) throws Exception {
... ...
stats.increment();
return value;
}
}

public class datagen extends RichSourceFunction<String> {
... ...
public void run(SourceContext<String> ctx) throws Exception {
int i = 0;
while (run) {
String idx = String.format("%09d", i);
// the JSON payload was lost in the original report; idx is presumably embedded in it
ctx.collect("{\"...\"}");
i++;
if (i == loop)
run = false;
}
}
... ...
}
public class Job {
public static void main(String[] args) throws Exception {
... ...
DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
DataStream<String> convert = stream.map(new passthru(statsdUrl));
env.execute("Flink");
}
}

The reason for this sample test is that the Kafka source FlinkKafkaConsumer011
faces the same issue and does not scale. FlinkKafkaConsumer011 already uses
RichParallelSourceFunction, and we always set the number of Kafka partitions
equal to the total number of TM slots, yet throughput is still capped and does
not improve linearly.

  was:
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1kb in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing, and throughput is capped at
~120k TPS. Any clue? Thanks.


> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
>  Issue Type: Bug
>Reporter: swy
>Priority: Major
>
> Hi,
>  
> We are in the middle of testing the scaling ability of Flink, but we found
> that scaling does not work, no matter whether we add more slots or increase
> the number of Task Managers. We would expect linear, or at least
> close-to-linear, scaling performance, but the results even show degradation.
> Any comments are appreciated.
>  
> Test details:
>  
> - VMware vSphere
> - Just a simple pass-through test:
>     - auto-generated source: 3 million records, each 1kb in size, parallelism = 1
>     - the source feeds the next map operator, which just returns the same
> record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
> - Results:
>   - 2 slots: 26 seconds, 3mil/26 = 115k TPS
>   - 4 slots: 23 seconds, 3mil/23 = 130k TPS
>   - 6 slots: 22 seconds, 3mil/22 = 136k TPS
>  
> As shown, the scaling effect is almost nothing, and throughput is capped at
> ~120k TPS. Any clue? Thanks.
>  
>  
>  
>  public class passthru extends RichMapFunction<String, String> {
> public void open(Configuration configuration) throws Exception {
> ... ...
> stats = new NonBlockingStatsDClient();
> }
> public String map(String value) throws Exception {
> ... ...
> stats.increment();
> return value;
>  

[jira] [Updated] (FLINK-9442) Flink Scaling not working

2018-05-26 Thread swy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9442:
---
Description: 
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1kb in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing, and throughput is capped at
~120k TPS. Any clue? Thanks.

  was:
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1000k in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing, and throughput is capped at
~120k TPS. Any clue? Thanks.


> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
>  Issue Type: Bug
>Reporter: swy
>Priority: Major
>
> Hi,
>  
> We are in the middle of testing the scaling ability of Flink, but we found
> that scaling does not work, no matter whether we add more slots or increase
> the number of Task Managers. We would expect linear, or at least
> close-to-linear, scaling performance, but the results even show degradation.
> Any comments are appreciated.
>  
> Test details:
>  
> - VMware vSphere
> - Just a simple pass-through test:
>     - auto-generated source: 3 million records, each 1kb in size, parallelism = 1
>     - the source feeds the next map operator, which just returns the same
> record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
> - Results:
>   - 2 slots: 26 seconds, 3mil/26 = 115k TPS
>   - 4 slots: 23 seconds, 3mil/23 = 130k TPS
>   - 6 slots: 22 seconds, 3mil/22 = 136k TPS
>  
> As shown, the scaling effect is almost nothing, and throughput is capped at
> ~120k TPS. Any clue? Thanks.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9442) Flink Scaling not working

2018-05-26 Thread swy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

swy updated FLINK-9442:
---
Description: 
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1000k in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing, and throughput is capped at
~120k TPS. Any clue? Thanks.

  was:
Hi,

We are in the middle of testing the scaling ability of Flink, but we found
that scaling does not work, no matter whether we add more slots or increase
the number of Task Managers. We would expect linear, or at least
close-to-linear, scaling performance, but the results even show degradation.
Any comments are appreciated.

Test details:

- VMware vSphere
- Just a simple pass-through test:
    - auto-generated source: 3 million records, each 1000k in size, parallelism = 1
    - the source feeds the next map operator, which just returns the same
record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
- 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
- Results:
  - 2 slots: 26 seconds, 3mil/26 = 115k TPS
  - 4 slots: 23 seconds, 3mil/23 = 130k TPS
  - 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, the scaling effect is almost nothing. Any clue? Thanks.


> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
>  Issue Type: Bug
>Reporter: swy
>Priority: Major
>
> Hi,
>  
> We are in the middle of testing the scaling ability of Flink, but we found
> that scaling does not work, no matter whether we add more slots or increase
> the number of Task Managers. We would expect linear, or at least
> close-to-linear, scaling performance, but the results even show degradation.
> Any comments are appreciated.
>  
> Test details:
>  
> - VMware vSphere
> - Just a simple pass-through test:
>     - auto-generated source: 3 million records, each 1000k in size, parallelism = 1
>     - the source feeds the next map operator, which just returns the same
> record and sends a counter to statsD; parallelism = 2, 4, 6 across the cases
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100GB memory
> - Results:
>   - 2 slots: 26 seconds, 3mil/26 = 115k TPS
>   - 4 slots: 23 seconds, 3mil/23 = 130k TPS
>   - 6 slots: 22 seconds, 3mil/22 = 136k TPS
>  
> As shown, the scaling effect is almost nothing, and throughput is capped at
> ~120k TPS. Any clue? Thanks.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)