[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510872#comment-16510872
 ] 

swy commented on FLINK-9506:


no problem an email also sent to ML.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510870#comment-16510870
 ] 

Stefan Richter commented on FLINK-9506:
---

[~yow] I would suggest that you discuss it on the user mailing list so that (in 
the spirit of open source) others can potentially benefit from it as well.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510864#comment-16510864
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] I just sent an email to your personal mail box. Let's continue 
there and please let me know if no received. thank you!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510828#comment-16510828
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow], I didn't see the email you sent yet, but I just had a look at your 
code, I think the "non-scale-able"  might be caused by your test code. From 
your code we could see that the source's parallelism is always the same as the 
other operators. And in the each sub-task of the source, you use the loop to 
mock the source records, that means the QPS of the source will increase when 
you trying to rescale up the parallelism of your job, in the end, you didn't 
scale up anything indeed. I would suggest to set the parallelism of the source 
to a fixed value(e.g. 4), and scale up the job, then let's see whether it's 
scalable. I didn't test your code on cluster yet, will test it later. My email 
is "summerle...@163.com", if you had problem to send email to 
"u...@flink.apache.org", you could send to me personally if you want. Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510789#comment-16510789
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] an email is sent to "u...@flink.apache.org" with title "Flink 
performance fluctuation with ListState.add()". Actually it is still very 
related to this ticket, which is when State is in use, performance drop because 
of fluctuation and non-scale-able. Let's keep this ticket open for some more 
time, but we can discuss further over there. thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510757#comment-16510757
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi, I think you could send your question to the "u...@flink.apache.org".

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510752#comment-16510752
 ] 

swy commented on FLINK-9506:


Alright i am ok to close the ticket. Do you mind to share me the information on 
where to continue the discussion?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510750#comment-16510750
 ] 

Sihua Zhou commented on FLINK-9506:
---

I'm + 1 to close this ticket and move to ML, we can definitely continue the 
discussion there, I will try out your code, and give feedbacks tonight, please 
bear me several hours. Cause I currently busy with something others. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510747#comment-16510747
 ] 

Stefan Richter commented on FLINK-9506:
---

Hi, I think this discussion has no connection with this issue anymore. Please 
close the ticket and take the discussion to the ML.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510746#comment-16510746
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] it surprise me as well. Please advice if something wrong in the 
code especially to rocksdb settings. Also if you don't mind you can run the 
test in your lab and share the performance. To run the application just use the 
following sample parameters,
aggrinterval=60 loop=500 auditinterval=3 statsd=1 
URL=do36.mycompany.com:8127
thank you!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510716#comment-16510716
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] "If we further comment out recordStore.add() then everything works 
well, no more fluctuation" this surprised me, because for RocksDB backend the 
`listState.add()` is just merge(just put without any reading) the record into 
db, it's cheap in my mind. I had downloaded your code. Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510715#comment-16510715
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] please download source from 
https://github.com/swyow/flink_app_parser_git_01
Could you please advice if this is the right way? It should be quite straight 
forward already in this sample test app. Please let me know if you finish 
download as I need to remove it from git. thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-13 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510704#comment-16510704
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] the picture is captured without any line being commented-out. 
Yes the comment-out means no code in onTimer(). If we further comment out 
recordStore.add() then everything works well, no more fluctuation, just the 
input will be stopped if we output something in the processElement() instead(is 
this something expected?). 16 parallelism is used. And rocksdb backend is 
loaded successfully as mentioned in the log. Any optimization could be done on 
Rocksdb? Or any suggestion on hashCode? Is there a way to generate a unique 
hash? thank you.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510649#comment-16510649
 ] 

Sihua Zhou commented on FLINK-9506:
---

Additional, do you make sure that you are using the RocksDb backend? Could you 
find the "Initializing RocksDB keyed state backend" in the TaskManager's log?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510647#comment-16510647
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] in the input_stop_when_timer_run.png, does the yellow line mean QPS 
of input, and the green line mean QPS of output? If this picture is captured 
when the onTimer is uncomment out, then it didn't surprise me, but if the 
picture is captured when the content of onTimer is commented out, then it 
surprised me a bit.

