[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617040#comment-14617040
 ] 

ASF GitHub Bot commented on FLINK-2200:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/885#discussion_r34067611
  
--- Diff: docs/apis/programming_guide.md ---
@@ -187,7 +187,17 @@ that creates the type information for Flink operations.
 </div>
 </div>
 
+ Scala Dependency Versions
 
+Because Scala 2.10 binaries are not compatible with Scala 2.11 binaries, we provide multiple
+artifacts to support both Scala versions. To run your program on Flink with Scala 2.11, you
+need to add the suffix `_2.11` to all Flink artifact ids in your dependencies: every module
+built for Scala 2.11 carries the `_2.11` suffix in its artifact id. For example, `flink-java`
+becomes `flink-java_2.11` and `flink-clients` becomes `flink-clients_2.11`.
--- End diff --

@aalexandrov I'm not sure using the term "Scala-dependent" is a good idea. We cross-build 
all modules, including Java-based modules that link against Scala modules. I think omitting 
"Scala-dependent" is better.
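
For reference, the dependency change described in the diff looks as follows in an sbt build (the version number is illustrative):

{code}
// Scala 2.11 build: every Flink artifact id carries the _2.11 suffix.
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-java_2.11" % "0.9.0",
  "org.apache.flink" % "flink-clients_2.11" % "0.9.0"
)
{code}

With sbt's `%%` operator the suffix is appended automatically, e.g. `"org.apache.flink" %% "flink-clients" % "0.9.0"`.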


 Flink API with Scala 2.11 - Maven Repository
 

 Key: FLINK-2200
 URL: https://issues.apache.org/jira/browse/FLINK-2200
 Project: Flink
  Issue Type: Wish
  Components: Build System, Scala API
Reporter: Philipp Götze
Assignee: Chiwan Park
Priority: Trivial
  Labels: maven

 It would be nice if you could upload a pre-built version of the Flink API 
 with Scala 2.11 to the maven repository.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2200] Add Flink with Scala 2.11 in Mave...

2015-07-07 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/885#discussion_r34067611
  
--- Diff: docs/apis/programming_guide.md ---
@@ -187,7 +187,17 @@ that creates the type information for Flink operations.
 </div>
 </div>
 
+ Scala Dependency Versions
 
+Because Scala 2.10 binaries are not compatible with Scala 2.11 binaries, we provide multiple
+artifacts to support both Scala versions. To run your program on Flink with Scala 2.11, you
+need to add the suffix `_2.11` to all Flink artifact ids in your dependencies: every module
+built for Scala 2.11 carries the `_2.11` suffix in its artifact id. For example, `flink-java`
+becomes `flink-java_2.11` and `flink-clients` becomes `flink-clients_2.11`.
--- End diff --

@aalexandrov I'm not sure using the term "Scala-dependent" is a good idea. We cross-build 
all modules, including Java-based modules that link against Scala modules. I think omitting 
"Scala-dependent" is better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-07-07 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/696#discussion_r34017778
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSetUtils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.{DistanceMetric, 
EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * This algorithm calculates the `k` nearest neighbor points in the training set
+  * for each point of the testing set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.classification.KNN.K]]
+  * Sets K, the number of nearest neighbors to select. (Default value: '''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]]
+  * Sets the distance metric to calculate distance between two points. If 
no metric is specified,
+  * then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] 
is used. (Default value:
+  * '''EuclideanDistanceMetric()''')
+  *
+  */
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[Vector]]] = None
+
+  /** Sets K
+* @param k the number of nearest neighbors to select
+*/
+  def setK(k: Int): KNN = {
+require(k >= 1, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/
+  def setDistanceMetric(metric: DistanceMetric): KNN = {
+parameters.add(DistanceMetric, metric)
+this
+  }
+
+  /** Sets the number of data blocks/partitions
+* @param n the number of data blocks
+*/
+  def setBlocks(n: Int): KNN = {
+require(n >= 1, "Number of blocks must be positive.")
+parameters.add(Blocks, n)
+this
+  }
+}
+
+object KNN {
+
+  case object K extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  case object DistanceMetric extends Parameter[DistanceMetric] {
+val defaultValue: Option[DistanceMetric] = 
Some(EuclideanDistanceMetric())
+  }
+
+  case object Blocks extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  def apply(): KNN = {
+new KNN()
+  }
+
+  /** [[FitOperation]] which trains a KNN based on the given training data 
set.
+* @tparam T Subtype of [[Vector]]
+*/
+  implicit def fitKNN[T <: Vector : TypeInformation] = new FitOperation[KNN, T] {
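
The message is truncated here. As context, a minimal sketch of what such a `FitOperation` typically does for this predictor, assuming FlinkML's `Block` type and a blocking helper such as `FlinkMLTools.block` (names are illustrative, not necessarily the PR's exact code):

```scala
// Illustrative sketch: "fitting" a kNN join just means splitting the
// training set into blocks and storing it on the KNN instance.
implicit def fitKNNSketch[T <: Vector : TypeInformation] =
  new FitOperation[KNN, T] {
    override def fit(instance: KNN, fitParameters: ParameterMap,
                     input: DataSet[T]): Unit = {
      val params = instance.parameters ++ fitParameters
      // fall back to the input parallelism when Blocks is unset
      val numBlocks = params.get(Blocks).getOrElse(input.getParallelism)
      instance.trainingSet = Some(
        FlinkMLTools.block(input.map(v => v: Vector), numBlocks, None))
    }
  }
```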

[jira] [Commented] (FLINK-2208) Build error for Java IBM

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616397#comment-14616397
 ] 

ASF GitHub Bot commented on FLINK-2208:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/833#issuecomment-119133444
  
Is anything happening with this pull request?

If you do not want to update it any more, could you close it?


 Build error for Java IBM
 

 Key: FLINK-2208
 URL: https://issues.apache.org/jira/browse/FLINK-2208
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 0.9
Reporter: Felix Neutatz
Assignee: Felix Neutatz
Priority: Minor

 Using IBM Java 7 will break the build:
 {code:xml}
 [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ 
 flink-runtime ---
 [INFO] 
 /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/java:-1: info: 
 compiling
 [INFO] 
 /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala:-1: 
 info: compiling
 [INFO] Compiling 461 source files to 
 /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/target/classes at 
 1434059956648
 [ERROR] 
 /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1768:
  error: type OperatingSystemMXBean is not a member of package 
 com.sun.management
 [ERROR] asInstanceOf[com.sun.management.OperatingSystemMXBean]).
 [ERROR] ^
 [ERROR] 
 /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1787:
  error: type OperatingSystemMXBean is not a member of package 
 com.sun.management
 [ERROR] val methodsList = 
 classOf[com.sun.management.OperatingSystemMXBean].getMethods()
 [ERROR]  ^
 [ERROR] two errors found
 [INFO] 
 
 [INFO] Reactor Summary:
 [INFO] 
 [INFO] flink .. SUCCESS [ 14.447 
 s]
 [INFO] flink-shaded-hadoop  SUCCESS [  2.548 
 s]
 [INFO] flink-shaded-include-yarn .. SUCCESS [ 36.122 
 s]
 [INFO] flink-shaded-include-yarn-tests  SUCCESS [ 36.980 
 s]
 [INFO] flink-core . SUCCESS [ 21.887 
 s]
 [INFO] flink-java . SUCCESS [ 16.023 
 s]
 [INFO] flink-runtime .. FAILURE [ 20.241 
 s]
 [INFO] flink-optimizer  SKIPPED
 [hadoop@ibm-power-1 /]$ java -version
 java version "1.7.0"
 Java(TM) SE Runtime Environment (build pxp6470_27sr1fp1-20140708_01(SR1 FP1))
 IBM J9 VM (build 2.7, JRE 1.7.0 Linux ppc64-64 Compressed References 
 20140707_205525 (JIT enabled, AOT enabled)
 J9VM - R27_Java727_SR1_20140707_1408_B205525
 JIT  - tr.r13.java_20140410_61421.07
 GC   - R27_Java727_SR1_20140707_1408_B205525_CMPRSS
 J9CL - 20140707_205525)
 JCL - 20140707_01 based on Oracle 7u65-b16
 {code}
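
A portable way around this (a sketch, not necessarily the fix applied for this issue) is to look up the vendor-specific method reflectively, so the code compiles and degrades gracefully on JVMs such as IBM J9 that do not ship {{com.sun.management.OperatingSystemMXBean}}:

{code}
import java.lang.management.ManagementFactory

object CpuMetrics {
  private val osBean = ManagementFactory.getOperatingSystemMXBean

  /** Process CPU time in nanoseconds, or None on JVMs that
    * do not expose the vendor-specific method. */
  def processCpuTime: Option[Long] =
    try {
      // Reflective lookup avoids a compile-time dependency on the
      // Oracle/OpenJDK-only com.sun.management type.
      val method = osBean.getClass.getMethod("getProcessCpuTime")
      method.setAccessible(true)
      Some(method.invoke(osBean).asInstanceOf[Long])
    } catch {
      case _: NoSuchMethodException | _: SecurityException => None
    }
}
{code}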



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616367#comment-14616367
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/696#discussion_r34017778
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSetUtils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.{DistanceMetric, 
EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * This algorithm calculates the `k` nearest neighbor points in the training set
+  * for each point of the testing set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.classification.KNN.K]]
+  * Sets K, the number of nearest neighbors to select. (Default value: '''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]]
+  * Sets the distance metric to calculate distance between two points. If 
no metric is specified,
+  * then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] 
is used. (Default value:
+  * '''EuclideanDistanceMetric()''')
+  *
+  */
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[Vector]]] = None
+
+  /** Sets K
+* @param k the number of nearest neighbors to select
+*/
+  def setK(k: Int): KNN = {
+require(k >= 1, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/
+  def setDistanceMetric(metric: DistanceMetric): KNN = {
+parameters.add(DistanceMetric, metric)
+this
+  }
+
+  /** Sets the number of data blocks/partitions
+* @param n the number of data blocks
+*/
+  def setBlocks(n: Int): KNN = {
+require(n >= 1, "Number of blocks must be positive.")
+parameters.add(Blocks, n)
+this
+  }
+}
+
+object KNN {
+
+  case object K extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  case object DistanceMetric extends Parameter[DistanceMetric] {
+val defaultValue: Option[DistanceMetric] = 
Some(EuclideanDistanceMetric())
+  }
+
+  case object Blocks extends Parameter[Int] {
+val defaultValue: Option[Int] = None
+  }
+
+  def apply(): KNN = {
+

[jira] [Created] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)

2015-07-07 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-2323:
-

 Summary: Rename OperatorState methods to .value() and .update(..)
 Key: FLINK-2323
 URL: https://issues.apache.org/jira/browse/FLINK-2323
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor


We should rename the OperatorState methods from getState and updateState to .value() and 
.update(..) to make them clearer.
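
For illustration, the renamed interface would read roughly as follows (a sketch of the proposed shape, not the merged code):

{code}
// Before: state.getState() / state.updateState(newValue)
// After:  state.value()    / state.update(newValue)
trait OperatorState[T] {
  /** Returns the current state value. */
  def value(): T

  /** Replaces the state with the given new value. */
  def update(newValue: T): Unit
}
{code}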



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2324) Rework partitioned state storage

2015-07-07 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-2324:
-

 Summary: Rework partitioned state storage
 Key: FLINK-2324
 URL: https://issues.apache.org/jira/browse/FLINK-2324
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora


Partitioned states are currently stored per-key in statehandles. This is 
alright for in-memory storage but is very inefficient for HDFS. 

The rationale behind the current mechanism is that it allows repartitioning a state by 
manipulating only the handles, without fetching the data from the external storage.

We should come up with a solution that can achieve both.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2180] [streaming] Iteration test fix

2015-07-07 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/802#issuecomment-119321451
  
The main purpose of this is to enable a deterministic (non-time-based) test 
for the iteration, as it sometimes failed on Travis for unclear reasons (see [here] 
(https://issues.apache.org/jira/browse/FLINK-2180?focusedCommentId=14576968&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14576968)).
I thought it might also be useful to users for debugging purposes (similarly 
to maxWaitTime), and it was easier to introduce at the API level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...

2015-07-07 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/871#discussion_r34032155
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Takes a whole data set and then computes the evaluation score on them 
(obviously, again encoded
+ * in a DataSet)
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double]
+}
+
+/** Traits to allow us to determine at runtime if a Score is a loss (lower 
is better) or a
+  * performance score (higher is better)
+  */
+trait Loss
+
+trait PerformanceScore
+
+/**
+ * Metrics expressible as a mean of a function taking output pairs as input
+ *
+ * @param scoringFct function to apply to all elements
+ * @tparam PredictionType output type
+ */
+abstract class MeanScore[PredictionType: TypeInformation: ClassTag](
+scoringFct: (PredictionType, PredictionType) => Double)
+(implicit yyt: TypeInformation[(PredictionType, PredictionType)])
+  extends Score[PredictionType] with Serializable {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double] = {
+trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean()
+  }
+}
+
+object RegressionScores {
+  /**
+   * Squared loss function
+   *
+   * returns (y1 - y2)^2
+   *
+   * @return a Loss object
+   */
+  def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss
+
+  /**
+   * Zero One Loss Function also usable for score information
+   *
+   * returns 1 if sign of outputs differ and 0 if the signs are equal
+   *
+   * @return a Loss object
+   */
+  def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) =>
+val sy1 = scala.math.signum(y1)
+val sy2 = scala.math.signum(y2)
+if (sy1 == sy2) 0 else 1
+  }) with Loss
+
+  /** Calculates the coefficient of determination, $R^2^$
+*
+* $R^2^$ indicates how well the data fit a calculated model
+* Reference: 
[[http://en.wikipedia.org/wiki/Coefficient_of_determination]]
+*/
+  def r2Score = new Score[Double] with PerformanceScore {
+override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): 
DataSet[Double] = {
+  val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1)
+  val meanTruth = onlyTrue.mean()
+
+  val ssRes = trueAndPredicted
+.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val ssTot = onlyTrue
+.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
--- End diff --

Will do.
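
(For reference, the quantity assembled above is the standard coefficient of determination, $R^2 = 1 - \frac{SS_{res}}{SS_{tot}} = 1 - \frac{\sum_i (y_i - \hat{y}_i)^2}{\sum_i (y_i - \bar{y})^2}$, where `ssRes` and `ssTot` in the code are the two sums.)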


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616639#comment-14616639
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34035723
  
--- Diff: docs/setup/jobmanager_high_availability.md ---
@@ -0,0 +1,121 @@
+---
+title: JobManager High Availability (HA)
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The JobManager is the coordinator of each Flink deployment. It is 
responsible for both *scheduling* and *resource management*.
+
+By default, there is a single JobManager instance per Flink cluster. This 
creates a *single point of failure* (SPOF): if the JobManager crashes, no new 
programs can be submitted and running programs fail.
+
+With JobManager High Availability, you can run multiple JobManager 
instances per Flink cluster and thereby circumvent the *SPOF*.
+
+The general idea of JobManager high availability is that there is a 
**single leading JobManager** at any time and **multiple standby JobManagers** 
to take over leadership in case the leader fails. This guarantees that there is 
**no single point of failure** and programs can make progress as soon as a 
standby JobManager has taken leadership. There is no explicit distinction 
between standby and master JobManager instances. Each JobManager can take the 
role of master or standby.
+
+As an example, consider the following setup with three JobManager 
instances:
+
+<img src="fig/jobmanager_ha_overview.png" class="center" />
+
+## Configuration
+
+To enable JobManager High Availability you have to configure a **ZooKeeper 
quorum** and set up a **masters file** with all JobManager hosts.
+
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  
*distributed coordination* between all running JobManager instances. ZooKeeper 
is a separate service from Flink, which provides highly reliable distributed 
coordination via leader election and light-weight consistent state storage. 
Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more 
information about ZooKeeper.
+
+Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high 
availability mode and all Flink components try to connect to a JobManager via 
coordination through ZooKeeper.
+
+- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated 
group of ZooKeeper servers, which provide the distributed coordination service.
+  
+  <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+
+  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+
+- The following configuration keys are optional:
+
+  - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for 
coordination
+  - TODO Add client configuration keys
+
+## Starting an HA-cluster
+
+In order to start an HA-cluster configure the *masters* file in 
`conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started.
+
+  <pre>
+jobManagerAddress1
+[...]
+jobManagerAddressX
+  </pre>
+
+After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. **Keep 
in mind that the ZooKeeper quorum has to be running when you call the scripts**.
+
+## Running ZooKeeper
+
+If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
+
+There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can 
configure the hosts to run ZooKeeper on with the `server.X` entries, where X is 
a unique ID of each server:
+
+<pre>
+server.X=addressX:peerPort:leaderPort
+[...]
+server.Y=addressY:peerPort:leaderPort
+</pre>
+
+The script 

[jira] [Commented] (FLINK-2157) Create evaluation framework for ML library

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616580#comment-14616580
 ] 

ASF GitHub Bot commented on FLINK-2157:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/871#discussion_r34032155
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Takes a whole data set and then computes the evaluation score on them 
(obviously, again encoded
+ * in a DataSet)
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double]
+}
+
+/** Traits to allow us to determine at runtime if a Score is a loss (lower 
is better) or a
+  * performance score (higher is better)
+  */
+trait Loss
+
+trait PerformanceScore
+
+/**
+ * Metrics expressible as a mean of a function taking output pairs as input
+ *
+ * @param scoringFct function to apply to all elements
+ * @tparam PredictionType output type
+ */
+abstract class MeanScore[PredictionType: TypeInformation: ClassTag](
+scoringFct: (PredictionType, PredictionType) => Double)
+(implicit yyt: TypeInformation[(PredictionType, PredictionType)])
+  extends Score[PredictionType] with Serializable {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double] = {
+trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean()
+  }
+}
+
+object RegressionScores {
+  /**
+   * Squared loss function
+   *
+   * returns (y1 - y2)^2
+   *
+   * @return a Loss object
+   */
+  def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss
+
+  /**
+   * Zero One Loss Function also usable for score information
+   *
+   * returns 1 if sign of outputs differ and 0 if the signs are equal
+   *
+   * @return a Loss object
+   */
+  def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) =>
+val sy1 = scala.math.signum(y1)
+val sy2 = scala.math.signum(y2)
+if (sy1 == sy2) 0 else 1
+  }) with Loss
+
+  /** Calculates the coefficient of determination, $R^2^$
+*
+* $R^2^$ indicates how well the data fit a calculated model
+* Reference: 
[[http://en.wikipedia.org/wiki/Coefficient_of_determination]]
+*/
+  def r2Score = new Score[Double] with PerformanceScore {
+override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): 
DataSet[Double] = {
+  val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1)
+  val meanTruth = onlyTrue.mean()
+
+  val ssRes = trueAndPredicted
+.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val ssTot = onlyTrue
+.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
--- End diff --

Will do.
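
As a usage sketch of the framework in the diff (data values are illustrative; it assumes the Score/RegressionScores classes above are on the classpath):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.ml.evaluation.RegressionScores

object ScoreExample extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val trueAndPredicted: DataSet[(Double, Double)] =
    env.fromElements((1.0, 0.9), (2.0, 2.2), (3.0, 2.7))

  // evaluate returns the mean of (truth - prediction)^2 as a one-element DataSet
  val mse: DataSet[Double] = RegressionScores.squaredLoss.evaluate(trueAndPredicted)
  mse.print()
}
{code}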


 Create evaluation framework for ML library
 --

 Key: FLINK-2157
 URL: https://issues.apache.org/jira/browse/FLINK-2157
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.10


 Currently, FlinkML lacks the means to evaluate the performance of trained models. 
 It would be great to add some {{Evaluators}} which can calculate some score 

[jira] [Assigned] (FLINK-1943) Add Gelly-GSA compiler and translation tests

2015-07-07 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri reassigned FLINK-1943:


Assignee: Vasia Kalavri

 Add Gelly-GSA compiler and translation tests
 

 Key: FLINK-1943
 URL: https://issues.apache.org/jira/browse/FLINK-1943
 Project: Flink
  Issue Type: Test
  Components: Gelly
Affects Versions: 0.9
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri

 These should be similar to the corresponding Spargel tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Collect(): Fixing the akka.framesize size limi...

2015-07-07 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/887#issuecomment-119140624
  
Ok, sounds good! 
Could you give the ticket number for the changes @mxm is working on? 
Just to have a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Hits

2015-07-07 Thread mfahimazizi
Github user mfahimazizi commented on the pull request:

https://github.com/apache/flink/pull/765#issuecomment-119188834
  
Hi @vasia,
sorry for the late reply.
I set the indentation to tabs instead of spaces in my IDE and pushed the updated files.
As discussed in the JIRA issue about the vertex update function, we cannot access edges 
inside that function; if we overload it as follows, we can identify hub and authority 
edges in an undirected graph:
```public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Edge<Long, String, Double>> inMessages)```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34035723
  
--- Diff: docs/setup/jobmanager_high_availability.md ---
@@ -0,0 +1,121 @@
+---
+title: JobManager High Availability (HA)
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The JobManager is the coordinator of each Flink deployment. It is 
responsible for both *scheduling* and *resource management*.
+
+By default, there is a single JobManager instance per Flink cluster. This 
creates a *single point of failure* (SPOF): if the JobManager crashes, no new 
programs can be submitted and running programs fail.
+
+With JobManager High Availability, you can run multiple JobManager 
instances per Flink cluster and thereby circumvent the *SPOF*.
+
+The general idea of JobManager high availability is that there is a 
**single leading JobManager** at any time and **multiple standby JobManagers** 
to take over leadership in case the leader fails. This guarantees that there is 
**no single point of failure** and programs can make progress as soon as a 
standby JobManager has taken leadership. There is no explicit distinction 
between standby and master JobManager instances. Each JobManager can take the 
role of master or standby.
+
+As an example, consider the following setup with three JobManager 
instances:
+
+<img src="fig/jobmanager_ha_overview.png" class="center" />
+
+## Configuration
+
+To enable JobManager High Availability you have to configure a **ZooKeeper 
quorum** and set up a **masters file** with all JobManager hosts.
+
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  
*distributed coordination* between all running JobManager instances. ZooKeeper 
is a separate service from Flink, which provides highly reliable distributed 
coordination via leader election and light-weight consistent state storage. 
Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more 
information about ZooKeeper.
+
+Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high 
availability mode and all Flink components try to connect to a JobManager via 
coordination through ZooKeeper.
+
+- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated 
group of ZooKeeper servers, which provide the distributed coordination service.
+  
+  <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+
+  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+
+- The following configuration keys are optional:
+
+  - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for 
coordination
+  - TODO Add client configuration keys
+
+## Starting an HA-cluster
+
+In order to start an HA-cluster configure the *masters* file in 
`conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started.
+
+  <pre>
+jobManagerAddress1
+[...]
+jobManagerAddressX
+  </pre>
+
+After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. **Keep 
in mind that the ZooKeeper quorum has to be running when you call the scripts**.
+
+## Running ZooKeeper
+
+If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
+
+There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can 
configure the hosts to run ZooKeeper on with the `server.X` entries, where X is 
a unique ID of each server:
+
+<pre>
+server.X=addressX:peerPort:leaderPort
+[...]
+server.Y=addressY:peerPort:leaderPort
+</pre>
+
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server 
on each of the configured hosts. The started processes start ZooKeeper servers 
via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and 
makes sure to set some required 

[jira] [Commented] (FLINK-2240) Use BloomFilter to minimize probe side records which are spilled to disk in Hybrid-Hash-Join

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616423#comment-14616423
 ] 

ASF GitHub Bot commented on FLINK-2240:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/888#issuecomment-119136367
  
This is a very nice idea, thank you for the contribution! The numbers look 
quite encouraging.

I need to look into this carefully, as it touches a very sensitive part of 
the system. It will probably take me a bit of time.

Here are some initial comments:

  - The integration tests seem to be failing; this change apparently 
triggers a stack overflow at some point. Have a look at the logs of the Travis 
CI build.

  - Can we add a flag to the hash-table, to enable/disable the 
bloom-filters? That would make it easier for future comparisons.

  - Could you include a standalone mini benchmark similar to the one you 
did where you posted the numbers here? A simple standalone Java executable that 
creates the hash table and feeds some generated records through it (with bloom 
filters activated and deactivated)? It would not start a full Flink cluster, 
but only test the HashJoin in isolation.
We like to include some of those mini benchmarks for performance-critical 
parts, and re-run them once in a while to determine how the performance behaves 
at that point.


 Use BloomFilter to minimize probe side records which are spilled to disk in 
 Hybrid-Hash-Join
 

 Key: FLINK-2240
 URL: https://issues.apache.org/jira/browse/FLINK-2240
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Assignee: Chengxiang Li
Priority: Minor

 In Hybrid-Hash-Join, when the small table does not fit into memory, part of the 
 small table data is spilled to disk, and the counterpart partition of the 
 big table data is spilled to disk in the probe phase as well. If we build a 
 BloomFilter while spilling the small table to disk during the build phase, and use it to 
 filter the big table records which tend to be spilled to disk, this may 
 greatly reduce the spilled big table file size and save the disk I/O cost 
 for writing and later reading.
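
 A minimal sketch of the idea (illustrative only, not this issue's implementation): while spilling a build-side partition, record the keys' hashes in a Bloom filter; in the probe phase, consult the filter before spilling a probe record.
{code}
object BloomFilteredSpill {
  // Illustrative Bloom filter over key hashes for one spilled partition.
  final class SimpleBloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new java.util.BitSet(numBits)
    private def index(hash: Int, i: Int): Int =
      math.abs((hash + i * (hash >>> 16)) % numBits)
    def add(hash: Int): Unit =
      (0 until numHashes).foreach(i => bits.set(index(hash, i)))
    def mightContain(hash: Int): Boolean =
      (0 until numHashes).forall(i => bits.get(index(hash, i)))
  }

  // Build phase: while spilling partition p, call filter.add(keyHash) per record.
  // Probe phase: only records that *might* match the spilled build side are
  // written to disk; definite misses are dropped, saving the write and re-read.
  def maybeSpillProbeRecord(keyHash: Int, filter: SimpleBloomFilter,
                            spill: Int => Unit): Unit =
    if (filter.mightContain(keyHash)) spill(keyHash)
}
{code}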



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source

2015-07-07 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-2325:
-

Assignee: Robert Metzger

 PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a 
 topic that is created after starting the Source
 -

 Key: FLINK-2325
 URL: https://issues.apache.org/jira/browse/FLINK-2325
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 0.9
Reporter: Rico Bergmann
Assignee: Robert Metzger

 I'm creating a PersistentKafkaSource that reads from a specified Kafka topic which, at 
 the time the PersistentKafkaSource is started (via open(.)), does not yet exist. As a 
 result, the number of partitions read in the open(.) function is 0, which leads to 
 arrays of length 0 (lastOffsets and committedOffsets).
 Maybe it would be better to check whether numberOfPartitions returns 0 and, if so, to 
 take the default number of partitions from the Kafka config?
 Stacktrace:
 java.lang.ArrayIndexOutOfBoundsException: 0
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
   at 
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:745)
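
 The check suggested above would look roughly like this (a sketch with hypothetical helper names, not the actual patch):
{code}
object KafkaPartitionGuard {
  // Sketch: guard against a topic that does not exist yet when open() runs.
  def initialOffsets(topic: String, partitionsFromMetadata: Int,
                     defaultPartitionsFromConfig: Int): Array[Long] = {
    val partitions =
      if (partitionsFromMetadata > 0) partitionsFromMetadata
      else defaultPartitionsFromConfig // topic not created yet: use the default
    require(partitions > 0, s"Cannot determine partition count for topic $topic")
    // lastOffsets / committedOffsets start out undefined (-1) per partition
    Array.fill(partitions)(-1L)
  }
}
{code}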



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2321) The seed for the SVM classifier is currently static

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616455#comment-14616455
 ] 

ASF GitHub Bot commented on FLINK-2321:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/889#issuecomment-119151761
  
Good catch @thvasilo. LGTM. Will merge it with the next batch.


 The seed for the SVM classifier is currently static
 ---

 Key: FLINK-2321
 URL: https://issues.apache.org/jira/browse/FLINK-2321
 Project: Flink
  Issue Type: Bug
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.10


 The seed for the SVM algorithm in FlinkML has a default value of 0, meaning 
 that if it's not set, we always have the same seed when running the algorithm.
 What should happen instead is that if the seed is not set, it should be a 
 random number.
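
 The fix amounts to drawing the default from a random source instead of a constant, roughly (a sketch following FlinkML's parameter convention, not necessarily the exact patch):
{code}
import scala.util.Random

// Minimal sketch of the FlinkML parameter convention:
trait Parameter[T] { val defaultValue: Option[T] }

// A Seed parameter whose default is freshly randomized rather than 0, so
// separate runs no longer share a seed unless one is set explicitly.
case object Seed extends Parameter[Long] {
  val defaultValue: Option[Long] = Some(Random.nextLong())
}
{code}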



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616453#comment-14616453
 ] 

ASF GitHub Bot commented on FLINK-2323:
---

GitHub user gyfora opened a pull request:

https://github.com/apache/flink/pull/890

[FLINK-2323] [api-breaking] Rename OperatorState interface methods to 
value() and update(..)

Refactor OperatorState interface for clearer method names

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gyfora/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/890.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #890


commit 9d807a6b7eef35bcbca8ef19aa6bf49b12f97dfd
Author: Gyula Fora gyf...@apache.org
Date:   2015-07-07T09:07:05Z

[FLINK-2323] [api-breaking] Rename OperatorState interface methods to 
value() and update(..)




 Rename OperatorState methods to .value() and .update(..)
 

 Key: FLINK-2323
 URL: https://issues.apache.org/jira/browse/FLINK-2323
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 We should rename the OperatorState methods from getState and updateState to .value() 
 and .update(..) to make them clearer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...

2015-07-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/871#discussion_r34025701
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Takes a whole data set and then computes the evaluation score on them 
(obviously, again encoded
+ * in a DataSet)
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double]
+}
+
+/** Traits to allow us to determine at runtime if a Score is a loss (lower 
is better) or a
+  * performance score (higher is better)
+  */
+trait Loss
+
+trait PerformanceScore
+
+/**
+ * Metrics expressible as a mean of a function taking output pairs as input
+ *
+ * @param scoringFct function to apply to all elements
+ * @tparam PredictionType output type
+ */
+abstract class MeanScore[PredictionType: TypeInformation: ClassTag](
+scoringFct: (PredictionType, PredictionType) => Double)
+(implicit yyt: TypeInformation[(PredictionType, PredictionType)])
+  extends Score[PredictionType] with Serializable {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double] = {
+trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean()
+  }
+}
+
+object RegressionScores {
+  /**
+   * Squared loss function
+   *
+   * returns (y1 - y2)^2
+   *
+   * @return a Loss object
+   */
+  def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss
+
+  /**
+   * Zero One Loss Function also usable for score information
+   *
+   * returns 1 if sign of outputs differ and 0 if the signs are equal
+   *
+   * @return a Loss object
+   */
+  def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) =>
+val sy1 = scala.math.signum(y1)
+val sy2 = scala.math.signum(y2)
+if (sy1 == sy2) 0 else 1
+  }) with Loss
+
+  /** Calculates the coefficient of determination, $R^2^$
+*
+* $R^2^$ indicates how well the data fit a calculated model
+* Reference: 
[[http://en.wikipedia.org/wiki/Coefficient_of_determination]]
+*/
+  def r2Score = new Score[Double] with PerformanceScore {
+override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): 
DataSet[Double] = {
+  val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1)
+  val meanTruth = onlyTrue.mean()
+
+  val ssRes = trueAndPredicted
+.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val ssTot = onlyTrue
+.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
--- End diff --

Broadcast is more efficient than cross because it avoids repeated 
deserialization. So why not use `mapWithBcVariable`?
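
A sketch of that suggestion, assuming the `mapWithBcVariable` enrichment that `import org.apache.flink.ml._` brings in (it broadcasts the one-element DataSet instead of crossing with it); `onlyTrue` and `meanTruth` are the names from the diff above:

```scala
// ssTot via a broadcast variable rather than crossWithTiny; meanTruth is a
// one-element DataSet[Double] holding the mean of the true values.
val ssTot = onlyTrue
  .mapWithBcVariable(meanTruth) { (truth, mean) =>
    (truth - mean) * (truth - mean)
  }
  .reduce(_ + _)
```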


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2157] [ml] [WIP] Create evaluation fram...

2015-07-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/871#discussion_r34025726
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Takes a whole data set and then computes the evaluation score on them 
(obviously, again encoded
+ * in a DataSet)
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double]
+}
+
+/** Traits to allow us to determine at runtime if a Score is a loss (lower 
is better) or a
+  * performance score (higher is better)
+  */
+trait Loss
+
+trait PerformanceScore
+
+/**
+ * Metrics expressible as a mean of a function taking output pairs as input
+ *
+ * @param scoringFct function to apply to all elements
+ * @tparam PredictionType output type
+ */
+abstract class MeanScore[PredictionType: TypeInformation: ClassTag](
+scoringFct: (PredictionType, PredictionType) => Double)
+(implicit yyt: TypeInformation[(PredictionType, PredictionType)])
+  extends Score[PredictionType] with Serializable {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double] = {
+trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean()
+  }
+}
+
+object RegressionScores {
+  /**
+   * Squared loss function
+   *
+   * returns (y1 - y2)^2
+   *
+   * @return a Loss object
+   */
+  def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss
+
+  /**
+   * Zero One Loss Function also usable for score information
+   *
+   * returns 1 if sign of outputs differ and 0 if the signs are equal
+   *
+   * @return a Loss object
+   */
+  def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) =>
+val sy1 = scala.math.signum(y1)
+val sy2 = scala.math.signum(y2)
+if (sy1 == sy2) 0 else 1
+  }) with Loss
+
+  /** Calculates the coefficient of determination, $R^2^$
+*
+* $R^2^$ indicates how well the data fit a calculated model
+* Reference: 
[[http://en.wikipedia.org/wiki/Coefficient_of_determination]]
+*/
+  def r2Score = new Score[Double] with PerformanceScore {
+override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): 
DataSet[Double] = {
+  val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1)
+  val meanTruth = onlyTrue.mean()
+
+  val ssRes = trueAndPredicted
+.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val ssTot = onlyTrue
+.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val r2 = ssRes.crossWithTiny(ssTot).map{resTot =>
--- End diff --

Same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2157) Create evaluation framework for ML library

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14616467#comment-14616467
 ] 

ASF GitHub Bot commented on FLINK-2157:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/871#discussion_r34025701
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Takes a whole data set and then computes the evaluation score on them 
(obviously, again encoded
+ * in a DataSet)
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double]
+}
+
+/** Traits to allow us to determine at runtime if a Score is a loss (lower 
is better) or a
+  * performance score (higher is better)
+  */
+trait Loss
+
+trait PerformanceScore
+
+/**
+ * Metrics expressible as a mean of a function taking output pairs as input
+ *
+ * @param scoringFct function to apply to all elements
+ * @tparam PredictionType output type
+ */
+abstract class MeanScore[PredictionType: TypeInformation: ClassTag](
+scoringFct: (PredictionType, PredictionType) => Double)
+(implicit yyt: TypeInformation[(PredictionType, PredictionType)])
+  extends Score[PredictionType] with Serializable {
+  def evaluate(trueAndPredicted: DataSet[(PredictionType, 
PredictionType)]): DataSet[Double] = {
+trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean()
+  }
+}
+
+object RegressionScores {
+  /**
+   * Squared loss function
+   *
+   * returns (y1 - y2)^2
+   *
+   * @return a Loss object
+   */
+  def squaredLoss = new MeanScore[Double]((y1, y2) => (y1 - y2) * (y1 - y2)) with Loss
+
+  /**
+   * Zero One Loss Function also usable for score information
+   *
+   * returns 1 if sign of outputs differ and 0 if the signs are equal
+   *
+   * @return a Loss object
+   */
+  def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) =>
+val sy1 = scala.math.signum(y1)
+val sy2 = scala.math.signum(y2)
+if (sy1 == sy2) 0 else 1
+  }) with Loss
+
+  /** Calculates the coefficient of determination, $R^2^$
+*
+* $R^2^$ indicates how well the data fit a calculated model
+* Reference: 
[[http://en.wikipedia.org/wiki/Coefficient_of_determination]]
+*/
+  def r2Score = new Score[Double] with PerformanceScore {
+override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): 
DataSet[Double] = {
+  val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1)
+  val meanTruth = onlyTrue.mean()
+
+  val ssRes = trueAndPredicted
+.map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _)
+  val ssTot = onlyTrue
+.crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - 
tp._2)).reduce(_ + _)
--- End diff --

Broadcast is more efficient than cross because it avoids repeated 
deserialization. So why not use `mapWithBcVariable`?
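
For illustration, a minimal sketch of that rewrite (assuming FlinkML's
`mapWithBcVariable` extension from the `org.apache.flink.ml` package object
and the `mean()` helper used elsewhere in this patch):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val onlyTrue: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0)
    val meanTruth: DataSet[Double] = onlyTrue.mean()

    // sum of squared deviations from the mean, broadcasting the
    // single-element mean DataSet instead of crossing with it
    val ssTot = onlyTrue
      .mapWithBcVariable(meanTruth)((truth, mean) => (truth - mean) * (truth - mean))
      .reduce(_ + _)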


 Create evaluation framework for ML library
 --

 Key: FLINK-2157
 URL: https://issues.apache.org/jira/browse/FLINK-2157
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.10


 Currently, FlinkML lacks means to 

[jira] [Commented] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616503#comment-14616503
 ] 

ASF GitHub Bot commented on FLINK-2323:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/890


 Rename OperatorState methods to .value() and .update(..)
 

 Key: FLINK-2323
 URL: https://issues.apache.org/jira/browse/FLINK-2323
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 We should rename the OperatorState methods getState and updateState to 
 .value() and .update(..) to make them clearer.
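
 For reference, a sketch of the interface after the rename; the method names 
 come from this ticket, while the trait skeleton itself is illustrative:

     trait OperatorState[T] {
       def value(): T             // read the current state
       def update(state: T): Unit // replace the current state
     }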



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2319) Collect size limitation due to akka.

2015-07-07 Thread Kostas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas updated FLINK-2319:
--
Issue Type: Bug  (was: New Feature)

 Collect size limitation due to akka.
 

 Key: FLINK-2319
 URL: https://issues.apache.org/jira/browse/FLINK-2319
 Project: Flink
  Issue Type: Bug
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Kostas

 Each TaskManager keeps the results of a local task in a set of Accumulators. 
 Upon termination of the task, the Accumulators are sent back to the 
 JobManager, who merges them before sending them back to the Client. 
 To exchange the Accumulators and their content, akka is used. This limits the 
 size of the output of a task to no more than akka.framesize bytes. Otherwise, 
 akka would drop the message.
 This ticket is to propose the removal of this limitation so that results can 
 be of arbitrary size.
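
 As a stop-gap, the limit can be raised (though not removed) through the 
 existing akka.framesize setting; a hedged sketch via the Configuration API, 
 with a purely illustrative 10 MB value:

     import org.apache.flink.configuration.Configuration

     val conf = new Configuration()
     // maximum size of a single akka message; a collected result must fit in one
     conf.setString("akka.framesize", "10485760b")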



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...

2015-07-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/890


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Collect(): Fixing the akka.framesize size limi...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/887#issuecomment-119137396
  
I think all in all, this looks good.

We have another change by @mxm pending that changes the accumulators to do 
live updates during runtime. Since @mxm's change is more invasive, I would 
suggest that we wait for it to be merged before merging this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source

2015-07-07 Thread Rico Bergmann (JIRA)
Rico Bergmann created FLINK-2325:


 Summary: PersistentKafkaSource throws 
ArrayIndexOutOfBoundsException if reading from a topic that is created after 
starting the Source
 Key: FLINK-2325
 URL: https://issues.apache.org/jira/browse/FLINK-2325
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 0.9
Reporter: Rico Bergmann


I'm creating a PersistentKafkaSource that reads from a specified Kafka topic 
which is not yet present at the time the PersistentKafkaSource is started (via 
open(.)). As a result, the number of partitions read in the open(.) function 
is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets).
Maybe it would be better to check whether numberOfPartitions returns 0 and, if 
so, to take the default number of partitions from the Kafka config?

Stacktrace:
java.lang.ArrayIndexOutOfBoundsException: 0
at 
org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
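
A minimal sketch of the guard proposed above; safePartitionCount, 
fetchPartitionCount and defaultPartitions are illustrative stand-ins, not the 
connector's real API:

    // fall back to a configured default when the topic does not exist yet,
    // so the offset arrays are never created with length 0
    def safePartitionCount(fetchPartitionCount: () => Int, defaultPartitions: Int): Int = {
      val n = fetchPartitionCount()
      if (n > 0) n else defaultPartitions
    }

    // topic not created yet: the fetch reports 0 partitions
    val partitions = safePartitionCount(() => 0, defaultPartitions = 1)
    val lastOffsets = Array.fill(partitions)(-1L)
    val committedOffsets = Array.fill(partitions)(0L)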



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2292) Report accumulators periodically while job is running

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616469#comment-14616469
 ] 

ASF GitHub Bot commented on FLINK-2292:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/887#issuecomment-119157639
  
Hi @kl0u, the tickets are https://issues.apache.org/jira/browse/FLINK-2292 
and https://issues.apache.org/jira/browse/FLINK-1573.


 Report accumulators periodically while job is running
 -

 Key: FLINK-2292
 URL: https://issues.apache.org/jira/browse/FLINK-2292
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


 Accumulators should be sent periodically, as part of the heartbeat that sends 
 metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...

2015-07-07 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/890#issuecomment-119164038
  
+2 Looks good, this is a simple rename :ski:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-2323) Rename OperatorState methods to .value() and .update(..)

2015-07-07 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-2323.
-
Resolution: Fixed

 Rename OperatorState methods to .value() and .update(..)
 

 Key: FLINK-2323
 URL: https://issues.apache.org/jira/browse/FLINK-2323
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 We should rename the OperatorState methods getState and updateState to 
 .value() and .update(..) to make them clearer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2325) PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source

2015-07-07 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616446#comment-14616446
 ] 

Robert Metzger commented on FLINK-2325:
---

Thank you for reporting this. I'm currently reworking the KafkaSource anyways.

 PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a 
 topic that is created after starting the Source
 -

 Key: FLINK-2325
 URL: https://issues.apache.org/jira/browse/FLINK-2325
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 0.9
Reporter: Rico Bergmann
Assignee: Robert Metzger

 I'm creating a PersistentKafkaSource that reads from a specified Kafka topic 
 which is not yet present at the time the PersistentKafkaSource is started (via 
 open(.)). As a result, the number of partitions read in the open(.) function 
 is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets).
 Maybe it would be better to check whether numberOfPartitions returns 0 and, 
 if so, to take the default number of partitions from the Kafka config?
 Stacktrace:
 java.lang.ArrayIndexOutOfBoundsException: 0
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
   at 
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2323] [api-breaking] Rename OperatorSta...

2015-07-07 Thread gyfora
GitHub user gyfora opened a pull request:

https://github.com/apache/flink/pull/890

[FLINK-2323] [api-breaking] Rename OperatorState interface methods to 
value() and update(..)

Refactor OperatorState interface for clearer method names

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gyfora/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/890.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #890


commit 9d807a6b7eef35bcbca8ef19aa6bf49b12f97dfd
Author: Gyula Fora gyf...@apache.org
Date:   2015-07-07T09:07:05Z

[FLINK-2323] [api-breaking] Rename OperatorState interface methods to 
value() and update(..)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2321] [ml] The seed for the SVM classif...

2015-07-07 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/889#issuecomment-119151761
  
Good catch @thvasilo. LGTM. Will merge it with the next batch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-07-07 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/696#issuecomment-119163944
  
@thvasilo Yeah, exact k-NN is not scalable to gigabyte- or terabyte-sized 
data. If I add an R-Tree to this algorithm, it would perform better. But I 
agree that we need a vote or discussion about this.

I am also interested in approximate k-NN, but we should check on the progress 
with the assigned contributor. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616668#comment-14616668
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34037018
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a 
ZooKeeper config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+   public static void main(String[] args) {
+   try {
+   final ParameterTool params = 
ParameterTool.fromArgs(args);
--- End diff --

JobManager and TaskManager log environment information. That helps a lot in 
debugging setups. This should log the environment info as well, via 
`EnvironmentInformation.logEnvironmentInfo(LOG, "FlinkZooKeeperQuorumPeer", 
args)`.
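
A sketch of where that call would go; the utility and its (Logger, String, 
String[]) signature are as quoted above, while the surrounding skeleton is 
illustrative (shown in Scala for consistency with the other sketches here):

    import org.apache.flink.runtime.util.EnvironmentInformation
    import org.slf4j.LoggerFactory

    object FlinkZooKeeperQuorumPeerSketch {
      private val LOG = LoggerFactory.getLogger(getClass)

      def main(args: Array[String]): Unit = {
        // log JVM, user and classpath details first, as JobManager/TaskManager do
        EnvironmentInformation.logEnvironmentInfo(LOG, "FlinkZooKeeperQuorumPeer", args)
        // ... then parse zkConfigFile / peerId and run the quorum peer as before
      }
    }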


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2324) Rework partitioned state storage

2015-07-07 Thread Paris Carbone (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616675#comment-14616675
 ] 

Paris Carbone commented on FLINK-2324:
--

We cannot avoid a key-value store if we want to make this efficient, imho. 
There are several LevelDB-backed distributed key-value stores we can use to 
persist and retrieve partitioned state. There is also HBase, if we want to 
stick to HDFS for this...

Any other opinions?

 Rework partitioned state storage
 

 Key: FLINK-2324
 URL: https://issues.apache.org/jira/browse/FLINK-2324
 Project: Flink
  Issue Type: Improvement
Reporter: Gyula Fora
Assignee: Gyula Fora

 Partitioned states are currently stored per-key in statehandles. This is 
 alright for in-memory storage but is very inefficient for HDFS. 
 The logic behind the current mechanism is that this approach provides a way 
 to repartition a state without fetching the data from the external storage, 
 by manipulating only the handles.
 We should come up with a solution that can achieve both.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616663#comment-14616663
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34036874
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a 
ZooKeeper config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+   public static void main(String[] args) {
+   try {
+   final ParameterTool params = 
ParameterTool.fromArgs(args);
+   final String zkConfigFile = 
params.getRequired("zkConfigFile");
+   final int peerId = params.getInt("peerId");
+
+   // Run quorum peer
+   runFlinkZkQuorumPeer(zkConfigFile, peerId);
+   }
+   catch (Throwable t) {
+   t.printStackTrace();
--- End diff --

This should probably show up in the log as well. A lot of people do not 
check the `.out` files.
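
A minimal sketch of that change, routing fatal startup errors to the logger 
before exiting (names illustrative):

    import org.slf4j.LoggerFactory

    object QuorumPeerLauncher {
      private val LOG = LoggerFactory.getLogger(getClass)

      def main(args: Array[String]): Unit = {
        try {
          // ... parse the ZooKeeper config file and run the peer ...
        } catch {
          case t: Throwable =>
            // ends up in the .log file instead of only the .out file
            LOG.error("Error running ZooKeeper quorum peer", t)
            System.exit(-1)
        }
      }
    }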


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616699#comment-14616699
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34038403
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -993,9 +1000,34 @@ object JobManager {
   configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDir + "/..")
 }
 
-val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+// HA mode
+val (hostname, port) = if 
(ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+  // TODO @removeme @tillrohrmann This is the place where the host and 
random port for JM is
+  // chosen.  For the FlinkMiniCluster you have to choose it on your 
own.
+  LOG.info("HA mode.")
--- End diff --

How about we make this a bit more prominent ;-) "Starting JobManager in High 
Availability Mode".

The other case should also have a log message, for example "Starting 
JobManager without High Availability".
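
For illustration, the two messages side by side; haEnabled stands in for the 
ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration) check from 
the patch:

    import org.slf4j.LoggerFactory

    val LOG = LoggerFactory.getLogger("JobManager")

    def logStartupMode(haEnabled: Boolean): Unit =
      if (haEnabled) LOG.info("Starting JobManager in High Availability Mode")
      else LOG.info("Starting JobManager without High Availability")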




 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34038430
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1435,6 +1435,11 @@ object TaskManager {
 // start the I/O manager last, it will create some temp directories.
 val ioManager: IOManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths)
 
+if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+  // TODO @removeme @tillrohrmann Setup leader retrieval service
+  LOG.info("HA mode.")
--- End diff --

See above...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616701#comment-14616701
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-119202663
  
Looks like a good piece of work!

Can we actually get into ZooKeeper version conflicts here? For example, the 
Kafka connector needs a certain Zookeeper version. Can it conflict with our 
Zookeeper version?


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34038257
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -993,9 +1000,34 @@ object JobManager {
   configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDir + "/..")
 }
 
-val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+// HA mode
+val (hostname, port) = if 
(ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+  // TODO @removeme @tillrohrmann This is the place where the host and 
random port for JM is
+  // chosen.  For the FlinkMiniCluster you have to choose it on your 
own.
+  LOG.info("HA mode.")
+
+  if (config.getHost == null) {
--- End diff --

I think we should call all Java methods with parentheses. They are defined 
with parentheses after all, even if Scala allows you to treat them as if they 
were not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616695#comment-14616695
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34038257
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -993,9 +1000,34 @@ object JobManager {
   configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDir + "/..")
 }
 
-val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+// HA mode
+val (hostname, port) = if 
(ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+  // TODO @removeme @tillrohrmann This is the place where the host and 
random port for JM is
+  // chosen.  For the FlinkMiniCluster you have to choose it on your 
own.
+  LOG.info("HA mode.")
+
+  if (config.getHost == null) {
--- End diff --

I think we should call all Java methods with parentheses. They are defined 
with parentheses after all, even if Scala allows you to treat them as if they 
were not.


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34036874
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a 
ZooKeeper config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+   public static void main(String[] args) {
+   try {
+   final ParameterTool params = 
ParameterTool.fromArgs(args);
+   final String zkConfigFile = 
params.getRequired("zkConfigFile");
+   final int peerId = params.getInt("peerId");
+
+   // Run quorum peer
+   runFlinkZkQuorumPeer(zkConfigFile, peerId);
+   }
+   catch (Throwable t) {
+   t.printStackTrace();
--- End diff --

This should probably show up in the log as well. A lot of people do not 
check the `.out` files.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34037018
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a 
ZooKeeper config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+   public static void main(String[] args) {
+   try {
+   final ParameterTool params = 
ParameterTool.fromArgs(args);
--- End diff --

JobManager and TaskManager log environment information. That helps a lot in 
debugging setups. This should log the environment info as well, via 
`EnvironmentInformation.logEnvironmentInfo(LOG, "FlinkZooKeeperQuorumPeer", 
args)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [wip] [FLINK-2288] [FLINK-2302] Setup ZooKeepe...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/886#issuecomment-119202663
  
Looks like a good piece of work!

Can we actually get into ZooKeeper version conflicts here? For example, the 
Kafka connector needs a certain Zookeeper version. Can it conflict with our 
Zookeeper version?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14616782#comment-14616782
 ] 

ASF GitHub Bot commented on FLINK-2288:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/886#discussion_r34045870
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -993,9 +1000,34 @@ object JobManager {
   configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDir + "/..")
 }
 
-val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+// HA mode
+val (hostname, port) = if 
(ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+  // TODO @removeme @tillrohrmann This is the place where the host and 
random port for JM is
+  // chosen.  For the FlinkMiniCluster you have to choose it on your 
own.
+  LOG.info("HA mode.")
+
+  if (config.getHost == null) {
--- End diff --

In Scala it is actually recommended to call methods without parentheses when 
they don't have side effects. This is due to the uniform access principle, 
which does not treat variable accesses and pure function calls differently. 
Calling methods based on this principle is IMHO easier to understand than 
basing the call syntax on the language the object was implemented in. That 
would require everyone to know in which language an object is implemented, 
which does not really scale.
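
A small, self-contained example of the principle:

    class JobManagerConfig(val host: String) {
      // pure accessor: declared and called without parentheses, so callers
      // cannot tell (and need not care) whether it is a val or a def
      def hostOrLocalhost: String = if (host == null) "localhost" else host

      // side-effecting method: declared and called with parentheses
      def logHost(): Unit = println(s"host = $hostOrLocalhost")
    }

    val config = new JobManagerConfig(null)
    config.hostOrLocalhost // uniform access: reads like a field access
    config.logHost()       // has a side effect, so parentheses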


 Setup ZooKeeper for distributed coordination
 

 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


 Having standby JM instances for job manager high availability requires 
 distributed coordination between JM, TM, and clients. For this, we will use 
 ZooKeeper (ZK).
 Pros:
 - Proven solution (other projects use it for this as well)
 - Apache TLP with large community, docs, and library with required recipes 
 like leader election (see below)
 Related Wiki: 
 https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2326) Multitenancy on Yarn

2015-07-07 Thread LINTE (JIRA)
LINTE created FLINK-2326:


 Summary: Multitenancy on Yarn
 Key: FLINK-2326
 URL: https://issues.apache.org/jira/browse/FLINK-2326
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 0.9
 Environment: Centos 6.6
Hadoop 2.7 secured with kerberos
Flink 0.9
Reporter: LINTE


When a user launches a Flink cluster on YARN, a .yarn-properties file is 
created in the conf directory.

In a multiuser environment the configuration directory is read-only. 
Even with write permission for all users on the conf directory, only one user 
can run a Flink cluster.

Regards






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2242] deprecate RemoteCollector interfa...

2015-07-07 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/852#issuecomment-119228088
  
Looks good, merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---