[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository
[ https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617040#comment-14617040 ] ASF GitHub Bot commented on FLINK-2200: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/885#discussion_r34067611 --- Diff: docs/apis/programming_guide.md --- @@ -187,7 +187,17 @@ that creates the type information for Flink operations. </div> </div> + Scala Dependency Versions +Because the Scala 2.10 binary is not compatible with the Scala 2.11 binary, we provide multiple artifacts +to support both Scala versions. If you want to run your program on Flink with Scala 2.11, you need +to add the suffix `_2.11` to all Flink artifact ids in your dependencies. Be careful with +this difference in artifact ids: all modules built with Scala 2.11 have the suffix `_2.11` in their artifact id. +For example, `flink-java` should be changed to `flink-java_2.11` and `flink-clients` should be +changed to `flink-clients_2.11`. --- End diff -- @aalexandrov I'm not sure it is good to use the keyword "Scala-dependent". We cross-build all modules, including Java-based modules that are linked with Scala modules. I think that omitting "Scala-dependent" is much better. Flink API with Scala 2.11 - Maven Repository Key: FLINK-2200 URL: https://issues.apache.org/jira/browse/FLINK-2200 Project: Flink Issue Type: Wish Components: Build System, Scala API Reporter: Philipp Götze Assignee: Chiwan Park Priority: Trivial Labels: maven It would be nice if you could upload a pre-built version of the Flink API with Scala 2.11 to the Maven repository. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
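The suffix convention discussed above can be illustrated with a Maven dependency declaration. A hypothetical `pom.xml` fragment (the version number is a placeholder, not taken from the thread):

```xml
<!-- Illustration of the _2.11 artifact-id suffix; version is a placeholder -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>0.10-SNAPSHOT</version>
</dependency>
```

The same project built against Scala 2.10 would instead reference `flink-clients` without the suffix.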
[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/885#discussion_r34067611 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/696#discussion_r34017778 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala --- @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.classification + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSetUtils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.{DistanceMetric, EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. + * + * This algorithm calculates the `k` nearest neighbor points in the training set for each point of + * the testing set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... 
+ * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.classification.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''None''') + * + * - [[org.apache.flink.ml.classification.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]] + * Sets the distance metric to calculate distance between two points. If no metric is specified, + * then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. (Default value: + * '''EuclideanDistanceMetric()''') + * + */ +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[Vector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k >= 1, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points +*/ + def setDistanceMetric(metric: DistanceMetric): KNN = { +parameters.add(DistanceMetric, metric) +this + } + + /** Sets the number of data blocks/partitions +* @param n the number of data blocks +*/ + def setBlocks(n: Int): KNN = { +require(n >= 1, "Number of blocks must be positive.") 
+parameters.add(Blocks, n) +this + } +} + +object KNN { + + case object K extends Parameter[Int] { +val defaultValue: Option[Int] = None + } + + case object DistanceMetric extends Parameter[DistanceMetric] { +val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric()) + } + + case object Blocks extends Parameter[Int] { +val defaultValue: Option[Int] = None + } + + def apply(): KNN = { +new KNN() + } + + /** [[FitOperation]] which trains a KNN based on the given training data set. +* @tparam T Subtype of [[Vector]] +*/ + implicit def fitKNN[T <: Vector : TypeInformation] = new FitOperation[KNN, T] {
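For readers following the diff, the core computation the Scala code distributes over blocks is a brute-force k-nearest-neighbor lookup. A minimal single-machine sketch in plain Java (names are illustrative, not Flink API):

```java
import java.util.Arrays;
import java.util.Comparator;

public class KnnSketch {
    // Squared Euclidean distance; the square root is omitted because it does
    // not change the neighbor ordering.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Returns the indices of the k training points closest to the query point.
    static int[] kNearest(double[][] training, double[] query, int k) {
        Integer[] order = new Integer[training.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        // Sort candidate indices by distance to the query point.
        Arrays.sort(order, Comparator.comparingDouble(i -> squaredDistance(training[i], query)));
        int[] result = new int[k];
        for (int i = 0; i < k; i++) {
            result[i] = order[i];
        }
        return result;
    }
}
```

The Flink version avoids materializing all pairs on one node by blocking the training set and cross-joining blocks with the test set, but the per-pair distance logic is the same.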
[jira] [Commented] (FLINK-2208) Build error for Java IBM
[ https://issues.apache.org/jira/browse/FLINK-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616397#comment-14616397 ] ASF GitHub Bot commented on FLINK-2208: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/833#issuecomment-119133444 Is anything happening with this pull request? If you do not want to update it any more, could you close it? Build error for Java IBM Key: FLINK-2208 URL: https://issues.apache.org/jira/browse/FLINK-2208 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 0.9 Reporter: Felix Neutatz Assignee: Felix Neutatz Priority: Minor Using IBM Java 7 will break the build: {code:xml} [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ flink-runtime --- [INFO] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/java:-1: info: compiling [INFO] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala:-1: info: compiling [INFO] Compiling 461 source files to /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/target/classes at 1434059956648 [ERROR] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1768: error: type OperatingSystemMXBean is not a member of package com.sun.management [ERROR] asInstanceOf[com.sun.management.OperatingSystemMXBean]). [ERROR] ^ [ERROR] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1787: error: type OperatingSystemMXBean is not a member of package com.sun.management [ERROR] val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods() [ERROR] ^ [ERROR] two errors found [INFO] [INFO] Reactor Summary: [INFO] [INFO] flink .. SUCCESS [ 14.447 s] [INFO] flink-shaded-hadoop SUCCESS [ 2.548 s] [INFO] flink-shaded-include-yarn .. 
SUCCESS [ 36.122 s] [INFO] flink-shaded-include-yarn-tests SUCCESS [ 36.980 s] [INFO] flink-core . SUCCESS [ 21.887 s] [INFO] flink-java . SUCCESS [ 16.023 s] [INFO] flink-runtime .. FAILURE [ 20.241 s] [INFO] flink-optimizer SKIPPED [hadoop@ibm-power-1 /]$ java -version java version "1.7.0" Java(TM) SE Runtime Environment (build pxp6470_27sr1fp1-20140708_01(SR1 FP1)) IBM J9 VM (build 2.7, JRE 1.7.0 Linux ppc64-64 Compressed References 20140707_205525 (JIT enabled, AOT enabled) J9VM - R27_Java727_SR1_20140707_1408_B205525 JIT - tr.r13.java_20140410_61421.07 GC - R27_Java727_SR1_20140707_1408_B205525_CMPRSS J9CL - 20140707_205525) JCL - 20140707_01 based on Oracle 7u65-b16 {code}
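The error occurs because `com.sun.management.OperatingSystemMXBean` is a HotSpot-specific class that IBM J9 does not provide, so referencing it at compile time fails. A hypothetical sketch of a portable workaround (this is an illustration of the reflective approach, not necessarily the actual Flink patch): resolve the class and method reflectively, so the code compiles on any JVM and simply degrades when the metric is unavailable.

```java
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;

public class OsBeanProbe {
    // Reads the process CPU load via reflection. Returns a value in [0, 1]
    // on JVMs exposing com.sun.management.OperatingSystemMXBean, or -1.0
    // when the class or method is unavailable (e.g. on IBM J9).
    static double processCpuLoad() {
        Object bean = ManagementFactory.getOperatingSystemMXBean();
        try {
            // Look up the vendor-specific interface only at runtime.
            Class<?> sunOsBean = Class.forName("com.sun.management.OperatingSystemMXBean");
            if (sunOsBean.isInstance(bean)) {
                Method method = sunOsBean.getMethod("getProcessCpuLoad");
                return (Double) method.invoke(bean);
            }
        } catch (ReflectiveOperationException e) {
            // Fall through: the metric is simply not available on this JVM.
        }
        return -1.0;
    }
}
```

Note that `getProcessCpuLoad` itself may return a negative value while the JVM is warming up, so callers should treat any negative result as "unavailable".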
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616367#comment-14616367 ] ASF GitHub Bot commented on FLINK-1745: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/696#discussion_r34017778
[jira] [Created] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)
Gyula Fora created FLINK-2323: - Summary: Rename OperatorState methods to .value() and .update(..) Key: FLINK-2323 URL: https://issues.apache.org/jira/browse/FLINK-2323 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor We should rename the OperatorState methods getState and updateState to .value() and .update(..) to make the API clearer.
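The proposed rename can be sketched as a small interface; this is an illustration of the naming change only, not Flink's actual OperatorState definition (the in-memory implementation is hypothetical):

```java
// Proposed shape after the rename described in FLINK-2323.
interface OperatorState<T> {
    T value();          // previously getState()
    void update(T v);   // previously updateState(v)
}

// Trivial in-memory implementation, for illustration only.
class InMemoryOperatorState<T> implements OperatorState<T> {
    private T current;

    InMemoryOperatorState(T initial) {
        current = initial;
    }

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T v) {
        current = v;
    }
}
```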
[jira] [Created] (FLINK-2324) Rework partitioned state storage
Gyula Fora created FLINK-2324: - Summary: Rework partitioned state storage Key: FLINK-2324 URL: https://issues.apache.org/jira/browse/FLINK-2324 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Partitioned states are currently stored per key in state handles. This is fine for in-memory storage but very inefficient for HDFS. The logic behind the current mechanism is that this approach provides a way to repartition a state without fetching the data from the external storage, by manipulating only the handles. We should come up with a solution that achieves both.
[GitHub] flink pull request: [FLINK-2180] [streaming] Iteration test fix
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/802#issuecomment-119321451 The main purpose of this is to enable a deterministic (non-time-based) test for the iteration, as it sometimes failed for some reason on Travis. (See [here](https://issues.apache.org/jira/browse/FLINK-2180?focusedCommentId=14576968&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14576968).) I thought it might also be useful to users for debugging purposes (similarly to maxWaitTime), and it was also easier to introduce it at the API level.
[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/871#discussion_r34032155 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded + * in a DataSet) + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] +} + +/** Traits to allow us to determine at runtime if a Score is a loss (lower is better) or a + * performance score (higher is better) + */ +trait Loss + +trait PerformanceScore + +/** + * Metrics expressible as a mean of a function taking output pairs as input + * + * @param scoringFct function to apply to all elements + * @tparam PredictionType output type + */ +abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( +scoringFct: (PredictionType, PredictionType) => Double) +(implicit yyt: TypeInformation[(PredictionType, PredictionType)]) + extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { +trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean() + } +} + +object RegressionScores { + /** + * Squared loss function + * + * returns (y1 - y2)^2^ + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if the signs of the outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) => +val sy1 = scala.math.signum(y1) +val sy2 = scala.math.signum(y2) +if (sy1 == sy2) 0 else 1 + }) with Loss + + /** Calculates the coefficient of determination, $R^2^$ +* +* $R^2^$ indicates how well the data fit a calculated model +* 
Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] +*/ + def r2Score = new Score[Double] with PerformanceScore { +override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1) + val meanTruth = onlyTrue.mean() + + val ssRes = trueAndPredicted +.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue +.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) --- End diff -- Will do.
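The diff is cut off before the final combination of the two sums, but the statistic being computed is the standard coefficient of determination, R^2 = 1 - SS_res / SS_tot. A plain-Java sketch of the same computation on local arrays (assuming the truth values are not all identical, otherwise SS_tot is zero):

```java
public class R2 {
    // Coefficient of determination for a set of (truth, prediction) pairs.
    static double r2(double[] truth, double[] predicted) {
        // Mean of the true values.
        double mean = 0.0;
        for (double t : truth) {
            mean += t;
        }
        mean /= truth.length;

        double ssRes = 0.0; // residual sum of squares
        double ssTot = 0.0; // total sum of squares
        for (int i = 0; i < truth.length; i++) {
            ssRes += (truth[i] - predicted[i]) * (truth[i] - predicted[i]);
            ssTot += (truth[i] - mean) * (truth[i] - mean);
        }
        return 1.0 - ssRes / ssTot;
    }
}
```

A perfect prediction yields R^2 = 1.0; worse-than-mean predictions yield negative values, which is why the trait marks it as a performance score (higher is better) rather than a loss.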
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616639#comment-14616639 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34035723 --- Diff: docs/setup/jobmanager_high_availability.md --- @@ -0,0 +1,121 @@ +--- +title: JobManager High Availability (HA) +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +The JobManager is the coordinator of each Flink deployment. It is responsible for both *scheduling* and *resource management*. + +By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail. + +With JobManager High Availability, you can run multiple JobManager instances per Flink cluster and thereby circumvent the *SPOF*. + +The general idea of JobManager high availability is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. 
This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby. + +As an example, consider the following setup with three JobManager instances: + +<img src="fig/jobmanager_ha_overview.png" class="center" /> + +## Configuration + +To enable JobManager High Availability you have to configure a **ZooKeeper quorum** and set up a **masters file** with all JobManager hosts. + +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. + +Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availability mode and all Flink components try to connect to a JobManager via coordination through ZooKeeper. + +- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. + + <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> + + Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + +- The following configuration keys are optional: + + - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination + - TODO Add client configuration keys + +## Starting an HA-cluster + +In order to start an HA-cluster, configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts on which JobManagers are started. + + <pre> +jobManagerAddress1 +[...] 
+jobManagerAddressX + </pre> + +After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. **Keep in mind that the ZooKeeper quorum has to be running when you call the scripts**. + +## Running ZooKeeper + +If you don't have a running ZooKeeper installation, you can use the helper scripts which ship with Flink. + +There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID for each server: + +<pre> +server.X=addressX:peerPort:leaderPort +[...] +server.Y=addressY:peerPort:leaderPort +</pre> + +The script
[jira] [Commented] (FLINK-2157) Create evaluation framework for ML library
[ https://issues.apache.org/jira/browse/FLINK-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616580#comment-14616580 ] ASF GitHub Bot commented on FLINK-2157: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/871#discussion_r34032155 Create evaluation framework for ML library -- Key: FLINK-2157 URL: https://issues.apache.org/jira/browse/FLINK-2157 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.10 Currently, FlinkML lacks means to evaluate the performance of trained models. It would be great to add some {{Evaluators}} which can calculate some score
[jira] [Assigned] (FLINK-1943) Add Gelly-GSA compiler and translation tests
[ https://issues.apache.org/jira/browse/FLINK-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri reassigned FLINK-1943: Assignee: Vasia Kalavri Add Gelly-GSA compiler and translation tests Key: FLINK-1943 URL: https://issues.apache.org/jira/browse/FLINK-1943 Project: Flink Issue Type: Test Components: Gelly Affects Versions: 0.9 Reporter: Vasia Kalavri Assignee: Vasia Kalavri These should be similar to the corresponding Spargel tests.
[GitHub] flink pull request: Collect(): Fixing the akka.framesize size limi...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/887#issuecomment-119140624 Ok, sounds good! Could you give the number of the ticket for the changes @mxm is doing? Just to have a look.
[GitHub] flink pull request: Hits
Github user mfahimazizi commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-119188834 Hi @vasia, sorry for the late reply. I set indentation to tabs instead of spaces in the IDE and pushed the updated files. As I discussed in the JIRA issue about the vertex update function, we couldn't access edges inside this function. If we overload this function as follows, we can identify Hub and Authority edges in an undirected graph. ```public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Edge<Long, String, Double>> inMessages)```
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34035723 --- Diff: docs/setup/jobmanager_high_availability.md --- @@ -0,0 +1,121 @@ +--- +title: JobManager High Availability (HA) +--- +!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +License); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +AS IS BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +-- + +The JobManager is the coordinator of each Flink deployment. It is responsible for both *scheduling* and *resource management*. + +By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail. + +With JobManager High Availability, you can run multiple JobManager instances per Flink cluster and thereby circumvent the *SPOF*. + +The general idea of JobManager high availability is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby. 
+ +As an example, consider the following setup with three JobManager instances: + +<img src="fig/jobmanager_ha_overview.png" class="center" /> + +## Configuration + +To enable JobManager High Availability you have to configure a **ZooKeeper quorum** and set up a **masters file** with all JobManager hosts. + +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. + +Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availability mode, and all Flink components try to connect to a JobManager via coordination through ZooKeeper. + +- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. + + <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> + + Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + +- The following configuration keys are optional: + + - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination + - TODO Add client configuration keys + +## Starting an HA-cluster + +In order to start an HA-cluster, configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts on which JobManagers are started. + + <pre> +jobManagerAddress1 +[...] +jobManagerAddressX + </pre> + +After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. **Keep in mind that the ZooKeeper quorum has to be running when you call the scripts**. 
+ +## Running ZooKeeper + +If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink. + +There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server: + +<pre> +server.X=addressX:peerPort:leaderPort +[...] +server.Y=addressY:peerPort:leaderPort +</pre> + +The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required
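As a concrete illustration of the quorum, `zoo.cfg`, and masters files described in the proposed documentation (the `node1`–`node3` hostnames are hypothetical; 2888/3888 are ZooKeeper's conventional peer and leader-election ports):

```
# conf/flink-conf.yaml (hypothetical hosts)
ha.zookeeper.quorum: node1:2181,node2:2181,node3:2181
ha.zookeeper.dir: /flink

# conf/zoo.cfg
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

# conf/masters -- two JobManagers: one leader, one standby
node1
node2
```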
[jira] [Commented] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join
[ https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616423#comment-14616423 ] ASF GitHub Bot commented on FLINK-2240: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/888#issuecomment-119136367 This is a very nice idea, thank you for the contribution! The numbers look quite encouraging. I need to look into this carefully, as it touches a very sensitive part of the system. It will probably take me a bit of time. Here are some initial comments: - The integration tests seem to be failing; this change apparently triggers a stack-overflow at some point. Have a look at the logs of the Travis CI build. - Can we add a flag to the hash-table, to enable/disable the bloom-filters? That would make it easier for future comparisons. - Could you include a standalone mini benchmark similar to the one you did where you posted the numbers here? A simple standalone Java executable that creates the hash table and feeds some generated records through it (with bloom filters activated and deactivated)? It would not start a full Flink cluster, but only test the HashJoin in isolation. We like to include some of those mini benchmarks for performance-critical parts, and re-run them once in a while to determine how the performance behaves at that point. Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join Key: FLINK-2240 URL: https://issues.apache.org/jira/browse/FLINK-2240 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Assignee: Chengxiang Li Priority: Minor In Hybrid-Hash-Join, when the small table does not fit into memory, part of the small table data would be spilled to disk, and the counterpart partition of big table data would be spilled to disk in the probe phase as well. 
If we build a BloomFilter while spilling the small table to disk during the build phase, and use it to filter the big table records that would otherwise be spilled to disk, we may greatly reduce the spilled big table file size and save the disk IO cost of writing and later reading it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
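The mechanism proposed in the ticket can be sketched with a minimal Bloom filter: keys of spilled build-side partitions go into the filter, and probe-side records whose key definitely has no build-side match need not be spilled at all. A Bloom filter has no false negatives, which makes this skip safe. This is a standalone illustration with arbitrary hash functions and sizes, not Flink's implementation:

```java
import java.util.BitSet;

public class BloomFilterSketch {
    // Minimal Bloom filter over long keys: k hash probes into an m-bit array.
    private final BitSet bits;
    private final int m;
    private final int k;

    BloomFilterSketch(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th hash probe for a key (simple multiplicative mixing; illustrative only)
    private int probe(long key, int i) {
        long h = key * 0x9E3779B97F4A7C15L + (long) i * 0xC2B2AE3D27D4EB4FL;
        h ^= (h >>> 33);
        return (int) Math.floorMod(h, (long) m);
    }

    void add(long key) {
        for (int i = 0; i < k; i++) bits.set(probe(key, i));
    }

    boolean mightContain(long key) {
        for (int i = 0; i < k; i++)
            if (!bits.get(probe(key, i))) return false;
        return true; // possibly a false positive, but never a false negative
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1 << 16, 3);
        // Build phase: while spilling a build-side partition, record its keys.
        for (long key = 0; key < 1000; key++) filter.add(key);
        // Probe phase: spill only records that might have a build-side match.
        int spilled = 0;
        for (long key = 0; key < 10_000; key++)
            if (filter.mightContain(key)) spilled++;
        System.out.println("probe records spilled: " + spilled + " of 10000");
    }
}
```

With these parameters the false-positive rate is tiny, so close to 9,000 of the 10,000 probe records skip the spill entirely.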
[jira] [Assigned] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
[ https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-2325: - Assignee: Robert Metzger PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source - Key: FLINK-2325 URL: https://issues.apache.org/jira/browse/FLINK-2325 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 0.9 Reporter: Rico Bergmann Assignee: Robert Metzger I'm creating a PersistentKafkaSource reading from a specified topic from Kafka, that is at the time the PersistentKafkaSource is started (via open(.)) not yet present. That's why the number of partitions, that is read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets). Maybe it is better to check whether numberOfPartitions returns 0 and if so, to take the default number of partitions from the Kafka config? Stacktrace: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
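The defensive check the reporter suggests might look like the following sketch. The method and parameter names here are hypothetical illustrations, not the actual PersistentKafkaSource code:

```java
public class PartitionGuardSketch {
    // Hypothetical defensive check: if the topic does not exist yet, Kafka
    // metadata reports zero partitions; sizing the offset arrays from that
    // value yields zero-length arrays and the ArrayIndexOutOfBoundsException
    // from the reported stack trace.
    static long[] initOffsets(int partitionsFromMetadata, int defaultNumPartitions) {
        int partitions = partitionsFromMetadata > 0
                ? partitionsFromMetadata
                : defaultNumPartitions; // fall back instead of allocating length 0
        long[] lastOffsets = new long[partitions];
        java.util.Arrays.fill(lastOffsets, -1L); // -1 = no offset committed yet
        return lastOffsets;
    }

    public static void main(String[] args) {
        System.out.println(initOffsets(0, 4).length); // topic not created yet -> 4
        System.out.println(initOffsets(8, 4).length); // topic exists -> 8
    }
}
```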
[jira] [Commented] (FLINK-2321) The seed for the SVM classifier is currently static
[ https://issues.apache.org/jira/browse/FLINK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616455#comment-14616455 ] ASF GitHub Bot commented on FLINK-2321: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/889#issuecomment-119151761 Good catch @thvasilo. LGTM. Will merge it with the next batch. The seed for the SVM classifier is currently static --- Key: FLINK-2321 URL: https://issues.apache.org/jira/browse/FLINK-2321 Project: Flink Issue Type: Bug Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.10 The seed for the SVM algorithm in FlinkML has a default value of 0, meaning that if it's not set, we always have the same seed when running the algorithm. What should happen instead is that if the seed is not set, it should be a random number. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
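The intended behavior described in the issue (a random seed when none is configured, a reproducible one when set) can be sketched as follows; the names and the sentinel value are illustrative, not FlinkML's actual API:

```java
import java.util.Random;

public class SeedSketch {
    // Hypothetical sentinel for "seed not configured by the user".
    static final long SEED_UNSET = 0L;

    // Treat an unset seed as "draw a fresh random seed" instead of
    // silently reusing a fixed default like 0 on every run.
    static long resolveSeed(long configuredSeed) {
        return configuredSeed == SEED_UNSET ? new Random().nextLong() : configuredSeed;
    }

    public static void main(String[] args) {
        System.out.println(resolveSeed(42L));        // explicit seed is kept: 42
        System.out.println(resolveSeed(SEED_UNSET)); // fresh random seed each run
    }
}
```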
[jira] [Commented] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)
[ https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616453#comment-14616453 ] ASF GitHub Bot commented on FLINK-2323: --- GitHub user gyfora opened a pull request: https://github.com/apache/flink/pull/890 [FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..) Refactor OperatorState interface for clearer method names You can merge this pull request into a Git repository by running: $ git pull https://github.com/gyfora/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/890.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #890 commit 9d807a6b7eef35bcbca8ef19aa6bf49b12f97dfd Author: Gyula Fora gyf...@apache.org Date: 2015-07-07T09:07:05Z [FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..) Rename OperatorState methods to .value() and .update(..) Key: FLINK-2323 URL: https://issues.apache.org/jira/browse/FLINK-2323 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor We should rename OperatorState methods to .value() and .update(..) from getState and updateState to make it more clear. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
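A minimal sketch of the renamed interface and a call site, to show the before/after of the rename (simplified; the real Flink interface has additional concerns such as checkpointing and serialization):

```java
public class OperatorStateSketch {
    // Sketch of the interface after FLINK-2323's rename.
    interface OperatorState<T> {
        T value();            // was: getState()
        void update(T state); // was: updateState(state)
    }

    // Minimal in-memory implementation, just to exercise the call sites.
    static class SimpleState<T> implements OperatorState<T> {
        private T state;
        SimpleState(T initial) { state = initial; }
        public T value() { return state; }
        public void update(T s) { state = s; }
    }

    public static void main(String[] args) {
        OperatorState<Long> count = new SimpleState<>(0L);
        count.update(count.value() + 1); // previously: updateState(getState() + 1)
        System.out.println(count.value()); // 1
    }
}
```

The shorter names read naturally at the call site, which is the motivation stated in the ticket.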
[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/871#discussion_r34025701 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded + * in a DataSet) + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] +} + +/** Traits to allow us to determine at runtime if a Score is a loss (lower is better) or a + * performance score (higher is better) + */ +trait Loss + +trait PerformanceScore + +/** + * Metrics expressible as a mean of a function taking output pairs as input + * + * @param scoringFct function to apply to all elements + * @tparam PredictionType output type + */ +abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( +scoringFct: (PredictionType, PredictionType) = Double) +(implicit yyt: TypeInformation[(PredictionType, PredictionType)]) + extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { +trueAndPredicted.map(yy = scoringFct(yy._1, yy._2)).mean() + } +} + +object RegressionScores { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1,y2) = (y1 - y2) * (y1 - y2)) with Loss + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) = +val sy1 = scala.math.signum(y1) +val sy2 = scala.math.signum(y2) +if (sy1 == sy2) 0 else 1 + }) with Loss + + /** Calculates the coefficient of determination, $R^2^$ +* +* $R^2^$ indicates how well the data fit the a calculated model +* 
Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] +*/ + def r2Score = new Score[Double] with PerformanceScore { +override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction = truthPrediction._1) + val meanTruth = onlyTrue.mean() + + val ssRes = trueAndPredicted +.map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue +.crossWithTiny(meanTruth).map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) --- End diff -- Broadcast is more efficient than cross because it avoids repeated deserialization. So why not using `mapWithBcVariable`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/871#discussion_r34025726 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded + * in a DataSet) + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] +} + +/** Traits to allow us to determine at runtime if a Score is a loss (lower is better) or a + * performance score (higher is better) + */ +trait Loss + +trait PerformanceScore + +/** + * Metrics expressible as a mean of a function taking output pairs as input + * + * @param scoringFct function to apply to all elements + * @tparam PredictionType output type + */ +abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( +scoringFct: (PredictionType, PredictionType) = Double) +(implicit yyt: TypeInformation[(PredictionType, PredictionType)]) + extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { +trueAndPredicted.map(yy = scoringFct(yy._1, yy._2)).mean() + } +} + +object RegressionScores { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1,y2) = (y1 - y2) * (y1 - y2)) with Loss + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) = +val sy1 = scala.math.signum(y1) +val sy2 = scala.math.signum(y2) +if (sy1 == sy2) 0 else 1 + }) with Loss + + /** Calculates the coefficient of determination, $R^2^$ +* +* $R^2^$ indicates how well the data fit the a calculated model +* 
Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] +*/ + def r2Score = new Score[Double] with PerformanceScore { +override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction = truthPrediction._1) + val meanTruth = onlyTrue.mean() + + val ssRes = trueAndPredicted +.map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue +.crossWithTiny(meanTruth).map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val r2 = ssRes.crossWithTiny(ssTot).map{resTot = --- End diff -- Same here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
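For reference, the R² metric computed by the `r2Score` code under review boils down to the following local computation (in the distributed version, the mean would be shared via a broadcast variable, as the reviewer suggests, rather than a cross):

```java
public class R2Sketch {
    // Coefficient of determination: R^2 = 1 - ssRes / ssTot, where
    // ssRes = sum_i (y_i - yhat_i)^2  (residual sum of squares) and
    // ssTot = sum_i (y_i - meanY)^2   (total sum of squares).
    static double r2(double[] truth, double[] predicted) {
        double mean = 0;
        for (double y : truth) mean += y;
        mean /= truth.length;
        double ssRes = 0, ssTot = 0;
        for (int i = 0; i < truth.length; i++) {
            ssRes += (truth[i] - predicted[i]) * (truth[i] - predicted[i]);
            ssTot += (truth[i] - mean) * (truth[i] - mean);
        }
        return 1.0 - ssRes / ssTot;
    }

    public static void main(String[] args) {
        double[] y = {1, 2, 3, 4};
        System.out.println(r2(y, y)); // perfect fit -> 1.0
        System.out.println(r2(y, new double[]{2.5, 2.5, 2.5, 2.5})); // mean-only model -> 0.0
    }
}
```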
[jira] [Commented] (FLINK-2157) Create evaluation framework for ML library
[ https://issues.apache.org/jira/browse/FLINK-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616467#comment-14616467 ] ASF GitHub Bot commented on FLINK-2157: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/871#discussion_r34025701 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded + * in a DataSet) + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] +} + +/** Traits to allow us to determine at runtime if a Score is a loss (lower is better) or a + * performance score (higher is better) + */ +trait Loss + +trait PerformanceScore + +/** + * Metrics expressible as a mean of a function taking output pairs as input + * + * @param scoringFct function to apply to all elements + * @tparam PredictionType output type + */ +abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( +scoringFct: (PredictionType, PredictionType) = Double) +(implicit yyt: TypeInformation[(PredictionType, PredictionType)]) + extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { +trueAndPredicted.map(yy = scoringFct(yy._1, yy._2)).mean() + } +} + +object RegressionScores { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1,y2) = (y1 - y2) * (y1 - y2)) with Loss + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) = +val sy1 = scala.math.signum(y1) +val sy2 = scala.math.signum(y2) +if (sy1 == sy2) 0 else 1 + }) with Loss + + /** Calculates the coefficient of determination, $R^2^$ +* +* $R^2^$ indicates how well the data fit the a calculated model +* 
Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] +*/ + def r2Score = new Score[Double] with PerformanceScore { +override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction = truthPrediction._1) + val meanTruth = onlyTrue.mean() + + val ssRes = trueAndPredicted +.map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue +.crossWithTiny(meanTruth).map(tp = (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) --- End diff -- Broadcast is more efficient than cross because it avoids repeated deserialization. So why not using `mapWithBcVariable`? Create evaluation framework for ML library -- Key: FLINK-2157 URL: https://issues.apache.org/jira/browse/FLINK-2157 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.10 Currently, FlinkML lacks means to
[jira] [Commented] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)
[ https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616503#comment-14616503 ] ASF GitHub Bot commented on FLINK-2323: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/890 Rename OperatorState methods to .value() and .update(..) Key: FLINK-2323 URL: https://issues.apache.org/jira/browse/FLINK-2323 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor We should rename OperatorState methods to .value() and .update(..) from getState and updateState to make it more clear. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2319) Collect size limitation due to akka.
[ https://issues.apache.org/jira/browse/FLINK-2319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas updated FLINK-2319: -- Issue Type: Bug (was: New Feature) Collect size limitation due to akka. Key: FLINK-2319 URL: https://issues.apache.org/jira/browse/FLINK-2319 Project: Flink Issue Type: Bug Components: JobManager, TaskManager Affects Versions: 0.10 Reporter: Kostas Each TaskManager keeps the results of a local task in a set of Accumulators. Upon termination of the task, the Accumulators are sent back to the JobManager, which merges them before sending them back to the Client. To exchange the Accumulators and their content, akka is used. This limits the size of the output of a task to no more than akka.framesize bytes; otherwise, akka drops the message. This ticket is to propose the removal of this limitation so that results can be of arbitrary size. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
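One common way around such a per-message cap is to split a large serialized result into frames below the limit and reassemble them on the receiving side. This standalone sketch only illustrates the idea; the names are hypothetical and it is not the fix implemented in the associated pull request:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FrameChunkSketch {
    // Split a payload into frames no larger than maxFrameSize bytes,
    // so each frame stays below a cap like akka.framesize.
    static List<byte[]> chunk(byte[] payload, int maxFrameSize) {
        List<byte[]> frames = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxFrameSize) {
            int end = Math.min(offset + maxFrameSize, payload.length);
            frames.add(Arrays.copyOfRange(payload, offset, end));
        }
        return frames;
    }

    // Receiver side: concatenate the frames back into the original payload.
    static byte[] reassemble(List<byte[]> frames, int totalLength) {
        byte[] out = new byte[totalLength];
        int offset = 0;
        for (byte[] frame : frames) {
            System.arraycopy(frame, 0, out, offset, frame.length);
            offset += frame.length;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2500];
        new java.util.Random(0).nextBytes(payload);
        List<byte[]> frames = chunk(payload, 1024);
        System.out.println(frames.size()); // 3 frames: 1024 + 1024 + 452 bytes
        System.out.println(Arrays.equals(payload, reassemble(frames, payload.length)));
    }
}
```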
[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/890
[GitHub] flink pull request: Collect(): Fixing the akka.framesize size limi...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/887#issuecomment-119137396 I think all in all, this looks good. We have another change by @mxm pending that changes the accumulators to do live updates during runtime. Since @mxm's change is more invasive, I would suggest that we wait for it to be merged before merging this one.
[jira] [Created] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
Rico Bergmann created FLINK-2325: Summary: PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source Key: FLINK-2325 URL: https://issues.apache.org/jira/browse/FLINK-2325 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 0.9 Reporter: Rico Bergmann I'm creating a PersistentKafkaSource reading from a specified topic from Kafka, that is at the time the PersistentKafkaSource is started (via open(.)) not yet present. That's why the number of partitions, that is read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets). May be it is better to check, whether numberOfPartitions returns 0 and if so, to take the default number of partitions from Kafka config? Stacktrace: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running
[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616469#comment-14616469 ] ASF GitHub Bot commented on FLINK-2292: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/887#issuecomment-119157639 Hi @kl0u, the tickets are https://issues.apache.org/jira/browse/FLINK-2292 and https://issues.apache.org/jira/browse/FLINK-1573. Report accumulators periodically while job is running - Key: FLINK-2292 URL: https://issues.apache.org/jira/browse/FLINK-2292 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 0.10 Accumulators should be sent periodically, as part of the heartbeat that sends metrics. This allows them to be updated in real time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/890#issuecomment-119164038 +2 Looks good, this is a simple rename :ski:
[jira] [Closed] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)
[ https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-2323. - Resolution: Fixed Rename OperatorState methods to .value() and .update(..) Key: FLINK-2323 URL: https://issues.apache.org/jira/browse/FLINK-2323 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor We should rename OperatorState methods to .value() and .update(..) from getState and updateState to make it more clear. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source
[ https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616446#comment-14616446 ] Robert Metzger commented on FLINK-2325: --- Thank you for reporting this. I'm currently reworking the KafkaSource anyways. PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source - Key: FLINK-2325 URL: https://issues.apache.org/jira/browse/FLINK-2325 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 0.9 Reporter: Rico Bergmann Assignee: Robert Metzger I'm creating a PersistentKafkaSource reading from a specified topic from Kafka, that is at the time the PersistentKafkaSource is started (via open(.)) not yet present. That's why the number of partitions, that is read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets). May be it is better to check, whether numberOfPartitions returns 0 and if so, to take the default number of partitions from Kafka config? Stacktrace: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...
GitHub user gyfora opened a pull request: https://github.com/apache/flink/pull/890 [FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..) Refactor OperatorState interface for clearer method names You can merge this pull request into a Git repository by running: $ git pull https://github.com/gyfora/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/890.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #890 commit 9d807a6b7eef35bcbca8ef19aa6bf49b12f97dfd Author: Gyula Fora gyf...@apache.org Date: 2015-07-07T09:07:05Z [FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
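For readers following the rename, a minimal sketch of the resulting interface shape (the generic signature and the in-memory implementation are assumptions for illustration, not the actual Flink classes):

```java
// Sketch of the renamed interface from FLINK-2323:
// getState() becomes value(), updateState(..) becomes update(..).
interface OperatorState<T> {
    T value();          // formerly getState()
    void update(T v);   // formerly updateState(..)
}

// Trivial in-memory implementation, for illustration only.
class MemoryState<T> implements OperatorState<T> {
    private T current;

    MemoryState(T initial) {
        this.current = initial;
    }

    public T value() {
        return current;
    }

    public void update(T v) {
        current = v;
    }
}
```

With this naming, a per-key counter reads as `state.update(state.value() + 1)`, which is arguably clearer than the old `updateState(getState() + 1)`.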
[GitHub] flink pull request: [FLINK-2321] [ml] The seed for the SVM classif...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/889#issuecomment-119151761 Good catch @thvasilo. LGTM. Will merge it with the next batch.
[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/696#issuecomment-119163944 @thvasilo Yeah, exact k-NN is not scalable to gigabyte- or terabyte-sized data. If I add an R-tree to this algorithm, it would be better. But I agree that we need a vote or discussion about this. I am also interested in approximate k-NN, but we should check progress with the assigned contributor. :)
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616668#comment-14616668 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34037018 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper config file and writes + * the required 'myid' file before starting the peer. + */ +public class FlinkZooKeeperQuorumPeer { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class); + + public static void main(String[] args) { + try { + final ParameterTool params = ParameterTool.fromArgs(args); --- End diff -- JobManager and TaskManager log environment information. That helps a lot in debugging setups. This should log the environment info as well, via `EnvironmentInformation.logEnvironmentInfo(LOG, FlinkZooKeeperQuorumPeer, args)`. Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availabilty requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK). 
Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[jira] [Commented] (FLINK-2324) Rework partitioned state storage
[ https://issues.apache.org/jira/browse/FLINK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616675#comment-14616675 ] Paris Carbone commented on FLINK-2324: -- We cannot avoid a key-value store to make this efficient, imho. There are several LevelDB-backed distributed key-value stores we can use to persist and retrieve partitioned state. There is also HBase, though, if we want to stick to HDFS for this... Any other opinions? Rework partitioned state storage Key: FLINK-2324 URL: https://issues.apache.org/jira/browse/FLINK-2324 Project: Flink Issue Type: Improvement Reporter: Gyula Fora Assignee: Gyula Fora Partitioned states are currently stored per-key in state handles. This is alright for in-memory storage but is very inefficient for HDFS. The logic behind the current mechanism is that this approach provides a way to repartition a state without fetching the data from the external storage, only manipulating handles. We should come up with a solution that can achieve both.
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616663#comment-14616663 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34036874 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper config file and writes + * the required 'myid' file before starting the peer. + */ +public class FlinkZooKeeperQuorumPeer { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class); + + public static void main(String[] args) { + try { + final ParameterTool params = ParameterTool.fromArgs(args); + final String zkConfigFile = params.getRequired(zkConfigFile); + final int peerId = params.getInt(peerId); + + // Run quorum peer + runFlinkZkQuorumPeer(zkConfigFile, peerId); + } + catch (Throwable t) { + t.printStackTrace(); --- End diff -- This should probably show up in the log as well. A lot of people do not check the `.out` files. Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availabilty requires distributed coordination between JM, TM, and clients. 
For this, we will use ZooKeeper (ZK). Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
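Stephan's point about surfacing startup failures in the log file as well as the `.out` file can be sketched like this (a hypothetical entry point using `java.util.logging` rather than Flink's SLF4J logger; the class and message names are illustrative):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class QuorumPeerLauncher {
    private static final Logger LOG = Logger.getLogger(QuorumPeerLauncher.class.getName());

    // Report a fatal startup failure both on stderr (which ends up in the
    // .out file) and in the log, since many users only read the log files.
    static int run(String[] args) {
        try {
            if (args.length == 0) {
                throw new IllegalArgumentException("missing ZooKeeper config file argument");
            }
            // ... start the quorum peer here ...
            return 0;
        }
        catch (Throwable t) {
            t.printStackTrace();                                              // .out file
            LOG.log(Level.SEVERE, "Error running ZooKeeper quorum peer", t);  // log file
            return 1;
        }
    }

    public static void main(String[] args) {
        System.exit(run(args));
    }
}
```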
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616699#comment-14616699 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34038403 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -993,9 +1000,34 @@ object JobManager { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..") } -val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) -val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) +// HA mode +val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is + // chosen. For the FlinkMiniCluster you have to choose it on your own. + LOG.info("HA mode.") --- End diff -- How about we make this a bit more prominent ;-) "Starting JobManager in High Availability Mode." The other case should also have a log message, for example "Starting JobManager without High Availability." Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availability requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK).
Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34038430 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1435,6 +1435,11 @@ object TaskManager { // start the I/O manager last, it will create some temp directories. val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths) +if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann Setup leader retrieval service + LOG.info("HA mode.") --- End diff -- See above...
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616701#comment-14616701 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-119202663 Looks like a good piece of work! Can we actually get into ZooKeeper version conflicts here? For example, the Kafka connector needs a certain ZooKeeper version. Can it conflict with our ZooKeeper version? Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availability requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK). Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34038257 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -993,9 +1000,34 @@ object JobManager { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..") } -val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) -val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) +// HA mode +val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is + // chosen. For the FlinkMiniCluster you have to choose it on your own. + LOG.info("HA mode.") + + if (config.getHost == null) { --- End diff -- I think we should call all Java methods with parentheses. They are defined with parentheses after all, even if Scala allows you to treat them as if they were not.
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616695#comment-14616695 ] ASF GitHub Bot commented on FLINK-2288: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34038257 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -993,9 +1000,34 @@ object JobManager { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + /..) } -val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) -val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) +// HA mode +val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is + // chosen. For the FlinkMiniCluster you have to choose it on your own. + LOG.info(HA mode.) + + if (config.getHost == null) { --- End diff -- I think we should call all Java methods with parenthesis. They are defined with parenthesis after all, even if Scala allows you to treat them as if they were not. Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availabilty requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK). 
Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34036874 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper config file and writes + * the required 'myid' file before starting the peer. + */ +public class FlinkZooKeeperQuorumPeer { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class); + + public static void main(String[] args) { + try { + final ParameterTool params = ParameterTool.fromArgs(args); + final String zkConfigFile = params.getRequired("zkConfigFile"); + final int peerId = params.getInt("peerId"); + + // Run quorum peer + runFlinkZkQuorumPeer(zkConfigFile, peerId); + } + catch (Throwable t) { + t.printStackTrace(); --- End diff -- This should probably show up in the log as well. A lot of people do not check the `.out` files.
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34037018 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java --- @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper config file and writes + * the required 'myid' file before starting the peer. + */ +public class FlinkZooKeeperQuorumPeer { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class); + + public static void main(String[] args) { + try { + final ParameterTool params = ParameterTool.fromArgs(args); --- End diff -- JobManager and TaskManager log environment information. That helps a lot in debugging setups. This should log the environment info as well, via `EnvironmentInformation.logEnvironmentInfo(LOG, "FlinkZooKeeperQuorumPeer", args)`.
[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/886#issuecomment-119202663 Looks like a good piece of work! Can we actually get into ZooKeeper version conflicts here? For example, the Kafka connector needs a certain ZooKeeper version. Can it conflict with our ZooKeeper version?
[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination
[ https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616782#comment-14616782 ] ASF GitHub Bot commented on FLINK-2288: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/886#discussion_r34045870 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -993,9 +1000,34 @@ object JobManager { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + /..) } -val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) -val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) +// HA mode +val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is + // chosen. For the FlinkMiniCluster you have to choose it on your own. + LOG.info(HA mode.) + + if (config.getHost == null) { --- End diff -- In Scala it is actually recommended to call methods without parenthesis which don't have side effects. This is due to the uniform access principle which does not treat variable accesses and pure function calls differently. Calling methods based on this principle is IMHO easier to understand than based on which language the object was implemented. That would require that everyone knows in which language an object is implemented which does not really scale. Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for job manager high availabilty requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK). 
Pros: - Proven solution (other projects use it for this as well) - Apache TLP with large community, docs, and library with required recipes like leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[jira] [Created] (FLINK-2326) Multitenancy on YARN
LINTE created FLINK-2326: Summary: Multitenancy on YARN Key: FLINK-2326 URL: https://issues.apache.org/jira/browse/FLINK-2326 Project: Flink Issue Type: Improvement Components: YARN Client Affects Versions: 0.9 Environment: CentOS 6.6, Hadoop 2.7 secured with Kerberos, Flink 0.9 Reporter: LINTE When a user launches a Flink cluster on YARN, a .yarn-properties file is created in the conf directory. In a multiuser environment the configuration directory is read-only. Even with write permission for all users on the conf directory, only one user can run a Flink cluster. Regards
[GitHub] flink pull request: [FLINK-2242] deprecate RemoteCollector interfa...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/852#issuecomment-119228088 Looks good, merging this...