[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310646#comment-15310646 ] ASF GitHub Bot commented on FLINK-3908: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2007 then make parseField protected so users MUST call resetErrorStateAndParse. there, problem solved. > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > > If during the parse of a CSV there's a parse error (for example when an > integer column contains non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as the > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}}, but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
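The design discussed in this thread, a public entry point that always resets the error state with the actual parsing kept protected, can be sketched as a template method. The names ({{resetErrorStateAndParse}}, {{parseField}}, {{ParseErrorState}}) follow the comments above; the class body is illustrative, not Flink's actual code:

```java
// Sketch of the suggested template-method design: callers must go through
// resetErrorStateAndParse, so a stale error state can never leak into the
// next parse. Field and method names are modeled on the discussion.
public class FieldParserSketch {
    public enum ParseErrorState { NONE, NUMERIC_VALUE_FORMAT_ERROR }

    private ParseErrorState errorState = ParseErrorState.NONE;

    // public entry point: the reset cannot be bypassed
    public final int resetErrorStateAndParse(byte[] bytes) {
        errorState = ParseErrorState.NONE;
        return parseField(bytes);
    }

    // protected, so subclasses override it but users cannot call it directly
    protected int parseField(byte[] bytes) {
        try {
            return Integer.parseInt(
                new String(bytes, java.nio.charset.StandardCharsets.US_ASCII));
        } catch (NumberFormatException e) {
            errorState = ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR;
            return -1;
        }
    }

    public ParseErrorState getErrorState() {
        return errorState;
    }
}
```

With this shape, a failed parse of a non-int value no longer poisons the following call, which is exactly the bug described in the issue.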
[GitHub] flink issue #2007: [FLINK-3908] Fixed Parser's error state reset
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2007 then make parseField protected so users MUST call resetErrorStateAndParse. there, problem solved. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-1984: Description: There are some users asking for an integration of Flink into Mesos. -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251 Update (May '16): a new effort is now underway, building on the recent ResourceManager work. Design document: (google doc) was: There are some users asking for an integration of Flink into Mesos. -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251 Update (May '16): a new effort is now underway, building on the recent ResourceManager work. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: New Components >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: (google doc)
[jira] [Updated] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-1984: Description: There are some users asking for an integration of Flink into Mesos. -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251 Update (May '16): a new effort is now underway, building on the recent ResourceManager work. Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) was: There are some users asking for an integration of Flink into Mesos. -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251 Update (May '16): a new effort is now underway, building on the recent ResourceManager work. Design document: (google doc)
[jira] [Created] (FLINK-4003) Use intrinsics for MathUtils logarithms
Greg Hogan created FLINK-4003: - Summary: Use intrinsics for MathUtils logarithms Key: FLINK-4003 URL: https://issues.apache.org/jira/browse/FLINK-4003 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.1.0 {{MathUtils.log2floor}} and {{MathUtils.log2strict}} use naive loops for operations that have efficient implementations in {{Integer}}, which are commonly compiled as intrinsics [0]. [0]: http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/vmSymbols.hpp#l680
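As a sketch of the proposed change (method names mirror {{MathUtils}}; this is not the actual patch), the loop-based logarithms can be replaced with {{Integer.numberOfLeadingZeros}}/{{numberOfTrailingZeros}}, which HotSpot compiles to single instructions on most platforms:

```java
public final class Log2Sketch {

    // floor(log2(value)); value must be positive
    public static int log2floor(int value) {
        if (value <= 0) {
            throw new ArithmeticException("Logarithm is undefined for " + value);
        }
        // numberOfLeadingZeros is a JIT intrinsic (e.g. BSR/LZCNT on x86)
        return 31 - Integer.numberOfLeadingZeros(value);
    }

    // log2(value) where value must be an exact power of two
    public static int log2strict(int value) {
        if (value <= 0 || (value & (value - 1)) != 0) {
            throw new ArithmeticException("Not a positive power of two: " + value);
        }
        return Integer.numberOfTrailingZeros(value);
    }
}
```

For a power of two the two formulations agree, e.g. {{log2floor(1024) == log2strict(1024) == 10}}.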
[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Thanks for the PR @gallenvara. I'm currently on vacation with limited access to internet. Will review your PR when I am back in roughly a week.
[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.
[ https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310722#comment-15310722 ] ASF GitHub Bot commented on FLINK-3971: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Thanks for the PR @gallenvara. I'm currently on vacation with limited access to internet. Will review your PR when I am back in roughly a week. > Aggregates handle null values incorrectly. > -- > > Key: FLINK-3971 > URL: https://issues.apache.org/jira/browse/FLINK-3971 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: GaoLun >Priority: Critical > Fix For: 1.1.0 > > > Table API and SQL aggregates are supposed to ignore null values, e.g., > {{sum(1,2,null,4)}} is supposed to return {{7}}. > The current implementation is correct if at least one valid value is > present; however, it is incorrect if only null values are aggregated. {{sum(null, > null, null)}} should return {{null}} instead of {{0}}. > Currently only the Count aggregate handles the case of null-values-only > correctly.
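The semantics the issue asks for ({{sum(1,2,null,4)}} = 7, {{sum(null, null, null)}} = null) can be illustrated with a standalone helper; this is only a sketch of the intended behavior, not the Table API aggregate code:

```java
import java.util.List;

public final class NullAwareSum {

    // ignores nulls; returns null only when no non-null value was seen,
    // matching the expected aggregate behavior in FLINK-3971
    public static Integer sum(List<Integer> values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v != null) {
                acc = (acc == null) ? v : acc + v;
            }
        }
        return acc;
    }
}
```

The key point is that the accumulator starts at {{null}} rather than {{0}}, so an all-null input never fabricates a zero.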
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311362#comment-15311362 ] ASF GitHub Bot commented on FLINK-4000: --- Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 No, it does not make any sense and even makes things worse, sorry. > Exception: Could not restore checkpointed state to operators and functions; > during Job Restart (Job restart is triggered due to one of the task manager > failure) > - > > Key: FLINK-4000 > URL: https://issues.apache.org/jira/browse/FLINK-4000 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 > Environment: //Fault Tolerance Configuration of the Job > env.enableCheckpointing(5000); > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,1)); >Reporter: Aride Chettali >Assignee: Rekha Joshi > > java.lang.Exception: Could not restore checkpointed state to operators and > functions > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Failed to restore state to function: null > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449) > ... 
3 more > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:184) > at > org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:80) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165) > ... 4 more
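The NullPointerException above comes from {{restoreState}} dereferencing collections that are normally created in {{open()}}; after a task manager failure, restore can run first. A minimal sketch of the null guard discussed later in this thread (the field name {{idsProcessedButNotAcknowledged}} is taken from the PR comments; the surrounding class is hypothetical, not Flink's {{MessageAcknowledgingSourceBase}}):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a source whose collections are created in open().
// restoreState() may be invoked before open() during a job restart, so it
// must not assume the fields are already non-null.
public class AcknowledgingSourceSketch {

    private Set<String> idsProcessedButNotAcknowledged; // normally set in open()

    public void open() {
        if (idsProcessedButNotAcknowledged == null) {
            idsProcessedButNotAcknowledged = new HashSet<>();
        }
    }

    public void restoreState(List<String> restoredIds) {
        // guard: initialize lazily instead of hitting a NullPointerException
        if (idsProcessedButNotAcknowledged == null) {
            idsProcessedButNotAcknowledged = new HashSet<>();
        }
        idsProcessedButNotAcknowledged.addAll(restoredIds);
    }

    public Set<String> processedIds() {
        return idsProcessedButNotAcknowledged;
    }
}
```

Initializing only the missing collections, rather than calling {{open()}} wholesale from {{restoreState}}, is the direction the reviewers converge on below.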
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 No, it does not make any sense and even makes things worse, sorry.
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 @asavartsov Ok. Please let me know how you propose to solve this? thanks
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 Take a look at my pull request at https://github.com/apache/flink/pull/2062
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 aha, in one of my intermediate runs I had just initialized idsProcessedButNotAcknowledged and retained pendingCheckpoints, but in the last run changed it to call open() :-( . Makes sense @asavartsov. Closing. Thanks
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311403#comment-15311403 ] ASF GitHub Bot commented on FLINK-4000: --- Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 aha, in one of my intermediate runs I had just initialized idsProcessedButNotAcknowledged and retained pendingCheckpoints, but in the last run changed it to call open() :-( . Makes sense @asavartsov. Closing. Thanks
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311404#comment-15311404 ] ASF GitHub Bot commented on FLINK-4000: --- Github user rekhajoshm closed the pull request at: https://github.com/apache/flink/pull/2061
[GitHub] flink pull request #2061: [FLINK-4000] Checkpoint dictionaries null after ta...
Github user rekhajoshm closed the pull request at: https://github.com/apache/flink/pull/2061
[jira] [Created] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings
Robert Metzger created FLINK-4004: - Summary: Do not pass custom flink kafka connector properties to Kafka to avoid warnings Key: FLINK-4004 URL: https://issues.apache.org/jira/browse/FLINK-4004 Project: Flink Issue Type: Improvement Components: Kafka Connector Reporter: Robert Metzger The FlinkKafkaConsumer has some custom properties, which we pass to the KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to log warnings about unused properties. We should not pass Flink-internal properties to Kafka, to avoid those warnings.
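The filtering this issue asks for can be sketched as below. The {{flink.}} key prefix is an assumption based on the {{flink.poll-timeout}} example in the report, and the helper is illustrative, not the connector's actual code:

```java
import java.util.Properties;

public final class KafkaPropsFilter {

    // copy only non-Flink-internal keys into the Properties handed to the
    // KafkaConsumer, so Kafka does not warn about unrecognized settings;
    // the "flink." prefix is an assumption from the example key above
    public static Properties stripFlinkProperties(Properties original) {
        Properties kafkaProps = new Properties();
        for (String key : original.stringPropertyNames()) {
            if (!key.startsWith("flink.")) {
                kafkaProps.setProperty(key, original.getProperty(key));
            }
        }
        return kafkaProps;
    }
}
```

Copying into a fresh {{Properties}} also leaves the caller's original configuration untouched.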
[GitHub] flink pull request #2059: [FLINK-4003] Use intrinsics for MathUtils logarith...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2059 [FLINK-4003] Use intrinsics for MathUtils logarithms You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4003_use_intrinsics_for_mathutils_logarithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2059.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2059 commit 46fc0d70ed7e7c0ea60ce60cf76f6ead996153cd Author: Greg Hogan Date: 2016-06-01T16:29:16Z [FLINK-4003] Use intrinsics for MathUtils logarithms
[GitHub] flink issue #2058: Not able to create flink-streaming-connectors jar
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2058 Can you please close this pull request and pose this as a question in the mailing list?
[jira] [Commented] (FLINK-4003) Use intrinsics for MathUtils logarithms
[ https://issues.apache.org/jira/browse/FLINK-4003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310841#comment-15310841 ] ASF GitHub Bot commented on FLINK-4003: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2059 [FLINK-4003] Use intrinsics for MathUtils logarithms You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4003_use_intrinsics_for_mathutils_logarithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2059.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2059 commit 46fc0d70ed7e7c0ea60ce60cf76f6ead996153cd Author: Greg Hogan Date: 2016-06-01T16:29:16Z [FLINK-4003] Use intrinsics for MathUtils logarithms
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
GitHub user rekhajoshm opened a pull request: https://github.com/apache/flink/pull/2060 [FLINK-3921] StringParser encoding Corrected StringParser encoding You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/flink FLINK-3921 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2060.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2060 commit 1e1ce9efefccf5a5585be802af4200e9d1ae7a98 Author: Rekha Joshi Date: 2016-06-01T20:03:50Z Merge pull request #1 from apache/master Apache Flink master pull commit 675b6a44e76ae71901bc6d4eaea1d09b6f789ff6 Author: Joshi Date: 2016-06-01T20:26:47Z [FLINK-3921] StringParser encoding
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311043#comment-15311043 ] ASF GitHub Bot commented on FLINK-3921: --- GitHub user rekhajoshm opened a pull request: https://github.com/apache/flink/pull/2060 [FLINK-3921] StringParser encoding Corrected StringParser encoding You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/flink FLINK-3921 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2060.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2060 commit 1e1ce9efefccf5a5585be802af4200e9d1ae7a98 Author: Rekha Joshi Date: 2016-06-01T20:03:50Z Merge pull request #1 from apache/master Apache Flink master pull commit 675b6a44e76ae71901bc6d4eaea1d09b6f789ff6 Author: Joshi Date: 2016-06-01T20:26:47Z [FLINK-3921] StringParser encoding > StringParser not specifying encoding to use > --- > > Key: FLINK-3921 > URL: https://issues.apache.org/jira/browse/FLINK-3921 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Assignee: Rekha Joshi >Priority: Trivial > > Class `flink.types.parser.StringParser` has javadocs indicating that contents > are expected to be Ascii, similar to `StringValueParser`. That makes sense, > but when constructing the actual instance, no encoding is specified; on line 66 > f.ex: >this.result = new String(bytes, startPos+1, i - startPos - 2); > which leads to using whatever the platform default encoding is. If contents > really are always Ascii (would not count on that as parser is used from CSV > reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be > used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 > or even ISO-8859-1.
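The fix the issue suggests is to pass an explicit {{Charset}} to the {{String}} constructor. A minimal illustration of the constructor call from the report (the method wrapper and parameter names are ours; offsets mirror the quoted line, which strips one delimiter byte on each side):

```java
import java.nio.charset.StandardCharsets;

public final class ExplicitCharsetDemo {

    // decode a delimited sub-range of a byte array with a fixed charset
    // instead of the platform default; US-ASCII matches the javadoc claim,
    // though UTF-8 or ISO-8859-1 would be equally explicit choices
    public static String decodeQuoted(byte[] bytes, int startPos, int endPos) {
        // mirrors: new String(bytes, startPos + 1, i - startPos - 2)
        // from the issue, with the charset argument added
        return new String(bytes, startPos + 1, endPos - startPos - 2,
                StandardCharsets.US_ASCII);
    }
}
```

With the charset pinned, the result no longer depends on the JVM's {{file.encoding}}, which is the Latin-1-vs-UTF-8 hazard the reporter describes.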
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311087#comment-15311087 ] ASF GitHub Bot commented on FLINK-4000: --- GitHub user rekhajoshm opened a pull request: https://github.com/apache/flink/pull/2061 [FLINK-4000] Checkpoint dictionaries null after taskmgr failures Fix for exception during job restart after task mgr failure, at which point restoreState fails as checkpoint dictionaries can be null. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/flink FLINK-4000 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2061.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2061 commit 1e1ce9efefccf5a5585be802af4200e9d1ae7a98 Author: Rekha Joshi Date: 2016-06-01T20:03:50Z Merge pull request #1 from apache/master Apache Flink master pull commit 88929dcd989c34c3232f29a96af1b32ec1315911 Author: Joshi Date: 2016-06-01T20:56:52Z [FLINK-4000] Checkpoint dictionaries null after taskmgr failures
[GitHub] flink pull request #2061: [FLINK-4000] Checkpoint dictionaries null after ta...
GitHub user rekhajoshm opened a pull request: https://github.com/apache/flink/pull/2061 [FLINK-4000] Checkpoint dictionaries null after taskmgr failures Fix for exception during job restart after task mgr failure, at which point restoreState fails as checkpoint dictionaries can be null. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/flink FLINK-4000 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2061.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2061 commit 1e1ce9efefccf5a5585be802af4200e9d1ae7a98 Author: Rekha Joshi Date: 2016-06-01T20:03:50Z Merge pull request #1 from apache/master Apache Flink master pull commit 88929dcd989c34c3232f29a96af1b32ec1315911 Author: Joshi Date: 2016-06-01T20:56:52Z [FLINK-4000] Checkpoint dictionaries null after taskmgr failures
[jira] [Assigned] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rekha Joshi reassigned FLINK-3921: -- Assignee: Rekha Joshi
[jira] [Assigned] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rekha Joshi reassigned FLINK-4000: -- Assignee: Rekha Joshi
[jira] [Created] (FLINK-4005) LocalExecutorITCase.testLocalExecutorWithWordCount fails
Ufuk Celebi created FLINK-4005: -- Summary: LocalExecutorITCase.testLocalExecutorWithWordCount fails Key: FLINK-4005 URL: https://issues.apache.org/jira/browse/FLINK-4005 Project: Flink Issue Type: Bug Components: Tests Reporter: Ufuk Celebi https://travis-ci.org/apache/flink/jobs/134458338 {code} Running org.apache.flink.test.clients.examples.LocalExecutorITCase org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.FileNotFoundException: /tmp/wctext6556707459868827691.out/1 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.(FileOutputStream.java:221) at java.io.FileOutputStream.(FileOutputStream.java:171) at org.apache.flink.core.fs.local.LocalDataOutputStream.(LocalDataOutputStream.java:58) at 
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:258) at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:265) at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:160) at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:587) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
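The `FileNotFoundException` in the trace above is the usual symptom of opening a `FileOutputStream` on a path whose parent directory does not exist (the output path here is a directory with one file per parallel task, e.g. `.../1`). A minimal sketch of that failure mode and its standard remedy; this only illustrates the JDK behavior and makes no claim about the actual cause or fix in Flink's test:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class ParentDirSketch {
    public static void main(String[] args) throws IOException {
        File target = new File(System.getProperty("java.io.tmpdir"), "wc-sketch/1");
        // Without this call, new FileOutputStream(target) throws
        // FileNotFoundException ("No such file or directory") because the
        // parent directory "wc-sketch" has not been created yet.
        target.getParentFile().mkdirs();
        try (FileOutputStream out = new FileOutputStream(target)) {
            out.write("ok".getBytes());
        }
        System.out.println(target.exists()); // true
    }
}
```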
[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...
Github user skavulya commented on the issue: https://github.com/apache/flink/pull/1985 Thanks @chiwanpark. I made the changes you recommended. Please let me know if it looks ok.
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311314#comment-15311314 ] ASF GitHub Bot commented on FLINK-1979: --- Github user skavulya commented on the issue: https://github.com/apache/flink/pull/1985 Thanks @chiwanpark. I made the changes you recommended. Please let me know if it looks ok. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1985: [FLINK-1979] Add logistic loss, hinge loss and regulariza...
Github user skavulya commented on the issue: https://github.com/apache/flink/pull/1985 While working on L-BFGS, I realized that I need to remove the gradient descent step from RegularizationPenalty. I'll update the PR soon.
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311626#comment-15311626 ] ASF GitHub Bot commented on FLINK-1979: --- Github user skavulya commented on the issue: https://github.com/apache/flink/pull/1985 While working on L-BFGS, I realized that I need to remove the gradient descent step from RegularizationPenalty. I'll update the PR soon. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 This kind of check might be useless and probably wouldn't fix the issue. My debugging shows that the list `idsProcessedButNotAcknowledged` is null on recovery, not the checkpoints themselves. This list is initialized in the `open` method, but somehow it doesn't get called in this scenario.
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311152#comment-15311152 ] ASF GitHub Bot commented on FLINK-4000: --- Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 This kind of check might be useless and probably wouldn't fix the issue. My debugging shows that the list `idsProcessedButNotAcknowledged` is null on recovery, not the checkpoints themselves. This list is initialized in the `open` method, but somehow it doesn't get called in this scenario. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2061: [FLINK-4000] Checkpoint dictionaries null after taskmgr f...
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 I agree @asavartsov, that was a quick look and I was working to reproduce it. Does the update make sense? Thank you.
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311206#comment-15311206 ] ASF GitHub Bot commented on FLINK-4000: --- Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 I agree @asavartsov, that was a quick look and I was working to reproduce it. Does the update make sense? Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311356#comment-15311356 ] ASF GitHub Bot commented on FLINK-4000: --- GitHub user asavartsov opened a pull request: https://github.com/apache/flink/pull/2062 [FLINK-4000] Fix for checkpoint state restore at MessageAcknowledgingSourceBase As the documentation for MessageAcknowledgingSourceBase.restoreState() says, this method is invoked when a function is executed as part of a recovery run; note that restoreState() is called before open(). So the current implementation 1. Fails in restoreState with a NullPointerException, so jobs fail to restart. 2. Does not restore anything, because the open() that follows erases all checkpoint data immediately. 3. As a consequence, violates the exactly-once guarantee, because the processed-but-not-acknowledged list is erased. The proposed change fixes that. You can merge this pull request into a Git repository by running: $ git pull https://github.com/asavartsov/flink FLINK-4000 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2062.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2062 commit 58fb7df0b2d1dd68a90f4088fbde10874eb290b6 Author: Alexey Savartsov Date: 2016-06-01T23:23:53Z [FLINK-4000] Fix for checkpoint state restore at MessageAcknowledgingSourceBase -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2062: [FLINK-4000] Fix for checkpoint state restore at M...
GitHub user asavartsov opened a pull request: https://github.com/apache/flink/pull/2062 [FLINK-4000] Fix for checkpoint state restore at MessageAcknowledgingSourceBase As the documentation for MessageAcknowledgingSourceBase.restoreState() says, this method is invoked when a function is executed as part of a recovery run; note that restoreState() is called before open(). So the current implementation 1. Fails in restoreState with a NullPointerException, so jobs fail to restart. 2. Does not restore anything, because the open() that follows erases all checkpoint data immediately. 3. As a consequence, violates the exactly-once guarantee, because the processed-but-not-acknowledged list is erased. The proposed change fixes that. You can merge this pull request into a Git repository by running: $ git pull https://github.com/asavartsov/flink FLINK-4000 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2062.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2062 commit 58fb7df0b2d1dd68a90f4088fbde10874eb290b6 Author: Alexey Savartsov Date: 2016-06-01T23:23:53Z [FLINK-4000] Fix for checkpoint state restore at MessageAcknowledgingSourceBase
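The call-order problem described in the pull request can be sketched in a few lines. This is an illustrative stand-in, not the actual MessageAcknowledgingSourceBase code, and the field and method names are hypothetical; it only shows the pattern of letting state survive a restoreState() call that happens before open():

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class RecoverySketch {
    // Stand-in for the pending/not-yet-acknowledged checkpoint state.
    private Deque<List<Long>> pendingCheckpoints;

    // On a recovery run this is called FIRST, so it must not assume open() ran.
    public void restoreState(List<Long> state) {
        if (pendingCheckpoints == null) {
            pendingCheckpoints = new ArrayDeque<>();
        }
        pendingCheckpoints.add(state);
    }

    // Called after restoreState() on recovery, or first on a fresh start.
    public void open() {
        if (pendingCheckpoints == null) {
            pendingCheckpoints = new ArrayDeque<>();
        }
        // Unconditionally re-creating the deque here would erase the state
        // restored above (points 2 and 3 in the PR description).
    }

    public int pendingCount() {
        return pendingCheckpoints.size();
    }

    public static void main(String[] args) {
        RecoverySketch s = new RecoverySketch();
        s.restoreState(Arrays.asList(1L, 2L)); // recovery: restore runs first...
        s.open();                              // ...then open; state survives
        System.out.println(s.pendingCount());  // 1
    }
}
```

With the original pattern (allocation only in open(), open() overwriting the field), the first line of main() would throw the NullPointerException seen in the stack trace, and even a null check alone would lose the restored state when open() re-created the collection.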
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311375#comment-15311375 ] ASF GitHub Bot commented on FLINK-4000: --- Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2061 @asavartsov Ok. Please let me know how you propose to solve this. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311395#comment-15311395 ] ASF GitHub Bot commented on FLINK-4000: --- Github user asavartsov commented on the issue: https://github.com/apache/flink/pull/2061 Take a look at my pull request at https://github.com/apache/flink/pull/2062 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rekha Joshi updated FLINK-4000: --- Assignee: (was: Rekha Joshi) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
[ https://issues.apache.org/jira/browse/FLINK-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZhengBowen updated FLINK-4006: -- Attachment: FLINK-4006.patch > ExecutionGraph.restartStrategy field can't be serialized > > > Key: FLINK-4006 > URL: https://issues.apache.org/jira/browse/FLINK-4006 > Project: Flink > Issue Type: Bug >Reporter: ZhengBowen > Attachments: FLINK-4006.patch > > > Exception is following > ``` > java.io.NotSerializableException: > org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209) > at > org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97) > at > org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at 
akka.dispatch.Mailbox.run(Mailbox.scala:221) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > ``` > I think you should set null for restartStrategy in prepareForArchiving() > function. > Following is my patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
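The patch's approach (setting the field to null in prepareForArchiving() before the graph is serialized) can be demonstrated with a toy class. The names below are illustrative, not the actual ExecutionGraph code; a plain `Object` stands in for the non-serializable `NoRestartStrategy`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ArchiveSketch implements Serializable {
    // Stand-in for ExecutionGraph.restartStrategy: java.lang.Object does not
    // implement Serializable, so serializing this instance fails.
    private Object restartStrategy = new Object();

    void prepareForArchiving() {
        restartStrategy = null; // drop the non-serializable reference before archiving
    }

    static void serialize(Object o) throws IOException {
        new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
    }

    public static void main(String[] args) throws IOException {
        ArchiveSketch g = new ArchiveSketch();
        try {
            serialize(g); // throws NotSerializableException, as in the trace above
        } catch (NotSerializableException expected) {
            System.out.println("before prep: " + expected.getMessage());
        }
        g.prepareForArchiving();
        serialize(g); // now succeeds: null fields serialize fine
        System.out.println("after prep: ok");
    }
}
```

Marking the field `transient` would achieve the same effect without an explicit null-out; the patch attached to the issue takes the explicit prepareForArchiving() approach.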
[jira] [Updated] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
[ https://issues.apache.org/jira/browse/FLINK-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZhengBowen updated FLINK-4006:
--
Description:
The following exception is thrown:
```
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
	at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94)
	at org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209)
	at org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
```
I think restartStrategy should be set to null in the prepareForArchiving() function. The attached file is my patch.

was:
(identical description, except that the closing sentence read "Following is my patch")

> ExecutionGraph.restartStrategy field can't be serialized
>
> Key: FLINK-4006
> URL: https://issues.apache.org/jira/browse/FLINK-4006
> Project: Flink
> Issue Type: Bug
> Reporter: ZhengBowen
> Attachments: FLINK-4006.patch
>
> Exception is following
> ```
> java.io.NotSerializableException:
> org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
[jira] [Created] (FLINK-4006) ExecutionGraph.restartStrategy field can't be serialized
ZhengBowen created FLINK-4006:
--
Summary: ExecutionGraph.restartStrategy field can't be serialized
Key: FLINK-4006
URL: https://issues.apache.org/jira/browse/FLINK-4006
Project: Flink
Issue Type: Bug
Reporter: ZhengBowen

The following exception is thrown:
```
java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
	at com.rds.computing.rc.meta.MetaJobInfo.readFrom(MetaJobInfo.java:94)
	at org.apache.flink.runtime.webmonitor.DatabaseArchivist$DataBaseHashMap.put(DatabaseArchivist.java:209)
	at org.apache.flink.runtime.webmonitor.DatabaseArchivist.handleMessage(DatabaseArchivist.java:64)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:97)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
```
I think restartStrategy should be set to null in the prepareForArchiving() function. My patch follows.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
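The fix the reporter proposes (nulling the non-serializable restartStrategy reference before the graph is archived) can be reproduced outside Flink. The class and method names below mirror the report but are toy stand-ins, not Flink's actual classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Toy stand-in: like Flink's NoRestartStrategy, it does NOT implement Serializable.
class NoRestartStrategy { }

// Toy stand-in for the archived ExecutionGraph.
class ExecutionGraph implements Serializable {
    private static final long serialVersionUID = 1L;
    NoRestartStrategy restartStrategy = new NoRestartStrategy();
    String jobName = "example-job";

    // Sketch of the proposed fix: drop runtime-only, non-serializable state
    // before the graph is serialized for archiving.
    void prepareForArchiving() {
        restartStrategy = null;
    }
}

public class ArchiveSerializationDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        ExecutionGraph graph = new ExecutionGraph();
        boolean failed = false;
        try {
            serialize(graph);          // restartStrategy still set: fails
        } catch (NotSerializableException e) {
            failed = true;             // same failure mode as in the report
        }
        System.out.println("before fix failed: " + failed);

        graph.prepareForArchiving();   // null out the offending field
        byte[] bytes = serialize(graph);
        System.out.println("after fix succeeded: " + (bytes.length > 0));
    }
}
```

Marking the field `transient` would have the same effect if the strategy is never needed after deserialization.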
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309419#comment-15309419 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309988 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) --- End diff -- I would like to rewrite this block
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65311449 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. + */ +class DistributedRowMatrix(data: DataSet[IndexedRow], --- End diff -- It seems that `getRowData` is redundant. Changing `data` to public (by adding `val` keyword in previous `data`) would be better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996 Hi @chobeat, thanks for updating the PR. Once the comments on the source code are addressed, I think the last thing needed before merging is documentation. But you can add the documentation after the block-based matrix is merged. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309988 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) --- End diff -- I would like to rewrite this block like following to avoid create unnecessary `IndexedRow` object: ```scala val row1 = Option(left) match { case Some(row: IndexedRow) => row case None => IndexedRow(right.rowIndex,
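The null handling being discussed (a full outer join leaves one side null for unmatched row indices, and that side must be replaced by a zero row before the element-wise function is applied) can be sketched outside Flink with plain Java collections. `RowJoinSketch` and its types are illustrative stand-ins, not part of the PR:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BinaryOperator;

// Toy sketch of byRowOperation: rows are double[] keyed by row index.
// A full outer join over the two key sets can leave one side missing; the
// missing side is replaced by a zero row of the other side's length before
// the element-wise function is applied.
public class RowJoinSketch {
    static Map<Integer, double[]> byRowOperation(
            Map<Integer, double[]> left,
            Map<Integer, double[]> right,
            BinaryOperator<double[]> fun) {
        Set<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());                  // full outer key set
        Map<Integer, double[]> result = new TreeMap<>();
        for (int k : keys) {
            double[] l = left.get(k);
            double[] r = right.get(k);
            if (l == null) l = new double[r.length];  // zero-row stand-in
            if (r == null) r = new double[l.length];
            result.put(k, fun.apply(l, r));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, double[]> a = Map.of(0, new double[]{1, 2}, 1, new double[]{3, 4});
        Map<Integer, double[]> b = Map.of(1, new double[]{10, 10}, 2, new double[]{5, 5});
        BinaryOperator<double[]> add = (x, y) -> {
            double[] out = new double[x.length];
            for (int i = 0; i < x.length; i++) out[i] = x[i] + y[i];
            return out;
        };
        Map<Integer, double[]> sum = byRowOperation(a, b, add);
        System.out.println(Arrays.toString(sum.get(1)));  // [13.0, 14.0]
    }
}
```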
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chobeat commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65318407 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. + */ +class DistributedRowMatrix(data: DataSet[IndexedRow], --- End diff -- Yup, I forgot to remove that. It was supposed to be part of the common trait but in the end I chose not to do it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309901#comment-15309901 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chobeat commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65318407 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. + */ +class DistributedRowMatrix(data: DataSet[IndexedRow], --- End diff -- Yup, I forgot to remove that. It was supposed to be part of the common trait but in the end I chose not to do it. 
> Distributed Linear Algebra: row-based matrix > > > Key: FLINK-3919 > URL: https://issues.apache.org/jira/browse/FLINK-3919 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti > > Distributed matrix implementation as a DataSet of IndexedRow and related > operations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309404#comment-15309404 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309186 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values,
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309186 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => numRows.collect().head + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => numCols.collect().head + } + + lazy val numRows: DataSet[Int] = numRowsOpt match { +case Some(rows) => data.getExecutionEnvironment.fromElements(rows) +case None => data.max("rowIndex").map(_.rowIndex + 1) + } + + lazy val numCols: DataSet[Int] = numColsOpt match { +case Some(cols) => data.getExecutionEnvironment.fromElements(cols) +case None => data.first(1).map(_.values.size) + } + + val getRowData = data + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + 
.equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( +left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRowsOpt, numColsOpt) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +*/
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chobeat commented on the pull request: https://github.com/apache/flink/pull/1996 @chiwanpark before merging I need to go over the numRows/numCols issue again, because I noticed it creates problems in another project of mine. I think the `collect()` there is too risky and obscure, so I would like to refactor it. I'm working on it right now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chobeat commented on the pull request: https://github.com/apache/flink/pull/1996 @chiwanpark I think it would be better to leave the computation of the dimensionality to the user. I tried different options and all of them are sub-optimal. I would leave this feature for later as a next step, if that's ok. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310162#comment-15310162 ] ASF GitHub Bot commented on FLINK-3806: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/2036 Thanks @greghogan! It looks like you have some checkstyle violation, otherwise +1. > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
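The materialize-twice concern raised in FLINK-3806 can be illustrated with a toy lazy pipeline (plain Java, not the Flink API): an eager count, like `DataSetUtils.count`, executes the source once just to produce a number, and the job that follows executes it again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy illustration of the concern above: each Supplier.get() stands for
// "reread from input / regenerate", so `reads` counts materializations.
public class EagerCountDemo {
    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        Supplier<List<Integer>> source = () -> {
            reads.incrementAndGet();           // the source is re-evaluated
            return Arrays.asList(1, 2, 3, 4, 5);
        };

        // Eager count (DataSetUtils.count-style): runs the pipeline now.
        long n = source.get().size();

        // The actual job that needed `n` (e.g. PageRank): runs it again.
        int sum = source.get().stream().mapToInt(Integer::intValue).sum();

        System.out.println("count=" + n + " sum=" + sum
                + " materializations=" + reads.get());
        // → count=5 sum=15 materializations=2
    }
}
```

Keeping the count as a deferred dataset, as `GraphUtils.count` did, lets both the count and the downstream computation run in one plan, materializing the source once.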
[jira] [Updated] (FLINK-4002) [py] Improve testing infrastructure
[ https://issues.apache.org/jira/browse/FLINK-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omar Alvarez updated FLINK-4002:
--
Description:
The Verify() test function errors out when array elements are missing:
{code}
env.generate_sequence(1, 5)\
    .map(Id()).map_partition(Verify([1, 2, 3, 4], "Sequence")).output()
{code}
{quote}
IndexError: list index out of range
{quote}
There should also be more documentation in the test functions. I am already working on a pull request to fix this.

was:
The Verify() test function errors out when array elements are missing:
{code:python}
env.generate_sequence(1, 5)\
    .map(Id()).map_partition(Verify([1, 2, 3, 4], "Sequence")).output()
{code}
{quote}
IndexError: list index out of range
{quote}
There should also be more documentation in test functions.

> [py] Improve testing infrastructure
>
> Key: FLINK-4002
> URL: https://issues.apache.org/jira/browse/FLINK-4002
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Affects Versions: 1.0.3
> Reporter: Omar Alvarez
> Priority: Minor
> Labels: Python, Testing
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The Verify() test function errors out when array elements are missing:
> {code}
> env.generate_sequence(1, 5)\
> .map(Id()).map_partition(Verify([1,2,3,4], "Sequence")).output()
> {code}
> {quote}
> IndexError: list index out of range
> {quote}
> There should also be more documentation in test functions.
> I am already working on a pull request to fix this.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
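The failure mode is an expected-values list that is shorter than the produced data. A guarded comparison avoids the raw index error; this sketch is in Java for illustration (the real helper is the Python Verify() function, and these names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a bounds-checked verifier: instead of indexing expected[i]
// blindly (which raised "IndexError: list index out of range" when the
// expected list was shorter than the data), compare sizes first and fail
// with a clear message.
public class VerifySketch {
    static void verify(List<Integer> expected, List<Integer> actual, String name) {
        if (expected.size() != actual.size()) {
            throw new AssertionError(name + ": expected " + expected.size()
                    + " elements but got " + actual.size());
        }
        for (int i = 0; i < expected.size(); i++) {
            if (!expected.get(i).equals(actual.get(i))) {
                throw new AssertionError(name + ": mismatch at index " + i);
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the report: generate_sequence(1, 5) produces 5 elements,
        // but only 4 expected values were supplied.
        List<Integer> produced = Arrays.asList(1, 2, 3, 4, 5);
        try {
            verify(Arrays.asList(1, 2, 3, 4), produced, "Sequence");
        } catch (AssertionError e) {
            System.out.println(e.getMessage());  // clear size mismatch, no index error
        }
    }
}
```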
[jira] [Commented] (FLINK-3993) [py] Add generateSequence() support to Python API
[ https://issues.apache.org/jira/browse/FLINK-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310153#comment-15310153 ] ASF GitHub Bot commented on FLINK-3993: --- Github user omaralvarez commented on the pull request: https://github.com/apache/flink/pull/2055 Perfect. I'll open a new issue and try to fix that and document the test scripts. > [py] Add generateSequence() support to Python API > - > > Key: FLINK-3993 > URL: https://issues.apache.org/jira/browse/FLINK-3993 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.0.3 >Reporter: Omar Alvarez >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Right now, I believe that there is only from_elements() support in order to > create a sequence of numbers. It is interesting to be able to create a list > of numbers from the Python API also, apart from the Java API. It would not be > complicated, since we already have generateSequence(). I am already working > on this, and will create a pull request shortly in Github. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310188#comment-15310188 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65347119 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java --- @@ -104,8 +106,21 @@ public void runCheckpointedProgram() { postSubmit(); } catch (Exception e) { + Throwable th = e; --- End diff -- @kl0u ping > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310210#comment-15310210 ] ASF GitHub Bot commented on FLINK-3667: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1978#discussion_r65348868 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -20,8 +20,6 @@ --- End diff -- Done. > Generalize client<->cluster communication > - > > Key: FLINK-3667 > URL: https://issues.apache.org/jira/browse/FLINK-3667 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > > Here are some notes I took when inspecting the client<->cluster classes with > regard to future integration of other resource management frameworks in > addition to Yarn (e.g. Mesos). > {noformat} > 1 Cluster Client Abstraction > > 1.1 Status Quo > ── > 1.1.1 FlinkYarnClient > ╌ > • Holds the cluster configuration (Flink-specific and Yarn-specific) > • Contains the deploy() method to deploy the cluster > • Creates the Hadoop Yarn client > • Receives the initial job manager address > • Bootstraps the FlinkYarnCluster > 1.1.2 FlinkYarnCluster > ╌╌ > • Wrapper around the Hadoop Yarn client > • Queries cluster for status updates > • Life time methods to start and shutdown the cluster > • Flink specific features like shutdown after job completion > 1.1.3 ApplicationClient > ╌╌╌ > • Acts as a middle-man for asynchronous cluster communication > • Designed to communicate with Yarn, not used in Standalone mode > 1.1.4 CliFrontend > ╌ > • Deeply integrated with FlinkYarnClient and FlinkYarnCluster > • Constantly distinguishes between Yarn and Standalone mode > • Would be nice to have a general abstraction in place > 1.1.5 Client > > • Job submission and Job related actions, agnostic of resource framework > 1.2 Proposal > > 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient) > ╌ > • Extensible 
cluster-agnostic config > • May be extended by specific cluster, e.g. YarnClusterConfig > 1.2.2 ClusterClient (before: AbstractFlinkYarnClient) > ╌ > • Deals with cluster (RM) specific communication > • Exposes framework agnostic information > • YarnClusterClient, MesosClusterClient, StandaloneClusterClient > 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster) > ╌ > • Basic interface to communicate with a running cluster > • Receives the ClusterClient for cluster-specific communication > • Should not have to care about the specific implementations of the > client > 1.2.4 ApplicationClient > ╌╌╌ > • Can be changed to work cluster-agnostic (first steps already in > FLINK-3543) > 1.2.5 CliFrontend > ╌ > • CliFrontend does never have to differentiate between different > cluster types after it has determined which cluster class to load. > • Base class handles framework agnostic command line arguments > • Pluggables for Yarn, Mesos handle specific commands > {noformat} > I would like to create/refactor the affected classes to set us up for a more > flexible client side resource management abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Not able to create flink-streaming-connectors jar
GitHub user mrakshay opened a pull request: https://github.com/apache/flink/pull/2058 Not able to create flink-streaming-connectors jar Hello,I am not able to create jar of **flink-streaming-connectors** ...I am able to create jar of others like twitter,kafka,flume but I am not able to create jar of flink-streaming connectors ?? How can I create this jar ?? You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/flink release-1.0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2058.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2058 commit f3c6646e68750a068b3325181b8a16a4689a0fed Author: Stephan EwenDate: 2016-02-22T17:37:59Z [hotfix] Make DataStream property methods properly Scalaesk This also includes some minor cleanups This closes #1689 commit df19a8bf908a21fc35830c08cc61d8d0566813eb Author: Ufuk Celebi Date: 2016-02-26T11:46:07Z [FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart Temporary work around to restore initial state on failure during recovery as required by a user. Will be superseded by FLINK-3397 with better handling of checkpoint and savepoint restoring. A failure during recovery resulted in restarting a job without its savepoint state. This temporary work around makes sure that if the savepoint coordinator ever restored a savepoint and there was no checkpoint after the savepoint, the savepoint state will be restored again. This closes #1720. commit 8c3301501934ee0faeffec3f8c2034d292d078ef Author: Stephan Ewen Date: 2016-02-26T14:12:07Z [FLINK-3522] [storm compat] PrintSampleStream prints a proper message when involked without arguments commit c0bc8bcf7e1c3ac1f50c3038456f5af888392a06 Author: Till Rohrmann Date: 2016-02-26T10:57:21Z [hotfix] [build] Disable exclusion rules when using build-jar maven profile. 
This closes #1719 commit 2c605d275b26793d8676e35b6ccc5102bdcbf30d Author: Till Rohrmann Date: 2016-02-26T13:08:02Z [FLINK-3511] [gelly] Introduce flink-gelly-examples module The new flink-gelly-examples module contains all Java and Scala Gelly examples. The module contains compile scope dependencies on flink-java, flink-scala and flink-clients so that the examples can be conveniently run from within the IDE. commit 0601a762a4ee826bc628842e9b38f205fafdb76d Author: Stephan Ewen Date: 2016-02-26T14:34:06Z [hotfix] Remove remaining unused files from the old standalone web client commit 044479230e984b130f018930adaceb661c9aa80b Author: Till Rohrmann Date: 2016-02-26T14:57:45Z [FLINK-3511] [avro] Move avro examples to test scope commit f2de20b02bef66f437164e24e9fc0084530d4b01 Author: Stephan Ewen Date: 2016-02-26T17:19:27Z [FLINK-3525] [runtime] Fix call to super.close() in TimestampsAndPeriodicWatermarksOperator commit 51ab77b16a994f2f511e34bb37f9c2294a234e50 Author: Stephan Ewen Date: 2016-02-26T17:31:32Z [license] Update LICENSE file for the latest version commit 131f016e71540a5d1e264084c630b93de1aeabae Author: Till Rohrmann Date: 2016-02-26T15:12:59Z [FLINK-3511] [hadoop-compatibility] Move hadoop-compatibility examples to test scope commit 434cff00fd7fdc41dfb14f729888abaf12af1f7d Author: Till Rohrmann Date: 2016-02-26T15:15:44Z [FLINK-3511] [jdbc] Move jdbc examples to test scope and add flink-clients dependency commit 0dc824080f38d83d9a748d19d04344c3bf4d7077 Author: Till Rohrmann Date: 2016-02-26T15:21:13Z [FLINK-3511] [nifi, elasticsearch] Move nifi and elasticsearch examples to test scope commit c0ce1b6d97bea3a938dc81454fcb17f373a58f12 Author: Till Rohrmann Date: 2016-02-26T15:27:06Z [FLINK-3511] [twitter] Move twitter examples to test scope This closes #1725 commit 603f351e2925aa4263a382b83cf41f12e8c5167f Author: Stephan Ewen Date: 2016-02-26T19:06:06Z [FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger This closes #1727 commit 
ba069f35b21de5d2a30ba0cd2234f20f35532c09 Author: Aljoscha Krettek Date: 2016-02-26T14:19:50Z [FLINK-3521] Make Iterable part of method signature for WindowFunction This closes #1723 commit 75e03caccad159fb04df4c7085a49d7f76e994c5 Author: Aljoscha Krettek Date: 2016-02-26T22:27:26Z [FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows This makes
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309134 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } + + val getRowData = data + + private def calcCols: Int = +data.first(1).collect().headOption match { + case Some(vector) => vector.values.size + case None => 0 +} + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( 
+left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex, fun(row1.values, row2.values)) + } + ) +new DistributedRowMatrix(result, numRowsOpt, numColsOpt) + } + + /** +* Add the matrix to another matrix. +* @param other +* @return +
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309403#comment-15309403 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1996#discussion_r65309134 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import breeze.linalg.{CSCMatrix => BreezeSparseMatrix, Matrix => BreezeMatrix, Vector => BreezeVector} +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Matrix => FlinkMatrix, _} +import org.apache.flink.util.Collector +import org.apache.flink.ml.math.Breeze._ +import scala.collection.JavaConversions._ + +/** + * Distributed row-major matrix representation. + * @param numRowsOpt If None, will be calculated from the DataSet. + * @param numColsOpt If None, will be calculated from the DataSet. 
+ */ +class DistributedRowMatrix(data: DataSet[IndexedRow], + numRowsOpt: Option[Int] = None, + numColsOpt: Option[Int] = None) +extends DistributedMatrix { + + lazy val getNumRows: Int = numRowsOpt match { +case Some(rows) => rows +case None => data.count().toInt + } + + lazy val getNumCols: Int = numColsOpt match { +case Some(cols) => cols +case None => calcCols + } + + val getRowData = data + + private def calcCols: Int = +data.first(1).collect().headOption match { + case Some(vector) => vector.values.size + case None => 0 +} + + /** +* Collects the data in the form of a sequence of coordinates associated with their values. +* @return +*/ + def toCOO: Seq[(Int, Int, Double)] = { + +val localRows = data.collect() + +for (IndexedRow(rowIndex, vector) <- localRows; + (columnIndex, value) <- vector) yield (rowIndex, columnIndex, value) + } + + /** +* Collects the data in the form of a SparseMatrix +* @return +*/ + def toLocalSparseMatrix: SparseMatrix = { +val localMatrix = + SparseMatrix.fromCOO(this.getNumRows, this.getNumCols, this.toCOO) +require(localMatrix.numRows == this.getNumRows) +require(localMatrix.numCols == this.getNumCols) +localMatrix + } + + //TODO: convert to dense representation on the distributed matrix and collect it afterward + def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix + + /** +* Apply a high-order function to couple of rows +* @param fun +* @param other +* @return +*/ + def byRowOperation(fun: (Vector, Vector) => Vector, + other: DistributedRowMatrix): DistributedRowMatrix = { +val otherData = other.getRowData +require(this.getNumCols == other.getNumCols) +require(this.getNumRows == other.getNumRows) + + +val result = this.data + .fullOuterJoin(otherData) + .where("rowIndex") + .equalTo("rowIndex")( + (left: IndexedRow, right: IndexedRow) => { +val row1 = Option(left).getOrElse(IndexedRow( +right.rowIndex, +SparseVector.fromCOO(right.values.size, List((0, 0.0) +val row2 = Option(right).getOrElse(IndexedRow( 
+left.rowIndex, +SparseVector.fromCOO(left.values.size, List((0, 0.0) + +IndexedRow(row1.rowIndex,
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310133#comment-15310133 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chobeat commented on the pull request: https://github.com/apache/flink/pull/1996 @chiwanpark before merging I need to go over the numRows/numCols issue again because I noticed they create problems in another project of mine. I think that the `collect()` there is too risky and obscure so I would like to refactor it. I'm working on it right now. > Distributed Linear Algebra: row-based matrix > > > Key: FLINK-3919 > URL: https://issues.apache.org/jira/browse/FLINK-3919 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti > > Distributed matrix implementation as a DataSet of IndexedRow and related > operations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310160#comment-15310160 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chobeat commented on the pull request: https://github.com/apache/flink/pull/1996 @chiwanpark I think it should be better to leave to the user the computation of the dimensionality. I tried different options and all of them are sub-optimal. I would leave this feature for later as a next step if it's ok. > Distributed Linear Algebra: row-based matrix > > > Key: FLINK-3919 > URL: https://issues.apache.org/jira/browse/FLINK-3919 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti > > Distributed matrix implementation as a DataSet of IndexedRow and related > operations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
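The `byRowOperation` in the diff above uses a full outer join and substitutes an all-zero sparse row whenever one matrix lacks a row index present in the other. The same idea can be sketched locally in Python (a deliberate simplification of the Scala code, with sparse rows modeled as `{column: value}` dicts rather than Flink `SparseVector`s):

```python
def by_row_operation(fun, left_rows, right_rows):
    """Apply fun(row_a, row_b) across the full outer join of two
    row-indexed sparse matrices ({row_index: {column: value}} dicts)."""
    result = {}
    for idx in set(left_rows) | set(right_rows):
        # A row index missing on one side contributes an empty
        # (all-zero) sparse row, mirroring the getOrElse fallback.
        row_a = left_rows.get(idx, {})
        row_b = right_rows.get(idx, {})
        result[idx] = fun(row_a, row_b)
    return result

def sparse_add(a, b):
    """Element-wise addition of two sparse rows."""
    return {c: a.get(c, 0.0) + b.get(c, 0.0) for c in set(a) | set(b)}
```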
[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.count()
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/2036 Thanks @greghogan! It looks like you have some checkstyle violation, otherwise +1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310169#comment-15310169 ] ASF GitHub Bot commented on FLINK-3806: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/2036 Yes. If I allow IntelliJ to handle automatically organize imports then they can be reordered due to a lack of consistency and project-wide guidelines. > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
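The cost described in the issue — `DataSetUtils.count` executing a separate job, so the lazily built graph is materialized once for the count and again for the actual algorithm — has a loose Python analogue (this is an illustration of the recomputation pitfall, not Flink code):

```python
materializations = {"count": 0}

def build_edges():
    """Stand-in for a lazily evaluated pipeline: each full traversal
    re-runs the (expensive) upstream computation."""
    materializations["count"] += 1
    return iter([(1, 2), (2, 3), (3, 1)])

# Eager count: one full pass just to learn the size ...
num_edges = sum(1 for _ in build_edges())
# ... and a second full pass for the algorithm that needed the size.
degree_sum = sum(2 for _ in build_edges())
# Supplying the size as a parameter (or keeping the count inside the
# dataflow, as the revert proposes) avoids the extra materialization.
```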
[jira] [Created] (FLINK-4002) [py] Improve testing infraestructure
Omar Alvarez created FLINK-4002: --- Summary: [py] Improve testing infraestructure Key: FLINK-4002 URL: https://issues.apache.org/jira/browse/FLINK-4002 Project: Flink Issue Type: Bug Components: Python API Affects Versions: 1.0.3 Reporter: Omar Alvarez Priority: Minor The Verify() test function errors out when array elements are missing: {code:python} env.generate_sequence(1, 5)\ .map(Id()).map_partition(Verify([1,2,3,4], "Sequence")).output() {code} {quote} IndexError: list index out of range {quote} There should also be more documentation in test functions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310186#comment-15310186 ] Christophe S commented on FLINK-2765: - Great! Almost all distro are in 1.1 or 1.2. > Upgrade hbase version for hadoop-2 to 1.2 release > - > > Key: FLINK-2765 > URL: https://issues.apache.org/jira/browse/FLINK-2765 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently 0.98.11 is used: > {code} > 0.98.11-hadoop2 > {code} > Stable release for hadoop-2 is 1.1.x line > We should upgrade to 1.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65347119 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java --- @@ -104,8 +106,21 @@ public void runCheckpointedProgram() { postSubmit(); } catch (Exception e) { + Throwable th = e; --- End diff -- @kl0u ping --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310194#comment-15310194 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/2020 The input formats still have a leftover field that stores the split. After that, the only thing that remains is the API methods. Also what was the reason for the new code in `StreamFaultToleranceTestBase.java`. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310211#comment-15310211 ] ASF GitHub Bot commented on FLINK-3667: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1978#discussion_r65348898 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java --- @@ -100,6 +102,9 @@ */ private JobID lastJobID; + /** Switch for blocking/detached job submission of the client */ + private boolean detachedJobSubmission = false; --- End diff -- Done. > Generalize client<->cluster communication > - > > Key: FLINK-3667 > URL: https://issues.apache.org/jira/browse/FLINK-3667 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > > Here are some notes I took when inspecting the client<->cluster classes with > regard to future integration of other resource management frameworks in > addition to Yarn (e.g. Mesos). 
> {noformat} > 1 Cluster Client Abstraction > > 1.1 Status Quo > ── > 1.1.1 FlinkYarnClient > ╌ > • Holds the cluster configuration (Flink-specific and Yarn-specific) > • Contains the deploy() method to deploy the cluster > • Creates the Hadoop Yarn client > • Receives the initial job manager address > • Bootstraps the FlinkYarnCluster > 1.1.2 FlinkYarnCluster > ╌╌ > • Wrapper around the Hadoop Yarn client > • Queries cluster for status updates > • Life time methods to start and shutdown the cluster > • Flink specific features like shutdown after job completion > 1.1.3 ApplicationClient > ╌╌╌ > • Acts as a middle-man for asynchronous cluster communication > • Designed to communicate with Yarn, not used in Standalone mode > 1.1.4 CliFrontend > ╌ > • Deeply integrated with FlinkYarnClient and FlinkYarnCluster > • Constantly distinguishes between Yarn and Standalone mode > • Would be nice to have a general abstraction in place > 1.1.5 Client > > • Job submission and Job related actions, agnostic of resource framework > 1.2 Proposal > > 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient) > ╌ > • Extensible cluster-agnostic config > • May be extended by specific cluster, e.g. YarnClusterConfig > 1.2.2 ClusterClient (before: AbstractFlinkYarnClient) > ╌ > • Deals with cluster (RM) specific communication > • Exposes framework agnostic information > • YarnClusterClient, MesosClusterClient, StandaloneClusterClient > 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster) > ╌ > • Basic interface to communicate with a running cluster > • Receives the ClusterClient for cluster-specific communication > • Should not have to care about the specific implementations of the > client > 1.2.4 ApplicationClient > ╌╌╌ > • Can be changed to work cluster-agnostic (first steps already in > FLINK-3543) > 1.2.5 CliFrontend > ╌ > • CliFrontend does never have to differentiate between different > cluster types after it has determined which cluster class to load. 
> • Base class handles framework agnostic command line arguments > • Pluggables for Yarn, Mesos handle specific commands > {noformat} > I would like to create/refactor the affected classes to set us up for a more > flexible client side resource management abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
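The proposal above separates a cluster-agnostic interface from per-framework subclasses so that `CliFrontend` never branches on the cluster type. A minimal sketch of that shape (hypothetical class and method names following the proposal; the actual Flink classes are Java/Scala):

```python
from abc import ABC, abstractmethod

class ClusterClient(ABC):
    """Framework-specific communication behind a common interface."""
    @abstractmethod
    def get_job_manager_address(self):
        ...

class StandaloneClusterClient(ClusterClient):
    def __init__(self, address):
        self._address = address
    def get_job_manager_address(self):
        # Standalone mode: address comes from static configuration.
        return self._address

class YarnClusterClient(ClusterClient):
    def __init__(self, app_report):
        self._report = app_report
    def get_job_manager_address(self):
        # YARN mode: address is discovered from the application report.
        return self._report["jm_address"]

def submit(client, job):
    # CliFrontend-style caller: no isinstance checks on the cluster type.
    return "submitting %s to %s" % (job, client.get_job_manager_address())
```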
[GitHub] flink pull request: [FLINK-3993] [py] Add generateSequence() support to Pyth...
Github user omaralvarez commented on the pull request: https://github.com/apache/flink/pull/2055 Perfect. I'll open a new issue and try to fix that and document the test scripts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3919][flink-ml] Distributed Linear Algebra: row-b...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996 @chobeat Okay. Then we should force user to calculate dimensionality of matrix by changing type of number parameters in constructor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310165#comment-15310165 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1996 @chobeat Okay. Then we should force user to calculate dimensionality of matrix by changing type of number parameters in constructor. > Distributed Linear Algebra: row-based matrix > > > Key: FLINK-3919 > URL: https://issues.apache.org/jira/browse/FLINK-3919 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti > > Distributed matrix implementation as a DataSet of IndexedRow and related > operations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.count()
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/2036 Yes. If I allow IntelliJ to handle automatically organize imports then they can be reordered due to a lack of consistency and project-wide guidelines. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/2020 The input formats still have a leftover field that stores the split. After that, the only thing that remains is the API methods. Also what was the reason for the new code in `StreamFaultToleranceTestBase.java`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3993) [py] Add generateSequence() support to Python API
[ https://issues.apache.org/jira/browse/FLINK-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309918#comment-15309918 ] ASF GitHub Bot commented on FLINK-3993: --- Github user omaralvarez commented on the pull request: https://github.com/apache/flink/pull/2055 I think, everything should be ready now. Although while performing testing, I have found something that I think is not ideal. If we use this code: ```python env.generate_sequence(1, 5)\ .map(Id()).map_partition(Verify([1,2,3,4], "Sequence")).output() ``` The Verify() function will error out with `IndexError: list index out of range`, this is not ideal, since it should raise a Flink testing exception. I could also try to fix this if needed. > [py] Add generateSequence() support to Python API > - > > Key: FLINK-3993 > URL: https://issues.apache.org/jira/browse/FLINK-3993 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.0.3 >Reporter: Omar Alvarez >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Right now, I believe that there is only from_elements() support in order to > create a sequence of numbers. It is interesting to be able to create a list > of numbers from the Python API also, apart from the Java API. It would not be > complicated, since we already have generateSequence(). I am already working > on this, and will create a pull request shortly in Github. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3993] [py] Add generateSequence() support to Pyth...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/2055 Changing the test infrastructure should be done as a separate issue. There are a few things that could be improved, especially a bit of documentation (for example, what the difference between Verify and Verify2 is). Your changes look good; I'll merge them later today.
[jira] [Commented] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309941#comment-15309941 ] ASF GitHub Bot commented on FLINK-3994: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/2056 Thanks for fixing! > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. 
You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt
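The error quoted above is the usual symptom of a job needing more network buffers than the TaskManager has configured. The message itself names the remedy: raise `taskmanager.network.numberOfBuffers`. The Flink documentation of this era suggested sizing it roughly as `#slots-per-TM^2 * #TMs * 4`; the tiny helper below just encodes that rule of thumb (the formula is the documented heuristic, the function itself is illustrative):

```python
def recommended_network_buffers(slots_per_tm, num_taskmanagers, factor=4):
    """Rule-of-thumb sizing for 'taskmanager.network.numberOfBuffers':
    slots-per-TM squared, times the number of TaskManagers, times 4."""
    return slots_per_tm ** 2 * num_taskmanagers * factor
```

For example, 8 slots per TaskManager across 4 TaskManagers suggests 1024 buffers; in practice test flakiness like this is often fixed instead by lowering the parallelism the tests request, as the linked PR title ("Fix flaky KNN integration tests") hints.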
[jira] [Commented] (FLINK-3919) Distributed Linear Algebra: row-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309971#comment-15309971 ] ASF GitHub Bot commented on FLINK-3919: --- Github user chobeat commented on the pull request: https://github.com/apache/flink/pull/1996 @chiwanpark Yeah, I thought I could write the documentation as a third PR, but I would like to review the block matrix first because it may change in structure. Anyway, I will soon begin working on a doc page with the general structure and some examples for the row-based matrix. > Distributed Linear Algebra: row-based matrix > > > Key: FLINK-3919 > URL: https://issues.apache.org/jira/browse/FLINK-3919 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti > > Distributed matrix implementation as a DataSet of IndexedRow and related > operations
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310006#comment-15310006 ] ASF GitHub Bot commented on FLINK-3758: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1979 Sorry for chiming in late. I was wondering whether we actually need accumulators here, or whether what we should add is access to the metrics, because the use case seems to be "monitoring", not "side aggregates". Admittedly, the accumulators were the means of choice for metrics up to now, but for the future (now that we are adding proper metrics), we may want to fix this. > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
[jira] [Commented] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
[ https://issues.apache.org/jira/browse/FLINK-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310007#comment-15310007 ] Ufuk Celebi commented on FLINK-4000: Thanks for reporting the issue. Did you configure a specific state backend or are you using the default one? > Exception: Could not restore checkpointed state to operators and functions; > during Job Restart (Job restart is triggered due to one of the task manager > failure) > - > > Key: FLINK-4000 > URL: https://issues.apache.org/jira/browse/FLINK-4000 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 > Environment: //Fault Tolerance Configuration of the Job > env.enableCheckpointing(5000); > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,1)); >Reporter: Aride Chettali > > java.lang.Exception: Could not restore checkpointed state to operators and > functions > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Failed to restore state to function: null > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449) > ... 
3 more > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:184) > at > org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:80) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165) > ... 4 more
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65331149 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which makes one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: they contain bucket heads: + *an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + *with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + *because before serializing a record, there is no way to know in advance how large will it be. + *Therefore, we can't serialize directly into the record area when we are doing an update, because + *if it turns out to be larger then the old record, then it would override some other record + *that happens to be after the old one in memory. 
The solution is to serialize to the staging area first, + *and then copy it to the place of the original if it has the same size, otherwise allocate a new linked + *list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + *the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements. + * The reinsertion happens by forgetting the structure (the linked lists) of the record area, and reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note, that insertions never override a record that have not been read by the reinsertion sweep, because + * both the insertions and readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large are the records. The serializer might be able to tell us this, + * so in this case, we will calculate the number of buckets upfront, and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more + * elements are inserted than the number of buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold + * one record. + *
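The staging-area update rule described in the comment above (overwrite in place only when the new record has the same size, otherwise append a new linked-list element and abandon the old one so a sequential reinsertion sweep can still find record starts) can be modeled in a few lines. This is an illustrative sketch of the invariant, not the actual `ReduceHashTable` code:

```python
def update_record(record_area, offset, old, new):
    """Model of the update rule: the record area is a flat list of
    (abandoned, payload) slots. Equal-size updates overwrite in place;
    any size change appends a fresh slot at the end and marks the old
    slot abandoned, leaving a "hole" for a later compaction to reclaim."""
    if len(new) == len(old):
        record_area[offset] = (False, new)
        return offset
    record_area[offset] = (True, old)     # abandon the old slot ("hole")
    record_area.append((False, new))      # allocate at the end of the area
    return len(record_area) - 1
```

Note how even a smaller record is not written in place; shrinking a slot would make the next record's start position unrecoverable during the sequential compaction sweep, exactly as the comment explains.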
[GitHub] flink pull request:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/7ad8375a89374bec80571029e9166f1336bdea8e#commitcomment-17693170 In flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java on line 64: where is this done now?
[jira] [Updated] (FLINK-2672) Add partitioned output format to HDFS RollingSink
[ https://issues.apache.org/jira/browse/FLINK-2672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2672: -- Component/s: filesystem-connector > Add partitioned output format to HDFS RollingSink > - > > Key: FLINK-2672 > URL: https://issues.apache.org/jira/browse/FLINK-2672 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Affects Versions: 0.10.0 >Reporter: Mohamed Amine ABDESSEMED >Priority: Minor > Labels: features > > An interesting use case of the HDFS sink is to dispatch data into multiple > directories depending on attributes present in the source data. > For example, for some data with timestamp and status fields, we want to > write it into different directories using a pattern like: > /somepath/%{timestamp}/%{status} > The expected results are something like: > /somepath/some_timestamp/wellformed > /somepath/some_timestamp/malformed > /somepath/some_timestamp/incomplete > ... > etc. > To support this functionality, the bucketing and checkpointing logic needs to > be changed. > Note: For now, this can be done with the current version of the rolling HDFS > sink (https://github.com/apache/flink/pull/1084) by splitting the > data stream and having multiple HDFS sinks
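The pattern substitution proposed in the issue above can be sketched compactly: replace each `%{field}` token in the path pattern with the corresponding attribute of the record. The function name and the record-as-dict shape are assumptions for illustration, not part of the RollingSink API:

```python
import re


def expand_bucket_path(pattern, record):
    """Substitute %{field} tokens in a bucket-path pattern with values
    looked up in the record, e.g. /somepath/%{timestamp}/%{status}."""
    def repl(match):
        return str(record[match.group(1)])
    return re.sub(r"%\{(\w+)\}", repl, pattern)
```

A real sink would additionally have to keep one open part file per expanded directory and track each of them in checkpoints, which is the bucketing/checkpointing change the issue calls for.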
[jira] [Updated] (FLINK-3854) Support Avro key-value rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3854: -- Component/s: (was: esystem-connector) filesystem-connector > Support Avro key-value rolling sink writer > -- > > Key: FLINK-3854 > URL: https://issues.apache.org/jira/browse/FLINK-3854 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Igor Berman > > Support a rolling sink writer in Avro key-value format, > preferably without additional classpath dependencies, and > preferably in the same format as M/R jobs for backward compatibility
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65328505 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. 
+ * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be 
null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
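The two construction paths discussed in the Javadoc above (a URI alone, or host/port plus credentials with null checks on the required fields) can be modeled compactly. This Python sketch only illustrates the validation logic; the real class is the Java `RMQConnectionConfig` with its `Builder`:

```python
class RMQConnectionConfigModel:
    """Illustrative model of the connection config: if a URI is given it
    wins; otherwise host, virtual host, username, and password are all
    required, mirroring the Preconditions.checkNotNull calls in the diff."""

    def __init__(self, uri=None, host=None, port=None,
                 virtual_host=None, username=None, password=None):
        if uri is not None:
            self.uri = uri
            return
        # Mirror the null checks: fail fast with the field name.
        for name, value in (("host", host), ("virtualHost", virtual_host),
                            ("username", username), ("password", password)):
            if value is None:
                raise ValueError("%s can not be null" % name)
        self.host, self.port = host, port
        self.virtual_host = virtual_host
        self.username, self.password = username, password
```

Collapsing the repeated checks into a loop like this is one way to keep the constructor short; the quoted Java code spells each check out explicitly instead.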
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309992#comment-15309992 ] ASF GitHub Bot commented on FLINK-3763: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65328505 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. 
+ * If {@link Builder#setUri(String)} has been set then
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+	private String host;
+	private int port;
+	private String virtualHost;
+	private String username;
+	private String password;
+	private String uri;
+
+	private int networkRecoveryInterval;
+	private boolean automaticRecovery;
+	private boolean topologyRecovery;
+
+	private int connectionTimeout;
+	private int requestedChannelMax;
+	private int requestedFrameMax;
+	private int requestedHeartbeat;
+
+	/**
+	 * @param host host name
+	 * @param port port
+	 * @param virtualHost virtual host
+	 * @param username username
+	 * @param password password
+	 * @param networkRecoveryInterval connection recovery interval in milliseconds
+	 * @param automaticRecovery if automatic connection recovery
+	 * @param topologyRecovery if topology recovery
+	 * @param connectionTimeout connection timeout
+	 * @param requestedChannelMax requested maximum channel number
+	 * @param requestedFrameMax requested maximum frame size
+	 * @param requestedHeartbeat requested heartbeat interval
+	 * @throws NullPointerException if host, virtual host, username or password is null
+	 */
+	private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
+								int networkRecoveryInterval, boolean automaticRecovery,
+								boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+								int requestedHeartbeat){
+		Preconditions.checkNotNull(host, "host can not be null");
+		Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+		Preconditions.checkNotNull(username, "username can not be null");
+		Preconditions.checkNotNull(password, "password can not be null");
+		this.host = host;
+		this.port = port;
+		this.virtualHost = virtualHost;
+		this.username = username;
+		this.password = password;
+
+		this.networkRecoveryInterval = networkRecoveryInterval;
+		this.automaticRecovery = automaticRecovery;
+		this.topologyRecovery = topologyRecovery;
+		this.connectionTimeout = connectionTimeout;
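The constructor above guards its required fields with null checks before assignment. A minimal standalone sketch of the same pattern, using `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull` (the class name here is hypothetical, not the actual connector code):

```java
import java.util.Objects;

// Sketch of a connection config whose constructor rejects null required
// fields, mirroring the Preconditions.checkNotNull calls above.
public class ConnectionConfigSketch {
    final String host;
    final int port;
    final String virtualHost;
    final String username;
    final String password;

    ConnectionConfigSketch(String host, int port, String virtualHost,
                           String username, String password) {
        // requireNonNull throws NullPointerException with the given message
        this.host = Objects.requireNonNull(host, "host can not be null");
        this.virtualHost = Objects.requireNonNull(virtualHost, "virtualHost can not be null");
        this.username = Objects.requireNonNull(username, "username can not be null");
        this.password = Objects.requireNonNull(password, "password can not be null");
        this.port = port;
    }

    public static void main(String[] args) {
        ConnectionConfigSketch ok =
            new ConnectionConfigSketch("localhost", 5672, "/", "guest", "guest");
        System.out.println(ok.host + ":" + ok.port); // prints localhost:5672

        try {
            new ConnectionConfigSketch(null, 5672, "/", "guest", "guest");
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast in the constructor means a misconfigured sink surfaces at job construction time rather than at first use of the connection.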
[GitHub] flink pull request: [FLINK-3758] Add possibility to register accumulators in...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1979 Sorry for chiming in late. I was wondering whether we actually need accumulators here, or what we should add is access to the metrics. Because the use case seems to be "monitoring", not "side aggregates". Admittedly, the accumulators were the means of choice for metrics up to now, but for the future (now that we are adding proper metrics), we may want to fix this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4001) Add event time support to filesystem connector
Robert Metzger created FLINK-4001: - Summary: Add event time support to filesystem connector Key: FLINK-4001 URL: https://issues.apache.org/jira/browse/FLINK-4001 Project: Flink Issue Type: New Feature Components: filesystem-connector, Streaming Connectors Reporter: Robert Metzger Currently, the file system connector (rolling file sink) does not respect the event time of records. For full reprocessing capabilities, we need to make the sink aware of the event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
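Making the rolling file sink event-time aware essentially means deriving the bucket path from each record's event timestamp rather than from the wall clock, so that reprocessing old data lands in the same buckets. A minimal sketch of that idea (a hypothetical helper, not the actual connector API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: compute a bucket directory from a record's
// event-time timestamp instead of processing (wall-clock) time.
public class EventTimeBucketer {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    /** Returns the bucket directory for a record with the given event-time millis. */
    public static String bucketFor(String basePath, long eventTimeMillis) {
        return basePath + "/" + FMT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // Two records an hour apart land in different buckets, no matter
        // when (re)processing actually happens.
        System.out.println(bucketFor("/data/out", 0L));         // /data/out/1970-01-01--00
        System.out.println(bucketFor("/data/out", 3_600_000L)); // /data/out/1970-01-01--01
    }
}
```

With wall-clock bucketing, a full reprocessing run would dump historical records into today's bucket; keying the path off event time keeps reprocessed output identical to the original run.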
[jira] [Commented] (FLINK-1873) Distributed matrix implementation
[ https://issues.apache.org/jira/browse/FLINK-1873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309972#comment-15309972 ] Simone Robutti commented on FLINK-1873: --- I thought I could open a dedicated issue for the documentation. Is it ok? > Distributed matrix implementation > - > > Key: FLINK-1873 > URL: https://issues.apache.org/jira/browse/FLINK-1873 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: liaoyuxi >Assignee: Simone Robutti > Labels: ML > > It would help to implement machine learning algorithms more quickly and > concisely if Flink provided support for storing data and computation in a > distributed matrix. The design of the implementation is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/7ad8375a89374bec80571029e9166f1336bdea8e#commitcomment-17693130 In flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java on line 51: this change has broken the scope formats.
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309998#comment-15309998 ] Robert Metzger commented on FLINK-3758: --- I think metrics and accumulators are going to co-exist. Metrics are low-overhead and optional, whereas accumulators could contain some relevant data. The base components of the metrics system have been merged. The RuntimeContext now has a method for adding a metrics group. > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
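For illustration, here is a self-contained sketch of the counter-style metric such a trigger might register: a metric group that hands out named counters, with repeated lookups of the same name returning the same counter. The `Counter`/`MetricGroup` types below are simplified stand-ins, not Flink's actual metrics API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Simplified stand-ins for a metric group and counter, illustrating how a
// trigger could report "fires" as a low-overhead metric rather than an
// accumulator. Not the Flink API.
public class TriggerMetricsSketch {
    static class Counter {
        private final LongAdder count = new LongAdder();
        void inc() { count.increment(); }
        long getCount() { return count.sum(); }
    }

    static class MetricGroup {
        private final Map<String, Counter> counters = new HashMap<>();
        // Same name always yields the same counter instance.
        Counter counter(String name) {
            return counters.computeIfAbsent(name, k -> new Counter());
        }
    }

    public static void main(String[] args) {
        MetricGroup group = new MetricGroup();
        Counter fires = group.counter("trigger-fires");
        for (int i = 0; i < 3; i++) {
            fires.inc(); // would be called from Trigger#onElement / #onEventTime
        }
        System.out.println(fires.getCount()); // prints 3
    }
}
```

The distinction the thread draws holds here too: a counter like this is cheap monitoring telemetry, while an accumulator would carry a user-defined side aggregate back to the client.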
[jira] [Updated] (FLINK-2706) Add support for streaming RollingFileSink to truncate / append on UNIX file systems
[ https://issues.apache.org/jira/browse/FLINK-2706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2706: -- Component/s: filesystem-connector > Add support for streaming RollingFileSink to truncate / append on UNIX file > systems > --- > > Key: FLINK-2706 > URL: https://issues.apache.org/jira/browse/FLINK-2706 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > Efficient exactly-once behavior needs the filesystem to support appending and > truncating files. > Since the UNIX file system API allows to append files and truncate files, we > can support perfect exactly-once behavior efficiently on all file systems > that expose a UNIX / POSIX-style interface (local FS, NFS, MapR FS). > Without this support, only Hadoop 2.7+ versions support proper exactly once > behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
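The truncate-on-recovery idea behind this ticket is straightforward on a POSIX-style file system: record the valid file length at checkpoint time, and on restore cut the file back to that length so uncommitted bytes disappear. A minimal standalone sketch with plain java.nio (not the Flink sink code):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of exactly-once recovery via truncation: remember the file length
// at "checkpoint" time, then cut uncommitted bytes off again on "restore".
public class TruncateRecoverySketch {

    /** Truncates the file back to a previously checkpointed length. */
    static void truncateTo(Path file, long length) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(length);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("rolling-sink", ".txt");

        Files.write(file, "committed\n".getBytes(StandardCharsets.UTF_8));
        long checkpointedLength = Files.size(file); // taken at checkpoint time

        // Records written after the checkpoint; lost if we crash here.
        Files.write(file, "uncommitted\n".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);

        // On restore: roll the file back to the checkpointed length.
        truncateTo(file, checkpointedLength);

        // prints only "committed"
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

This is exactly why the ticket notes that plain Hadoop file systems without truncate (pre-2.7) cannot offer the same guarantee efficiently: without truncation, recovery must either rewrite the file or track valid lengths out of band.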
[jira] [Updated] (FLINK-3854) Support Avro key-value rolling sink writer
[ https://issues.apache.org/jira/browse/FLINK-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3854: -- Component/s: filesystem-connector > Support Avro key-value rolling sink writer > -- > > Key: FLINK-3854 > URL: https://issues.apache.org/jira/browse/FLINK-3854 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Igor Berman > > Support rolling sink writer in avro key value format, > preferably without additional classpath dependencies, > preferably in the same format as M/R jobs for backward compatibility -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/7ad8375a89374bec80571029e9166f1336bdea8e#commitcomment-17693724 In flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java on line 51: note that this change, coupled with 651155775e74473f0adb3847b282bd4a8b636640, effectively disabled the entire metric system.
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15310217#comment-15310217 ] ASF GitHub Bot commented on FLINK-3667: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1978#discussion_r65349622

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,14 +191,41 @@ public void run() {
 		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 		isConnected = true;
+
+		logAndSysout("Waiting until all TaskManagers have connected");
+
+		while(true) {
+			GetClusterStatusResponse status = getClusterStatus();
+			if (status != null) {
+				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
+					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+						+ clusterDescriptor.getTaskManagerCount() + ")");
+				} else {
+					logAndSysout("All TaskManagers are connected");
+					break;
+				}
+			} else {
+				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+			}
+
+			try {
+				Thread.sleep(500);
+			}
+			catch (InterruptedException e) {
+				LOG.error("Interrupted while waiting for TaskManagers");
+				System.err.println("Thread is interrupted");
+				Thread.currentThread().interrupt();
--- End diff --

Done.

> Generalize client<->cluster communication > - > > Key: FLINK-3667 > URL: https://issues.apache.org/jira/browse/FLINK-3667 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > > Here are some notes I took when inspecting the client<->cluster classes with > regard to future integration of other resource management frameworks in > addition to Yarn (e.g. Mesos).
> {noformat}
> 1 Cluster Client Abstraction
>
> 1.1 Status Quo
> ──
> 1.1.1 FlinkYarnClient
> ╌
> • Holds the cluster configuration (Flink-specific and Yarn-specific)
> • Contains the deploy() method to deploy the cluster
> • Creates the Hadoop Yarn client
> • Receives the initial job manager address
> • Bootstraps the FlinkYarnCluster
> 1.1.2 FlinkYarnCluster
> ╌╌
> • Wrapper around the Hadoop Yarn client
> • Queries cluster for status updates
> • Life time methods to start and shutdown the cluster
> • Flink specific features like shutdown after job completion
> 1.1.3 ApplicationClient
> ╌╌╌
> • Acts as a middle-man for asynchronous cluster communication
> • Designed to communicate with Yarn, not used in Standalone mode
> 1.1.4 CliFrontend
> ╌
> • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
> • Constantly distinguishes between Yarn and Standalone mode
> • Would be nice to have a general abstraction in place
> 1.1.5 Client
>
> • Job submission and Job related actions, agnostic of resource framework
>
> 1.2 Proposal
>
> 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
> ╌
> • Extensible cluster-agnostic config
> • May be extended by specific cluster, e.g. YarnClusterConfig
> 1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
> ╌
> • Deals with cluster (RM) specific communication
> • Exposes framework agnostic information
> • YarnClusterClient, MesosClusterClient, StandaloneClusterClient
> 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
> ╌
> • Basic interface to communicate with a running cluster
> • Receives the ClusterClient for cluster-specific communication
> • Should not have to care about the specific implementations of the client
> 1.2.4 ApplicationClient
> ╌╌╌
> • Can be changed to work cluster-agnostic (first steps already in FLINK-3543)
> 1.2.5 CliFrontend
> ╌
> • CliFrontend does never have to differentiate between different cluster types after it has determined which cluster class to load.
> • Base class handles framework
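The wait loop in the YarnClusterClient diff above, stripped of YARN specifics, is a plain poll-sleep-interrupt pattern. A self-contained sketch of it, where an `IntSupplier` stands in for `getClusterStatus()` (names here are hypothetical, not the Flink API):

```java
import java.util.function.IntSupplier;

// Standalone sketch of the polling loop from the YarnClusterClient diff:
// wait until the cluster reports the expected number of registered
// TaskManagers, sleeping between polls and honoring thread interruption.
public class WaitForTaskManagersSketch {

    /** Returns true once enough TaskManagers registered, false if interrupted. */
    public static boolean waitForTaskManagers(IntSupplier registeredCount,
                                              int expected, long pollMillis) {
        while (true) {
            if (registeredCount.getAsInt() >= expected) {
                return true; // all TaskManagers are connected
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated cluster: one more TaskManager registers on each poll.
        int[] registered = {0};
        boolean ok = waitForTaskManagers(() -> ++registered[0], 3, 1L);
        System.out.println(ok); // prints true
    }
}
```

Re-setting the interrupt flag in the catch block, as the reviewed diff does after the comment exchange, lets the caller (e.g. the CliFrontend) observe the interruption instead of having it swallowed.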