[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335701#comment-15335701
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1856


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.
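As a rough illustration of what `maxBy`/`minBy` compute, here is a minimal Scala sketch that does not depend on Flink. It assumes plain Scala tuples whose selected fields are `Comparable`. It mirrors the reduce behaviour exercised by the `SelectByFunctionTest` cases quoted in this thread: fields are compared in the given order, the first unequal field decides, and on a complete tie the first argument is kept. The names `SelectBySketch`, `compareOn`, `selectByMax` and `selectByMin` are hypothetical and are not part of Flink's API.

```scala
// Hypothetical, Flink-free sketch of select-by-max/min reduce semantics.
object SelectBySketch {
  // Compare two tuples on the given field positions; the first unequal field
  // decides. Assumes every selected field is Comparable (Int, Long, String, ...).
  def compareOn(a: Product, b: Product, fields: Seq[Int]): Int = {
    val it = fields.iterator
    var result = 0
    while (result == 0 && it.hasNext) {
      val pos = it.next()
      val left = a.productElement(pos).asInstanceOf[Comparable[Any]]
      result = left.compareTo(b.productElement(pos))
    }
    result
  }

  // Reduce step for a max-by: keep the larger tuple, or the first one on a tie.
  def selectByMax[T <: Product](fields: Seq[Int]): (T, T) => T =
    (v1, v2) => if (compareOn(v1, v2, fields) < 0) v2 else v1

  // Reduce step for a min-by: keep the smaller tuple, or the first one on a tie.
  def selectByMin[T <: Product](fields: Seq[Int]): (T, T) => T =
    (v1, v2) => if (compareOn(v1, v2, fields) > 0) v2 else v1
}
```

Because the tie rule keeps the first argument, the element returned for fully equal keys depends on the order in which elements reach the reduce.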



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335692#comment-15335692
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
Merging


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334748#comment-15334748
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
Thanks for the update @ramkrish86. PR is good to merge.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334195#comment-15334195
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
@fhueske - Pushed with the latest updates. Please review. Thank you for guiding me through this.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334190#comment-15334190
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67385885
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo(
+classOf[(Int, Long, String, Long, Int)], Array(), Array(BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+
+override def createSerializer(config: ExecutionConfig):
+  TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+   private val bigger  = new Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)
--- End diff --

I actually did it this way. But maybe because of my other definition of `tupleTypeInfo`, the assertion was not working, even though I was getting the expected result. I was not aware of this `implicitly` call; I just read up on it.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333861#comment-15333861
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
Hi @ramkrish86, the PR looks quite good now. I had only a few minor comments. After those are fixed, the PR should be good to merge.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333853#comment-15333853
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67350850
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo(
+classOf[(Int, Long, String, Long, Int)], Array(), Array(BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+
+override def createSerializer(config: ExecutionConfig):
+  TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+   private val bigger  = new Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)
+   private val smaller = new Tuple5[Int, Long, String, Long, Int](5, 50L, "Hello", 50L, 15)
+
+   //Special case where only the last value determines if bigger or smaller
+   private val specialCaseBigger =
+ new Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 17)
+   private val specialCaseSmaller  =
+ new Tuple5[Int, Long, String, Long, Int](5, 50L, "Hello", 50L, 17)
+
+  /**
+* This test validates whether the order of tuples has
+*
+* any impact on the outcome and if the bigger tuple is returned.
+*/
+  @Test
+  def testMaxByComparison(): Unit = {
+val a1 = Array(0)
+val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+  try {
+Assert.assertSame("SelectByMax must return bigger tuple",
+  bigger, maxByTuple.reduce(smaller, bigger))
+Assert.assertSame("SelectByMax must return bigger tuple",
+  bigger, maxByTuple.reduce(bigger, smaller))
+  } catch {
+case e : Exception =>
+  Assert.fail("No exception should be thrown while comapring both tuples")
+  }
+  }
+
+  // --- MAXIMUM FUNCTION TEST BELOW --
+
+  /**
+* This test cases checks when two tuples only differ in one value, but this value is not
+* in the fields list. In that case it should be seen as equal
+* and then the first given tuple (value1) should be returned by reduce().
+*/
+  @Test
+  def testMaxByComparisonSpecialCase1() : Unit = {
+val a1 = Array(0, 3)
+val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+try {
+  Assert.assertSame("SelectByMax must return the first given tuple",
+specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
+  Assert.assertSame("SelectByMax must return the first given tuple",
+bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+} catch {
+  case e : Exception => Assert.fail("No exception should be thrown " +
+"while comapring both tuples")
+}
+  }
+
+  /**
+* This test cases checks when two tuples only differ in one value.
+*/
+  @Test
+  def testMaxByComparisonSpecialCase2() : Unit = {
+val a1 = Array(0, 2, 1, 4, 3)
+val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, 

[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333840#comment-15333840
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67349578
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo(
+classOf[(Int, Long, String, Long, Int)], Array(), Array(BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+
+override def createSerializer(config: ExecutionConfig):
+  TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+   private val bigger  = new Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)
--- End diff --

You can also create Scala tuples like this:
```
private val bigger = (10, 100L, "HelloWorld", 200L, 20)
```
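The literal syntax suggested above is plain Scala: a parenthesised list of values builds a `scala.TupleN` whose element types are inferred. A small sketch (the object name `TupleLiteralSketch` is hypothetical) showing that the literal and the explicit constructor from the test produce equal values:

```scala
// Hypothetical illustration object; only standard Scala is used.
object TupleLiteralSketch {
  // Literal syntax: the compiler infers Tuple5[Int, Long, String, Long, Int].
  val viaLiteral = (10, 100L, "HelloWorld", 200L, 20)

  // Equivalent explicit construction, as written in the original test code.
  val viaConstructor = new Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)

  // A type annotation can still be added to the literal form if desired.
  val annotated: (Int, Long, String, Long, Int) = (10, 100L, "HelloWorld", 200L, 20)
}
```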


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333839#comment-15333839
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67349089
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo(
--- End diff --

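You can create `tupleTypeInfo` more easily like this:

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.scala._

val tupleTypeInfo = implicitly[TypeInformation[(Int, Long, String, Long, Int)]]
  .asInstanceOf[TupleTypeInfoBase[(Int, Long, String, Long, Int)]]
```

The `import org.apache.flink.api.scala._` is what makes the implicit `TypeInformation` available.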
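`implicitly[X]` is standard Scala: it returns whatever implicit value of type `X` the compiler resolves in the current scope. In Flink's Scala API that value is produced by the `createTypeInformation` macro that `import org.apache.flink.api.scala._` brings into scope. The sketch below runs without Flink by using a hypothetical `Describe` type class in place of `TypeInformation`, purely to show the mechanism:

```scala
// Hypothetical type class standing in for Flink's TypeInformation.
trait Describe[T] {
  def arity: Int
}

object ImplicitlySketch {
  // An implicit instance for any 5-tuple; it is in scope for the code below.
  implicit def tuple5Describe[A, B, C, D, E]: Describe[(A, B, C, D, E)] =
    new Describe[(A, B, C, D, E)] {
      def arity: Int = 5
    }

  // implicitly[X] simply hands back the implicit value of type X that the compiler resolved.
  val info: Describe[(Int, Long, String, Long, Int)] =
    implicitly[Describe[(Int, Long, String, Long, Int)]]
}
```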


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333801#comment-15333801
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67346072
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f <- fields) {
+if (f < 0 || f >= t.getArity()) {
+  throw new IndexOutOfBoundsException(
+"SelectByMinFunction field position " + f + " is out of range.")
+}
+
+// Check whether type is comparable
+if (!t.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f).isKeyType()) {
--- End diff --

The cast is not necessary: `t` is already a `TupleTypeInfoBase`.
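To illustrate the point about the cast: once a constructor parameter is declared with a specific type, its methods can be called on it directly. The sketch below mirrors the constructor's field-range and key-type checks so it runs without Flink. `TupleInfo` is a hypothetical stand-in for `TupleTypeInfoBase`, reduced to the two operations the check needs, and the `IllegalArgumentException` for non-key fields is an assumption, since the quoted diff stops before that branch.

```scala
// Hypothetical stand-in for TupleTypeInfoBase, reduced to what the checks need.
trait TupleInfo {
  def getArity: Int
  def isKeyTypeAt(pos: Int): Boolean
}

// Hypothetical sketch of the constructor-time validation of SelectByMinFunction.
class SelectByMinSketch(t: TupleInfo, fields: Array[Int]) {
  for (f <- fields) {
    if (f < 0 || f >= t.getArity) {
      throw new IndexOutOfBoundsException(
        s"SelectByMinFunction field position $f is out of range.")
    }
    // `t` is statically typed as TupleInfo, so no asInstanceOf is needed here.
    if (!t.isKeyTypeAt(f)) {
      throw new IllegalArgumentException(
        s"Field position $f is not a comparable key type.")
    }
  }
}
```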


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333799#comment-15333799
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67345907
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f <- fields) {
+if (f < 0 || f >= t.getArity()) {
+  throw new IndexOutOfBoundsException(
+"SelectByMaxFunction field position " + f + " is out of range.")
+}
+
+// Check whether type is comparable
+if (!t.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f).isKeyType()) {
--- End diff --

The cast is not necessary: `t` is already a `TupleTypeInfoBase`.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333793#comment-15333793
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67345503
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +357,40 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation `maxBy` on a grouped [[DataSet]]
+* The transformation consecutively calls a [[ReduceFunction]]
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A [[ReduceOperator]] representing the minimum.
+*/
+  def maxBy(fields: Int*) : DataSet[T]  = {
+if (!set.getType().isTupleType) {
+  throw new InvalidProgramException("GroupedDataSet#maxBy(int...) only works on Tuple types.")
+}
+reduce(new SelectByMaxFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+  fields.toArray))
+  }
+
+  /**
+* Applies a special case of a reduce transformation `minBy` on a grouped [[DataSet]].
+* The transformation consecutively calls a [[ReduceFunction]]
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A [[ReduceOperator]] representing the minimum.
--- End diff --

The documented return type is not correct: the method returns a `DataSet[T]`, not a `ReduceOperator`. You can remove `@param` and `@return`, as these are not used for the other methods in this class either.
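The `maxBy` signature in the diff above returns a `DataSet[T]`: conceptually, a grouped `maxBy` is a group-by followed by a per-group select-by-max reduce, and the result is again a collection of `T`. The following sketch over an in-memory `Seq` illustrates that shape without Flink. `GroupedMaxBySketch` and `groupedMaxBy` are hypothetical names, selected fields are assumed to be `Comparable`, and ties keep the element that appears first within its group.

```scala
// Hypothetical, Flink-free sketch of a grouped max-by over an in-memory Seq.
object GroupedMaxBySketch {
  // Compare on the selected field positions; the first unequal field decides.
  private def compareOn(a: Product, b: Product, fields: Seq[Int]): Int = {
    val it = fields.iterator
    var result = 0
    while (result == 0 && it.hasNext) {
      val pos = it.next()
      result = a.productElement(pos).asInstanceOf[Comparable[Any]].compareTo(b.productElement(pos))
    }
    result
  }

  // Group by one key position, then reduce each group with a select-by-max step.
  // Like a DataSet[T], the result is a collection of T, one element per group.
  def groupedMaxBy[T <: Product](data: Seq[T], keyPos: Int, fields: Int*): Seq[T] =
    data
      .groupBy(_.productElement(keyPos))
      .values
      .map(group => group.reduce[T]((v1, v2) => if (compareOn(v1, v2, fields) < 0) v2 else v1))
      .toSeq
}
```

The order of the groups in the result is not defined, which matches the fact that a grouped reduce makes no ordering promise either.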


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333792#comment-15333792
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67345484
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +357,40 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation `maxBy` on a grouped [[DataSet]]
+* The transformation consecutively calls a [[ReduceFunction]]
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A [[ReduceOperator]] representing the minimum.
--- End diff --

The documented return type is not correct: the method returns a `DataSet[T]`, not a `ReduceOperator`. You can remove `@param` and `@return`, as these are not used for the other methods in this class either.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1586#comment-1586
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
Force-pushed as per your advice, @fhueske. I ran `mvn clean verify` and no warnings are generated.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333278#comment-15333278
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297990
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

Yes, I just updated it after reading about it somewhere. One more test is failing; I am checking that and will then push the PR. I followed the steps you described, so all my squashed commits are now in a new branch, which I will force-push.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333266#comment-15333266
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297138
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

The line should be
```
private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
```
With `List[X]` you define a list whose elements have type `X`. `(Int, Long, String)` is shorthand for the type `scala.Tuple3[Int, Long, String]`. The final `()` creates an empty list.
You can also remove the `new CustomType()` from the next line.
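The type-syntax point can be checked with a small, self-contained sketch (plain Scala 2.x, no Flink dependency; the object name `TupleTypeSyntax` is illustrative only). It shows that `(Int, Long, String, Long, Int)` and `scala.Tuple5[Int, Long, String, Long, Int]` name the same type, so both spellings yield the same empty list:

```scala
// Illustrative sketch: TupleTypeSyntax is a hypothetical name, not part of Flink.
object TupleTypeSyntax {
  // Verbose spelling: the tuple class written out explicitly.
  val verbose: List[scala.Tuple5[Int, Long, String, Long, Int]] =
    List[scala.Tuple5[Int, Long, String, Long, Int]]()

  // Shorthand spelling: (A, B, C, D, E) is sugar for scala.Tuple5[A, B, C, D, E].
  // List[X] fixes the element type; the trailing () calls List.apply with no elements.
  val concise: List[(Int, Long, String, Long, Int)] =
    List[(Int, Long, String, Long, Int)]()

  // Compiles without any conversion because both spellings denote the same type.
  val sameType: List[(Int, Long, String, Long, Int)] = verbose
}
```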




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333232#comment-15333232
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67294752
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

`
C:\flink\flink\flink-scala\src\test\scala\org\apache\flink\api\operator\MaxByOperatorTest.scala:27: error: wrong number of type parameters for method apply: [A](xs: A*)List[A] in object List
[ERROR]   private val emptyTupleData = List[Int, Long, String, Long, Int]()
[ERROR]^
[ERROR] C:\flink\flink\flink-scala\src\test\scala\org\apache\flink\api\operator\MinByOperatorTest.scala:27: error: wrong number of type parameters for method apply: [A](xs: A*)List[A] in object List
[ERROR]   private val emptyTupleData = List[Int, Long, String, Long, Int]()
`
Even this fails. Let me read up on Scala to find the right syntax here.
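The compiler message quotes the signature `[A](xs: A*)List[A]`: `List.apply` has exactly one type parameter, so five comma-separated types are rejected. A minimal sketch of two spellings that do compile, assuming Scala 2.x; the alias `Row` and the object name `EmptyTupleList` are hypothetical:

```scala
object EmptyTupleList {
  // Hypothetical alias naming the single element type: one Tuple5, not five types.
  type Row = (Int, Long, String, Long, Int)

  // One type argument for List.apply's single type parameter A.
  val viaApply: List[Row] = List[Row]()

  // Equivalent: List.empty creates an empty list of the given element type.
  val viaEmpty: List[Row] = List.empty[Row]

  // Elements written as tuple literals conform to Row.
  val oneRow: List[Row] = List((1, 2L, "a", 3L, 4))
}
```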





[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333183#comment-15333183
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67291576
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

I think this is what you meant: `List[Int, Long, String, Long, Int]()`, i.e. removing the explicit `scala.Tuple5`.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333182#comment-15333182
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67291510
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

Should this be `List[(Int, Long, String, Long, Int)]` or `List(Int, Long, String, Long, Int)`? If it is the former, `val collection = env.fromCollection(emptyTupleData)` throws a compilation error, and if it is the latter, the build fails:
`[ERROR] /home/travis/build/apache/flink/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala:27: error: object java.lang.String is not a value
[ERROR]   private val emptyTupleData = List(Int, Long, String, Long, Int)`
Written as `List[scala.Tuple5[Int, Long, String, Long, Int]]()`, it did not produce any error.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332538#comment-15332538
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
I think the easiest way is to fork off a branch from the current master and cherry-pick all your commits onto that branch one after the other (except for the merge commit, of course). Then squash all commits and force-push into the PR branch to update the PR.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332336#comment-15332336
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
How do I remove the merge commit? If I try to remove it, I lose all my commits. @fhueske, I have updated the PR. Thanks for your very sharp eyes in spotting the missing spaces and new lines.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332251#comment-15332251
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216793
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link ReduceFunction}.
--- End diff --

Ok




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332250#comment-15332250
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67216772
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*/
+  def minBy(fields: Int*) : DataSet[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+}
+
+reduce(new SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
+* Selects an element with maximum value.
+*
+* The maximum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* maxBy(0)[1, 0]
+* maxBy(1)[0, 1]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* maxBy(0, 1)[0, 1]
+* If multiple values with maximum value at the specified fields exist, a random one will be
+* picked
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def maxBy(fields: Int*) : DataSet[T] = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+}
+reduce(new SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
--- End diff --

Very sharp eyes :)
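The minBy/maxBy doc comments in this diff describe a pairwise, lexicographic comparison on the selected fields, applied through `reduce`. The following is a plain-Scala sketch of that behavior, not Flink's `SelectByMinFunction`/`SelectByMaxFunction`: it works on an ordinary `Seq` rather than a `DataSet`, assumes every selected field holds a `Comparable` value (true for boxed `Int`, `Long`, and `String`), and all names in it are hypothetical. On a tie it keeps the first argument of each pairwise step, matching the "first given tuple (value1)" expectation in the test comments of this PR; in a sequential `reduce` that makes ties deterministic, whereas the docs only promise that one of the tied elements is picked.

```scala
// Hypothetical sketch of lexicographic select-by-min/max; not Flink's implementation.
object SelectBySketch {

  // Compares two tuples on the given field positions, in order, and returns the
  // first non-zero result (lexicographic order). Assumes the fields are Comparable.
  private def compareOn(a: Product, b: Product, fields: Seq[Int]): Int =
    fields.iterator
      .map { i =>
        val left = a.productElement(i).asInstanceOf[Comparable[Any]]
        left.compareTo(b.productElement(i))
      }
      .find(_ != 0)
      .getOrElse(0)

  // Pairwise step for minBy: value2 wins only if it is strictly smaller.
  def minOf[T <: Product](fields: Seq[Int]): (T, T) => T =
    (value1, value2) => if (compareOn(value2, value1, fields) < 0) value2 else value1

  // Pairwise step for maxBy: value2 wins only if it is strictly bigger.
  def maxOf[T <: Product](fields: Seq[Int]): (T, T) => T =
    (value1, value2) => if (compareOn(value2, value1, fields) > 0) value2 else value1

  // Sequential stand-ins for DataSet#minBy / DataSet#maxBy on a non-empty Seq.
  def minBy[T <: Product](data: Seq[T], fields: Int*): T = data.reduce(minOf[T](fields))
  def maxBy[T <: Product](data: Seq[T], fields: Int*): T = data.reduce(maxOf[T](fields))
}
```

With the doc examples, `SelectBySketch.minBy(Seq((0, 1), (1, 0)), 0)` selects `(0, 1)` and `SelectBySketch.maxBy(Seq((0, 0), (0, 1)), 0, 1)` selects `(0, 1)`; in the PR itself the corresponding calls are `minBy(0)` / `maxBy(0, 1)` on a tuple `DataSet`.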




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1533#comment-1533
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67214413
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
@@ -521,7 +521,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMinFunction(
-   (TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
--- End diff --

I have already reverted the style change; the only problem is that the revert sits on top of the previous commit. How can I keep this change from appearing in my commit history at all? I am not sure how to do it.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331708#comment-15331708
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
Hi @ramkrish86, the PR needs another pass. Please also remove the merge commit. Thanks, Fabian




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331703#comment-15331703
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67157345
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
--- End diff --

This class needs to be cleaned up. Most of the comments below apply to the 
whole class.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331701#comment-15331701
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67157175
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo[scala.Tuple5[Int, Long, String, Long, Int]](
+classOf[scala.Tuple5[Int, Long, String, Long, Int]],
+Array(),
+Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+override def createSerializer(config: ExecutionConfig):
+TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+  private val bigger : Tuple5[Int, Long, String, Long, Int]  =
+new scala.Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)
+  private val smaller : Tuple5[Int, Long, String, Long, Int]  =
+new scala.Tuple5[Int, Long, String, Long, Int](5, 50L, "Hello", 50L, 15)
+
+  //Special case where only the last value determines if bigger or smaller
+  private val specialCaseBigger : Tuple5[Int, Long, String, Long, Int]  =
+new scala.Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 17)
+  private val specialCaseSmaller : Tuple5[Int, Long, String, Long, Int]  =
+new scala.Tuple5[Int, Long, String, Long, Int](5, 50L, "Hello", 50L, 17)
+
+  /**
+* This test validates whether the order of tuples has
+*
+* any impact on the outcome and if the bigger tuple is returned.
+*/
+  @Test
+  def testMaxByComparison(): Unit = {
+val a1 = Array(0)
+val maxByTuple : SelectByMaxFunction[scala.Tuple5[Int, Long, String, Long, Int]] =
+  new SelectByMaxFunction
+[scala.Tuple5[Int, Long, String, Long, Int]](tupleTypeInfo, a1)
+  try {
+Assert.assertSame("SelectByMax must return bigger tuple",
+  bigger, maxByTuple.reduce(smaller, bigger))
+Assert.assertSame("SelectByMax must return bigger tuple",
+  bigger, maxByTuple.reduce(bigger, smaller))
+  }catch {
+case e : Exception =>
+  Assert.fail("No exception should be thrown while comapring both tuples")
+  }
+  }
+
+  // --- MAXIMUM FUNCTION TEST BELOW --
+
+  /**
+* This test cases checks when two tuples only differ in one value, but this value is not
+* in the fields list. In that case it should be seen as equal
+* and then the first given tuple (value1) should be returned by reduce().
+*/
+  @Test
+  def testMaxByComparisonSpecialCase1() : Unit = {
+val a1 = Array(0, 3)
+val maxByTuple : SelectByMaxFunction[scala.Tuple5[Int, Long, String, Long, Int]] =
+new SelectByMaxFunction[scala.Tuple5[Int, Long, String, Long, Int]](tupleTypeInfo, a1)
+
+try {
+  Assert.assertSame("SelectByMax must return the first given tuple",
+specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
+  Assert.assertSame("SelectByMax must return the first given tuple",
+ 

[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331698#comment-15331698
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156935
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo[scala.Tuple5[Int, Long, String, Long, Int]](
--- End diff --

Please reformat this block of code. It is very hard to read.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331692#comment-15331692
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156469
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo[scala.Tuple5[Int, Long, String, Long, Int]](
+classOf[scala.Tuple5[Int, Long, String, Long, Int]],
+Array(),
+Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+override def createSerializer(config: ExecutionConfig):
+TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+  private val bigger : Tuple5[Int, Long, String, Long, Int]  =
+new scala.Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)
--- End diff --

Scala tuples can be created much more concisely:
`new scala.Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)` -> `(10, 100L, "HelloWorld", 200L, 20)`
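The suggestion can be checked in isolation. A minimal sketch (plain Scala 2.x; the object name `TupleLiterals` is hypothetical) showing that the tuple literal builds an equal `Tuple5` with the same type as the explicit constructor call:

```scala
object TupleLiterals {
  // Explicit construction, as written in the test under review.
  val explicit: (Int, Long, String, Long, Int) =
    new scala.Tuple5[Int, Long, String, Long, Int](10, 100L, "HelloWorld", 200L, 20)

  // Tuple literal: the compiler infers scala.Tuple5[Int, Long, String, Long, Int].
  val literal: (Int, Long, String, Long, Int) = (10, 100L, "HelloWorld", 200L, 20)
}
```

Tuples are case classes, so `==` compares them field by field. The test's `Assert.assertSame` checks reference identity instead, which still holds with literals because `reduce` returns one of the very `val`s passed to it.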


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.
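To make the requested semantics concrete, here is a plain-Scala sketch (no Flink dependency) of a select-by-max/min reduce over tuple fields: the given positions are compared in order, the first position whose values differ decides, and a full tie keeps the first argument. This behavior is assumed to mirror the Java `SelectByMaxFunction`/`SelectByMinFunction`; `SelectBySketch` and its methods are hypothetical names, not the code in the pull request.

```scala
// Hypothetical plain-Scala model of select-by-max/min over tuple fields.
// It is NOT the Flink implementation; it only illustrates the semantics.
object SelectBySketch {
  // Compares two tuples on the given positions, in order. The first
  // position whose values differ decides; 0 means a tie on every position.
  private def compareOn(a: Product, b: Product, fields: Seq[Int]): Int =
    fields.iterator
      .map { pos =>
        // Assumes every selected field holds a java.lang.Comparable value
        // (boxed Int/Long, String, ...).
        a.productElement(pos).asInstanceOf[Comparable[Any]].compareTo(b.productElement(pos))
      }
      .find(_ != 0)
      .getOrElse(0)

  // Keeps the larger tuple; on a full tie the first argument wins.
  def selectByMax[T <: Product](a: T, b: T, fields: Seq[Int]): T =
    if (compareOn(a, b, fields) < 0) b else a

  // Keeps the smaller tuple; on a full tie the first argument wins.
  def selectByMin[T <: Product](a: T, b: T, fields: Seq[Int]): T =
    if (compareOn(a, b, fields) > 0) b else a

  // Pairwise reduction of a non-empty sequence, like a reduce over a data set.
  def maxBy[T <: Product](rows: Seq[T], fields: Int*): T =
    rows.reduce((a, b) => selectByMax(a, b, fields))

  def minBy[T <: Product](rows: Seq[T], fields: Int*): T =
    rows.reduce((a, b) => selectByMin(a, b, fields))
}
```

With `rows = Seq((1, 5L, "a"), (3, 2L, "b"), (3, 7L, "c"))`, `SelectBySketch.maxBy(rows, 0)` keeps `(3, 2L, "b")` because the tie on field 0 keeps the element seen first, while `SelectBySketch.maxBy(rows, 0, 1)` falls through to field 1 and returns `(3, 7L, "c")`.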



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331695#comment-15331695
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156560
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo[scala.Tuple5[Int, Long, String, Long, Int]](
+classOf[scala.Tuple5[Int, Long, String, Long, Int]],
+Array(),
+Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+Array("_1", "_2","_3","_4","_5")) {
+override def createSerializer(config: ExecutionConfig):
+TypeSerializer[(Int, Long, String, Long, Int)] = ???
+  }
+
+  private val bigger : Tuple5[Int, Long, String, Long, Int]  =
--- End diff --

Scala infers the type, so the `Tuple5[Int, Long, String, Long, Int]` annotation can be removed.
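A minimal sketch of the suggested declaration, assuming the value is initialized with a tuple literal. In the test class the `private` modifier would stay; it is omitted here only so the snippet also runs as a script:

```scala
// No type annotation: the compiler infers (Int, Long, String, Long, Int).
val bigger = (10, 100L, "HelloWorld", 200L, 20)

// Ascribing the inferred type explicitly compiles unchanged, which shows
// that the annotation in the diff adds no information.
val annotated: (Int, Long, String, Long, Int) = bigger
```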




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331691#comment-15331691
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156378
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.junit.{Assert, Test}
+
+/**
+  *
+  */
+class SelectByFunctionTest {
+
+  val tupleTypeInfo = new CaseClassTypeInfo[scala.Tuple5[Int, Long, String, Long, Int]](
--- End diff --

Use `(Int, Long, String, Long, Int)` instead of `scala.Tuple5[Int, Long, String, Long, Int]`.
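The parenthesized form is Scala's syntactic sugar for `scala.Tuple5`, so both spellings denote the same type. A small check, assuming a Scala 2.12+ compiler; the value names are illustrative:

```scala
// The compiler can only supply =:= evidence if the two types are identical;
// this line would fail to compile otherwise.
val sameType = implicitly[(Int, Long, String, Long, Int) =:= scala.Tuple5[Int, Long, String, Long, Int]]

// Because the types are identical, values flow between them without conversion.
val row: scala.Tuple5[Int, Long, String, Long, Int] = (1, 2L, "x", 3L, 4)
val sameRow: (Int, Long, String, Long, Int) = row
```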




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331687#comment-15331687
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156229
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+try {
+  collection.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(5)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(-1)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(1,2,3,4,-1)
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
--- End diff --

`maxBy` -> `minBy`
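The out-of-bounds tests quoted above expect an `IndexOutOfBoundsException` for positions such as `5` or `-1` on a five-field tuple. A minimal sketch of that kind of check, assuming the valid range is `[0, arity)`; `FieldCheckSketch.checkFields` is a hypothetical helper, not the validation code used by the pull request:

```scala
// Hypothetical helper, not Flink's actual validation code.
object FieldCheckSketch {
  // Throws on the first requested position that falls outside [0, arity).
  def checkFields(arity: Int, fields: Seq[Int]): Unit =
    fields.find(pos => pos < 0 || pos >= arity).foreach { bad =>
      throw new IndexOutOfBoundsException(
        s"Field position $bad is out of bounds for a tuple of arity $arity")
    }
}
```

With this check, `checkFields(5, Seq(4, 0, 1, 2, 3))` passes, while `Seq(5)`, `Seq(-1)` and `Seq(1, 2, 3, 4, -1)` all throw, matching the three `testOutOfTupleBoundsDataset*` cases.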




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331689#comment-15331689
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156244
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+try {
+  collection.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(5)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(-1)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(1,2,3,4,-1)
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
+* is used on a custom data type.
+*/
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val customDS = env.fromCollection(customTypeData)
+// should not work: groups on custom type
+customDS.minBy(0)
+  }
+
+  /**
+* This test validates that no exceptions is thrown when an empty grouping
+* calls minBy().
+*/
+  @Test
+  def testMinByKeyFieldsGrouping() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val groupDs: GroupedDataSet[scala.Tuple5[Int, Long, String, Long, Int]] =
+  env.fromCollection(emptyTupleData).groupBy(0)
+// should work
+try {
+  groupDs.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail()
+}
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
--- End diff --

`maxBy` -> `minBy`
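The grouped test quoted above only builds the operator (the program is never executed) and asserts that calling `minBy` on a grouping of an empty collection does not throw. To make the per-group semantics concrete, a minimal plain-Scala sketch, assuming grouping on field 0 and selecting by fields 1 and 2; `GroupedMinBySketch.minByPerGroup` is hypothetical and uses standard collections, not Flink operators:

```scala
// Hypothetical model using plain Scala collections, not Flink operators.
object GroupedMinBySketch {
  // Groups rows on field 0, then keeps the row with the smallest
  // (field 1, field 2) pair in each group. groupBy never yields an empty
  // group, so minBy is always safe; an empty input gives an empty Map.
  def minByPerGroup(rows: Seq[(Int, Long, String)]): Map[Int, (Int, Long, String)] =
    rows.groupBy(_._1).map { case (key, group) => key -> group.minBy(r => (r._2, r._3)) }
}
```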



[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331686#comment-15331686
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156170
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+
+  @Test
+  def testMaxByKeyFieldsDataset(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+try {
+  collection.maxBy(0, 1, 2, 3, 4)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.maxBy(5)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.maxBy(-1)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.maxBy(1,2,3,4,-1)
+  }
+
+  /**
+* This test validates that no exceptions is thrown when an empty grouping
+* calls maxBy().
+*/
+  @Test
+  def testMaxByKeyFieldsGrouping() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val groupDs: GroupedDataSet[scala.Tuple5[Int, Long, String, Long, Int]] =
+  env.fromCollection(emptyTupleData).groupBy(0)
+// should work
+try {
+  groupDs.maxBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
+* is used on a custom data type.
+*/
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val customDS = env.fromCollection(customTypeData)
+// should not work: groups on custom type
+customDS.maxBy(0)
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
+* is used on a custom data type.
+*/
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val groupDs: GroupedDataSet[CustomType] = 

[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331685#comment-15331685
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156147
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+try {
+  collection.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(5)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(-1)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(1,2,3,4,-1)
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
+* is used on a custom data type.
+*/
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val customDS = env.fromCollection(customTypeData)
+// should not work: groups on custom type
+customDS.minBy(0)
+  }
+
+  /**
+* This test validates that no exceptions is thrown when an empty grouping
+* calls minBy().
+*/
+  @Test
+  def testMinByKeyFieldsGrouping() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val groupDs: GroupedDataSet[scala.Tuple5[Int, Long, String, Long, Int]] =
+  env.fromCollection(emptyTupleData).groupBy(0)
+// should work
+try {
+  groupDs.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail()
+}
+  }
+
+  /**
+* This test validates that an InvalidProgrammException is thrown when maxBy
+* is used on a custom data type.
+*/
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+

[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331684#comment-15331684
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156136
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+try {
+  collection.minBy(4, 0, 1, 2, 3)
+} catch {
+  case e : Exception => Assert.fail();
+}
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(5)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(-1)
+  }
+
+  /**
+* This test validates that an index which is out of bounds throws an
+* IndexOutOfBOundsExcpetion.
+*/
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val collection = env.fromCollection(emptyTupleData)
+
+// should not work, key out of tuple bounds
+collection.minBy(1,2,3,4,-1)
--- End diff --

Add spaces after the commas: `1, 2, 3, 4, -1`




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331680#comment-15331680
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67156011
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  @Test
--- End diff --

Add a new line before `@Test`.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331679#comment-15331679
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67155983
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

Use `List[(Int, Long, String, Long, Int)]`.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331668#comment-15331668
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67154971
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

`List[scala.Tuple5[Int, Long, String, Long, Int]]` -> `List[(Int, Long, String, Long, Int)]`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331657#comment-15331657
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153992
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], val fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
--- End diff --

`MinReduceFunction` -> `SelectByMinFunction`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331654#comment-15331654
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153818
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], val fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
+    }
+    // Check whether type is comparable
+
--- End diff --

remove line or move comment down


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331656#comment-15331656
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153910
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], val fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f1 <- fields) {
--- End diff --

`f1` -> `f`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331658#comment-15331658
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67154017
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], val fields : Array[Int])
+  extends ReduceFunction[T] {
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= t.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
+    }
+    // Check whether type is comparable
+
+    if (!t.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+      throw new IllegalArgumentException(
+        "MinReduceFunction supports only key(Comparable) types.")
--- End diff --

`MinReduceFunction` -> `SelectByMinFunction`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331655#comment-15331655
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153883
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TupleTypeInfoBase[T], val fields : Array[Int])
--- End diff --

do not make `fields` a `val`. Doesn't need to be accessed from outside.
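The following hypothetical classes, which are not from the PR, illustrate the point. Marking a constructor parameter `val` generates a public accessor. A plain parameter can still be used inside the class body without becoming part of the class's public API:

```scala
// `val` turns the constructor parameter into a public member.
class WithVal(val fields: Array[Int])

// Plain parameter: usable inside the class body, but `new WithoutVal(...).fields`
// would not compile because no public accessor is generated.
class WithoutVal(fields: Array[Int]) {
  def fieldCount: Int = fields.length
}
```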


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331653#comment-15331653
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153789
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
+  private val typeInf = t
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= typeInf.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
+    }
+    // Check whether type is comparable
+
--- End diff --

remove new line or move comment down


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331652#comment-15331652
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153641
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
+  private val typeInf = t
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= typeInf.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
+    }
+    // Check whether type is comparable
+
+    if (!typeInf.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+      throw new IllegalArgumentException(
+        "MinReduceFunction supports only key(Comparable) types.")
--- End diff --

`Min` -> `Max`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331651#comment-15331651
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67153625
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
+  private val typeInf = t
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= typeInf.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.")
--- End diff --

`Min` -> `Max`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331641#comment-15331641
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67152398
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
--- End diff --

Remove `fields`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331640#comment-15331640
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67152375
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
--- End diff --

`f` -> `fields`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331637#comment-15331637
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67152249
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
+  private val typeInf = t
+  for(f1 <- fields) {
--- End diff --

`f1` -> `f`


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331638#comment-15331638
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67152304
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TupleTypeInfoBase[T], val f : Array[Int])
+  extends ReduceFunction[T] {
+  private val fields = f
+  private val typeInf = t
--- End diff --

No need to store `typeInf`
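Taken together, the review suggestions on `SelectByMaxFunction` could shape the class roughly as follows:

- rename `f` to `fields`,
- drop the `fields` and `typeInf` copies,
- rename `f1` to `f`,
- change `Min` to `Max` in the messages.

The dependency-free sketch below illustrates this under stated assumptions and is not Flink's code. It takes the tuple arity as a plain `Int` instead of a `TupleTypeInfoBase` and omits the `isKeyType()` check. It also assumes every selected field holds a `Comparable` value. The class name `SelectByMaxSketch` is hypothetical.

```scala
// Hypothetical, dependency-free sketch of a max-selecting reducer for Scala tuples.
// Assumes every selected field holds a Comparable value (e.g. Int, Long, String).
class SelectByMaxSketch[T <: Product](arity: Int, fields: Array[Int]) {
  // Validate positions once, at construction time. `arity` is only needed here;
  // unlike the PR, no `private val` copies of the parameters are declared.
  for (f <- fields) {
    if (f < 0 || f >= arity) {
      throw new IndexOutOfBoundsException(
        "SelectByMaxFunction field position " + f + " is out of range.")
    }
  }

  // Compare the selected fields in order; the first one that differs decides.
  // If all selected fields are equal, the first argument is kept.
  def reduce(value1: T, value2: T): T = {
    val decisive = fields.iterator
      .map { f =>
        value1.productElement(f).asInstanceOf[Comparable[Any]]
          .compareTo(value2.productElement(f))
      }
      .find(_ != 0)
      .getOrElse(0)
    if (decisive < 0) value2 else value1
  }
}
```

In this sketch the range check runs in the constructor body, as in the PR, so an invalid field position fails before any data is processed.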


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331629#comment-15331629
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67151472
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +357,40 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def maxBy(fields: Int*) : DataSet[T]  = {
+if (!set.getType().isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+}
+reduce(new SelectByMaxFunction[T](set.getType.asInstanceOf[TupleTypeInfoBase[T]],
+  fields.toArray))
+  }
+
+  /**
+* Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def minBy(fields: Int*) : DataSet[T]  = {
+if (!set.getType().isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
--- End diff --

`maxBy` -> `minBy`
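The quoted Scaladoc describes the grouped `maxBy`/`minBy` as a reduce that runs within each group until a single element remains. The sketch below is a plain-Scala model of those semantics, using `groupBy` on an in-memory `Seq` instead of a Flink `GroupedDataSet`; the object and method names are hypothetical. Routing both methods through one shared comparison helper is also one way to keep the two variants from drifting apart, as the copied `maxBy` error message did here.

```scala
// Hypothetical in-memory model of grouped maxBy/minBy; not Flink's GroupedDataSet.
object GroupedSelectBy {

  // Lexicographic comparison of two tuples on the given field positions.
  private def lexCompare(fields: Seq[Int])(x: Product, y: Product): Int =
    fields.iterator
      .map(p => x.productElement(p).asInstanceOf[Comparable[Any]].compareTo(y.productElement(p)))
      .find(_ != 0)
      .getOrElse(0)

  // Reduces each group pairwise until one element (the maximum) remains.
  def groupedMaxBy[K, T <: Product](data: Seq[T])(key: T => K)(fields: Int*): Map[K, T] =
    data.groupBy(key).map { case (k, group) =>
      k -> group.reduce((a, b) => if (lexCompare(fields)(a, b) >= 0) a else b)
    }

  // Same as groupedMaxBy, but keeps the minimum of each group.
  def groupedMinBy[K, T <: Product](data: Seq[T])(key: T => K)(fields: Int*): Map[K, T] =
    data.groupBy(key).map { case (k, group) =>
      k -> group.reduce((a, b) => if (lexCompare(fields)(a, b) <= 0) a else b)
    }
}
```

For example, grouping `Seq(("a", 1, 5), ("a", 3, 2))` by `_._1` and calling `groupedMaxBy(...)(1)` keeps `("a", 3, 2)`, because field 1 is compared first and `3 > 1`.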


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331621#comment-15331621
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67150934
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +357,40 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
--- End diff --

Fix code highlight + class references.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331620#comment-15331620
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67150891
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link ReduceFunction}.
--- End diff --

Link to other classes like this `[[ReduceFunction]]`.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331619#comment-15331619
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67150746
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* minBy(0)[0, 1]
+* minBy(1)[1, 0]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* minBy(0, 1)[0, 0]
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*/
+  def minBy(fields: Int*) : DataSet[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+}
+
+reduce(new SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
+
+  /**
+* Selects an element with maximum value.
+*
+* The maximum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+*
+* maxBy(0)[1, 0]
+* maxBy(1)[0, 1]
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* maxBy(0, 1)[0, 1]
+* If multiple values with maximum value at the specified fields exist, a random one will be
+* picked
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def maxBy(fields: Int*) : DataSet[T] = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+}
+reduce(new SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray))
+  }
--- End diff --

Add a new line


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331618#comment-15331618
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67150681
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+* Selects an element with minimum value.
+*
+* The minimum is computed over the specified fields in lexicographical order.
+*
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
--- End diff --

Please use MarkDown to highlight code (see `DataSet.join()` for an example).
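One way to follow this request is Scaladoc's own wiki markup: `{{{ ... }}}` for code blocks, backticks for inline code, and `[[...]]` for class links. The sketch below reformats the `minBy` doc from the diff that way. To keep it compilable on its own, the doc is attached to a hypothetical in-memory `minBy` over a `Seq` of tuples rather than to Flink's `DataSet`, and the wording is adapted from the diff, not taken from the merged code.

```scala
// Hypothetical object; only the Scaladoc formatting is the point here.
object SelectByDocSketch {

  /**
   * Selects an element with the minimum value.
   *
   * The minimum is computed over the specified fields in lexicographical order.
   *
   * '''Example 1:''' given a data set with elements `(0, 1)` and `(1, 0)`:
   * {{{
   *   minBy(0) // (0, 1)
   *   minBy(1) // (1, 0)
   * }}}
   *
   * '''Example 2:''' given a data set with elements `(0, 0)` and `(0, 1)`:
   * {{{
   *   minBy(0, 1) // (0, 0)
   * }}}
   *
   * If several elements share the minimum value at the specified fields, an
   * arbitrary one is returned. Internally, this operation is implemented as a
   * [[org.apache.flink.api.common.functions.ReduceFunction]].
   *
   * @param fields positions of the tuple fields taken into account
   * @return the element with the minimum value
   */
  def minBy[T <: Product](data: Seq[T])(fields: Int*): T =
    data.reduce { (a, b) =>
      // First differing field decides; ties keep the left element.
      val cmp = fields.iterator
        .map(p => a.productElement(p).asInstanceOf[Comparable[Any]].compareTo(b.productElement(p)))
        .find(_ != 0)
        .getOrElse(0)
      if (cmp <= 0) a else b
    }
}
```

The `[[...]]` link is only resolved when Scaladoc is generated; the compiler itself does not check it.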


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331596#comment-15331596
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67148632
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -521,7 +521,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMinFunction(
-   (TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
--- End diff --

Can you please revert these style changes as well? We try to avoid touching 
files without a real need. Thanks


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331402#comment-15331402
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
@fhueske 
The merge is also done now. Let me know what you think of the recent updates.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331367#comment-15331367
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
I ran `mvn clean verify` to make sure I don't generate any warnings. Next time I will make sure to run this command so that local warnings are cleared.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331368#comment-15331368
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
Oh. Let me rebase. I did not do that. 


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331364#comment-15331364
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117392
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
--- End diff --

Ok. Got it.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331360#comment-15331360
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117310
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
--- End diff --

Ok


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331359#comment-15331359
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117302
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
--- End diff --

Changed everywhere.


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331361#comment-15331361
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117319
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
+  for(f1 <- fields) {
+if (f1 < 0 || f1 >= typeInf.getArity()) {
+  throw new IndexOutOfBoundsException(
+"MinReduceFunction field position " + f1 + " is out of range.");
+}
+// Check whether type is comparable
+
+if (!typeInf.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+  throw new java.lang.IllegalArgumentException(
--- End diff --

Ok


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331363#comment-15331363
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117382
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  private val tupleTypeInfo: TupleTypeInfo[Tuple5[Integer, Long, String, Long, Integer]] =
--- End diff --

Done


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331362#comment-15331362
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117372
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
--- End diff --

Done


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331347#comment-15331347
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117197
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
 javaSet.output(outputFormat)
   }
-  
+
+  /**
+* Selects an element with minimum value.
+* 
+* The minimum is computed over the specified fields in lexicographical order.
+* 
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+* 
+* minBy(0): [0, 1]
+* minBy(1): [1, 0]
+* 
+* 
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* 
+* minBy(0, 1): [0, 0]
+* 
+* 
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* 
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
--- End diff --

Done


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331351#comment-15331351
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117230
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -23,9 +23,11 @@ import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceF
 import org.apache.flink.api.common.operators.{Keys, Order}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
+import org.apache.flink.api.java.functions.{FirstReducer, KeySelector, SelectByMaxFunction, SelectByMinFunction}
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331348#comment-15331348
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117204
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
 javaSet.output(outputFormat)
   }
-  
+
+  /**
+* Selects an element with minimum value.
+* 
+* The minimum is computed over the specified fields in lexicographical order.
+* 
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+* 
+* minBy(0): [0, 1]
+* minBy(1): [1, 0]
+* 
+* 
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* 
+* minBy(0, 1): [0, 0]
+* 
+* 
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* 
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+}
+
+return new ReduceOperator[T](
--- End diff --

Done. I understand what you are saying here. 




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331357#comment-15331357
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117279
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +358,44 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def maxBy(fields: Array[Int]) : ReduceOperator[T]  = {
+if (!set.javaSet.getType().isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+}
+new ReduceOperator[T](
+  set.javaSet,
+  new SelectByMaxFunction[T](set.javaSet.getType, fields),
+  getCallLocationName())
+  }
+
+  /**
+* Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
--- End diff --

Ok
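On a `GroupedDataSet`, the Scaladoc above describes `maxBy`/`minBy` as a reduce applied within each group, so one element is selected per key. A hypothetical Flink-free sketch of that behavior follows: `GroupedMaxBySketch`, `compareOn` and `maxByPerGroup` are invented names, `groupBy` on a `Seq` stands in for Flink's grouping, and records are assumed to be `(key, Int, Int)` tuples.

```scala
// Hypothetical plain-Scala sketch of a grouped maxBy (not Flink code).
object GroupedMaxBySketch {

  // Lexicographic comparison over the selected Int fields (assumption of this sketch).
  def compareOn(a: Product, b: Product, fields: Seq[Int]): Int =
    fields.iterator
      .map(i => Integer.compare(
        a.productElement(i).asInstanceOf[Int],
        b.productElement(i).asInstanceOf[Int]))
      .find(_ != 0)
      .getOrElse(0)

  // Group by key, then reduce each group to its maximum element.
  def maxByPerGroup[K, T <: Product](data: Seq[T], key: T => K, fields: Int*): Map[K, T] =
    data.groupBy(key).map { case (k, group) =>
      k -> group.reduce((x, y) => if (compareOn(x, y, fields) >= 0) x else y)
    }

  def main(args: Array[String]): Unit = {
    // Records are (key, score, id); select by score first, then by id.
    val records = Seq(("a", 3, 1), ("a", 5, 2), ("b", 2, 3), ("b", 2, 4))
    val result = maxByPerGroup(records, (r: (String, Int, Int)) => r._1, 1, 2)
    println(result("a")) // (a,5,2)
    println(result("b")) // (b,2,4): scores tie, so the id decides
  }
}
```

The selection inside each group is the same lexicographic rule as the ungrouped operator; only the scope of the reduce changes.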




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331350#comment-15331350
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117219
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
 javaSet.output(outputFormat)
   }
-  
+
+  /**
+* Selects an element with minimum value.
+* 
+* The minimum is computed over the specified fields in lexicographical order.
+* 
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+* 
+* minBy(0): [0, 1]
+* minBy(1): [1, 0]
+* 
+* 
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* 
+* minBy(0, 1): [0, 0]
+* 
+* 
+* If multiple values with minimum value at the specified fields exist, a random one will be
+* picked.
+* 
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
+if (!getType.isTupleType) {
+  throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+}
+
+return new ReduceOperator[T](
+  javaSet,
+  new SelectByMinFunction[T](getType(), fields),
+  getCallLocationName())
+  }
+
+  /**
+* Selects an element with maximum value.
+* 
+* The maximum is computed over the specified fields in lexicographical order.
+* 
+* Example 1: Given a data set with elements [0, 1], [1, 0], the
+* results will be:
+* 
+* maxBy(0): [1, 0]
+* maxBy(1): [0, 1]
+* 
+* 
+* Example 2: Given a data set with elements [0, 0], [0, 1], the
+* results will be:
+* 
+* maxBy(0, 1): [0, 1]
+* 
+* 
+* If multiple values with maximum value at the specified fields exist, a random one will be
+* picked.
+* 
+* Internally, this operation is implemented as a {@link ReduceFunction}.
+*
+*/
+  def maxBy(fields: Array[Int]) : ReduceOperator[T] = {
--- End diff --

Done
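The `maxBy` examples above, together with the note that a random element is picked on ties, can be illustrated with the same kind of plain-Scala sketch. It is hypothetical and Flink-free: `MaxBySketch`, `compareOn` and `maxBy` are invented names, the only real change from a `minBy` sketch is the direction of the comparison, and the selected fields are assumed to hold `Int` values.

```scala
// Hypothetical plain-Scala sketch of the maxBy semantics (not Flink code).
object MaxBySketch {

  // Lexicographic comparison; earlier field positions have higher priority.
  // Assumes every selected field holds an Int.
  def compareOn(a: Product, b: Product, fields: Seq[Int]): Int =
    fields.iterator
      .map(i => Integer.compare(
        a.productElement(i).asInstanceOf[Int],
        b.productElement(i).asInstanceOf[Int]))
      .find(_ != 0)
      .getOrElse(0)

  // Keep the larger element of each pair; on a full tie the left one is kept.
  def maxBy[T <: Product](data: Seq[T], fields: Int*): T =
    data.reduce((x, y) => if (compareOn(x, y, fields) >= 0) x else y)

  def main(args: Array[String]): Unit = {
    val example1 = Seq((0, 1), (1, 0))
    println(maxBy(example1, 0))    // (1,0)
    println(maxBy(example1, 1))    // (0,1)

    val example2 = Seq((0, 0), (0, 1))
    println(maxBy(example2, 0, 1)) // (0,1)

    // Tie on field 0: this sequential sketch returns the first element,
    // whereas a parallel Flink reduce gives no such guarantee.
    println(maxBy(Seq((1, "a"), (1, "b")), 0)) // (1,a)
  }
}
```

Only the selected positions are compared, so the non-numeric second field in the tie example is never inspected.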




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331353#comment-15331353
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117255
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +358,44 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def maxBy(fields: Array[Int]) : ReduceOperator[T]  = {
+if (!set.javaSet.getType().isTupleType) {
+  throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+}
+new ReduceOperator[T](
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331356#comment-15331356
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117267
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +358,44 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def maxBy(fields: Array[Int]) : ReduceOperator[T]  = {
+if (!set.javaSet.getType().isTupleType) {
--- End diff --

Ok




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331352#comment-15331352
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117243
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +358,44 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+* Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+* The transformation consecutively calls a {@link ReduceFunction}
+* until only a single element remains which is the result of the transformation.
+* A ReduceFunction combines two elements into one new element of the same type.
+*
+* @param fields Keys taken into account for finding the minimum.
+* @return A { @link ReduceOperator} representing the minimum.
+*/
+  def maxBy(fields: Array[Int]) : ReduceOperator[T]  = {
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331358#comment-15331358
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117289
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331345#comment-15331345
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117157
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -30,12 +30,14 @@ import org.apache.flink.api.common.operators.base.PartitionOperatorBase.Partitio
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
+import org.apache.flink.api.java.functions._
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331346#comment-15331346
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117165
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
 javaSet.output(outputFormat)
   }
-  
+
+  /**
+* Selects an element with minimum value.
+* 
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331343#comment-15331343
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117140
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -19,11 +19,12 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 @Internal
-public class SelectByMinFunction implements ReduceFunction {
+public class SelectByMinFunction implements ReduceFunction {
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331341#comment-15331341
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117132
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java ---
@@ -36,7 +37,7 @@
 * is regarded in the reduce function. First index has highest priority and last index has
 * least priority.
 */
-   public SelectByMaxFunction(TupleTypeInfo type, int... fields) {
+   public SelectByMaxFunction(TypeInformation type, int... fields) {
--- End diff --

Done. Since we have added a new Scala class, these changes are unnecessary.
Thanks for pointing it out.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331339#comment-15331339
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67117088
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
@@ -557,7 +557,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMaxFunction(
-   (TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
--- End diff --

Done




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330582#comment-15330582
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
One more thing: it would be good to rebase the PR onto the current master.
Thanks!




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330576#comment-15330576
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/1856
  
Hi @ramkrish86, your approach is correct, but the PR suffers from quite a few Scala issues.
Please fix the problems I pointed out and double-check your code for compiler warnings, unused imports, semicolons in Scala code, etc.

Thanks, Fabian




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330571#comment-15330571
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67054034
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  private val tupleTypeInfo: TupleTypeInfo[Tuple5[Integer, Long, String, Long, Integer]] =
--- End diff --

`tupleTypeInfo` is not used. Please check the compiler warnings of your IDE.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330565#comment-15330565
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67053326
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMinFunction.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+/**
+  * SelectByMinFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMinFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
--- End diff --

The comments on `SelectByMaxFunction` apply here as well.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330563#comment-15330563
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67053263
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
+  for(f1 <- fields) {
+if (f1 < 0 || f1 >= typeInf.getArity()) {
+  throw new IndexOutOfBoundsException(
+"MinReduceFunction field position " + f1 + " is out of range.");
+}
+// Check whether type is comparable
+
+if 
(!typeInf.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+  throw new java.lang.IllegalArgumentException(
+"MinReduceFunction supports only key(Comparable) types.");
+}
+  }
+  override def reduce(value1: T, value2: T): T = {
--- End diff --

Please add an empty line before this method.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330560#comment-15330560
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67052986
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= typeInf.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.");
+    }
+    // Check whether type is comparable
+
+    if (!typeInf.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+      throw new java.lang.IllegalArgumentException(
+        "MinReduceFunction supports only key(Comparable) types.");
+    }
+  }
+  override def reduce(value1: T, value2: T): T = {
+    for (f <- fields) {
+      val element1  = value1.asInstanceOf[Product].productElement(f).asInstanceOf[Comparable[Any]]
--- End diff --

indentation
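For illustration, the lexicographic selection that `reduce` is meant to perform can be sketched in plain Scala without Flink. This is a minimal sketch assuming Scala tuples (any `Product`) whose selected fields are `Comparable`; `SelectByMaxSketch` and `selectByMax` are hypothetical names, not code from the pull request.

```scala
object SelectByMaxSketch {
  // Compares the selected fields in order (the first field has the highest
  // priority) and returns the larger element; on a full tie, value1 is kept.
  def selectByMax[T <: Product](value1: T, value2: T, fields: Int*): T = {
    val firstDifference = fields.iterator
      .map { f =>
        val element1 = value1.productElement(f).asInstanceOf[Comparable[Any]]
        element1.compareTo(value2.productElement(f))
      }
      .find(_ != 0)

    firstDifference match {
      case Some(c) if c < 0 => value2
      case _                => value1
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq((0, 1), (1, 0))
    println(data.reduce((a, b) => selectByMax(a, b, 0))) // (1,0)
    println(data.reduce((a, b) => selectByMax(a, b, 1))) // (0,1)
  }
}
```

The cast to `Comparable[Any]` mirrors the approach in the diff; boxed values such as `java.lang.Integer` satisfy it at runtime.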




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330557#comment-15330557
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67052820
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
+  for(f1 <- fields) {
+    if (f1 < 0 || f1 >= typeInf.getArity()) {
+      throw new IndexOutOfBoundsException(
+        "MinReduceFunction field position " + f1 + " is out of range.");
+    }
+    // Check whether type is comparable
+
+    if (!typeInf.asInstanceOf[TupleTypeInfoBase[T]].getTypeAt(f1).isKeyType()) {
+      throw new java.lang.IllegalArgumentException(
--- End diff --

Use Scala's `IllegalArgumentException`
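The `java.lang.` prefix is not needed in Scala, because `IllegalArgumentException` and `IndexOutOfBoundsException` are in scope by default. A minimal sketch of the field checks, assuming the tuple arity and a per-field "is key type" test are passed in as plain values (stand-ins for Flink's `TupleTypeInfoBase.getArity()` and `getTypeAt(f).isKeyType()`); `FieldChecks` and `validateFields` are hypothetical names.

```scala
object FieldChecks {
  // Validates tuple field positions before a selection function is built.
  // `arity` and `isKeyType` stand in for Flink's type information (assumption).
  def validateFields(arity: Int, isKeyType: Int => Boolean, fields: Seq[Int]): Unit =
    for (f <- fields) {
      if (f < 0 || f >= arity) {
        throw new IndexOutOfBoundsException(s"Field position $f is out of range.")
      }
      if (!isKeyType(f)) {
        throw new IllegalArgumentException(s"Field $f is not a key (Comparable) type.")
      }
    }

  def main(args: Array[String]): Unit = {
    validateFields(2, _ => true, Seq(0, 1)) // valid: no exception
    try validateFields(2, _ => true, Seq(2))
    catch { case e: IndexOutOfBoundsException => println(e.getMessage) }
  }
}
```

`Predef.require(condition, message)` is another option; it also throws an `IllegalArgumentException`, with the message prefixed by "requirement failed: ".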




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330548#comment-15330548
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67052302
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
--- End diff --

Since this function is specific for tuples, you can also use 
`TupleTypeInfoBase` instead of `TypeInformation`




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330545#comment-15330545
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67051905
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
+  private val typeInf = t;
--- End diff --

No need to store the type information.
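One way to check the field positions against the type information without keeping it in the function object is a factory that validates first and then builds an instance holding only `fields`. A minimal sketch, assuming the arity is passed as a plain `Int` instead of Flink's type information; `FieldSelection` is a hypothetical name.

```scala
// Hypothetical sketch: the instance keeps only the field positions.
final class FieldSelection private (val fields: Seq[Int]) extends Serializable

object FieldSelection {
  // Validates against the tuple arity (a stand-in for Flink's type information),
  // which is used here and then discarded rather than stored in the instance.
  def apply(arity: Int, fields: Int*): FieldSelection = {
    fields.find(f => f < 0 || f >= arity).foreach { f =>
      throw new IndexOutOfBoundsException(s"Field position $f is out of range.")
    }
    new FieldSelection(fields)
  }
}
```

A plausible benefit is a smaller object when user functions are serialized for execution, since only what `reduce` needs is kept.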




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330538#comment-15330538
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67051269
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/SelectByMaxFunction.scala 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.annotation.Internal
+
+/**
+  * SelectByMaxFunction to work with Scala tuples
+  */
+@Internal
+class SelectByMaxFunction[T](t : TypeInformation[T], val f : Array[Int]) extends  ReduceFunction[T]{
+  private val fields = f;
--- End diff --

no `;` in Scala code




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330536#comment-15330536
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050944
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 ---
@@ -19,11 +19,12 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 @Internal
-public class SelectByMinFunction implements ReduceFunction {
+public class SelectByMinFunction implements ReduceFunction {
--- End diff --

Please revert all changes in this class.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330535#comment-15330535
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050932
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
 ---
@@ -36,7 +37,7 @@
 * is regarded in the reduce function. First index has highest priority and last index has
 * least priority.
 */
-   public SelectByMaxFunction(TupleTypeInfo type, int... fields) {
+   public SelectByMaxFunction(TypeInformation type, int... fields) {
--- End diff --

Please revert all changes in this class.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330530#comment-15330530
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050779
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
+  /**
+    * Selects an element with minimum value.
+    *
+    * The minimum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    *
+    * minBy(0): [0, 1]
+    * minBy(1): [1, 0]
+    *
+    *
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    *
+    * minBy(0, 1): [0, 0]
+    *
+    *
+    * If multiple values with minimum value at the specified fields exist, a random one will be
+    * picked.
+    *
+    * Internally, this operation is implemented as a {@link ReduceFunction}.
+    *
+    */
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+    }
+
+    return new ReduceOperator[T](
+      javaSet,
+      new SelectByMinFunction[T](getType(), fields),
+      getCallLocationName())
+  }
+
+  /**
+    * Selects an element with maximum value.
+    *
+    * The maximum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    *
+    * maxBy(0): [1, 0]
+    * maxBy(1): [0, 1]
+    *
+    *
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    *
+    * maxBy(0, 1): [0, 1]
+    *
+    *
+    * If multiple values with maximum value at the specified fields exist, a random one will be
+    * picked.
+    *
+    * Internally, this operation is implemented as a {@link ReduceFunction}.
+    *
+    */
+  def maxBy(fields: Array[Int]) : ReduceOperator[T] = {
--- End diff --

Comments on `minBy()` apply here as well.

Also please move these methods up between `combineGroup` and `first`.
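The documented examples can be checked locally with plain Scala collections. A minimal sketch, assuming pairs of `Int` and an `Ordering` that compares the selected fields lexicographically; `LexicographicSketch` and `fieldOrdering` are hypothetical names. Scala's `Seq.min`/`max` deterministically keep one of several tied elements, whereas the Scaladoc above says an arbitrary one is picked.

```scala
object LexicographicSketch {
  // Orders tuples by the given field positions; earlier fields take priority.
  def fieldOrdering[T <: Product](fields: Int*): Ordering[T] = new Ordering[T] {
    def compare(x: T, y: T): Int =
      fields.iterator
        .map(f => x.productElement(f).asInstanceOf[Comparable[Any]].compareTo(y.productElement(f)))
        .find(_ != 0)
        .getOrElse(0)
  }

  def main(args: Array[String]): Unit = {
    val ex1 = Seq((0, 1), (1, 0))
    println(ex1.min(fieldOrdering[(Int, Int)](0))) // minBy(0): (0,1)
    println(ex1.min(fieldOrdering[(Int, Int)](1))) // minBy(1): (1,0)
    println(ex1.max(fieldOrdering[(Int, Int)](0))) // maxBy(0): (1,0)
    println(ex1.max(fieldOrdering[(Int, Int)](1))) // maxBy(1): (0,1)

    val ex2 = Seq((0, 0), (0, 1))
    println(ex2.min(fieldOrdering[(Int, Int)](0, 1))) // minBy(0, 1): (0,0)
    println(ex2.max(fieldOrdering[(Int, Int)](0, 1))) // maxBy(0, 1): (0,1)
  }
}
```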




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330533#comment-15330533
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050838
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -557,7 +557,7 @@ public long count() throws Exception {
}
 
return new ReduceOperator<>(this, new SelectByMaxFunction(
-   (TupleTypeInfo) getType(), fields), Utils.getCallLocationName());
--- End diff --

Please revert this change.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330527#comment-15330527
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050534
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
+  /**
+    * Selects an element with minimum value.
+    *
+    * The minimum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    *
+    * minBy(0): [0, 1]
+    * minBy(1): [1, 0]
+    *
+    *
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    *
+    * minBy(0, 1): [0, 0]
+    *
+    *
+    * If multiple values with minimum value at the specified fields exist, a random one will be
+    * picked.
+    *
+    * Internally, this operation is implemented as a {@link ReduceFunction}.
+    *
+    */
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
+    if (!getType.isTupleType) {
+      throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.")
+    }
+
+    return new ReduceOperator[T](
--- End diff --

Do not use `return`. In Scala, the last expression is returned implicitly.

Do not create a Java API `ReduceOperator`. Use `reduce(new SelectByMinFunction(...))` instead.
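A minimal sketch of the method shape being asked for: the last expression is the result (no `return`), and the method delegates to the wrapper's own `reduce` so the caller stays in the Scala API. `MiniDataSet` is a hypothetical stand-in backed by a local `Seq`; it is not Flink's Scala `DataSet`.

```scala
// Hypothetical stand-in for a Scala-API data set, backed by a local Seq.
// It is NOT Flink's DataSet; it only illustrates the method shape.
final case class MiniDataSet[T <: Product](elements: Seq[T]) {

  // Plays the role of DataSet.reduce: combines all elements into one.
  def reduce(fun: (T, T) => T): MiniDataSet[T] =
    MiniDataSet(Seq(elements.reduce(fun)))

  // No `return`: the call to reduce(...) is the last expression, so it is
  // the method's result, and the result type stays MiniDataSet[T].
  def minBy(fields: Int*): MiniDataSet[T] =
    reduce { (v1, v2) =>
      // Compare the selected fields in order; the first difference decides.
      val c = fields.iterator
        .map(f => v1.productElement(f).asInstanceOf[Comparable[Any]].compareTo(v2.productElement(f)))
        .find(_ != 0)
        .getOrElse(0)
      if (c > 0) v2 else v1
    }
}
```

For example, `MiniDataSet(Seq((0, 1), (1, 0))).minBy(1)` yields `MiniDataSet(List((1,0)))`.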




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330524#comment-15330524
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050338
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
+  /**
+    * Selects an element with minimum value.
+    *
+    * The minimum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the
+    * results will be:
+    *
+    * minBy(0): [0, 1]
+    * minBy(1): [1, 0]
+    *
+    *
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the
+    * results will be:
+    *
+    * minBy(0, 1): [0, 0]
+    *
+    *
+    * If multiple values with minimum value at the specified fields exist, a random one will be
+    * picked.
+    *
+    * Internally, this operation is implemented as a {@link ReduceFunction}.
+    *
+    */
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
--- End diff --

Use `fields: Int*` instead of `fields: Array[Int]` (see `groupBy(fields: Int*)`).
Do not return a Java API `ReduceOperator`. This would mix the Java and Scala
DataSet APIs, which must not happen! Return `DataSet[T]` instead.
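With a repeated parameter `fields: Int*`, call sites read like `groupBy`: `minBy(0)` or `minBy(0, 1)`, and inside the method `fields` is a `Seq[Int]`. Callers that already hold an array can still pass it as a sequence argument with `: _*`. A minimal sketch, with a hypothetical `describeFields` method standing in for `minBy`:

```scala
object VarargsSketch {
  // Inside the method, the repeated parameter `fields` is a Seq[Int].
  def describeFields(fields: Int*): String = fields.mkString("fields(", ", ", ")")

  def main(args: Array[String]): Unit = {
    println(describeFields(0))    // fields(0)
    println(describeFields(0, 1)) // fields(0, 1)
    val positions = Array(1, 0)
    println(describeFields(positions.toSeq: _*)) // fields(1, 0)
  }
}
```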




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330520#comment-15330520
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67049994
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -30,12 +30,14 @@ import org.apache.flink.api.common.operators.base.PartitionOperatorBase.Partitio
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
+import org.apache.flink.api.java.functions._
--- End diff --

Revert this change and all other unnecessary imports.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330521#comment-15330521
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67050097
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
+  /**
+    * Selects an element with minimum value.
+    *
--- End diff --

This class does not use HTML mark-up in comments. Please follow the style of the class.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330519#comment-15330519
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67049929
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -23,9 +23,11 @@ import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceF
 import org.apache.flink.api.common.operators.{Keys, Order}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
+import org.apache.flink.api.java.functions.{FirstReducer, KeySelector, SelectByMaxFunction, SelectByMinFunction}
--- End diff --

Remove `SelectByMaxFunction` and all other unnecessary imports.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330518#comment-15330518
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67049840
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -356,6 +358,44 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+    * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.
+    * The transformation consecutively calls a {@link ReduceFunction}
+    * until only a single element remains which is the result of the transformation.
+    * A ReduceFunction combines two elements into one new element of the same type.
+    *
+    * @param fields Keys taken into account for finding the minimum.
+    * @return A { @link ReduceOperator} representing the minimum.
+    */
+  def maxBy(fields: Array[Int]) : ReduceOperator[T]  = {
+    if (!set.javaSet.getType().isTupleType) {
+      throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
+    }
+    new ReduceOperator[T](
+      set.javaSet,
+      new SelectByMaxFunction[T](set.javaSet.getType, fields),
+      getCallLocationName())
+  }
+
+  /**
+    * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.
+    * The transformation consecutively calls a {@link ReduceFunction}
+    * until only a single element remains which is the result of the transformation.
+    * A ReduceFunction combines two elements into one new element of the same type.
+    *
+    * @param fields Keys taken into account for finding the minimum.
+    * @return A { @link ReduceOperator} representing the minimum.
+    */
+  def minBy(fields: Array[Int]) : ReduceOperator[T]  = {
--- End diff --

Comments on `maxBy` apply here as well.
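On a grouped data set, the selection runs once per group rather than once over the whole input. A minimal local sketch of those semantics, assuming the data is grouped on field 0 and `maxBy(1)` is applied; it uses Scala's collection `groupBy` in place of Flink's grouping, and `GroupedMaxBySketch` and `maxByPerKey` are hypothetical names.

```scala
object GroupedMaxBySketch {
  // For each key (field 0), keeps the element with the largest field 1.
  def maxByPerKey(data: Seq[(String, Int)]): Map[String, (String, Int)] =
    data.groupBy(_._1).map { case (key, group) =>
      key -> group.reduce((a, b) => if (b._2 > a._2) b else a)
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(("a", 3), ("b", 1), ("a", 7), ("b", 5))
    // One result per key: ("a", 7) for key "a" and ("b", 5) for key "b".
    println(maxByPerKey(data))
  }
}
```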



