[GitHub] flink pull request: [FLINK-1633][gelly] Added getTriplets() method...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/452 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1633) Add getTriplets() Gelly method
[ https://issues.apache.org/jira/browse/FLINK-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386481#comment-14386481 ] ASF GitHub Bot commented on FLINK-1633: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/452 Add getTriplets() Gelly method -- Key: FLINK-1633 URL: https://issues.apache.org/jira/browse/FLINK-1633 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor Labels: starter In some graph algorithms, it is required to access the graph edges together with the vertex values of the source and target vertices. For example, several graph weighting schemes compute some kind of similarity weights for edges, based on the attributes of the source and target vertices. This issue proposes adding a convenience Gelly method that generates a DataSet of srcVertex, Edge, TrgVertex triplets from the input graph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1633) Add getTriplets() Gelly method
[ https://issues.apache.org/jira/browse/FLINK-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-1633. -- Resolution: Implemented Fix Version/s: 0.9 Add getTriplets() Gelly method -- Key: FLINK-1633 URL: https://issues.apache.org/jira/browse/FLINK-1633 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor Labels: starter Fix For: 0.9 In some graph algorithms, it is required to access the graph edges together with the vertex values of the source and target vertices. For example, several graph weighting schemes compute some kind of similarity weights for edges, based on the attributes of the source and target vertices. This issue proposes adding a convenience Gelly method that generates a DataSet of srcVertex, Edge, TrgVertex triplets from the input graph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
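The triplet view described in FLINK-1633 (a DataSet of srcVertex, Edge, TrgVertex triples) can be illustrated without Flink as a plain join of vertex values onto edges. This is a minimal plain-Java sketch of the concept only; the class and field names below are illustrative and are not Gelly's actual API (Gelly's method returns a `DataSet<Triplet<K, VV, EV>>`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the triplet view: for every edge, pair it
// with the values of its source and target vertices. Names are
// illustrative, not Gelly's API.
public class TripletSketch {

    record Edge(int src, int trg, double weight) {}

    record Triplet(int src, double srcValue, int trg, double trgValue, double weight) {}

    static List<Triplet> getTriplets(Map<Integer, Double> vertexValues, List<Edge> edges) {
        List<Triplet> triplets = new ArrayList<>();
        for (Edge e : edges) {
            // join the edge with the values of its two endpoints
            triplets.add(new Triplet(e.src, vertexValues.get(e.src),
                    e.trg, vertexValues.get(e.trg), e.weight));
        }
        return triplets;
    }

    public static void main(String[] args) {
        Map<Integer, Double> values = new HashMap<>();
        values.put(1, 0.5);
        values.put(2, 2.0);
        List<Triplet> result = getTriplets(values, List.of(new Edge(1, 2, 1.0)));
        System.out.println(result.size()); // one triplet per edge
    }
}
```

In an edge-weighting scheme of the kind the issue mentions, a similarity weight for each edge would then be computed from `srcValue` and `trgValue` of each triplet.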
[jira] [Created] (FLINK-1798) Bug in IterateExample while running with parallelism 1: broker slot is already occupied
Péter Szabó created FLINK-1798: -- Summary: Bug in IterateExample while running with parallelism 1: broker slot is already occupied Key: FLINK-1798 URL: https://issues.apache.org/jira/browse/FLINK-1798 Project: Flink Issue Type: Bug Reporter: Péter Szabó Link to the example: https://github.com/mbalassi/flink/blob/FLINK-1560/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java Stack Trace: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.runtime.execution.RuntimeEnvironment.init(RuntimeEnvironment.java:178) ~[classes/:na] at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$submitTask(TaskManager.scala:433) ~[classes/:na] at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:238) ~[classes/:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) ~[scala-library-2.10.4.jar:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37) ~[classes/:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30) ~[classes/:na] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) ~[scala-library-2.10.4.jar:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30) ~[classes/:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.7.jar:na] at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:92) ~[classes/:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
[akka-actor_2.10-2.3.7.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:221) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.10-2.3.7.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:64) ~[classes/:na] at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:86) ~[classes/:na] at org.apache.flink.runtime.execution.RuntimeEnvironment.init(RuntimeEnvironment.java:175) ~[classes/:na] ... 20 common frames omitted Caused by: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.runtime.iterative.concurrent.Broker.handIn(Broker.java:39) ~[classes/:na] at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:62) ~[classes/:na] ... 22 common frames omitted -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1633) Add getTriplets() Gelly method
[ https://issues.apache.org/jira/browse/FLINK-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386418#comment-14386418 ] ASF GitHub Bot commented on FLINK-1633: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/452#issuecomment-87600209 @vasia: I think the error is not related to the PR and it only happened in one out of 10 builds. It's not good that it failed, but it should not block you from merging this PR. Add getTriplets() Gelly method -- Key: FLINK-1633 URL: https://issues.apache.org/jira/browse/FLINK-1633 Project: Flink Issue Type: New Feature Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor Labels: starter In some graph algorithms, it is required to access the graph edges together with the vertex values of the source and target vertices. For example, several graph weighting schemes compute some kind of similarity weights for edges, based on the attributes of the source and target vertices. This issue proposes adding a convenience Gelly method that generates a DataSet of srcVertex, Edge, TrgVertex triplets from the input graph.
[GitHub] flink pull request: [FLINK-1633][gelly] Added getTriplets() method...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/452#issuecomment-87600209 @vasia: I think the error is not related to the PR and it only happened in one out of 10 builds. It's not good that it failed, but it should not block you from merging this PR.
[jira] [Updated] (FLINK-1798) Bug in IterateExample while running with parallelism 1: broker slot is already occupied
[ https://issues.apache.org/jira/browse/FLINK-1798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Péter Szabó updated FLINK-1798: --- Component/s: Streaming Bug in IterateExample while running with parallelism 1: broker slot is already occupied - Key: FLINK-1798 URL: https://issues.apache.org/jira/browse/FLINK-1798 Project: Flink Issue Type: Bug Components: Streaming Reporter: Péter Szabó Link to the example: https://github.com/mbalassi/flink/blob/FLINK-1560/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java Stack Trace: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.runtime.execution.RuntimeEnvironment.init(RuntimeEnvironment.java:178) ~[classes/:na] at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$submitTask(TaskManager.scala:433) ~[classes/:na] at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:238) ~[classes/:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na] at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) ~[scala-library-2.10.4.jar:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37) ~[classes/:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30) ~[classes/:na] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) ~[scala-library-2.10.4.jar:na] at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30) ~[classes/:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.7.jar:na] at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:92) ~[classes/:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.10-2.3.7.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:221) [akka-actor_2.10-2.3.7.jar:na] at akka.dispatch.Mailbox.exec(Mailbox.scala:231) [akka-actor_2.10-2.3.7.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:64) ~[classes/:na] at org.apache.flink.streaming.api.streamvertex.StreamVertex.registerInputOutput(StreamVertex.java:86) ~[classes/:na] at org.apache.flink.runtime.execution.RuntimeEnvironment.init(RuntimeEnvironment.java:175) ~[classes/:na] ... 20 common frames omitted Caused by: java.lang.RuntimeException: Could not register the given element, broker slot is already occupied. at org.apache.flink.runtime.iterative.concurrent.Broker.handIn(Broker.java:39) ~[classes/:na] at org.apache.flink.streaming.api.streamvertex.StreamIterationHead.setInputsOutputs(StreamIterationHead.java:62) ~[classes/:na] ... 22 common frames omitted -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1804) flink-quickstart-scala tests fail on scala-2.11 build profile on travis
Robert Metzger created FLINK-1804: - Summary: flink-quickstart-scala tests fail on scala-2.11 build profile on travis Key: FLINK-1804 URL: https://issues.apache.org/jira/browse/FLINK-1804 Project: Flink Issue Type: Task Components: Build System, Quickstarts Affects Versions: 0.9 Reporter: Robert Metzger Travis builds on master started failing after the Scala 2.11 profile has been added to Flink. For example: https://travis-ci.org/apache/flink/jobs/56312734 The error: {code} [INFO] [INFO] --- scala-maven-plugin:3.1.4:compile (default) @ testArtifact --- [INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype [INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache [INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype [INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache [INFO] [WARNING] Expected all dependencies to require Scala version: 2.10.4 [INFO] [WARNING] com.twitter:chill_2.10:0.5.2 requires scala version: 2.10.4 [INFO] [WARNING] com.twitter:chill-avro_2.10:0.5.2 requires scala version: 2.10.4 [INFO] [WARNING] com.twitter:chill-bijection_2.10:0.5.2 requires scala version: 2.10.4 [INFO] [WARNING] com.twitter:bijection-core_2.10:0.7.2 requires scala version: 2.10.4 [INFO] [WARNING] com.twitter:bijection-avro_2.10:0.7.2 requires scala version: 2.10.4 [INFO] [WARNING] org.scala-lang:scala-reflect:2.10.4 requires scala version: 2.10.4 [INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4 [INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4 [INFO] [WARNING] org.scala-lang:scala-compiler:2.10.4 requires scala version: 2.10.4 [INFO] [WARNING] org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version: 2.10.4 [INFO] [WARNING] org.apache.flink:flink-streaming-scala:0.9-SNAPSHOT requires scala version: 2.11.4 [INFO] [WARNING] Multiple versions of scala libraries detected! 
[INFO] [INFO] /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala:-1: info: compiling [INFO] [INFO] Compiling 3 source files to /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes at 1427650524446 [INFO] [ERROR] error: [INFO] [INFO] while compiling: /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala/org/apache/flink/archetypetest/SocketTextStreamWordCount.scala [INFO] [INFO] during phase: typer [INFO] [INFO] library version: version 2.10.4 [INFO] [INFO] compiler version: version 2.10.4 [INFO] [INFO] reconstructed args: -d /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes -classpath
[jira] [Commented] (FLINK-1774) Remove the redundant code in try{} block
[ https://issues.apache.org/jira/browse/FLINK-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386611#comment-14386611 ] ASF GitHub Bot commented on FLINK-1774: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/522 Remove the redundant code in try{} block Key: FLINK-1774 URL: https://issues.apache.org/jira/browse/FLINK-1774 Project: Flink Issue Type: Improvement Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Priority: Minor Fix For: master Remove the redundant code of fos.close(); fos = null; in the try block, because fos.close() always executes in the finally block.
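The cleanup in FLINK-1774 amounts to the standard pre-try-with-resources close pattern: close the stream only in the finally block. A minimal sketch (the variable name `fos` comes from the report; the surrounding method is invented for illustration):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Sketch of the pattern the issue describes: closing the stream inside the
// try block is redundant when the finally block already closes it.
public class CloseOnceSketch {

    static void write(File target, byte[] data) throws IOException {
        FileOutputStream fos = null;
        try {
            fos = new FileOutputStream(target);
            fos.write(data);
            // no "fos.close(); fos = null;" here -- finally handles it
        } finally {
            if (fos != null) {
                fos.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("close-once", ".bin");
        tmp.deleteOnExit();
        write(tmp, new byte[] {1, 2, 3});
        System.out.println(tmp.length()); // 3
    }
}
```

On Java 7+ the same guarantee is available more concisely with try-with-resources: `try (FileOutputStream fos = new FileOutputStream(target)) { ... }`.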
[GitHub] flink pull request: [FLINK-1741][gelly] Adds Jaccard Similarity Me...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/544 [FLINK-1741][gelly] Adds Jaccard Similarity Metric Example You can merge this pull request into a Git repository by running: $ git pull https://github.com/andralungu/flink flinkJaccard Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/544.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #544 commit d6e98e2b46ff1ba8346fec214ffdec286e3d3891 Author: andralungu lungu.an...@gmail.com Date: 2015-03-30T12:50:09Z [FLINK-1741][gelly] Adds Jaccard Similarity Metric Example
[jira] [Updated] (FLINK-1662) NullFieldException is thrown when testing IterativeDataStream with tuples, windowing and maxBy
[ https://issues.apache.org/jira/browse/FLINK-1662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-1662: -- Assignee: Péter Szabó NullFieldException is thrown when testing IterativeDataStream with tuples, windowing and maxBy -- Key: FLINK-1662 URL: https://issues.apache.org/jira/browse/FLINK-1662 Project: Flink Issue Type: Bug Components: Streaming Reporter: Péter Szabó Assignee: Péter Szabó NullFieldException is thrown when testing IterativeDataStream with tuples, windowing and maxBy. Stack Trace: 17:02:56,332 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value. at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:118) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) at org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:92) at org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51) at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:86) at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:65) at org.apache.flink.streaming.api.collector.DirectedCollectorWrapper.collect(DirectedCollectorWrapper.java:95) at org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:142) at org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:129) at 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer.processRealElement(StreamDiscretizer.java:101) at org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer.invoke(StreamDiscretizer.java:75) at org.apache.flink.streaming.api.streamvertex.StreamVertex.invokeUserFunction(StreamVertex.java:85) at org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:229) at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:121) at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205) at java.lang.Thread.run(Thread.java:745) Examples can be found in commit https://github.com/mbalassi/flink/commit/42c7889777a0a6d558d58feeb5acb3c06b6fb7df. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1779]Rename the function getCurrentyAct...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/529#issuecomment-87633234 Looks good, will merge this...
[jira] [Created] (FLINK-1799) Scala API does not support generic arrays
Till Rohrmann created FLINK-1799: Summary: Scala API does not support generic arrays Key: FLINK-1799 URL: https://issues.apache.org/jira/browse/FLINK-1799 Project: Flink Issue Type: Bug Reporter: Till Rohrmann Assignee: Aljoscha Krettek The Scala API does not support generic arrays at the moment. It throws a rather unhelpful error message ```InvalidTypesException: The given type is not a valid object array```. Code to reproduce the problem is given below: {code} def main(args: Array[String]) { foobar[Double] } def foobar[T: ClassTag: TypeInformation]: DataSet[Block[T]] = { val tpe = createTypeInformation[Array[T]] null } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1779) Rename the function name from getCurrentyActiveConnections to getCurrentActiveConnections in org.apache.flink.runtime.blob
[ https://issues.apache.org/jira/browse/FLINK-1779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386612#comment-14386612 ] ASF GitHub Bot commented on FLINK-1779: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/529 Rename the function name from getCurrentyActiveConnections to getCurrentActiveConnections in org.apache.flink.runtime.blob --- Key: FLINK-1779 URL: https://issues.apache.org/jira/browse/FLINK-1779 Project: Flink Issue Type: Improvement Reporter: Sibao Hong Assignee: Sibao Hong Priority: Minor Fix For: master I think the function name getCurrentyActiveConnections in 'org.apache.flink.runtime.blob' is misspelled; getCurrentActiveConnections would be better. I have also added some comments about the function and the tests.
[jira] [Resolved] (FLINK-1774) Remove the redundant code in try{} block
[ https://issues.apache.org/jira/browse/FLINK-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1774. - Resolution: Fixed Fixed in c89c657ae16bbe89da54669a234713a3811813ee Thank you for the patch! Remove the redundant code in try{} block Key: FLINK-1774 URL: https://issues.apache.org/jira/browse/FLINK-1774 Project: Flink Issue Type: Improvement Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Priority: Minor Fix For: master Remove the redundant code of fos.close(); fos = null; in the try block, because fos.close() always executes in the finally block.
[jira] [Resolved] (FLINK-1779) Rename the function name from getCurrentyActiveConnections to getCurrentActiveConnections in org.apache.flink.runtime.blob
[ https://issues.apache.org/jira/browse/FLINK-1779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1779. - Resolution: Fixed Fixed in fb3f3ee845a3aae295c9aae00f3d406d9f1d5813 Thank you for the patch! Rename the function name from getCurrentyActiveConnections to getCurrentActiveConnections in org.apache.flink.runtime.blob --- Key: FLINK-1779 URL: https://issues.apache.org/jira/browse/FLINK-1779 Project: Flink Issue Type: Improvement Reporter: Sibao Hong Assignee: Sibao Hong Priority: Minor Fix For: master I think the function name getCurrentyActiveConnections in 'org.apache.flink.runtime.blob' is misspelled; getCurrentActiveConnections would be better. I have also added some comments about the function and the tests.
[jira] [Created] (FLINK-1801) NetworkEnvironment should start without JobManager association
Stephan Ewen created FLINK-1801: --- Summary: NetworkEnvironment should start without JobManager association Key: FLINK-1801 URL: https://issues.apache.org/jira/browse/FLINK-1801 Project: Flink Issue Type: Sub-task Components: TaskManager Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9 The NetworkEnvironment should be able to start without a dedicated JobManager association and get one / loose one as the TaskManager connects to different JobManagers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1348) Move Stream Connector Jars from lib to Client JARs
[ https://issues.apache.org/jira/browse/FLINK-1348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1348. - Resolution: Fixed Fix Version/s: 0.9 Fixed with the updated dependency management in the rewrite to use shading. Move Stream Connector Jars from lib to Client JARs Key: FLINK-1348 URL: https://issues.apache.org/jira/browse/FLINK-1348 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Márton Balassi Fix For: 0.9 Right now, the connectors and all dependencies are put into the lib folder and are part of the system at startup time. This is a large bunch of dependencies, and they may actually conflict with the dependencies of custom connectors (for example, with a different version of RabbitMQ). We could fix that if we remove the dependencies from the lib folder and set up archetypes that build fat jars with the dependencies. That way, each job (with its custom class loader) will get the dependencies it needs and will not see all the other (potentially conflicting) ones in the namespace.
[GitHub] flink pull request: [FLINK-1774]Remove the redundant code in try{}...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/522
[GitHub] flink pull request: [FLINK-1779]Rename the function getCurrentyAct...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/529
[jira] [Resolved] (FLINK-1754) Deadlock in job execution
[ https://issues.apache.org/jira/browse/FLINK-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1754. - Resolution: Not a Problem This is actually a known bug in 0.8, fixed in 0.9. Deadlock in job execution - Key: FLINK-1754 URL: https://issues.apache.org/jira/browse/FLINK-1754 Project: Flink Issue Type: Bug Affects Versions: 0.8.1 Reporter: Sebastian Kruse I have encountered a reproducible deadlock in the execution of one of my jobs. The part of the plan where this happens is the following: {code:java} /** Performs the reduction via creating transitive INDs and removing them from the original IND set. */ private DataSet<Tuple2<Integer, int[]>> calculateTransitiveReduction1(DataSet<Tuple2<Integer, int[]>> inclusionDependencies) { // Concatenate INDs (only one hop). DataSet<Tuple2<Integer, int[]>> transitiveInds = inclusionDependencies .flatMap(new SplitInds()) .joinWithTiny(inclusionDependencies) .where(1).equalTo(0) .with(new ConcatenateInds()); // Remove the concatenated INDs to come up with a transitive reduction of the INDs. return inclusionDependencies .coGroup(transitiveInds) .where(0).equalTo(0) .with(new RemoveTransitiveInds()); } {code} Seemingly, the flatmap operator waits infinitely for a free buffer to write on.
[jira] [Commented] (FLINK-1650) Suppress Akka's Netty Shutdown Errors through the log config
[ https://issues.apache.org/jira/browse/FLINK-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386792#comment-14386792 ] Robert Metzger commented on FLINK-1650: --- I'm going to set the Akka version back to 2.3.7 in https://github.com/apache/flink/pull/542 because the error detection in Akka is much slower in 2.3.9. I've asked on the Akka mailing list for help. Suppress Akka's Netty Shutdown Errors through the log config Key: FLINK-1650 URL: https://issues.apache.org/jira/browse/FLINK-1650 Project: Flink Issue Type: Bug Components: other Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9 I suggest setting the logging for `org.jboss.netty.channel.DefaultChannelPipeline` to error, in order to get rid of the misleading stack trace caused by an akka/netty hiccup on shutdown.
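Stephan's suggestion amounts to a single logger entry in the log configuration. A sketch, assuming a log4j 1.x properties file (which Flink shipped at the time); the exact file and surrounding entries are not specified in the issue:

```properties
# Silence the misleading shutdown stack trace from akka/netty:
# only messages at ERROR or above from this pipeline class are logged.
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
```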
[jira] [Commented] (FLINK-1794) Add test base for scalatest and adapt flink-ml test cases
[ https://issues.apache.org/jira/browse/FLINK-1794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386867#comment-14386867 ] ASF GitHub Bot commented on FLINK-1794: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/540#issuecomment-87723741 If there are no objections, then I would merge this PR. My other PRs depend on this as well. Add test base for scalatest and adapt flink-ml test cases - Key: FLINK-1794 URL: https://issues.apache.org/jira/browse/FLINK-1794 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Assignee: Till Rohrmann Currently, the flink-ml test cases use the standard {{ExecutionEnvironment}} which can cause problems in parallel test executions as they happen on Travis. For these tests it would be helpful to have an appropriate Scala test base which instantiates a {{ForkableFlinkMiniCluster}} and sets the {{ExecutionEnvironment}} appropriately. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1805) The class IOManagerAsync(in org.apache.flink.runtime.io.disk.iomanager) should use its own Log
Sibao Hong created FLINK-1805: - Summary: The class IOManagerAsync(in org.apache.flink.runtime.io.disk.iomanager) should use its own Log Key: FLINK-1805 URL: https://issues.apache.org/jira/browse/FLINK-1805 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Although the class 'IOManagerAsync' extends 'IOManager' in package 'org.apache.flink.runtime.io.disk.iomanager', I think it should have its own Log instance.
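The point of FLINK-1805 is that a subclass reusing its superclass's logger gets its log records attributed to the wrong class. A self-contained sketch of the fix; the class names mirror the report, but java.util.logging stands in here for the slf4j logger Flink actually uses:

```java
import java.util.logging.Logger;

// Superclass with its own logger (illustrative stand-in for Flink's IOManager).
class IOManager {
    protected static final Logger LOG = Logger.getLogger(IOManager.class.getName());
}

// Subclass declaring its own logger, so its log records are attributed to
// IOManagerAsync rather than to IOManager.
public class IOManagerAsync extends IOManager {
    private static final Logger LOG = Logger.getLogger(IOManagerAsync.class.getName());

    public String loggerName() {
        return LOG.getName();
    }

    public static void main(String[] args) {
        System.out.println(new IOManagerAsync().loggerName()); // IOManagerAsync
    }
}
```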
[jira] [Commented] (FLINK-1716) Add CoCoA algorithm to flink-ml
[ https://issues.apache.org/jira/browse/FLINK-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386820#comment-14386820 ] ASF GitHub Bot commented on FLINK-1716: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/545 [FLINK-1716] Adds CoCoA algorithm This PR adds the CoCoA algorithm using l2-norm and hinge-loss functions. Thus, it can be used to train soft-margin SVM. This PR is based on #539 and #543. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink cocoa Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/545.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #545 commit 4c18940bf14f376cdb339d908324e5f2cd4593ad Author: Till Rohrmann trohrm...@apache.org Date: 2015-03-25T14:27:58Z [FLINK-1718] [ml] Adds sparse matrix and sparse vector types commit f3d021febf0e7796a1f250c2e693d7f9dcbc36e1 Author: Till Rohrmann trohrm...@apache.org Date: 2015-03-26T16:44:17Z [ml] Adds convenience functions for Breeze matrix/vector conversion [ml] Adds breeze to flink-dist LICENSE file [ml] Optimizes sanity checks in vector/matrix accessors [ml] Fixes scala check style error with missing whitespaces before and after + [ml] Fixes DenseMatrixTest commit be8ca43b5f11c789b2acfe38127ed542cdea3cd3 Author: Till Rohrmann trohrm...@apache.org Date: 2015-03-28T17:31:02Z [FLINK-1717] [ml] Adds support to directly read libSVM and SVMLight files commit 850d5880be0c7e484fa14e92b8af61dd5ceb6d4f Author: Till Rohrmann trohrm...@apache.org Date: 2015-03-12T15:52:45Z [FLINK-1716] [ml] Adds CoCoA algorithm [ml] Adds web documentation and code comments to CoCoA [ml] Adds comments Add CoCoA algorithm to flink-ml --- Key: FLINK-1716 URL: https://issues.apache.org/jira/browse/FLINK-1716 Project: Flink Issue Type: New 
Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann Labels: ML Add the communication efficient distributed dual coordinate ascent algorithm to the flink machine learning library. See [CoCoA|http://arxiv.org/pdf/1409.1458.pdf] for the implementation details. I propose to first implement it with hinge loss and l2-norm. This way, it will allow us to train SVMs in parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
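The objective behind the issue is easiest to see in the primal form: CoCoA distributes the optimization of the l2-regularized hinge loss, i.e. the soft-margin SVM objective. The sketch below is a plain single-machine illustration of that loss and one subgradient step; all class and method names are made up for the example and are not the flink-ml API.

```java
// Single-machine sketch of the objective CoCoA optimizes in a distributed
// fashion: lambda/2 * ||w||^2 + max(0, 1 - y * <w, x>) per training example.
// Illustrative names only; not the flink-ml API.
public class HingeLossSketch {

    // hinge loss for one example: max(0, 1 - y * <w, x>)
    public static double hingeLoss(double[] w, double[] x, double y) {
        double margin = y * dot(w, x);
        return Math.max(0.0, 1.0 - margin);
    }

    // one subgradient step on the regularized objective
    public static double[] subgradientStep(double[] w, double[] x, double y,
                                           double lambda, double stepSize) {
        double[] next = new double[w.length];
        boolean active = y * dot(w, x) < 1.0; // hinge loss is nonzero here
        for (int i = 0; i < w.length; i++) {
            double grad = lambda * w[i] - (active ? y * x[i] : 0.0);
            next[i] = w[i] - stepSize * grad;
        }
        return next;
    }

    public static double dot(double[] a, double[] b) {
        double s = 0.0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    public static void main(String[] args) {
        double[] w = {0.0, 0.0};
        double[] x = {1.0, 2.0};
        System.out.println(hingeLoss(w, x, 1.0));        // zero weights: loss is 1
        w = subgradientStep(w, x, 1.0, 0.1, 0.5);
        System.out.println(hingeLoss(w, x, 1.0));        // one step fits this example
    }
}
```

CoCoA's contribution is running many such local updates per partition before communicating, which is what makes it communication-efficient; the arithmetic of the loss itself is as above.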
[jira] [Commented] (FLINK-1522) Add tests for the library methods and examples
[ https://issues.apache.org/jira/browse/FLINK-1522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386865#comment-14386865 ] Vasia Kalavri commented on FLINK-1522: -- [~balidani], [~andralungu] if I'm not mistaken none of you is currently working on this, right? If that's the case, I'll take over this one :-) Add tests for the library methods and examples -- Key: FLINK-1522 URL: https://issues.apache.org/jira/browse/FLINK-1522 Project: Flink Issue Type: New Feature Components: Gelly Reporter: Vasia Kalavri Assignee: Daniel Bali Labels: easyfix, test The current tests in gelly test one method at a time. We should have some tests for complete applications. As a start, we could add one test case per example and this way also make sure that our graph library methods actually give correct results. I'm assigning this to [~andralungu] because she has already implemented the test for SSSP, but I will help as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: The class IOManagerAsync should use its own Lo...
GitHub user matadorhong opened a pull request: https://github.com/apache/flink/pull/546 The class IOManagerAsync should use its own Log instance Although the class 'IOManagerAsync' extends 'IOManager' in the package 'org.apache.flink.runtime.io.disk.iomanager', I think it should have its own Log instance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/matadorhong/flink FLINK-1805 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/546.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #546 commit 56b9dab105a9d0884f791fbd42f62ce94bdb3042 Author: hongsibao hongsi...@huawei.com Date: 2015-03-30T15:00:26Z The class IOManagerAsync(in org.apache.flink.runtime.io.disk.iomanager) should use its own Log instance
[jira] [Commented] (FLINK-1694) Change the split between create/run of a vertex-centric iteration
[ https://issues.apache.org/jira/browse/FLINK-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386854#comment-14386854 ] ASF GitHub Bot commented on FLINK-1694: --- GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/547 [FLINK-1694] [gelly] added IterationConfiguration as a way to configure ... ...a VertexCentricIteration and avoid the split between create and run methods. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink vertex-centric-configuration Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/547.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #547 commit a377fba6b9a6b249525fffb0eda876687e7e7c0f Author: vasia vasilikikala...@gmail.com Date: 2015-03-29T21:39:08Z [FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration Change the split between create/run of a vertex-centric iteration - Key: FLINK-1694 URL: https://issues.apache.org/jira/browse/FLINK-1694 Project: Flink Issue Type: Improvement Components: Gelly Reporter: Vasia Kalavri Assignee: Vasia Kalavri Currently, the vertex-centric API in Gelly looks like this: {code:java} Graph inputGraph = ... //create graph VertexCentricIteration iteration = inputGraph.createVertexCentricIteration(); ... // configure the iteration Graph newGraph = inputGraph.runVertexCentricIteration(iteration); {code} We have this create/run split in order to expose the iteration object and be able to call the public methods of VertexCentricIteration. However, this is not very nice and might lead to errors if create and run are mistakenly called on different graph objects. One suggestion is to change this to the following: {code:java} VertexCentricIteration iteration = inputGraph.createVertexCentricIteration(); ... 
// configure the iteration Graph newGraph = iteration.result(); {code} or to go with a single run call, where we add an IterationConfiguration object as a parameter and we don't expose the iteration object to the user at all: {code:java} IterationConfiguration parameters = ... Graph newGraph = inputGraph.runVertexCentricIteration(parameters); {code} and we can also have a simplified method where no configuration is passed. What do you think? Personally, I like the second option a bit more. -Vasia. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
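As an illustration of the second option above: a configuration holder passed to a single run call means create and run can never be invoked on different graph objects. The stand-in classes below only sketch the pattern; they are not the actual Gelly types or signatures.

```java
// Sketch of the configuration-object pattern from the proposal: the caller
// never sees the iteration object, only a parameters holder and one run call.
// Illustrative stand-ins; not the actual Gelly API.
public class ConfigSketch {

    public static class IterationConfiguration {
        int maxIterations = 10;
        String name = "unnamed iteration";
        public IterationConfiguration setMaxIterations(int n) { maxIterations = n; return this; }
        public IterationConfiguration setName(String n) { name = n; return this; }
    }

    public static class Graph {
        // single entry point: no separate create/run that could be called
        // on different graph instances by mistake
        public String runVertexCentricIteration(IterationConfiguration config) {
            return "running '" + config.name + "' for " + config.maxIterations + " supersteps";
        }

        // simplified overload with default settings, as suggested in the issue
        public String runVertexCentricIteration() {
            return runVertexCentricIteration(new IterationConfiguration());
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        IterationConfiguration parameters = new IterationConfiguration()
                .setName("SSSP").setMaxIterations(20);
        System.out.println(graph.runVertexCentricIteration(parameters));
        System.out.println(graph.runVertexCentricIteration());
    }
}
```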
[jira] [Commented] (FLINK-1522) Add tests for the library methods and examples
[ https://issues.apache.org/jira/browse/FLINK-1522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386869#comment-14386869 ] Daniel Bali commented on FLINK-1522: Yeah, I'm not working on this right now. Add tests for the library methods and examples -- Key: FLINK-1522 URL: https://issues.apache.org/jira/browse/FLINK-1522 Project: Flink Issue Type: New Feature Components: Gelly Reporter: Vasia Kalavri Assignee: Daniel Bali Labels: easyfix, test The current tests in gelly test one method at a time. We should have some tests for complete applications. As a start, we could add one test case per example and this way also make sure that our graph library methods actually give correct results. I'm assigning this to [~andralungu] because she has already implemented the test for SSSP, but I will help as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-1522) Add tests for the library methods and examples
[ https://issues.apache.org/jira/browse/FLINK-1522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri reassigned FLINK-1522: Assignee: Vasia Kalavri (was: Daniel Bali) Add tests for the library methods and examples -- Key: FLINK-1522 URL: https://issues.apache.org/jira/browse/FLINK-1522 Project: Flink Issue Type: New Feature Components: Gelly Reporter: Vasia Kalavri Assignee: Vasia Kalavri Labels: easyfix, test The current tests in gelly test one method at a time. We should have some tests for complete applications. As a start, we could add one test case per example and this way also make sure that our graph library methods actually give correct results. I'm assigning this to [~andralungu] because she has already implemented the test for SSSP, but I will help as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1741][gelly] Adds Jaccard Similarity Me...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27407901 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; +import java.util.Iterator; + +/** + * Given an undirected, unweighted graph, return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number --- End diff -- I'd say the size of the union of the neighbor sets, instead of the total number of neighbors. The Jaccard coefficient denominator doesn't just sum up the neighborhood sizes; it's the size of the union.
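For reference, the coefficient under discussion divides the number of common neighbors by the size of the union of the two neighbor sets, where |A ∪ B| = |A| + |B| - |A ∩ B|. A minimal stand-alone sketch in plain Java (not the Gelly API):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Jaccard coefficient as discussed in the review: the denominator is the
// size of the union |A ∪ B|, not the sum of the two neighborhood sizes.
public class JaccardSketch {

    public static double jaccard(Set<Long> a, Set<Long> b) {
        Set<Long> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        // |A ∪ B| = |A| + |B| - |A ∩ B|, so common neighbors are not double-counted
        int union = a.size() + b.size() - intersection.size();
        return union == 0 ? 0.0 : (double) intersection.size() / union;
    }

    public static void main(String[] args) {
        Set<Long> srcNeighbors = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> trgNeighbors = new HashSet<>(Arrays.asList(2L, 3L, 4L));
        // 2 common neighbors, union of size 4
        System.out.println(jaccard(srcNeighbors, trgNeighbors)); // 0.5
    }
}
```

Summing the neighborhood sizes instead of taking the union would give 2/6 here rather than 2/4, which is the distinction the review comment is making.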
[jira] [Commented] (FLINK-1741) Add Jaccard Similarity Metric Example
[ https://issues.apache.org/jira/browse/FLINK-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386976#comment-14386976 ] ASF GitHub Bot commented on FLINK-1741: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27409101 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; +import java.util.Iterator; + +/** + * Given an undirected, unweighted graph,return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number + * of neighbors - for the src and target vertices. + * + * p + * Input files are plain text files and must be formatted as follows: + * br + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: code12\n13\n/code defines two edges 1-2 and 1-3. 
+ * </p> + * + * Usage <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasureExample implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env); + // undirect the graph + Graph<Long, NullValue, Double> undirectedGraph = graph.getUndirected(); + + DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors = + undirectedGraph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); + + Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); --- End diff -- OK, so here you're using the edges of the directed graph again? Add Jaccard Similarity Metric Example - Key: FLINK-1741 URL: https://issues.apache.org/jira/browse/FLINK-1741 Project: Flink Issue Type: Task Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu http://www.inside-r.org/packages/cran/igraph/docs/similarity -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1741][gelly] Adds Jaccard Similarity Me...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27408830 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; +import java.util.Iterator; + +/** + * Given an undirected, unweighted graph,return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number + * of neighbors - for the src and target vertices. + * + * p + * Input files are plain text files and must be formatted as follows: + * br + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: code12\n13\n/code defines two edges 1-2 and 1-3. 
+ * </p> + * + * Usage <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasureExample implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env); + // undirect the graph + Graph<Long, NullValue, Double> undirectedGraph = graph.getUndirected(); + + DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors = + undirectedGraph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); + + Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); + + // the edge value will be the Jaccard similarity coefficient (number of common neighbors / all neighbors) + DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets() + .map(new WeighEdgesMapper()); + + DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight, + new MapFunction<Tuple2<Double, Double>, Double>() { + + @Override + public Double map(Tuple2<Double, Double> value) throws Exception { + return value.f1; + } + }).getEdges(); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", ","); + } else { + result.print(); + } +
[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...
Github user bhatsachin commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87776384 Thanks a lot Robert, let us have a hangout session tomorrow (Tuesday). Please suggest a time of your convenience. My other friends from IIT Mandi will also join in.
[jira] [Commented] (FLINK-1741) Add Jaccard Similarity Metric Example
[ https://issues.apache.org/jira/browse/FLINK-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386969#comment-14386969 ] ASF GitHub Bot commented on FLINK-1741: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27408604 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; +import org.apache.flink.types.NullValue; + +import java.util.HashSet; +import java.util.Iterator; + +/** + * Given an undirected, unweighted graph,return a weighted graph where the edge values are equal + * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number + * of neighbors - for the src and target vertices. + * + * p + * Input files are plain text files and must be formatted as follows: + * br + * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. + * Edges themselves are separated by newlines. + * For example: code12\n13\n/code defines two edges 1-2 and 1-3. 
+ * </p> + * + * Usage <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData} + */ +@SuppressWarnings("serial") +public class JaccardSimilarityMeasureExample implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + + Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env); + // undirect the graph + Graph<Long, NullValue, Double> undirectedGraph = graph.getUndirected(); + + DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors = + undirectedGraph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); --- End diff -- Also, I think you can do this with `reduceOnEdges` instead. You only need the vertex ID. Add Jaccard Similarity Metric Example - Key: FLINK-1741 URL: https://issues.apache.org/jira/browse/FLINK-1741 Project: Flink Issue Type: Task Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu http://www.inside-r.org/packages/cran/igraph/docs/similarity -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1435) TaskManager does not log missing memory error on start up
[ https://issues.apache.org/jira/browse/FLINK-1435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1435. - Resolution: Not a Problem TaskManager does not log missing memory error on start up - Key: FLINK-1435 URL: https://issues.apache.org/jira/browse/FLINK-1435 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 0.7.0-incubating Reporter: Malte Schwarzer Priority: Minor Labels: memorymanager, starter When using bin/start-cluster.sh to start TaskManagers and a worker node is failing to start because of missing memory, you do not receive any error messages in log files. Worker node has only 15000M memory available, but it is configured with Maximum heap size: 4 MiBytes. Task manager does not join the cluster. Process hangs. Last lines of log looks like this: ... ... - - Starting with 12 incoming and 12 outgoing connection threads. ... - Setting low water mark to 16384 and high water mark to 32768 bytes. ... - Instantiated PooledByteBufAllocator with direct arenas: 24, heap arenas: 0, page size (bytes): 65536, chunk size (bytes): 16777216. ... - Using 0.7 of the free heap space for managed memory. ... - Initializing memory manager with 24447 megabytes of memory. Page size is 32768 bytes. (END) Error message about not enough memory is missing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1501) Integrate metrics library and report basic metrics to JobManager web interface
[ https://issues.apache.org/jira/browse/FLINK-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387100#comment-14387100 ] ASF GitHub Bot commented on FLINK-1501: --- Github user bhatsachin commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87776384 Thanks a lot Robert, let us have a hangout session tomorrow (Tuesday). Please suggest a time of your convenience. My other friends from IIT Mandi will also join in. Integrate metrics library and report basic metrics to JobManager web interface -- Key: FLINK-1501 URL: https://issues.apache.org/jira/browse/FLINK-1501 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 0.9 As per mailing list, the library: https://github.com/dropwizard/metrics The goal of this task is to get the basic infrastructure in place. Subsequent issues will integrate more features into the system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1741) Add Jaccard Similarity Metric Example
[ https://issues.apache.org/jira/browse/FLINK-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386962#comment-14386962 ] ASF GitHub Bot commented on FLINK-1741: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27407901

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+/**
+ * Given an undirected, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number
--- End diff --

I'd say the size of the union of the neighbor sets, instead of total number of neighbors. The Jaccard coefficient denominator doesn't just sum up the neighborhood sizes, it's the size of the union.

Add Jaccard Similarity Metric Example - Key: FLINK-1741 URL: https://issues.apache.org/jira/browse/FLINK-1741 Project: Flink Issue Type: Task Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu http://www.inside-r.org/packages/cran/igraph/docs/similarity -- This message was sent by Atlassian JIRA (v6.3.4#6332)
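The distinction vasia draws here — dividing by the size of the union of the two neighbor sets rather than by the sum of their sizes — can be sketched in plain Java. This is a hypothetical helper for illustration, not code from the PR:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardSketch {

    // Jaccard coefficient: |A intersect B| / |A union B|
    static double jaccard(Set<Long> a, Set<Long> b) {
        Set<Long> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        Set<Long> union = new HashSet<>(a);
        union.addAll(b);
        return union.isEmpty() ? 0.0 : (double) intersection.size() / union.size();
    }

    public static void main(String[] args) {
        // neighbor sets of two hypothetical vertices
        Set<Long> a = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> b = new HashSet<>(Arrays.asList(2L, 3L, 4L));

        // correct denominator: |A union B| = 4, so 2/4
        System.out.println(jaccard(a, b)); // prints 0.5

        // summing the neighborhood sizes instead gives 2/6,
        // because the common neighbors are counted twice
        System.out.println((double) 2 / (a.size() + b.size()));
    }
}
```

For the sets {1, 2, 3} and {2, 3, 4} the two denominators differ (4 vs. 6), which is exactly why the javadoc wording under review is misleading.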
[jira] [Commented] (FLINK-1805) The class IOManagerAsync (in org.apache.flink.runtime.io.disk.iomanager) should use its own Log
[ https://issues.apache.org/jira/browse/FLINK-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387000#comment-14387000 ] ASF GitHub Bot commented on FLINK-1805: --- Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/546#discussion_r27410240

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---
@@ -43,7 +45,10 @@
 	/** Flag to signify that the IOManager has been shut down already */
 	private final AtomicBoolean isShutdown = new AtomicBoolean();
-
+
+	/** Logging */
+	protected static final Logger LOG = LoggerFactory.getLogger(IOManagerAsync.class);
--- End diff --

Could we just use private here instead of protected?

The class IOManagerAsync (in org.apache.flink.runtime.io.disk.iomanager) should use its own Log -- Key: FLINK-1805 URL: https://issues.apache.org/jira/browse/FLINK-1805 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Although class 'IOManagerAsync' extends 'IOManager' in package 'org.apache.flink.runtime.io.disk.iomanager', I think it should have its own Log instance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1741][gelly] Adds Jaccard Similarity Me...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27408109

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+/**
+ * Given an undirected, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number
+ * of neighbors - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage: <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasureExample implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+		// undirect the graph
+		Graph<Long, NullValue, Double> undirectedGraph = graph.getUndirected();
--- End diff --

I wouldn't use undirect as a verb here :P Also, in the beginning you say given an undirected, unweighted graph... So, if you give an undirected graph as input, why call `getUndirected()` again?
[jira] [Resolved] (FLINK-1801) NetworkEnvironment should start without JobManager association
[ https://issues.apache.org/jira/browse/FLINK-1801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1801. - Resolution: Implemented Implemented in ee273dbe01e95d2b260fa690e21e2c244a2a5711 NetworkEnvironment should start without JobManager association -- Key: FLINK-1801 URL: https://issues.apache.org/jira/browse/FLINK-1801 Project: Flink Issue Type: Sub-task Components: TaskManager Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9 The NetworkEnvironment should be able to start without a dedicated JobManager association and get one / lose one as the TaskManager connects to different JobManagers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1465) GlobalBufferPool reports negative memory allocation
[ https://issues.apache.org/jira/browse/FLINK-1465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1465. - Resolution: Fixed Fix Version/s: 0.9 Assignee: Stephan Ewen (was: Ufuk Celebi) Fixed via ee273dbe01e95d2b260fa690e21e2c244a2a5711 GlobalBufferPool reports negative memory allocation --- Key: FLINK-1465 URL: https://issues.apache.org/jira/browse/FLINK-1465 Project: Flink Issue Type: Bug Components: Local Runtime, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Stephan Ewen Fix For: 0.9 I've got this error message when starting Flink. It does not really help me. I suspect that my configuration files (which worked with 0.8 aren't working with 0.9 anymore). Still, the exception is reporting weird stuff {code} 11:41:02,516 INFO org.apache.flink.yarn.YarnUtils$$anonfun$startActorSystemAndTaskManager$1$$anon$1 - TaskManager successfully registered at JobManager akka.tcp://fl...@cloud-18.dima.tu-berlin.de:39674/user/jo bmanager. 11:41:25,230 ERROR org.apache.flink.yarn.YarnUtils$$anonfun$startActorSystemAndTaskManager$1$$anon$1 - Failed to instantiate network environment. java.io.IOException: Failed to instantiate network buffer pool: Could not allocate enough memory segments for GlobalBufferPool (required (Mb): 0, allocated (Mb): -965, missing (Mb): 965). 
	at org.apache.flink.runtime.io.network.NetworkEnvironment.<init>(NetworkEnvironment.java:81)
	at org.apache.flink.runtime.taskmanager.TaskManager.setupNetworkEnvironment(TaskManager.scala:508)
	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$finishRegistration(TaskManager.scala:479)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:226)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.YarnTaskManager$$anonfun$receiveYarnMessages$1.applyOrElse(YarnTaskManager.scala:32)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:41)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:78)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
java.lang.OutOfMemoryError: Could not allocate enough memory segments for GlobalBufferPool (required (Mb): 0, allocated (Mb): -965, missing (Mb): 965).
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.<init>(NetworkBufferPool.java:76)
	at org.apache.flink.runtime.io.network.NetworkEnvironment.<init>(NetworkEnvironment.java:78)
	... 23 more
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1435) TaskManager does not log missing memory error on start up
[ https://issues.apache.org/jira/browse/FLINK-1435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386949#comment-14386949 ] Stephan Ewen commented on FLINK-1435: - I think we misunderstood this issue initially. This seems like the TaskManager is started with a heap size that exceeds the physical memory of the machine. It is possible to do that, if your OS has enough swap space. The process hangs, because it is incredibly slow due to non-stop swapping. Inside the JVM, you do not see that memory is missing, because it is not; it only comes from the swap space. This is not a Flink bug; such a misconfiguration is entirely possible. TaskManager does not log missing memory error on start up - Key: FLINK-1435 URL: https://issues.apache.org/jira/browse/FLINK-1435 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 0.7.0-incubating Reporter: Malte Schwarzer Priority: Minor Labels: memorymanager, starter When using bin/start-cluster.sh to start TaskManagers and a worker node is failing to start because of missing memory, you do not receive any error messages in log files. Worker node has only 15000M memory available, but it is configured with Maximum heap size: 4 MiBytes. Task manager does not join the cluster. Process hangs. Last lines of log looks like this: ... ... - - Starting with 12 incoming and 12 outgoing connection threads. ... - Setting low water mark to 16384 and high water mark to 32768 bytes. ... - Instantiated PooledByteBufAllocator with direct arenas: 24, heap arenas: 0, page size (bytes): 65536, chunk size (bytes): 16777216. ... - Using 0.7 of the free heap space for managed memory. ... - Initializing memory manager with 24447 megabytes of memory. Page size is 32768 bytes. (END) Error message about not enough memory is missing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1465) GlobalBufferPool reports negative memory allocation
[ https://issues.apache.org/jira/browse/FLINK-1465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386966#comment-14386966 ] Stephan Ewen commented on FLINK-1465: - This is actually an integer overflow issue. I have a fix coming up... GlobalBufferPool reports negative memory allocation --- Key: FLINK-1465 URL: https://issues.apache.org/jira/browse/FLINK-1465 Project: Flink Issue Type: Bug Components: Local Runtime, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Ufuk Celebi I've got this error message when starting Flink. It does not really help me. I suspect that my configuration files (which worked with 0.8 aren't working with 0.9 anymore). Still, the exception is reporting weird stuff {code} 11:41:02,516 INFO org.apache.flink.yarn.YarnUtils$$anonfun$startActorSystemAndTaskManager$1$$anon$1 - TaskManager successfully registered at JobManager akka.tcp://fl...@cloud-18.dima.tu-berlin.de:39674/user/jo bmanager. 11:41:25,230 ERROR org.apache.flink.yarn.YarnUtils$$anonfun$startActorSystemAndTaskManager$1$$anon$1 - Failed to instantiate network environment. java.io.IOException: Failed to instantiate network buffer pool: Could not allocate enough memory segments for GlobalBufferPool (required (Mb): 0, allocated (Mb): -965, missing (Mb): 965). 
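A negative "allocated (Mb)" figure like the one in this report is the classic signature of 32-bit integer arithmetic wrapping around. The sketch below illustrates the failure mode with made-up numbers (a hypothetical buffer count and segment size, not values from the actual NetworkBufferPool code):

```java
public class OverflowSketch {

    // widen to long *before* multiplying, so the product cannot wrap
    static long totalBytes(int numBuffers, int segmentSize) {
        return (long) numBuffers * segmentSize;
    }

    public static void main(String[] args) {
        int numBuffers = 100_000;     // hypothetical buffer count
        int segmentSize = 32 * 1024;  // hypothetical segment size in bytes

        // the true product is 3,276,800,000 bytes, which exceeds
        // Integer.MAX_VALUE (2,147,483,647) and wraps to a negative value
        int wrong = numBuffers * segmentSize;
        System.out.println(wrong); // prints -1018167296

        System.out.println(totalBytes(numBuffers, segmentSize)); // prints 3276800000
    }
}
```

Reporting a wrapped byte count in megabytes yields exactly the kind of negative "allocated (Mb)" number seen in the exception message.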
	at org.apache.flink.runtime.io.network.NetworkEnvironment.<init>(NetworkEnvironment.java:81)
	at org.apache.flink.runtime.taskmanager.TaskManager.setupNetworkEnvironment(TaskManager.scala:508)
	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$finishRegistration(TaskManager.scala:479)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$receiveWithLogMessages$1.applyOrElse(TaskManager.scala:226)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.YarnTaskManager$$anonfun$receiveYarnMessages$1.applyOrElse(YarnTaskManager.scala:32)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:41)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:78)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
java.lang.OutOfMemoryError: Could not allocate enough memory segments for GlobalBufferPool (required (Mb): 0, allocated (Mb): -965, missing (Mb): 965).
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.<init>(NetworkBufferPool.java:76)
	at org.apache.flink.runtime.io.network.NetworkEnvironment.<init>(NetworkEnvironment.java:78)
	... 23 more
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1741) Add Jaccard Similarity Metric Example
[ https://issues.apache.org/jira/browse/FLINK-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386980#comment-14386980 ] ASF GitHub Bot commented on FLINK-1741: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/544#issuecomment-87751974 Hey @andralungu! Thanks a lot for this PR. I have left some inline comments. Basically, it's not very clear (at least to me) whether you're working on a directed or undirected graph and what you expect as input :-) Add Jaccard Similarity Metric Example - Key: FLINK-1741 URL: https://issues.apache.org/jira/browse/FLINK-1741 Project: Flink Issue Type: Task Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu http://www.inside-r.org/packages/cran/igraph/docs/similarity -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1741) Add Jaccard Similarity Metric Example
[ https://issues.apache.org/jira/browse/FLINK-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386967#comment-14386967 ] ASF GitHub Bot commented on FLINK-1741: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/544#discussion_r27408463

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+/**
+ * Given an undirected, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the total number
+ * of neighbors - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage: <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasureExample implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+		// undirect the graph
+		Graph<Long, NullValue, Double> undirectedGraph = graph.getUndirected();
+
+		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+				undirectedGraph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
--- End diff --

If you have a directed graph, `EdgeDirection.ALL` will give you all neighbors (in-coming and out-going). If you have an undirected graph, then this will return them twice.

Add Jaccard Similarity Metric Example - Key: FLINK-1741 URL: https://issues.apache.org/jira/browse/FLINK-1741 Project: Flink Issue Type: Task Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu http://www.inside-r.org/packages/cran/igraph/docs/similarity -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...
Github user bhatsachin commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87796282 Great. 17:00 India Time Zone (UTC+05:30) would be perfect.
[jira] [Commented] (FLINK-1501) Integrate metrics library and report basic metrics to JobManager web interface
[ https://issues.apache.org/jira/browse/FLINK-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387250#comment-14387250 ] ASF GitHub Bot commented on FLINK-1501: --- Github user bhatsachin commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87796282 Great. 17:00 India Time Zone (UTC+05:30) would be perfect. Integrate metrics library and report basic metrics to JobManager web interface -- Key: FLINK-1501 URL: https://issues.apache.org/jira/browse/FLINK-1501 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 0.9 As per mailing list, the library: https://github.com/dropwizard/metrics The goal of this task is to get the basic infrastructure in place. Subsequent issues will integrate more features into the system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1771) Add support for submitting single jobs to a detached YARN session
[ https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387135#comment-14387135 ] ASF GitHub Bot commented on FLINK-1771: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87780049 A user I'm talking with offline depends on these changes. I'm currently hardening the tests on Travis, once that's done I'll merge the changes (probably in the next 12-15 hours). Add support for submitting single jobs to a detached YARN session - Key: FLINK-1771 URL: https://issues.apache.org/jira/browse/FLINK-1771 Project: Flink Issue Type: Improvement Components: YARN Client Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger We need tests ensuring that the processing slots are set properly when starting Flink on YARN, in particular with the per job YARN session feature. Also, the YARN tests for detached YARN sessions / per job yarn clusters are polluting the local home-directory. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87817661 @rmetzger, the PR is too large to do effective review =( Could you kindly summarize the significant changes made to fix this? For example why introduce new class JobSubmissionResult. Thanks!
[jira] [Commented] (FLINK-1501) Integrate metrics library and report basic metrics to JobManager web interface
[ https://issues.apache.org/jira/browse/FLINK-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387144#comment-14387144 ] ASF GitHub Bot commented on FLINK-1501: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87781100 Cool. I'm available between 16:30 - 20:30 India Time Zone (UTC+05:30). Is that possible for you? Integrate metrics library and report basic metrics to JobManager web interface -- Key: FLINK-1501 URL: https://issues.apache.org/jira/browse/FLINK-1501 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 0.9 As per mailing list, the library: https://github.com/dropwizard/metrics The goal of this task is to get the basic infrastructure in place. Subsequent issues will integrate more features into the system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87781100 Cool. I'm available between 16:30 - 20:30 India Time Zone (UTC+05:30). Is that possible for you?
[jira] [Commented] (FLINK-1501) Integrate metrics library and report basic metrics to JobManager web interface
[ https://issues.apache.org/jira/browse/FLINK-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387260#comment-14387260 ] ASF GitHub Bot commented on FLINK-1501: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-87798034 Confirmed ;) Looking forward talking to you tomorrow. My google hangout id is metrob...@gmail.com. Integrate metrics library and report basic metrics to JobManager web interface -- Key: FLINK-1501 URL: https://issues.apache.org/jira/browse/FLINK-1501 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 0.9 As per mailing list, the library: https://github.com/dropwizard/metrics The goal of this task is to get the basic infrastructure in place. Subsequent issues will integrate more features into the system. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1771) Add support for submitting single jobs to a detached YARN session
[ https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387329#comment-14387329 ] ASF GitHub Bot commented on FLINK-1771: --- Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87817661 @rmetzger, the PR is too large to do effective review =( Could you kindly summarize the significant changes made to fix this? For example why introduce new class JobSubmissionResult. Thanks! Add support for submitting single jobs to a detached YARN session - Key: FLINK-1771 URL: https://issues.apache.org/jira/browse/FLINK-1771 Project: Flink Issue Type: Improvement Components: YARN Client Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger We need tests ensuring that the processing slots are set properly when starting Flink on YARN, in particular with the per job YARN session feature. Also, the YARN tests for detached YARN sessions / per job yarn clusters are polluting the local home-directory.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87780049 A user I'm talking with offline depends on these changes. I'm currently hardening the tests on Travis, once that's done I'll merge the changes (probably in the next 12-15 hours).
[GitHub] flink pull request: [FLINK-1741][gelly] Adds Jaccard Similarity Me...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/544#issuecomment-87830769 Hi @vasia , The problem (at least mine) was that Jaccard works on both an undirected and a directed graph... and I just needed to make a final decision :). Directed won today! Next, I see why you wanted reduceOnEdges. I used it, but there's a small glitch there (in EmitOneEdgePerNode) that the test doesn't catch. I solved the problem in a not-so-elegant way for now, because I think that solving it should be a separate JIRA issue (it has nothing to do with Jaccard). So my suggestion is: we merge the code with the if statement; I open a bug, fix reduceOnEdges to work for this use case as well, and then everything will look nice and clean. What do you think?
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87840336 I know that the PR is touching many components. I'll try to split up my work into smaller parts; in this case I didn't expect in the beginning that I would need to change so many things. The PR is mainly about adding support for executing a Flink job on YARN in a fire-and-forget fashion. Therefore, I needed to make some changes to the YARN client. In the previous big change to YARN, I added support for a detached yarn session, so that you can tell the Flink Yarn Client to start Flink on YARN without connecting to the AM afterwards. Users have to manage such a yarn session using other tools afterwards (for example `yarn application -kill` to stop it). This change takes the feature further, to support single Flink jobs being submitted to YARN. But since the Yarn client doesn't connect to the AM once Flink has been started, there is no way to tell the AM to stop Flink on YARN again. In this change, I add a new Akka message for the ApplicationMaster, `case class StopAMAfterJob(jobId:JobID)`. The message tells the AM to monitor the JM until the job has finished; once that has happened, the AM stops Flink on YARN. To get this `JobID` I needed to make some changes to the CliFrontend / Client. The Client has two ways of submitting a job to Flink: an attached mode (default) and a detached mode. The attached mode returns the `JobExecutionResult`; the detached mode used to return nothing. I created a new type called `JobSubmissionResult` which is returned by the detached job submission. It only contains the job id. The JobExecutionResult extends the JobSubmissionResult.
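The result-type split described in the comment above can be sketched roughly as follows. All names here are illustrative stand-ins, not Flink's actual API: Flink's real `JobID` is its own class (a `UUID` is used below only to keep the sketch self-contained), and the real result classes carry more state than shown.

```java
import java.util.UUID;

/** Returned by a detached submission: only carries the job id. */
class JobSubmissionResult {
    private final UUID jobId;
    JobSubmissionResult(UUID jobId) { this.jobId = jobId; }
    UUID getJobID() { return jobId; }
}

/**
 * Returned by an attached (blocking) submission: additionally knows
 * the job's runtime, so it extends the submission result.
 */
class JobExecutionResult extends JobSubmissionResult {
    private final long netRuntimeMillis;
    JobExecutionResult(UUID jobId, long netRuntimeMillis) {
        super(jobId);
        this.netRuntimeMillis = netRuntimeMillis;
    }
    long getNetRuntime() { return netRuntimeMillis; }
}
```

With this shape, code that only needs the job id (the detached path) can work against `JobSubmissionResult`, while the blocking path still gets the richer `JobExecutionResult`.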
[jira] [Commented] (FLINK-1805) The class IOManagerAsync(in org.apache.flink.runtime.io.disk.iomanager) should use its own Log
[ https://issues.apache.org/jira/browse/FLINK-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387591#comment-14387591 ] ASF GitHub Bot commented on FLINK-1805: --- Github user matadorhong commented on a diff in the pull request: https://github.com/apache/flink/pull/546#discussion_r27441775 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java --- @@ -43,7 +45,10 @@ /** Flag to signify that the IOManager has been shut down already */ private final AtomicBoolean isShutdown = new AtomicBoolean(); - + + /** Logging */ + protected static final Logger LOG = LoggerFactory.getLogger(IOManagerAsync.class); --- End diff -- @hsaputra Yes, private here is better. Thank you. I have updated it. The class IOManagerAsync (in org.apache.flink.runtime.io.disk.iomanager) should use its own Log -- Key: FLINK-1805 URL: https://issues.apache.org/jira/browse/FLINK-1805 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: master Reporter: Sibao Hong Assignee: Sibao Hong Although class 'IOManagerAsync' extends 'IOManager' in package 'org.apache.flink.runtime.io.disk.iomanager', I think it should have its own Log instance.
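The point of the review exchange above — each class declaring its own private logger rather than reusing the parent's — can be illustrated with a minimal sketch. This uses `java.util.logging` purely for self-containment (Flink itself uses SLF4J's `LoggerFactory`, as the diff shows), and the class names are stand-ins:

```java
import java.util.logging.Logger;

// Base class with its own logger (mirrors IOManager's situation).
class IOManagerBase {
    private static final Logger LOG = Logger.getLogger(IOManagerBase.class.getName());
    static String loggerName() { return LOG.getName(); }
}

// The subclass declares its own logger instead of inheriting the
// parent's, so log records are attributed to the correct class.
// Declared private, as suggested in the review: subclasses of this
// class should in turn declare their own loggers too.
class IOManagerAsyncSketch extends IOManagerBase {
    private static final Logger LOG = Logger.getLogger(IOManagerAsyncSketch.class.getName());
    static String loggerName() { return LOG.getName(); }
}
```

Without the second declaration, messages logged from `IOManagerAsyncSketch` would appear under the base class's logger name, which is exactly what FLINK-1805 set out to avoid.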
[GitHub] flink pull request: [FLINK-1805]The class IOManagerAsync should us...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/546#issuecomment-87879789 +1 LGTM Once Travis is done I will merge, unless more review comes in. Thanks!
[GitHub] flink pull request: [FLINK-1797] Add jumping pre-reducer for Count...
GitHub user ggevay opened a pull request: https://github.com/apache/flink/pull/549 [FLINK-1797] Add jumping pre-reducer for Count and Time windows I created 4 classes: for count and time windows, and their grouped versions. They extend the corresponding (grouped/non-grouped) TumblingPreReducer class, since the only difference from a tumbling window is that we skip some elements after an emitWindow. I added some unit tests for the new classes, and also did some testing with real streams. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ggevay/flink JumpingWindow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/549.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #549 commit 3c06ca4d0a1fc528eab5924f8ed3c5a98be5f168 Author: Gabor Gevay gga...@gmail.com Date: 2015-03-31T03:36:30Z [FLINK-1797] Add jumping pre-reducer for Count and Time windows
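The idea in the PR description — a jumping window behaves like a tumbling window that skips some elements after each emitWindow — can be sketched outside of Flink's API like this. The class name and structure are hypothetical, not the actual TumblingPreReducer hierarchy: a count-based variant where every `slide` elements a window is emitted, but only the last `windowSize` of them take part in the reduction (so `slide - windowSize` elements are skipped between windows).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Minimal sketch (not Flink's API) of a jumping count pre-reducer. */
class JumpingCountPreReducer<T> {
    private final int windowSize, slide;
    private final BinaryOperator<T> reducer;
    private final List<T> emitted = new ArrayList<>();
    private T current;        // running pre-reduced value of the window
    private int seenInSlide;  // elements seen since the last emitWindow

    JumpingCountPreReducer(int windowSize, int slide, BinaryOperator<T> reducer) {
        this.windowSize = windowSize;
        this.slide = slide;
        this.reducer = reducer;
    }

    void store(T element) {
        seenInSlide++;
        // Only the last `windowSize` elements of each slide are pre-reduced;
        // the first (slide - windowSize) elements are the skipped ones.
        if (seenInSlide > slide - windowSize) {
            current = (current == null) ? element : reducer.apply(current, element);
        }
        if (seenInSlide == slide) {
            emitted.add(current); // emitWindow
            current = null;       // then start skipping again
            seenInSlide = 0;
        }
    }

    List<T> getEmitted() { return emitted; }
}
```

For example, with `windowSize = 2`, `slide = 3`, and a sum reducer, the stream 1..6 yields one window over (2, 3) and one over (5, 6); elements 1 and 4 are skipped.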
[jira] [Commented] (FLINK-1771) Add support for submitting single jobs to a detached YARN session
[ https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387966#comment-14387966 ] ASF GitHub Bot commented on FLINK-1771: --- Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452509 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java --- @@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception { private OptimizedPlan compileProgram(String jobName) { Plan p = createProgramPlan(jobName); - Optimizer pc = new Optimizer(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration()); --- End diff -- For this and other tests, if we made a static method to create a new Configuration, we could make it clear this is just for the local executor: ``` LocalExecutor.createConfigForLocalExecutor(this.executor); ```
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452529 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala --- @@ -18,12 +18,12 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.optimizer.util.CompilerTestBase --- End diff -- side effect of auto format?
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452585 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java --- @@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, this.sessionFilesDir = sessionFilesDir; this.applicationId = appId; this.detached = detached; + this.flinkConfig = flinkConfig; + this.appId = appId; // get one application report manually intialAppReport = yarnClient.getApplicationReport(appId); String jobManagerHost = intialAppReport.getHost(); int jobManagerPort = intialAppReport.getRpcPort(); this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort); + } - if(!detached) { - // start actor system - LOG.info(Start actor system.); - InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM - actorSystem = AkkaUtils.createActorSystem(flinkConfig, - new Some(new Tuple2String, Integer(ownHostname.getCanonicalHostName(), 0))); + /** +* Connect the FlinkYarnCluster to the ApplicationMaster. +* +* Detached YARN sessions don't need to connect to the ApplicationMaster. +* Detached per job YARN sessions need to connect until the required number of TaskManagers have been started. 
+* +* @throws IOException +*/ + public void connectToCluster() throws IOException { + if(isConnected) { + throw new IllegalStateException(Can not connect to the cluster again); + } - // start application client - LOG.info(Start application client.); + // start actor system + LOG.info(Start actor system.); + InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM + actorSystem = AkkaUtils.createActorSystem(flinkConfig, + new Some(new Tuple2String, Integer(ownHostname.getCanonicalHostName(), 0))); - applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), applicationClient); + // start application client + LOG.info(Start application client.); - // instruct ApplicationClient to start a periodical status polling - applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient); + applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), applicationClient); + // instruct ApplicationClient to start a periodical status polling + applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient); - // add hook to ensure proper shutdown - Runtime.getRuntime().addShutdownHook(clientShutdownHook); - actorRunner = new Thread(new Runnable() { - @Override - public void run() { - // blocks until ApplicationMaster has been stopped - actorSystem.awaitTermination(); + actorRunner = new Thread(new Runnable() { + @Override + public void run() { + // blocks until ApplicationMaster has been stopped + actorSystem.awaitTermination(); - // get final application report - try { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - - LOG.info(Application + appId + finished with state + appReport - .getYarnApplicationState() + and final state + appReport - .getFinalApplicationStatus() + at + appReport.getFinishTime()); - - if (appReport.getYarnApplicationState() == 
YarnApplicationState.FAILED || appReport.getYarnApplicationState() - == YarnApplicationState.KILLED) { - LOG.warn(Application failed. Diagnostics + appReport.getDiagnostics()); -
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27451902 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -265,12 +266,32 @@ protected int run(String[] args) { } try { - Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName()); - - int parallelism = options.getParallelism(); - int exitCode = executeProgram(program, client, parallelism); - - if (yarnCluster != null) { + int userParallelism = options.getParallelism(); + LOG.debug("User parallelism is set to {}", userParallelism); + + Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism); + LOG.debug("Client slots is set to {}", client.getMaxSlots()); + if(client.getMaxSlots() != -1 && userParallelism == -1) { + logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots() + "). " + + "To use another parallelism, set it at the ./bin/flink client."); + userParallelism = client.getMaxSlots(); + } + int exitCode = 0; + + // check if detached per job yarn cluster is used to start flink + if(yarnCluster != null && yarnCluster.isDetached()) { + logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + + "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + + "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); + executeProgram(program, client, userParallelism, false); + } else { + // regular (blocking) execution. + exitCode = executeProgram(program, client, userParallelism, true); + } + + // show YARN cluster status if its not a detached YARN cluster. + if (yarnCluster != null && !yarnCluster.isDetached()) { --- End diff -- Since finally trumps return statements, could the code block following this check be moved to the finally block below?
[jira] [Commented] (FLINK-1771) Add support for submitting single jobs to a detached YARN session
[ https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387958#comment-14387958 ] ASF GitHub Bot commented on FLINK-1771: --- Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452411 --- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java --- @@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) { } // - + + public Configuration getConfiguration() { --- End diff -- I think we could use a static method to indicate this is only for LocalExecutor: ```java public static Configuration createConfigForLocalExecutor(LocalExecutor le) { } ```
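The reviewer's suggestion — replacing a general-purpose `getConfiguration()` getter with a static factory whose name states its narrow purpose — can be sketched as follows. Both classes below are simplified stand-ins for Flink's `Configuration` and `LocalExecutor`, and the config key is purely illustrative, not a real Flink option:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's Configuration, just enough for the sketch.
class Configuration {
    private final Map<String, String> entries = new HashMap<>();
    void setString(String key, String value) { entries.put(key, value); }
    String getString(String key, String def) { return entries.getOrDefault(key, def); }
}

class LocalExecutor {
    // Reviewer-suggested shape: the method name makes it clear at every
    // call site that this configuration is only meant for a local
    // executor, which a plain getConfiguration() getter would not.
    static Configuration createConfigForLocalExecutor(LocalExecutor le) {
        Configuration config = new Configuration();
        // `le` would be consulted for executor-specific settings here;
        // this key/value pair is a hypothetical example.
        config.setString("execution.target", "local");
        return config;
    }
}
```

The design point is about intent, not mechanics: a named static factory documents the intended consumer, whereas a public getter invites arbitrary callers.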
[jira] [Commented] (FLINK-1771) Add support for submitting single jobs to a detached YARN session
[ https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387978#comment-14387978 ] ASF GitHub Bot commented on FLINK-1771: --- Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/542#issuecomment-87936262 While it is a big patch, with the additional comments I was able to follow your changes. The main changes look good, so the rest I assume is a side effect of making the refactor work. Added some small comments on the PR; other than those it seems ready to merge.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452062 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle program.deleteExtractedLibraries(); } - LOG.info("Program execution finished"); + if(wait) { + LOG.info("Program execution finished"); + } - // we come here after the job has finished + // we come here after the job has finished (or the job has been submitted) if (execResult != null) { - System.out.println("Job Runtime: " + execResult.getNetRuntime()); - Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); - if (accumulatorsResult.size() > 0) { - System.out.println("Accumulator Results: "); - System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); + // if the job has been submitted to a detached YARN cluster, there won't be any + // exec results, but the object will be set (for the job id) + if(yarnCluster != null && yarnCluster.isDetached()) { --- End diff -- Small style nit: space after if-else and parentheses.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/542#discussion_r27452509

--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---

```diff
@@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception {
 	private OptimizedPlan compileProgram(String jobName) {
 		Plan p = createProgramPlan(jobName);

-		Optimizer pc = new Optimizer(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
```

--- End diff ---

For this and other tests, if we made a static method to create the new Configuration, we could make it clear that this is just for the local executor:

```
LocalExecutor.createConfigForLocalExecutor(this.executor);
```
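The reviewer's suggestion is a named static factory, so call sites document that the configuration is meant for the local executor only. A minimal sketch of that shape follows; the `LocalExecutor` and `Configuration` classes here are hypothetical stand-ins for the Flink types, and `createConfigForLocalExecutor` is the reviewer's proposed name, not an existing method.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed static factory. The nested classes are minimal
// stand-ins for Flink's LocalExecutor/Configuration, not the real API.
public class LocalExecutorConfigSketch {

    static final class Configuration {
        final Map<String, String> entries = new HashMap<>();
    }

    static final class LocalExecutor {
        private final Configuration configuration = new Configuration();

        Configuration getConfiguration() { return configuration; }

        // The reviewer's idea: a named factory makes the call site
        // self-documenting about the config being local-executor-only.
        static Configuration createConfigForLocalExecutor(LocalExecutor executor) {
            return executor.getConfiguration();
        }
    }

    public static void main(String[] args) {
        LocalExecutor executor = new LocalExecutor();
        Configuration conf = LocalExecutor.createConfigForLocalExecutor(executor);
        // The factory hands back the executor's own configuration.
        System.out.println(conf == executor.getConfiguration());
    }
}
```

Compared with calling `this.executor.getConfiguration()` at every test site, the factory centralizes the intent in one place, so a later change (e.g. copying instead of sharing the configuration) touches a single method.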