[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510872#comment-16510872 ]

swy commented on FLINK-9506:
----------------------------

No problem, an email has also been sent to the ML.

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when ReducingState.add is used in the source code. In the test, checkpointing is disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just keeps storing records, with a simple reduction function (in fact an empty function gives the same result). Any idea would be appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how many records per second pass through "JsonTranslator", which is shown in the graph. The difference between the two runs is just one line: commenting or un-commenting "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop));
> DataStream convert = stream.map(new JsonTranslator())
>     .keyBy()
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState recStore;
>
>     public void processElement(Record r, Context ctx, Collector out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510870#comment-16510870 ]

Stefan Richter commented on FLINK-9506:
---------------------------------------

[~yow] I would suggest that you discuss it on the user mailing list so that (in the spirit of open source) others can potentially benefit from it as well.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510864#comment-16510864 ]

swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], I just sent an email to your personal mailbox. Let's continue there, and please let me know if you have not received it. Thank you!
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510828#comment-16510828 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], I haven't seen the email you sent yet, but I just had a look at your code, and I think the "non-scalable" behavior might be caused by your test code. From your code we can see that the source's parallelism is always the same as that of the other operators. In each sub-task of the source, you use a loop to mock the source records, which means the QPS of the source increases whenever you scale up the parallelism of your job; in the end, you haven't actually scaled up anything. I would suggest setting the parallelism of the source to a fixed value (e.g. 4), scaling up the job, and then seeing whether it is scalable. I haven't tested your code on a cluster yet; I will test it later.

My email is "summerle...@163.com"; if you have problems sending email to "u...@flink.apache.org", you can send it to me personally if you want. Thanks~
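The scaling argument in the comment above can be sketched with plain arithmetic. This is a minimal illustration, not code from the test app: the class name, the per-subtask rate of 1,000 records/s, and the assumption that keys are spread evenly over the operator subtasks are all hypothetical.

```java
// Hypothetical sketch: none of these names or numbers come from the reporter's app.
final class SourceRateSketch {

    // Total records/s offered by a loop-based source: each source subtask
    // emits at the same assumed rate, so the total grows with source parallelism.
    static long offeredRate(int sourceParallelism, long ratePerSourceSubtask) {
        return (long) sourceParallelism * ratePerSourceSubtask;
    }

    // Records/s each downstream subtask has to absorb, assuming keys are
    // spread evenly across the operator's subtasks.
    static double loadPerOperatorSubtask(int sourceParallelism,
                                         long ratePerSourceSubtask,
                                         int operatorParallelism) {
        return (double) offeredRate(sourceParallelism, ratePerSourceSubtask)
                / operatorParallelism;
    }

    public static void main(String[] args) {
        long rate = 1_000L; // assumed records/s per source subtask
        for (int p : new int[] {4, 8, 16}) {
            // Source parallelism coupled to job parallelism: per-subtask load never drops.
            double coupled = loadPerOperatorSubtask(p, rate, p);
            // Source pinned at 4 subtasks: per-subtask load drops as the job scales.
            double pinned = loadPerOperatorSubtask(4, rate, p);
            System.out.println("p=" + p + " coupled=" + coupled + " pinned=" + pinned);
        }
    }
}
```

With the source coupled to the job parallelism, every added operator subtask brings its own extra input, so the per-subtask load stays constant and the job cannot appear to scale. Pinning the source keeps the offered load fixed while the operators are scaled.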
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510789#comment-16510789 ]

swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], an email has been sent to "u...@flink.apache.org" with the title "Flink performance fluctuation with ListState.add()". Actually it is still closely related to this ticket: when State is in use, performance drops because of fluctuation and the job does not scale. Let's keep this ticket open for some more time, but we can discuss further over there. Thanks.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510757#comment-16510757 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi, I think you could send your question to the "u...@flink.apache.org".
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510752#comment-16510752 ]

swy commented on FLINK-9506:
----------------------------

Alright, I am OK to close the ticket. Do you mind sharing the information on where to continue the discussion?
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510750#comment-16510750 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

I'm +1 to close this ticket and move to the ML; we can definitely continue the discussion there. I will try out your code and give feedback tonight. Please bear with me for several hours, because I am currently busy with something else. Thanks.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510747#comment-16510747 ]

Stefan Richter commented on FLINK-9506:
---------------------------------------

Hi, I think this discussion has no connection with this issue anymore. Please close the ticket and take the discussion to the ML.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510746#comment-16510746 ]

swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], it surprises me as well. Please advise if something is wrong in the code, especially in the RocksDB settings. Also, if you don't mind, you can run the test in your lab and share the performance. To run the application, just use the following sample parameters:

aggrinterval=60 loop=500 auditinterval=3 statsd=1 URL=do36.mycompany.com:8127

Thank you!
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510716#comment-16510716 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], "If we further comment out recordStore.add() then everything works well, no more fluctuation": this surprised me, because for the RocksDB backend `listState.add()` just merges the record into the db (a put without any read), which is cheap in my mind. I have downloaded your code. Thanks.
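Why a merge-style add is expected to be cheap can be illustrated with a toy cost model. This is only a sketch under stated assumptions: it counts bytes moved for a single key, it does not call RocksDB or Flink, and the class and method names are hypothetical.

```java
// Toy cost model, not RocksDB or Flink code; all names here are hypothetical.
final class AppendCostSketch {

    // Merge-style append: each add writes only the new record.
    static long mergeBytes(int records, int recordSize) {
        long total = 0;
        for (int i = 0; i < records; i++) {
            total += recordSize;
        }
        return total;
    }

    // Read-modify-write append: each add reads the existing list and
    // writes back the list plus the new record.
    static long readModifyWriteBytes(int records, int recordSize) {
        long total = 0;
        for (int existing = 0; existing < records; existing++) {
            long read = (long) existing * recordSize;
            long write = (long) (existing + 1) * recordSize;
            total += read + write;
        }
        return total;
    }

    public static void main(String[] args) {
        int recordSize = 1_000; // assumed serialized record size in bytes
        for (int n : new int[] {10, 100, 1_000}) {
            System.out.println("records=" + n
                    + " merge=" + mergeBytes(n, recordSize)
                    + " readModifyWrite=" + readModifyWriteBytes(n, recordSize));
        }
    }
}
```

In this model the merge cost grows linearly with the number of records per key, while read-modify-write grows quadratically. The sketch ignores everything else a real store does, such as combining the merged entries when the list is read back, so it only shows why the add path itself is not the obvious suspect.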
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510715#comment-16510715 ]

swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], please download the source from https://github.com/swyow/flink_app_parser_git_01

Could you please advise whether this is the right way? It should already be quite straightforward in this sample test app. Please let me know when you have finished downloading, as I need to remove it from git. Thanks.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510704#comment-16510704 ]

swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], the picture was captured without any line commented out. Yes, "commented out" means there is no code in onTimer(). If we further comment out recordStore.add(), then everything works well with no more fluctuation; only the input stops if we output something in processElement() instead (is this expected?). A parallelism of 16 is used, and the RocksDB backend is loaded successfully, as mentioned in the log. Can any optimization be done on RocksDB? Or any suggestion on hashCode? Is there a way to generate a unique hash? Thank you.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510649#comment-16510649 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Additionally, are you sure that you are using the RocksDB backend? Could you find "Initializing RocksDB keyed state backend" in the TaskManager's log?
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510647#comment-16510647 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], in input_stop_when_timer_run.png, does the yellow line mean the QPS of the input and the green line the QPS of the output? If this picture was captured while the content of onTimer was not commented out, then it doesn't surprise me, but if it was captured while the content of onTimer was commented out, then it surprises me a bit.

You also mentioned that the fluctuation still exists when the content of onTimer is commented out. Does "commented out" mean that there is nothing in onTimer()? If yes, that surprises me, and as an additional test could you also comment out the `recordStore.add()` in processElement()? If both the content of onTimer() and the `recordStore.add()` are commented out and the fluctuation is still there, I think the problem is related to the timer, because of GC.

I'm also curious about the QPS of the source for your job, and the degree of parallelism of your job. Thanks~
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510632#comment-16510632 ]

swy commented on FLINK-9506:

Hi [~sihuazhou], to your questions:

1. Checkpointing is disabled at the moment, but incremental checkpointing is enabled for RocksDB in flink_config.yaml.
2. There is no difference even with the onTimer content commented out.
3. Please refer to the sample code from 'ProcessAggregation' below.

    public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
        recordStore.add(r);

        // Emit an audit record for every input record.
        Record auditRec = new Record();
        auditRec.setAuditOnly(true);
        auditRec.setInput_id(r.getInput_id());
        auditRec.setInput_type(r.getInput_type());
        auditRec.setOutput_id(r.getOutput_id());
        auditRec.setOutput_type(r.getOutput_type());
        auditRec.setAddkey(r.getAddkey());
        auditRec.setSource_id(r.getSource_id());
        auditRec.setINPUT_LINK(r.getINPUT_LINK());
        auditRec.setFilename(r.getFilename());
        auditRec.setOUTPUT_LINK(r.getOUTPUT_LINK());
        auditRec.setEL_COUNTER_IN(1);
        auditRec.setEL_COUNTER_STORED(1);
        out.collect(auditRec);

        // Register a processing-time timer, rounded down to a full second.
        ctx.timerService().registerProcessingTimeTimer(
            ((ctx.timerService().currentProcessingTime() + aggrWindowsIntervalMs) / 1000) * 1000);
        if (countMarker != null) countMarker.count();
    }

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Record> out) throws Exception {
        Iterable<Record> records = recordStore.get();
        int primary_units = 0;
        int secondary_units = 0;
        int tertiary_units = 0;
        int numReduce = -1;
        Record lastRecord = null;
        for (Record rec : records) {
            primary_units += rec.getI_PRIMARY_UNITS();
            secondary_units += rec.getI_SECONDARY_UNITS();
            tertiary_units += rec.getI_TERTIARY_UNITS();
            lastRecord = rec;
            numReduce++;
        }
        if (lastRecord != null) {
            lastRecord.setI_PRIMARY_UNITS(primary_units);
            lastRecord.setI_SECONDARY_UNITS(secondary_units);
            lastRecord.setI_TERTIARY_UNITS(tertiary_units);
            lastRecord.setPARTIALS_COMBINED_(numReduce);
            lastRecord.setEL_COUNTER_RETRIEVED(1);
            lastRecord.setEL_COUNTER_REDUCED(numReduce);
            out.collect(lastRecord);
        }
        recordStore.clear();
    }

There is a new observation: while the timer that flushes the records is running, the input from the source stops. Please refer to the new attachment 'input_stop_when_timer_run.png'. Is this expected?
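The timer registration in processElement() rounds the target time down to a full second, so all elements of a key that arrive within the same second request the same timestamp. As far as I know, Flink keeps at most one timer per key and timestamp, so these registrations should collapse into a single onTimer() call. A minimal sketch of the arithmetic in plain Java follows; the class name, the interval value and the arrival times are made up for illustration, and a Set stands in for the timer service.

```java
import java.util.Set;
import java.util.TreeSet;

final class TimerRounding {
    // Same expression as in processElement(): round (now + interval) down to a full second.
    static long roundedTimer(long nowMs, long intervalMs) {
        return ((nowMs + intervalMs) / 1000) * 1000;
    }

    public static void main(String[] args) {
        long intervalMs = 5000; // hypothetical value for aggrWindowsIntervalMs
        long[] arrivals = {
            1_528_800_000_005L, 1_528_800_000_410L, 1_528_800_000_999L, 1_528_800_001_000L
        };
        // Stand-in for the per-key timer set: duplicate timestamps are stored once.
        Set<Long> timers = new TreeSet<>();
        for (long now : arrivals) {
            timers.add(roundedTimer(now, intervalMs));
        }
        // Four arrivals, but only two distinct timer timestamps.
        System.out.println(timers);
    }
}
```

The first three arrivals fall into the same second and share one timestamp; the fourth starts a new second and gets its own.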
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510580#comment-16510580 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow], I don't think this is a limitation in Flink. We run more complex jobs with much worse data flows in production, and Flink handles them very well. Let's look deeper into your case.

- Have you enabled checkpointing now? If yes, are you using incremental checkpoints, and what is the checkpoint interval?
- Could you comment out the code related to the accumulation in `onTimer` and try again? In particular, comment out the line "listState.get()".
- Could you share some of the code of the `ProcessAggregation` that you are currently using?

Thanks
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510575#comment-16510575 ]

swy commented on FLINK-9506:

Hi [~sihuazhou], you are right. The fluctuation only happens with "keyby" plus state; "keyby" without state is quite stable. Unfortunately hashCode() is not good enough for us, because we need to aggregate billing data, which requires high accuracy.

We are now using ListState and key by a 50-character value, which seems a reasonable use case to me. Can I consider this a limitation in Flink? If not, do you have any advice on how to get past it? RocksDB is already in use (PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) as suggested.

Thank you for your support [~sihuazhou].
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509818#comment-16509818 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow], what do you mean by the empty ProcessAggregation? Did you use any state in the empty ProcessAggregation? Could you provide some of its code?

I think the KeyNoHash vs KeyHash picture shows what I expected. With hash() the key is only 4 bytes long and its distribution is uniform; without hash() your key is 50 characters long and its distribution may not be uniform. However, with the hash() approach you can only get an approximate result. If that is enough for you, I think it's good to go now. Is it not enough for you?
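The "approximate result" caveat comes from hash collisions: String.hashCode() maps arbitrarily many strings onto 32-bit int values, so two different keys can land in the same keyBy group and be aggregated together. A small plain-Java illustration, using a well-known colliding pair of strings rather than real billing keys (the class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class HashKeyCollision {
    // Sums units per group, where the group key is derived from the record id by keyFn.
    static <K> Map<K, Integer> aggregate(String[] ids, int[] units, Function<String, K> keyFn) {
        Map<K, Integer> sums = new HashMap<>();
        for (int i = 0; i < ids.length; i++) {
            sums.merge(keyFn.apply(ids[i]), units[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] ids = {"Aa", "BB"}; // distinct strings with the same hashCode()
        int[] units = {10, 5};

        // Keyed by the full string: two groups, one per id.
        Map<String, Integer> exact = aggregate(ids, units, s -> s);
        // Keyed by hashCode(): both ids collapse into one group and their units are mixed.
        Map<Integer, Integer> hashed = aggregate(ids, units, String::hashCode);

        System.out.println("by string: " + exact.size() + " groups");
        System.out.println("by hash:   " + hashed.size() + " group(s)");
    }
}
```

Collisions between real 50-character keys are rare but possible, which is why a hashed key may be acceptable for monitoring-style aggregates and not for billing.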
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509808#comment-16509808 ]

swy commented on FLINK-9506:

Hi [~sihuazhou], I hoped to close the ticket too, but the problem persists even though the reducing state is no longer used and has been replaced with ListState as you suggested. However, further investigation shows that the problem is caused by "KeyBy" instead. Please refer to keyby.png.

1. The first run is without KeyBy:

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run is with KeyBy and with the ProcessAggregation logic (which uses ListState to store all records and sums them up when the timer fires):

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
        .keyBy(new KeySelector<Record, Integer>() {
            @Override
            public Integer getKey(Record r) throws Exception {
                return r.getUNIQUE_KEY().hashCode() * 31;
            }
        })
        .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
        .name("AggregationDuration: " + aggrDuration + "ms");

3. The third run is with KeyBy and an empty ProcessAggregation.

The results show that ProcessAggregation is not the root cause of the fluctuation: there is no difference between the empty logic and the logic with ListState in ProcessAggregation. The fluctuation seems to be caused by "KeyBy". Any idea why? Thank you.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507815#comment-16507815 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow] I think we can close this ticket now, do you agree? Thanks~
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504820#comment-16504820 ]

swy commented on FLINK-9506:

Thanks for the response, [~sihuazhou]. The key length is around 50 chars. We will change to hashCode as suggested and test again :)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504729#comment-16504729 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow], off the top of my head, here are my answers:

- >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other option?

RocksDB needs to be set up for every sub-task that uses keyed state, if you are using the RocksDB backend.

- >> 2. What is the recommendation for RocksDB's statebackend? We are using tmpfs with checkpoint now with savepoint persists to hdfs.

Q1. I think the default configuration of the RocksDB backend is quite good for most jobs.
Q2. I'm not sure whether I understood you correctly. A savepoint is triggered manually and a checkpoint is triggered automatically. Do you mean that you trigger savepoints manually and periodically?

- >> 3. By source code, rocksdb options like parallelism and certain predefined option could be configured, any corresponding parameter in flink_config.yaml?

AFAIK, RocksDB's options need to be set in source code if you need to customize them. The default parallelism of operators can be configured in flink-conf.yaml.

- >> 4. related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the checkpoint directory, and I'm not sure whether it is accessible to all TMs. If it is, I think that is OK. I also didn't see your checkpoint interval...

BTW, you said you are using {{r.getUNIQUE_KEY();}} as the key, and I'm a bit curious about its typical length. If it is too long and you don't need an exact result, you could use {{r.getUNIQUE_KEY().hashCode();}} instead, which may also help to improve the performance.

In fact, I also agree with [~kkrugler] that this type of question is best asked on the user mailing list; that way more people can take part and you might also get more ideas from them. ;)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504675#comment-16504675 ]

swy commented on FLINK-9506:

[~sihuazhou] your idea is brilliant, but surprisingly the first test result does not show much of a change. Let us do more tests to confirm. But thank you! Would you mind answering my questions above regarding the RocksDB setup? I believe it is crucial for this performance test.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503505#comment-16503505 ]

Sihua Zhou commented on FLINK-9506:

[~yow] Maybe there is one more optimization worth trying. I see you are using the ReducingState in your code just to accumulate `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The ReducingState works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to RocksDB.

That means that for every input record in processElement(), it needs to do a `get` and a `put` on RocksDB, and the `get` costs much more than the `put`. I would suggest using ListState instead. With ListState, what you need to do is:

- Perform {{ListState.add(record)}} in {{processElement()}}; `ListState.add()` is cheap because it only appends the record and does not need a `get` from RocksDB.
- Perform the reducing in {{onTimer()}}; the reducing might look as follows:

{code:java}
Iterable<JSONObject> records = listState.get();
for (JSONObject jsonObj : records) {
    // do accumulation
}
out.collect(result);
{code}

In this way, for every key, you only need to do one read operation on RocksDB each time the timer fires.
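To make the difference in access pattern concrete, here is a toy model in plain Java. It does not use Flink or RocksDB: `CountingStore` is a hypothetical stand-in that only counts operations, on the assumption taken from the comment above that a reducing add costs one get plus one put per record, while a list add is a pure append and the single read happens in the timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StateAccessModel {
    /** Hypothetical stand-in for a keyed state store; it only counts operations. */
    static final class CountingStore {
        final Map<String, Integer> reduced = new HashMap<>();
        final Map<String, List<Integer>> lists = new HashMap<>();
        int gets, puts, appends;

        // ReducingState-style add: read the old result, reduce, write the new result back.
        void reducingAdd(String key, int units) {
            gets++;
            Integer old = reduced.get(key);
            puts++;
            reduced.put(key, old == null ? units : old + units);
        }

        // ListState-style add: append only, no read.
        void listAdd(String key, int units) {
            appends++;
            lists.computeIfAbsent(key, k -> new ArrayList<>()).add(units);
        }

        // Timer-style flush: one read of the whole list, reduce in memory, then clear.
        int flushList(String key) {
            gets++;
            List<Integer> values = lists.remove(key);
            int sum = 0;
            if (values != null) {
                for (int v : values) sum += v;
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        CountingStore reducing = new CountingStore();
        CountingStore list = new CountingStore();
        for (int i = 0; i < 1000; i++) {
            reducing.reducingAdd("k", 1);
            list.listAdd("k", 1);
        }
        int sum = list.flushList("k");
        System.out.println("reducing: gets=" + reducing.gets + " puts=" + reducing.puts);
        System.out.println("list:     gets=" + list.gets + " appends=" + list.appends + " sum=" + sum);
    }
}
```

The counts say nothing about how expensive each operation is in a real backend; that still has to be measured on the actual job.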
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503426#comment-16503426 ]

swy commented on FLINK-9506:

[~kkrugler] please don't close the ticket yet. The performance degradation still happens; the fluctuation is just a bit better after RocksDB was deployed. We need to get the RocksDB setup right; hopefully the performance drop will no longer happen in a bursty pattern once the state backend is working properly. We would appreciate advice here.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503401#comment-16503401 ]

Ken Krugler commented on FLINK-9506:

@swy - the questions about setting up RocksDB, and your configuration, are best asked on the mailing list versus as a comment on this issue. Also, it looks like this issue can be closed as not a problem now, do you agree? Thanks!
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503284#comment-16503284 ]

swy commented on FLINK-9506:

[~srichter] Thanks for the tips. After switching to RocksDB the performance seems to scale much better and fluctuates a little less. I have a few questions related to RocksDB.

1. Just to confirm: does RocksDB need to be set up on every TM machine? Is there any other option?
2. What is the recommendation for the RocksDB state backend? We are currently using tmpfs for checkpoints, with savepoints persisted to HDFS.
3. In source code, RocksDB options such as parallelism and certain predefined options can be configured. Are there corresponding parameters in flink_config.yaml?
4. Below is the configuration we are using. Could you please comment if something is not right?

    env.getConfig().enableObjectReuse();
    env.getConfig().setUseSnapshotCompression(true);
    RocksDBStateBackend rocksdb = new RocksDBStateBackend(
        new FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
    env.setStateBackend(rocksdb);
    //rocksdb.setOptions(new RocksdbOptions());
    rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

Or in flink_config.yaml:

    state.backend: rocksdb
    state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
    state.backend.incremental: true
    state.backend.async: true
    state.checkpoints.num-retained: 5
    state.savepoints.dir: file:///tmp/rocksdb_simple_example/savepoints

Thank you in advance!
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500211#comment-16500211 ]

swy commented on FLINK-9506:

[~srichter] I really appreciate your help. To your points:

*[First, when using the FSStateBackend, try to set asynchronous checkpoints to true]*
We are using 1.4.2 now. Is "state.backend.async" available there? According to the Flink website, I only found this parameter for 1.5.

*[your comparison does not consider that in case that you are using the reducing state, out.collect(output); in onTimer produces an output and not just forwards null]*
This again is an issue of the sample app only. In the real application we have a null check that prevents null records from reaching the next operator.

*[you can think about the object reuse setting env.getConfig().enableObjectReuse()]*
This is indeed a good idea, we will try it out.

*[And you can also make your AggregationKey much more efficient]*
Agreed. We now key by a single field of the Record structure:

    .keyBy(new KeySelector<Record, String>() {
        @Override
        public String getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY();
        }
    })
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500165#comment-16500165 ]

Stefan Richter commented on FLINK-9506:

[~yow] I had another look at your code and can point out a number of inefficiencies that together add up to a bigger difference. I also suggest you update to a newer Flink version >= 1.4.

First, when using the {{FSStateBackend}}, try to set asynchronous checkpoints to true. This will change the implementation to something that is often a bit more efficient.

Next, your comparison does not consider that, when you are using the reducing state, {{out.collect(output);}} in {{onTimer}} produces an output and does not just forward {{null}}.

Furthermore, you can think about the object reuse setting {{env.getConfig().enableObjectReuse();}}.

And you can also make your {{AggregationKey}} much more efficient, e.g. by storing a single {{char[]}} in which you concatenate all 3 input strings, plus a cached hash code, instead of 3 strings.
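The {{AggregationKey}} suggestion above can be sketched as follows. This is only an illustration of the idea (one {{char[]}} plus a cached hash code), not the class from the sample application: the constructor arguments, the separator character, and the assumption that the separator never occurs in the inputs are all hypothetical, and how Flink would serialize such a key type is not covered here.

```java
import java.util.Arrays;

/**
 * Hypothetical compact key: three strings concatenated into a single char[]
 * together with a hash code that is computed once at construction time.
 */
final class AggregationKey {
    // Assumption: this separator never occurs in the input strings. It keeps
    // ("ab", "c", "") and ("a", "bc", "") from producing the same array.
    private static final char SEP = '\u0000';

    private final char[] chars;
    private final int hash;

    AggregationKey(String a, String b, String c) {
        chars = new char[a.length() + b.length() + c.length() + 2];
        int pos = 0;
        a.getChars(0, a.length(), chars, pos);
        pos += a.length();
        chars[pos++] = SEP;
        b.getChars(0, b.length(), chars, pos);
        pos += b.length();
        chars[pos++] = SEP;
        c.getChars(0, c.length(), chars, pos);
        // Cached once, so repeated hash lookups do not rescan the characters.
        hash = Arrays.hashCode(chars);
    }

    @Override
    public int hashCode() {
        return hash;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AggregationKey)) {
            return false;
        }
        AggregationKey other = (AggregationKey) o;
        // Cheap hash comparison first, full array comparison only on a match.
        return hash == other.hash && Arrays.equals(chars, other.chars);
    }

    public static void main(String[] args) {
        AggregationKey k1 = new AggregationKey("eu", "web", "42");
        AggregationKey k2 = new AggregationKey("eu", "web", "42");
        System.out.println(k1.equals(k2) && k1.hashCode() == k2.hashCode());
    }
}
```

Compared with holding three separate strings, each key keeps one array alive instead of three string objects, and {{hashCode()}} becomes a field read.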
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499972#comment-16499972 ]

swy commented on FLINK-9506:

[~srichter] Thanks for the good explanation. Because of business requirements, we have no choice but to store that many records (10 million to 1 billion, at about 1 KB per record) for aggregation over multiple fields. Due to the large state, RocksDB will be set up for production. We know that passing objects to and from RocksDB is not cheap in terms of performance, but hopefully the result will at least scale. In our case now, not only do the results fluctuate, the scaling is also capped, and this is the real pain. Would you mind sharing your thoughts on how to handle such a case? Or any similar use case?

As a next step, we are going to tune the memory-related configuration and also set up RocksDB, hoping that the performance then scales. Thanks again.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499947#comment-16499947 ]

Stefan Richter commented on FLINK-9506:

From what I can see, the problem is purely related to garbage collection. When you use reducing state, your JSON objects are accumulated on the heap for a longer time and are no longer short-lived. The observed performance variance is caused by the GC activity, and the job is indeed producing and releasing large amounts of nested structures of small objects. You can think about tuning GC and memory/allocation-related settings, or think about a different in-memory representation (object layout) of your state data. With RocksDB, the GC problem would surely go away, but passing your large objects through ser/de on every access/update will not be cheap for performance either.
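One possible reading of "a different in-memory representation (object layout)" is sketched below. It is an assumption-laden illustration, not code from the sample application: the class name {{CompactRecord}} and its methods are hypothetical. Instead of one String object per field, all field values are kept UTF-8 encoded in a single byte array, so a stored record keeps a handful of objects alive rather than one or more per field.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical compact record: all field values live in one byte[] instead
 * of one String object (plus its backing array) per field.
 */
final class CompactRecord {
    private final byte[] data; // UTF-8 bytes of all fields, back to back
    private final int[] ends;  // ends[i] is the exclusive end offset of field i

    CompactRecord(String[] fields) {
        byte[][] encoded = new byte[fields.length][];
        ends = new int[fields.length];
        int total = 0;
        for (int i = 0; i < fields.length; i++) {
            encoded[i] = fields[i].getBytes(StandardCharsets.UTF_8);
            total += encoded[i].length;
            ends[i] = total;
        }
        // The temporary per-field arrays become garbage right away; only
        // "data" and "ends" stay reachable for as long as the record is stored.
        data = new byte[total];
        int pos = 0;
        for (byte[] e : encoded) {
            System.arraycopy(e, 0, data, pos, e.length);
            pos += e.length;
        }
    }

    int fieldCount() {
        return ends.length;
    }

    /** Decodes field i on demand; this allocates a new String per call. */
    String get(int i) {
        int start = (i == 0) ? 0 : ends[i - 1];
        return new String(data, start, ends[i] - start, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CompactRecord r = new CompactRecord(new String[] {"id-1", "DE", "42"});
        for (int i = 0; i < r.fieldCount(); i++) {
            System.out.println(r.get(i));
        }
    }
}
```

The trade-off is that every field read decodes a new String, so a layout like this only pays off when records are held in state far more often than their fields are read.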
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499873#comment-16499873 ]

Sihua Zhou commented on FLINK-9506:

[~yow] I think the RocksDB backend could give more stable performance, although the peak performance may be lower. Anyway, I think it is worth a try.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499869#comment-16499869 ]

swy commented on FLINK-9506:

Any workaround would be appreciated. Do you think using RocksDB would help in such a case? Or is there any other magic to make it work? Thanks.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499867#comment-16499867 ]

Sihua Zhou commented on FLINK-9506:

Thanks for trying it out. In that case I think the performance drop and the fluctuation might be caused by the state lookup. Since you are using the heap-based KeyedStateBackend, the fluctuation might be caused by capacity rescaling of the hash map, although I think the impact should not be that obvious... Maybe [~srichter] could give some more useful and professional information...
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499859#comment-16499859 ]

swy commented on FLINK-9506:

[~sihuazhou] Your trick looks quite promising: the performance has improved a lot and follows a more stable pattern. Please refer to the attached "KeyNoHash_VS_KeyHash.png". The left-hand side shows the fluctuating pattern "before the change", the right-hand side the pattern "after the change".

    .keyBy(new KeySelector<Record, Integer>() {
        @Override
        public Integer getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY().hashCode() % 128; // this is the changed line
        }
    })

However, the change also affected the process timer: records are not flushed, or only partially flushed, even when the scheduled time is reached. Any advice? Thanks.
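One property of the modified key selector above is worth spelling out: {{hashCode() % 128}} maps every UNIQUE_KEY to one of at most 255 integer values, because Java's {{%}} operator returns negative results for negative hash codes. Records with different UNIQUE_KEY values therefore end up under the same key. Whether this is related to the timer behaviour described above is not confirmed anywhere in this thread. The sketch below only contrasts {{%}} with {{Math.floorMod}}; the class and method names are hypothetical.

```java
/** Hypothetical helper contrasting two ways of folding a hash code into buckets. */
final class KeyBuckets {
    static final int NUM_BUCKETS = 128;

    /** Same arithmetic as the snippet in the thread: the result lies in (-128, 128). */
    static int remainderBucket(int hash) {
        return hash % NUM_BUCKETS;
    }

    /** Always lies in [0, 128), also for negative hash codes. */
    static int floorModBucket(int hash) {
        return Math.floorMod(hash, NUM_BUCKETS);
    }

    public static void main(String[] args) {
        String[] keys = {"user-1", "user-2", "user-3"};
        for (String key : keys) {
            int h = key.hashCode();
            System.out.println(key + ": % -> " + remainderBucket(h)
                    + ", floorMod -> " + floorModBucket(h));
        }
    }
}
```

Either way the key is a bucket number rather than the original UNIQUE_KEY, so anything stored per key is shared by all records that fall into the same bucket.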
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499672#comment-16499672 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow], could you please replace getKey() as follows and give it a try?

{code}
new KeySelector<Record, Integer>() {
    @Override
    public Integer getKey(Record r) throws Exception {
        return r.getUNIQUE_KEY().hashCode() / 128;
    }
}
{code}

If this works, then I think the performance drop may be caused by the state lookup.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499667#comment-16499667 ]

swy commented on FLINK-9506:

Thanks for pointing that out, this is a mistake in the sample. In the real application we just return the record object directly, without any 'new'. I have removed the 'new' from the sample and tested it again, but the performance is still the same.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499560#comment-16499560 ]

Ken Krugler commented on FLINK-9506:

Why are you creating a new {{AggregrationFunction}} every time {{reduce()}} is called in your {{ReducingState}} implementation?
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499445#comment-16499445 ]

swy commented on FLINK-9506:

What we want to know is: is this expected behaviour, or is something wrong in our code?
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499412#comment-16499412 ]

swy commented on FLINK-9506:

Hi [~sihuazhou], the key comes from one of the private members of the POJO:

    .keyBy(new KeySelector<Record, String>() {
        @Override
        public String getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY();
        }
    })

The sample code is at https://github.com/swyow/flink_tester. The only difference is that JsonObj has now been replaced with a simple POJO 'Record', which contains 50 private String members. Thanks.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499395#comment-16499395 ]

Sihua Zhou commented on FLINK-9506:

Hi [~yow], could you please give some information about the `keyBy()`? E.g. what are you keying by in keyBy()? Is it also a POJO with 50 String members, or something else?
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499393#comment-16499393 ]

swy commented on FLINK-9506:
----------------------------

Hi Fabian,
1. Yes.
2. Yes. The state is supposed to be stored in the Task Manager's memory, and we made sure the Task Managers have sufficient memory. Checkpointing is also disabled for this test.

For more information, please refer to https://stackoverflow.com/questions/50587771/flink-reducingstate-impact-performance

Any alternative or advice would be appreciated, as this is impacting our project. Thank you.
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499356#comment-16499356 ]

Fabian Hueske commented on FLINK-9506:
--------------------------------------

Thanks for opening this issue. I have a few questions:
1) Are you comparing a function that doesn't do anything against a function that uses {{ReducingState}}?
2) From your description, I assume you are using the {{FsStateBackend}}, correct?