[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29662593
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
         override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
       })
+
+    // Preprocessing steps for registering cpuLoad
+    // fetch the method to get process CPU load
+    val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+    // define the fetchCPULoad method as per the fetched getCPULoadMethod
+    // dummy initialisation
+    var fetchCPULoad:(Any) => Double = (obj:Any) => -1
+
+    if(getCPULoadMethod != null){
+      fetchCPULoad = (obj:Any) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
+    } else {
+      // Log getProcessCpuLoad method not available for Java 6
+      LOG.warn("getProcessCpuLoad method not available in the Operating System Bean " +
+        "implementation for this Java runtime environment\n" + Thread.currentThread().getStackTrace)
+    }
+
+    metricRegistry.register("cpuLoad", new Gauge[Double] {
+      override def getValue: Double = {
+        try{
+          val osMXBean = ManagementFactory.getOperatingSystemMXBean().
+            asInstanceOf[com.sun.management.OperatingSystemMXBean]
+          return fetchCPULoad(osMXBean)
--- End diff --
No return statement is necessary in Scala.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
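The review point about `return` can be illustrated with a small, self-contained sketch (not the code from the pull request). In Scala the last expression of a block is its value, and try/catch is itself an expression, so a gauge body can yield either the fetched load or the -1 fallback without any `return`. The function parameter `fetchCPULoad` stands in for the PR's variable of the same name, `println` stands in for `LOG.warn`, and catching `NonFatal` instead of `Throwable` is an assumption of this sketch, not something the review asked for.

```scala
import scala.util.control.NonFatal

object NoReturnGauge {
  // Sketch of the gauge body: the value of the whole try/catch expression is
  // the method's result, so neither branch needs `return`.
  def cpuLoadValue(fetchCPULoad: () => Double): Double =
    try {
      fetchCPULoad()
    } catch {
      case NonFatal(t) =>
        // The PR logs through LOG.warn; println keeps this sketch dependency-free.
        println(s"Error retrieving CPU load: ${t.getMessage}")
        -1.0
    }
}
```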
[jira] [Commented] (FLINK-1792) Improve TM Monitoring: CPU utilization, hide graphs by default and show summary only
[ https://issues.apache.org/jira/browse/FLINK-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528354#comment-14528354 ] ASF GitHub Bot commented on FLINK-1792: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/553#issuecomment-99063892 Thanks for your work @bhatsachin. I had some remarks concerning the Scala style. Improve TM Monitoring: CPU utilization, hide graphs by default and show summary only Key: FLINK-1792 URL: https://issues.apache.org/jira/browse/FLINK-1792 Project: Flink Issue Type: Sub-task Components: Webfrontend Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Sachin Bhat As per https://github.com/apache/flink/pull/421 from FLINK-1501, there are some enhancements to the current monitoring required - Get the CPU utilization in % from each TaskManager process - Remove the metrics graph from the overview and only show the current stats as numbers (cpu load, heap utilization) and add a button to enable the detailed graph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29662751
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
         override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
       })
+
+    // Preprocessing steps for registering cpuLoad
+    // fetch the method to get process CPU load
+    val getCPULoadMethod: Method = getMethodToFetchCPULoad()
--- End diff --
Why not make `getCPULoadMethod` an `Option[Method]`? That way one avoids vicious `null` values.
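A minimal sketch of the `Option[Method]` suggestion. It assumes the method is resolved by name on the `com.sun.management.OperatingSystemMXBean` interface via reflection (`Class.forName` plus `getMethod`), instead of iterating over `getMethods()` as the PR does. Every failure mode, whether the interface is absent or `getProcessCpuLoad` is missing (as on Java 6), collapses into `None`, so no `null` escapes. The object name `CpuLoadLookup` is hypothetical.

```scala
import java.lang.management.ManagementFactory
import java.lang.reflect.Method
import scala.util.Try

object CpuLoadLookup {
  // Some(method) when the JVM provides com.sun.management.OperatingSystemMXBean
  // with getProcessCpuLoad (added in Java 7); None otherwise. No null escapes.
  val getCPULoadMethod: Option[Method] = Try {
    Class
      .forName("com.sun.management.OperatingSystemMXBean")
      .getMethod("getProcessCpuLoad")
  }.toOption

  // Callers transform the Option instead of testing for null; -1.0 is the
  // "not available" value the PR also uses.
  def processCpuLoad(): Double =
    getCPULoadMethod
      .flatMap { m =>
        Try(m.invoke(ManagementFactory.getOperatingSystemMXBean()).asInstanceOf[Double]).toOption
      }
      .getOrElse(-1.0)
}
```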
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99070508 Hey @andralungu, I'm not sure what @StephanEwen had in mind when suggesting to subclass Vertex, but the current implementation forces the user to cast the `Vertex` to `VertexWithDegrees` in order to access the degrees, which is not very user-friendly :S I guess the point was to have `VertexWithDegrees` as an argument to the update and messaging functions instead? If that's the case, however, it's not quite clear to me how this will play along with the configuration option setting. Back to my previous comment on only accessing the degrees inside the update/messaging functions instead of adding public methods to the Vertex class, a simple way to do this would be the following: Add set/get vertexDegree methods in `VertexUpdateFunction` and `MessagingFunction`. These would be the methods that the user calls to access the vertex degrees, similar to `getDirection()`, `getBroadcastSet()`, etc. Inside `createResultVerticesWithDegrees`, you're already creating the augmented vertex dataset with the degrees and passing it as the solution set of the delta iteration. Then, in the corresponding coGroup method of the Messaging/VertexUpdate UDFs, you iterate over this dataset (state) and call the user-defined functions for each vertex. This is where you can set the degrees inside the update/messaging functions, instead of setting them on the vertex object as you currently do. What do you think of this solution?
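The proposal above, with degrees set on the update function by the iteration instead of being exposed on `Vertex`, might look roughly like the following plain-Scala sketch. Everything here is hypothetical and illustrative: `DegreeAwareUpdateFunction`, `setDegrees`, `getInDegree`, `getOutDegree` and `runSuperstep` are not Gelly API, and the `Seq`-based `runSuperstep` merely stands in for the coGroup over the degree-augmented solution set.

```scala
// Hypothetical sketch: class and method names are illustrative, not Gelly's API.
abstract class DegreeAwareUpdateFunction[V, M] extends Serializable {
  private var inDegree: Long = -1L
  private var outDegree: Long = -1L

  // Set by the iteration runtime before each updateVertex call, not by user code.
  def setDegrees(inDeg: Long, outDeg: Long): Unit = {
    inDegree = inDeg
    outDegree = outDeg
  }

  // What user code would call, in the spirit of getDirection() or getBroadcastSet().
  def getInDegree: Long = inDegree
  def getOutDegree: Long = outDegree

  // The user-defined update logic.
  def updateVertex(value: V, messages: Iterable[M]): V
}

object DegreeSketch {
  // Stand-in for the coGroup over the degree-augmented solution set: each entry
  // is (vertexId, value, inDegree, outDegree). Degrees are pushed into the
  // function, so the vertex type never has to expose them.
  def runSuperstep[V, M](
      state: Seq[(Long, V, Long, Long)],
      inbox: Map[Long, Seq[M]],
      fn: DegreeAwareUpdateFunction[V, M]): Seq[(Long, V)] =
    state.map { case (id, value, inDeg, outDeg) =>
      fn.setDegrees(inDeg, outDeg)
      (id, fn.updateVertex(value, inbox.getOrElse(id, Seq.empty[M])))
    }
}
```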
[jira] [Commented] (FLINK-1523) Vertex-centric iteration extensions
[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528386#comment-14528386 ] ASF GitHub Bot commented on FLINK-1523: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99070508
Vertex-centric iteration extensions --- Key: FLINK-1523 URL: https://issues.apache.org/jira/browse/FLINK-1523 Project: Flink Issue Type: Improvement Components: Gelly Reporter: Vasia Kalavri Assignee: Andra Lungu We would like to make the following extensions to the vertex-centric iterations of Gelly: - allow vertices to access their in/out degrees and the total number of vertices of the graph, inside the iteration. - allow choosing the neighborhood type (in/out/all) over which to run the vertex-centric iteration. Now, the model uses the updates of the in-neighbors to calculate state and send messages to out-neighbors. We could add a parameter with value in/out/all to the {{VertexUpdateFunction}} and {{MessagingFunction}}, that would indicate the type of neighborhood.
[GitHub] flink pull request: [streaming] New Source and state checkpointing...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/643#issuecomment-99073001 Sorry, my bad, I confused the layers. These interfaces do not sufficiently describe things for incremental checkpointing. We may have to introduce additional interfaces for functions or operators that support incremental snapshotting.
[jira] [Commented] (FLINK-1792) Improve TM Monitoring: CPU utilization, hide graphs by default and show summary only
[ https://issues.apache.org/jira/browse/FLINK-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528343#comment-14528343 ] ASF GitHub Bot commented on FLINK-1792: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29662675
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
         override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
       })
+
+    // Preprocessing steps for registering cpuLoad
+    // fetch the method to get process CPU load
+    val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+    // define the fetchCPULoad method as per the fetched getCPULoadMethod
+    // dummy initialisation
+    var fetchCPULoad:(Any) => Double = (obj:Any) => -1
+
+    if(getCPULoadMethod != null){
+      fetchCPULoad = (obj:Any) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
+    } else {
+      // Log getProcessCpuLoad method not available for Java 6
+      LOG.warn("getProcessCpuLoad method not available in the Operating System Bean " +
+        "implementation for this Java runtime environment\n" + Thread.currentThread().getStackTrace)
+    }
+
+    metricRegistry.register("cpuLoad", new Gauge[Double] {
+      override def getValue: Double = {
+        try{
+          val osMXBean = ManagementFactory.getOperatingSystemMXBean().
+            asInstanceOf[com.sun.management.OperatingSystemMXBean]
+          return fetchCPULoad(osMXBean)
+        } catch {
+          case t: Throwable => {
+            LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
+            return -1
+          }
+        }
+      }
+    })
     metricRegistry
   }
+
+  /**
+   * Fetches getProcessCpuLoad method if available in the
+   *  OperatingSystemMXBean implementation else returns null
+   * @return
+   */
+  private def getMethodToFetchCPULoad(): Method = {
+    val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods()
+    for(method <- methodsList){
--- End diff --
Why not simply use a filter method on the methods list? Much more Scala-esque.
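For the lookup loop in `getMethodToFetchCPULoad`, the collection-based alternative the reviewer hints at could be sketched as follows (helper names are hypothetical). `filter(...).headOption` is the literal reading of "use a filter"; `find` expresses the same lookup and stops at the first match. Both return an `Option[Method]`, which also fits the earlier suggestion to avoid `null`.

```scala
import java.lang.reflect.Method

object MethodSearch {
  // `filter` keeps every match; `headOption` turns "no match" into None.
  def viaFilter(methods: Array[Method], name: String): Option[Method] =
    methods.filter(_.getName == name).headOption

  // `find` expresses the same lookup directly and stops at the first match.
  def viaFind(methods: Array[Method], name: String): Option[Method] =
    methods.find(_.getName == name)
}
```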
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528405#comment-14528405 ] ASF GitHub Bot commented on FLINK-1807: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r29666764
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{WeightVector, LabeledVector}
+import org.apache.flink.ml.math.{Vector => FlinkVector, BLAS}
+
+/** Abstract class that implements some of the functionality for common loss functions
+  *
+  * A loss function determines the loss term $L(w) of the objective function $f(w) = L(w) +
+  * \lambda R(w)$ for prediction tasks, the other being regularization, $R(w)$.
+  *
+  * We currently only support differentiable loss functions, in the future this class
+  * could be changed to DiffLossFunction in order to support other types, such absolute loss.
+  */
+abstract class LossFunction extends Serializable{
+
+  /** Calculates the loss for a given prediction/truth pair
+    *
+    * @param prediction The predicted value
+    * @param truth The true value
+    */
+  protected def loss(prediction: Double, truth: Double): Double
+
+  /** Calculates the derivative of the loss function with respect to the prediction
+    *
+    * @param prediction The predicted value
+    * @param truth The true value
+    */
+  protected def lossDerivative(prediction: Double, truth: Double): Double
+
+  /** Compute the gradient and the loss for the given data.
+    * The provided cumGradient is updated in place.
+    *
+    * @param example The features and the label associated with the example
+    * @param weights The current weight vector
+    * @param cumGradient The vector to which the gradient will be added to, in place.
+    * @return A tuple containing the computed loss as its first element and a the loss derivative as
+    *         its second element.
+    */
+  def lossAndGradient(
+      example: LabeledVector,
+      weights: WeightVector,
+      cumGradient: FlinkVector,
+      regType: RegularizationType,
+      regParameter: Double,
+      predictionFunction: (FlinkVector, WeightVector) => Double):
+  (Double, Double) = {
+    val features = example.vector
+    val label = example.label
+    // TODO(tvas): We could also provide for the case where we don't want an intercept value
+    // i.e. data already centered
+    val prediction = predictionFunction(features, weights)
+    //val prediction = BLAS.dot(features, weights.weights) + weights.intercept
+    val lossValue: Double = loss(prediction, label)
+    // The loss derivative is used to update the intercept
+    val lossDeriv= lossDerivative(prediction, label)
+    // Update the gradient
+    BLAS.axpy(lossDeriv , features, cumGradient)
--- End diff --
This only works if the gradient of the prediction function is the features vector. Not necessarily true.
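The reviewer's point is the chain rule: the gradient of the loss with respect to the weights is lossDerivative(prediction, label) times the gradient of the prediction with respect to the weights, and only for a linear prediction w·x (+ intercept) is the latter equal to the feature vector. The sketch below assumes squared loss 0.5 * (p - y)^2 and plain arrays instead of FlinkVector; `PredictionFunction`, `Linear`, `Logistic` and `addGradient` are hypothetical names, not the flink-ml API. Each prediction function reports its own gradient, and the logistic case shows where the features-only shortcut goes wrong.

```scala
object ChainRuleSketch {
  // Hypothetical interface: a prediction function that also reports its
  // gradient with respect to the weights.
  trait PredictionFunction {
    def predict(x: Array[Double], w: Array[Double]): Double
    def gradient(x: Array[Double], w: Array[Double]): Array[Double]
  }

  private def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Linear model: d(w.x)/dw = x, the only case where axpy(lossDeriv, features, ...) is right.
  object Linear extends PredictionFunction {
    def predict(x: Array[Double], w: Array[Double]): Double = dot(x, w)
    def gradient(x: Array[Double], w: Array[Double]): Array[Double] = x.clone()
  }

  // Logistic model: d sigmoid(w.x)/dw = s * (1 - s) * x, which differs from x.
  object Logistic extends PredictionFunction {
    def predict(x: Array[Double], w: Array[Double]): Double = 1.0 / (1.0 + math.exp(-dot(x, w)))
    def gradient(x: Array[Double], w: Array[Double]): Array[Double] = {
      val s = predict(x, w)
      x.map(_ * s * (1 - s))
    }
  }

  // Squared loss 0.5 * (p - y)^2 has derivative (p - y) with respect to p.
  // Chain rule: dLoss/dw = dLoss/dp * dp/dw, added into cumGradient in place.
  // Returns the loss value.
  def addGradient(x: Array[Double], y: Double, w: Array[Double],
                  f: PredictionFunction, cumGradient: Array[Double]): Double = {
    val p = f.predict(x, w)
    val lossDeriv = p - y
    val dp = f.gradient(x, w)
    for (i <- cumGradient.indices) cumGradient(i) += lossDeriv * dp(i)
    0.5 * (p - y) * (p - y)
  }
}
```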
Stochastic gradient descent optimizer for ML library Key: FLINK-1807 URL: https://issues.apache.org/jira/browse/FLINK-1807 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML Stochastic gradient descent (SGD) is a widely used optimization technique in different ML algorithms. Thus, it would be helpful to provide a generalized SGD implementation which can be instantiated with the respective gradient computation. Such a building block would make the
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528410#comment-14528410 ] ASF GitHub Bot commented on FLINK-1807: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/613#discussion_r29667008
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math._
+import org.apache.flink.ml.optimization.IterativeSolver.{Iterations, Stepsize}
+import org.apache.flink.ml.optimization.Solver._
+
+/** This [[Solver]] performs Stochastic Gradient Descent optimization using mini batches
+  *
+  * For each labeled vector in a mini batch the gradient is computed and added to a partial
+  * gradient. The partial gradients are then summed and divided by the size of the batches. The
+  * average gradient is then used to updated the weight values, including regularization.
+  *
+  * At the moment, the whole partition is used for SGD, making it effectively a batch gradient
+  * descent. Once a sampling operator has been introduced, the algorithm can be optimized
+  *
+  * @param runParameters The parameters to tune the algorithm. Currently these include:
+  *                      [[Solver.LossFunction]] for the loss function to be used,
+  *                      [[Solver.RegularizationType]] for the type of regularization,
+  *                      [[Solver.RegularizationParameter]] for the regularization parameter,
+  *                      [[IterativeSolver.Iterations]] for the maximum number of iteration,
+  *                      [[IterativeSolver.Stepsize]] for the learning rate used.
+  */
+class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
+
+  import Solver.WEIGHTVECTOR_BROADCAST
+
+  var parameterMap: ParameterMap = parameters ++ runParameters
+
+  // TODO(tvas): Use once we have proper sampling in place
+//  case object MiniBatchFraction extends Parameter[Double] {
+//    val defaultValue = Some(1.0)
+//  }
+//
+//  def setMiniBatchFraction(fraction: Double): GradientDescent = {
+//    parameterMap.add(MiniBatchFraction, fraction)
+//    this
+//  }
+
+  /** Performs one iteration of Stochastic Gradient Descent using mini batches
+    *
+    * @param data A Dataset of LabeledVector (label, features) pairs
+    * @param currentWeights A Dataset with the current weights to be optimized as its only element
+    * @return A Dataset containing the weights after one stochastic gradient descent step
+    */
+  private def SGDStep(data: DataSet[(LabeledVector)], currentWeights: DataSet[WeightVector]):
+  DataSet[WeightVector] = {
+
+    // TODO: Sample from input to realize proper SGD
+    data.map {
+      new GradientCalculation
+    }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST).reduce {
+      (left, right) =>
+        val (leftGradVector, leftLoss, leftCount) = left
+        val (rightGradVector, rightLoss, rightCount) = right
+        // Add the left gradient to the right one
+        BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights)
+        val gradients = WeightVector(
+          rightGradVector.weights, leftGradVector.intercept + rightGradVector.intercept)
+
+        (gradients , leftLoss + rightLoss, leftCount + rightCount)
+    }.map {
+      new WeightsUpdate
+    }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
+  }
+
+  /** Provides a solution for
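The reduce-then-update structure described in the SGDStep scaladoc (sum the per-example gradients, divide by the number of examples, take a step) can be sketched on plain Scala collections. This is an illustration under simplifying assumptions: arrays replace WeightVector, the intercept and regularization are omitted, and new arrays are allocated instead of updating in place with BLAS.axpy. `MiniBatchSketch`, `combine` and `step` are hypothetical names.

```scala
object MiniBatchSketch {
  // One partial result per example: (gradient, loss, count), mirroring the
  // (gradient, loss, count) triples reduced in SGDStep.
  type Partial = (Array[Double], Double, Int)

  // Element-wise sum of the gradients, plus summed losses and counts.
  def combine(left: Partial, right: Partial): Partial = {
    val (lg, ll, lc) = left
    val (rg, rl, rc) = right
    (lg.zip(rg).map { case (a, b) => a + b }, ll + rl, lc + rc)
  }

  // Averages the summed gradient over the batch and takes one step:
  // w := w - stepsize * (sum of gradients / count). Regularization omitted.
  def step(weights: Array[Double], partials: Seq[Partial], stepsize: Double): Array[Double] = {
    val (gradSum, _, count) = partials.reduce(combine)
    weights.zip(gradSum).map { case (w, g) => w - stepsize * g / count }
  }
}
```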
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528491#comment-14528491 ] Aljoscha Krettek commented on FLINK-1962: - I would be against reimplementing everything on top of the Scala Batch API. This would soon turn into a maintenance nightmare, I would assume. Add Gelly Scala API --- Key: FLINK-1962 URL: https://issues.apache.org/jira/browse/FLINK-1962 Project: Flink Issue Type: Task Components: Gelly, Scala API Affects Versions: 0.9 Reporter: Vasia Kalavri
[GitHub] flink pull request: [FLINK-1974] JobExecutionResult NetRuntime - d...
GitHub user jkirsch opened a pull request: https://github.com/apache/flink/pull/652 [FLINK-1974] JobExecutionResult NetRuntime - document result type - Added documentation to indicate that the return type is in milliseconds - Added an elapsedNetRuntime method which accepts a desired time unit for easy conversion You can merge this pull request into a Git repository by running: $ git pull https://github.com/jkirsch/incubator-flink JobExecutionResult Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #652 commit ea909b5298828851149e5a087a03dffcf0a09fca Author: Johannes jkirschn...@gmail.com Date: 2015-05-05T15:32:41Z [FLINK-1974] JobExecutionResult NetRuntime - document result type - Added documentation to indicate that the return type is in milliseconds - Added an elapsedNetRuntime method which accepts a desired time unit for easy conversion
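A sketch of the kind of unit-aware accessor the pull request describes, assuming the stored net runtime is in milliseconds and delegating to `java.util.concurrent.TimeUnit.convert`. `ExecutionResultSketch` is a hypothetical stand-in, not Flink's `JobExecutionResult`, and the exact signature of the PR's `elapsedNetRuntime` may differ. Note that `TimeUnit.convert` truncates when converting to a coarser unit.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for JobExecutionResult: only the net runtime field.
class ExecutionResultSketch(netRuntimeMillis: Long) {
  /** Net runtime in milliseconds. */
  def getNetRuntime: Long = netRuntimeMillis

  /** Net runtime converted to the requested unit (truncating, as TimeUnit does). */
  def elapsedNetRuntime(unit: TimeUnit): Long =
    unit.convert(netRuntimeMillis, TimeUnit.MILLISECONDS)
}
```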
[jira] [Commented] (FLINK-1974) JobExecutionResult NetRuntime - document result type
[ https://issues.apache.org/jira/browse/FLINK-1974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528685#comment-14528685 ] ASF GitHub Bot commented on FLINK-1974: --- GitHub user jkirsch opened a pull request: https://github.com/apache/flink/pull/652 JobExecutionResult NetRuntime - document result type Key: FLINK-1974 URL: https://issues.apache.org/jira/browse/FLINK-1974 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.9 Environment: The JobExecutionResult stores the net execution times in milliseconds, but does not say so. A simple fix is to add it to the JavaDoc .. a more complete fix would be to change the method name to getNetRuntimeMs or even use an auto conversion, such as getElapsedNetRuntime(TimeUnit) which would autoconvert Reporter: Johannes Assignee: Johannes Priority: Trivial Labels: easyfix
[jira] [Commented] (FLINK-1976) Add ForwardedFields* hints for the optimizer
[ https://issues.apache.org/jira/browse/FLINK-1976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528832#comment-14528832 ] Vasia Kalavri commented on FLINK-1976: -- Big +1! Check also the classes in the graph.utils package! Add ForwardedFields* hints for the optimizer - Key: FLINK-1976 URL: https://issues.apache.org/jira/browse/FLINK-1976 Project: Flink Issue Type: Wish Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Some classes in Graph.java can be improved by adding ForwardedFields* annotations. For instance, EmitOneEdgePerNode, EmitOneVertexWithEdgeValuePerNode, EmitOneEdgeWithNeighborPerNode, ProjectEdgeWithNeighbor, etc.
[jira] [Created] (FLINK-1975) Graph getUndirected improvement
Andra Lungu created FLINK-1975: -- Summary: Graph getUndirected improvement Key: FLINK-1975 URL: https://issues.apache.org/jira/browse/FLINK-1975 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9 Reporter: Andra Lungu Assignee: Andra Lungu Priority: Minor The current getUndirected method uses a union and a map to add the reverse edges to the graph. This could be done by simply using a flatMap that adds the edge and its reversed version - a flatMap is less expensive than a union followed by a map.
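A plain-Scala sketch of the two shapes being compared, on a `Seq` instead of a Flink `DataSet`. The `Edge` case class and both helper names are hypothetical. The issue's cost argument concerns DataSet operators and is not something this collection sketch measures; it only shows that one flatMap emitting each edge plus its reverse yields the same multiset of edges as the union-plus-map version.

```scala
object UndirectedSketch {
  // Minimal, hypothetical edge type for illustration.
  final case class Edge[K, V](src: K, trg: K, value: V) {
    def reverse: Edge[K, V] = Edge(trg, src, value)
  }

  // Shape of the current implementation: union the edges with a mapped, reversed copy.
  def viaUnionAndMap[K, V](edges: Seq[Edge[K, V]]): Seq[Edge[K, V]] =
    edges ++ edges.map(_.reverse)

  // Proposed shape: a single flatMap emits each edge and its reversed version.
  def viaFlatMap[K, V](edges: Seq[Edge[K, V]]): Seq[Edge[K, V]] =
    edges.flatMap(e => Seq(e, e.reverse))
}
```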
[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user bhatsachin commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29682706
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
         override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
       })
+
+    // Preprocessing steps for registering cpuLoad
+    // fetch the method to get process CPU load
+    val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+    // define the fetchCPULoad method as per the fetched getCPULoadMethod
+    // dummy initialisation
+    var fetchCPULoad:(Any) => Double = (obj:Any) => -1
--- End diff --
We need to define the fetchCPULoad method based on the object returned by getMethodToFetchCPULoad. A val cannot be reassigned and has to be initialised when it is declared, hence a var is used here.
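As an alternative to the dummy-initialised `var`, the function can be chosen by an expression and bound once to a `val`, because `if`/`else` and `match` both yield values in Scala. A hedged sketch, assuming the method has already been looked up into an `Option[Method]`; `makeFetchCPULoad` is a hypothetical helper, and `AnyRef` replaces the PR's `Any` because `Method.invoke` takes an `Object` receiver.

```scala
import java.lang.reflect.Method

object ValInsteadOfVar {
  // Chooses the fetch function once; the match is an expression, so the
  // caller can bind the result to a val with no dummy value or reassignment.
  def makeFetchCPULoad(getCPULoadMethod: Option[Method]): AnyRef => Double =
    getCPULoadMethod match {
      case Some(m) => (obj: AnyRef) => m.invoke(obj).asInstanceOf[Double]
      case None    => (_: AnyRef) => -1.0
    }
}
```

Usage would then be `val fetchCPULoad = ValInsteadOfVar.makeFetchCPULoad(getCPULoadMethod)` in place of the `var`, with `getCPULoadMethod` typed as `Option[Method]`.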
[jira] [Commented] (FLINK-1792) Improve TM Monitoring: CPU utilization, hide graphs by default and show summary only
[ https://issues.apache.org/jira/browse/FLINK-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528703#comment-14528703 ] ASF GitHub Bot commented on FLINK-1792: --- Github user bhatsachin commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29682706
[jira] [Commented] (FLINK-1792) Improve TM Monitoring: CPU utilization, hide graphs by default and show summary only
[ https://issues.apache.org/jira/browse/FLINK-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528340#comment-14528340 ] ASF GitHub Bot commented on FLINK-1792: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29662593
[GitHub] flink pull request: [WIP] - [FLINK-1807/1889] - Optimization frame...
Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/613#issuecomment-99064842 BTW some of the builds are still failing due to excessive logging. Any ideas for a fix? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-1871) Add a Spargel to Gelly migration guide
[ https://issues.apache.org/jira/browse/FLINK-1871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-1871. -- Resolution: Implemented Fix Version/s: 0.9 Add a Spargel to Gelly migration guide -- Key: FLINK-1871 URL: https://issues.apache.org/jira/browse/FLINK-1871 Project: Flink Issue Type: Task Components: docs, Gelly, Spargel Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor Fix For: 0.9 The guide should explain how users can port their existing Spargel applications to Gelly.
[GitHub] flink pull request: [FLINK-1942][gelly] GSA Iteration Configuratio...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/635#issuecomment-99169722 Yap, that's what I had in mind! Only the class names are a bit too long. Maybe VertexCentricConfiguration and GSAConfiguration? Or something similar?
[jira] [Commented] (FLINK-1942) Add configuration options to Gelly-GSA
[ https://issues.apache.org/jira/browse/FLINK-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529002#comment-14529002 ] ASF GitHub Bot commented on FLINK-1942: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/635#issuecomment-99169722 Yap, that's what I had in mind! Only the class names are a bit too long. Maybe VertexCentricConfiguration and GSAConfiguration? Or something similar? Add configuration options to Gelly-GSA -- Key: FLINK-1942 URL: https://issues.apache.org/jira/browse/FLINK-1942 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Currently, it is not possible to configure a GSA iteration. Similarly to vertex-centric, we should allow setting the iteration name and degree of parallelism, aggregators, broadcast variables and whether the solution set is kept in unmanaged memory. The docs should be updated accordingly.
[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29648339
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
     override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
   })
+// Preprocessing steps for registering cpuLoad
+// fetch the method to get process CPU load
+val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+// Log getProcessCpuLoad method not available for Java 6
+if (getCPULoadMethod == null) {
+  LOG.warn("getProcessCpuLoad method not available in the Operating System Bean " +
+    "implementation for this Java runtime environment\n" + Thread.currentThread().getStackTrace)
+}
+
+// define the fetchCPULoad method as per the fetched getCPULoadMethod
+val fetchCPULoad: (Any) => Double = if (getCPULoadMethod != null) {
+  (obj: Any) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
+} else {
+  (obj: Any) => -1
+}
+
+metricRegistry.register("cpuLoad", new Gauge[Double] {
+  override def getValue: Double = {
+    try {
--- End diff --
Why aren't you doing the check `getCPULoadMethod != null` before the cast? If it is null, return -1; if not, do the cast and fetch the CPU load.
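A minimal sketch of the ordering suggested above: test the reflective `Method` for null first, and only fetch the bean and invoke the method when it exists. `Gauge` here is a hypothetical one-method stand-in for the metrics library's interface, and `CpuLoadGauge` and `forCurrentJvm` are illustrative names. The sketch also assumes it is acceptable to invoke the method through the public `com.sun.management.OperatingSystemMXBean` interface instead of casting the bean, which differs from what the PR does:

```scala
import java.lang.management.ManagementFactory
import java.lang.reflect.Method

object NullCheckFirstSketch {
  // Hypothetical stand-in for the metrics library's Gauge interface.
  trait Gauge[T] { def getValue: T }

  // The null check comes first; the bean is only fetched when the method exists.
  class CpuLoadGauge(method: Method, bean: () => AnyRef) extends Gauge[Double] {
    override def getValue: Double =
      if (method == null) -1.0
      else method.invoke(bean()).asInstanceOf[Double]
  }

  // Wiring against the running JVM; the method is null when the lookup fails
  // (for example on a JVM without com.sun.management or without getProcessCpuLoad).
  def forCurrentJvm(): CpuLoadGauge = {
    val method: Method =
      try Class.forName("com.sun.management.OperatingSystemMXBean").getMethod("getProcessCpuLoad")
      catch { case _: ReflectiveOperationException => null }
    new CpuLoadGauge(method, () => ManagementFactory.getOperatingSystemMXBean)
  }
}
```

Passing the bean as a function makes the null branch observable: with a null method the bean supplier is never called.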
[GitHub] flink pull request: [streaming] New Source and state checkpointing...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/643#issuecomment-98964566 Do you think these interfaces will work if at any point we want to do incremental snapshots?
[GitHub] flink pull request: [FLINK-1953] [runtime] Integrate new snapshot ...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/651#issuecomment-98984867 Great! I'll integrate it with my new Kafka Source and test everything on a cluster.
[GitHub] flink pull request: [PATCH] Correct example code in Gelly doc.
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/650#issuecomment-98985796 Good catch! Thanks @joey001!
[GitHub] flink pull request: [FLINK-1707][WIP]Add an Affinity Propagation L...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/649#issuecomment-98983169 For the test: did you have a look at the other Gelly tests (https://github.com/apache/flink/tree/master/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example)? They are pretty easy to follow, unless your algorithm gives non-deterministic results. Is this the case?
[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method
[ https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528075#comment-14528075 ] ASF GitHub Bot commented on FLINK-1707: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/649#issuecomment-98983169 For the test: did you have a look at the other Gelly tests (https://github.com/apache/flink/tree/master/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example)? They are pretty easy to follow, unless your algorithm gives non-deterministic results. Is this the case? Add an Affinity Propagation Library Method -- Key: FLINK-1707 URL: https://issues.apache.org/jira/browse/FLINK-1707 Project: Flink Issue Type: New Feature Components: Gelly Reporter: Vasia Kalavri Assignee: joey Priority: Minor This issue proposes adding an implementation of the Affinity Propagation algorithm as a Gelly library method and a corresponding example. The algorithm is described in paper [1] and a description of a vertex-centric implementation can be found in [2]. [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
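For reference, the responsibility/availability updates described in [1] can be sketched on a dense similarity matrix. This is a single-machine sketch, not the vertex-centric Gelly formulation from [2]; `AffinityPropagationSketch.run` and its defaults (200 iterations, damping 0.5) are illustrative assumptions. It adds no random tie-breaking noise and uses a fixed iteration count, so the same input always gives the same exemplars, which is what makes a test in the style of the existing Gelly example tests straightforward:

```scala
object AffinityPropagationSketch {
  /** s(i)(k): similarity of point i to candidate exemplar k; s(k)(k) is the preference.
    * Returns, for every point i, the index of its exemplar. O(n^3) per iteration. */
  def run(s: Array[Array[Double]], iterations: Int = 200, damping: Double = 0.5): Array[Int] = {
    val n = s.length
    require(n >= 2, "need at least two points")
    val r = Array.fill(n, n)(0.0) // responsibilities
    val a = Array.fill(n, n)(0.0) // availabilities

    for (_ <- 0 until iterations) {
      // r(i,k) = s(i,k) - max_{k' != k} (a(i,k') + s(i,k'))
      for (i <- 0 until n) {
        val rowNew = Array.tabulate(n) { k =>
          s(i)(k) - (0 until n).filter(_ != k).map(kk => a(i)(kk) + s(i)(kk)).max
        }
        for (k <- 0 until n) r(i)(k) = damping * r(i)(k) + (1 - damping) * rowNew(k)
      }
      // a(i,k) = min(0, r(k,k) + sum_{i' not in {i,k}} max(0, r(i',k)))  for i != k
      // a(k,k) = sum_{i' != k} max(0, r(i',k))
      for (k <- 0 until n) {
        val positive = Array.tabulate(n)(i => math.max(0.0, r(i)(k)))
        val sumPositiveOthers = positive.sum - positive(k) // excludes i' = k
        for (i <- 0 until n) {
          val aNew =
            if (i == k) sumPositiveOthers
            else math.min(0.0, r(k)(k) + sumPositiveOthers - positive(i))
          a(i)(k) = damping * a(i)(k) + (1 - damping) * aNew
        }
      }
    }
    // Each point picks the candidate maximising a(i,k) + r(i,k).
    Array.tabulate(n)(i => (0 until n).maxBy(k => a(i)(k) + r(i)(k)))
  }
}
```

With points at -1, 0 and 1, similarity -(x_i - x_k)^2 and preference -10, all three points choose the middle point (index 1) as their exemplar.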
[jira] [Commented] (FLINK-1953) Rework Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-1953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528078#comment-14528078 ] ASF GitHub Bot commented on FLINK-1953: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/651#issuecomment-98984867 Great! I'll integrate it with my new Kafka Source and test everything on a cluster. Rework Checkpoint Coordinator - Key: FLINK-1953 URL: https://issues.apache.org/jira/browse/FLINK-1953 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9 The checkpoint coordinator currently contains no tests and is vulnerable to a variety of situations. In particular, I propose to add:
- Better configurability of which tasks receive the trigger checkpoint messages, which tasks need to acknowledge the checkpoint, and which tasks need to receive confirmation messages.
- Checkpoint timeouts, such that incomplete checkpoints are guaranteed to be cleaned up after a while, regardless of successful checkpoints.
- Better sanity checking of messages and fields, to properly handle/ignore messages for old/expired checkpoints, or invalidly routed messages.
- Better handling of checkpoint attempts at points where the execution has just failed or is currently being canceled.
- A good set of tests.
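Three of the proposed points (a configurable set of tasks that must acknowledge, timeouts that clean up incomplete checkpoints regardless of later successes, and ignoring acknowledgements for unknown or expired checkpoints) can be illustrated with a small single-threaded sketch. All names here are hypothetical and this is not Flink's `CheckpointCoordinator`; it takes the current time as an explicit `now` parameter so its behaviour is deterministic and easy to test:

```scala
import scala.collection.mutable

object CheckpointCoordinatorSketch {
  sealed trait AckResult
  case object Ignored extends AckResult   // unknown/expired checkpoint, or unexpected/duplicate task
  case object Pending extends AckResult   // acknowledged, still waiting for other tasks
  case object Completed extends AckResult // the last required acknowledgement arrived

  final case class PendingCheckpoint(id: Long, deadline: Long, notYetAcked: mutable.Set[String])

  /** tasksToAck: tasks that must acknowledge; timeoutMillis: maximum time a checkpoint may stay pending. */
  final class Coordinator(tasksToAck: Set[String], timeoutMillis: Long) {
    private var nextId = 1L
    private val pending = mutable.Map.empty[Long, PendingCheckpoint]
    private var latestCompleted: Option[Long] = None

    def trigger(now: Long): Long = {
      val id = nextId
      nextId += 1
      pending(id) = PendingCheckpoint(id, now + timeoutMillis, mutable.Set.empty[String] ++= tasksToAck)
      id
    }

    /** Drops every pending checkpoint whose deadline has passed and returns their ids. */
    def expire(now: Long): Seq[Long] = {
      val expired = pending.values.filter(_.deadline <= now).map(_.id).toSeq.sorted
      expired.foreach(pending.remove)
      expired
    }

    def acknowledge(id: Long, task: String, now: Long): AckResult = {
      expire(now) // timeouts apply even if no later checkpoint ever completes
      pending.get(id) match {
        // remove returns false for tasks that are not (or no longer) expected to acknowledge
        case Some(cp) if cp.notYetAcked.remove(task) =>
          if (cp.notYetAcked.isEmpty) {
            pending.remove(id)
            latestCompleted = Some(id)
            Completed
          } else Pending
        case _ => Ignored
      }
    }

    def latestCompletedCheckpoint: Option[Long] = latestCompleted
    def numPending: Int = pending.size
  }
}
```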
[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user bhatsachin commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29650229
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
     override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
   })
+// Preprocessing steps for registering cpuLoad
+// fetch the method to get process CPU load
+val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+// Log getProcessCpuLoad method not available for Java 6
+if (getCPULoadMethod == null) {
+  LOG.warn("getProcessCpuLoad method not available in the Operating System Bean " +
+    "implementation for this Java runtime environment\n" + Thread.currentThread().getStackTrace)
+}
+
+// define the fetchCPULoad method as per the fetched getCPULoadMethod
+val fetchCPULoad: (Any) => Double = if (getCPULoadMethod != null) {
+  (obj: Any) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
+} else {
+  (obj: Any) => -1
+}
+
+metricRegistry.register("cpuLoad", new Gauge[Double] {
+  override def getValue: Double = {
+    try {
--- End diff --
Hey, correct me if I am wrong. I found that Scala needs to initialise a variable during declaration. Hence if the variable is declared in an if-else clause, it won't be visible to the rest of the code. Reassigning the val was giving errors, so I have made use of the null check.
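On the `val` question: in Scala, `if`/`else` is an expression, so a `val` can be initialised from it at its declaration and stays visible to all code after it in that scope; neither a `var` nor a later reassignment is needed. A minimal sketch, assuming a hypothetical `fetcherFor` helper (the `Option` variant is an alternative to the null check, not something the PR uses):

```scala
import java.lang.reflect.Method

object ValFromExpressionSketch {
  // Hypothetical helper: builds the fetch function from a possibly-null reflective Method.
  def fetcherFor(getCPULoadMethod: Method): AnyRef => Double = {
    // The val is initialised once, from the value of the if/else expression.
    val fetchCPULoad: AnyRef => Double =
      if (getCPULoadMethod != null) {
        (obj: AnyRef) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
      } else {
        (_: AnyRef) => -1.0
      }
    fetchCPULoad
  }

  // The same idea with Option instead of null.
  def fetcherForOption(getCPULoadMethod: Option[Method]): AnyRef => Double =
    getCPULoadMethod
      .map(m => (obj: AnyRef) => m.invoke(obj).asInstanceOf[Double])
      .getOrElse((_: AnyRef) => -1.0)
}
```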
[GitHub] flink pull request: [FLINK-1792] TM Monitoring: CPU utilization, h...
Github user bhatsachin commented on a diff in the pull request: https://github.com/apache/flink/pull/553#discussion_r29650540
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1894,6 +1895,52 @@ object TaskManager {
     override def getValue: Double = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
   })
+// Preprocessing steps for registering cpuLoad
+// fetch the method to get process CPU load
+val getCPULoadMethod: Method = getMethodToFetchCPULoad()
+
+// Log getProcessCpuLoad method not available for Java 6
+if (getCPULoadMethod == null) {
+  LOG.warn("getProcessCpuLoad method not available in the Operating System Bean " +
+    "implementation for this Java runtime environment\n" + Thread.currentThread().getStackTrace)
+}
+
+// define the fetchCPULoad method as per the fetched getCPULoadMethod
+val fetchCPULoad: (Any) => Double = if (getCPULoadMethod != null) {
+  (obj: Any) => getCPULoadMethod.invoke(obj).asInstanceOf[Double]
+} else {
+  (obj: Any) => -1
+}
+
+metricRegistry.register("cpuLoad", new Gauge[Double] {
+  override def getValue: Double = {
+    try {
--- End diff --
I will try using var instead of val and update the PR immediately.
[GitHub] flink pull request: [FLINK-1871][gelly] Added Spargel to Gelly mig...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/600
[jira] [Resolved] (FLINK-1944) Add a Gelly-GSA PageRank example
[ https://issues.apache.org/jira/browse/FLINK-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-1944. -- Resolution: Implemented Fix Version/s: 0.9 Add a Gelly-GSA PageRank example Key: FLINK-1944 URL: https://issues.apache.org/jira/browse/FLINK-1944 Project: Flink Issue Type: Task Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor Fix For: 0.9
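The gather-sum-apply structure of a PageRank example can be sketched with plain Scala collections: gather the neighbour's rank divided by its out-degree along each edge, sum those values per target vertex, then apply the damping step. This is an illustration of the three phases, not Gelly's GSA API; `GsaPageRankSketch`, the damping factor `beta = 0.85`, and the assumption that every vertex has at least one out-edge (otherwise rank mass leaks) are all choices made for the sketch:

```scala
object GsaPageRankSketch {
  /** edges: directed (src, dst) pairs over vertices 0 until numVertices. */
  def run(numVertices: Int, edges: Seq[(Int, Int)], iterations: Int, beta: Double = 0.85): Vector[Double] = {
    val outDegree = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    var ranks = Vector.fill(numVertices)(1.0 / numVertices)
    for (_ <- 0 until iterations) {
      // Gather: each edge carries the source's rank split evenly over its out-edges.
      val gathered = edges.map { case (src, dst) => dst -> ranks(src) / outDegree(src) }
      // Sum: combine the gathered values per target vertex.
      val sums = gathered.groupBy(_._1).map { case (v, vs) => v -> vs.map(_._2).sum }
      // Apply: damped sum plus the uniform teleport term.
      ranks = Vector.tabulate(numVertices)(v => (1 - beta) / numVertices + beta * sums.getOrElse(v, 0.0))
    }
    ranks
  }
}
```

On a directed 3-cycle every vertex keeps rank 1/3, and on any graph without dangling vertices the ranks keep summing to 1.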
[GitHub] flink pull request: [FLINK-1942][gelly] GSA Iteration Configuratio...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/635#issuecomment-99157184 Hi @vasia, The alternatives to having a single config class for all types of iterations are:
- having two separate classes in spargel and in gsa => massive duplication of code
- having an IterationConfiguration abstract class that VertexCentricIterationConfiguration and GatherSumApplyIterationConfiguration will inherit.
I would personally go for the second approach! What do you think?
[jira] [Commented] (FLINK-1942) Add configuration options to Gelly-GSA
[ https://issues.apache.org/jira/browse/FLINK-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528901#comment-14528901 ] ASF GitHub Bot commented on FLINK-1942: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/635#issuecomment-99157184 Hi @vasia, The alternatives to having a single config class for all types of iterations are:
- having two separate classes in spargel and in gsa => massive duplication of code
- having an IterationConfiguration abstract class that VertexCentricIterationConfiguration and GatherSumApplyIterationConfiguration will inherit.
I would personally go for the second approach! What do you think? Add configuration options to Gelly-GSA -- Key: FLINK-1942 URL: https://issues.apache.org/jira/browse/FLINK-1942 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Currently, it is not possible to configure a GSA iteration. Similarly to vertex-centric, we should allow setting the iteration name and degree of parallelism, aggregators, broadcast variables and whether the solution set is kept in unmanaged memory. The docs should be updated accordingly.
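The second approach (an abstract base class with the shared options and one subclass per iteration type) can be sketched as follows. It uses the shorter class names suggested in the thread, and the options listed in the issue (name, parallelism, aggregators, broadcast variables, solution set in unmanaged memory). Every field and method name here is hypothetical, aggregators and broadcast sets are reduced to plain names, and it is written in Scala for illustration rather than as Gelly's Java API. Returning `this.type` lets setters inherited from the base class keep the concrete subclass type in chained calls:

```scala
import scala.collection.mutable

object IterationConfigSketch {
  // Hypothetical base class holding the options shared by both iteration types.
  abstract class IterationConfiguration {
    var name: Option[String] = None
    var parallelism: Option[Int] = None
    var solutionSetUnmanagedMemory: Boolean = false
    val aggregatorNames = mutable.ListBuffer.empty[String]

    // this.type keeps chained calls typed as the concrete subclass.
    def setName(n: String): this.type = { name = Some(n); this }
    def setParallelism(p: Int): this.type = {
      require(p > 0, "parallelism must be positive")
      parallelism = Some(p)
      this
    }
    def setSolutionSetUnmanagedMemory(unmanaged: Boolean): this.type = { solutionSetUnmanagedMemory = unmanaged; this }
    def registerAggregator(aggName: String): this.type = { aggregatorNames += aggName; this }
  }

  // Broadcast sets belong to individual user functions, so each subclass keeps its own.
  final class VertexCentricConfiguration extends IterationConfiguration {
    val messagingBroadcastSets = mutable.ListBuffer.empty[String]
    val updateBroadcastSets = mutable.ListBuffer.empty[String]
    def addMessagingBroadcastSet(n: String): this.type = { messagingBroadcastSets += n; this }
    def addUpdateBroadcastSet(n: String): this.type = { updateBroadcastSets += n; this }
  }

  final class GSAConfiguration extends IterationConfiguration {
    val gatherBroadcastSets = mutable.ListBuffer.empty[String]
    val sumBroadcastSets = mutable.ListBuffer.empty[String]
    val applyBroadcastSets = mutable.ListBuffer.empty[String]
    def addGatherBroadcastSet(n: String): this.type = { gatherBroadcastSets += n; this }
    def addSumBroadcastSet(n: String): this.type = { sumBroadcastSets += n; this }
    def addApplyBroadcastSet(n: String): this.type = { applyBroadcastSets += n; this }
  }
}
```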
[jira] [Commented] (FLINK-1871) Add a Spargel to Gelly migration guide
[ https://issues.apache.org/jira/browse/FLINK-1871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528868#comment-14528868 ] ASF GitHub Bot commented on FLINK-1871: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/600 Add a Spargel to Gelly migration guide -- Key: FLINK-1871 URL: https://issues.apache.org/jira/browse/FLINK-1871 Project: Flink Issue Type: Task Components: docs, Gelly, Spargel Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor The guide should explain how users can port their existing Spargel applications to Gelly.
[jira] [Commented] (FLINK-1944) Add a Gelly-GSA PageRank example
[ https://issues.apache.org/jira/browse/FLINK-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14528867#comment-14528867 ] ASF GitHub Bot commented on FLINK-1944: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/626 Add a Gelly-GSA PageRank example Key: FLINK-1944 URL: https://issues.apache.org/jira/browse/FLINK-1944 Project: Flink Issue Type: Task Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Andra Lungu Priority: Minor
[jira] [Commented] (FLINK-1682) Port Record-API based optimizer tests to new Java API
[ https://issues.apache.org/jira/browse/FLINK-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529265#comment-14529265 ] ASF GitHub Bot commented on FLINK-1682: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/627 Port Record-API based optimizer tests to new Java API - Key: FLINK-1682 URL: https://issues.apache.org/jira/browse/FLINK-1682 Project: Flink Issue Type: Sub-task Components: Optimizer Reporter: Fabian Hueske Assignee: Fabian Hueske Priority: Minor Fix For: 0.9
[jira] [Commented] (FLINK-1523) Vertex-centric iteration extensions
[ https://issues.apache.org/jira/browse/FLINK-1523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529347#comment-14529347 ] ASF GitHub Bot commented on FLINK-1523: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99232613 Hey @vasia, Everyone is acting as though I do not want to make these methods user-friendly. I do :), it's just not possible. Keep in mind that in the VertexUpdateFunction and in the MessagingFunction you can only access the vertex in the updateVertex() and sendMessages() methods. getDirection() and getBroadcastSet() return a single value for all the vertices, whereas getDegree() returns one value for each of the vertices. As previously implied, you cannot access the vertex in these classes. The only (very ugly) solution is to give it the entire DataSet and to constantly filter it. You basically have the degree of the vertex in the Tuple3, but this information is hidden from the user. Vertex-centric iteration extensions --- Key: FLINK-1523 URL: https://issues.apache.org/jira/browse/FLINK-1523 Project: Flink Issue Type: Improvement Components: Gelly Reporter: Vasia Kalavri Assignee: Andra Lungu We would like to make the following extensions to the vertex-centric iterations of Gelly: - allow vertices to access their in/out degrees and the total number of vertices of the graph, inside the iteration. - allow choosing the neighborhood type (in/out/all) over which to run the vertex-centric iteration. Now, the model uses the updates of the in-neighbors to calculate state and send messages to out-neighbors. We could add a parameter with value in/out/all to the {{VertexUpdateFunction}} and {{MessagingFunction}}, that would indicate the type of neighborhood.
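What the three neighborhood types select, and how in/out degrees can be computed once up front so they could travel with each vertex value instead of being looked up inside the update function, can be sketched with plain Scala collections. This is not Gelly's API; `EdgeDirection`, `neighbors` and `degrees` are hypothetical names, and edges are assumed to be directed `(src, dst)` pairs:

```scala
object NeighborhoodSketch {
  sealed trait EdgeDirection
  case object In extends EdgeDirection
  case object Out extends EdgeDirection
  case object All extends EdgeDirection

  /** Neighbours of v for the chosen direction. */
  def neighbors(v: Int, edges: Seq[(Int, Int)], dir: EdgeDirection): Seq[Int] = dir match {
    case In  => edges.collect { case (src, `v`) => src }
    case Out => edges.collect { case (`v`, dst) => dst }
    case All => neighbors(v, edges, In) ++ neighbors(v, edges, Out)
  }

  /** (inDegree, outDegree) for every vertex that appears in an edge. */
  def degrees(edges: Seq[(Int, Int)]): Map[Int, (Int, Int)] = {
    val in = edges.groupBy(_._2).map { case (v, es) => v -> es.size }
    val out = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    (in.keySet ++ out.keySet).map(v => v -> (in.getOrElse(v, 0), out.getOrElse(v, 0))).toMap
  }
}
```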
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99232613 Hey @vasia, Everyone is acting as though I do not want to make these methods user-friendly. I do :), it's just not possible. Keep in mind that in the VertexUpdateFunction and in the MessagingFunction you can only access the vertex in the updateVertex() and sendMessages() methods. getDirection() and getBroadcastSet() return a single value for all the vertices, whereas getDegree() returns one value for each of the vertices. As previously implied, you cannot access the vertex in these classes. The only (very ugly) solution is to give it the entire DataSet and to constantly filter it. You basically have the degree of the vertex in the Tuple3, but this information is hidden from the user.