And you mentioned that, when the content of onTimer is commented out, the 
Fluctuation still exists. Does the commented out means that there is nothing in 
the onTimer()? If yes, I think it surprised me and for an additional could you 
also comment out the `recordStore.add()` in processElement(). If both the 
content of onTimer() and the `recordStore.add()` are commented out and the 
Fluctuation still there, I think the problem is related to the timer, because 
of the GC.

And I'm curious about the QPS of source for you job? and the degree of the 
parallelism of your job?

Thanks~



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, 
> input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510632#comment-16510632
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] to your questions,
1. Checkpoint is disable at the moment. But in flink_config.yaml incremental 
checkpoint is enable for rocksdb.
2. No different even comment out onTimer content.
3. Please refer to below for sample code in 'ProcessAggregation'
public void processElement(Record r, Context ctx, Collector out)
throws Exception {
recordStore.add(r);

Record auditRec = new Record();
auditRec.setAuditOnly(true);
auditRec.setInput_id(r.getInput_id());
auditRec.setInput_type(r.getInput_type());
auditRec.setOutput_id(r.getOutput_id());
auditRec.setOutput_type(r.getOutput_type());
auditRec.setAddkey(r.getAddkey());
auditRec.setSource_id(r.getSource_id());
auditRec.setINPUT_LINK(r.getINPUT_LINK());
auditRec.setFilename(r.getFilename());
auditRec.setOUTPUT_LINK(r.getOUTPUT_LINK());

auditRec.setEL_COUNTER_IN(1);
auditRec.setEL_COUNTER_STORED(1);

out.collect(auditRec);


ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + aggrWindowsIntervalMs) / 1000) * 1000);
if(countMarker != null) countMarker.count();
}

public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out)
throws Exception {
Iterable records = recordStore.get();
int primary_units = 0;
int secondary_units = 0;
int tertiary_units = 0;
int numReduce = -1;
Record lastRecord = null;
for (Record rec : records) {
primary_units += rec.getI_PRIMARY_UNITS();
secondary_units += rec.getI_SECONDARY_UNITS();
tertiary_units += rec.getI_TERTIARY_UNITS();
lastRecord = rec;
numReduce++;
}
if(lastRecord != null) {
lastRecord.setI_PRIMARY_UNITS(primary_units);
lastRecord.setI_SECONDARY_UNITS(secondary_units);
lastRecord.setI_TERTIARY_UNITS(tertiary_units);
lastRecord.setPARTIALS_COMBINED_(numReduce);

lastRecord.setEL_COUNTER_RETRIEVED(1);
lastRecord.setEL_COUNTER_REDUCED(numReduce);
out.collect(lastRecord);
}
recordStore.clear();
}


There is a new observation that when timer is running, which is used to flush 
record, the input from source will stop. Please refer to new attachment 
'input_stop_when_timer_run.png'. Is this something expected?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510580#comment-16510580
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] I don't think this a limitation in Flink, we have more complex with 
terrible data flow on production but flink supports it very well. Let look into 
your case deeper. 

- Did you enable the checkpoint now? if yes, are you using incremental 
checkpoint? and what the checkpoint interval?
- could you try to comment the code that related to the accumulation in the 
`onTimer` and have a try? Specially, comment the line "listState.get()"
- Is it possible that you could somehow provide some code that related to the 
`ProcessAggregation` that you are using currentlly?

Thanks

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510575#comment-16510575
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] you are right. Fluctuation only happened when "keyby" with 
State. "keyby" without State is quite stable. Unfortunately hashcode() is not 
good enough for us as we need to aggregate billing data, which require high 
accuracy. We are now using ListState, and keyby with a 50 characters value, it 
seems a reasonable use case to me. Can I consider this a limitation in Flink? 
Otherwise any advice to get through? Rocksdb already in 
used(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) as suggested. Thank 
you for your support [~sihuazhou].

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509818#comment-16509818
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] What do you means by the empty ProcessAggregation? Did you use any 
state in the empty ProcessAggregation? Or could you somehow provide some code 
of the empty ProcessAggregation?

