[jira] [Closed] (FLINK-9597) Flink application does not scale as expected
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy closed FLINK-9597.
Resolution: Later

> Flink application does not scale as expected
>
> Key: FLINK-9597
> URL: https://issues.apache.org/jira/browse/FLINK-9597
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.5.0
> Reporter: swy
> Priority: Major
> Attachments: JM.png, TM.png, flink_app_parser_git.zip, sample.png, scaleNotWork.png
>
> Hi, we found that our Flink application, which has simple logic built on a process function, does not scale beyond parallelism 8 even with sufficient resources. Throughput is capped at ~250k TPS. No matter how we tune the parallelism of the operators, it does not scale; increasing the source parallelism does not help either.
> Please refer to "scaleNotWork.png":
> 1. fixed source parallelism 4, other operators parallelism 8
> 2. fixed source parallelism 4, other operators parallelism 16
> 3. fixed source parallelism 4, other operators parallelism 32
> 4. fixed source parallelism 6, other operators parallelism 8
> 5. fixed source parallelism 6, other operators parallelism 16
> 6. fixed source parallelism 6, other operators parallelism 32
> 7. fixed source parallelism 6, other operators parallelism 64 (performance worse than at parallelism 32)
> Sample source code is attached (flink_app_parser_git.zip). It is a simple program that parses JSON records into objects and passes them to a Flink process function with empty logic. RocksDB is used as the state backend, and the source is generated by the program itself, so this is easy to reproduce.
> We chose Flink for its scalability, but that is not what we are seeing now. We would appreciate any help, as this is impacting our projects. Thank you.
> To run the program, sample parameters:
> "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.mycompany.com:8127"
> * aggrinterval: time in ms for the timer to trigger
> * loop: how many rows of data to feed
> * statsd: whether to send results to statsd
> * psrc: source parallelism
> * pJ2R: parallelism of the map operator (JsonRecTranslator)
> * pAggr: parallelism of the process+timer operator (AggregationDuration)
> We are running on VMware, with 5 Task Managers and 32 slots each.
> lscpu
> Architecture:          x86_64
> CPU op-mode(s):        32-bit, 64-bit
> Byte Order:            Little Endian
> CPU(s):                32
> On-line CPU(s) list:   0-31
> Thread(s) per core:    1
> Core(s) per socket:    1
> Socket(s):             32
> NUMA node(s):          1
> Vendor ID:             GenuineIntel
> CPU family:            6
> Model:                 63
> Model name:            Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
> Stepping:              2
> CPU MHz:               2593.993
> BogoMIPS:              5187.98
> Hypervisor vendor:     VMware
> Virtualization type:   full
> L1d cache:             32K
> L1i cache:             32K
> L2 cache:              256K
> L3 cache:              20480K
> NUMA node0 CPU(s):     0-31
> Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc aperfmperf pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm epb fsgsbase smep dtherm ida arat pln pts
>
>        total  used  free  shared  buff/cache  available
> Mem:      98    24    72       0           1          72
> Swap:      3     0     3
> Please refer to TM.png and JM.png for further details.
> The test ran without any checkpointing enabled.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
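As a quick sanity check on the deployment math in the report above (a sketch using only the figures given here; with Flink's default slot sharing, a job generally needs as many slots as its highest single-operator parallelism):

```python
# Back-of-envelope check that the tested parallelism fits the cluster,
# using the figures from this report: 5 TaskManagers x 32 slots each.
task_managers = 5
slots_per_tm = 32
total_slots = task_managers * slots_per_tm  # 160

# Highest parallelism used for any single operator in the tests above.
tested_parallelism = {"psrc": 6, "pJ2R": 32, "pAggr": 72, "max_other_ops": 64}

# With default slot sharing, required slots = max operator parallelism,
# so even pAggr=72 fits comfortably into the 160 available slots.
required_slots = max(tested_parallelism.values())  # 72

print(total_slots, required_slots, required_slots <= total_slots)
# -> 160 72 True
```

So slot capacity itself is not the limiting factor in any of the tested configurations.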
[jira] [Comment Edited] (FLINK-9597) Flink application does not scale as expected
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514727#comment-16514727 ] swy edited comment on FLINK-9597 at 6/16/18 9:27 AM:
Hi [~fhueske], would you mind sharing, from your experience, any other reason the performance could be capped? CPU and memory are not an issue, as we have 32 vCPUs + 100 GB RAM per TM. The network should be fine as well: total TX+RX peaks at around 800 Mbps while we have 1000 Mbps. Would you mind sharing your thoughts, or testing the attached application in your lab? We are going to process 10 billion records per day, and that volume will grow, so scalability is very important to us. I will close the ticket, but hopefully we can still get your valuable input on this. Thank you.
* There is an interesting finding: the reason low parallelism works much better is that all tasks run in the same TM. Once we scale further, the tasks are distributed across different TMs and performance is worse than in the low-parallelism case. Is this something expected? The more I scale, the less I get?
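The network numbers in the comment above can be turned into a rough per-record network cost (a back-of-envelope sketch, not a measurement): at the ~250k TPS cap, an 800 Mbps TX+RX peak corresponds to roughly 400 bytes of traffic per record, and the link is already at 80% utilization, so the 1 Gbps NIC could plausibly become the next bottleneck once tasks are spread across TaskManagers.

```python
# Rough check of whether the 1 Gbps NIC could be the bottleneck,
# using only the peak figures mentioned in the comment above.
peak_bits_per_s = 800e6    # observed TX+RX peak: ~800 Mbps
link_bits_per_s = 1000e6   # link capacity: 1000 Mbps
records_per_s = 250e3      # observed throughput cap: ~250k TPS

utilization = peak_bits_per_s / link_bits_per_s          # 0.8
bytes_per_record = peak_bits_per_s / 8 / records_per_s   # 400.0

print(f"link utilization: {utilization:.0%}, "
      f"~{bytes_per_record:.0f} bytes on the wire per record")
# -> link utilization: 80%, ~400 bytes on the wire per record
```

This would also be consistent with the finding that throughput drops once tasks are distributed across TMs: network shuffles between TaskManagers add wire traffic that does not exist when all tasks share one TM.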
[jira] [Commented] (FLINK-9597) Flink application does not scale as expected
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514727#comment-16514727 ] swy commented on FLINK-9597:
Hi [~fhueske], would you mind sharing, from your experience, any other reason the performance could be capped? CPU and memory are not an issue, as we have 32 vCPUs + 100 GB RAM per TM. The network should be fine as well: total TX+RX peaks at around 800 Mbps while we have 1000 Mbps. Would you mind sharing your thoughts, or testing the attached application in your lab? We are going to process 10 billion records per day, and that volume will grow, so scalability is very important to us. I will close the ticket, but hopefully we can still get your valuable input on this. Thank you.
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
Attachment: sample.png
[jira] [Commented] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513963#comment-16513963 ] swy commented on FLINK-9597:

Hi [~fhueske], information updated.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
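The run parameters quoted in the report are plain whitespace-separated key=value tokens (e.g. "aggrinterval=600 loop=750 psrc=4"). A minimal stand-alone sketch of parsing such a string into a config map; the class and method names are illustrative and not taken from the attached flink_app_parser_git.zip:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal parser for "key=value key=value ..." run parameters as shown in
// the report. Illustrative only; not the attached program's actual parser.
public class ParamParser {
    public static Map<String, String> parse(String args) {
        Map<String, String> params = new HashMap<>();
        for (String token : args.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq > 0) {
                // Split only on the first '=' so values like host:port survive intact.
                params.put(token.substring(0, eq), token.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(
            "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.comptel.com:8127");
        System.out.println("source parallelism: " + p.get("psrc"));
        System.out.println("statsd target: " + p.get("URL"));
    }
}
```

The resulting map would then feed per-operator parallelism settings (psrc, pJ2R, pAggr) when the job graph is built.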
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: TM.png
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: JM.png
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Description: (duplicate of the description above; omitted)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: (was: TM.png)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: TM.png
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Description: (duplicate of the description above, with !JM.png! !TM.png! embedded; omitted)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: (was: JM.png)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597:
---
Attachment: JM.png
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Description: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result which is capped at ~250k TPS. No matter how we tune the parallelism of the operators it just not scale, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 performance worse than parallelism 32. Sample source code attached(flink_app_parser_git.zip). It is a simple program, parsing json record into object, and pass it to a empty logic Flink's process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily. We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. 
To run the program, sample parameters, "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.comptel.com:8127" * aggrinterval: time in ms for timer to trigger * loop: how many row of data to feed * statsd: to send result to statsd * psrc: source parallelism * pJ2R: parallelism of map operator(JsonRecTranslator) * pAggr: parallelism of process+timer operator(AggregationDuration) was: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result which is capped at ~250k TPS. No matter how we tune the parallelism of the operators it just not scale, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 performance worse than parallelism 32. Sample source code attached(flink_app_parser_git.zip). It is a simple program, parsing json record into object, and pass it to a empty logic Flink's process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily. We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. > Flink fail to scale! 
> > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: flink_app_parser_git.zip, scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result which is capped at > ~250k TPS. No matter how we tune the parallelism of the operators it just not > scale, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 performance > worse than parallelism 32. > Sample source code attached(flink_app_parser_git.zip). It is a simple > program, parsing json record into object, and pass it to a empty logic > Flink's process function. Rocksdb is in used, and the source is generated by > the program itself. This could be reproduce easily. > We choose Flink because of it scalability, but this is not the case now, > appreciated if anyone could help as this is impacting our projects! thank you. > To run the program, sample parameters, > "aggrinterval=600 loop=750 statsd=1 psrc=4
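The run string above is a flat list of space-separated key=value pairs. As a hypothetical illustration (this helper is not taken from the attached flink_app_parser_git.zip), such launch parameters could be parsed like this:

```java
import java.util.HashMap;
import java.util.Map;

public class ParamParser {
    // Split a space-separated "key=value" launch string, e.g.
    // "aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72",
    // into a lookup map. Tokens without '=' are ignored.
    public static Map<String, String> parse(String args) {
        Map<String, String> params = new HashMap<>();
        for (String token : args.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq > 0) {
                params.put(token.substring(0, eq), token.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p =
            parse("aggrinterval=600 loop=750 statsd=1 psrc=4 pJ2R=32 pAggr=72");
        System.out.println(p.get("psrc"));  // prints "4" (source parallelism)
    }
}
```

The parallelism values (psrc, pJ2R, pAggr) would then be handed to setParallelism() on the corresponding source, map, and process operators.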
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Description: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result which is capped at ~250k TPS. No matter how we tune the parallelism of the operators it just not scale, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 performance worse than parallelism 32. Sample source code attached(flink_app_parser_git.zip). It is a simple program, parsing json record into object, and pass it to a empty logic Flink's process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily. We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. was: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. 
fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 performance drop somemore! Sample source code attached. It is very simple program, just parse json record into object, and pass to a almost empty logic flink process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily! We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. > Flink fail to scale! > > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: flink_app_parser_git.zip, scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result which is capped at > ~250k TPS. No matter how we tune the parallelism of the operators it just not > scale, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 performance > worse than parallelism 32. > Sample source code attached(flink_app_parser_git.zip). 
It is a simple > program, parsing json record into object, and pass it to a empty logic > Flink's process function. Rocksdb is in used, and the source is generated by > the program itself. This could be reproduce easily. > We choose Flink because of it scalability, but this is not the case now, > appreciated if anyone could help as this is impacting our projects! thank you. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Description: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 performance drop somemore! Sample source code attached. It is very simple program, just parse json record into object, and pass to a almost empty logic flink process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily! We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. was: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. 
fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 Sample source code attached. It is very simple program, just parse json record into object, and pass to a almost empty logic flink process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily! We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. > Flink fail to scale! > > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: flink_app_parser_git.zip, scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result where is capped at > around 250k TPS. No matter how we tune the parallelism of the operators it > just not work, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 performance > drop somemore! > Sample source code attached. It is very simple program, just parse json > record into object, and pass to a almost empty logic flink process function. 
> Rocksdb is in used, and the source is generated by the program itself. This > could be reproduce easily! We choose Flink because of it scalability, but > this is not the case now, appreciated if anyone could help as this is > impacting our projects! thank you. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Description: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 Sample source code attached. It is very simple program, just parse json record into object, and pass to a almost empty logic flink process function. Rocksdb is in used, and the source is generated by the program itself. This could be reproduce easily! We choose Flink because of it scalability, but this is not the case now, appreciated if anyone could help as this is impacting our projects! thank you. was: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. 
fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 > Flink fail to scale! > > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: flink_app_parser_git.zip, scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result where is capped at > around 250k TPS. No matter how we tune the parallelism of the operators it > just not work, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 > Sample source code attached. It is very simple program, just parse json > record into object, and pass to a almost empty logic flink process function. > Rocksdb is in used, and the source is generated by the program itself. This > could be reproduce easily! We choose Flink because of it scalability, but > this is not the case now, appreciated if anyone could help as this is > impacting our projects! thank you. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Attachment: flink_app_parser_git.zip > Flink fail to scale! > > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: flink_app_parser_git.zip, scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result where is capped at > around 250k TPS. No matter how we tune the parallelism of the operators it > just not work, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Description: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png", 1. fixed source parallelism 4, other operators parallelism 8 2. fixed source parallelism 4, other operators parallelism 16 3. fixed source parallelism 4, other operators parallelism 32 4. fixed source parallelism 6, other operators parallelism 8 5. fixed source parallelism 6, other operators parallelism 16 6. fixed source parallelism 6, other operators parallelism 32 7. fixed source parallelism 6, other operators parallelism 64 was: Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png" 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 > Flink fail to scale! 
> > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result where is capped at > around 250k TPS. No matter how we tune the parallelism of the operators it > just not work, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9597) Flink fail to scale!
[ https://issues.apache.org/jira/browse/FLINK-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9597: --- Attachment: scaleNotWork.png > Flink fail to scale! > > > Key: FLINK-9597 > URL: https://issues.apache.org/jira/browse/FLINK-9597 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 >Reporter: swy >Priority: Major > Attachments: scaleNotWork.png > > > Hi, we found that our Flink application with simple logic, which using > process function is not scale-able when scale from 8 parallelism onward even > though with sufficient resources. Below it the result where is capped at > around 250k TPS. No matter how we tune the parallelism of the operators it > just not work, same to increase source parallelism. > Please refer to "scaleNotWork.png", > 1. fixed source parallelism 4, other operators parallelism 8 > 2. fixed source parallelism 4, other operators parallelism 16 > 3. fixed source parallelism 4, other operators parallelism 32 > 4. fixed source parallelism 6, other operators parallelism 8 > 5. fixed source parallelism 6, other operators parallelism 16 > 6. fixed source parallelism 6, other operators parallelism 32 > 7. fixed source parallelism 6, other operators parallelism 64 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9597) Flink fail to scale!
swy created FLINK-9597: -- Summary: Flink fail to scale! Key: FLINK-9597 URL: https://issues.apache.org/jira/browse/FLINK-9597 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.5.0 Reporter: swy Hi, we found that our Flink application with simple logic, which using process function is not scale-able when scale from 8 parallelism onward even though with sufficient resources. Below it the result where is capped at around 250k TPS. No matter how we tune the parallelism of the operators it just not work, same to increase source parallelism. Please refer to "scaleNotWork.png" 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 1. fixed source parallelism 4, other operators parallelism 8 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510872#comment-16510872 ] swy commented on FLINK-9506: no problem, an email was also sent to the ML. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". > {code} > DataStream<String> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy(...) > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 private String members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
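For readers unfamiliar with ReducingState: each add(r) merges the new record into the value already stored for the current key via the configured reduce function. A hypothetical in-memory model of those semantics (not Flink's actual implementation, which persists each entry per key in the state backend, e.g. RocksDB, and pays serialization cost on every access):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model of per-key reducing state: add() merges the incoming value
// into the stored one with the reduce function, just like ReducingState.add
// does for the currently-keyed record in a keyed Flink operator.
public class ReducingStateModel<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private final BinaryOperator<V> reduceFn;

    public ReducingStateModel(BinaryOperator<V> reduceFn) {
        this.reduceFn = reduceFn;
    }

    public void add(K key, V value) {
        // merge() applies reduceFn only when a previous value exists.
        store.merge(key, value, reduceFn);
    }

    public V get(K key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        ReducingStateModel<String, Long> counts = new ReducingStateModel<>(Long::sum);
        counts.add("k1", 1L);
        counts.add("k1", 2L);
        System.out.println(counts.get("k1"));  // prints "3"
    }
}
```

This also shows why even an "empty" reduce function is not free in the real backend: every add() still reads, deserializes, merges, and rewrites the stored value for the key.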
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510864#comment-16510864 ] swy commented on FLINK-9506: Hi [~sihuazhou] I just sent an email to your personal mailbox. Let's continue there, and please let me know if it is not received. thank you! > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". > {code} > DataStream<String> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy(...) > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 private String members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510789#comment-16510789 ] swy commented on FLINK-9506: Hi [~sihuazhou] an email was sent to "u...@flink.apache.org" with the title "Flink performance fluctuation with ListState.add()". It is still very much related to this ticket: when state is in use, performance drops because of fluctuation and does not scale. Let's keep this ticket open for some more time, but we can discuss further over there. thanks. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". 
> {code} > DataStream<String> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy(...) > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 private String members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510752#comment-16510752 ] swy commented on FLINK-9506: Alright, I am OK to close the ticket. Do you mind sharing the information on where to continue the discussion? > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". > {code} > DataStream<String> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy(...) > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 private String members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510746#comment-16510746 ] swy commented on FLINK-9506: Hi [~sihuazhou] it surprises me as well. Please advise if something is wrong in the code, especially in the RocksDB settings. Also, if you don't mind, you could run the test in your lab and share the performance. To run the application, just use the following sample parameters: aggrinterval=60 loop=500 auditinterval=3 statsd=1 URL=do36.mycompany.com:8127 thank you! > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". 
> {code} > DataStream<String> stream = env.addSource(new GeneratorSource(loop)); > DataStream<Record> convert = stream.map(new JsonTranslator()) > .keyBy(...) > .process(new ProcessAggregation()) > .map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction<Record, Record> { > private ReducingState<Record> recStore; > public void processElement(Record r, Context ctx, Collector<Record> out) { > recStore.add(r); // this line makes the difference > } > } > {code} > Record is a POJO class containing 50 private String members. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510715#comment-16510715 ] swy commented on FLINK-9506: Hi [~sihuazhou] please download the source from https://github.com/swyow/flink_app_parser_git_01 Could you please advise whether this is the right way? It should be quite straightforward in this sample test app. Please let me know when you finish downloading, as I need to remove it from git. thanks. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png, > input_stop_when_timer_run.png, keyby.png > > > Hi, we found that application performance drops by more than 100% when > ReducingState.add is used in the source code. In the test, checkpointing is > disabled, and filesystem (hdfs) is used as the state backend. > It can be easily reproduced with a simple app without checkpointing: just > keep storing records, with a simple reduction function (in fact an > empty function shows the same result). Any idea would be appreciated. > What an unbelievably obvious issue. > Basically the app just keeps storing records into the state, and we measure how > many records per second pass through "JsonTranslator", which is shown in the graph. The > difference is just 1 line: comment/un-comment "recStore.add(r)". 
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510704#comment-16510704 ] swy commented on FLINK-9506:

Hi [~sihuazhou], the picture was captured without any line commented out. Yes, the comment-out means there is no code in onTimer(). If we further comment out recordStore.add() then everything works well, with no more fluctuation; however, the input stops if we output something in processElement() instead (is this expected?). Parallelism 16 is used, and the RocksDB backend is loaded successfully as mentioned in the log. Is there any optimization that could be done on RocksDB? Or any suggestion on hashCode? Is there a way to generate a unique hash? Thank you.
[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9506: Attachment: input_stop_when_timer_run.png
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510632#comment-16510632 ] swy commented on FLINK-9506:

Hi [~sihuazhou], to your questions:
1. Checkpointing is disabled at the moment, but in flink_config.yaml incremental checkpointing is enabled for RocksDB.
2. No difference even with the onTimer content commented out.
3. Please refer to the sample code from 'ProcessAggregation' below:

{code}
public void processElement(Record r, Context ctx, Collector out) throws Exception {
    recordStore.add(r);
    Record auditRec = new Record();
    auditRec.setAuditOnly(true);
    auditRec.setInput_id(r.getInput_id());
    auditRec.setInput_type(r.getInput_type());
    auditRec.setOutput_id(r.getOutput_id());
    auditRec.setOutput_type(r.getOutput_type());
    auditRec.setAddkey(r.getAddkey());
    auditRec.setSource_id(r.getSource_id());
    auditRec.setINPUT_LINK(r.getINPUT_LINK());
    auditRec.setFilename(r.getFilename());
    auditRec.setOUTPUT_LINK(r.getOUTPUT_LINK());
    auditRec.setEL_COUNTER_IN(1);
    auditRec.setEL_COUNTER_STORED(1);
    out.collect(auditRec);
    ctx.timerService().registerProcessingTimeTimer(
        ((ctx.timerService().currentProcessingTime() + aggrWindowsIntervalMs) / 1000) * 1000);
    if (countMarker != null) countMarker.count();
}

public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
    Iterable records = recordStore.get();
    int primary_units = 0;
    int secondary_units = 0;
    int tertiary_units = 0;
    int numReduce = -1;
    Record lastRecord = null;
    for (Record rec : records) {
        primary_units += rec.getI_PRIMARY_UNITS();
        secondary_units += rec.getI_SECONDARY_UNITS();
        tertiary_units += rec.getI_TERTIARY_UNITS();
        lastRecord = rec;
        numReduce++;
    }
    if (lastRecord != null) {
        lastRecord.setI_PRIMARY_UNITS(primary_units);
        lastRecord.setI_SECONDARY_UNITS(secondary_units);
        lastRecord.setI_TERTIARY_UNITS(tertiary_units);
        lastRecord.setPARTIALS_COMBINED_(numReduce);
        lastRecord.setEL_COUNTER_RETRIEVED(1);
        lastRecord.setEL_COUNTER_REDUCED(numReduce);
        out.collect(lastRecord);
    }
    recordStore.clear();
}
{code}

There is a new observation: when the timer runs (it is used to flush records), the input from the source stops. Please refer to the new attachment 'input_stop_when_timer_run.png'. Is this expected?
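The registerProcessingTimeTimer(...) call above rounds the target timestamp down to a whole second, so every element that arrives for the same key within the same second registers the same timer timestamp, which Flink coalesces into a single onTimer() call per key per second. A minimal pure-JDK sketch of that alignment arithmetic (class and method names are illustrative, not from the sample app):

```java
public class TimerAlignment {
    // Mirrors ((currentProcessingTime() + aggrWindowsIntervalMs) / 1000) * 1000
    // from the processElement above: integer division truncates to the second.
    static long align(long nowMs, long intervalMs) {
        return ((nowMs + intervalMs) / 1000) * 1000;
    }

    public static void main(String[] args) {
        long interval = 600; // ms, matching the sample parameter aggrinterval=600
        // Two elements 300 ms apart still target the same timer timestamp:
        System.out.println(align(1_000_000L, interval)); // prints 1000000
        System.out.println(align(1_000_300L, interval)); // prints 1000000
    }
}
```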
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510575#comment-16510575 ] swy commented on FLINK-9506:

Hi [~sihuazhou], you are right. The fluctuation only happens when "keyBy" is used with state; "keyBy" without state is quite stable. Unfortunately hashCode() is not good enough for us, as we need to aggregate billing data, which requires high accuracy. We are now using ListState, keyed by a 50-character value, which seems a reasonable use case to me. Can I consider this a limitation of Flink? Otherwise, any advice on how to get through? RocksDB is already in use (PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) as suggested. Thank you for your support [~sihuazhou].
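On the accuracy concern above: keying by a String's 32-bit hashCode() can merge distinct keys, because collisions exist. A self-contained pure-JDK sketch (the key values are illustrative, not from the application):

```java
public class HashKeyCollision {
    public static void main(String[] args) {
        // Two distinct strings with identical hash codes: keying billing
        // records by String.hashCode() would aggregate both into one state entry.
        String k1 = "Aa";
        String k2 = "BB";
        System.out.println(k1.equals(k2));                  // false
        System.out.println(k1.hashCode() == k2.hashCode()); // true (both 2112)
    }
}
```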
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509808#comment-16509808 ] swy edited comment on FLINK-9506 at 6/12/18 4:10 PM:

Hi [~sihuazhou], I hope to close the ticket too, but the problem still persists even though reducing state is no longer in use, with ListState as the replacement you suggested. However, further investigation shows the problem is caused by "keyBy" instead. Please refer to KeyBy.png:

1. The first run, without keyBy:
{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator");
{code}
2. The second run, with keyBy and with the ProcessAggregation logic (the logic uses ListState to store all records, which are summed up when the timer triggers):
{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
    .keyBy(new KeySelector<Record, Integer>() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() * 31;
        }
    })
    .process(new ProcessAggregation());
{code}
3. The third run is with keyBy and an empty ProcessAggregation logic.

The result shows ProcessAggregation is not the root cause of the fluctuation; there is no difference between the empty logic and the logic with ListState. The fluctuation seems to be caused by "keyBy". Any idea why? Thank you.
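A side note on the getKey() shown above, sketched under the assumption that better key distribution was the goal of the `* 31`: an Integer's own hashCode() is simply its value, so multiplying the String hash by 31 only relabels keys linearly and cannot reduce collisions or skew (illustrative key value below):

```java
public class IntegerKeyHash {
    public static void main(String[] args) {
        // Integer.hashCode() is the identity function, so the Integer key
        // hashes to exactly the value produced by hashCode() * 31.
        int h = "someUniqueKey".hashCode() * 31;
        System.out.println(Integer.valueOf(h).hashCode() == h); // true
    }
}
```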
[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9506: Attachment: keyby.png
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509808#comment-16509808 ] swy commented on FLINK-9506:

Hi [~sihuazhou], I hope to close the ticket too, but the problem still persists even though reducing state is no longer in use, with ListState as the replacement you suggested. However, further investigation shows the problem is caused by "keyBy" instead. Please refer to KeyBy.png:

1. The first run, without keyBy:
{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator");
{code}
2. The second run, with keyBy and with the ProcessAggregation logic (the logic uses ListState to store all records, which are summed up when the timer triggers):
{code}
DataStream AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
    .keyBy(new KeySelector<Record, Integer>() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() * 31;
        }
    })
    .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
    .name("AggregationDuration: " + aggrDuration + "ms");
{code}
3. The third run is with keyBy and an empty ProcessAggregation logic.

The result shows ProcessAggregation is not the root cause of the fluctuation; there is no difference between the empty logic and the logic with ListState in ProcessAggregation. The fluctuation seems to be caused by "keyBy". Any idea why? Thank you.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504820#comment-16504820 ] swy commented on FLINK-9506:

Thanks for the response [~sihuazhou]; the key length is around 50 characters. We will change to hashCode as suggested and test again :)
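The size difference behind the change-to-hashCode suggestion can be sketched as follows, assuming key size is the motivation: with RocksDB every state access serializes the key, and an int hash is a fixed 4 bytes where a 50-character ASCII key serializes to about 50 bytes (the trade-off being possible hash collisions). The key value below is a hypothetical stand-in for the ~50-character UNIQUE_KEY:

```java
import java.nio.charset.StandardCharsets;

public class KeySize {
    public static void main(String[] args) {
        // Hypothetical 50-character key standing in for UNIQUE_KEY.
        String key = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwx";
        System.out.println(key.getBytes(StandardCharsets.UTF_8).length); // 50
        System.out.println(Integer.BYTES);                               // 4
    }
}
```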
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504675#comment-16504675 ] swy commented on FLINK-9506:

[~sihuazhou] your idea is brilliant, but surprisingly the first test result does not show much change. Let us do more tests to confirm. Thank you! Would you mind answering my questions above regarding the RocksDB setup? I believe it is crucial to this performance test.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503426#comment-16503426 ] swy commented on FLINK-9506:

[~kkrugler] please don't close the ticket yet, because the performance degradation still happens; the fluctuation is only a bit better after RocksDB was deployed. We need to get the RocksDB setup correct; hopefully the performance drop will not occur in a bursty pattern once the state backend is working properly. Advice would be appreciated.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503284#comment-16503284 ] swy commented on FLINK-9506:

[~srichter] Thanks for the tips; after implementing RocksDB the performance seems much more scalable now, with a little less fluctuation. I have a few questions related to RocksDB:
1. Just to confirm, does RocksDB need to be set up on every TM machine? Any other option?
2. What is the recommendation for RocksDB's state backend? We are using tmpfs for checkpoints now, with savepoints persisted to hdfs.
3. In source code, RocksDB options like parallelism and certain predefined options can be configured; is there a corresponding parameter in flink_config.yaml?
4. Below is the configuration we are using; could you please comment if something is not right?

{code}
env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
RocksDBStateBackend rocksdb = new RocksDBStateBackend(
    new FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
env.setStateBackend(rocksdb);
//rocksdb.setOptions(new RocksdbOptions());
rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
{code}

Or in flink_config.yaml:
{code}
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir: file:///tmp/rocksdb_simple_example/savepoints
{code}

Thank you in advance!
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500211#comment-16500211 ] swy commented on FLINK-9506:

[~srichter] Really appreciate your help. To your points:

*[First, when using the FSStateBackend, try to set asynchronous checkpoints to true]* We are using 1.4.2 now; is "state.backend.async" available? This parameter appears to be available only from 1.5 according to the Flink website.

*[your comparison does not consider that in case that you are using the reducing state, out.collect(output); in onTimer produces an output and not just forwards null]* This again is a sample-app issue; in the real application we have a null check to prevent a null record reaching the next operator.

*[you can think about the object reuse setting env.getConfig().enableObjectReuse()]* This is a good idea indeed; we will try it out.

*[And you can also make your AggregationKey much more efficient]* Agreed. Currently there is only one key, taken from one field of the Record structure:
{code}
.keyBy(new KeySelector<Record, String>() {
    @Override
    public String getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY();
    }
})
{code}
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499972#comment-16499972 ] swy commented on FLINK-9506: [~srichter] thanks for the good explanation. We have no choice but to store that many records (10 million to 1 billion, ~1 KB each) for aggregation (multiple-field aggregation) because of business requirements. Due to the large state, RocksDB will be set up for production. We know that passing objects to/from RocksDB is not cheap, but we hoped the result would at least be scalable. In our case, not only does throughput fluctuate, the scaling is also capped; this is the real pain. Would you mind sharing your thoughts on how to handle such a case, or any similar use case? As a next step we are going to tune memory-related configuration and set up RocksDB, hopefully with scalable performance. Thanks again.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499869#comment-16499869 ] swy commented on FLINK-9506: Any workaround would be appreciated. Do you think using RocksDB would help in such a case, or is there any other way to make it work? Thanks.
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499859#comment-16499859 ] swy edited comment on FLINK-9506 at 6/4/18 8:03 AM: [~sihuazhou] Your trick looks quite promising, as performance has improved a lot and in a more stable pattern. Please refer to the attached "KeyNoHash_VS_KeyHash.png": the left-hand side is the fluctuating pattern before the change, the right-hand side is after the change.
{code}
.keyBy(new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() % 128;
    }
})
{code}
However, the change also affects the process timer: records cannot be flushed, or are only partially flushed, even when the scheduled time is reached. I guess it might be due to wrong key reduction. Any advice? Thanks.
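One caveat worth noting about the hashCode() % 128 trick discussed above (an editorial aside, not from the thread): Java's % operator keeps the sign of the left operand, so a negative hashCode() produces a negative bucket, yielding up to 255 distinct key values instead of 128. A standalone sketch (class and method names are illustrative):

```java
public class KeyBucketDemo {
    static final int BUCKETS = 128;

    // Plain % keeps the sign of the hash, so the bucket may be negative.
    static int naiveBucket(int hash) {
        return hash % BUCKETS;
    }

    // Math.floorMod always yields a value in [0, BUCKETS).
    static int safeBucket(int hash) {
        return Math.floorMod(hash, BUCKETS);
    }

    public static void main(String[] args) {
        int negativeHash = -17; // stand-in for a String whose hashCode() is negative
        System.out.println(naiveBucket(negativeHash)); // -17
        System.out.println(safeBucket(negativeHash));  // 111
    }
}
```

Separately, note that after keyBy on such a bucketed value, all records whose original keys fall into the same bucket share one ReducingState entry and one timer; that may relate to the partial-flush symptom described in the comment.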
[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9506: --- Attachment: KeyNoHash_VS_KeyHash.png
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499667#comment-16499667 ] swy commented on FLINK-9506: Thanks for pointing that out; this is a mistake in the sample. In the real application we return the record object directly without any 'new'. I have removed the 'new' from the sample and tested again, but the performance is still the same.
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499445#comment-16499445 ] swy edited comment on FLINK-9506 at 6/3/18 3:04 PM: What we want to know is: is this expected (a >100% drop, i.e. throughput more than halved) or is something wrong in our code? We understand that using state will impact performance, but we did not expect this much, nor the fluctuating pattern.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499412#comment-16499412 ] swy commented on FLINK-9506: Hi [~sihuazhou], the keyBy key comes from one of the private members of the POJO:
{code}
.keyBy(new KeySelector<Record, String>() {
    @Override
    public String getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY();
    }
})
{code}
There is sample code at https://github.com/swyow/flink_tester. JsonObj has now been replaced with a simple POJO 'Record', which contains 50 private String members. Thanks.
[jira] [Issue Comment Deleted] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9442: --- Comment: was deleted (was: ok, will post to the mailing list as well, thanks. By the way, once we changed to RichParallelSourceFunction it works much better now. But our real application, which uses FlinkKafkaConsumer011 (already based on RichParallelSourceFunction), is still not able to scale. Any tips to share?)
> Flink Scaling not working
> -
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.4.2
> Reporter: swy
> Priority: Major
>
> Hi,
>
> We are in the middle of testing the scaling ability of Flink, but we found that scaling does not work, no matter whether we add more slots or more Task Managers. We would expect linear, or at least close-to-linear, scaling, but the results even show degradation. Any comments appreciated.
>
> Test details:
>
> - VMware vSphere
> - Just a simple pass-through test:
> - auto-generated source, 3 million records, each 1 KB in size, parallelism = 1
> - the source passes into the next map operator, which just returns the same record and sends a counter to statsD; its parallelism is 2, 4, or 6 depending on the case
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100 GB memory
> - Results:
> - 2 slots: 26 seconds, 3 mil/26 = 115k TPS
> - 4 slots: 23 seconds, 3 mil/23 = 130k TPS
> - 6 slots: 22 seconds, 3 mil/22 = 136k TPS
>
> As shown, there is almost no scaling, and throughput is capped at roughly 130k TPS. Any clue? Thanks.
>
> public class passthru extends RichMapFunction<String, String> {
>     public void open(Configuration configuration) throws Exception {
>         ... ...
>         stats = new NonBlockingStatsDClient();
>     }
>     public String map(String value) throws Exception {
>         ... ...
>         stats.increment();
>         return value;
>     }
> }
>
> public class datagen extends RichSourceFunction<String> {
>     ... ...
> public void run(SourceContext<String> ctx) throws Exception {
>     int i = 0;
>     while (run) {
>         String idx = String.format("%09d", i);
>         ctx.collect("{\" field>\"}");
>         i++;
>         if (i == loop)
>             run = false;
>     }
> }
> ... ...
> }
>
> public class Job {
>     public static void main(String[] args) throws Exception {
>         ... ...
>         DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
>         DataStream<String> convert = stream.map(new passthru(statsdUrl));
>         env.execute("Flink");
>     }
> }
> The reason for this sample test is that the Kafka source FlinkKafkaConsumer011 faces the same issue and is not scalable. FlinkKafkaConsumer011 already uses RichParallelSourceFunction, and we always set the number of Kafka partitions equal to the total number of TM slots, but the result is still capped and does not improve linearly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
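On the datagen source above: with a RichParallelSourceFunction, every subtask runs its own copy of run(), so to keep the total at `loop` records each subtask should emit only its slice. The index arithmetic can be sketched without any Flink dependency (class and method names are illustrative, not from the attached code):

```java
public class RangeSplit {
    // Returns {startInclusive, endExclusive} of the records one subtask emits,
    // so that the slices are disjoint and together cover [0, loop) exactly.
    static long[] sliceFor(long loop, int parallelism, int subtaskIndex) {
        long base = loop / parallelism;
        long rem = loop % parallelism;
        // The first `rem` subtasks each take one extra record.
        long start = subtaskIndex * base + Math.min(subtaskIndex, rem);
        long end = start + base + (subtaskIndex < rem ? 1 : 0);
        return new long[]{start, end};
    }

    public static void main(String[] args) {
        // loop=750, source parallelism 4, as in the sample parameters earlier in the thread.
        for (int i = 0; i < 4; i++) {
            long[] s = sliceFor(750, 4, i);
            System.out.println("subtask " + i + ": [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Inside the source, the two arguments would come from getRuntimeContext().getNumberOfParallelSubtasks() and getRuntimeContext().getIndexOfThisSubtask(), and the while loop would count from start to end instead of 0 to loop.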
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499393#comment-16499393 ] swy commented on FLINK-9506: Hi Fabian, 1. Yes. 2. Yes. The state is supposed to be stored in the Task Managers' memory, and we made sure they have sufficient memory. Checkpointing is also disabled for this test. For more information, please refer to https://stackoverflow.com/questions/50587771/flink-reducingstate-impact-performance. Any alternative or advice would be appreciated, as this is impacting our project. Thank you.
[jira] [Updated] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9506: --- Attachment: flink.png
[jira] [Created] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
swy created FLINK-9506: -- Summary: Flink ReducingState.add causing more than 100% performance drop Key: FLINK-9506 URL: https://issues.apache.org/jira/browse/FLINK-9506 Project: Flink Issue Type: Improvement Affects Versions: 1.4.2 Reporter: swy
[jira] [Commented] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491720#comment-16491720 ] swy commented on FLINK-9442: ok, will post to the mailing list as well, thanks. By the way, once we changed to RichParallelSourceFunction it works much better now. But our real application, which uses FlinkKafkaConsumer011 (already based on RichParallelSourceFunction), is still not able to scale. Any tips to share?
> public void run(SourceContext ctx) throws Exception { > int i = 0; > while (run){ > String idx = String.format("%09d", i); > ctx.collect("{\" field>\"}"); > i++; > if(i == loop) > run = false; > } > } > ... ... > } > public class Job { > public static void main(String[] args) throws Exception { > ... ... > DataStream stream = env.addSource(new > datagen(loop)).rebalance(); > DataStream convert = stream.map(new passthru(statsdUrl)); > env.execute("Flink"); > } > } > The reason of this sample test is because of Kafka source > FlinkKafkaConsumer011 facing the same issue which is not scale-able. And > FlinkKafkaConsumer011 already using RichParallelSourceFunction. And we always > set kafka partition = total TM #slot. But the result is still capped and not > improve linearly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
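The commenter's observation — the generator scaled once it became a RichParallelSourceFunction — fits how Flink sources work: a plain RichSourceFunction runs with parallelism 1, while a RichParallelSourceFunction runs one instance per subtask, so each instance must emit a disjoint share of the records. Below is a self-contained sketch of just the range-splitting logic; the `RangeSplit` class and `split` method are hypothetical names, and inside Flink the subtask index and count would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()`.

```java
// Hypothetical helper: split `total` records across `numSubtasks` parallel
// source instances so that each subtask emits a disjoint, contiguous range.
public class RangeSplit {
    /** Returns {startInclusive, endExclusive} for one subtask. */
    public static long[] split(long total, int subtaskIndex, int numSubtasks) {
        long base = total / numSubtasks;   // records every subtask gets
        long rem = total % numSubtasks;    // first `rem` subtasks get one extra
        long start = subtaskIndex * base + Math.min(subtaskIndex, rem);
        long size = base + (subtaskIndex < rem ? 1 : 0);
        return new long[] { start, start + size };
    }

    public static void main(String[] args) {
        // The issue's 3 million records over 6 slots: each subtask emits 500,000.
        for (int i = 0; i < 6; i++) {
            long[] r = split(3_000_000L, i, 6);
            System.out.println("subtask " + i + ": [" + r[0] + ", " + r[1] + ")");
        }
        // In a RichParallelSourceFunction.run(), the loop would call
        // ctx.collect(...) for each index in [start, end) of this subtask's range.
    }
}
```

With a single-instance source, adding downstream slots only parallelizes the map stage; the one source thread remains the bottleneck, which is consistent with the observed ~120k TPS cap regardless of map parallelism.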
[jira] [Updated] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9442: --- Affects Version/s: 1.4.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9442: --- Description: Hi, We are in the middle of testing the scaling ability of Flink, but scaling is not working no matter whether we add more slots or more Task Managers. We would expect linear, or close-to-linear, scaling, but the results even show degradation. Any comments appreciated.

Test details:
- VMware vSphere
- Just a simple pass-through test:
- auto-generated source, 3 million records, each 1 KB in size, parallelism = 1
- source passes into the next map operator, which just returns the same record and sends a counter to statsD; parallelism in the test cases = 2, 4, 6
- 3 TMs, 6 slots total (2 per TM); each JM/TM has 32 vCPUs and 100 GB memory
- Results:
- 2 slots: 26 seconds, 3mil/26 = 115k TPS
- 4 slots: 23 seconds, 3mil/23 = 130k TPS
- 6 slots: 22 seconds, 3mil/22 = 136k TPS

As shown, there is almost no scaling, and throughput is capped at ~120k TPS. Any clue? Thanks.

public class passthru extends RichMapFunction {
    public void open(Configuration configuration) throws Exception {
        ... ...
        stats = new NonBlockingStatsDClient();
    }
    public String map(String value) throws Exception {
        ... ...
        stats.increment();
        return value;
    }
}
public class datagen extends RichSourceFunction {
    ... ...
    public void run(SourceContext ctx) throws Exception {
        int i = 0;
        while (run) {
            String idx = String.format("%09d", i);
            ctx.collect("{\"\"}");
            i++;
            if (i == loop)
                run = false;
        }
    }
    ... ...
}
public class Job {
    public static void main(String[] args) throws Exception {
        ... ...
        DataStream stream = env.addSource(new datagen(loop)).rebalance();
        DataStream convert = stream.map(new passthru(statsdUrl));
        env.execute("Flink");
    }
}

The reason for this sample test is that the Kafka source FlinkKafkaConsumer011 faces the same issue and does not scale, even though FlinkKafkaConsumer011 already uses RichParallelSourceFunction and we always set Kafka partitions = total TM slots.
But the result is still capped and does not improve linearly. was: Hi, We are in the middle of testing scaling ability of Flink. But we found that scaling not working, no matter increase more slot or increase number of Task Manager. We would expect a linear, if not close-to-linear scaling performance but the result even show degradation. Appreciated any comments. Test Details, -VMWare vsphere -Just a simple pass through test, - auto gen source 3mil records, each 1kb in size, parallelism=1 - source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6 - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory - Result: - 2 slots: 26 seconds, 3mil/26=115k TPS - 4 slots: 23 seconds, 3mil/23=130k TPS - 6 slots: 22 seconds, 3mil/22=136k TPS As shown the scaling is almost nothing, and capped at ~120k TPS. Any clue? Thanks.
[jira] [Updated] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9442: --- Description: Hi, We are in the middle of testing scaling ability of Flink. But we found that scaling not working, no matter increase more slot or increase number of Task Manager. We would expect a linear, if not close-to-linear scaling performance but the result even show degradation. Appreciated any comments. Test Details, -VMWare vsphere -Just a simple pass through test, - auto gen source 3mil records, each 1kb in size, parallelism=1 - source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6 - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory - Result: - 2 slots: 26 seconds, 3mil/26=115k TPS - 4 slots: 23 seconds, 3mil/23=130k TPS - 6 slots: 22 seconds, 3mil/22=136k TPS As shown the scaling is almost nothing, and capped at ~120k TPS. Any clue? Thanks. was: Hi, We are in the middle of testing scaling ability of Flink. But we found that scaling not working, no matter increase more slot or increase number of Task Manager. We would expect a linear, if not close-to-linear scaling performance but the result even show degradation. Appreciated any comments. Test Details, -VMWare vsphere -Just a simple pass through test, - auto gen source 3mil records, each 1000k in size, parallelism=1 - source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6 - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory - Result: - 2 slots: 26 seconds, 3mil/26=115k TPS - 4 slots: 23 seconds, 3mil/23=130k TPS - 6 slots: 22 seconds, 3mil/22=136k TPS As shown the scaling is almost nothing, and capped at ~120k TPS. Any clue? Thanks. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9442) Flink Scaling not working
[ https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] swy updated FLINK-9442: --- Description: Hi, We are in the middle of testing scaling ability of Flink. But we found that scaling not working, no matter increase more slot or increase number of Task Manager. We would expect a linear, if not close-to-linear scaling performance but the result even show degradation. Appreciated any comments. Test Details, -VMWare vsphere -Just a simple pass through test, - auto gen source 3mil records, each 1000k in size, parallelism=1 - source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6 - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory - Result: - 2 slots: 26 seconds, 3mil/26=115k TPS - 4 slots: 23 seconds, 3mil/23=130k TPS - 6 slots: 22 seconds, 3mil/22=136k TPS As shown the scaling is almost nothing, and capped at ~120k TPS. Any clue? Thanks. was: Hi, We are in the middle of testing scaling ability of Flink. But we found that scaling not working, no matter increase more slot or increase number of Task Manager. We would expect a linear, if not close-to-linear scaling performance but the result even show degradation. Appreciated any comments. Test Details, -VMWare vsphere -Just a simple pass through test, - auto gen source 3mil records, each 1000k in size, parallelism=1 - source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6 - 3 TM, total 6 slots(2/TM) each JM/TM has 32 vCPU, 100GB memory - Result: - 2 slots: 26 seconds, 3mil/26=115k TPS - 4 slots: 23 seconds, 3mil/23=130k TPS - 6 slots: 22 seconds, 3mil/22=136k TPS As shown the scaling is almost nothing. Any clue? Thanks. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)