[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-26 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r64779643
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

@tillrohrmann - any chance of a review of the updated push?




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-20 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r6491
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

@tillrohrmann - I have made the necessary updates, and maxBy and minBy now work with Scala tuples. It took some time since I was busy with other things, but I managed to find time to complete this. In the process I learnt some Scala too. Let me know what you think of the latest commit.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r62320682
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

Thanks for the tip, @tillrohrmann. Let me see how I can adapt the `CaseClassTypeInfo` for the SelectByMax/MinFunction.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r62310212
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

The `CaseClassTypeInfo` is the type information which is created for Scala 
tuples, if I'm not mistaken. And all Scala tuples are of type `Product`. With 
that you should be able to adapt the `SelectByMax/MinFunction`.
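    
    As a small REPL-style illustration of the `Product` point above (plain Scala, not Flink code and not part of this PR):
    
    ```scala
    // Every Scala tuple is a Product, so its fields can be read by position,
    // which is what a Scala-aware SelectByMax/MinFunction could build on.
    val t: Product = (3, "a")                       // scala.Tuple2[Int, String]
    val first = t.productElement(0)                 // Any, here the boxed Int 3
    println(first.asInstanceOf[Comparable[Any]].compareTo(5))  // negative, since 3 < 5
    ```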




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r62309620
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

I think the purpose of FLINK-3650 is to add support for Scala tuples. Thus, I would rather drop support for Java tuples in the Scala API than drop support for Scala tuples. I would assume that you have to implement a Scala-specific `SelectByMax/MinFunction` to support Scala tuples.
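    
    For what it's worth, a hedged sketch of what such a Scala-specific function could look like (illustrative only; the class name is made up and this is not the code that ended up in the PR):
    
    ```scala
    import org.apache.flink.api.common.functions.ReduceFunction
    
    // Compares two Product values (e.g. Scala tuples) field by field in the given
    // order and keeps the smaller one; ties on a field fall through to the next field.
    class ScalaSelectByMinFunction[T <: Product](fields: Array[Int]) extends ReduceFunction[T] {
    
      override def reduce(value1: T, value2: T): T = {
        var i = 0
        while (i < fields.length) {
          val position = fields(i)
          // Fields are read positionally via Product and compared as Comparable,
          // mirroring the field access of the Java SelectByMinFunction.
          val comparable1 = value1.productElement(position).asInstanceOf[Comparable[Any]]
          val comparable2 = value2.productElement(position).asInstanceOf[Comparable[Any]]
          val comparison = comparable1.compareTo(comparable2)
    
          if (comparison < 0) {
            return value1
          } else if (comparison > 0) {
            return value2
          }
          i += 1
        }
        // All compared fields are equal; either value can be returned.
        value1
      }
    }
    ```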




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-06 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r62296025
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

@tillrohrmann 
Any suggestions here? Should we handle the Scala tuple case in another JIRA? I am not an expert in Scala; I have just started working with it. If it can be handled in another JIRA, this PR can be integrated.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-04 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r62025502
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

I just updated the PR with the return type ReduceOperator. I was trying to work with Scala tuples and tried creating an example like SelectByFunctionsTest. TupleTypeInfo and related classes like TupleSerializer and TupleComparator accept only Java-based tuples, not Scala tuples, so I am not sure how to make this SelectByMaxFunction work with Scala tuples. Is there any other code that works with both Scala and Java tuples?




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-03 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61918711
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1599,7 +1601,77 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-  
+
+  /**
+    * Selects an element with minimum value.
+    *
+    * The minimum is computed over the specified fields in lexicographical order.
+    *
+    * Example 1: Given a data set with elements [0, 1], [1, 0], the results will be:
+    *
+    *   minBy(0): [0, 1]
+    *   minBy(1): [1, 0]
+    *
+    * Example 2: Given a data set with elements [0, 0], [0, 1], the results will be:
+    *
+    *   minBy(0, 1): [0, 0]
+    *
+    * If multiple values with minimum value at the specified fields exist, a random one will be
+    * picked.
+    *
+    * Internally, this operation is implemented as a {@link ReduceFunction}.
+    */
+  def minBy(fields: Int*) : Unit  = {
--- End diff --

This should return the ReduceOperator; my bad. I am not sure whether the existing test case really tests the entire functionality.
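    
    For reference, a hedged usage sketch of the intended semantics from the Scaladoc above, assuming the corrected minBy returns a usable `DataSet` (e.g. backed by a reduce) rather than `Unit`:
    
    ```scala
    import org.apache.flink.api.scala._
    
    object MinByExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds: DataSet[(Int, Int)] = env.fromElements((0, 1), (1, 0))
    
        ds.minBy(0).print()   // expected: (0,1) -- minimum on the first field
        ds.minBy(1).print()   // expected: (1,0) -- minimum on the second field
      }
    }
    ```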




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61854388
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

I guess so.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-03 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61854150
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

> Thus, I would assume that you can't reuse the Java implementation here.

So the point here is that SelectByMaxFunction has to be refactored to also work with Scala tuples, or something new has to be added?




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61853639
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  private val tupleTypeInfo: TupleTypeInfo[Tuple5[Integer, Long, String, Long, Integer]] =
--- End diff --

A Scala tuple is `scala.Tuple2[Int, String]`, for example. But not 
`o.a.f.api.java.tuple.Tuple2`.
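    
    A tiny REPL-style illustration of that distinction (not code from this PR):
    
    ```scala
    // A Scala tuple is a Product, not a Flink Java Tuple, so the existing Java
    // SelectByMin/MaxFunction (declared with T extends Tuple) cannot accept it.
    val scalaTuple: (Int, String) = (1, "a")   // this is scala.Tuple2[Int, String]
    scalaTuple.isInstanceOf[Product]                                  // true
    scalaTuple.isInstanceOf[org.apache.flink.api.java.tuple.Tuple]    // false
    ```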




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61853470
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

Yes, since it was intended to work with Flink's Java tuples. But a Scala 
tuple is not a subtype of `Tuple`. Thus, I would assume that you can't reuse 
the Java implementation here.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-02 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1856#issuecomment-216442490
  
Removed the unused import. That was causing a build error.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-02 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1856#issuecomment-216300472
  
Added maxBy and minBy to GroupedDataSet too, so it should be fine now.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-02 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61769507
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  private val tupleTypeInfo: TupleTypeInfo[Tuple5[Integer, Long, String, Long, Integer]] =
--- End diff --

Sorry, I am not sure about this. Can you give some examples?




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-05-02 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r61769474
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

I have a doubt here. Previously this SelectByMaxFunction was declared with T extends Tuple, so TupleTypeInfo was allowed here. Now we are accepting TypeInformation, which is the superclass of TupleTypeInfo. So it should work the same way as it did earlier?




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-24 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1856#issuecomment-214132181
  
Thanks @tillrohrmann . I was on vacation for the last week. I will get back 
to this and update the PR ASAP.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1856#issuecomment-213030347
  
Thanks for your contribution @ramkrish86. Good work. But before merging, we 
should address the support for Scala tuples and add the `maxBy/minBy` to the 
Scala `GroupedDataSet`.
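    
    As a hedged usage sketch of the requested GroupedDataSet support (assuming the same minBy semantics as on DataSet; this is not the PR's test code):
    
    ```scala
    import org.apache.flink.api.scala._
    
    object GroupedMinByExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val ds: DataSet[(String, Int)] = env.fromElements(("a", 3), ("a", 1), ("b", 2))
    
        // For each key in field 0, keep the element with the smallest field 1.
        ds.groupBy(0).minBy(1).print()   // expected: (a,1) and (b,2)
      }
    }
    ```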




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r60620601
  
--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple, Tuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert;
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[Tuple5[Integer, Long, String, Long, Integer]]()
+  private val customTypeData = List[CustomType](new CustomType())
+  private val tupleTypeInfo: TupleTypeInfo[Tuple5[Integer, Long, String, Long, Integer]] =
--- End diff --

We should add a test for Scala tuples.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r60620477
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -72,8 +73,8 @@ public T reduce(T value1, T value2) throws Exception {
for (int position : fields) {
// Save position of compared key
// Get both values - both implement comparable
-   Comparable comparable1 = value1.getFieldNotNull(position);
-   Comparable comparable2 = value2.getFieldNotNull(position);
+   Comparable comparable1 = ((Tuple)value1).getFieldNotNull(position);
--- End diff --

This won't work with Scala tuples.




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r60620452
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -48,7 +49,7 @@ public SelectByMinFunction(TupleTypeInfo type, int... fields) {
}

// Check whether type is comparable
-   if (!type.getTypeAt(field).isKeyType()) {
+   if (!((TupleTypeInfo)type).getTypeAt(field).isKeyType()) {
--- End diff --

Scala tuple types are not of type `TupleTypeInfo` but instead 
`TupleTypeInfoBase`.
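    
    A hedged sketch of the check this suggests (illustrative, not the PR's actual code); `TupleTypeInfoBase` should cover both the Java `TupleTypeInfo` and the Scala `CaseClassTypeInfo`:
    
    ```scala
    import org.apache.flink.api.common.InvalidProgramException
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
    
    object KeyFieldCheck {
      // Verify that every selected field position is a comparable key type,
      // accepting any tuple-like type information (Java tuples as well as
      // Scala tuples/case classes, whose type info extends TupleTypeInfoBase).
      def checkKeyFields(typeInfo: TypeInformation[_], fields: Array[Int]): Unit = typeInfo match {
        case tupleType: TupleTypeInfoBase[_] =>
          for (field <- fields) {
            if (!tupleType.getTypeAt(field).isKeyType) {
              throw new InvalidProgramException(s"Field $field is not a key type.")
            }
          }
        case _ =>
          throw new InvalidProgramException("minBy/maxBy can only be applied to tuple types.")
      }
    }
    ```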




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-06 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1856#issuecomment-206693834
  
The failure in this build (https://travis-ci.org/apache/flink/jobs/121075057) looks unrelated; it is in the test case 
testKafkaOffsetRetrievalToZookeeper(org.apache.flink.streaming.connectors.kafka.Kafka08ITCase).




[GitHub] flink pull request: FLINK-3650 Add maxBy/minBy to Scala DataSet AP...

2016-04-06 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/1856

FLINK-3650 Add maxBy/minBy to Scala DataSet API

I have tried to expose the maxBy/minBy API in the Scala DataSet. One thing to note is that in the existing Scala DataSet API the groupBy() method returns a GroupedDataSet, whereas in the Java DataSet API it returns an UnsortedGrouping. The following code in the Scala DataSet is already commented out:

```
  //  public UnsortedGrouping groupBy(String... fields) {
  //    new UnsortedGrouping(this, new Keys.ExpressionKeys(fields, getType()));
  //  }
```

The UnsortedGrouping internally has maxBy and minBy. So in this PR I have not tried to change those, and hence the test case also does not cover a groupBy() clause followed by maxBy and minBy (they are currently available only in the Java-based MaxOperatorTest class).
Please review and provide valuable feedback.
Please note the change made to SelectByMaxFunction and SelectByMinFunction to support all tuples; the API itself checks whether the type is a Tuple type.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3650

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1856.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1856


commit 1b46ebfa3489432adde5a032c892dd5ec6c6d61c
Author: Vasudevan 
Date:   2016-04-06T06:13:07Z

FLINK-3650 Add maxBy/minBy to Scala DataSet API