I think the picture related to he keyNoHash vs KeyHash is what I expected. 
Without hash() the key's length is only 4 bytes and the distribution is 
uniform, without hash your key's length is 50 and also the distribution maybe 
not uniform. But with the hash() approach you could only get a approximate 
result, if that is enough for you then I think it's good to go now, is it not 
enough for you?


> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-12 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509808#comment-16509808
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] I hope to close the ticket too but the problem still persists 
even though reduction state is no more in used, with ListState as replacement 
as suggested by you. However, further investigation show the problem caused by 
"KeyBy" instead. Please refer to KeyBy.png, 

1. the first run without KeyBy
DataStream AggregatedRecordWithAuditStream = sourceStringStream
.map(new 
JsonToRecordTranslator().name("JsonRecTranslator") 

2. the second run with KeyBy and with ProcessAggregation logic(the logic using 
ListState to store all record and will be sum up when timer triggered)
DataStream AggregatedRecordWithAuditStream = sourceStringStream
.map(new 
JsonToRecordTranslator().name("JsonRecTranslator") 
.keyBy(new KeySelector() {
@Override
public Integer getKey(Record r) throws 
Exception {
return r.getUNIQUE_KEY().hashCode() * 
31; 
}
}) 
.process(new ProcessAggregation(aggrDuration, 
markerFactory.getMarker(), markerFactory.getMarker()))
.name("AggregationDuration: " + aggrDuration +"ms");

3. the third run is with KeyBy and empty ProcessAggregation logic.

The result show ProcessAggregation not the root caused of fluctuation, no 
difference between empty logic or logic with ListState in ProcessAggregation. 
Seems the fluctuation is causing by "KeyBy". Any idea why? Thank you.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-11 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507815#comment-16507815
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] I think we can close this ticket now, do you agree? Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504820#comment-16504820
 ] 

swy commented on FLINK-9506:


thank for response [~sihuazhou], the key length is around 50 chars. We will 
change to hashCode as suggested and test again :)

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504729#comment-16504729
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] From the top of my head, I list answers here:

- >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any 
other option?

RocksDB is needed to setup in every sub-tasks that use the KeyedState if you 
are using RocksDB backend.

- >> 2. What is the recommendation for RocksDB's statebackend? We are using 
tmpfs with checkpoint now with savepoint persists to hdfs.

Q1. I think the default configuration of the RocksDB backend is quite good for 
the most of the jobs.
Q2. I'm not sure whether I got you correctly, the savepoint is triggered 
manually, and checkpoint is triggered automatically, you means that you trigger 
the savepoint manually periodically?

- >> 3. By source code, rocksdb options like parallelism and certain predefined 
option could be configured, any corresponding parameter in flink_config.yaml?

AFAIK, RocksDB's options need to set in source code if you need to special it. 
The default parallelism of the operator can be configured in flink-conf.yaml

- >> 4. related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the 
checkpoint directory, I'm not sure if it's accessible to all TMs. If yes, I 
think that is ok, and also I didn't see your checkpoint interval...

BTW, you said you are using the {{r.getUNIQUE_KEY();}}  as the key, I'm a bit 
curious about it's length in general. If it's too long and if you don't need an 
exactly result, you could use the {{r.getUNIQUE_KEY().hashCode();}} instead, 
that may also help to improve the performance. And in fact, I also agree with 
[~kkrugler] that this type of question is best asked in the user mail list, 
that way more people could take part in and you might also get more ideals from 
them. ;)



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504675#comment-16504675
 ] 

swy commented on FLINK-9506:


[~sihuazhou] your idea is brilliant, but the first test result is not show too 
much of change surprisingly. Let's us do more test to confirm. But thank you! 
Do you mind to answer my questions above regarding to Rocksdb setup? I believe 
it is crucial in this performance test.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505
 ] 

Sihua Zhou commented on FLINK-9506:
---

[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key very seconds, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503426#comment-16503426
 ] 

swy commented on FLINK-9506:


[~kkrugler] please don't close the ticket yet. Because the performance 
degradation still happen, it is just a bit better in fluctuation after Rocksdb 
is deployed. We need to get Rocksdb setup correctly, hopefully performance drop 
will not happen in busty pattern if statebackend is working properly. Would 
appreciated to get advice here.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503401#comment-16503401
 ] 

