[jira] [Commented] (FLINK-5586) Extend TableProgramsTestBase for object reuse modes

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881829#comment-15881829
 ] 

ASF GitHub Bot commented on FLINK-5586:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3339
  
rebased to the latest master.


> Extend TableProgramsTestBase for object reuse modes
> ---
>
> Key: FLINK-5586
> URL: https://issues.apache.org/jira/browse/FLINK-5586
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Kurt Young
>
> We should also test whether all runtime operators of the Table API work 
> correctly when object reuse mode is set to true. This should be done for all 
> cluster-based ITCases, not the collection-based ones.
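
For context, object reuse is toggled on the ExecutionConfig. A minimal sketch of 
how a test base could run each ITCase under both modes (the parameterization 
shown is illustrative, not the actual TableProgramsTestBase code):

    import org.apache.flink.api.scala.ExecutionEnvironment

    def prepareEnv(objectReuse: Boolean): ExecutionEnvironment = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      // with object reuse enabled, runtime operators may hand the same record
      // instance to user code repeatedly instead of creating defensive copies
      if (objectReuse) env.getConfig.enableObjectReuse()
      else env.getConfig.disableObjectReuse()
      env
    }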





[GitHub] flink issue #3339: [FLINK-5586] [table] Extend TableProgramsClusterTestBase ...

2017-02-23 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3339
  
rebased to the latest master.




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835712
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
 
   /** The initial accumulator for Max aggregate function */
   class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
     var i: Int = 0
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-      if (ret.sum == null.asInstanceOf[T]) {
-        ret.sum = a.sum
-      } else if (a.sum != null.asInstanceOf[T]) {
-        ret.sum = numeric.plus(ret.sum, a.sum)
+      if (a.f1) {
+        if (!ret.f1) {
--- End diff --

Since we start with `sum = 0` we don't need this condition.
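
A minimal sketch of the simplified merge loop (assuming the accumulator's f0 is 
initialized to `numeric.zero`, so no first-value special case is needed; JList 
stands for java.util.List as in the surrounding file):

    override def merge(accumulators: JList[Accumulator]): Accumulator = {
      val ret = new SumAccumulator[T]
      var i: Int = 0
      while (i < accumulators.size()) {
        val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
        if (a.f1) {
          // f0 starts at zero, so we can always add without checking ret.f1
          ret.f0 = numeric.plus(ret.f0, a.f0)
          ret.f1 = true
        }
        i += 1
      }
      ret
    }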




[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881429#comment-15881429
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828543
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[SumAccumulator[T]]
-      if (accum.sum == null.asInstanceOf[T]) {
-        accum.sum = v
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      if (!a.f1) {
--- End diff --

since we start with `sum = 0` we don't need this condition.
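
In other words, accumulate can collapse to an unconditional add. A sketch, 
under the assumption that f0 starts at `numeric.zero`:

    override def accumulate(accumulator: Accumulator, value: Any) = {
      if (value != null) {
        val v = value.asInstanceOf[T]
        val a = accumulator.asInstanceOf[SumAccumulator[T]]
        // adding to the zero-initialized f0 is correct even for the first value
        a.f0 = numeric.plus(a.f0, v)
        a.f1 = true
      }
    }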


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881431#comment-15881431
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828705
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
-      if (accum.sum == null) {
-        accum.sum = v
+      if (accum.f1 == false) {
--- End diff --

remove condition.


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881432#comment-15881432
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MaxAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) < 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MaxAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) < 0) {
+        a.f0 = v
+        if (!a.f1) {
--- End diff --

the condition can be removed. We can simply reassign `true`.
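
That is, the update can be written without the inner check (reassigning the 
flag is a cheap no-op when it is already true):

    if (!a.f1 || ord.compare(a.f0, v) < 0) {
      a.f0 = v
      a.f1 = true // unconditional; no inner if needed
    }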


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881425#comment-15881425
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
     var i: Int = 0
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-      if (ret.sum == null.asInstanceOf[T]) {
-        ret.sum = a.sum
-      } else if (a.sum != null.asInstanceOf[T]) {
-        ret.sum = numeric.plus(ret.sum, a.sum)
+      if (a.f1) {
+        if (!ret.f1) {
--- End diff --

Since we start with `sum = 0` we don't need this condition.


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881426#comment-15881426
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835712
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
 
   /** The initial accumulator for Max aggregate function */
   class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881430#comment-15881430
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
 
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`. It will then be passed down from the parent 
class.
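
A minimal sketch of the suggestion, with the accumulator picking up `T` from 
the enclosing class instead of shadowing it with its own parameter:

    abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
      private val numeric = implicitly[Numeric[T]]

      // no type parameter here: T is bound by the enclosing SumAggFunction
      class SumAccumulator extends JTuple2[T, Boolean] with Accumulator {
        f0 = numeric.zero // sum
        f1 = false        // becomes true once a value has been accumulated
      }
      // ...
    }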


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881423#comment-15881423
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
 
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-    var sum: T = null.asInstanceOf[T]
+    f0 = 0.asInstanceOf[T] //sum
--- End diff --

change this to `f0 = numeric.zero`
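
The reason `0.asInstanceOf[T]` is problematic: inside a generic class the cast 
is unchecked, so for any non-Int `T` the field would actually hold a boxed 
Integer at runtime, failing only later when the value is used. A small 
illustration:

    def uncheckedZero[T]: T = 0.asInstanceOf[T] // erased: the value stays a boxed Integer
    def numericZero[T: Numeric]: T = implicitly[Numeric[T]].zero

    // uncheckedZero[Double] compiles, but using its result throws a
    // ClassCastException at runtime; numericZero[Double] is a genuine 0.0
    println(numericZero[Double]) // prints 0.0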


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881427#comment-15881427
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826611
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]]
-      if (accum.max == null || accum.max.compareTo(v) < 0) {
-        accum.max = v
+      if (!accum.f1 || accum.f0.compareTo(v) < 0) {
+        accum.f0 = v
+        if (!accum.f1) {
--- End diff --

remove condition.


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881424#comment-15881424
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 ---
@@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
 
   /** The initial accumulator for Min aggregate function */
   class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
 
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`. It will then be passed down from the parent 
class.




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828705
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
-      if (accum.sum == null) {
-        accum.sum = v
+      if (accum.f1 == false) {
--- End diff --

remove condition.




[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881428#comment-15881428
 ] 

ASF GitHub Bot commented on FLINK-5767:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826652
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 ---
@@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MinAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) > 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MinAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) > 0) {
+        a.f0 = v
+        if (!a.f1) {
--- End diff --

Remove condition


> New aggregate function interface and built-in aggregate functions
> -
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the 
> aggregate interface, migrating the existing aggregation functions to this 
> interface, and adding the unit tests for these functions.





[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102828543
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[SumAccumulator[T]]
-      if (accum.sum == null.asInstanceOf[T]) {
-        accum.sum = v
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      if (!a.f1) {
--- End diff --

since we start with `sum = 0` we don't need this condition.




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826611
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]]
-      if (accum.max == null || accum.max.compareTo(v) < 0) {
-        accum.max = v
+      if (!accum.f1 || accum.f0.compareTo(v) < 0) {
+        accum.f0 = v
+        if (!accum.f1) {
--- End diff --

remove condition.




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 ---
@@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
 
   /** The initial accumulator for Min aggregate function */
   class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
--- End diff --

We can remove the type `T`




[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState

2017-02-23 Thread shixiaogang
Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3336




[jira] [Commented] (FLINK-4856) Add MapState for keyed streams

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881755#comment-15881755
 ] 

ASF GitHub Bot commented on FLINK-4856:
---

Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/3336


> Add MapState for keyed streams
> --
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 
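
For illustration, a sketch of how per-entry access could look with the proposed 
MapState (the API shape follows the 1.3 work in progress and may differ from 
the final interface):

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    class PerKeyWordCount extends RichFlatMapFunction[(String, String), (String, Long)] {
      private var counts: MapState[String, java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        counts = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, java.lang.Long](
            "counts", classOf[String], classOf[java.lang.Long]))
      }

      override def flatMap(in: (String, String), out: Collector[(String, Long)]): Unit = {
        // only the touched entry is (de)serialized, not the whole map
        val updated = Option(counts.get(in._2)).map(_ + 1L).getOrElse(1L)
        counts.put(in._2, updated)
        out.collect((in._2, updated))
      }
    }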





[jira] [Created] (FLINK-5900) Add non-partial merge Aggregates and unit tests

2017-02-23 Thread Shaoxuan Wang (JIRA)
Shaoxuan Wang created FLINK-5900:


 Summary: Add non-partial merge Aggregates and unit tests
 Key: FLINK-5900
 URL: https://issues.apache.org/jira/browse/FLINK-5900
 Project: Flink
  Issue Type: Improvement
Reporter: Shaoxuan Wang
Assignee: Shaoxuan Wang


All current built-in aggregates support partial merge, so we have no coverage 
telling us whether the non-partial-merge path works. We should add 
non-partial-merge aggregates and unit tests for them.





[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102835493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
 
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-    var sum: T = null.asInstanceOf[T]
+    f0 = 0.asInstanceOf[T] //sum
--- End diff --

change this to `f0 = numeric.zero`




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
 ---
@@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MaxAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) < 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MaxAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) < 0) {
+        a.f0 = v
+        if (!a.f1) {
--- End diff --

the condition can be removed. We can simply reassign `true`.




[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...

2017-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3354#discussion_r102826652
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
 ---
@@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MinAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) > 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MinAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) > 0) {
+        a.f0 = v
+        if (!a.f1) {
--- End diff --

Remove condition




[jira] [Closed] (FLINK-5863) Unify the serialization of queryable list states in different backends

2017-02-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5863.
---
Resolution: Fixed

Now that we are refactoring the queryable state, we can make these changes as 
part of that work.

> Unify the serialization of queryable list states in different backends
> --
>
> Key: FLINK-5863
> URL: https://issues.apache.org/jira/browse/FLINK-5863
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>Priority: Minor
>
> Now the deserialization of list states is implemented in 
> {{KvStateRequestSerializer}}. The serialization, however, is implemented 
> individually in each backend. 
> We should provide a method in {{KvStateRequestSerializer}} to remove the 
> redundant code.
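
A sketch of the kind of helper meant here (the method name is an assumption, 
and a real implementation must write exactly the wire format that 
deserializeList expects):

    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.core.memory.{ByteArrayOutputStreamWithPos, DataOutputViewStreamWrapper}

    def serializeList[T](values: java.util.List[T], serializer: TypeSerializer[T]): Array[Byte] = {
      val baos = new ByteArrayOutputStreamWithPos()
      val view = new DataOutputViewStreamWrapper(baos)
      val it = values.iterator()
      while (it.hasNext) {
        // NOTE: any separators between entries must match the format that
        // KvStateRequestSerializer.deserializeList reads back
        serializer.serialize(it.next(), view)
      }
      baos.toByteArray
    }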





[jira] [Closed] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor

2017-02-23 Thread Xiaogang Shi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaogang Shi closed FLINK-5790.
---
Resolution: Fixed

Fixed via d47446cafffe0d34d89488f6eb860aa139ceb3f1

> Use list types when ListStateDescriptor extends StateDescriptor
> ---
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it is the 
> serializer of the list elements that is put in {{ListStateDescriptor}}. The 
> implementation is a little confusing: some backends need to construct the 
> state serializer from the element serializer by themselves.
> We should use an {{ArrayListSerializer}}, which is composed of the serializer 
> of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid 
> constructing the state serializer.
> If a backend needs customized serialization of the state (e.g. 
> {{RocksDBStateBackend}}), it still can obtain the element serializer from the 
> {{ArrayListSerializer}}.
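
A minimal sketch of the composition described above (constructor and accessor 
shapes are assumptions, not exact Flink signatures):

    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.api.common.typeutils.base.StringSerializer
    import org.apache.flink.runtime.state.ArrayListSerializer

    // the descriptor composes the element serializer into a list serializer,
    // so backends no longer have to build the state serializer themselves
    val elementSerializer: TypeSerializer[String] = StringSerializer.INSTANCE
    val stateSerializer = new ArrayListSerializer[String](elementSerializer)

    // a backend with customized state serialization (e.g. RocksDB) can still
    // recover the element serializer from the composed one
    val perElement: TypeSerializer[String] = stateSerializer.getElementSerializer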





[jira] [Created] (FLINK-5899) Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction

2017-02-23 Thread Shaoxuan Wang (JIRA)
Shaoxuan Wang created FLINK-5899:


 Summary: Fix the bug in initializing the 
DataSetTumbleTimeWindowAggReduceGroupFunction
 Key: FLINK-5899
 URL: https://issues.apache.org/jira/browse/FLINK-5899
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Shaoxuan Wang
Assignee: Shaoxuan Wang


The row length used to initialize DataSetTumbleTimeWindowAggReduceGroupFunction 
was not set properly (I think this was introduced by mistake when merging the 
code).
We currently lack built-in non-partial-merge aggregates, so this was not caught 
by the unit tests.

Steps to reproduce:
1. Set "supportPartial" to false for SumAggregate.
2. Both testAllEventTimeTumblingWindowOverTime and 
testEventTimeTumblingGroupWindowOverTime will then fail.





[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881737#comment-15881737
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske],

How about this approach:
We provide both {{FilterableTableSource}} and {{PartitionableTableSource}}, 
keep {{FilterableTableSource}} as it is, and add methods like 
{{getAllPartitions}} and {{applyPartitionPruning}} to 
{{PartitionableTableSource}}. From a developer's point of view, the two traits 
can be treated as completely independent, and it is easier to implement each 
functionality on its own than to mix all the logic into 
{{FilterableTableSource.setPredicate()}}. Also, in the future it is very likely 
that the framework will apply the two traits at different optimization stages: 
partition pruning can be applied as early as possible during logical 
optimization, while filter push-down is applied a little later because it 
requires some heavyweight physical-level analysis first.
BTW, this approach can still achieve what you suggested: you can implement 
{{FilterableTableSource}} only and do all the pruning and filtering there if 
you like.
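
A rough sketch of the trait shape proposed above (method names follow the 
comment; {{Partition}} is a placeholder type, not an existing Flink class):

    trait PartitionableTableSource {
      /** All partitions of the underlying storage, before any pruning. */
      def getAllPartitions: Seq[Partition]

      /** Invoked by the optimizer with the partitions that survive pruning;
        * the source should afterwards read only these. */
      def applyPartitionPruning(remainingPartitions: Seq[Partition]): Unit
    }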

> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.





[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881789#comment-15881789
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102865002
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -78,20 +78,7 @@ object UserDefinedFunctionUtils {
       function: UserDefinedFunction,
       signature: Seq[TypeInformation[_]])
     : Option[Array[Class[_]]] = {
-    // We compare the raw Java classes not the TypeInformation.
-    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
-    val actualSignature = typeInfoToClass(signature)
-    val signatures = getSignatures(function)
-
-    signatures
-      // go over all signatures and find one matching actual signature
-      .find { curSig =>
-        // match parameters of signature to actual parameters
-        actualSignature.length == curSig.length &&
-          curSig.zipWithIndex.forall { case (clazz, i) =>
-            parameterTypeEquals(actualSignature(i), clazz)
-          }
-      }
--- End diff --

I deleted them because the two methods were simple copy-and-paste duplicates: 
one was used for ScalarFunction, the other for TableFunction.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.





[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881790#comment-15881790
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102864763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var noVargs = true
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          if (method.isVarArgs) {
+            noVargs = false
+          } else if (signatures.last.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          }
+        }
+      })
+      if (trailingSeq && noVargs) {
+        // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+        throw new ValidationException("The 'eval' method do not support Scala type of " +
--- End diff --

This is correct, because if multiple (overloaded) methods are found, another 
exception is thrown.
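
For reference, a Scala vararg `eval` compiles to a method taking a trailing 
scala.collection.Seq, which is what the check above detects; annotating it with 
`@varargs` makes the compiler also emit a Java-style `eval(String...)`. A 
minimal sketch (the class is illustrative):

    import scala.annotation.varargs
    import org.apache.flink.table.functions.ScalarFunction

    class Concat extends ScalarFunction {
      // without @varargs only the Seq-taking eval exists and validation fails
      @varargs
      def eval(parts: String*): String = parts.mkString
    }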


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.





[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-23 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102865002
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -78,20 +78,7 @@ object UserDefinedFunctionUtils {
       function: UserDefinedFunction,
       signature: Seq[TypeInformation[_]])
     : Option[Array[Class[_]]] = {
-    // We compare the raw Java classes not the TypeInformation.
-    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
-    val actualSignature = typeInfoToClass(signature)
-    val signatures = getSignatures(function)
-
-    signatures
-      // go over all signatures and find one matching actual signature
-      .find { curSig =>
-        // match parameters of signature to actual parameters
-        actualSignature.length == curSig.length &&
-          curSig.zipWithIndex.forall { case (clazz, i) =>
-            parameterTypeEquals(actualSignature(i), clazz)
-          }
-      }
--- End diff --

I deleted them because the two methods were simple copy-and-paste duplicates: 
one was used for ScalarFunction, the other for TableFunction.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-23 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102864763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var noVargs = true
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          if (method.isVarArgs) {
+            noVargs = false
+          } else if (signatures.last.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          }
+        }
+      })
+      if (trailingSeq && noVargs) {
+        // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+        throw new ValidationException("The 'eval' method do not support Scala type of " +
--- End diff --

This is correct, because if multiple (overloaded) methods are found, another 
exception is thrown.




[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881329#comment-15881329
 ] 

ASF GitHub Bot commented on FLINK-5710:
---

Github user haohui closed the pull request at:

https://github.com/apache/flink/pull/3370


> Add ProcTime() function to indicate StreamSQL
> -
>
> Key: FLINK-5710
> URL: https://issues.apache.org/jira/browse/FLINK-5710
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Stefano Bortoli
>Assignee: Stefano Bortoli
>Priority: Minor
>
> procTime() is a parameterless scalar function that just indicates processing 
> time mode
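
A hedged sketch of the intended usage (this PR was closed, and the eventual 
Flink syntax may differ):

    // procTime() carries no data; it only marks the query as running in
    // processing-time mode
    val result = tableEnv.sql(
      "SELECT COUNT(*) FROM orders GROUP BY TUMBLE(procTime(), INTERVAL '1' HOUR)")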





[jira] [Updated] (FLINK-5242) Implement Scala API for BipartiteGraph

2017-02-23 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-5242:
--
Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-2254

> Implement Scala API for BipartiteGraph
> --
>
> Key: FLINK-5242
> URL: https://issues.apache.org/jira/browse/FLINK-5242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>  Labels: features
>
> Should implement BipartiteGraph in flink-gelly-scala project similarly to 
> Graph class.
> Depends on this: https://issues.apache.org/jira/browse/FLINK-2254





[GitHub] flink pull request #3404: [FLINK-5597] [docs] Improve the LocalClusteringCoe...

2017-02-23 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3404

[FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
5597_improve_the_localclusteringcoefficient_documentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3404








[jira] [Updated] (FLINK-5597) Improve the LocalClusteringCoefficient documentation

2017-02-23 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-5597:
--
Fix Version/s: 1.3.0

> Improve the LocalClusteringCoefficient documentation
> 
>
> Key: FLINK-5597
> URL: https://issues.apache.org/jira/browse/FLINK-5597
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Gelly
>Affects Versions: 1.3.0
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> The LocalClusteringCoefficient usage section should explain what the 
> algorithm's output is and how to retrieve the actual local clustering 
> coefficient scores from it.





[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102830609
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

Good catch, @tzulitai !

I tried the buffer approach and had no luck. The problem is that calling 
`emitRecord` needs to pass in both the offset and the record itself -- the 
record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer 
would have to hold both the deserialized value and the record, so it cannot 
solve the problem of having a collector per record.




[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-23 Thread haohui
Github user haohui closed the pull request at:

https://github.com/apache/flink/pull/3370




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881376#comment-15881376
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102830609
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
 
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

Good catch, @tzulitai !

I tried the buffer approach and had no luck. The problem is that calling 
`emitRecord` needs to pass in both the offset and the record itself -- the 
record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer 
would have to hold both the deserialized value and the record, so it cannot 
solve the problem of having a collector per record.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
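
A rough sketch of the FlatMap-style variant suggested here (names are 
illustrative, not the final Flink API):

    import org.apache.flink.util.Collector

    trait CollectingDeserializationSchema[T] extends java.io.Serializable {
      /**
        * Emit zero or more records for one raw message; bad data can simply
        * emit nothing instead of throwing and failing the job.
        */
      def deserialize(message: Array[Byte], out: Collector[T]): Unit
    }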





[jira] [Commented] (FLINK-5597) Improve the LocalClusteringCoefficient documentation

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881320#comment-15881320
 ] 

ASF GitHub Bot commented on FLINK-5597:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3404

[FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
5597_improve_the_localclusteringcoefficient_documentation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3404






> Improve the LocalClusteringCoefficient documentation
> 
>
> Key: FLINK-5597
> URL: https://issues.apache.org/jira/browse/FLINK-5597
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Gelly
>Affects Versions: 1.3.0
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
>
> The LocalClusteringCoefficient usage section should explain what the 
> algorithm's output is and how to retrieve the actual local clustering 
> coefficient scores from it.





[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread godfrey he (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999
 ] 

godfrey he commented on FLINK-5859:
---

Hi [~fhueske], thanks for your advice.
Yes, partition pruning is a kind of coarse-grained filter push-down: both 
filter push-down and partition pruning extract predicates from the filter 
condition based on what the data source is interested in. But IMO, filter 
push-down and partition pruning are independent concepts in general.
The following table shows that different data sources have different traits:

||Trait||Example||
|filter push-down only|MySQL, HBase|
|partition pruning only|CSV, TEXT|
|both filter push-down and partition pruning|Parquet, Druid|

IMO, we should provide developers with a clear concept, as [~ykt836] mentioned 
above, that includes both FilterableTableSource and PartitionableTableSource.

Looking forward to your advice, thanks.


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.





[jira] [Updated] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5901:

Description: 
The DAG of running jobs cannot be shown properly in IE 11 (I am using 
11.0.9600.18059, but I assume it is the same with IE 9). The description of the 
task is not shown within the rectangle.

Chrome works well. I pasted the screenshots under IE and Chrome below.

  was:
The DAG of running jobs cannot be shown properly in IE 11 (I am using 
11.0.9600.18059, but I assume it is the same with IE 9). The description of the 
task is not shown within the rectangle.

Chrome works well.


> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
> Attachments: using chrom(same job).png, using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 
> 11.0.9600.18059, but I assume it is the same with IE 9). The description of 
> the task is not shown within the rectangle.
> Chrome works well. I pasted the screenshots under IE and Chrome below.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5902) Some images can not show in IE

2017-02-23 Thread Tao Wang (JIRA)
Tao Wang created FLINK-5902:
---

 Summary: Some images can not show in IE
 Key: FLINK-5902
 URL: https://issues.apache.org/jira/browse/FLINK-5902
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
 Environment: IE
Reporter: Tao Wang


Some images in the Overview page cannot be shown in IE, while they show fine in 
Chrome.

I'm using IE 11, but I think it is the same with IE 9. I'll paste the 
screenshots later.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5901:

Attachment: IE11 with problem.png

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
> Attachments: IE11 with problem.png, using chrom(same job).png, using 
> IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 
> 11.0.9600.18059, but I assume it is the same with IE 9). The description of 
> the task is not shown within the rectangle.
> Chrome works well. I pasted the screenshots under IE and Chrome below.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881989#comment-15881989
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881264
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
							keyPayload.get(keyBytes);
						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

I see what you are saying. The trade-off here is handing off the objects 
one more time, but I think it's okay. I'll update the PR accordingly.
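
For readers following along, a rough sketch of the flatMap-style schema being 
discussed; the trait name and signature are illustrative, not the final API:
{code}
import org.apache.flink.util.Collector

// one raw Kafka message may produce zero or more records; bad data can
// simply emit nothing instead of throwing and failing the whole job
trait CollectingDeserializationSchema[T] extends Serializable {
  def deserialize(message: Array[Byte], out: Collector[T]): Unit
}
{code}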


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread godfrey he (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999
 ] 

godfrey he edited comment on FLINK-5859 at 2/24/17 5:59 AM:


Hi [~fhueske], thanks for your advice.
Yes, partition pruning is a kind of coarse-grained filter push-down; 
filter-pushdown and partition-pruning share a common part, namely extracting 
predicates from the filter condition based on the interests of different data 
sources. But, filter-pushdown and partition-pruning are independent concepts 
in general. 
The following table shows that different data sources have different traits:

||Trait||Example||
|filter-pushdown only|MySQL, HBase|
|partition-pruning only|CSV, TEXT|
|both filter-pushdown and partition-pruning|Parquet, Druid|

IMO, we should provide a clear concept for developers, as [~ykt836] mentioned 
above, that includes both FilterableTableSource and PartitionableTableSource.

Looking forward to your advice, thanks.



was (Author: godfreyhe):
Hi [~fhueske], thanks for your advice.
Yes, partition pruning is a kind of coarse-grained filter push-down; 
filter-pushdown and partition-pruning share a common part, namely extracting 
predicates from the filter condition based on the interests of different data 
sources. But, IMO, filter-pushdown and partition-pruning are independent 
concepts in general. 
The following table shows that different data sources have different traits:

||Trait||Example||
|filter-pushdown only|MySQL, HBase|
|partition-pruning only|CSV, TEXT|
|both filter-pushdown and partition-pruning|Parquet, Druid|

IMO, we should provide a clear concept for developers, as [~ykt836] mentioned 
above, that includes both FilterableTableSource and PartitionableTableSource.

Looking forward to your advice, thanks.


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5902) Some images can not show in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5902:

Attachment: chrome is ok.png

> Some images can not show in IE
> --
>
> Key: FLINK-5902
> URL: https://issues.apache.org/jira/browse/FLINK-5902
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE
>Reporter: Tao Wang
> Attachments: chrome is ok.png, IE 11 with problem.png
>
>
> Some images in the Overview page cannot be shown in IE, while they show fine 
> in Chrome.
> I'm using IE 11, but I think it is the same with IE 9. I'll paste the 
> screenshots later.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5902) Some images can not show in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5902:

Attachment: IE 11 with problem.png

> Some images can not show in IE
> --
>
> Key: FLINK-5902
> URL: https://issues.apache.org/jira/browse/FLINK-5902
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE
>Reporter: Tao Wang
> Attachments: IE 11 with problem.png
>
>
> Some images in the Overview page cannot be shown in IE, while they show fine 
> in Chrome.
> I'm using IE 11, but I think it is the same with IE 9. I'll paste the 
> screenshots later.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5901:

Attachment: (was: IE11 with problem.png)

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
> Attachments: using chrom(same job).png, using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 
> 11.0.9600.18059, but I assume it is the same with IE 9). The description of 
> the task is not shown within the rectangle.
> Chrome works well. I pasted the screenshots under IE and Chrome below.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5570) Support register external catalog to table environment

2017-02-23 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-5570:
-
Description: 
This issue aims to support registering one or more {{ExternalCatalog}} (which 
is referred to in https://issues.apache.org/jira/browse/FLINK-5568) with the 
{{TableEnvironment}}. After registration, SQL and Table API queries can access 
tables in the external catalogs without registering those tables one by one 
with the {{TableEnvironment}} beforehand.

We plan to add two APIs in {{TableEnvironment}}:
1. register externalCatalog
{code}
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
Unit
{code}
2. scan a table from a registered catalog and return the resulting {{Table}}; 
this API is very useful in Table API queries.
{code}
def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
{code}
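
For illustration, a minimal usage sketch of the two proposed APIs; the 
{{InMemoryExternalCatalog}} and the {{TableIdentifier}} constructor below are 
assumptions made for the example, not part of the proposal:
{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// assumed: an ExternalCatalog implementation that already contains "db1.orders"
val myCatalog: ExternalCatalog = new InMemoryExternalCatalog()

// register the whole catalog once instead of registering each table separately
tableEnv.registerExternalCatalog("myCatalog", myCatalog)

// scan a table of the registered catalog from a Table API query
val orders = tableEnv.scan("myCatalog", TableIdentifier("db1", "orders"))
{code}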

> Support register external catalog to table environment
> --
>
> Key: FLINK-5570
> URL: https://issues.apache.org/jira/browse/FLINK-5570
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> This issue aims to support registering one or more {{ExternalCatalog}} (which 
> is referred to in https://issues.apache.org/jira/browse/FLINK-5568) with the 
> {{TableEnvironment}}. After registration, SQL and Table API queries can 
> access tables in the external catalogs without registering those tables one 
> by one with the {{TableEnvironment}} beforehand.
> We plan to add two APIs in {{TableEnvironment}}:
> 1. register externalCatalog
> {code}
> def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
> Unit
> {code}
> 2. scan a table from a registered catalog and return the resulting 
> {{Table}}; this API is very useful in Table API queries.
> {code}
> def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3405: [FLINK-5899] [table] Fix the bug in EventTimeTumbl...

2017-02-23 Thread shaoxuan-wang
GitHub user shaoxuan-wang opened a pull request:

https://github.com/apache/flink/pull/3405

[FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for 
non-partialMerge aggregate

I have changed supportPartial to false for all built-in Aggregates and ran 
all the UTs. Luckily this is the only bug we have so far.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shaoxuan-wang/flink F5899-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3405


commit bb233179d06325b481fe0e2a903a55c547529f06
Author: shaoxuan-wang 
Date:   2017-02-24T03:57:44Z

[FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for 
non-partialMerge aggregate




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881967#comment-15881967
 ] 

Scott Kidder commented on FLINK-5898:
-

Hi [~tzulitai],

I'll look into fixing this in the KPL. I noticed that the method that installs 
the KPL binary uses a shared lock, which would allow multiple processes to 
obtain overlapping locks and write to the same file simultaneously:
https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java#L815

I'll try patching the KPL to obtain an exclusive lock. I'll also file a Github 
issue against the KPL to see what the KPL authors think.
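
For reference, a minimal sketch of acquiring an exclusive lock via 
{{java.nio}}; the lock-file path below is made up for the example and this is 
not the actual KPL code:
{code}
import java.io.RandomAccessFile

// assumed lock-file location, next to the extracted binary
val raf = new RandomAccessFile(
  "/tmp/amazon-kinesis-producer-native-binaries/.extract.lock", "rw")
val channel = raf.getChannel

// lock(position, size, shared): shared = false requests an EXCLUSIVE lock,
// so a second process blocks here instead of writing the binary concurrently
val lock = channel.lock(0L, Long.MaxValue, false)
try {
  // extract and verify the native binary while holding the lock
} finally {
  lock.release()
  raf.close()
}
{code}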

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-02-23 Thread Tao Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882126#comment-15882126
 ] 

Tao Wang commented on FLINK-5903:
-

I've located the cause and will fix it ASAP.

> taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in 
> YARN mode
> ---
>
> Key: FLINK-5903
> URL: https://issues.apache.org/jira/browse/FLINK-5903
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Tao Wang
>
> Currently Flink does not respect taskmanager.numberOfTaskSlots and 
> yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the 
> CLI.
> In detail, taskmanager.numberOfTaskSlots is not used at all, and 
> yarn.containers.vcores is only used when requesting container (TM) resources 
> but is not made known to the TM, which means the TM will always think it has 
> the default (1) slots if -s is not configured.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-02-23 Thread Tao Wang (JIRA)
Tao Wang created FLINK-5903:
---

 Summary: taskmanager.numberOfTaskSlots and yarn.containers.vcores 
did not work well in YARN mode
 Key: FLINK-5903
 URL: https://issues.apache.org/jira/browse/FLINK-5903
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: Tao Wang


Currently Flink does not respect taskmanager.numberOfTaskSlots and 
yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI.

In detail, taskmanager.numberOfTaskSlots is not used at all, and 
yarn.containers.vcores is only used when requesting container (TM) resources 
but is not made known to the TM, which means the TM will always think it has 
the default (1) slots if -s is not configured.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-02-23 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
> Can we just use the ${project.build.directory} as java.io.tmpdir ?
@wenlong88 Sorry for the late reply. 
It's a good question. If we use `${project.build.directory}` without the 
sub-directory `tmp`, the UTs will create various directories that may overlap 
with other directories, such as `classes`/`surefire-reports` and so on. 

Using a dedicated `tmp` directory avoids the chance of directory conflicts.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881895#comment-15881895
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
> Can we just use the ${project.build.directory} as java.io.tmpdir ?
@wenlong88 Sorry for the late reply. 
It's a good question. If we use `${project.build.directory}` without the 
sub-directory `tmp`, the UTs will create various directories that may overlap 
with other directories, such as `classes`/`surefire-reports` and so on. 

Using a dedicated `tmp` directory avoids the chance of directory conflicts.



> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
> Fix For: 1.2.1
>
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> visit the folder.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 5:12 AM:
-

Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks (which doesn't make sense), and there's no means to coordinate 
multiple subtasks to synchronize this access either.

I'm not sure how we should deal with this one ...
It does however bring up the question again of whether or not we should use the 
low-level Java SDK instead of KPL for implementation of 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?

If there is a possible way to solve this without replacing KPL and is within 
our reach, then I'm against considering the replacement. Right now I just don't 
see a possible solution other than KPL changing the binary file to be different 
across processes, but that's not something we can really push.


was (Author: tzulitai):
Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks, and there's no means to coordinate multiple subtasks to 
synchronize this access either.

I'm not sure how we should deal with this one ...
It does however bring up the question again of whether or not we should use the 
low-level Java SDK instead of KPL for implementation of 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?

If there is a possible way to solve this without replacing KPL and is within 
our reach, then I'm against considering the replacement. Right now I just don't 
see a possible solution other than KPL changing the binary file to be different 
across processes, but that's not something we can really push.

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881264
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
							keyPayload.get(keyBytes);
						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

I see what you are saying. The trade-off here is handing off the objects 
one more time, but I think it's okay. I'll update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882007#comment-15882007
 ] 

ASF GitHub Bot commented on FLINK-5692:
---

Github user jinmingjian commented on the issue:

https://github.com/apache/flink/pull/3373
  
@StephanEwen Just my coding habit. Correction done. 

And I very much appreciate your review. I am open to more contributions! 
:tada: 


> Add an Option to Deactivate Kryo Fallback for Serializers
> -
>
> Key: FLINK-5692
> URL: https://issues.apache.org/jira/browse/FLINK-5692
> Project: Flink
>  Issue Type: New Feature
>  Components: Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>  Labels: easyfix, starter
>
> Some users want to prevent Flink's serializers from using Kryo, as it can 
> easily become a hotspot in serialization.
> For those users, it would help if there were a flag to "deactivate generic 
> types". Those users could then see where types are used that default to Kryo 
> and change these types (make them POJOs, Value types, or write custom 
> serializers).
> There are two ways to approach that:
>   1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would 
> create a Kryo Serializer (when the respective flag is set in the 
> {{ExecutionConfig}})
>   2. Have a static flag on the {{TypeExtractor}} to throw an exception 
> whenever it would create a {{GenericTypeInfo}}. This approach has the 
> downside of introducing some static configuration to the TypeExtractor, but 
> may be more helpful because it throws exceptions in the programs at the 
> points where the types are used (not where the serializers are created, 
> which may be much later).
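
As a quick illustration of how approach 1 would look from a user program, 
assuming the switch ends up on the {{ExecutionConfig}} (the method name below 
is an assumption for the example):
{code}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// assumed switch: fail fast whenever a type would fall back to Kryo
env.getConfig.disableGenericTypes()

// jobs using only analyzable types (tuples, POJOs, primitives) still run;
// anything falling back to Kryo would now fail fast with an exception
env.fromElements((1, "a"), (2, "b")).map(_._1).print()
{code}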



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3373: [FLINK-5692] [config] Add an Option to Deactivate Kryo Fa...

2017-02-23 Thread jinmingjian
Github user jinmingjian commented on the issue:

https://github.com/apache/flink/pull/3373
  
@StephanEwen Just my coding habit. Correction done. 

And I very much appreciate your review. I am open to more contributions! 
:tada: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5903:

Attachment: set yarn.container.vcores to 5_RM.JPG
set yarn.container.vcores to 5_JM.JPG
set taskmanager.numberOfTaskSlots to 6.JPG

> taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in 
> YARN mode
> ---
>
> Key: FLINK-5903
> URL: https://issues.apache.org/jira/browse/FLINK-5903
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Tao Wang
> Attachments: set taskmanager.numberOfTaskSlots to 6.JPG, set 
> yarn.container.vcores to 5_JM.JPG, set yarn.container.vcores to 5_RM.JPG
>
>
> Currently Flink does not respect taskmanager.numberOfTaskSlots and 
> yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the 
> CLI.
> In detail, taskmanager.numberOfTaskSlots is not used at all, and 
> yarn.containers.vcores is only used when requesting container (TM) resources 
> but is not made known to the TM, which means the TM will always think it has 
> the default (1) slots if -s is not configured.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks, and there's no means to coordinate multiple subtasks to 
synchronize this access either.

I'm not sure how we should deal with this one ...
It does however bring up the question again of whether or not we should use the 
low-level Java SDK instead of KPL for implementation of 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881992#comment-15881992
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881632
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
							keyPayload.get(keyBytes);
						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

@haohui, if you don't mind, I would also wait for @rmetzger to take another 
look at the new proposals here, before you jump back again into the code.
This part is quite critical for Flink Kafka's exactly-once guarantee, so 
another pair of eyes on this will be safer.

I would also like to do a thorough pass on your code and see if there are 
other problems, so you can work on those all together.

Is that ok for you? Sorry for some more waiting.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881632
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
							keyPayload.get(keyBytes);
						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-						if (deserializer.isEndOfStream(value)) {
-							// remove partition from subscribed partitions.
-							partitionsIterator.remove();
-							continue partitionsLoop;
-						}
-
-						owner.emitRecord(value, currentPartition, offset);
+						final Collector<T> collector = new Collector<T>() {
--- End diff --

@haohui, if you don't mind, I would also wait for @rmetzger to take another 
look at the new proposals here, before you jump back again into the code.
This part is quite critical for Flink Kafka's exactly-once guarantee, so 
another pair of eyes on this will be safer.

I would also like to do a thorough pass on your code and see if there are 
other problems, so you can work on those all together.

Is that ok for you? Sorry for some more waiting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882058#comment-15882058
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r102886003
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicates = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicates.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicates)
--- End diff --

If remainingPredicate is empty, we should also remove the Calc node.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

2017-02-23 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r102886003
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicates = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicates.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicates)
--- End diff --

If remainingPredicate is empty, we should also remove the Calc node.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5901:

Attachment: using IE.png

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
> Attachments: using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 
> 11.0.9600.18059, but I assume it is the same with IE 9). The description of 
> the task is not shown within the rectangle.
> Chrome works well.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Wang updated FLINK-5901:

Attachment: using chrom(same job).png

> DAG can not show properly in IE
> ---
>
> Key: FLINK-5901
> URL: https://issues.apache.org/jira/browse/FLINK-5901
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
> Environment: IE 11
>Reporter: Tao Wang
> Attachments: using chrom(same job).png, using IE.png
>
>
> The DAG of running jobs cannot be shown properly in IE 11 (I am using 
> 11.0.9600.18059, but I assume it is the same with IE 9). The description of 
> the task is not shown within the rectangle.
> Chrome works well.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate

2017-02-23 Thread Shaoxuan Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaoxuan Wang updated FLINK-5899:
-
Summary: Fix the bug in EventTimeTumblingWindow for non-partialMerge 
aggregate  (was: Fix the bug in initializing the 
DataSetTumbleTimeWindowAggReduceGroupFunction)

> Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
> -
>
> Key: FLINK-5899
> URL: https://issues.apache.org/jira/browse/FLINK-5899
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> The row length used to initialize 
> DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think 
> this was introduced by mistake when merging the code.)
> We currently lack built-in non-partial-merge Aggregates, so this has not 
> been caught by the unit tests. 
> Reproduce steps:
> 1. Set "supportPartial" to false for SumAggregate.
> 2. Both testAllEventTimeTumblingWindowOverTime and 
> testEventTimeTumblingGroupWindowOverTime will then fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881900#comment-15881900
 ] 

ASF GitHub Bot commented on FLINK-5899:
---

GitHub user shaoxuan-wang opened a pull request:

https://github.com/apache/flink/pull/3405

[FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for 
non-partialMerge aggregate

I have changed supportPartial to false for all built-in Aggregates and ran 
all the UTs. Luckily this is the only bug we have so far.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shaoxuan-wang/flink F5899-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3405


commit bb233179d06325b481fe0e2a903a55c547529f06
Author: shaoxuan-wang 
Date:   2017-02-24T03:57:44Z

[FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for 
non-partialMerge aggregate




> Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
> -
>
> Key: FLINK-5899
> URL: https://issues.apache.org/jira/browse/FLINK-5899
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> The row length used to initialize 
> DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think 
> this was introduced by mistake when merging the code.)
> We currently lack built-in non-partial-merge Aggregates, so this has not 
> been caught by the unit tests. 
> Reproduce steps:
> 1. Set "supportPartial" to false for SumAggregate.
> 2. Both testAllEventTimeTumblingWindowOverTime and 
> testEventTimeTumblingGroupWindowOverTime will then fail.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 4:53 AM:
-

Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks, and there's no means to coordinate multiple subtasks to 
synchronize this access either.

I'm not sure how we should deal with this one ...
It does however bring up the question again of whether or not we should use the 
low-level Java SDK instead of KPL for implementation of 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?

If there is a possible way to solve this without replacing KPL and is within 
our reach, then I'm against considering the replacement. Right now I just don't 
see a possible solution other than KPL changing the binary file to be different 
across processes, but that's not something we can really push.


was (Author: tzulitai):
Thanks for looking into the issue [~skidder].

This seems tricky. It isn't possible to share the {{KinesisProducer}} across 
the subtasks, and there's no means to coordinate multiple subtasks to 
synchronize this access either.

I'm not sure how we should deal with this one ...
It does however bring up the question again of whether or not we should use the 
low-level Java SDK instead of KPL for implementation of 
{{FlinkKinesisProducer}}.
[~rmetzger] what do you think?

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881976#comment-15881976
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5898:


That's great! Thanks a lot for the efforts and please keep us posted :-)

> Race-Condition with Amazon Kinesis KPL
> --
>
> Key: FLINK-5898
> URL: https://issues.apache.org/jira/browse/FLINK-5898
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.2.0
>Reporter: Scott Kidder
>
> The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer 
> Library (KPL) to send messages to Kinesis streams. The KPL relies on a native 
> binary client to send messages to achieve better performance.
> When a Kinesis Producer is instantiated, the KPL will extract the native 
> binary to a sub-directory of `/tmp` (or whatever the platform-specific 
> temporary directory happens to be).
> The KPL tries to prevent multiple processes from extracting the binary at the 
> same time by wrapping the operation in a mutex. Unfortunately, this does not 
> prevent multiple Flink cores from trying to perform this operation at the 
> same time. If two or more processes attempt to do this at the same time, then 
> the native binary in /tmp will be corrupted.
> The authors of the KPL are aware of this possibility and suggest that users 
> of the KPL  not do that ... (sigh):
> https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897
> I encountered this in my production environment when bringing up a new Flink 
> task-manager with multiple cores and restoring from an earlier savepoint, 
> resulting in the instantiation of a KPL client on each core at roughly the 
> same time.
> A stack-trace follows:
> {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243)
>   at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
>  is not what it's expected to be.
>   at 
> com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
>   ... 8 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881092
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes, 
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
-   
-   if 
(deserializer.isEndOfStream(value)) {
-   // remove 
partition from subscribed partitions.
-   
partitionsIterator.remove();
-   continue 
partitionsLoop;
-   }
-   
-   owner.emitRecord(value, 
currentPartition, offset);
+   final Collector 
collector = new Collector() {
--- End diff --

@haohui hmm this seems a bit odd to me. I think it should be achievable.

```
// the buffer; this can be shared
final List<T> bufferedElements = new LinkedList<>();
// BufferCollector is an implementation of Collector that adds collected
// elements to bufferedElements; this can be shared
final BufferCollector<T> collector = new BufferCollector<>(bufferedElements);

...

for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    deserializer.deserialize(
        record.key(), record.value(), record.topic(),
        record.partition(), record.offset(), collector);

    emitRecords(bufferedElements, partitionState, record.offset(), record);

    // after the elements for the record have been emitted, empty out the buffer
    bufferedElements.clear();
}
```

Doesn't this work? I haven't really tried this hands-on, so I might be 
overlooking something. Let me know what you think :)
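
For completeness, a minimal sketch of what such a `BufferCollector` could 
look like (a hypothetical helper, not something that exists in the codebase 
today):

```java
import java.util.List;

import org.apache.flink.util.Collector;

// Hypothetical helper: a Collector that appends everything it receives to a
// shared buffer, so the caller can emit and then clear the buffer per record.
public class BufferCollector<T> implements Collector<T> {

    private final List<T> buffer;

    public BufferCollector(List<T> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void collect(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {
        // nothing to release; the buffer is owned by the caller
    }
}
```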


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881986#comment-15881986
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881092
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes, 
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
-   
-   if 
(deserializer.isEndOfStream(value)) {
-   // remove 
partition from subscribed partitions.
-   
partitionsIterator.remove();
-   continue 
partitionsLoop;
-   }
-   
-   owner.emitRecord(value, 
currentPartition, offset);
+   final Collector 
collector = new Collector() {
--- End diff --

@haohui hmm this seems a bit odd to me. I think it should be achievable.

```
// the buffer; this can be shared
final List<T> bufferedElements = new LinkedList<>();
// BufferCollector is an implementation of Collector that adds collected
// elements to bufferedElements; this can be shared
final BufferCollector<T> collector = new BufferCollector<>(bufferedElements);

...

for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    deserializer.deserialize(
        record.key(), record.value(), record.topic(),
        record.partition(), record.offset(), collector);

    emitRecords(bufferedElements, partitionState, record.offset(), record);

    // after the elements for the record have been emitted, empty out the buffer
    bufferedElements.clear();
}
```

Doesn't this work? I haven't really tried this hands-on, so I might be 
overlooking something. Let me know what you think :)


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-23 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882017#comment-15882017
 ] 

Xingcan Cui commented on FLINK-5891:


Thanks for your explanation, [~greghogan]. I'm afraid my PR for 
https://issues.apache.org/jira/browse/FLINK-1526 has the same problem, as I 
also store non-primitive values from the received messages (primitive types 
will not be affected, right?). I saw the following code in Flink's CEP 
library. To avoid the reference problem, it makes a deep copy of each 
{{StreamRecord}} element.
{code:title=AbstractCEPBasePatternOperator.java | borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed
priorityQueue.offer(new StreamRecord<IN>(
    inputSerializer.copy(element.getValue()), element.getTimestamp()));
} else {
priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}
So, what are your suggestions on fixing this? I'd like to work on it (and of 
course also the PR for FLINK-1526).

> ConnectedComponents is broken when object reuse enabled
> ---
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>   public static final class CCUpdater
>   extends GatherFunction {
>   @Override
>   public void updateVertex(Vertex vertex, 
> MessageIterator messages) throws Exception {
>   VV current = vertex.getValue();
>   VV min = current;
>   for (VV msg : messages) {
>   if (msg.compareTo(min) < 0) {
>   min = msg;
>   }
>   }
>   if (!min.equals(current)) {
>   setNewVertexValue(min);
>   }
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5901) DAG can not show properly in IE

2017-02-23 Thread Tao Wang (JIRA)
Tao Wang created FLINK-5901:
---

 Summary: DAG can not show properly in IE
 Key: FLINK-5901
 URL: https://issues.apache.org/jira/browse/FLINK-5901
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
 Environment: IE 11
Reporter: Tao Wang


The DAG of running jobs cannot be rendered properly in IE11 (I am using 
11.0.9600.18059, but I assume the same applies to IE9). The task description 
is not shown within the rectangle.

Chrome works fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-02-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3406

[flink-5568] [Table API & SQL]Introduce interface for catalog, and provide 
an in-memory implementation. Integrate external catalog with calcite catalog

This PR aims to introduce an interface for external catalogs, provide an 
in-memory implementation for testing and development, and finally integrate 
the external catalog with the Calcite catalog.
The main changes include:
1. Introduce the ExternalCatalog abstraction, including 
ExternalCatalogDatabase as a database in the catalog and ExternalCatalogTable 
as a table in the catalog.
2. Provide an in-memory implementation for testing and development.
3. Introduce ExternalCatalogSchema, an implementation of the Calcite Schema 
interface. It registers databases in the ExternalCatalog as Calcite schemas, 
and tables in a database as Calcite tables.
4. Add the ExternalCatalogCompatible annotation. A TableSource with this 
annotation can be converted to or from an ExternalCatalogTable. 
ExternalCatalogTableConverter is the converter between ExternalCatalogTable 
and TableSource.
5. Introduce the CatalogTableHelper utility. It has two responsibilities: 
automatically find the TableSources that are annotated with 
ExternalCatalogCompatible, and convert an ExternalCatalogTable instance to a 
TableSourceTable instance. A rough interface sketch follows below.
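
An illustrative sketch of the catalog abstraction described above (Java for 
brevity; names follow this description, but the exact signatures in the PR 
are authoritative):

```java
import java.util.List;

// hedged sketch only, not the PR's actual code
public interface ExternalCatalog {

    // databases registered in this external catalog
    List<String> listDatabases();

    ExternalCatalogDatabase getDatabase(String dbName);

    // tables registered under a given database
    List<String> listTables(String dbName);

    ExternalCatalogTable getTable(String dbName, String tableName);
}
```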

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink dev

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3406


commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang 
Date:   2017-02-22T11:28:08Z

Introduce interface for external catalog, and provide an in-memory 
implementation for test or develop. Integrate with calcite catalog.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-23 Thread haohui
GitHub user haohui reopened a pull request:

https://github.com/apache/flink/pull/3370

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.

This is the commit we used internally. There are no unit tests associated 
with this PR; it simply serves as a reference point for #3302.
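
For context, a hedged example of how `procTime()` is intended to be used in 
a streaming query (illustrative syntax from the FLINK-5710 discussion, not a 
final design):

```sql
-- count events per user in 1-hour tumbling windows over processing time
SELECT user_name, COUNT(*) AS cnt
FROM Logs
GROUP BY TUMBLE(procTime(), INTERVAL '1' HOUR), user_name
```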

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3370.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3370


commit e68a7ad22cad926dac2f211fa3bd56ef481c4036
Author: Haohui Mai 
Date:   2017-02-23T21:51:45Z

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881507#comment-15881507
 ] 

ASF GitHub Bot commented on FLINK-5710:
---

GitHub user haohui reopened a pull request:

https://github.com/apache/flink/pull/3370

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.

This is the commit we used internally. There are no unit tests associated 
with this PR; it simply serves as a reference point for #3302.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3370.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3370


commit e68a7ad22cad926dac2f211fa3bd56ef481c4036
Author: Haohui Mai 
Date:   2017-02-23T21:51:45Z

[FLINK-5710] Add ProcTime() function to indicate StreamSQL.




> Add ProcTime() function to indicate StreamSQL
> -
>
> Key: FLINK-5710
> URL: https://issues.apache.org/jira/browse/FLINK-5710
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Stefano Bortoli
>Assignee: Stefano Bortoli
>Priority: Minor
>
> procTime() is a parameterless scalar function that just indicates processing 
> time mode



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5898) Race-Condition with Amazon Kinesis KPL

2017-02-23 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-5898:
---

 Summary: Race-Condition with Amazon Kinesis KPL
 Key: FLINK-5898
 URL: https://issues.apache.org/jira/browse/FLINK-5898
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.2.0
Reporter: Scott Kidder


The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer Library 
(KPL) to send messages to Kinesis streams. The KPL relies on a native binary 
client to send messages to achieve better performance.

When a Kinesis Producer is instantiated, the KPL will extract the native binary 
to a sub-directory of `/tmp` (or whatever the platform-specific temporary 
directory happens to be).

The KPL tries to prevent multiple processes from extracting the binary at the 
same time by wrapping the operation in a mutex. Unfortunately, this does not 
prevent multiple Flink cores from trying to perform this operation at the same 
time. If two or more processes attempt to do this at the same time, then the 
native binary in /tmp will be corrupted.

The authors of the KPL are aware of this possibility and suggest that users of 
the KPL  not do that ... (sigh):
https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897

I encountered this in my production environment when bringing up a new Flink 
task-manager with multiple cores and restoring from an earlier savepoint, 
resulting in the instantiation of a KPL client on each core at roughly the same 
time.

A stack-trace follows:

{noformat}
java.lang.RuntimeException: Could not copy native binaries to temp directory 
/tmp/amazon-kinesis-producer-native-binaries
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243)
at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.SecurityException: The contents of the binary 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2
 is not what it's expected to be.
at 
com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
... 8 more
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5743) Mark WindowedStream.aggregate* methods as PublicEvolving

2017-02-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5743:

Fix Version/s: 1.3.0

> Mark WindowedStream.aggregate* methods as PublicEvolving
> 
>
> Key: FLINK-5743
> URL: https://issues.apache.org/jira/browse/FLINK-5743
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> IMHO, they are too new to know whether they will persist in their current 
> form.
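
For reference, a minimal sketch of the proposed change (the annotation lives 
in flink-annotations; the signature below is abbreviated):

{code}
// in WindowedStream: mark the recently added aggregate* methods as evolving API
@PublicEvolving
public <ACC, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, R> function) {
    // existing implementation unchanged
}
{code}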



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5894) HA docs are misleading re: state backends

2017-02-23 Thread David Anderson (JIRA)
David Anderson created FLINK-5894:
-

 Summary: HA docs are misleading re: state backends
 Key: FLINK-5894
 URL: https://issues.apache.org/jira/browse/FLINK-5894
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: David Anderson


Towards the end of 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/jobmanager_high_availability.html#configuration
 it says "Currently, only the file system state backend is supported in HA 
mode." 

In fact, the state handles are written to the file system and a reference to 
them is kept in ZooKeeper, so HA is actually independent of the state backend 
being used.
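
For illustration, a typical HA setup in flink-conf.yaml (keys as of Flink 
1.2); the HA storage directory is configured independently of the chosen 
state backend:

{noformat}
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
# works with any backend; handles go to storageDir, references to ZooKeeper
state.backend: rocksdb
{noformat}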



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled

2017-02-23 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880459#comment-15880459
 ] 

Greg Hogan commented on FLINK-5891:
---

[~xccui] even when wrapping it in another object, both references will point 
to the same object, and Flink will eventually overwrite the value with a 
later deserialization. Elsewhere we have made use of {{CopyableValue}}, but 
this restricts the permitted types. One can also leave object reuse disabled 
or use immutable types (which effectively disables object reuse).
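
A minimal sketch of what a fix could look like (restricting the value type to 
{{CopyableValue}} so the message can be copied; illustrative only, not a 
decided approach):

{code}
public static final class CCUpdater<K, VV extends Comparable<VV> & CopyableValue<VV>>
		extends GatherFunction<K, VV, VV> {

	@Override
	public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
		VV current = vertex.getValue();
		VV min = current;
		for (VV msg : messages) {
			if (msg.compareTo(min) < 0) {
				// copy the value: with object reuse enabled, "msg" points to a
				// buffer that is overwritten by the next deserialization
				min = msg.copy();
			}
		}
		if (!min.equals(current)) {
			setNewVertexValue(min);
		}
	}
}
{code}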

> ConnectedComponents is broken when object reuse enabled
> ---
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} 
> is storing a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
>   public static final class CCUpdater
>   extends GatherFunction {
>   @Override
>   public void updateVertex(Vertex vertex, 
> MessageIterator messages) throws Exception {
>   VV current = vertex.getValue();
>   VV min = current;
>   for (VV msg : messages) {
>   if (msg.compareTo(min) < 0) {
>   min = msg;
>   }
>   }
>   if (!min.equals(current)) {
>   setNewVertexValue(min);
>   }
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink

2017-02-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
Thanks for looking at that! I'll open a new discussion thread on the 
mailing list to discuss side outputs and split/select and how we're going to 
proceed with that.

Regarding your other questions: I think we might add such an `Evaluator` 
interface in the future, but for now I would like to keep it simple and see if 
that works for people. And yes, a user would have to use `allowedLateness` and 
`sideOutputLateData` together if they want to capture late data, or they can 
go with the default allowed lateness of zero and still get the late data as 
a side output.
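
For reference, a hedged sketch of the late-data usage described above (API as 
proposed in this PR; details may still change, and `input` is assumed to be a 
`DataStream<String>` with timestamps and watermarks assigned):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// anonymous subclass so the element type is captured for the side output
final OutputTag<String> lateTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<String> result = input
    .keyBy(value -> value)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .allowedLateness(Time.seconds(0))  // the default: zero allowed lateness
    .sideOutputLateData(lateTag)       // late elements go to the side output
    .reduce((a, b) -> a + b);

DataStream<String> lateStream = result.getSideOutput(lateTag);
```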


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3393: [FLINK-3903][docs] adding alternative installation...

2017-02-23 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3393#discussion_r102723574
  
--- Diff: docs/quickstart/setup_quickstart.md ---
@@ -72,6 +72,15 @@ $ cd build-target   # this is where Flink is 
installed to
 ~~~
 {% endif %}
 
+### Alternatively
--- End diff --

Yes. And perhaps use the `` formatting 
as with `dataset_transformation.md` and elsewhere. With tabs it is easy to see 
the different choices.
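
For reference, the tab formatting pattern referenced above looks roughly like 
this (a sketch from memory of the Flink docs; the actual markup there is 
authoritative):

```html
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
Java instructions go here.
</div>
<div data-lang="scala" markdown="1">
Scala instructions go here.
</div>
</div>
```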


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3903) Homebrew Installation

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880521#comment-15880521
 ] 

ASF GitHub Bot commented on FLINK-3903:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3393#discussion_r102723574
  
--- Diff: docs/quickstart/setup_quickstart.md ---
@@ -72,6 +72,15 @@ $ cd build-target   # this is where Flink is 
installed to
 ~~~
 {% endif %}
 
+### Alternatively
--- End diff --

Yes. And perhaps use the `` formatting 
as with `dataset_transformation.md` and elsewhere. With tabs it is easy to see 
the different choices.


> Homebrew Installation
> -
>
> Key: FLINK-3903
> URL: https://issues.apache.org/jira/browse/FLINK-3903
> Project: Flink
>  Issue Type: Task
>  Components: Build System, Documentation
>Reporter: Eron Wright 
>Priority: Minor
>  Labels: starter
>
> Recently I submitted a formula for apache-flink to the 
> [homebrew|http://brew.sh/] project.   Homebrew simplifies installation on Mac:
> {code}
> $ brew install apache-flink
> ...
> $ flink --version
> Version: 1.0.2, Commit ID: d39af15
> {code}
> Updates to the formula are adhoc at the moment.  I opened this issue to 
> formalize updating homebrew into the release process.  I drafted a procedure 
> doc here:
> [https://gist.github.com/EronWright/b62bd3b192a15be4c200a2542f7c9376]
>  
> Please also consider updating the website documentation to suggest homebrew 
> as an alternate installation method for Mac users.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726419
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee ---
@@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 
'dndLists'])
 
 .value 'flinkConfig', {
   jobServer: ''
-#  jobServer: 'http://localhost:8081/'
+  # jobServer: 'http://localhost:8081/'
--- End diff --

I updated it because I had the same problem with sublime :) ... It was 
easier for my workflow to indent it like this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880559#comment-15880559
 ] 

ASF GitHub Bot commented on FLINK-3427:
---

Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726454
  
--- Diff: 
flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---
@@ -81,6 +78,51 @@ angular.module('flinkApp')
 JobsService.stopJob($stateParams.jobid).then (data) ->
   {}
 
+  loadJob = ()->
--- End diff --

Fixed.


> Add watermark monitoring to JobManager web frontend
> ---
>
> Key: FLINK-3427
> URL: https://issues.apache.org/jira/browse/FLINK-3427
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Robert Metzger
>
> Currently, it's quite hard to figure out issues with the watermarks.
> I think we can improve the situation by reporting the following information 
> through the metrics system:
> - Report the current low watermark for each operator (this way, you can see 
> if one operator is preventing the watermarks to rise)
> - Report the number of events arrived after the low watermark (users can see 
> how accurate the watermarks are)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880558#comment-15880558
 ] 

ASF GitHub Bot commented on FLINK-3427:
---

Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726419
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee ---
@@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 
'dndLists'])
 
 .value 'flinkConfig', {
   jobServer: ''
-#  jobServer: 'http://localhost:8081/'
+  # jobServer: 'http://localhost:8081/'
--- End diff --

I updated it because I had the same problem with sublime :) ... It was 
easier for my workflow to indent it like this.


> Add watermark monitoring to JobManager web frontend
> ---
>
> Key: FLINK-3427
> URL: https://issues.apache.org/jira/browse/FLINK-3427
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Robert Metzger
>
> Currently, it's quite hard to figure out issues with the watermarks.
> I think we can improve the situation by reporting the following information 
> through the metrics system:
> - Report the current low watermark for each operator (this way, you can see 
> if one operator is preventing the watermarks to rise)
> - Report the number of events arrived after the low watermark (users can see 
> how accurate the watermarks are)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726454
  
--- Diff: 
flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---
@@ -81,6 +78,51 @@ angular.module('flinkApp')
 JobsService.stopJob($stateParams.jobid).then (data) ->
   {}
 
+  loadJob = ()->
--- End diff --

Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880549#comment-15880549
 ] 

ASF GitHub Bot commented on FLINK-3427:
---

Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726198
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee 
---
@@ -87,3 +87,27 @@ angular.module('flinkApp')
 
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+if value <= -9223372036854776000
+  return 'No Watermark'
+else
+  return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+lowWatermark = "None"
+if watermarks != null && watermarks[nodeid] && 
watermarks[nodeid].length
+  values = (watermark.value for watermark in watermarks[nodeid])
+  lowWatermark = Math.min.apply(null, values)
+  if lowWatermark <= -9223372036854776000
+lowWatermark = "No Watermark"
+return lowWatermark
+
+.filter "watermarksByNode", ->
+  (watermarks, nodeid) ->
+arr = []
+if watermarks != null && watermarks[nodeid] && 
watermarks[nodeid].length
+  arr = watermarks[nodeid]
--- End diff --

No, because initially there are no watermarks, so we must check whether they 
exist. Nonetheless, I have refactored this function and moved it to 
jobs.ctrl.coffee


> Add watermark monitoring to JobManager web frontend
> ---
>
> Key: FLINK-3427
> URL: https://issues.apache.org/jira/browse/FLINK-3427
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Robert Metzger
>
> Currently, it's quite hard to figure out issues with the watermarks.
> I think we can improve the situation by reporting the following information 
> through the metrics system:
> - Report the current low watermark for each operator (this way, you can see 
> if one operator is preventing the watermarks to rise)
> - Report the number of events arrived after the low watermark (users can see 
> how accurate the watermarks are)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102726198
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee 
---
@@ -87,3 +87,27 @@ angular.module('flinkApp')
 
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+if value <= -9223372036854776000
+  return 'No Watermark'
+else
+  return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+lowWatermark = "None"
+if watermarks != null && watermarks[nodeid] && 
watermarks[nodeid].length
+  values = (watermark.value for watermark in watermarks[nodeid])
+  lowWatermark = Math.min.apply(null, values)
+  if lowWatermark <= -9223372036854776000
+lowWatermark = "No Watermark"
+return lowWatermark
+
+.filter "watermarksByNode", ->
+  (watermarks, nodeid) ->
+arr = []
+if watermarks != null && watermarks[nodeid] && 
watermarks[nodeid].length
+  arr = watermarks[nodeid]
--- End diff --

No, because initially there are no watermarks, so we must check whether they 
exist. Nonetheless, I have refactored this function and moved it to 
jobs.ctrl.coffee


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880416#comment-15880416
 ] 

Fabian Hueske commented on FLINK-5859:
--

For such cases, we could either 

1. implement {{FilterableTableSource}} and manually figure out filters and 
partitions, or 
2. give {{PartitionableTableSource}} another method {{setFilterPredicate()}} 
that has the same semantics as {{FilterableTableSource.setPredicate()}} but is 
called from {{PartitionableTableSource.setPredicate()}} with the remaining 
predicates that could not be used to prune partitions (see the sketch below).
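
A rough sketch of option 2 (method names mirror the comment above; this is 
not an agreed API):

{code}
public interface PartitionableTableSource {

	// prune partitions with the given predicates; predicates that cannot be
	// used for pruning are handed on to setFilterPredicate()
	void setPredicate(List<Expression> predicates);

	// same semantics as FilterableTableSource.setPredicate(), but invoked from
	// setPredicate() with the remaining (non-pruning) predicates
	void setFilterPredicate(List<Expression> remainingPredicates);
}
{code}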


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> obviously, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880467#comment-15880467
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
Thanks for looking at that! I'll open a new discussion thread on the 
mailing list to discuss side outputs and split/select and how we're going to 
proceed with that.

Regarding your other questions: I think we might add such an `Evaluator` 
interface in the future, but for now I would like to keep it simple and see if 
that works for people. And yes, a user would have to use `allowedLateness` and 
`sideOutputLateData` together if they want to capture late data, or they can 
go with the default allowed lateness of zero and still get the late data as 
a side output.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102725706
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade 
---
@@ -0,0 +1,27 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+

+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks
 | watermarksByNode:nodeid).length")
+  thead
+tr
+  th id
--- End diff --

I'm not sure what we're referring to here. Can you clarify?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880542#comment-15880542
 ] 

ASF GitHub Bot commented on FLINK-3427:
---

Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102725620
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade 
---
@@ -0,0 +1,27 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+

+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks
 | watermarksByNode:nodeid).length")
--- End diff --

Fixed with the latest push


> Add watermark monitoring to JobManager web frontend
> ---
>
> Key: FLINK-3427
> URL: https://issues.apache.org/jira/browse/FLINK-3427
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Robert Metzger
>
> Currently, it's quite hard to figure out issues with the watermarks.
> I think we can improve the situation by reporting the following information 
> through the metrics system:
> - Report the current low watermark for each operator (this way, you can see 
> if one operator is preventing the watermarks to rise)
> - Report the number of events arrived after the low watermark (users can see 
> how accurate the watermarks are)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880543#comment-15880543
 ] 

ASF GitHub Bot commented on FLINK-3427:
---

Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102725706
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade 
---
@@ -0,0 +1,27 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+

+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks
 | watermarksByNode:nodeid).length")
+  thead
+tr
+  th id
--- End diff --

I'm not sure what we're referring to here. Can you clarify?


> Add watermark monitoring to JobManager web frontend
> ---
>
> Key: FLINK-3427
> URL: https://issues.apache.org/jira/browse/FLINK-3427
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Robert Metzger
>
> Currently, it's quite hard to figure out issues with the watermarks.
> I think we can improve the situation by reporting the following information 
> through the metrics system:
> - Report the current low watermark for each operator (this way, you can see 
> if one operator is preventing the watermarks to rise)
> - Report the number of events arrived after the low watermark (users can see 
> how accurate the watermarks are)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102725620
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade 
---
@@ -0,0 +1,27 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+

+table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks
 | watermarksByNode:nodeid).length")
--- End diff --

Fixed with the latest push


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...

2017-02-23 Thread nellboy
Github user nellboy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3366#discussion_r102725785
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee 
---
@@ -87,3 +87,27 @@ angular.module('flinkApp')
 
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+if value <= -9223372036854776000
+  return 'No Watermark'
+else
+  return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+lowWatermark = "None"
+if watermarks != null && watermarks[nodeid] && 
watermarks[nodeid].length
+  values = (watermark.value for watermark in watermarks[nodeid])
+  lowWatermark = Math.min.apply(null, values)
+  if lowWatermark <= -9223372036854776000
+lowWatermark = "No Watermark"
--- End diff --

Fixed with latest push


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

