[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744495#comment-15744495 ] Vasia Kalavri commented on FLINK-5245: -- My point is not that these features are useless for bipartite graphs, but we have to think about whether re-implementing these features specifically for bipartite graphs makes sense, e.g. because general graphs do not support them or because we can use the knowledge that we have a bipartite graph to make the implementation more efficient. For example, projection is a transformation that can only be applied to bipartite graphs. But if all you want to do is get the degrees of your bipartite graph, can you use the available Graph methods? Or can we provide a better way to get the degrees because we know we have a bipartite graph? In my opinion, these are the questions we have to ask for each of the features in the list. > Add support for BipartiteGraph mutations > > > Key: FLINK-5245 > URL: https://issues.apache.org/jira/browse/FLINK-5245 > Project: Flink > Issue Type: Improvement > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement methods for adding and removing vertices and edges similarly to > Graph class. > Depends on https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5324) JVM options are applied to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng updated FLINK-5324: --- Description: YarnApplicationMasterRunner and YarnTaskManager both use the following code to get their JVM options {code} final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") {code} so when we add JVM options for one of them, they are applied to both was: YarnApplicationMasterRunner and YarnTaskManager both use follow code to get jvm options {code} final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") {code/} so when we add some jvm options for one of them ,it will be both worked > JVM options are applied to both YarnApplicationMasterRunner and > YarnTaskManager in YARN mode > > > Key: FLINK-5324 > URL: https://issues.apache.org/jira/browse/FLINK-5324 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.1.3 >Reporter: yuemeng >Priority: Critical > > YarnApplicationMasterRunner and YarnTaskManager both use the following code to get > their JVM options > {code} > final String javaOpts = > flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") > {code} > so when we add JVM options for one of them, they are applied to both -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5324) JVM options are applied to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
yuemeng created FLINK-5324: -- Summary: JVM options are applied to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode Key: FLINK-5324 URL: https://issues.apache.org/jira/browse/FLINK-5324 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.1.3 Reporter: yuemeng Priority: Critical YarnApplicationMasterRunner and YarnTaskManager both use the following code to get their JVM options {code} final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") {code} so when we add JVM options for one of them, they are applied to both -- This message was sent by Atlassian JIRA (v6.3.4#6332)
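The behavior this issue reports can be sketched in plain Java, together with one possible remedy: look up a role-specific key first and fall back to the shared one. The key names (`env.java.opts.jobmanager`, `env.java.opts.taskmanager`) and the Map-based configuration are hypothetical illustrations, not APIs the report describes.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of role-specific JVM option lookup (key names are hypothetical). */
public class JvmOptionsSketch {
    static final String SHARED_KEY = "env.java.opts";          // read by both roles today
    static final String JM_KEY = "env.java.opts.jobmanager";   // hypothetical JM-only key
    static final String TM_KEY = "env.java.opts.taskmanager";  // hypothetical TM-only key

    /** Returns the role-specific options if set, otherwise the shared ones. */
    static String jvmOptionsFor(Map<String, String> conf, String roleKey) {
        String specific = conf.get(roleKey);
        return specific != null ? specific : conf.getOrDefault(SHARED_KEY, "");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(SHARED_KEY, "-XX:+UseG1GC");
        conf.put(TM_KEY, "-Xmx2g");
        // TaskManager gets its own options; JobManager falls back to the shared ones.
        System.out.println(jvmOptionsFor(conf, TM_KEY)); // -Xmx2g
        System.out.println(jvmOptionsFor(conf, JM_KEY)); // -XX:+UseG1GC
    }
}
```

With such a fallback, setting an option for one role no longer silently changes the other, while a single shared key still works for users who want identical options everywhere.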
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744479#comment-15744479 ] Fabian Hueske commented on FLINK-3475: -- Hi [~ykt836], sure. That's fine with me and would be great! I don't have time to work on it myself and just wanted to sketch a possible design. Please let me know if you have questions about it or if you have other suggestions. > DISTINCT aggregate function support for SQL queries > --- > > Key: FLINK-3475 > URL: https://issues.apache.org/jira/browse/FLINK-3475 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Chengxiang Li > > DISTINCT aggregate function may be able to reuse the aggregate function > instead of separate implementation, and let Flink runtime take care of > duplicate records. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744476#comment-15744476 ] ASF GitHub Bot commented on FLINK-5311: --- Github user vasia commented on the issue: https://github.com/apache/flink/pull/2984 Thank you for the update @mushketyk! I still don't see any link from the Gelly guide page to the bipartite docs though. Can you please add that too? Otherwise people won't be able to find the docs :) As for the images, I think it would be nice to have one that shows how a projection works. > Write user documentation for BipartiteGraph > --- > > Key: FLINK-5311 > URL: https://issues.apache.org/jira/browse/FLINK-5311 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > We need to add user documentation. The progress on BipartiteGraph can be > tracked in the following JIRA: > https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2984: [FLINK-5311] Add user documentation for bipartite graph
Github user vasia commented on the issue: https://github.com/apache/flink/pull/2984 Thank you for the update @mushketyk! I still don't see any link from the Gelly guide page to the bipartite docs though. Can you please add that too? Otherwise people won't be able to find the docs :) As for the images, I think it would be nice to have one that shows how a projection works. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4648) Implement bipartite graph generators
[ https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1570#comment-1570 ] ASF GitHub Bot commented on FLINK-4648: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2986 Hi @greghogan , Thank you for your review. I've updated my PR accordingly. > Implement bipartite graph generators > > > Key: FLINK-4648 > URL: https://issues.apache.org/jira/browse/FLINK-4648 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement generators for bipartite graphs. > Should implement at least: > * *BipartiteGraphGenerator* (maybe requires a better name) that will generate > a bipartite graph where every vertex of one set is connected only to some > vertices from another set > * *CompleteBipartiteGraphGenerator* that will generate a graph where every > vertex of one set is connected to every vertex of another set -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2986: [FLINK-4648] Implement BipartiteGraph generator
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2986 Hi @greghogan , Thank you for your review. I've updated my PR accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743965#comment-15743965 ] Jark Wu commented on FLINK-5280: Hi [~ivan.mushketyk], I will try to answer your questions. The first point of confusion is that {{CodeGenerator}} supposedly doesn't support nested access. Actually, this was fixed in FLINK-4294; have a look at the test example {{CompositeAccessTest}}, which should give you some inspiration. The other concern is that {{BatchScan#convertToExpectedType}} converts the input dataset into the Row type. In fact, it will not flatten the nested fields, but keeps the same schema in the Row. In your case, the ParentPojo will be converted to a Row of type {{Row(child: ChildPojo, num: Int)}}. Hope that helps. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
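Jark's point that the conversion keeps the nested schema can be illustrated with a toy stand-in for Flink's Row type. These are simplified plain-Java classes written for this sketch, not the actual {{Row}}, POJO, or {{CodeGenerator}} implementations.

```java
/** Toy illustration: converting a nested POJO to a Row keeps the nesting. */
public class NestedRowSketch {
    /** Minimal Row: an ordered, untyped field container (simplified stand-in). */
    static class Row {
        private final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
        Object getField(int i) { return fields[i]; }
        int getArity() { return fields.length; }
    }

    static class ChildPojo { String name = "child"; }

    /** ParentPojo(child, num) maps to Row(childRow, num); fields are not flattened. */
    static Row toRow(ChildPojo child, int num) {
        Row childRow = new Row(child.name);
        return new Row(childRow, num);
    }

    public static void main(String[] args) {
        Row parent = toRow(new ChildPojo(), 42);
        Row nested = (Row) parent.getField(0);  // nested access: parent.child
        System.out.println(nested.getField(0)); // child
    }
}
```

The parent Row has arity 2, matching the original POJO's two fields; the nested child is reachable as a Row-valued field rather than being exploded into top-level columns.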
[jira] [Comment Edited] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506583#comment-15506583 ] Ted Yu edited comment on FLINK-4534 at 12/13/16 2:06 AM: - Feel free to work on this. Thanks, Liwei. was (Author: yuzhih...@gmail.com): Feel free to work on this. Thanks, Liwei > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
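The fix this issue asks for is to guard the unprotected iterations with the same lock the other methods already use. A minimal sketch follows; the class and field names are simplified stand-ins for the BucketingSink internals, not the real implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: every access to the shared bucket-state map goes through one lock. */
public class RestoreStateSketch {
    static class BucketState { long pendingCheckpointId; }

    private final Map<String, BucketState> bucketStates = new HashMap<>();

    void addBucket(String path, long checkpointId) {
        synchronized (bucketStates) {  // writers take the lock...
            BucketState s = new BucketState();
            s.pendingCheckpointId = checkpointId;
            bucketStates.put(path, s);
        }
    }

    /** restoreState()-style iteration, now inside the same lock as the writers. */
    long sumPendingCheckpointIds() {
        synchronized (bucketStates) {  // ...and so does every iteration
            long sum = 0;
            for (BucketState state : bucketStates.values()) {
                sum += state.pendingCheckpointId;
            }
            return sum;
        }
    }
}
```

Without the lock around the loop, a concurrent `put` could trigger a `ConcurrentModificationException` or expose a half-updated entry; holding the same monitor in both paths rules that out.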
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743753#comment-15743753 ] Kurt Young commented on FLINK-3475: --- We are willing to offer some help on this issue, [~fhueske] [~chengxiang li] Is this ok for both of you? > DISTINCT aggregate function support for SQL queries > --- > > Key: FLINK-3475 > URL: https://issues.apache.org/jira/browse/FLINK-3475 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Chengxiang Li > > DISTINCT aggregate function may be able to reuse the aggregate function > instead of separate implementation, and let Flink runtime take care of > duplicate records. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5323) CheckpointNotifier should be removed from docs
Abhishek Singh created FLINK-5323: - Summary: CheckpointNotifier should be removed from docs Key: FLINK-5323 URL: https://issues.apache.org/jira/browse/FLINK-5323 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.1.3 Reporter: Abhishek Singh Priority: Trivial I was following the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html Looks like this is the right one to be using: import org.apache.flink.runtime.state.CheckpointListener; -Abhishek- On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh wrote: I can’t seem to find CheckpointNotifier. Appreciate help ! CheckpointNotifier is not a member of package org.apache.flink.streaming.api.checkpoint From my pom.xml (all in group org.apache.flink, version 1.1.3): flink-scala_2.11, flink-streaming-scala_2.11, flink-clients_2.11, flink-statebackend-rocksdb_2.11 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743344#comment-15743344 ] ASF GitHub Bot commented on FLINK-3257: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 To suggest some way to fix the guarantees: To my mind, the crux lies in the way that the feedback channel is implemented - a simple blocking queue just does not cut it for that case. To make this proper, I think we need to do the following: - Have an elastic feedback channel (unbounded) with a certain memory budget, that can spill if needed. I think it would be best implemented holding data serialized. - On checkpoint, one simply adds the feedback channel data (already bytes) to the checkpoint - The source task should probably prioritize reading from the feedback channel, to keep it always as small as possible. > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. 
> -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743343#comment-15743343 ] ASF GitHub Bot commented on FLINK-3257: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 Thanks for the reminder, I went over the code today. The code looks mostly good, but here are some thoughts: - The head task supports only one concurrent checkpoint. In general, the tasks need to support multiple checkpoints being in progress at the same time. It frequently happens when people trigger savepoints concurrent to a running checkpoint. I think that is important to support. - The tail task offers the elements to the blocking queue. That means records are simply dropped if the capacity-bounded queue (one element) is not polled by the head task in time. - With the capacity bound in the feedback queue, it is pretty easy to build a full deadlock. Just use a loop function that explodes data into the feedback channel. - Recent code also introduced the ability to change parallelism. What are the semantics here when the parallelism of the loop is changed? Since loops did not support any fault tolerance guarantees, I guess this does improve recovery behavior. But as long as the loops can either deadlock or drop data, the hard guarantees are in the end still a bit weak. So that leaves me a bit ambivalent what to do with this pull request. > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. 
> One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 To suggest some way to fix the guarantees: To my mind, the crux lies in the way that the feedback channel is implemented - a simple blocking queue just does not cut it for that case. To make this proper, I think we need to do the following: - Have an elastic feedback channel (unbounded) with a certain memory budget, that can spill if needed. I think it would be best implemented holding data serialized. - On checkpoint, one simply adds the feedback channel data (already bytes) to the checkpoint - The source task should probably prioritize reading from the feedback channel, to keep it always as small as possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1668 Thanks for the reminder, I went over the code today. The code looks mostly good, but here are some thoughts: - The head task supports only one concurrent checkpoint. In general, the tasks need to support multiple checkpoints being in progress at the same time. It frequently happens when people trigger savepoints concurrent to a running checkpoint. I think that is important to support. - The tail task offers the elements to the blocking queue. That means records are simply dropped if the capacity-bounded queue (one element) is not polled by the head task in time. - With the capacity bound in the feedback queue, it is pretty easy to build a full deadlock. Just use a loop function that explodes data into the feedback channel. - Recent code also introduced the ability to change parallelism. What are the semantics here when the parallelism of the loop is changed? Since loops did not support any fault tolerance guarantees, I guess this does improve recovery behavior. But as long as the loops can either deadlock or drop data, the hard guarantees are in the end still a bit weak. So that leaves me a bit ambivalent what to do with this pull request. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5223) Add documentation of UDTF in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743270#comment-15743270 ] ASF GitHub Bot commented on FLINK-5223: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2956 Thanks for the PR @wuchong. The documentation is mostly very good. I'll rephrase a few minor things before merging. Thanks, Fabian > Add documentation of UDTF in Table API & SQL > > > Key: FLINK-5223 > URL: https://issues.apache.org/jira/browse/FLINK-5223 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2956: [FLINK-5223] [doc] Add documentation of UDTF in Table API...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2956 Thanks for the PR @wuchong. The documentation is mostly very good. I'll rephrase a few minor things before merging. Thanks, Fabian --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-4091) flink-connector-cassandra has conflicting guava version
[ https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445 ] Shannon Carey edited comment on FLINK-4091 at 12/12/16 9:44 PM: Flink's inclusion of altered Cassandra classes (though probably unavoidable) was causing a lot of problems for us when using a library with dependencies on cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job locally from the IDE, the two different versions of Cassandra included on the classpath would cause runtime errors. Excluding the two dependencies with Maven seems to have fixed the issue, allowing us to run our Flink jobs from the IDE again. Just figured I'd mention it here in case it helps anyone else. However, we do have to be careful in the library not to use methods that return Guava classes (such as asyncPrepare) because they'll be written against the non-shaded Guava. Relocating the Guava package with the shade plugin works when using the jar, but not when running Flink jobs from within the IDE (IntelliJ doesn't have full integration with the shade plugin). was (Author: rehevkor5): Flink's inclusion of altered Cassandra classes (though probably unavoidable) was causing a lot of problems for us when using a library with dependencies on cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job locally from the IDE, the two different versions of Cassandra classed on the classpath would cause runtime errors. Excluding the two dependencies with Maven seems to have fixed the issue, allowing us to run our Flink jobs from the IDE again. Just figured I'd mention it here in case it helps anyone else. 
> flink-connector-cassandra has conflicting guava version > --- > > Key: FLINK-4091 > URL: https://issues.apache.org/jira/browse/FLINK-4091 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 > Environment: MacOSX, 1.10-SNAPSHOT (head is > 1a6bab3ef76805685044cf4521e32315169f9033) >Reporter: Dominik Bruhn >Assignee: Chesnay Schepler > > The newly merged cassandra streaming connector has an issue with its guava > dependency. > The build-process for flink-connector-cassandra creates shaded JAR file which > contains the connector, the datastax cassandra driver plus in > org.apache.flink.shaded a shaded copy of guava. > The datastax cassandra driver calls into Futures.withFallback ([1]) which is > present in this guava version. This also works inside the > flink-connector-cassandra jar. > Now the actual build-process for Flink happens and builds another shaded JAR > and creates the flink-dist.jar. Inside this JAR, there is also a shaded > version of guava inside org.apache.flink.shaded. > Now the issue: The guava version which is in the flink-dist.jar is not > compatible and doesn't contain the Futures.withFallback which the datastax > driver is using. > This leads into the following issue: You can without any problems launch a > flink task which uses the casandra driver locally (so through the > mini-cluster) because that is never using the flink-dist.jar. > BUT: As soon as you are trying to start this job on a flink cluster (which > uses the flink-dist.jar), the job breaks with the following exception: > https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d > You can inspect this by opening the > flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the > flink-dist_2.11-1.1-SNAPSHOT.jar in a java decompiler. > I don't know a good solution here: Perhaps it would be one solution to shade > the guava for the cassandra-driver somewhere else than at > org.apache.flink.shaded. 
> [1]: > https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture, > com.google.common.util.concurrent.FutureFallback, > java.util.concurrent.Executor) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5187) Create analog of Row in core
[ https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743211#comment-15743211 ] ASF GitHub Bot commented on FLINK-5187: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2968#discussion_r92037566 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java --- @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; --- End diff -- `ArrayList` is unused and needs to be removed. 
> Create analog of Row in core > > > Key: FLINK-5187 > URL: https://issues.apache.org/jira/browse/FLINK-5187 > Project: Flink > Issue Type: Sub-task > Components: Core, Table API & SQL >Reporter: Anton Solovev >Assignee: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2968: [FLINK-5187] [core] Create analog of Row and RowTy...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2968#discussion_r92037566 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java --- @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; --- End diff -- `ArrayList` is unused and needs to be removed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743127#comment-15743127 ] Ivan Mushketyk commented on FLINK-5245: --- Thank you for your answer, Greg. Do I understand you correctly that "mutators" are the only questionable feature for BipartiteGraph and the rest are good to go from your point of view? Regarding the duplication, I don't think there is much duplication now between BipartiteGraph and Graph, but this may change later when we implement other features. BipartiteEdge can be a superclass for the Edge class. > Add support for BipartiteGraph mutations > > > Key: FLINK-5245 > URL: https://issues.apache.org/jira/browse/FLINK-5245 > Project: Flink > Issue Type: Improvement > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement methods for adding and removing vertices and edges similarly to > Graph class. > Depends on https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution
[ https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743050#comment-15743050 ] ASF GitHub Bot commented on FLINK-5108: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2928 +1 for that approach > Remove ClientShutdownHook during job execution > -- > > Key: FLINK-5108 > URL: https://issues.apache.org/jira/browse/FLINK-5108 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Renkai Ge >Priority: Blocker > Fix For: 1.2.0 > > > The behavior of the Standalone mode is to not react to client interrupts once > a job has been deployed. We should change the Yarn client implementation to > behave the same. This avoids accidental shutdown of the job, e.g. when the > user sends an interrupt via CTRL-C or when the client machine shuts down.
[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743017#comment-15743017 ] Greg Hogan commented on FLINK-5245: --- In addition to being useful, the API must promote efficiency and best practices. The add/remove methods are lacking since 1) the vertices/edges must be known a priori, 2) the methods are not scalable, and 3) users may have the idea to call these methods inside a local for loop. We should rethink the API as features are added. I would like to see `getUndirected()` and `getDegrees()` reimplemented. I like that we're adding a lot of bipartite graph functionality. How can we best avoid duplication of code? Perhaps Edge should be a specialization of BipartiteEdge and Graph a specialization of BipartiteGraph (the names would change, of course). Implementing some algorithms (such as redundancy from the two-mode paper) may help answer these questions.
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742957#comment-15742957 ] ASF GitHub Bot commented on FLINK-5113: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2939 It should be part of this PR since this case simply didn't exist before the new interface. The equivalent to no state was the state being null, at which point restore was never called in the first place. Now, if the state is null we get an empty list, afaik. Here is what is confusing me: Every single function checks whether the state is empty. Every one. So, there is apparently the possibility that it's empty. But the behavior for that case does not seem well-defined. According to the code, receiving an empty state list is not a reason to fail for any of these tests. If this is the case, we don't need to actually implement `restoreState` in the first place, since it is irrelevant to the result of the test. If this is not the case, we should try to fail as early as possible by adding a failure condition. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This issue aims at porting all these functions to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities it provides.
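The fail-early option discussed above can be sketched in plain Java. This is an illustrative standalone class, not the actual Flink test functions: a restore method that rejects an empty state list instead of letting the test pass vacuously.

```java
import java.util.List;

// Hypothetical sketch of the fail-early idea (names are illustrative,
// not the actual Flink test functions): reject an empty restored-state
// list instead of letting the test pass vacuously.
class RestoreStateSketch {
    private List<Integer> restored;

    // Analogous to a restoreState(List<T>) method: with the new
    // CheckpointedFunction-style API, "no state" arrives as an empty
    // list rather than null, so the check can be made explicit.
    void restoreState(List<Integer> state) {
        if (state.isEmpty()) {
            throw new IllegalStateException("restored state must not be empty");
        }
        this.restored = state;
    }

    List<Integer> getRestored() {
        return restored;
    }
}
```

With such a condition in place, a restore that unexpectedly receives no state fails immediately rather than producing an unrelated assertion error later in the test.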
[jira] [Updated] (FLINK-5189) Deprecate table.Row
[ https://issues.apache.org/jira/browse/FLINK-5189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Solovev updated FLINK-5189: - Summary: Deprecate table.Row (was: Delete table.Row) > Deprecate table.Row > --- > > Key: FLINK-5189 > URL: https://issues.apache.org/jira/browse/FLINK-5189 > Project: Flink > Issue Type: Sub-task > Components: Core, Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev >
[jira] [Commented] (FLINK-5187) Create analog of Row in core
[ https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742935#comment-15742935 ] ASF GitHub Bot commented on FLINK-5187: --- Github user tonycox commented on the issue: https://github.com/apache/flink/pull/2968 @fhueske Yes. I already started https://github.com/tonycox/flink/tree/FLINK-5188 > Create analog of Row in core > > > Key: FLINK-5187 > URL: https://issues.apache.org/jira/browse/FLINK-5187 > Project: Flink > Issue Type: Sub-task > Components: Core, Table API & SQL >Reporter: Anton Solovev >Assignee: Jark Wu >
[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes
[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742901#comment-15742901 ] Alexander Chermenin commented on FLINK-5319: More simple test with the same result: {code}ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet longDataSet = env.fromCollection(Arrays.asList(1L, 2L, 3L, 4L, 5L)); DataSet intDataSet = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5)); longDataSet.map(Number::doubleValue).print(); intDataSet.map(Number::doubleValue).print();{code} > ClassCastException when reusing an inherited method reference as KeySelector > for different classes > -- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Alexander Chermenin > > Code sample: > {code}static abstract class A { > int id; > A(int id) {this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource bStream = environment.fromElements(bs); > DataStreamSource cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector) A::getId).print(); > cStream.keyBy((KeySelector) A::getId).print(); > environment.execute(); > } > {code} > This code throws next exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at >
[jira] [Commented] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742870#comment-15742870 ] Chesnay Schepler commented on FLINK-5322: - Could you try whether this works: {code} yarn.taskmanager.env.MY_ENV: test {code} > yarn.taskmanager.env value does not appear in System.getenv > --- > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Bug > Components: YARN > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Priority: Trivial > Fix For: 1.1.3 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration.
[jira] [Closed] (FLINK-4631) NullPointerException during stream task cleanup
[ https://issues.apache.org/jira/browse/FLINK-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4631. --- Resolution: Fixed > NullPointerException during stream task cleanup > --- > > Key: FLINK-4631 > URL: https://issues.apache.org/jira/browse/FLINK-4631 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.2 > Environment: Ubuntu server 12.04.5 64 bit > java version "1.8.0_40" > Java(TM) SE Runtime Environment (build 1.8.0_40-b26) > Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode) >Reporter: Avihai Berkovitz > Fix For: 1.2.0 > > > If a streaming job failed during startup (in my case, due to lack of network > buffers), all the tasks are being cancelled before they started. This causes > many instances of the following exception: > {noformat} > 2016-09-18 14:17:12,177 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error during > cleanup of stream task > java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes
[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742827#comment-15742827 ] Alexander Chermenin commented on FLINK-5319: There are no problems if the code is used either only with B or only with C.
[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742773#comment-15742773 ] Ivan Mushketyk commented on FLINK-5245: --- Are these methods useful for the Graph class? Here is a list of BipartiteGraph features that are pending implementation or review:
* Graph mutation methods
* Accessors
* CSV readers
* Validators
* Generators
* Metrics
* Graph properties
* Graph transformations
* Scala interface
* Bipartite graph examples
What do you think is useful for BipartiteGraph? Is there a case where a feature can be useful for the Graph class but useless for the BipartiteGraph class?
[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742744#comment-15742744 ] Vasia Kalavri commented on FLINK-5245: -- I don't think so. We used to have some simple examples that showcased how to create a graph in this way, but I don't think we really need such methods for the bipartite graph. That said, we should probably go through all the bipartite features and decide whether they are useful, e.g. validator and generators. Do they even make sense for bipartite graphs? Or when do they?
[jira] [Commented] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742726#comment-15742726 ] Greg Hogan commented on FLINK-5245: --- [~vkalavri] are these methods used outside of tests?
[GitHub] flink pull request #2986: [FLINK-4648] Implement BipartiteGraph generator
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2986#discussion_r92007073 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; --- End diff -- As with validation, we should consider packaging these under the `bipartite` package.
[jira] [Commented] (FLINK-4648) Implement bipartite graph generators
[ https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742705#comment-15742705 ] ASF GitHub Bot commented on FLINK-4648: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2986#discussion_r92006686 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/** + * Generate a complete bipartite graph where every top node is connected to every bottom node. 
+ */ +public class CompleteBipartiteGraph + implements BipartiteGraphGenerator{ + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private final long topVertexCount; + private final long bottomVertexCount; + + private int parallelism = 1; + + public CompleteBipartiteGraph(ExecutionEnvironment env, long topVertexCount, long bottomVertexCount) { + this.env = env; + this.topVertexCount = topVertexCount; + this.bottomVertexCount = bottomVertexCount; + } + + @Override + public BipartiteGraph generate() { + DataSet > topVertices + = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount); + DataSet > bottomVertices + = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount); + + DataSet > edges = topVertices.cross(bottomVertices) --- End diff -- Can implement `EdgeGenerator` as a `flatMap` instead of using a `cross`, as in `CompleteGraph`. > Implement bipartite graph generators > > > Key: FLINK-4648 > URL: https://issues.apache.org/jira/browse/FLINK-4648 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement generators for bipartite graphs. > Should implement at least: > * *BipartiteGraphGenerator* (maybe requires a better name) that will generate > a bipartite graph where every vertex of one set is connected only to some > vertices from another set > * *CompleteBipartiteGraphGenerator* that will generate a graph where every > vertex of one set is connected to every vertex of another set
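The flatMap-instead-of-cross suggestion can be illustrated with plain Java streams (a standalone sketch, not the Flink DataSet API): every top vertex emits one edge per bottom vertex, so the complete bipartite edge set is produced without materializing a cross product as a separate intermediate.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Illustrative sketch of the flatMap approach: each top vertex id is
// expanded into one (top, bottom) pair per bottom vertex id.
class CompleteBipartiteEdgesSketch {
    // Returns (top, bottom) id pairs for the complete bipartite graph.
    static List<long[]> edges(long topVertexCount, long bottomVertexCount) {
        return LongStream.range(0, topVertexCount)
            .boxed()
            .flatMap(top -> LongStream.range(0, bottomVertexCount)
                .mapToObj(bottom -> new long[]{top, bottom}))
            .collect(Collectors.toList());
    }
}
```

In the Flink version, the same shape would be a `flatMap` over the top vertex sequence that emits one `BipartiteEdge` per bottom vertex, as `CompleteGraph` does for ordinary edges.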
[jira] [Commented] (FLINK-4648) Implement bipartite graph generators
[ https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742702#comment-15742702 ] ASF GitHub Bot commented on FLINK-4648: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2986#discussion_r92006696 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/** + * Generate a complete bipartite graph where every top node is connected to every bottom node. 
+ */ +public class CompleteBipartiteGraph + implements BipartiteGraphGenerator{ + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private final long topVertexCount; + private final long bottomVertexCount; + + private int parallelism = 1; --- End diff -- Use `PARALLELISM_DEFAULT`. We used to use `-1`.
[jira] [Commented] (FLINK-4648) Implement bipartite graph generators
[ https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742703#comment-15742703 ] ASF GitHub Bot commented on FLINK-4648: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2986#discussion_r92007172 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/** + * Generate a complete bipartite graph where every top node is connected to every bottom node.
+ */ +public class CompleteBipartiteGraph + implements BipartiteGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private final long topVertexCount; + private final long bottomVertexCount; + + private int parallelism = 1; + + public CompleteBipartiteGraph(ExecutionEnvironment env, long topVertexCount, long bottomVertexCount) { + this.env = env; + this.topVertexCount = topVertexCount; + this.bottomVertexCount = bottomVertexCount; + } + + @Override + public BipartiteGraph generate() { + DataSet<Vertex<LongValue, NullValue>> topVertices + = GraphGeneratorUtils.vertexSequence(env, parallelism, topVertexCount); + DataSet<Vertex<LongValue, NullValue>> bottomVertices + = GraphGeneratorUtils.vertexSequence(env, parallelism, bottomVertexCount); + + DataSet<BipartiteEdge<LongValue, LongValue, NullValue>> edges = topVertices.cross(bottomVertices) + .setParallelism(parallelism) + .map(new EdgeGenerator()) + .setParallelism(parallelism); + + return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env); + } + + @Override + public BipartiteGraphGenerator setParallelism(int parallelism) { --- End diff -- Move this to an `AbstractBipartiteGraphGenerator` as with `AbstractGraphGenerator`? > Implement bipartite graph generators > > > Key: FLINK-4648 > URL: https://issues.apache.org/jira/browse/FLINK-4648 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement generators for bipartite graphs. > Should implement at least: > * *BipartiteGraphGenerator* (maybe requires a better name) that will generate > a bipartite graph where every vertex of one set is connected only to some > vertices from another set > * *CompleteBipartiteGraphGenerator* that will generate a graph where every > vertex of one set is connected to every vertex of another set -- This message was sent by Atlassian JIRA (v6.3.4#6332)
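One possible shape for the suggested `AbstractBipartiteGraphGenerator` is sketched below; the class name, field, and methods mirror how `AbstractGraphGenerator` factors out parallelism, but they are assumptions, not the PR's actual code. The point is that the parallelism field and its setter live in the base class, so each concrete generator only implements `generate()`.

```java
// Hypothetical base-class sketch: concrete generators such as
// CompleteBipartiteGraph would extend this instead of re-declaring
// the parallelism field and setter themselves.
abstract class AbstractBipartiteGraphGeneratorSketch {
    // -1 stands in for ExecutionConfig.PARALLELISM_DEFAULT, per the review note
    protected int parallelism = -1;

    public AbstractBipartiteGraphGeneratorSketch setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public int getParallelism() {
        return parallelism;
    }
}
```

Returning `this` keeps the fluent style of the existing generator API.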
[jira] [Commented] (FLINK-4648) Implement bipartite graph generators
[ https://issues.apache.org/jira/browse/FLINK-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742704#comment-15742704 ] ASF GitHub Bot commented on FLINK-4648: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2986#discussion_r92007073 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteBipartiteGraph.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; --- End diff -- As with validation, we should consider packaging these under the `bipartite` package. > Implement bipartite graph generators > > > Key: FLINK-4648 > URL: https://issues.apache.org/jira/browse/FLINK-4648 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement generators for bipartite graphs. 
> Should implement at least: > * *BipartiteGraphGenerator* (maybe requires a better name) that will generate > a bipartite graph where every vertex of one set is connected only to some > vertices from another set > * *CompleteBipartiteGraphGenerator* that will generate a graph where every > vertex of one set is connected to every vertex of another set -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator
[ https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742663#comment-15742663 ] ASF GitHub Bot commented on FLINK-5104: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r92004487 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */ +public class InvalidBipartiteVertexIdsValidator extends BipartiteGraphValidator { + + @Override + public boolean validate(BipartiteGraph bipartiteGraph) throws Exception { + DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap()); --- End diff -- Can `GetTopIdsMap` be replaced with `.project(0)`? > Implement BipartiteGraph validator > -- > > Key: FLINK-5104 > URL: https://issues.apache.org/jira/browse/FLINK-5104 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > BipartiteGraph should have a validator similar to GraphValidator for Graph > class. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
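The `.project(0)` suggestion, that a built-in field projection can replace the hand-written mapper, can be illustrated outside Flink. The class and method names below are hypothetical stand-ins, not the Flink API:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for DataSet.project(0): keep only the first field of
// each edge record, which is all the custom GetTopIdsMap mapper does.
class ProjectSketch {
    static List<Object> projectFirst(List<Object[]> records) {
        return records.stream()
            .map(r -> r[0])
            .collect(Collectors.toList());
    }
}
```

The appeal of the projection is that it drops the extra user-defined function class entirely.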
[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator
[ https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742661#comment-15742661 ] ASF GitHub Bot commented on FLINK-5104: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91994405 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/BipartiteGraphValidator.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; --- End diff -- Move to `org.apache.flink.graph.bipartite.validation`? > Implement BipartiteGraph validator > -- > > Key: FLINK-5104 > URL: https://issues.apache.org/jira/browse/FLINK-5104 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > BipartiteGraph should have a validator similar to GraphValidator for Graph > class. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator
[ https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742662#comment-15742662 ] ASF GitHub Bot commented on FLINK-5104: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91997234 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */ +public class InvalidBipartiteVertexIdsValidator extends BipartiteGraphValidator { + + @Override + public boolean validate(BipartiteGraph bipartiteGraph) throws Exception { + DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap()); + DataSet edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap()); + + DataSet invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds); + DataSet invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds); + + return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0; + } + + private DataSet<K> invalidIds(DataSet<Vertex<K, VV>> topVertices, DataSet<Tuple1<K>> edgesIds) { + return topVertices.coGroup(edgesIds) + .where(0) + .equalTo(0) + .with(new CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K>() { + @Override + public void coGroup(Iterable<Vertex<K, VV>> vertices, Iterable<Tuple1<K>> edgeIds, Collector<K> out) throws Exception { + if (!vertices.iterator().hasNext()) { + out.collect(edgeIds.iterator().next().f0); + } + } + }); + } + + @ForwardedFields("f0") + private class GetTopIdsMap implements MapFunction<BipartiteEdge<KT, KB, EV>, Tuple1<KT>> { --- End diff -- Inner Flink functions should be `static` to prevent serialization of the outer class. > Implement BipartiteGraph validator > -- > > Key: FLINK-5104 > URL: https://issues.apache.org/jira/browse/FLINK-5104 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > BipartiteGraph should have a validator similar to GraphValidator for Graph > class. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
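The static-inner-class point can be demonstrated with plain reflection (the class names below are hypothetical, not the PR's classes): a non-static inner class carries a synthetic `this$0` reference to its enclosing instance, which is exactly what drags the outer operator into serialization; a static nested class has no such reference.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical demonstration of the review comment about static functions.
class OuterOperatorSketch implements Serializable {
    // Non-static: the compiler adds a hidden this$0 field pointing at the
    // outer instance, so serializing this "function" serializes the outer
    // class along with it.
    class NonStaticMapper implements Serializable { }

    // Static: no hidden outer reference, serializes standalone.
    static class StaticMapper implements Serializable { }

    // True if the class has a compiler-generated enclosing-instance field.
    static boolean hasOuterReference(Class<?> c) {
        return Arrays.stream(c.getDeclaredFields())
            .anyMatch(f -> f.getName().startsWith("this$"));
    }
}
```

This is why Flink's own functions declared inside operators are `static` nested classes.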
[jira] [Commented] (FLINK-5104) Implement BipartiteGraph validator
[ https://issues.apache.org/jira/browse/FLINK-5104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742664#comment-15742664 ] ASF GitHub Bot commented on FLINK-5104: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91996140 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */ +public class InvalidBipartiteVertexIdsValidator extends BipartiteGraphValidator { + + @Override + public boolean validate(BipartiteGraph bipartiteGraph) throws Exception { + DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap()); + DataSet edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap()); + + DataSet invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds); + DataSet invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds); + + return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0; --- End diff -- `count()` executes the program so shouldn't be called twice. Could use an accumulator in `RichOutputFormat` instead. > Implement BipartiteGraph validator > -- > > Key: FLINK-5104 > URL: https://issues.apache.org/jira/browse/FLINK-5104 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > BipartiteGraph should have a validator similar to GraphValidator for Graph > class. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
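The count()-twice concern can be sketched in plain Java (names hypothetical): instead of triggering a full execution per `count()`, both invalid-id sets are tallied into one shared counter during a single pass, the way a counter registered in a `RichOutputFormat` would be read once after the job finishes.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical single-pass sketch of the accumulator suggestion.
class InvalidIdAccumulatorSketch {
    // While "writing" each invalid-id dataset, bump one shared counter
    // (the accumulator a RichOutputFormat would register with the runtime).
    static long writeAndCount(List<Long> invalidTopIds, List<Long> invalidBottomIds) {
        LongAdder invalid = new LongAdder();
        invalidTopIds.forEach(id -> invalid.increment());    // stand-in for the top-side sink
        invalidBottomIds.forEach(id -> invalid.increment()); // stand-in for the bottom-side sink
        return invalid.sum();
    }

    // Validity is decided from the one accumulated total, not two counts.
    static boolean isValid(List<Long> invalidTopIds, List<Long> invalidBottomIds) {
        return writeAndCount(invalidTopIds, invalidBottomIds) == 0;
    }
}
```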
[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91994405 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/BipartiteGraphValidator.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; --- End diff -- Move to `org.apache.flink.graph.bipartite.validation`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91997234 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */ +public class InvalidBipartiteVertexIdsValidatorextends BipartiteGraphValidator { + + @Override + public boolean validate(BipartiteGraph bipartiteGraph) throws Exception { + DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap ()); + DataSet edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap ()); + + DataSet invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds); + DataSet invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds); + + return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0; + } + + private DataSet invalidIds(DataSet > topVertices, DataSet edgesIds) { + return topVertices.coGroup(edgesIds) + .where(0) + .equalTo(0) + .with(new CoGroupFunction , Tuple1, K>() { + @Override + public void coGroup(Iterable > vertices, Iterable edgeIds, Collector out) throws Exception { + if (!vertices.iterator().hasNext()) { + out.collect(edgeIds.iterator().next().f0); + } + } + }); + } + + @ForwardedFields("f0") + private class GetTopIdsMap implements MapFunction , Tuple1> { --- End diff -- Inner Flink functions should be `static` to prevent serialization of the outer class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r91996140 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */ +public class InvalidBipartiteVertexIdsValidatorextends BipartiteGraphValidator { + + @Override + public boolean validate(BipartiteGraph bipartiteGraph) throws Exception { + DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap ()); + DataSet edgesBottomIds = bipartiteGraph.getEdges().map(new GetBottomIdsMap ()); + + DataSet invalidTopIds = invalidIds(bipartiteGraph.getTopVertices(), edgesTopIds); + DataSet invalidBottomIds = invalidIds(bipartiteGraph.getBottomVertices(), edgesBottomIds); + + return invalidTopIds.count() == 0 && invalidBottomIds.count() == 0; --- End diff -- `count()` executes the program so shouldn't be called twice. Could use an accumulator in `RichOutputFormat` instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2985: [FLINK-5104] Bipartite graph validation
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2985#discussion_r92004487 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidBipartiteVertexIdsValidator.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.apache.flink.graph.bipartite.BipartiteGraph; +import org.apache.flink.util.Collector; +import static org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; + +/** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in top and bottom vertex sets. 
+ */
+public class InvalidBipartiteVertexIdsValidator extends BipartiteGraphValidator {
+
+	@Override
+	public boolean validate(BipartiteGraph bipartiteGraph) throws Exception {
+		DataSet edgesTopIds = bipartiteGraph.getEdges().map(new GetTopIdsMap());
--- End diff --
Can `GetTopIdsMap` be replaced with `.project(0)`?
[jira] [Comment Edited] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742647#comment-15742647 ] Robert Metzger edited comment on FLINK-4858 at 12/12/16 6:11 PM: - Okay, thank you for your input. I'll remove 1.2 as a target fix version. was (Author: rmetzger): Okay, thank you for your input. I'lll remove 1.2 as a target fix version. > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4858: -- Component/s: State Backends, Checkpointing > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742647#comment-15742647 ] Robert Metzger commented on FLINK-4858: --- Okay, thank you for your input. I'll remove 1.2 as a target fix version. > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
[ https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shannon Carey updated FLINK-5322: - Description: The value I specified in flink-conf.yaml {code} yarn.taskmanager.env: MY_ENV: test {code} is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. Interestingly, it does appear within the Flink JobManager Web UI under Job Manager -> Configuration. was: The value I specified in flink-conf.yaml {code} yarn.taskmanager.env: MY_ENV: test {code} is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. > yarn.taskmanager.env value does not appear in System.getenv > --- > > Key: FLINK-5322 > URL: https://issues.apache.org/jira/browse/FLINK-5322 > Project: Flink > Issue Type: Bug > Components: YARN > Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") >Reporter: Shannon Carey >Priority: Trivial > Fix For: 1.1.3 > > > The value I specified in flink-conf.yaml > {code} > yarn.taskmanager.env: > MY_ENV: test > {code} > is not available in {{System.getenv("MY_ENV")}} from the plan execution > (execution flow of main method) nor from within execution of a streaming > operator. > Interestingly, it does appear within the Flink JobManager Web UI under Job > Manager -> Configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
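The lookup the reporter describes can be sketched in a few lines of plain Java. This is a minimal illustration, not a Flink API: `MY_ENV` is the key configured in flink-conf.yaml above, and the fallback helper is hypothetical. On a task manager where `yarn.taskmanager.env` took effect, the lookup would return the configured value instead of the fallback.

```java
// Minimal sketch of the check from the report: look up the configured
// environment variable and make its absence visible with a fallback.
// "MY_ENV" is the example key from flink-conf.yaml; the helper itself
// is illustrative and not part of any Flink API.
public class EnvCheck {

    public static String lookup(String key) {
        String value = System.getenv(key);
        return value != null ? value : "<not set>";
    }

    public static void main(String[] args) {
        // Run this from the plan's main method and from inside an
        // operator to compare the two contexts, as the reporter did.
        System.out.println("MY_ENV = " + lookup("MY_ENV"));
    }
}
```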
[jira] [Updated] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4858: -- Fix Version/s: (was: 1.2.0) > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
Shannon Carey created FLINK-5322: Summary: yarn.taskmanager.env value does not appear in System.getenv Key: FLINK-5322 URL: https://issues.apache.org/jira/browse/FLINK-5322 Project: Flink Issue Type: Bug Components: YARN Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3") Reporter: Shannon Carey Priority: Trivial Fix For: 1.1.3 The value I specified in flink-conf.yaml {code} yarn.taskmanager.env: MY_ENV: test {code} is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of main method) nor from within execution of a streaming operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds
[ https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742637#comment-15742637 ] ASF GitHub Bot commented on FLINK-5290: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2966#discussion_r92004254 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java --- @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +public class DefaultStreamGraphHasher implements StreamGraphHasher { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class); + + /** +* Returns a map with a hash for each {@link StreamNode} of the {@link +* StreamGraph}. The hash is used as the {@link JobVertexID} in order to +* identify nodes across job submissions if they didn't change. +* +* The complete {@link StreamGraph} is traversed. The hash is either +* computed from the transformation's user-specified id (see +* {@link StreamTransformation#getUid()}) or generated in a deterministic way. +* +* The generated hash is deterministic with respect to: +* +* node-local properties (like parallelism, UDF, node ID), +* chained output nodes, and +* input nodes hashes +* +* +* @return A map from {@link StreamNode#id} to hash as 16-byte array. 
+*/
+	@Override
+	public Map traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// The hash function used to generate the hash
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map hashes = new HashMap<>();
+
+		Set visited = new HashSet<>();
+		Queue remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in different traversal, which breaks the
+		// deterministic hash assignment.
+		List sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+
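The quoted code sorts the source IDs before the breadth-first walk so that two submissions of the same program assign the same hashes. A hypothetical miniature of that idea (illustrative names, not Flink's actual `StreamGraphHasher`; plain node ids stand in for the node-local properties) can show the determinism directly:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Miniature of the traversal described in the quoted javadoc: sources
// are sorted before the breadth-first walk, so two submissions of the
// same graph visit nodes in the same order and get identical hashes,
// regardless of the order in which the source ids were reported.
public class DeterministicHashSketch {

    public static Map<Integer, Integer> generateHashes(
            Map<Integer, List<Integer>> adjacency, Collection<Integer> sourceIds) {
        List<Integer> sources = new ArrayList<>(sourceIds);
        Collections.sort(sources); // make the source order deterministic

        Map<Integer, Integer> hashes = new LinkedHashMap<>();
        Deque<Integer> remaining = new ArrayDeque<>(sources);
        Set<Integer> visited = new HashSet<>(sources);

        Integer current;
        while ((current = remaining.poll()) != null) {
            // combine the node id with the hashes computed so far, so a
            // node's hash also reflects the nodes visited before it
            hashes.put(current, Objects.hash(current, hashes.values().hashCode()));
            for (int next : adjacency.getOrDefault(current, List.of())) {
                if (visited.add(next)) {
                    remaining.add(next);
                }
            }
        }
        return hashes;
    }
}
```

Feeding the same graph with the sources listed in a different order yields an identical hash map, which is the property the real hasher needs so that `JobVertexID`s stay stable across job submissions.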
[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds
[ https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742622#comment-15742622 ] ASF GitHub Bot commented on FLINK-5290: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2966#discussion_r92003177 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java --- @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +public class DefaultStreamGraphHasher implements StreamGraphHasher { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class); + + /** +* Returns a map with a hash for each {@link StreamNode} of the {@link +* StreamGraph}. The hash is used as the {@link JobVertexID} in order to +* identify nodes across job submissions if they didn't change. +* +* The complete {@link StreamGraph} is traversed. The hash is either +* computed from the transformation's user-specified id (see +* {@link StreamTransformation#getUid()}) or generated in a deterministic way. +* +* The generated hash is deterministic with respect to: +* +* node-local properties (like parallelism, UDF, node ID), +* chained output nodes, and +* input nodes hashes +* +* +* @return A map from {@link StreamNode#id} to hash as 16-byte array. 
+*/
+	@Override
+	public Map traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// The hash function used to generate the hash
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map hashes = new HashMap<>();
+
+		Set visited = new HashSet<>();
+		Queue remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in different traversal, which breaks the
+		// deterministic hash assignment.
+		List sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+
[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742624#comment-15742624 ] Stefan Richter commented on FLINK-4858: --- Agreed. We can not remove it in 1.2, because that means breaking backwards compatibility to all UDFs that use {{Checkpointed}}. > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.2.0 > > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742614#comment-15742614 ] Aljoscha Krettek commented on FLINK-4858: - We probably can't remove it. (unfortunately) > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.2.0 > > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes
[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742609#comment-15742609 ] Aljoscha Krettek commented on FLINK-5319: - [~twalthr] could you please have a look? This seems related to another case that we had a while back. > ClassCastException when reusing an inherited method reference as KeySelector > for different classes > -- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Alexander Chermenin > > Code sample: > {code}static abstract class A { > int id; > A(int id) {this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource bStream = environment.fromElements(bs); > DataStreamSource cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector) A::getId).print(); > cStream.keyBy((KeySelector) A::getId).print(); > environment.execute(); > } > {code} > This code throws next exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) > at >
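The root cause discussed in this thread is a single type-erased method reference being reused as the key selector for two unrelated stream element types. The behavior can be illustrated in plain Java; the `KeySelector` interface below is a hypothetical stand-in for Flink's interface, and the pattern shown (one separately typed selector per stream) is a sketch of a workaround, not the project's fix:

```java
public class KeySelectorErasureSketch {

    // Hypothetical stand-in for org.apache.flink.api.java.functions.KeySelector.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    static abstract class A {
        final int id;
        A(int id) { this.id = id; }
        int getId() { return id; }
    }

    static class B extends A { B(int id) { super(id % 3); } }
    static class C extends A { C(int id) { super(id % 2); } }

    public static void main(String[] args) {
        // Each assignment produces its own selector instance with its own
        // target type, instead of one erased selector shared by both streams.
        KeySelector<B, Integer> keyForB = A::getId;
        KeySelector<C, Integer> keyForC = A::getId;
        System.out.println(keyForB.getKey(new B(4))); // 4 % 3 = 1
        System.out.println(keyForC.getKey(new C(4))); // 4 % 2 = 0
    }
}
```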
[jira] [Assigned] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-5245: - Assignee: Ivan Mushketyk > Add support for BipartiteGraph mutations > > > Key: FLINK-5245 > URL: https://issues.apache.org/jira/browse/FLINK-5245 > Project: Flink > Issue Type: Improvement > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement methods for adding and removing vertices and edges similarly to > Graph class. > Depends on https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2984: [FLINK-5311] Add user documentation for bipartite graph
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2984 Hi @vasia, Thank you for your review. I've updated the documentation accordingly and added more detailed descriptions for projections code examples. Do you think it is still better to add some images? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742585#comment-15742585 ] Aljoscha Krettek commented on FLINK-3398: - I just saw a lot of discussion about this on the ML so I bumped it. We can also reduce the importance again. > Flink Kafka consumer should support auto-commit opt-outs > > > Key: FLINK-3398 > URL: https://issues.apache.org/jira/browse/FLINK-3398 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Shikhar Bhushan >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.0 > > > Currently the Kafka source will commit consumer offsets to Zookeeper, either > upon a checkpoint if checkpointing is enabled, otherwise periodically based > on {{auto.commit.interval.ms}} > It should be possible to opt-out of committing consumer offsets to Zookeeper. > Kafka has this config as {{auto.commit.enable}} (0.8) and > {{enable.auto.commit}} (0.9). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742590#comment-15742590 ] ASF GitHub Bot commented on FLINK-5311: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2984 Hi @vasia, Thank you for your review. I've updated the documentation accordingly and added more detailed descriptions for projections code examples. Do you think it is still better to add some images? > Write user documentation for BipartiteGraph > --- > > Key: FLINK-5311 > URL: https://issues.apache.org/jira/browse/FLINK-5311 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > We need to add user documentation. The progress on BipartiteGraph can be > tracked in the following JIRA: > https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5295) Migrate the AlignedWindowOperators to the WindowOperator and make it backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5295: Priority: Blocker (was: Major) > Migrate the AlignedWindowOperators to the WindowOperator and make it > backwards compatible. > -- > > Key: FLINK-5295 > URL: https://issues.apache.org/jira/browse/FLINK-5295 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5318) Make the Rolling/Bucketing sink backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5318: Priority: Blocker (was: Major) > Make the Rolling/Bucketing sink backwards compatible. > - > > Key: FLINK-5318 > URL: https://issues.apache.org/jira/browse/FLINK-5318 > Project: Flink > Issue Type: Sub-task > Components: filesystem-connector >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5293) Make the Kafka consumer backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5293: Priority: Blocker (was: Major) > Make the Kafka consumer backwards compatible. > - > > Key: FLINK-5293 > URL: https://issues.apache.org/jira/browse/FLINK-5293 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5294) Make the WindowOperator backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5294: Priority: Blocker (was: Major) > Make the WindowOperator backwards compatible. > - > > Key: FLINK-5294 > URL: https://issues.apache.org/jira/browse/FLINK-5294 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Priority: Blocker > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5296) Expose the old AlignedWindowOperators to the user through explicit commands.
[ https://issues.apache.org/jira/browse/FLINK-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5296: Priority: Blocker (was: Major) > Expose the old AlignedWindowOperators to the user through explicit commands. > > > Key: FLINK-5296 > URL: https://issues.apache.org/jira/browse/FLINK-5296 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4973) Flakey Yarn tests due to recently added latency marker
[ https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4973: Assignee: Till Rohrmann (was: Robert Metzger) > Flakey Yarn tests due to recently added latency marker > -- > > Key: FLINK-4973 > URL: https://issues.apache.org/jira/browse/FLINK-4973 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.2.0 > > > The newly introduced {{LatencyMarksEmitter}} emits latency marker on the > {{Output}}. This can still happen after the underlying {{BufferPool}} has > been destroyed. The occurring exception is then logged: > {code} > 2016-10-29 15:00:48,088 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom File Source (1/1) switched to FINISHED > 2016-10-29 15:00:48,089 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Source: Custom File Source (1/1) > 2016-10-29 15:00:48,089 INFO org.apache.flink.yarn.YarnTaskManager > - Un-registering task and sending final execution state > FINISHED to JobManager for task Source: Custom File Source > (8fe0f817fa6d960ea33f6e57e0c3891c) > 2016-10-29 15:00:48,101 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error > while emitting latency marker > java.lang.RuntimeException: Buffer pool is destroyed. 
> at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:734) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.run(StreamSource.java:134) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:118) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:103) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96) > ... 9 more > {code} > This exception is clearly related to the shutdown of a stream operator and > does not indicate a wrong behaviour. Since the yarn tests simply scan the log > for some keywords (including exception) such a case can make them fail. 
> Best if we could make sure that the {{LatencyMarksEmitter}} would only emit > latency markers if the {{Output}} were still active. But we could also > simply not log exceptions which occur after the stream operator has been > stopped. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/171578846/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
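The first mitigation proposed in the issue, emitting latency markers only while the output is still active, can be sketched as follows. The class and method names are illustrative, not Flink's actual operator API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a latency-marker emitter that checks an "active" flag before
// emitting, so the shutdown race does not surface as a logged exception.
public class GuardedLatencyEmitter {
    private final AtomicBoolean outputActive = new AtomicBoolean(true);
    private int emitted = 0;

    boolean tryEmitLatencyMarker() {
        if (!outputActive.get()) {
            return false;  // operator is shutting down; skip silently
        }
        emitted++;         // stand-in for writing the marker to the Output
        return true;
    }

    void close() {
        outputActive.set(false);  // called when the operator stops
    }

    int emittedCount() {
        return emitted;
    }

    public static void main(String[] args) {
        GuardedLatencyEmitter emitter = new GuardedLatencyEmitter();
        emitter.tryEmitLatencyMarker();
        emitter.close();
        emitter.tryEmitLatencyMarker(); // no-op after close, no exception
        System.out.println(emitter.emittedCount()); // 1
    }
}
```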
[jira] [Commented] (FLINK-4631) NullPointerException during stream task cleanup
[ https://issues.apache.org/jira/browse/FLINK-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742565#comment-15742565 ] Aljoscha Krettek commented on FLINK-4631: - I think it's ok to close. The log entries are not pretty but it only happens in the case of failure. > NullPointerException during stream task cleanup > --- > > Key: FLINK-4631 > URL: https://issues.apache.org/jira/browse/FLINK-4631 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.2 > Environment: Ubuntu server 12.04.5 64 bit > java version "1.8.0_40" > Java(TM) SE Runtime Environment (build 1.8.0_40-b26) > Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode) >Reporter: Avihai Berkovitz > Fix For: 1.2.0 > > > If a streaming job failed during startup (in my case, due to lack of network > buffers), all the tasks are being cancelled before they started. This causes > many instances of the following exception: > {noformat} > 2016-09-18 14:17:12,177 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error during > cleanup of stream task > java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata
[ https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742566#comment-15742566 ] ASF GitHub Bot commented on FLINK-4814: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2752 I think this is a very nice addition, but probably needs a bit refinement: - I heard from various users that they would like to have a config option for the checkpoint directory, over a configuration "in code" - I would suggest to actually retain the `state.checkpoints.dir` config parameter, and make every state backend respect it for externalized checkpoints - State backends that write to a file system (FsStateBackend, RocksDB, ...) would respect that parameter (we might even want to deprecate their specific checkpoint dir parameter) - State backends that do not write to files will be fine with this parameter being absent unless one chooses to enable externalized checkpoints - The "in code" options overrides the config option, if both are set. - The option to enable externalize checkpoints should be also present in the configuration. What would be nice is to merge the documentation, as far as it is applicable to the current state. > Remove extra storage location for externalized checkpoint metadata > -- > > Key: FLINK-4814 > URL: https://issues.apache.org/jira/browse/FLINK-4814 > Project: Flink > Issue Type: Sub-task >Reporter: Ufuk Celebi > > Follow up for FLINK-4512. > Store checkpoint meta data in checkpoint directory. That makes it simpler > for users to track and clean up checkpoints manually, if they want to retain > externalized checkpoints across cancellations and terminal failures. > Every state backend needs to be able to provide a storage location for the > checkpoint metadata. The memory state backend would hence not work with > externalized checkpoints, unless one sets explicitly a parameter > `setExternalizedCheckpointsLocation(uri)`. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2752: [FLINK-4814] [checkpointing] Use checkpoint directory for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2752 I think this is a very nice addition, but probably needs a bit refinement: - I heard from various users that they would like to have a config option for the checkpoint directory, over a configuration "in code" - I would suggest to actually retain the `state.checkpoints.dir` config parameter, and make every state backend respect it for externalized checkpoints - State backends that write to a file system (FsStateBackend, RocksDB, ...) would respect that parameter (we might even want to deprecate their specific checkpoint dir parameter) - State backends that do not write to files will be fine with this parameter being absent unless one chooses to enable externalized checkpoints - The "in code" options overrides the config option, if both are set. - The option to enable externalize checkpoints should be also present in the configuration. What would be nice is to merge the documentation, as far as it is applicable to the current state.
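The precedence rule Stephan describes, the "in code" option overriding the `state.checkpoints.dir` config entry when both are set, can be sketched in plain Java. `resolveCheckpointDir` and the surrounding class are illustrative names, not Flink's configuration API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed resolution order for the externalized-checkpoint
// directory: an in-code setting wins over the config-file entry.
public class CheckpointDirResolution {
    static final String CHECKPOINTS_DIR_KEY = "state.checkpoints.dir";

    static String resolveCheckpointDir(Map<String, String> config, String inCodeDir) {
        if (inCodeDir != null) {
            return inCodeDir;                    // in-code option overrides the config
        }
        return config.get(CHECKPOINTS_DIR_KEY);  // may be null: the backend then has no
                                                 // location for externalized checkpoints
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(CHECKPOINTS_DIR_KEY, "hdfs:///flink/ext-checkpoints");
        System.out.println(resolveCheckpointDir(config, null));
        System.out.println(resolveCheckpointDir(config, "s3://bucket/ckpts"));
    }
}
```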
[GitHub] flink issue #2756: [FLINK-4997] Extending Window Function Metadata
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2756 @VenturaDelMonte, that could require yet more tests, yes. We'll see once I'm done with those for the regular windows.
[jira] [Commented] (FLINK-4997) Extending Window Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742554#comment-15742554 ] ASF GitHub Bot commented on FLINK-4997: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2756 @VenturaDelMonte, that could require yet more tests, yes. We'll see once I'm done with those for the regular windows. > Extending Window Function Metadata > -- > > Key: FLINK-4997 > URL: https://issues.apache.org/jira/browse/FLINK-4997 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Streaming, Windowing Operators >Reporter: Ventura Del Monte >Assignee: Ventura Del Monte > Fix For: 1.2.0 > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5290) Ensure backwards compatibility of the hashes used to generate JobVertexIds
[ https://issues.apache.org/jira/browse/FLINK-5290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742541#comment-15742541 ] ASF GitHub Bot commented on FLINK-5290: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2966#discussion_r91997425 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java --- @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +public class DefaultStreamGraphHasher implements StreamGraphHasher { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class); + + /** +* Returns a map with a hash for each {@link StreamNode} of the {@link +* StreamGraph}. The hash is used as the {@link JobVertexID} in order to +* identify nodes across job submissions if they didn't change. +* +* The complete {@link StreamGraph} is traversed. The hash is either +* computed from the transformation's user-specified id (see +* {@link StreamTransformation#getUid()}) or generated in a deterministic way. +* +* The generated hash is deterministic with respect to: +* +* node-local properties (like parallelism, UDF, node ID), +* chained output nodes, and +* input nodes hashes +* +* +* @return A map from {@link StreamNode#id} to hash as 16-byte array. 
+*/ + @Override + public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) { + // The hash function used to generate the hash + final HashFunction hashFunction = Hashing.murmur3_128(0); + final Map<Integer, byte[]> hashes = new HashMap<>(); + + Set<Integer> visited = new HashSet<>(); + Queue<StreamNode> remaining = new ArrayDeque<>(); + + // We need to make the source order deterministic. The source IDs are + // not returned in the same order, which means that submitting the same + // program twice might result in different traversal, which breaks the + // deterministic hash assignment. + List<Integer> sources = new ArrayList<>(); + for (Integer sourceNodeId : streamGraph.getSourceIDs()) { + sources.add(sourceNodeId); + } + Collections.sort(sources); + + // + // Traverse the graph in a breadth-first manner. Keep in mind that + // the graph is not a tree and multiple paths to nodes can exist. + // + + // Start with source nodes + for (Integer sourceNodeId : sources) { + remaining.add(streamGraph.getStreamNode(sourceNodeId)); + visited.add(sourceNodeId); +
[GitHub] flink pull request #2966: [FLINK-5290] Ensure backwards compatibility of the...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2966#discussion_r91997425 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphHasher.java --- @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +public class DefaultStreamGraphHasher implements StreamGraphHasher { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphHasher.class); + + /** +* Returns a map with a hash for each {@link StreamNode} of the {@link +* StreamGraph}. The hash is used as the {@link JobVertexID} in order to +* identify nodes across job submissions if they didn't change. +* +* The complete {@link StreamGraph} is traversed. The hash is either +* computed from the transformation's user-specified id (see +* {@link StreamTransformation#getUid()}) or generated in a deterministic way. +* +* The generated hash is deterministic with respect to: +* +* node-local properties (like parallelism, UDF, node ID), +* chained output nodes, and +* input nodes hashes +* +* +* @return A map from {@link StreamNode#id} to hash as 16-byte array. 
+*/ + @Override + public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) { + // The hash function used to generate the hash + final HashFunction hashFunction = Hashing.murmur3_128(0); + final Map<Integer, byte[]> hashes = new HashMap<>(); + + Set<Integer> visited = new HashSet<>(); + Queue<StreamNode> remaining = new ArrayDeque<>(); + + // We need to make the source order deterministic. The source IDs are + // not returned in the same order, which means that submitting the same + // program twice might result in different traversal, which breaks the + // deterministic hash assignment. + List<Integer> sources = new ArrayList<>(); + for (Integer sourceNodeId : streamGraph.getSourceIDs()) { + sources.add(sourceNodeId); + } + Collections.sort(sources); + + // + // Traverse the graph in a breadth-first manner. Keep in mind that + // the graph is not a tree and multiple paths to nodes can exist. + // + + // Start with source nodes + for (Integer sourceNodeId : sources) { + remaining.add(streamGraph.getStreamNode(sourceNodeId)); + visited.add(sourceNodeId); + } + + StreamNode currentNode; + while ((currentNode = remaining.poll()) != null) { + // Generate the hash code. Because multiple paths exist to each + //
[jira] [Commented] (FLINK-4858) Remove Legacy Checkpointing Interfaces
[ https://issues.apache.org/jira/browse/FLINK-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742521#comment-15742521 ] Robert Metzger commented on FLINK-4858: --- Are we going to remove the Checkpointing interface before the 1.2 release? I think we should maybe deprecate it in 1.2 and remove it in 1.3 ? (I'm basically wondering whether we can remove "fix-for-version 1.2") > Remove Legacy Checkpointing Interfaces > -- > > Key: FLINK-4858 > URL: https://issues.apache.org/jira/browse/FLINK-4858 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.2.0 > > > This issue tracks the removal of the deprecated/legacy interfaces that are > still in the code and can only be removed once the move to the new Flink 1.2 > checkpointing is completely in place. > So far, these are {{StreamCheckpointedOperator}} and the legacy snapshot > methods on the testing harnesses. Furthermore, codepaths and classes touching > legacy state (e.g. {{StateAssignmentOperation}} or {{TaskStateHandles}} can > be cleaned up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 I will go ahead and merge this PR since there have been no further comments.
[jira] [Commented] (FLINK-5108) Remove ClientShutdownHook during job execution
[ https://issues.apache.org/jira/browse/FLINK-5108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742520#comment-15742520 ] ASF GitHub Bot commented on FLINK-5108: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 I will go ahead and merge this PR since there have been no further comments. > Remove ClientShutdownHook during job execution > -- > > Key: FLINK-5108 > URL: https://issues.apache.org/jira/browse/FLINK-5108 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Renkai Ge >Priority: Blocker > Fix For: 1.2.0 > > > The behavior of the Standalone mode is to not react to client interrupts once > a job has been deployed. We should change the Yarn client implementation to > behave the same. This avoids accidental shutdown of the job, e.g. when the > user sends an interrupt via CTRL-C or when the client machine shuts down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
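The intended behavior in FLINK-5108, keeping a cleanup shutdown hook only while deploying and dropping it once the job runs so a client interrupt cannot tear the job down, can be sketched as follows. The class and method names are hypothetical, not the actual YARN client code:

```java
// Sketch: a client registers a shutdown hook during deployment and removes it
// once the job is submitted, so CTRL-C on the client no longer kills the job.
public class ClientShutdownHookSketch {
    Thread shutdownHook;

    void deploy(Runnable cleanup) {
        shutdownHook = new Thread(cleanup, "client-cleanup");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        // ... submit the job to the cluster ...
        onJobSubmitted();
    }

    void onJobSubmitted() {
        // Once the job runs detached from the client, an interrupt must not
        // tear it down, so the hook is removed again.
        if (shutdownHook != null) {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
            shutdownHook = null;
        }
    }

    public static void main(String[] args) {
        ClientShutdownHookSketch client = new ClientShutdownHookSketch();
        client.deploy(() -> System.out.println("cleaning up"));
        System.out.println(client.shutdownHook == null); // true: hook removed
    }
}
```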
[GitHub] flink pull request #2984: [FLINK-5311] Add user documentation for bipartite ...
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2984#discussion_r91995428 --- Diff: docs/dev/libs/gelly/bipartite_graph.md --- @@ -0,0 +1,148 @@ +--- +title: Graph Generators --- End diff -- Oops, right :)
[jira] [Commented] (FLINK-5311) Write user documentation for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742515#comment-15742515 ] ASF GitHub Bot commented on FLINK-5311: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2984#discussion_r91995428 --- Diff: docs/dev/libs/gelly/bipartite_graph.md --- @@ -0,0 +1,148 @@ +--- +title: Graph Generators --- End diff -- Oops, right :) > Write user documentation for BipartiteGraph > --- > > Key: FLINK-5311 > URL: https://issues.apache.org/jira/browse/FLINK-5311 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > We need to add user documentation. The progress on BipartiteGraph can be > tracked in the following JIRA: > https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742507#comment-15742507 ] Ivan Mushketyk commented on FLINK-5280: --- Hi [~fhueske] , Thank you for your comments. It's much clearer now, but either I am still missing something obvious, or the task is more involved than described. Let me first describe how I understand this issue so that you can correct me. The goal of this task is to support nested data structures. This means that if we have a type definition like this: {code:java} class ParentPojo { ChildPojo child; int num; } class ChildPojo { String str; } {code} and we have a *TableSource* that returns a dataset of *ParentPojo*, we can access nested fields in SQL queries. Something like: {code:sql} SELECT * FROM pojos WHERE child.str LIKE '%Rubber%' {code} In this case *child.str* is a way to access a nested field. The first thing that confuses me is that the current [SQL grammar|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#sql-syntax] does not seem to support any nested field access, but I think this may be a relatively minor nuisance. If I understand it correctly, internally *flink-table* converts any input into a dataset of Rows and then performs operations on it. To convert a nested *ParentPojo* into a flat schema we can extract all leaf values into two columns: {code} child.str num {code} similarly to how *Parquet* identifies columns in nested types (see the following [slide|http://www.slideshare.net/julienledem/strata-london-2016-the-future-of-column-oriented-data-processing-with-arrow-and-parquet/10?src=clipshare]) Now, here is where this becomes more interesting. If I understand it correctly, *BatchScan#convertToExpectedType* is used to convert an input dataset into a dataset of *Row*s. 
For this task it generates a mapper function in *FlinkRel#getConversionMapper*, which then calls *CodeGenerator#generateConverterResultExpression*. So in our case it should generate code similar to: {code:java} public Row map(ParentPojo parent) { Row row = new Row(2); row.setField(0, parent.child.str); row.setField(1, parent.num); return row; } {code} *CodeGenerator* accepts *fieldNames* and an optional POJO field mapping to generate accessors. It seems that the main work is performed in *CodeGenerator#generateFieldAccess*, which generates access code for the different fields of the POJO, but it does not create any code that accesses nested fields; it only generates access code for a top-level POJO field with a matching field name. Therefore, if I understand this correctly, we need to start with updating *CodeGenerator* to generate nested accessors, and then we can extend *TableSource* to support nested data. Am I overthinking this issue? Or am I missing something obvious? > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
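The schema flattening described in the comment above — leaf fields of a nested POJO becoming Parquet-style dotted column names — can be sketched outside of Flink with plain reflection. This is a hypothetical illustration, not flink-table code; `FlattenSketch`, `flatten`, and the sample POJOs are made up for the example:

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not flink-table code: flatten the leaf fields of a
// nested POJO into Parquet-style dotted column names, as described above.
public class FlattenSketch {
    static class ChildPojo { String str; }
    static class ParentPojo { ChildPojo child; int num; }

    static List<String> flatten(Class<?> clazz, String prefix) {
        List<String> columns = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            String name = prefix.isEmpty() ? f.getName() : prefix + "." + f.getName();
            // Treat primitives and Strings as leaf values -> one Row column each.
            if (f.getType().isPrimitive() || f.getType() == String.class) {
                columns.add(name);
            } else {
                // Recurse into the nested type, extending the dotted path.
                columns.addAll(flatten(f.getType(), name));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // For ParentPojo this yields the two columns child.str and num.
        System.out.println(flatten(ParentPojo.class, ""));
    }
}
```

A generated row mapper would then read one such dotted path per Row field, which is exactly the nested-accessor code generation the comment argues CodeGenerator is missing.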
[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes
[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742502#comment-15742502 ] Chesnay Schepler commented on FLINK-5319: - Does this also happen if you remove all code related to C? > ClassCastException when reusing an inherited method reference as KeySelector > for different classes > -- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Alexander Chermenin > > Code sample: > {code}static abstract class A { > int id; > A(int id) { this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource<B> bStream = environment.fromElements(bs); > DataStreamSource<C> cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector<B, Integer>) A::getId).print(); > cStream.keyBy((KeySelector<C, Integer>) A::getId).print(); > environment.execute(); > } > {code} > This code throws the following exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) > at >
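The failure in the stack trace above can be reproduced without Flink. When a method reference is instantiated against one subclass, the generated erased bridge method casts its argument to that subclass, so reusing the same instance raw for a sibling class throws the same ClassCastException. The following is a plain-Java sketch of the suspected mechanism (an illustration, not Flink's actual KeySelector handling; class names mirror the sample in the report):

```java
import java.util.function.Function;

// Plain-Java sketch of the suspected mechanism behind the report above
// (an illustration, not Flink's KeySelector handling).
public class KeyReuseSketch {
    static abstract class A { int id; A(int id) { this.id = id; } int getId() { return id; } }
    static class B extends A { B(int id) { super(id % 3); } }
    static class C extends A { C(int id) { super(id % 2); } }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) {
        // The method reference is instantiated for B, so the generated
        // apply(Object) bridge casts its argument to B before calling getId.
        Function<B, Integer> keyForB = A::getId;
        Function raw = keyForB; // raw reuse, as in keyBy((KeySelector) A::getId)

        System.out.println(raw.apply(new B(4))); // fine: 4 % 3 == 1
        try {
            raw.apply(new C(4)); // C is not a B -> ClassCastException
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the reported stack trace");
        }
    }
}
```

In Flink the analogous cast happens inside the deserialized key extractor on the task side, which is why the exception surfaces in KeyGroupStreamPartitioner rather than at the call site.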
[jira] [Commented] (FLINK-5321) FlinkMiniCluster does not start Jobmanager MetricQueryService
[ https://issues.apache.org/jira/browse/FLINK-5321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742500#comment-15742500 ] ASF GitHub Bot commented on FLINK-5321: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/2991 [FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQueryService You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 5321_mqs_fmc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2991.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2991 commit e943a2631c2750af9b1798c6a32929c3e3dac610 Author: zentol Date: 2016-12-12T17:15:14Z [FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQS > FlinkMiniCluster does not start Jobmanager MetricQueryService > - > > Key: FLINK-5321 > URL: https://issues.apache.org/jira/browse/FLINK-5321 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.2.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0 > > > The JobManager MetricQueryService is never started when using the > LocalFlinkMiniCluster. It lacks the call to > MetricRegistry#startQueryService(). > As a result jobmanager metrics aren't reported to the web frontend, and it > causes repeated logging of exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2991: [FLINK-5321] [metrics] LocalFlinkMiniCluster start...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/2991 [FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQueryService You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 5321_mqs_fmc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2991.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2991 commit e943a2631c2750af9b1798c6a32929c3e3dac610 Author: zentol Date: 2016-12-12T17:15:14Z [FLINK-5321] [metrics] LocalFlinkMiniCluster starts JM MetricQS --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742498#comment-15742498 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 @zentol Thanks for the review. I integrated your comments. > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This issue aims at porting all these functions to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities offered by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2939: [FLINK-5113] Ports all functions in the tests to the new ...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 @zentol Thanks for the review. I integrated your comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2939: [FLINK-5113] Ports all functions in the tests to the new ...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 The test should cover whether something semantically wrong happened during restoring. For example, if you expected some state that never came, then the test should fail. This is not a matter of the interface and thus not part of this PR, I think. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5113) Make all Testing Functions implement CheckpointedFunction Interface.
[ https://issues.apache.org/jira/browse/FLINK-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742489#comment-15742489 ] ASF GitHub Bot commented on FLINK-5113: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2939 The test should cover whether something semantically wrong happened during restoring. For example, if you expected some state that never came, then the test should fail. This is not a matter of the interface and thus not part of this PR, I think. What do you think? > Make all Testing Functions implement CheckpointedFunction Interface. > > > Key: FLINK-5113 > URL: https://issues.apache.org/jira/browse/FLINK-5113 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently stateful functions implement the (old) Checkpointed interface. > This issue aims at porting all these functions to the new > CheckpointedFunction interface, so that they can leverage the new > capabilities offered by it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5321) FlinkMiniCluster does not start Jobmanager MetricQueryService
Chesnay Schepler created FLINK-5321: --- Summary: FlinkMiniCluster does not start Jobmanager MetricQueryService Key: FLINK-5321 URL: https://issues.apache.org/jira/browse/FLINK-5321 Project: Flink Issue Type: Bug Components: JobManager, Metrics Affects Versions: 1.2.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.2.0 The JobManager MetricQueryService is never started when using the LocalFlinkMiniCluster. It lacks the call to MetricRegistry#startQueryService(). As a result jobmanager metrics aren't reported to the web frontend, and it causes repeated logging of exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r91991599 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) requestIndexer = new BulkProcessorIndexer(bulkProcessor); } + /** +* Adds all requests of the bulk to the BulkProcessor. Used when trying again. +* @param bulkRequest +*/ + public void reAddBulkRequest(BulkRequest bulkRequest) { + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs. --- End diff -- Currently I'm not aware of a way to filter these requests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742469#comment-15742469 ] ASF GitHub Bot commented on FLINK-5122: --- Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r91991599 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) requestIndexer = new BulkProcessorIndexer(bulkProcessor); } + /** +* Adds all requests of the bulk to the BulkProcessor. Used when trying again. +* @param bulkRequest +*/ + public void reAddBulkRequest(BulkRequest bulkRequest) { + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs. --- End diff -- Currently I'm not aware of a way to filter these requests. > Elasticsearch Sink loses documents when cluster has high load > - > > Key: FLINK-5122 > URL: https://issues.apache.org/jira/browse/FLINK-5122 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: static-max >Assignee: static-max > > My cluster had high load and documents did not get indexed. This violates the "at > least once" semantics in the ES connector. > I put pressure on my cluster to test Flink, causing new indices to be > created and balanced. On those errors the bulk should be tried again instead > of being discarded. 
> Primary shard not active because ES decided to rebalance the index: > 2016-11-15 15:35:16,123 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > UnavailableShardsException[[index-name][3] primary shard is not active > Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] > requests]] > Bulk queue on node full (I set queue to a low value to reproduce error): > 22:37:57,702 ERROR > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - > Failed to index document in Elasticsearch: > RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; > nested: EsRejectedExecutionException[rejected execution of > org.elasticsearch.transport.TransportService$4@727e677c on > EsThreadPoolExecutor[bulk, queue capacity = 1, > org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, > pool size = 2, active threads = 2, queued tasks = 1, completed tasks = > 2939]]]; > I can try to propose a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
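The retry behaviour proposed in this thread — re-adding a failed bulk's requests instead of discarding them — boils down to the following queueing pattern. This is a self-contained sketch under stated assumptions; `BulkRetrySketch` and its string "requests" are stand-ins for the connector's BulkProcessor plumbing, not actual connector code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Self-contained sketch of the retry idea discussed above: requests from a
// failed bulk go back into the pending queue instead of being discarded.
// An illustration, not the connector's actual BulkProcessor code.
public class BulkRetrySketch {
    private final Queue<String> pending = new ArrayDeque<>();

    void add(String request) {
        pending.offer(request);
    }

    /**
     * Flushes all pending requests. Requests the (simulated) cluster rejects,
     * e.g. with an EsRejectedExecutionException, are re-queued for the next
     * flush, preserving at-least-once semantics.
     */
    int flush(Predicate<String> clusterAccepts) {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        int indexed = 0;
        for (String request : batch) {
            if (clusterAccepts.test(request)) {
                indexed++;               // acknowledged: done
            } else {
                pending.offer(request);  // rejected: retry with the next bulk
            }
        }
        return indexed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The open question from the review comment maps onto this sketch directly: without a way to filter individual actions out of a failed bulk, a non-retriable DeleteAction would be re-queued together with the retriable IndexActions.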
[jira] [Updated] (FLINK-5320) Fix result TypeInformation in WindowedStream.fold(ACC, FoldFunction, WindowFunction)
[ https://issues.apache.org/jira/browse/FLINK-5320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yassine Marzougui updated FLINK-5320: - Affects Version/s: 1.2.0 > Fix result TypeInformation in WindowedStream.fold(ACC, FoldFunction, > WindowFunction) > > > Key: FLINK-5320 > URL: https://issues.apache.org/jira/browse/FLINK-5320 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Yassine Marzougui >Assignee: Yassine Marzougui > > The WindowedStream.fold(ACC, FoldFunction, WindowFunction) does not correctly > infer the resultType. -- This message was sent by Atlassian JIRA (v6.3.4#6332)