Ken Krugler commented on FLINK-9506:


@swy - the questions about setting up RocksDB, and your configuration, are best 
asked on the mailing list versus as a comment on this issue. Also, it looks 
like this issue can be closed as not a problem now, do you agree? Thanks!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503284#comment-16503284
 ] 

swy commented on FLINK-9506:


[~srichter] Thanks for tips, after implement Rocksdb the performance seems much 
more scale-able now, and a little bit less fluctuation. I have few questions 
related to rocksdb. 
1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other 
option?
2. What is the recommendation for RocksDB's statebackend? We are using tmpfs 
with checkpoint now with savepoint persists to hdfs.
3. By source code, rocksdb options like parallelism and certain predefined 
option could be configured, any corresponding parameter in flink_config.yaml?
4. Below is the configuration we are using, could you please comment if 
something not right?
env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
RocksDBStateBackend rocksdb = new RocksDBStateBackend(new 
FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
env.setStateBackend(rocksdb);
//rocksdb.setOptions(new RocksdbOptions());

rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);


Or in flink_config.yaml:
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir:  file:///tmp/rocksdb_simple_example/savepoints


Thank you in advance! 

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500211#comment-16500211
 ] 

swy commented on FLINK-9506:


[~srichter] Really appreciated your help. To your points,

*[First, when using the FSStateBackend, try to set asynchronous checkpoints to 
true]*
We are using 1.4.2 now, is "state.backend.async" available? Just found this 
parameter available in 1.5 according to Flink website. 

*[your comparison does not consider that in case that you are using the 
reducing state, out.collect(output); in onTimer produces an output and not just 
forwards null]*
This again is sample app issue, in real application we have a NULL check to 
prevent NULL record to next operator.

*[you can think about the object reuse setting 
env.getConfig().enableObjectReuse()]*
This is good idea indeed, will try it out.

*[And you can also make your AggregationKey much more efficient]*
Agree. Now only has one key from one field in Record structure,

.keyBy(new KeySelector() {
@Override
public String getKey(Record r) throws Exception
{ return r.getUNIQUE_KEY(); }

})




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500165#comment-16500165
 ] 

Stefan Richter commented on FLINK-9506:
---

[~yow] I had another look at your code and can point out a number of 
inefficiencies that also sum up to a bigger difference. I also suggest you 
update to a newer Flink version >= 1.4.
First, when using the {{FSStateBackend}}, try to set asynchronous checkpoints 
to true. This will change the implementation to something that is often a bit 
more efficient. Next, your comparison does not consider that in case that you 
are using the reducing state, {{out.collect(output);}} in {{onTimer}} produces 
an output and not just forwards {{null}}. Furthermore, you can think about the 
object reuse setting {{env.getConfig().enableObjectReuse();}}. And you can also 
make your {{AggregationKey}} much more efficient, e.g. storing a single 
{{char[]}} in which you concatenate all 3 input strings and a cached hashcode, 
instead of 3 strings.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499972#comment-16499972
 ] 

swy commented on FLINK-9506:


[~srichter] thanks for good explanation. We have no choice but need to store 
that much of record(10 mil to 1 bil, record size of 1kb) for 
aggregation(multiple fields aggr) because of business requirement. Due to the 
large state, RocksDB will be setup for production. We know passing/accessing 
object to/from RocksDB is not easy to performance but hopefully at least the 
result is scale-able. In our case now, not only the result is fluctuated, the 
scaling performance also capped, this is the real pain. Do you mind to share 
your thought on how to handle such case? Or any similar use case to share?

Next Step, we are going to tune memory related configuration, and also setup 
RocksDB hopefully the performance is scale-able. Thanks again.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499947#comment-16499947
 ] 

Stefan Richter commented on FLINK-9506:
---

