[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654522#comment-16654522 ]

Saikat Maitra edited comment on IGNITE-3303 at 10/18/18 1:39 AM:

Hi [~agoncharuk]

Can you please review and let me know if the PR looks good to merge? I have completed the requested changes.

https://github.com/apache/ignite/pull/870/files

Regards,
Saikat

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Saikat Maitra
> Priority: Major
> Fix For: 2.8
> Attachments: Screen Shot 2016-10-07 at 12.44.47 AM.png, testFlinkIgniteSourceWithLargeBatch.log, win7.PNG
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion:
> http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616800#comment-16616800 ]

Saikat Maitra edited comment on IGNITE-3303 at 9/16/18 5:11 PM:

Hi [~amashenkov]

I have updated the tests and also added Javadocs. Please review and share feedback.

TeamCity build: https://ci.ignite.apache.org/viewLog.html?buildId=1883096;

Regards,
Saikat
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611856#comment-16611856 ]

Andrew Mashenkov edited comment on IGNITE-3303 at 9/12/18 4:52 PM:

[~samaitra]

The fail() call in the catch block is executed in a separate thread, so it will never fail the test with the real reason, because you never call future.get(). Instead, the test fails with the meaningless assertion "f.error() is not null".

You can remove this fail() call and add something like the following at the end of the test:

igniteSrc.stopped.set(true); // this should stop streaming gracefully.
f.get(); // this rethrows the exception if the igniteSrc thread failed; otherwise it just waits for the thread to stop.

Also, please add Javadocs to all methods, as some appear to be missing.
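The stop-then-get pattern suggested in the comment above can be sketched with plain java.util.concurrent types. This is only an illustration under stated assumptions: FakeSource, its stopped flag, and runStopAndGet are hypothetical stand-ins, not the real IgniteSource or the Ignite test utilities used in the PR.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopThenGetSketch {
    /** Hypothetical stand-in for the streaming source; this is NOT the real IgniteSource. */
    public static class FakeSource {
        public final AtomicBoolean stopped = new AtomicBoolean(false);
        private final boolean failOnRun;

        public FakeSource(boolean failOnRun) {
            this.failOnRun = failOnRun;
        }

        /** Loops until stopped, or throws immediately when configured to fail. */
        public void run() throws Exception {
            if (failOnRun)
                throw new IllegalStateException("source failed");

            while (!stopped.get())
                Thread.sleep(10);
        }
    }

    /**
     * Runs the source on a worker thread, asks it to stop, then waits on the future.
     * Returns the exception thrown by the worker, or null if it stopped cleanly.
     */
    public static Throwable runStopAndGet(FakeSource src) throws InterruptedException, TimeoutException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        try {
            Future<Void> f = exec.submit(() -> {
                src.run();
                return null;
            });

            Thread.sleep(50);      // Let the source "stream" for a moment.
            src.stopped.set(true); // Ask the source to stop gracefully.

            try {
                f.get(5, TimeUnit.SECONDS); // A worker failure surfaces here as ExecutionException.
                return null;
            }
            catch (ExecutionException e) {
                return e.getCause();
            }
        }
        finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("clean run: " + runStopAndGet(new FakeSource(false)));
        System.out.println("failed run: " + runStopAndGet(new FakeSource(true)));
    }
}
```

Because the flag is set before get() is called, the wait is bounded even though the source itself would otherwise run forever; the timeout on get() is an extra safety net added for this sketch.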
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611467#comment-16611467 ]

Saikat Maitra edited comment on IGNITE-3303 at 9/12/18 2:18 AM:

[~amashenkov]

I have incorporated the review comments.

Also, I wanted to share that IgniteSrc is a continuous process and continues to run until stopped is set to true or our desired test execution time is complete.

I have added a fail() statement in the catch block to check whether igniteSrc.run encounters any exception within 3 secs. I have also added assert f.error() == null to check whether any error was encountered in the igniteSrc.run() process during test execution. This validates that no exception occurs while streaming data from the source.

If we use f.get(), the source will keep running and we will be waiting synchronously for the result.

Please review and let me know if any changes are required.

Regards,
Saikat
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608178#comment-16608178 ]

Saikat Maitra edited comment on IGNITE-3303 at 9/8/18 7:10 PM:

[~amashenkov], [~agoncharuk]

I have incorporated the review changes. I have also refactored the CacheEventSerializer class and moved it to the test folder, because it is used only in FlinkIgniteSourceSelfExample and is not required by IgniteSource.

Build links:
[https://ci.ignite.apache.org/viewLog.html?buildId=1821778;]
[https://ci.ignite.apache.org/viewLog.html?buildId=1821774;]

Please review and share feedback.

Regards,
Saikat
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927877#comment-15927877 ]

Vladimir Ozerov edited comment on IGNITE-3303 at 3/16/17 11:45 AM:

Hi [~samaitra],

Any news on the ticket?
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676280#comment-15676280 ]

Anton Vinogradov edited comment on IGNITE-3303 at 11/18/16 9:30 AM:

Saikat,

1) It seems your fix does not work.
I set a breakpoint before the last test and I see a lot of threads at {{IgniteSource.run}}:
{noformat}
"Source: Custom Source -> Sink: Unnamed (1/1)@6026" daemon prio=5 tid=0x7e nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.ignite.source.flink.IgniteSource.run(IgniteSource.java:153)
	  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
	  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	  at java.lang.Thread.run(Thread.java:745)
{noformat}
In other words, in the first test we have 1 {{IgniteSource}}, in the second we have 2, and so on. This is not what we expected.

As I said before, {{IgniteSource.cancel()}} will not stop {{IgniteSource}} when it is run from the test. It should be run from {{StreamExecutionEnvironment}} somehow.

Please fix this and make sure that the tests work properly.

Btw, we can discuss this over Skype before fixing.
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15670037#comment-15670037 ]

Anton Vinogradov edited comment on IGNITE-3303 at 11/16/16 10:12 AM:

Saikat,

1) As far as I can see,
{noformat}
stopped = true;
{noformat}
at {{org.apache.ignite.source.flink.IgniteSource#stop}} has no impact on
{noformat}
while (!stopped) { ...
{noformat}
at {{org.apache.ignite.source.flink.IgniteSource#run}}. You can check that the {{while}} loop continues to run.
This happens because these are different {{stopped}} fields: they belong to different {{IgniteSource}} instances.
It seems that the {{IgniteSource}} added as a source to {{StreamExecutionEnvironment}} is used only as a template, so there is no way to manage it externally (from the test).
I think that {{org.apache.ignite.source.flink.IgniteSource#cancel}} is the correct way to stop {{IgniteSource}}, but it should be triggered from {{StreamExecutionEnvironment}} somehow.
So, we should get rid of {{org.apache.ignite.source.flink.IgniteSource#stop}} and move its content to {{org.apache.ignite.source.flink.IgniteSource#cancel}}.
Am I missing something?

2) Anyway, we have to check that we can use more than one {{IgniteSource}}, related to different caches, together.
Also, if {{parallelism}} > 1 can be a valid case for production, we should test that case as well.
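The "different instances" effect described in the comment above can be reproduced without Flink. The sketch below assumes, as the comment suggests, that the running source is a copy of the object created in the test; it imitates that copy with plain Java serialization. FlagSource and copyOf are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CopiedSourceSketch {
    /** Hypothetical source with an instance-level stop flag; NOT the real IgniteSource. */
    public static class FlagSource implements Serializable {
        private static final long serialVersionUID = 1L;

        private volatile boolean stopped;

        public void stop() {
            stopped = true;
        }

        public boolean isStopped() {
            return stopped;
        }
    }

    /** Deep-copies the source through Java serialization, imitating a runtime that ships a copy. */
    public static FlagSource copyOf(FlagSource src) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(src);
        }

        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FlagSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FlagSource inTest = new FlagSource();      // The object the test holds on to.
        FlagSource inRuntime = copyOf(inTest);     // The object that would actually be running.

        inTest.stop();                             // Only the test's instance sees this.

        System.out.println("test instance stopped: " + inTest.isStopped());
        System.out.println("runtime copy stopped: " + inRuntime.isStopped());
    }
}
```

Setting the flag on the test's object leaves the copy untouched, which is consistent with the observation that the {{while (!stopped)}} loop keeps running.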
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664759#comment-15664759 ]

Saikat Maitra edited comment on IGNITE-3303 at 11/15/16 6:41 PM:

[~avinogradov]

Hi Anton,

Thank you for your feedback.

1. I have investigated the issue with the static stopped variable further and noticed that the exit condition was not correctly defined for the tests. I have fixed it and updated the PR.

2. parallelism is set to 1 so that multiple threads do not receive and process the same events. Parallelism may not be useful for the tests, but it will be useful when join, map and reduce are used over data streams.

3. TeamCity report: http://ci.ignite.apache.org/viewLog.html?buildId=358872=IgniteTests_IgniteStreamers=buildResultsDiv

Regards,
Saikat

was (Author: samaitra):

[~avinogradov]

Hi Anton,

Thank you for your feedback.

1. I have investigated the issue with the static stopped variable further and noticed that the exit condition was not correctly defined for the tests. I have fixed it and updated the PR.

2. parallelism is set to 1 so that multiple threads do not receive and process the same events. There is no data filtering or partitioning while receiving events, so with a parallelism count greater than 1 all the threads start processing the events and the following test assert fails:
{noformat}
assertEquals(eventList, resultList)
{noformat}
Parallelism may not be useful for the tests, but it will be useful in a cluster environment when join, map and reduce are used over data streams.

3. TeamCity report: http://ci.ignite.apache.org/viewLog.html?buildId=358872=IgniteTests_IgniteStreamers=buildResultsDiv

Regards,
Saikat
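The reason given in the earlier revision of the comment above for the failing assertEquals(eventList, resultList) can be illustrated with a small simulation. It assumes only what that revision states: every parallel subtask receives every event, with no filtering or partitioning. The collect method is hypothetical and runs the "subtasks" sequentially rather than on real threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelDuplicatesSketch {
    /**
     * Simulates 'parallelism' subtasks that each receive the full event list
     * and append it to a shared result, with no filtering or partitioning.
     */
    public static List<String> collect(List<String> events, int parallelism) {
        List<String> result = new ArrayList<>();

        for (int subtask = 0; subtask < parallelism; subtask++)
            result.addAll(events); // Every subtask emits every event.

        return result;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("e1", "e2", "e3");

        System.out.println("parallelism 1 matches input: " + collect(events, 1).equals(events));
        System.out.println("parallelism 2 matches input: " + collect(events, 2).equals(events));
    }
}
```

With parallelism 1 the result equals the input; with any higher value each event appears once per subtask, so an equality assertion against the original list cannot hold unless events are partitioned or deduplicated.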
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15657165#comment-15657165 ]

Anton Vinogradov edited comment on IGNITE-3303 at 11/11/16 2:20 PM:

Saikat,

I've started the review and see that
{noformat}private static volatile boolean stopped = true;{noformat}
is still static, which gives us no chance to use more than one {{IgniteSource}}.

So I tried to change it to non-static, and the tests failed.
I started to look for the reason and found that the {{IgniteSource}} instance in the test and the one under Flink (when Flink runs {{IgniteSource.run()}}) are different.
Just set a breakpoint inside {{IgniteSource.start}} and {{IgniteSource.run}} and you'll see that they are different.
As far as I understand, Flink makes a copy of the object.

I searched the web for some {{RichParallelSourceFunction}} implementations and found that they use another strategy, working with
{noformat}private volatile boolean isRunning = false;{noformat}
for example
http://www.programcreek.com/java-api-examples/index.php?source_dir=StreamKV-master/streamkv-java/src/main/java/streamkv/api/java/benchmark/AsyncKVLocalBenchmark.java

So, my question is: what was the reason to use the static {{private static volatile boolean stopped = true;}}, and is there any chance to write tests with more than one {{IgniteSource}}?

Also, I see that parallelism is always 1 in the tests. Does that mean that the implementation supports only the "1" case?

P.S. Saikat, I have been reading articles about Flink for the last few hours, and I'm still not a Flink guru :) Possibly I'm thinking in the wrong direction?
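The concern about the static flag in the comment above can be shown in isolation. The following is a minimal sketch, not the IgniteSource code: StaticFlagSource and InstanceFlagSource are hypothetical classes that differ only in whether the flag is static, which is enough to show why a static flag couples every source in the JVM.

```java
public class FlagScopeSketch {
    /** Hypothetical source whose flag is static, so it is shared by all instances. */
    public static class StaticFlagSource {
        private static volatile boolean stopped = true;

        public void start() {
            stopped = false;
        }

        public void cancel() {
            stopped = true;
        }

        public boolean isStopped() {
            return stopped;
        }
    }

    /** Hypothetical source with a per-instance flag, in the style of isRunning. */
    public static class InstanceFlagSource {
        private volatile boolean isRunning = false;

        public void start() {
            isRunning = true;
        }

        public void cancel() {
            isRunning = false;
        }

        public boolean isRunning() {
            return isRunning;
        }
    }

    public static void main(String[] args) {
        StaticFlagSource s1 = new StaticFlagSource();
        StaticFlagSource s2 = new StaticFlagSource();
        s1.start();
        s2.start();
        s1.cancel(); // Cancelling one source flips the shared flag for both.
        System.out.println("static flag, second source stopped: " + s2.isStopped());

        InstanceFlagSource i1 = new InstanceFlagSource();
        InstanceFlagSource i2 = new InstanceFlagSource();
        i1.start();
        i2.start();
        i1.cancel(); // Only the first source is affected.
        System.out.println("instance flag, second source running: " + i2.isRunning());
    }
}
```

The per-instance variant lets several sources be cancelled independently; the trade-off, raised elsewhere in this thread, is that the flag must then be flipped on the instance that is actually running rather than on the object held by the test.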
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568093#comment-15568093 ]

Anton Vinogradov edited comment on IGNITE-3303 at 10/17/16 7:25 AM:

Update.

According to https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute each contribution should be checked before review. I found no runs of "Ignite Streamers" at TeamCity; also, no test suite was added to this TeamCity task.

I've added FlinkIgniteSourceSelfTestSuite and checked the result. As you can see, TC shows the same failure:
http://ci.ignite.apache.org/viewLog.html?buildId=330384=buildResultsDiv=IgniteTests_IgniteStreamers

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Saikat Maitra
> Attachments: Screen Shot 2016-10-07 at 12.44.47 AM.png, testFlinkIgniteSourceWithLargeBatch.log, win7.PNG
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15552207#comment-15552207 ]

Anton Vinogradov edited comment on IGNITE-3303 at 10/6/16 3:35 PM:

Saikat, sorry for the delay.

I've checked the latest pull request and still see testFlinkIgniteSourceWithLargeBatch failing.

{noformat}
"test-runner-#180%flink.FlinkIgniteSourceSelfTest%" #319 prio=5 os_prio=0 tid=0x297c7800 nid=0xb1c waiting on condition [0x28c4d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00077b3826d0> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:107)
	at scala.concurrent.Await.result(package.scala)
	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:133)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:423)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:409)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:401)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:107)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
	at org.apache.ignite.source.flink.FlinkIgniteSourceSelfTest.checkIgniteSource(FlinkIgniteSourceSelfTest.java:162)
	at org.apache.ignite.source.flink.FlinkIgniteSourceSelfTest.testFlinkIgniteSourceWithLargeBatch(FlinkIgniteSourceSelfTest.java:99)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at junit.framework.TestCase.runTest(TestCase.java:176)
	at org.apache.ignite.testframework.junits.GridAbstractTest.runTestInternal(GridAbstractTest.java:1768)
	at org.apache.ignite.testframework.junits.GridAbstractTest.access$000(GridAbstractTest.java:118)
	at org.apache.ignite.testframework.junits.GridAbstractTest$4.run(GridAbstractTest.java:1706)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

I see the same failure in the previous log. Did you reproduce and check the failure I reported before?

My OS is Win 7; I hope that helps to reproduce and fix it. Let me know if additional info is required.

Update. Rechecked on Ubuntu 16.04, failed too.
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15484010#comment-15484010 ]

Anton Vinogradov edited comment on IGNITE-3303 at 9/12/16 12:53 PM:

Saikat,

I started with a test run; one test failed, so I did not check the whole code.

Comments:
1) The code should be compilable under Java 7.
2) testFlinkIgniteSourceWithLargeBatch fails with an error (log attached).
3) igniteCfgFile is still static.
4) evtBuf is static too, and I still see no reason for it to be static. Could you please explain why the tests can't be written while it's non-static?
5) I can't find any Singleton pattern usage; I still see creation via
{noformat}final IgniteSource igniteSrc = new IgniteSource(TEST_CACHE, GRID_CONF_FILE);{noformat}
I checked creation and start of a new/same instance, and it does not cause exceptions.

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Saikat Maitra
> Attachments: testFlinkIgniteSourceWithLargeBatch.log
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
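The concern about a static event buffer can be illustrated in isolation. This is a small sketch under stated assumptions: {{StaticBufSource}} and {{InstanceBufSource}} are hypothetical classes, and only the field name {{evtBuf}} and the use of a {{LinkedBlockingQueue}} are taken from this thread. It compiles under Java 7.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical source with a static buffer: every instance shares one queue. */
class StaticBufSource {
    static final LinkedBlockingQueue<String> evtBuf = new LinkedBlockingQueue<String>();

    void onEvent(String evt) { evtBuf.add(evt); }

    int buffered() { return evtBuf.size(); }
}

/** Hypothetical source with a per-instance buffer: each instance sees only its own events. */
class InstanceBufSource {
    private final LinkedBlockingQueue<String> evtBuf = new LinkedBlockingQueue<String>();

    void onEvent(String evt) { evtBuf.add(evt); }

    int buffered() { return evtBuf.size(); }
}

class BufferSharingDemo {
    public static void main(String[] args) {
        StaticBufSource cacheA = new StaticBufSource();
        StaticBufSource cacheB = new StaticBufSource();
        cacheA.onEvent("update from cache A");
        // With a static buffer, the event also shows up through the second source.
        System.out.println("static, seen via B: " + cacheB.buffered());

        InstanceBufSource srcA = new InstanceBufSource();
        InstanceBufSource srcB = new InstanceBufSource();
        srcA.onEvent("update from cache A");
        // With an instance buffer, the second source stays empty.
        System.out.println("instance, seen via B: " + srcB.buffered());
    }
}
```

With the static field, two sources reading from different caches would mix their events in one queue, which is the practical cost behind comment 4.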
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439343#comment-15439343 ]

Saikat Maitra edited comment on IGNITE-3303 at 8/26/16 11:22 PM:

[~avinogradov]

Hello Anton,

I have made the following changes to the PR.

1. I have taken all the commit changes you suggested.
2. I have changed the static fields to non-static fields, except for the LinkedBlockingQueue; otherwise not all events were captured in the unit tests.
3. I have updated the unit tests and removed the duplicated code in the tests.
4. I have followed the Singleton pattern to create the Ignite instance; otherwise I was facing the same java.lang.IllegalArgumentException you mentioned in your comment.

Please review and share your feedback.

Regards
Saikat

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Saikat Maitra
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439343#comment-15439343 ]

Saikat Maitra edited comment on IGNITE-3303 at 8/26/16 11:21 PM:

[~avinogradov]

Hello Anton,

I have made the following changes to the PR.

1. I have taken all the commit changes you suggested.
2. I have changed the static fields to non-static fields, except for the LinkedBlockingQueue; otherwise not all events were captured in the unit tests.
3. I have updated the unit tests and removed the duplicated code in the tests.
4. I have followed the Singleton pattern to create the Ignite instance; otherwise I was facing the same exceptions you mentioned in your comment.

Please review and share your feedback.

I am still modifying the tests to add concurrent tests and will share updates.

Regards
Saikat

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Saikat Maitra
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421180#comment-15421180 ]

Anton Vinogradov edited comment on IGNITE-3303 at 8/15/16 3:58 PM:

Saikat,

I still see static fields; are they necessary?

I've started refactoring the code, please see my changes here:
https://github.com/avinogradovgg/ignite/commit/8ed27ef9a542efadec9606bc3b193154389eb38e
(please use these changes in case they are correct), but I ran into tests that pass despite an exception:

{noformat}
Caused by: java.lang.IllegalArgumentException: This method should be accessed under org.apache.ignite.thread.IgniteThread
	at org.apache.ignite.internal.IgnitionEx.localIgnite(IgnitionEx.java:1291)
	at org.apache.ignite.internal.IgniteKernal.readResolve(IgniteKernal.java:3368)
	at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
{noformat}

So it seems the tests should be refactored to fail on such problems. The tests should check that each event in Ignite was handled by the other side. Currently, I still see no such check in the tests.

Also, I'm not sure the code {{synchronized (ctx.getCheckpointLock())}} is necessary, since we are using a LinkedBlockingQueue. Could you please tell me more about that?

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Anton Vinogradov
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
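On the question about {{synchronized (ctx.getCheckpointLock())}}: the queue and the lock may be solving two different problems. The sketch below is plain Java with no Flink dependency, and every name in it ({{LockedEmitSketch}}, {{checkpointLock}}, {{offset}}, {{snapshot}}) is hypothetical. It assumes that the purpose of a checkpoint lock is to make "emit the element" and "update the source's state" a single atomic step as seen by whoever takes a snapshot; whether {{IgniteSource}} has any such state to protect is exactly the open question in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; stands in for a source that buffers events and emits them downstream. */
class LockedEmitSketch {
    /** Stand-in for the object returned by ctx.getCheckpointLock(). */
    private final Object checkpointLock = new Object();

    /** Thread-safe hand-off between the listener thread and the emitting thread. */
    private final LinkedBlockingQueue<String> evtBuf = new LinkedBlockingQueue<String>();

    /** Stand-in for the downstream collector. */
    private final List<String> emitted = new ArrayList<String>();

    /** Stand-in for source state that a snapshot would record. */
    private long offset;

    /** Called by the listener thread; the queue alone makes this safe. */
    void onEvent(String evt) { evtBuf.add(evt); }

    /** Emits buffered events until the queue stays empty for timeoutMs; returns the number emitted. */
    int drain(long timeoutMs) {
        int n = 0;
        try {
            String evt;
            while ((evt = evtBuf.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                // The lock does not protect the queue; it keeps emission and
                // the state update together so a snapshot never sees one without the other.
                synchronized (checkpointLock) {
                    emitted.add(evt);
                    offset++;
                }
                n++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return n;
    }

    /** Returns {emitted count, offset} as one consistent pair. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), offset};
        }
    }
}
```

If the source keeps no state beyond the queue, the lock buys nothing for thread safety, which supports the doubt raised above; if it does track something like an offset, the queue by itself does not make emission and state update atomic.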
[jira] [Comment Edited] (IGNITE-3303) Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
[ https://issues.apache.org/jira/browse/IGNITE-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15385755#comment-15385755 ]

Anton Vinogradov edited comment on IGNITE-3303 at 7/20/16 12:49 PM:

Saikat,

I've made a couple of fixes related to code style etc.:
https://github.com/avinogradovgg/ignite/commit/982f11eca52f3685a1d103f79b97cc13d3fd4749
Please cherry-pick them.

Also,
1) I've added some TODOs, please fix them.
2) I don't like setting static fields in the constructor. Please replace the static fields with non-static ones. Also, I see no reason to have default values (e.g. evtBufTimeout = 10) if you override them in the constructor. In my opinion, it is better to use setters with final static default values (see CacheConfiguration for an example).
3) Please add more checks to the tests to make sure everything works as designed.

Thanks!

> Apache Flink Integration - Flink source to run a continuous query against one or multiple caches
>
> Key: IGNITE-3303
> URL: https://issues.apache.org/jira/browse/IGNITE-3303
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Saikat Maitra
> Assignee: Anton Vinogradov
>
> Apache Flink integration
> +++ *Ignite as a bidirectional Connector* +++
> As a Flink source => run a continuous query against one or multiple caches [4].
> Related discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Apache-Flink-lt-gt-Apache-Ignite-integration-td8163.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
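The "setters with final static default values" suggestion in point 2 could look roughly like the sketch below. The class name {{SourceSettingsSketch}} and the constant name {{DFLT_EVT_BUF_TIMEOUT}} are assumptions made for illustration; only the field name {{evtBufTimeout}} and its default of 10 come from the comment above. The {{DFLT_}} prefix and the chained setter imitate the style of Ignite's {{CacheConfiguration}}.

```java
/** Hypothetical settings holder; not part of the actual pull request. */
class SourceSettingsSketch {
    /** Default buffer timeout; the value 10 is taken from the review comment, the unit is not stated there. */
    public static final long DFLT_EVT_BUF_TIMEOUT = 10;

    /** Instance field initialised from the constant, so the default lives in exactly one place. */
    private long evtBufTimeout = DFLT_EVT_BUF_TIMEOUT;

    /** Returns the configured buffer timeout. */
    public long getEvtBufTimeout() {
        return evtBufTimeout;
    }

    /** Overrides the default; returns this object so calls can be chained. */
    public SourceSettingsSketch setEvtBufTimeout(long evtBufTimeout) {
        if (evtBufTimeout <= 0)
            throw new IllegalArgumentException("evtBufTimeout must be positive: " + evtBufTimeout);

        this.evtBufTimeout = evtBufTimeout;

        return this;
    }
}

class SourceSettingsDemo {
    public static void main(String[] args) {
        SourceSettingsSketch dflt = new SourceSettingsSketch();
        SourceSettingsSketch tuned = new SourceSettingsSketch().setEvtBufTimeout(50);

        // Each instance carries its own value; nothing is written to static state.
        System.out.println(dflt.getEvtBufTimeout() + " / " + tuned.getEvtBufTimeout());
    }
}
```

Because the constructor no longer writes to static fields, two sources with different timeouts can coexist in one JVM, and callers who do not care about the setting never have to mention it.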