[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633150#comment-14633150
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34974826
  
--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 
 **Note:** Extending the set of supported aggregation functions is on our 
roadmap.
 
+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements 
of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to
 The following code removes duplicate elements from the DataSet:

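For readers of this thread, a minimal self-contained sketch in the Scala DataSet API of the operation the new section documents; the object and value names are illustrative and not taken from the pull request:

    import org.apache.flink.api.scala._

    object DistinctExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Duplicate elements are removed; only one copy of each distinct value remains.
        val input: DataSet[Int] = env.fromElements(1, 1, 2, 3, 3, 3)
        val unique: DataSet[Int] = input.distinct()

        // collect() triggers execution; prints "1, 2, 3" (order may vary).
        println(unique.collect().mkString(", "))
      }
    }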

 distinct is missing in DataSet API documentation
 

 Key: FLINK-2362
 URL: https://issues.apache.org/jira/browse/FLINK-2362
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
 Fix For: 0.10, 0.9.1


 The DataSet transformation {{distinct}} is not described or listed in the 
 documentation. It is not contained in the DataSet API programming guide 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
  and not in the DataSet Transformation 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122805962
  
Thanks for improving the documentation. Looks good to merge for me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633152#comment-14633152
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122805962
  
Thanks for improving the documentation. Looks good to merge for me.


 distinct is missing in DataSet API documentation
 

 Key: FLINK-2362
 URL: https://issues.apache.org/jira/browse/FLINK-2362
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
 Fix For: 0.10, 0.9.1


 The DataSet transformation {{distinct}} is not described or listed in the 
 documentation. It is not contained in the DataSet API programming guide 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
  and not in the DataSet Transformation 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34974826
  
--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 
 **Note:** Extending the set of supported aggregation functions is on our 
roadmap.
 
+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements 
of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to
 The following code removes duplicate elements from the DataSet:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633130#comment-14633130
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973140
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce-union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Given that `input` is deterministic, this code will produce mutually 
exclusive validation and training data sets.

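For readers skimming the thread, a small stand-alone Scala sketch of the fold-boundary computation quoted above; it only reproduces the lb/ub arithmetic and is not code from the pull request:

    object KFoldBounds {
      def main(args: Array[String]): Unit = {
        val numFolds = 4
        val numFoldsF = numFolds.toFloat

        // For fold k, [lb, ub) is the fraction of the (deterministically ordered,
        // identically seeded) sample used for validation; the complement is used
        // for training.
        (1 to numFolds).foreach { fold =>
          val lb = (fold - 1) / numFoldsF
          val ub = fold / numFoldsF
          println(s"fold $fold: validation slice [$lb, $ub)")
        }
        // The slices tile [0, 1) without overlap, which is why, with a deterministic
        // input and a fixed seed, validation and training sets are mutually exclusive.
      }
    }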

 Add cross validation for model evaluation
 -

 Key: FLINK-1723
 URL: https://issues.apache.org/jira/browse/FLINK-1723
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore 

[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633136#comment-14633136
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973783
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

Ah yes. I see that now.


 Add cross validation for model evaluation
 -

 Key: FLINK-1723
 URL: https://issues.apache.org/jira/browse/FLINK-1723
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
  Labels: ML

 Cross validation [1] is a standard tool to estimate 

[jira] [Updated] (FLINK-2379) Add methods to evaluate field wise statistics over DataSet of vectors.

2015-07-20 Thread Sachin Goel (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Goel updated FLINK-2379:
---
Description: 
Design methods to evaluate statistics over dataset of vectors.
For continuous fields, Minimum, maximum, mean, variance.
For discrete fields, Class counts, Entropy, Gini Impurity.

Further statistical measures can also be supported. For example, correlation 
between two series, computing the covariance matrix, etc. 
[These are currently the things Spark supports.]

  was:
Design methods to evaluate statistics over dataset of vectors.
For continuous fields, Minimum, maximum, mean, variance.
For discrete fields, Class counts, Entropy, Gini Impurity.

 Issue Type: New Feature  (was: Bug)

 Add methods to evaluate field wise statistics over DataSet of vectors.
 --

 Key: FLINK-2379
 URL: https://issues.apache.org/jira/browse/FLINK-2379
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Sachin Goel

 Design methods to evaluate statistics over dataset of vectors.
 For continuous fields, Minimum, maximum, mean, variance.
 For discrete fields, Class counts, Entropy, Gini Impurity.
 Further statistical measures can also be supported. For example, correlation 
 between two series, computing the covariance matrix, etc. 
 [These are currently the things Spark supports.]

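To make the discrete-field measures concrete, a small stand-alone Scala sketch of entropy and Gini impurity computed from class counts; this is illustrative only, not proposed library code:

    object DiscreteFieldStats {
      // Shannon entropy (in bits) of a class-count histogram.
      def entropy(counts: Seq[Long]): Double = {
        val total = counts.sum.toDouble
        counts.filter(_ > 0).map { c =>
          val p = c / total
          -p * (math.log(p) / math.log(2))
        }.sum
      }

      // Gini impurity: 1 minus the sum of squared class probabilities.
      def giniImpurity(counts: Seq[Long]): Double = {
        val total = counts.sum.toDouble
        1.0 - counts.map { c => val p = c / total; p * p }.sum
      }

      def main(args: Array[String]): Unit = {
        val counts = Seq(40L, 40L, 20L)
        println(f"entropy = ${entropy(counts)}%.4f, gini = ${giniImpurity(counts)}%.4f")
      }
    }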


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2075] Add Approximate Adamic Adar Simil...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122784993
  
I think you've referenced the wrong jira ticket.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2075) Shade akka and protobuf dependencies away

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633114#comment-14633114
 ] 

ASF GitHub Bot commented on FLINK-2075:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122784993
  
I think you've referenced the wrong jira ticket.


 Shade akka and protobuf dependencies away
 -

 Key: FLINK-2075
 URL: https://issues.apache.org/jira/browse/FLINK-2075
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Till Rohrmann
 Fix For: 0.9


 Lately, the Zeppelin project encountered the following problem: it includes 
 flink-runtime, which depends on akka_remote:2.3.7, which in turn depends on 
 protobuf-java:2.5.0. However, Zeppelin set the protobuf-java version to 2.4.1 
 to make it build with YARN 2.2. Because of this, akka_remote picks up the wrong 
 protobuf-java version and fails due to an incompatible change between 
 these versions.
 I propose to shade Flink's akka dependency and protobuf dependency away, so 
 that user projects depending on Flink are not forced to use a special 
 akka/protobuf version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2312) Random Splits

2015-07-20 Thread Maximilian Alber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633118#comment-14633118
 ] 

Maximilian Alber commented on FLINK-2312:
-

I agree too.

Something else: how do you ensure the ratios? As far as I can see, they are only
approximately met when the number of samples is large.

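The approximation concern can be seen in a plain Scala sketch of a per-element Bernoulli split, the usual way ratio-based random splits are implemented; this is illustrative only, not the proposed Flink code:

    import scala.util.Random

    object RandomSplitSketch {
      def main(args: Array[String]): Unit = {
        val rng = new Random(42L)
        val data = 1 to 100

        // Each element is assigned to the first split with probability 0.7,
        // independently of all others, so the realised ratio is only
        // approximately 70/30 and converges to it as the input grows.
        val (train, test) = data.partition(_ => rng.nextDouble() < 0.7)
        println(s"train = ${train.size}, test = ${test.size}")
      }
    }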



 Random Splits
 -

 Key: FLINK-2312
 URL: https://issues.apache.org/jira/browse/FLINK-2312
 Project: Flink
  Issue Type: Wish
  Components: Machine Learning Library
Reporter: Maximilian Alber
Assignee: pietro pinoli
Priority: Minor

 In machine learning applications it is common to split data sets into, e.g., a 
 training and a testing set.
 To the best of my knowledge there is at the moment no nice way in Flink to 
 split a data set randomly into several partitions according to some ratio.
 The desired semantics would be the same as Spark's RDD randomSplit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973783
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

Ah yes. I see that now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2312][ml][WIP] Randomly Splitting a Dat...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/921#issuecomment-122785194
  
I agree, a generalized implementation would be favorable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2312) Random Splits

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633116#comment-14633116
 ] 

ASF GitHub Bot commented on FLINK-2312:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/921#issuecomment-122785194
  
I agree, a generalized implementation would be favorable.


 Random Splits
 -

 Key: FLINK-2312
 URL: https://issues.apache.org/jira/browse/FLINK-2312
 Project: Flink
  Issue Type: Wish
  Components: Machine Learning Library
Reporter: Maximilian Alber
Assignee: pietro pinoli
Priority: Minor

 In machine learning applications it is common to split data sets into, e.g., a 
 training and a testing set.
 To the best of my knowledge there is at the moment no nice way in Flink to 
 split a data set randomly into several partitions according to some ratio.
 The desired semantics would be the same as Spark's RDD randomSplit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973140
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

Given that `input` is deterministic, this code will produce mutually 
exclusive validation and training data sets.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633162#comment-14633162
 ] 

Stephan Ewen commented on FLINK-2377:
-

Can you verify that this is a problem with the test, i.e., that the test forgets 
to close the files?

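For context, a minimal Scala sketch of the failure mode being suspected here: on Windows, deleting a file that still has an open writer fails, so tests must close their streams before temp-file cleanup (illustrative only, not the test code):

    import java.io.{File, PrintWriter}

    object CloseBeforeDelete {
      def main(args: Array[String]): Unit = {
        val f = File.createTempFile("flink-test", ".tmp")
        val writer = new PrintWriter(f)
        writer.println("some test output")

        // Without this close(), File#delete() typically fails on Windows,
        // while Linux happily removes the still-open file.
        writer.close()

        println(s"deleted: ${f.delete()}")
      }
    }
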
 AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
 --

 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor

 This is probably another file-closing issue (that is, Windows won't delete 
 open files, as opposed to Linux).
 I have encountered two concrete tests so far where this actually appears: 
 CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2027) Flink website does not provide link to source repo

2015-07-20 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi resolved FLINK-2027.

Resolution: Fixed

I think this was reopened by accident.

 Flink website does not provide link to source repo
 --

 Key: FLINK-2027
 URL: https://issues.apache.org/jira/browse/FLINK-2027
 Project: Flink
  Issue Type: Bug
Reporter: Sebb
Priority: Critical

 As the subject says - I could not find a link to the source repo anywhere 
 obvious on the website



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633184#comment-14633184
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34976620
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

The test shouldn't fail. Maybe there is an error then.

What one could do to have different sequences on each node is to xor the 
subtask id with the seed. But IMO this does not change the statistical 
properties of the sample, because we don't know the underlying order of the 
elements. E.g. the underlying order of the elements could be such that we 
obtain the same sample set as with an identical seed and a different order.

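A rough Scala sketch of the seed-derivation idea mentioned above, i.e. XOR-ing the subtask index into the global seed so that each parallel instance draws a different random sequence; the class name and sampling logic are illustrative and not taken from the pull request:

    import java.util.Random

    import org.apache.flink.api.common.functions.RichMapPartitionFunction
    import org.apache.flink.util.Collector

    class SubtaskSeededSampler[T](seed: Long, fraction: Double)
        extends RichMapPartitionFunction[T, T] {

      override def mapPartition(values: java.lang.Iterable[T], out: Collector[T]): Unit = {
        // Derive a per-subtask seed so parallel instances do not share a sequence.
        val subtaskSeed = seed ^ getRuntimeContext.getIndexOfThisSubtask
        val rng = new Random(subtaskSeed)

        val it = values.iterator()
        while (it.hasNext) {
          val element = it.next()
          if (rng.nextDouble() < fraction) {
            out.collect(element)
          }
        }
      }
    }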

 Add cross validation for model evaluation
 

[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34976620
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

The test shouldn't fail. Maybe there is an error then.

What one could do to have different sequences on each node is to xor the 
subtask id with the seed. But IMO this does not change the statistical 
properties of the sample, because we don't know the underlying order of the 
elements. E.g. the underlying order of the elements could be such that we 
obtain the same sample set as with an identical seed and a different order.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA 

[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34975724
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

However, in case the parallelism of the data is more than one, this can lead to 
problems. The random number sequence generated on every node would be the same, 
wouldn't it?
I printed all the random numbers generated and it looks like this: 
https://gist.github.com/sachingoel0101/ecde269af996fba7a39a

Further, for a parallelism of 2, the test itself fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread pp86
Github user pp86 commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122815918
  
Thanks for the suggestion @mxm , I updated the documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2367) "flink-xx-jobmanager-linux-3lsu.log" file can't automatically be recovered/detected after a mistaken delete

2015-07-20 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633163#comment-14633163
 ] 

Maximilian Michels commented on FLINK-2367:
---

I'm afraid the logger is unaware of the deleted file. We would have to manually 
check if the file still exists, but that seems to be too much effort for such 
a corner case. However, we could add a note in the documentation that log file 
deletion during runtime is not supported.

 "flink-xx-jobmanager-linux-3lsu.log" file can't automatically be recovered/detected 
 after a mistaken delete
 -

 Key: FLINK-2367
 URL: https://issues.apache.org/jira/browse/FLINK-2367
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 0.9
 Environment: Linux
Reporter: chenliang
Priority: Minor
  Labels: reliability
 Fix For: 0.9.0


 To check whether the system is adequately reliable, testers sometimes 
 deliberately perform delete operations.
 Steps:
 1. Go to flink\build-target\log.
 2. Delete the "flink-xx-jobmanager-linux-3lsu.log" file.
 3. Run jobs that write log output; the system does not report any error even 
 though the log output can no longer be written correctly.
 4. When some jobs fail, go to the log file to look for the reason; the log file 
 cannot be found.
 The JobManager must be restarted to regenerate the log file before jobs can 
 continue to run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633165#comment-14633165
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34975724
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
--- End diff --

However, in case the parallelism of the data is more than one, this can lead to 
problems. The random number sequence generated on every node would be the same, 
wouldn't it?
I printed all the random numbers generated and it looks like this: 
https://gist.github.com/sachingoel0101/ecde269af996fba7a39a

Further, for a parallelism of 2, the test itself fails.


 Add cross validation for model evaluation
 -

 Key: 

[jira] [Commented] (FLINK-2299) The slot on which the task maanger was scheduled was killed

2015-07-20 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633166#comment-14633166
 ] 

Ufuk Celebi commented on FLINK-2299:


The FAQ is on the website repository flink-web 
(http://flink.apache.org/community.html#source-code).

 The slot on which the task maanger was scheduled was killed
 ---

 Key: FLINK-2299
 URL: https://issues.apache.org/jira/browse/FLINK-2299
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Priority: Critical
 Fix For: 0.9.1


 The following code: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Ran on the twitter follower graph: 
 http://twitter.mpi-sws.org/data-icwsm2010.html 
 With a similar configuration to the one in FLINK-2293
 fails with the following exception:
 java.lang.Exception: The slot in which the task was executed has been 
 released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
 wally025 - 8 slots - URL: 
 akka.tcp://flink@130.149.249.35:56135/user/taskmanager
 at 
 org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
 at 
 org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at 
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
 at 
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
 at 
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
 at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at 
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at 
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at 
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 06/29/2015 10:33:46 Job execution switched to status FAILING.
 The logs are here:
 https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633189#comment-14633189
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user pp86 commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122815918
  
Thanks for the suggestion @mxm , I updated the documentation.


 distinct is missing in DataSet API documentation
 

 Key: FLINK-2362
 URL: https://issues.apache.org/jira/browse/FLINK-2362
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
 Fix For: 0.10, 0.9.1


 The DataSet transformation {{distinct}} is not described or listed in the 
 documentation. It is not contained in the DataSet API programming guide 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
  and not in the DataSet Transformation 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2380) Allow to configure default FS for file inputs

2015-07-20 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2380:
--

 Summary: Allow to configure default FS for file inputs
 Key: FLINK-2380
 URL: https://issues.apache.org/jira/browse/FLINK-2380
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Priority: Minor
 Fix For: 0.10


File inputs use file:// as the default prefix. A user asked to make this 
configurable, e.g. to use hdfs:// as the default.

(I'm not sure whether this is already possible or not. I will check and either 
close or implement this for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2299) The slot on which the task maanger was scheduled was killed

2015-07-20 Thread Andra Lungu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andra Lungu resolved FLINK-2299.

   Resolution: Fixed
Fix Version/s: (was: 0.9.1)
   0.10

 The slot on which the task maanger was scheduled was killed
 ---

 Key: FLINK-2299
 URL: https://issues.apache.org/jira/browse/FLINK-2299
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Andra Lungu
Priority: Critical
 Fix For: 0.10


 The following code: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Ran on the twitter follower graph: 
 http://twitter.mpi-sws.org/data-icwsm2010.html 
 With a similar configuration to the one in FLINK-2293
 fails with the following exception:
 java.lang.Exception: The slot in which the task was executed has been 
 released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
 wally025 - 8 slots - URL: 
 akka.tcp://flink@130.149.249.35:56135/user/taskmanager
 at 
 org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
 at 
 org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at 
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
 at 
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
 at 
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
 at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at 
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at 
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at 
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 06/29/2015 10:33:46 Job execution switched to status FAILING.
 The logs are here:
 https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633428#comment-14633428
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34984880
  
--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet<Tuple3<Integer, String, Double>> output =
input.sum(0).andMin(2);
   </td>
 </tr>
 
+<tr>
+  <td><strong>Distinct</strong></td>
+  <td>
+<p>Returns the distinct elements of a data set.</p>
--- End diff --

Could you add a sentence here to explain what distinct means?
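
To illustrate what such a sentence could describe, a minimal usage sketch (not taken from the PR; the data and class name are made up): distinct() removes duplicate elements, and distinct(fields) compares only the given tuple fields.

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> input = env.fromElements(
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b"));

        input.distinct().print();   // all fields compared  -> (1,a), (2,b)
        input.distinct(0).print();  // only field 0 compared -> one element per distinct key
    }
}
{code}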


 distinct is missing in DataSet API documentation
 

 Key: FLINK-2362
 URL: https://issues.apache.org/jira/browse/FLINK-2362
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
 Fix For: 0.10, 0.9.1


 The DataSet transformation {{distinct}} is not described or listed in the 
 documentation. It is not contained in the DataSet API programming guide 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
  and not in the DataSet Transformation 
 (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34984880
  
--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet<Tuple3<Integer, String, Double>> output =
input.sum(0).andMin(2);
   </td>
 </tr>
 
+<tr>
+  <td><strong>Distinct</strong></td>
+  <td>
+<p>Returns the distinct elements of a data set.</p>
--- End diff --

Could you add a sentence here to explain what distinct means?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2299) The slot on which the task manager was scheduled was killed

2015-07-20 Thread Andra Lungu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633422#comment-14633422
 ] 

Andra Lungu commented on FLINK-2299:


Thanks, Ufuk! :)

 The slot on which the task manager was scheduled was killed
 ---

 Key: FLINK-2299
 URL: https://issues.apache.org/jira/browse/FLINK-2299
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Andra Lungu
Priority: Critical
 Fix For: 0.9.1


 The following code: 
 https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
 Ran on the twitter follower graph: 
 http://twitter.mpi-sws.org/data-icwsm2010.html 
 With a similar configuration to the one in FLINK-2293
 fails with the following exception:
 java.lang.Exception: The slot in which the task was executed has been 
 released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
 wally025 - 8 slots - URL: 
 akka.tcp://flink@130.149.249.35:56135/user/taskmanager
 at 
 org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
 at 
 org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
 at 
 org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
 at 
 org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
 at 
 org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
 at 
 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at 
 org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at 
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at 
 akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
 at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
 at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
 at akka.actor.ActorCell.invoke(ActorCell.scala:486)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at 
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 06/29/2015 10:33:46 Job execution switched to status FAILING.
 The logs are here:
 https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633514#comment-14633514
 ] 

ASF GitHub Bot commented on FLINK-2375:
---

Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122868758
  
Could you also do a git amend to reference the correct JIRA issue in the 
commit?


 Add Approximate Adamic Adar Similarity method using BloomFilters
 

 Key: FLINK-2375
 URL: https://issues.apache.org/jira/browse/FLINK-2375
 Project: Flink
  Issue Type: Task
  Components: Gelly
Reporter: Shivani Ghatge
Assignee: Shivani Ghatge
Priority: Minor

 Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
 set of nodes. However, instead of counting the common neighbors and dividing 
 them by the total number of neighbors, the similarity is weighted according 
 to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
 The Adamic-Adar algorithm can be broken into three steps:
 1). For each vertex, compute the log of its inverse degrees (with the formula 
 above) and set it as the vertex value.
 2). Each vertex will then send this new computed value along with a list of 
 neighbors to the targets of its out-edges
 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
 log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
 the degree of node n). See [2]
 Using BloomFilters we increase the scalability of the algorithm. The values 
 calculated for the edges will be approximate.
 Prerequisites:
 Full understanding of the Jaccard Similarity Measure algorithm
 Reading the associated literature:
 [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
 [2] 
 http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar
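
As a rough illustration of the approximation idea described above (a hedged sketch with made-up names, not the PR's implementation): one vertex's neighbor set is summarized as a Bloom filter, so the other side only tests membership instead of materializing the full intersection. False positives are what make the resulting edge values approximate.

{code}
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

public class NeighborBloomFilter {
    private final BitSet bits;
    private final int size;

    public NeighborBloomFilter(int size, Set<Long> neighbors) {
        this.size = size;
        this.bits = new BitSet(size);
        for (long n : neighbors) {
            bits.set(hash(n, 0x9E3779B9L));
            bits.set(hash(n, 0x85EBCA6BL));
        }
    }

    public boolean mightContain(long n) {
        return bits.get(hash(n, 0x9E3779B9L)) && bits.get(hash(n, 0x85EBCA6BL));
    }

    private int hash(long value, long seed) {
        long h = value * seed;
        h ^= (h >>> 33);
        return (int) ((h & 0x7FFFFFFFL) % size);
    }

    // Approximate number of common neighbors of x and y: probe x's neighbors
    // against the filter built from y's neighbors.
    public static int approximateCommonNeighbors(Set<Long> neighborsOfX, NeighborBloomFilter filterOfY) {
        int count = 0;
        for (long n : neighborsOfX) {
            if (filterOfY.mightContain(n)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Set<Long> neighborsOfX = new HashSet<Long>();
        neighborsOfX.add(1L); neighborsOfX.add(2L); neighborsOfX.add(3L);
        Set<Long> neighborsOfY = new HashSet<Long>();
        neighborsOfY.add(2L); neighborsOfY.add(3L); neighborsOfY.add(4L);

        NeighborBloomFilter filterOfY = new NeighborBloomFilter(1024, neighborsOfY);
        System.out.println(approximateCommonNeighbors(neighborsOfX, filterOfY)); // ~2
    }
}
{code}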



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122868758
  
Could you also do a git amend to reference the correct JIRA issue in the 
commit?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633525#comment-14633525
 ] 

Gabor Gevay commented on FLINK-2377:


Yes, I have found the problem: TestBaseUtils.readAllResultLines is not closing 
the readers. I will open a PR that fixes this.
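
For context, the kind of fix this implies is simply closing each reader after use, e.g. (a generic sketch, not the actual TestBaseUtils code):

{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class ReadAllLines {
    public static List<String> readAllResultLines(String path) throws IOException {
        List<String> lines = new ArrayList<String>();
        BufferedReader reader = new BufferedReader(new FileReader(path));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } finally {
            reader.close(); // the missing close() that keeps files open (and undeletable) on Windows
        }
        return lines;
    }
}
{code}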

 AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
 --

 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor

 This is probably another file closing issue. (that is, Windows won't delete 
 open files, as opposed to Linux)
 I have encountered two concrete tests so far where this actually appears: 
 CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...

2015-07-20 Thread ggevay
GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/924

[FLINK-2377] Add reader.close() to readAllResultLines



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink readAllResultLinesFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #924


commit 63b71d7c2364652030fdca360b9804c344da56a3
Author: Gabor Gevay gga...@gmail.com
Date:   2015-07-20T12:39:43Z

[FLINK-2377] Add reader.close() to readAllResultLines




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-2381) Possible class not found Exception on failed partition producer

2015-07-20 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-2381:
--

Assignee: Ufuk Celebi

 Possible class not found Exception on failed partition producer
 ---

 Key: FLINK-2381
 URL: https://issues.apache.org/jira/browse/FLINK-2381
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10, 0.9.1


 Failing the production of a result partition marks the respective partition 
 as failed with a ProducerFailedException.
 The cause of this exception can be a user defined class, which can only be 
 loaded by the user code class loader. The network stack fails the shuffle 
 with a RemoteTransportException, which has the user exception as a cause. 
 When the consuming task receives this exception, this leads to a class not 
 found exception, because the network stack tries to load the class with the 
 system class loader.
 {code}
 +--+
 | FAILING  |
 | PRODUCER |
 +--+
  || 
  \/
  ProducerFailedException(CAUSE) via network
  || 
  \/
 +--+
 | RECEIVER |
 +--+
 {code}
 CAUSE is only loadable by the user code class loader.
 When trying to deserialize this, RECEIVER fails with a 
 LocalTransportException, which is super confusing, because the error is not 
 local, but remote.
 Thanks to [~rmetzger] for reporting and debugging the issue with the 
 following stack trace:
 {code}
 Flat Map (26/120)
 14:03:00,343 ERROR 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask   - Flat Map 
 (26/120) failed
 java.lang.RuntimeException: Could not read next record.
 at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
 at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: 
 org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
 java.lang.ClassNotFoundException: 
 kafka.common.ConsumerRebalanceFailedException
 at 
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
 at 
 io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
 at 
 io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
 at 
 io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
 at 
 io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
 at 
 io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
 at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
 at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
 at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
 at 
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
 at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at 
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
 ... 1 more
 Caused by: io.netty.handler.codec.DecoderException: 
 java.lang.ClassNotFoundException: 
 kafka.common.ConsumerRebalanceFailedException
 at 
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
 at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
 ... 12 more
 Caused by: java.lang.ClassNotFoundException: 
 kafka.common.ConsumerRebalanceFailedException
 at 

[jira] [Created] (FLINK-2381) Possible class not found Exception on failed partition producer

2015-07-20 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2381:
--

 Summary: Possible class not found Exception on failed partition 
producer
 Key: FLINK-2381
 URL: https://issues.apache.org/jira/browse/FLINK-2381
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
 Fix For: 0.10, 0.9.1


Failing the production of a result partition marks the respective partition as 
failed with a ProducerFailedException.

The cause of this exception can be a user defined class, which can only be 
loaded by the user code class loader. The network stack fails the shuffle with 
a RemoteTransportException, which has the user exception as a cause. When the 
consuming task receives this exception, this leads to a class not found 
exception, because the network stack tries to load the class with the system 
class loader.

{code}
+--+
| FAILING  |
| PRODUCER |
+--+
 || 
 \/
 ProducerFailedException(CAUSE) via network
 || 
 \/
+--+
| RECEIVER |
+--+
{code}

CAUSE is only loadable by the user code class loader.

When trying to deserialize this, RECEIVER fails with a LocalTransportException, 
which is super confusing, because the error is not local, but remote.
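
For illustration only (a hedged sketch, not Flink's actual code; all names are made up): resolving the cause's class against the user-code class loader while deserializing avoids the ClassNotFoundException on the receiving side.

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

final class UserCodeDeserializer {
    // Deserialize the producer's failure cause with the user-code class loader,
    // so a user-defined exception class can be resolved on the receiver.
    static Throwable deserializeCause(byte[] serializedCause, final ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedCause)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            }
        };
        try {
            return (Throwable) in.readObject();
        } finally {
            in.close();
        }
    }
}
{code}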

Thanks to [~rmetzger] for reporting and debugging the issue with the following 
stack trace:

{code}
Flat Map (26/120)


14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask  
 - Flat Map (26/120) failed
java.lang.RuntimeException: Could not read next record.
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:745)
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at 
io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at 
io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at 
io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
Caused by: io.netty.handler.codec.DecoderException: 
java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
... 12 more
Caused by: java.lang.ClassNotFoundException: 
kafka.common.ConsumerRebalanceFailedException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at 

[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633628#comment-14633628
 ] 

ASF GitHub Bot commented on FLINK-2377:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/924#issuecomment-122893574
  
Great catch! Looks good to merge.


 AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
 --

 Key: FLINK-2377
 URL: https://issues.apache.org/jira/browse/FLINK-2377
 Project: Flink
  Issue Type: Bug
  Components: Tests
 Environment: Windows
Reporter: Gabor Gevay
Priority: Minor

 This is probably another file closing issue. (that is, Windows won't delete 
 open files, as opposed to Linux)
 I have encountered two concrete tests so far where this actually appears: 
 CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...

2015-07-20 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/924#issuecomment-122893574
  
Great catch! Looks good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks

2015-07-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2382:
---

 Summary: Live Metric Reporting Does Not Work for Two-Input 
StreamTasks
 Key: FLINK-2382
 URL: https://issues.apache.org/jira/browse/FLINK-2382
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Aljoscha Krettek


Also, there are no tests for the live metrics in streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1967] Introduce (Event)time in Streamin...

2015-07-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/906#issuecomment-122949512
  
Does anyone still have any objections? The timestamps are now completely 
disabled by default, so they have no impact.

If there are no objections I would like to merge this by tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1967) Introduce (Event)time in Streaming

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633826#comment-14633826
 ] 

ASF GitHub Bot commented on FLINK-1967:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/906#issuecomment-122949512
  
Does anyone still have any objections? The timestamps are now completely 
disabled by default, so they have no impact.

If there are no objections I would like to merge this by tomorrow.


 Introduce (Event)time in Streaming
 --

 Key: FLINK-1967
 URL: https://issues.apache.org/jira/browse/FLINK-1967
 Project: Flink
  Issue Type: Improvement
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek

 This requires introducing a timestamp in streaming record and a change in the 
 sources to add timestamps to records. This will also introduce punctuations 
 (or low watermarks) to allow windows to work correctly on unordered, 
 timestamped input data. In the process of this, the windowing subsystem also 
 needs to be adapted to use the punctuations. Furthermore, all operators need 
 to be made aware of punctuations and correctly forward them. Then, a new 
 operator must be introduced to allow modification of timestamps.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2371) AccumulatorLiveITCase fails

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633752#comment-14633752
 ] 

ASF GitHub Bot commented on FLINK-2371:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and that
we can verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local one-VM setups and passes the live
accumulator map through unserialized.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.
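
The general pattern (a simplified sketch with made-up names, not the Akka-based implementation in this PR) is to replace sleep-based polling with an explicit notification that the accumulator snapshot has arrived:

{code}
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AccumulatorSync {
    private final CountDownLatch updateReceived = new CountDownLatch(1);
    private volatile Map<String, Object> latestAccumulators;

    // Called when the (hypothetical) accumulator update message arrives.
    void onAccumulatorUpdate(Map<String, Object> accumulators) {
        latestAccumulators = accumulators;
        updateReceived.countDown();
    }

    // Called by the test instead of Thread.sleep(): blocks until the update is really there.
    Map<String, Object> awaitAccumulators(long timeoutMillis) throws InterruptedException {
        if (!updateReceived.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("No accumulator update received within timeout");
        }
        return latestAccumulators;
    }
}
{code}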

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #925


commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date:   2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and that
we can verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local one-VM setups and passes the live
accumulator map through unserialized.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.




 AccumulatorLiveITCase fails
 ---

 Key: FLINK-2371
 URL: https://issues.apache.org/jira/browse/FLINK-2371
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Matthias J. Sax
Assignee: Maximilian Michels

 AccumulatorLiveITCase fails regularly (however, not in each run). The tests 
 relies on timing (via sleep) which does not work well on Travis.
 See dev-list for more details: 
 https://mail-archives.apache.org/mod_mbox/flink-dev/201507.mbox/browser
 AccumulatorLiveITCase.testProgram:106-access$1100:68-checkFlinkAccumulators:189



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks

2015-07-20 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-2382:
-

Assignee: Maximilian Michels

 Live Metric Reporting Does Not Work for Two-Input StreamTasks
 -

 Key: FLINK-2382
 URL: https://issues.apache.org/jira/browse/FLINK-2382
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Aljoscha Krettek
Assignee: Maximilian Michels

 Also, there are no tests for the live metrics in streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-20 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and that
we can verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local one-VM setups and passes the live
accumulator map through unserialized.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #925


commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date:   2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and that
we can verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local one-VM setups and passes the live
accumulator map through unserialized.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634123#comment-14634123
 ] 

ASF GitHub Bot commented on FLINK-2310:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123051793
  
@andralungu @vasia PR has been updated to make the code more efficient.


 Add an Adamic-Adar Similarity example
 -

 Key: FLINK-2310
 URL: https://issues.apache.org/jira/browse/FLINK-2310
 Project: Flink
  Issue Type: Task
  Components: Gelly
Reporter: Andra Lungu
Assignee: Shivani Ghatge
Priority: Minor

 Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
 set of nodes. However, instead of counting the common neighbors and dividing 
 them by the total number of neighbors, the similarity is weighted according 
 to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
 The Adamic-Adar algorithm can be broken into three steps: 
 1). For each vertex, compute the log of its inverse degrees (with the formula 
 above) and set it as the vertex value. 
 2). Each vertex will then send this new computed value along with a list of 
 neighbors to the targets of its out-edges
 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
 log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
 the degree of node n). See [2]
 Prerequisites: 
 - Full understanding of the Jaccard Similarity Measure algorithm
 - Reading the associated literature: 
 [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
 [2] 
 http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-123051521
  
Updated PR
@vasia I removed the Vertex Centric Iteration :) Hope this method is okay.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123051793
  
@andralungu @vasia PR has been updated to make the code more efficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634119#comment-14634119
 ] 

ASF GitHub Bot commented on FLINK-2375:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-123051521
  
Updated PR
@vasia I removed the Vertex Centric Iteration :) Hope this method is okay.


 Add Approximate Adamic Adar Similarity method using BloomFilters
 

 Key: FLINK-2375
 URL: https://issues.apache.org/jira/browse/FLINK-2375
 Project: Flink
  Issue Type: Task
  Components: Gelly
Reporter: Shivani Ghatge
Assignee: Shivani Ghatge
Priority: Minor

 Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
 set of nodes. However, instead of counting the common neighbors and dividing 
 them by the total number of neighbors, the similarity is weighted according 
 to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
 The Adamic-Adar algorithm can be broken into three steps:
 1). For each vertex, compute the log of its inverse degrees (with the formula 
 above) and set it as the vertex value.
 2). Each vertex will then send this new computed value along with a list of 
 neighbors to the targets of its out-edges
 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
 log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
 the degree of node n). See [2]
 Using BloomFilters we increase the scalability of the algorithm. The values 
 calculated for the edges will be approximate.
 Prerequisites:
 Full understanding of the Jaccard Similarity Measure algorithm
 Reading the associated literature:
 [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
 [2] 
 http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-123057981
  
The only problem with assuming NullValue when a value is missing is that we 
can't return NullValue in place of VV.
I mean that in Graph<K, VV, EV>, neither VV nor EV can be NullValue; 
otherwise that was what I was originally going for.
Since none of the other methods that create a DataSet/Graph provide a way to 
set the EdgeValue to NullValue and instead expect the user to map it (at 
least that is what I saw), maybe we could just remove the functionality. I had 
only added it because many examples seemed to use it, so I thought it would be 
nice to have.
In any case, we could keep just one typesNullEdge method: if users don't 
want it, they can use the normal overloaded types methods (3 arguments for no 
NullValue, 2 arguments for a null vertex value, 1 argument for null vertex and 
edge values) and the single typesNullEdge method to indicate that only the 
edges have NullValue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634142#comment-14634142
 ] 

ASF GitHub Bot commented on FLINK-1520:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-123057981
  
The only problem with assuming NullValue when a value is missing is that we 
can't return NullValue in place of VV.
I mean that in Graph<K, VV, EV>, neither VV nor EV can be NullValue; 
otherwise that was what I was originally going for.
Since none of the other methods that create a DataSet/Graph provide a way to 
set the EdgeValue to NullValue and instead expect the user to map it (at 
least that is what I saw), maybe we could just remove the functionality. I had 
only added it because many examples seemed to use it, so I thought it would be 
nice to have.
In any case, we could keep just one typesNullEdge method: if users don't 
want it, they can use the normal overloaded types methods (3 arguments for no 
NullValue, 2 arguments for a null vertex value, 1 argument for null vertex and 
edge values) and the single typesNullEdge method to indicate that only the 
edges have NullValue.


 Read edges and vertices from CSV files
 --

 Key: FLINK-1520
 URL: https://issues.apache.org/jira/browse/FLINK-1520
 Project: Flink
  Issue Type: New Feature
  Components: Gelly
Reporter: Vasia Kalavri
Assignee: Shivani Ghatge
Priority: Minor
  Labels: easyfix, newbie

 Add methods to create Vertex and Edge Datasets directly from CSV file inputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-685) Add support for semi-joins

2015-07-20 Thread pietro pinoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634358#comment-14634358
 ] 

pietro pinoli commented on FLINK-685:
-

Hi [~fhueske],

then if I implement the dummy version (your first alternative), does it have 
a chance of getting merged? :)

Thanks again for your time.
PP
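
For what it's worth, such a "dummy" semi-join can already be sketched on top of the existing DataSet API (a hedged sketch with made-up data and field positions; it uses coGroup instead of the dedicated execution strategy the issue asks for):

{code}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SemiJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // filtered input: emitted if a matching key exists on the filtering side
        DataSet<Tuple2<Long, String>> filtered = env.fromElements(
                new Tuple2<Long, String>(1L, "keep"), new Tuple2<Long, String>(2L, "drop"));
        // filtering input: only its keys matter, its payload is discarded
        DataSet<Tuple2<Long, Double>> filtering = env.fromElements(
                new Tuple2<Long, Double>(1L, 0.5), new Tuple2<Long, Double>(1L, 0.7));

        DataSet<Tuple2<Long, String>> semiJoined = filtered
                .coGroup(filtering)
                .where(0)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, Double>, Tuple2<Long, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, String>> left,
                                        Iterable<Tuple2<Long, Double>> right,
                                        Collector<Tuple2<Long, String>> out) {
                        if (right.iterator().hasNext()) {
                            for (Tuple2<Long, String> record : left) {
                                out.collect(record); // each filtered tuple at most once
                            }
                        }
                    }
                });

        semiJoined.print(); // (1,keep)
    }
}
{code}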

 Add support for semi-joins
 --

 Key: FLINK-685
 URL: https://issues.apache.org/jira/browse/FLINK-685
 Project: Flink
  Issue Type: New Feature
Reporter: GitHub Import
Priority: Minor
  Labels: github-import
 Fix For: pre-apache


 A semi-join is basically a join filter. One input is filtering and the 
 other one is filtered.
 A tuple of the filtered input is emitted exactly once if the filtering 
 input has one (or more) tuples with matching join keys. That means that the 
 output of a semi-join has the same type as the filtered input and the 
 filtering input is completely discarded.
 In order to support a semi-join, we need to add an additional physical 
 execution strategy that ensures that a tuple of the filtered input is 
 emitted only once even if the filtering input has more than one tuple with 
 matching keys. Furthermore, a couple of optimizations compared to standard 
 joins can be done such as storing only keys and not the full tuple of the 
 filtering input in a hash table.
  Imported from GitHub 
 Url: https://github.com/stratosphere/stratosphere/issues/685
 Created by: [fhueske|https://github.com/fhueske]
 Labels: enhancement, java api, runtime, 
 Milestone: Release 0.6 (unplanned)
 Created at: Mon Apr 14 12:05:29 CEST 2014
 State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-685) Add support for semi-joins

2015-07-20 Thread pietro pinoli (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

pietro pinoli reassigned FLINK-685:
---

Assignee: pietro pinoli

 Add support for semi-joins
 --

 Key: FLINK-685
 URL: https://issues.apache.org/jira/browse/FLINK-685
 Project: Flink
  Issue Type: New Feature
Reporter: GitHub Import
Assignee: pietro pinoli
Priority: Minor
  Labels: github-import
 Fix For: pre-apache


 A semi-join is basically a join filter. One input is filtering and the 
 other one is filtered.
 A tuple of the filtered input is emitted exactly once if the filtering 
 input has one (or more) tuples with matching join keys. That means that the 
 output of a semi-join has the same type as the filtered input and the 
 filtering input is completely discarded.
 In order to support a semi-join, we need to add an additional physical 
 execution strategy that ensures that a tuple of the filtered input is 
 emitted only once even if the filtering input has more than one tuple with 
 matching keys. Furthermore, a couple of optimizations compared to standard 
 joins can be done such as storing only keys and not the full tuple of the 
 filtering input in a hash table.
  Imported from GitHub 
 Url: https://github.com/stratosphere/stratosphere/issues/685
 Created by: [fhueske|https://github.com/fhueske]
 Labels: enhancement, java api, runtime, 
 Milestone: Release 0.6 (unplanned)
 Created at: Mon Apr 14 12:05:29 CEST 2014
 State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123153943
  
Hi @shghatge ,

Did you also run the two examples on the cluster to make sure that the 
approximate version is faster?
Then you could also add some numbers to the two PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634516#comment-14634516
 ] 

ASF GitHub Bot commented on FLINK-2310:
---

Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123153943
  
Hi @shghatge ,

Did you also run the two examples on the cluster to make sure that the 
approximate version is faster?
Then you could also add some numbers to the two PRs.


 Add an Adamic-Adar Similarity example
 -

 Key: FLINK-2310
 URL: https://issues.apache.org/jira/browse/FLINK-2310
 Project: Flink
  Issue Type: Task
  Components: Gelly
Reporter: Andra Lungu
Assignee: Shivani Ghatge
Priority: Minor

 Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
 set of nodes. However, instead of counting the common neighbors and dividing 
 them by the total number of neighbors, the similarity is weighted according 
 to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
 The Adamic-Adar algorithm can be broken into three steps: 
 1). For each vertex, compute the log of its inverse degrees (with the formula 
 above) and set it as the vertex value. 
 2). Each vertex will then send this new computed value along with a list of 
 neighbors to the targets of its out-edges
 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
 log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
 the degree of node n). See [2]
 Prerequisites: 
 - Full understanding of the Jaccard Similarity Measure algorithm
 - Reading the associated literature: 
 [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
 [2] 
 http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)