>From what I can see, the problem is purely related to garbage collection. When 
>you use reducing state, your json objects are accumulated on the heap for 
>longer time and are no longer short lived. The observed performance variance 
>is caused by the GC activity, and the job is indeed producing and releasing 
>big amounts of nested structures of small objects. You can think about tuning 
>GC and memory/allocation related settings, or think about a different 
>in-memory representation (object layout) of your state data. With RocksDB, the 
>GC problem would surely go away, but passing your large objects through ser/de 
>on every access/update will not be a piece of cake as well for performance.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499873#comment-16499873
 ] 

Sihua Zhou commented on FLINK-9506:
---

[~yow] I think RocksDB backend could give a more stable performance, but the 
peak performance may be reduced, anyway I think it worths a a try

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499869#comment-16499869
 ] 

swy commented on FLINK-9506:


Any workaround would be appreciated, do you think using rocksdb help in such 
case? Or any other magic to make it work? Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499867#comment-16499867
 ] 

Sihua Zhou commented on FLINK-9506:
---

Thanks for trying it out. Then I think the performance drop and the fluctuation 
might be caused by the state lookup, and since you are using the 
KeyedStateBackend base on Heap, I think the fluctuation might caused by the 
capacity rescale of the "Hash Map", but  I think the impaction should not be 
that obvious... Maybe [~srichter] could give some more useful and professional 
information...

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-04 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499859#comment-16499859
 ] 

swy commented on FLINK-9506:


[~sihuazhou] Your tricks quite promising as the performance has been improved 
very much, and in a more stable pattern. Please refer to attach 
"KeyNoHash_VS_KeyHash.png", left hand side is fluctuation pattern "before the 
change" while the right hand side is "after the change".

.keyBy(new KeySelector() {
@Override
public Integer getKey(Record r) throws 
Exception {
return r.getUNIQUE_KEY().*hashCode() % 
128*; 
}
})  

However, the change also affected process timer as the record cannot be 
flushed, or partially flushed even the schedule reached. Any advice? Thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499672#comment-16499672
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] could you please just replace the getKey() as follow and give a try?
{code}
new KeySelector() {
@Override
public Integer getKey(Record r) throws Exception { 
return r.getUNIQUE_KEY().hash() / 128; 
}
}
{code}

if this is work then I think the performance drop may cause by the state lookup.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499667#comment-16499667
 ] 

swy commented on FLINK-9506:


Thanks for pointing out, this is a mistake in the sample. In real application 
we just return the record object directly without any 'new'. I have removed the 
'new' from sample and test it again but the performance still same.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499560#comment-16499560
 ] 

Ken Krugler commented on FLINK-9506:


Why are you creating a new {{AggregrationFunction}} every time {{reduce()}} is 
called in your {{ReducingState}} implementation?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499445#comment-16499445
 ] 

swy commented on FLINK-9506:


what we want to know is: Is this something expected or something wrong in our 
code?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499412#comment-16499412
 ] 

swy commented on FLINK-9506:


Hi [~sihuazhou] the keyby is comes from one of the private member in POJO.
.keyBy(new KeySelector() {
@Override
public String getKey(Record r) throws Exception 
{
return r.getUNIQUE_KEY();
}
})

There is sample code https://github.com/swyow/flink_tester. Just JsonObj is 
replaced with simple POJO 'Record' now, which contain 50 String private member. 
thanks.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499395#comment-16499395
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] could you please give some information of the `keyBy()`? e.g. what 
are you keyed by in keyBy()? Is it also a POJO that with 50 string member or 
something others?

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499393#comment-16499393
 ] 

swy commented on FLINK-9506:


Hi Fabian,
1. Yes.
2. Yes. The state is suppose to store in Task Manager's memory, and we ensure 
they have sufficient memory. Also the checkpoint is disable for this testing.

For more information, please refer to
https://stackoverflow.com/questions/50587771/flink-reducingstate-impact-performance

Any alternative or advice would be appreciated as this is impacting our 
project. Thank you.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-03 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499356#comment-16499356
 ] 

Fabian Hueske commented on FLINK-9506:
--

Thanks for opening this issue. I have a few questions:

1) Are you comparing a function that doesn't do anything against a function 
that uses {{ReducingState}}?
2) From your description I assume, you are using the {{FSStateBackend}}, 
correct?



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)