[jira] [Commented] (FLINK-5586) Extend TableProgramsTestBase for object reuse modes
[ https://issues.apache.org/jira/browse/FLINK-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881829#comment-15881829 ]

ASF GitHub Bot commented on FLINK-5586:

Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3339

Rebased to the latest master.

> Extend TableProgramsTestBase for object reuse modes
>
> Key: FLINK-5586
> URL: https://issues.apache.org/jira/browse/FLINK-5586
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Kurt Young
>
> We should also test whether all runtime operators of the Table API work
> correctly when object reuse mode is set to true. This should be done for all
> cluster-based ITCases, not the collection-based ones.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3339: [FLINK-5586] [table] Extend TableProgramsClusterTestBase ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3339 rebased to the latest master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
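FLINK-5586 asks for ITCases that run with object reuse enabled. As a standalone illustration (plain Java, no Flink APIs; all class names below are hypothetical), this sketch shows the class of bug such tests catch: with object reuse the runtime may hand the same mutated record instance to an operator on every invocation, so an operator that stores references instead of copies sees corrupted values.

```java
import java.util.ArrayList;
import java.util.List;

// A minimal mutable record, standing in for a reused runtime record instance.
class Record {
    int value;
    Record(int v) { value = v; }
}

// Broken under object reuse: keeps a reference to the (reused) input record.
class BuggyCollector {
    final List<Record> seen = new ArrayList<>();
    void collect(Record r) { seen.add(r); }
}

// Safe under object reuse: takes a defensive copy of each input record.
class SafeCollector {
    final List<Record> seen = new ArrayList<>();
    void collect(Record r) { seen.add(new Record(r.value)); }
}
```

Feeding both collectors the same instance mutated in place (as a reusing runtime would) makes the buggy collector report the last value for every element, which is exactly the behavior an object-reuse test base is meant to surface.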
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835712

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   /** The initial accumulator for Max aggregate function */
   class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828658

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
       var i: Int = 0
       while (i < accumulators.size()) {
         val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-        if (ret.sum == null.asInstanceOf[T]) {
-          ret.sum = a.sum
-        } else if (a.sum != null.asInstanceOf[T]) {
-          ret.sum = numeric.plus(ret.sum, a.sum)
+        if (a.f1) {
+          if (!ret.f1) {

--- End diff --

Since we start with `sum = 0` we don't need this condition.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881429#comment-15881429 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828543

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[SumAccumulator[T]]
-      if (accum.sum == null.asInstanceOf[T]) {
-        accum.sum = v
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      if (!a.f1) {

--- End diff --

Since we start with `sum = 0` we don't need this condition.

> New aggregate function interface and built-in aggregate functions
>
> Key: FLINK-5767
> URL: https://issues.apache.org/jira/browse/FLINK-5767
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Add a new aggregate function interface. This includes implementing the
> aggregate interface, migrating the existing aggregation functions to this
> interface, and adding the unit tests for these functions.
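The recurring review comment ("since we start with `sum = 0` we don't need this condition") can be illustrated with a standalone sketch. This is plain Java with hypothetical names, not the actual Flink classes: because the sum field starts at the numeric zero, `accumulate` and `merge` can add unconditionally, and the boolean flag (the `f1` of the reviewed `JTuple2`) only records whether any value was seen.

```java
// Standalone sketch of the reviewed (sum, nonEmpty) accumulator pattern.
class SketchSumAccumulator {
    long sum = 0L;            // corresponds to f0; starts at the numeric zero
    boolean nonEmpty = false; // corresponds to f1

    void accumulate(Long value) {
        if (value != null) {
            sum += value;     // no "is this the first value?" branch needed
            nonEmpty = true;  // unconditional reassignment, as the review suggests
        }
    }

    void merge(SketchSumAccumulator other) {
        if (other.nonEmpty) {
            sum += other.sum; // no check of our own flag needed either
            nonEmpty = true;
        }
    }

    Long getValue() {
        return nonEmpty ? sum : null; // empty input still yields null, not 0
    }
}
```

The flag is what preserves SQL semantics: an aggregate over zero rows must still return NULL, even though the internal sum starts at zero.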
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881431#comment-15881431 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828705

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
-      if (accum.sum == null) {
-        accum.sum = v
+      if (accum.f1 == false) {

--- End diff --

Remove condition.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881432#comment-15881432 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826382

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MaxAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) < 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MaxAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) < 0) {
+        a.f0 = v
+        if (!a.f1) {

--- End diff --

The condition can be removed. We can simply reassign `true`.
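The Max/Min comments make a related point: the inner `if (!a.f1)` guard around setting the flag is redundant, since reassigning `true` is idempotent. A minimal standalone sketch (plain Java, hypothetical names, not the Flink classes) of the simplified shape:

```java
// Standalone sketch of the reviewed (max, seen) accumulator pattern.
class SketchMaxAccumulator {
    long max = 0L;        // corresponds to f0; only meaningful once seen is true
    boolean seen = false; // corresponds to f1

    void accumulate(Long value) {
        if (value != null && (!seen || value > max)) {
            max = value;
            seen = true; // simply reassign; no inner `if (!seen)` needed
        }
    }

    Long getValue() {
        return seen ? max : null; // NULL for empty input
    }
}
```

Unlike sum, max has no natural zero, so here the flag also guards the very first comparison, not just the NULL-for-empty-input result.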
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881425#comment-15881425 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828658

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
       var i: Int = 0
       while (i < accumulators.size()) {
         val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
-        if (ret.sum == null.asInstanceOf[T]) {
-          ret.sum = a.sum
-        } else if (a.sum != null.asInstanceOf[T]) {
-          ret.sum = numeric.plus(ret.sum, a.sum)
+        if (a.f1) {
+          if (!ret.f1) {

--- End diff --

Since we start with `sum = 0` we don't need this condition.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881426#comment-15881426 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835712

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   /** The initial accumulator for Max aggregate function */
   class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881430#comment-15881430 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835585

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`. It will then be passed down from the parent class.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881423#comment-15881423 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835493

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-    var sum: T = null.asInstanceOf[T]
+    f0 = 0.asInstanceOf[T] // sum

--- End diff --

Change this to `f0 = numeric.zero`.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881427#comment-15881427 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826611

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]]
-      if (accum.max == null || accum.max.compareTo(v) < 0) {
-        accum.max = v
+      if (!accum.f1 || accum.f0.compareTo(v) < 0) {
+        accum.f0 = v
+        if (!accum.f1) {

--- End diff --

Remove condition.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881424#comment-15881424 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835657

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---

@@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   /** The initial accumulator for Min aggregate function */
   class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835585

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`. It will then be passed down from the parent class.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828705

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
-      if (accum.sum == null) {
-        accum.sum = v
+      if (accum.f1 == false) {

--- End diff --

Remove condition.
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881428#comment-15881428 ]

ASF GitHub Bot commented on FLINK-5767:

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826652

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---

@@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MinAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) > 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MinAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) > 0) {
+        a.f0 = v
+        if (!a.f1) {

--- End diff --

Remove condition.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828543

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[SumAccumulator[T]]
-      if (accum.sum == null.asInstanceOf[T]) {
-        accum.sum = v
+      val a = accumulator.asInstanceOf[SumAccumulator[T]]
+      if (!a.f1) {

--- End diff --

Since we start with `sum = 0` we don't need this condition.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826611

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
     if (value != null) {
      val v = value.asInstanceOf[BigDecimal]
      val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]]
-      if (accum.max == null || accum.max.compareTo(v) < 0) {
-        accum.max = v
+      if (!accum.f1 || accum.f0.compareTo(v) < 0) {
+        accum.f0 = v
+        if (!accum.f1) {

--- End diff --

Remove condition.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835657

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---

@@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   /** The initial accumulator for Min aggregate function */
   class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {

--- End diff --

We can remove the type `T`.
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3336
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881755#comment-15881755 ]

ASF GitHub Bot commented on FLINK-4856:

Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3336

> Add MapState for keyed streams
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
> Many states in keyed streams are organized as key-value pairs. Currently,
> these states are implemented by storing the entire map in a ValueState or a
> ListState. This implementation is very costly because all entries have to be
> serialized/deserialized when updating a single entry. To improve the
> efficiency of these states, MapStates are urgently needed.
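The cost argument in FLINK-4856 (rewriting one entry forces re-serializing the whole map) can be sketched in plain Java. The store classes and the byte counting below are purely illustrative stand-ins, not Flink state-backend code:

```java
import java.util.HashMap;
import java.util.Map;

// Models a map kept as a single serialized value (the ValueState approach):
// every update re-serializes the entire map.
class WholeMapStore {
    int bytesWritten = 0; // total serialization work performed
    void update(Map<String, Integer> state, String key, Integer value) {
        state.put(key, value);
        // toString() stands in for real serialization of the whole map
        bytesWritten += state.toString().getBytes().length;
    }
}

// Models per-entry storage (what a MapState provides): an update only
// serializes the single affected entry.
class PerEntryStore {
    final Map<String, byte[]> entries = new HashMap<>();
    int bytesWritten = 0;
    void update(String key, Integer value) {
        byte[] bytes = value.toString().getBytes();
        entries.put(key, bytes);
        bytesWritten += bytes.length;
    }
}
```

With N entries, the whole-map store does O(N) serialization work per update while the per-entry store does O(1), which is the efficiency gap the issue describes.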
[jira] [Created] (FLINK-5900) Add non-partial merge Aggregates and unit tests
Shaoxuan Wang created FLINK-5900:

Summary: Add non-partial merge Aggregates and unit tests
Key: FLINK-5900
URL: https://issues.apache.org/jira/browse/FLINK-5900
Project: Flink
Issue Type: Improvement
Reporter: Shaoxuan Wang
Assignee: Shaoxuan Wang

All current built-in aggregates support partial merge, so the non-partial-merge code path is untested and we cannot tell whether it works. We should add non-partial-merge aggregates and unit tests for them.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835493

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---

@@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
   /** The initial accumulator for Sum aggregate function */
   class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-    var sum: T = null.asInstanceOf[T]
+    f0 = 0.asInstanceOf[T] // sum

--- End diff --

Change this to `f0 = numeric.zero`.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826382

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala ---

@@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MaxAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) < 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MaxAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) < 0) {
+        a.f0 = v
+        if (!a.f1) {

--- End diff --

The condition can be removed. We can simply reassign `true`.
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826652

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala ---

@@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   override def accumulate(accumulator: Accumulator, value: Any) = {
     if (value != null) {
       val v = value.asInstanceOf[T]
-      val accum = accumulator.asInstanceOf[MinAccumulator[T]]
-      if (accum.max == null || ord.compare(accum.max, v) > 0) {
-        accum.max = v
+      val a = accumulator.asInstanceOf[MinAccumulator[T]]
+      if (!a.f1 || ord.compare(a.f0, v) > 0) {
+        a.f0 = v
+        if (!a.f1) {

--- End diff --

Remove condition.
[jira] [Closed] (FLINK-5863) Unify the serialization of queryable list states in different backends
[ https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaogang Shi closed FLINK-5863.

Resolution: Fixed

Now that we are refactoring the queryable states, we can make the changes then.

> Unify the serialization of queryable list states in different backends
>
> Key: FLINK-5863
> URL: https://issues.apache.org/jira/browse/FLINK-5863
> Project: Flink
> Issue Type: Improvement
> Components: Queryable State
> Affects Versions: 1.3.0
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
> Priority: Minor
>
> Currently the deserialization of list states is implemented in
> {{KvStateRequestSerializer}}, but the serialization is implemented
> individually in the different backends.
> We should provide a method in {{KvStateRequestSerializer}} to remove the
> redundant code.
[jira] [Closed] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor
[ https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaogang Shi closed FLINK-5790.

Resolution: Fixed

Fixed via d47446cafffe0d34d89488f6eb860aa139ceb3f1

> Use list types when ListStateDescriptor extends StateDescriptor
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
> Issue Type: Improvement
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it is the
> serializer of the list elements that is put in {{ListStateDescriptor}}. The
> implementation is a little confusing, and some backends need to construct the
> state serializer from the element serializer by themselves.
> We should use an {{ArrayListSerializer}}, composed of the element serializer,
> in the {{ListStateDescriptor}}. This lets backends avoid constructing the
> state serializer. If a backend needs customized serialization of the state
> (e.g. {{RocksDBStateBackend}}), it can still obtain the element serializer
> from the {{ArrayListSerializer}}.
[jira] [Created] (FLINK-5899) Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction
Shaoxuan Wang created FLINK-5899:

Summary: Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction
Key: FLINK-5899
URL: https://issues.apache.org/jira/browse/FLINK-5899
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Shaoxuan Wang
Assignee: Shaoxuan Wang

The row length used to initialize DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly (likely introduced by mistake when merging the code). Because we currently lack built-in non-partial-merge aggregates, this was not caught by the unit tests.

Steps to reproduce:
1. Set "supportPartial" to false for SumAggregate.
2. Both testAllEventTimeTumblingWindowOverTime and testEventTimeTumblingGroupWindowOverTime will then fail.
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881737#comment-15881737 ]

Kurt Young commented on FLINK-5859:

Hi [~fhueske], how about this approach: we provide both {{FilterableTableSource}} and {{PartitionableTableSource}}, keep {{FilterableTableSource}} as it is, and add methods like {{getAllPartitions}} and {{applyPartitionPruning}} to {{PartitionableTableSource}}. From a developer's point of view, the two traits are completely independent, so it is easier to implement each functionality on its own than to mix all the logic into {{FilterableTableSource.setPredicate()}}. Also, I think it is very likely that the framework will apply these two traits at different optimization stages in the future: partition pruning should happen as early as possible in logical optimization, while filter pushdown should be applied a bit later because it requires some heavyweight physical-level analysis first. BTW, this design still allows the approach you suggested: one can implement only {{FilterableTableSource}} and do all the pruning and filtering there.

> support partition pruning on Table API & SQL
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: godfrey he
> Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many
> queries just need to read a small subset of the total data. We can use
> partition information to prune or skip over files irrelevant to the user's
> queries. Both query optimization time and execution time can be reduced
> obviously, especially for a large partitioned table.
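The two-independent-traits proposal above can be sketched as follows. The interface and method names mirror the ones mentioned in the comment ({{getAllPartitions}}, {{applyPartitionPruning}}), but every signature here is an illustrative guess in plain Java, not Flink's actual Table API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed trait; the real Flink interface differs.
interface PartitionableTableSource {
    List<String> getAllPartitions();
    // Called by the framework early in logical optimization with the
    // partitions that survive pruning.
    void applyPartitionPruning(List<String> remainingPartitions);
}

// A toy source whose partitions are identified by strings like "dt=2017-01-01".
class PartitionedCsvSource implements PartitionableTableSource {
    private final List<String> allPartitions;
    private List<String> toRead;

    PartitionedCsvSource(List<String> partitions) {
        allPartitions = partitions;
        toRead = partitions;
    }
    public List<String> getAllPartitions() { return allPartitions; }
    public void applyPartitionPruning(List<String> remaining) { toRead = remaining; }
    List<String> partitionsToRead() { return toRead; }
}

// Stands in for the optimizer: keeps only partitions matching a literal
// predicate on the partition column (here simplified to a prefix match).
class Pruner {
    static void prune(PartitionableTableSource source, String prefix) {
        List<String> remaining = new ArrayList<>();
        for (String p : source.getAllPartitions()) {
            if (p.startsWith(prefix)) remaining.add(p);
        }
        source.applyPartitionPruning(remaining);
    }
}
```

Under this split, a source that only implements the partitionable trait never has to reason about row-level predicates, which is the separation of concerns the comment argues for.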
[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments
[ https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881789#comment-15881789 ] ASF GitHub Bot commented on FLINK-5881: --- Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102865002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -78,20 +78,7 @@ object UserDefinedFunctionUtils { function: UserDefinedFunction, signature: Seq[TypeInformation[_]]) : Option[Array[Class[_]]] = { -// We compare the raw Java classes not the TypeInformation. -// TypeInformation does not matter during runtime (e.g. within a MapFunction). -val actualSignature = typeInfoToClass(signature) -val signatures = getSignatures(function) - -signatures - // go over all signatures and find one matching actual signature - .find { curSig => - // match parameters of signature to actual parameters - actualSignature.length == curSig.length && -curSig.zipWithIndex.forall { case (clazz, i) => - parameterTypeEquals(actualSignature(i), clazz) -} -} --- End diff -- I deleted them, because both methods are simply copy and paste. One was used for ScalarFunction, the other was used for TableFunction. > ScalarFunction(UDF) should support variable types and variable arguments > - > > Key: FLINK-5881 > URL: https://issues.apache.org/jira/browse/FLINK-5881 > Project: Flink > Issue Type: Sub-task >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > As a sub-task of FLINK-5826. We would like to support the ScalarFunction > first and make the review a little bit easier. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments
[ https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881790#comment-15881790 ] ASF GitHub Bot commented on FLINK-5881: --- Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102864763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var noVargs = true + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + if (method.isVarArgs) { +noVargs = false + } else if (signatures.last.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } +} + }) + if (trailingSeq && noVargs) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- This is correct: if multiple methods are found (overloads), another exception will be thrown. > ScalarFunction(UDF) should support variable types and variable arguments > - > > Key: FLINK-5881 > URL: https://issues.apache.org/jira/browse/FLINK-5881 > Project: Flink > Issue Type: Sub-task >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > As a sub-task of FLINK-5826. We would like to support the ScalarFunction > first and make the review a little bit easier.
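The trailing-Seq check in the diff above hinges on how varargs surface through Java reflection. A small self-contained demo (plain Java, independent of Flink; the class name VarargsDemo is just for illustration) shows the Java side of the contrast: a Java `Type...` parameter carries the varargs flag and erases to an array, whereas a Scala `(xs: T*)` parameter erases to a Seq class without that flag — which is why the code needs the explicit class-name check:

```java
import java.lang.reflect.Method;

public class VarargsDemo {
    // Java-style varargs: compiled with the ACC_VARARGS flag,
    // and the parameter erases to int[].
    public static int eval(int... xs) {
        int sum = 0;
        for (int x : xs) sum += x;
        return sum;
    }

    public static void main(String[] args) throws Exception {
        Method m = VarargsDemo.class.getMethod("eval", int[].class);
        System.out.println(m.isVarArgs());                            // true
        System.out.println(m.getParameterTypes()[0].getSimpleName()); // int[]
        System.out.println(eval(1, 2, 3));                            // 6
    }
}
```

A Scala `def eval(xs: Int*)`, by contrast, reports isVarArgs == false and a Seq parameter type through the same reflection calls (the exact class name depends on the Scala version), so the utility cannot rely on isVarArgs alone.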
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881329#comment-15881329 ] ASF GitHub Bot commented on FLINK-5710: --- Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3370 > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5242) Implement Scala API for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5242: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-2254 > Implement Scala API for BipartiteGraph > -- > > Key: FLINK-5242 > URL: https://issues.apache.org/jira/browse/FLINK-5242 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > Labels: features > > Should implement BipartiteGraph in flink-gelly-scala project similarly to > Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3404: [FLINK-5597] [docs] Improve the LocalClusteringCoe...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3404 [FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 5597_improve_the_localclusteringcoefficient_documentation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3404.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3404 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5597) Improve the LocalClusteringCoefficient documentation
[ https://issues.apache.org/jira/browse/FLINK-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5597: -- Fix Version/s: 1.3.0 > Improve the LocalClusteringCoefficient documentation > > > Key: FLINK-5597 > URL: https://issues.apache.org/jira/browse/FLINK-5597 > Project: Flink > Issue Type: Improvement > Components: Documentation, Gelly >Affects Versions: 1.3.0 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > Fix For: 1.3.0 > > > The LocalClusteringCoefficient usage section should explain what is the > algorithm output and how to retrieve the actual local clustering coefficient > scores from it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881376#comment-15881376 ] ASF GitHub Bot commented on FLINK-3679: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102830609 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- Good catch, @tzulitai ! I tried the buffer approach and had no luck. The problem is that calling `emitRecord`needs to pass in both the offset and the record itself -- The record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer itself needs to buffer the deserialized value and the record itself -- it cannot solve the problem of having a collector per record. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier >Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. 
In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
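The flatMap-style deserialization argued for in this thread can be sketched with a tiny mock. Interface and class names below are illustrative, not Flink's actual API: deserialize() pushes zero or more records into a Collector, so corrupt input can simply emit nothing instead of failing the job:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// One abstract method, so it can be implemented with a lambda/method ref.
interface Collector<T> { void collect(T record); }

// FlatMap-like schema: zero or more outputs per input message.
interface CollectingDeserializationSchema<T> {
    void deserialize(byte[] message, Collector<T> out);
}

// Example schema: messages that fail to parse emit nothing,
// rather than throwing and failing (and endlessly replaying) the job.
class SafeIntSchema implements CollectingDeserializationSchema<Integer> {
    public void deserialize(byte[] message, Collector<Integer> out) {
        String text = new String(message, StandardCharsets.UTF_8);
        try {
            out.collect(Integer.parseInt(text.trim()));
        } catch (NumberFormatException e) {
            // bad data: skip the record, don't fail the pipeline
        }
    }
}

public class CollectorDemo {
    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>();
        Collector<Integer> out = buffer::add;
        SafeIntSchema schema = new SafeIntSchema();
        schema.deserialize("42".getBytes(StandardCharsets.UTF_8), out);
        schema.deserialize("not-a-number".getBytes(StandardCharsets.UTF_8), out);
        System.out.println(buffer); // only the parseable message survives
    }
}
```

Buffering into a list as above is also the shape of the "buffer approach" discussed in the review: the consumer thread can drain the buffer and call emitRecord per element with the partition and offset still in scope.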
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999 ] godfrey he commented on FLINK-5859: --- Hi [~fhueske], thanks for your advice. Yes, partition pruning is a kind of coarse-grained filter push-down, and filter push-down and partition pruning share a common step: extracting predicates from the filter condition based on the interests of different data sources. But IMO, filter push-down and partition pruning are independent concepts in general. The following table shows that different data sources have different traits: ||Trait||Example|| |filter push-down only|MySQL, HBase| |partition pruning only|CSV, TEXT| |both filter push-down and partition pruning|Parquet, Druid| IMO, we should provide developers with a clear concept, as [~ykt836] mentioned above, that includes both FilterableTableSource and PartitionableTableSource. Looking forward to your advice, thanks. > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be reduced > obviously, especially for a large partitioned table.
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Description: The DAG of running jobs cannot be shown properly in IE11 (I am using 11.0.9600.18059, but assume the same applies to IE9). The description of the task is not shown within the rectangle. Chrome works fine. I pasted the screenshots under IE and Chrome below. was: The DAG of running jobs can not show properly in IE11(I am using 11.0.9600.18059, but assuming same with IE9). The description of task is not shown within the rectangle. Chrome is well. > DAG can not show properly in IE > --- > > Key: FLINK-5901 > URL: https://issues.apache.org/jira/browse/FLINK-5901 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE 11 >Reporter: Tao Wang > Attachments: using chrom(same job).png, using IE.png > > > The DAG of running jobs cannot be shown properly in IE11 (I am using > 11.0.9600.18059, but assume the same applies to IE9). The description of the task is > not shown within the rectangle. > Chrome works fine. I pasted the screenshots under IE and Chrome below.
[jira] [Created] (FLINK-5902) Some images can not show in IE
Tao Wang created FLINK-5902: --- Summary: Some images can not show in IE Key: FLINK-5902 URL: https://issues.apache.org/jira/browse/FLINK-5902 Project: Flink Issue Type: Bug Components: Webfrontend Environment: IE Reporter: Tao Wang Some images on the Overview page cannot be shown in IE, although they display fine in Chrome. I'm using IE 11, but assume the same applies to IE9. I'll paste the screenshots later.
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: IE11 with problem.png > DAG can not show properly in IE > --- > > Key: FLINK-5901 > URL: https://issues.apache.org/jira/browse/FLINK-5901 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE 11 >Reporter: Tao Wang > Attachments: IE11 with problem.png, using chrom(same job).png, using > IE.png > > > The DAG of running jobs can not show properly in IE11(I am using > 11.0.9600.18059, but assuming same with IE9). The description of task is > not shown within the rectangle. > Chrome is well. I pasted the screeshot under IE and Chrome below. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881989#comment-15881989 ] ASF GitHub Bot commented on FLINK-3679: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881264 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- I see what you are saying. The trade-off here is handing off the objects one more time, but I think it's okay. I'll update the PR accordingly. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier >Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. 
> Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5902: Attachment: chrome is ok.png > Some images can not show in IE > -- > > Key: FLINK-5902 > URL: https://issues.apache.org/jira/browse/FLINK-5902 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE >Reporter: Tao Wang > Attachments: chrome is ok.png, IE 11 with problem.png > > > Some images in the Overview page can not show in IE, as it is good in Chrome. > I'm using IE 11, but think same with IE9 I'll paste the screenshot > later. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5902: Attachment: IE 11 with problem.png > Some images can not show in IE > -- > > Key: FLINK-5902 > URL: https://issues.apache.org/jira/browse/FLINK-5902 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE >Reporter: Tao Wang > Attachments: IE 11 with problem.png > > > Some images in the Overview page can not show in IE, as it is good in Chrome. > I'm using IE 11, but think same with IE9 I'll paste the screenshot > later. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: (was: IE11 with problem.png) > DAG can not show properly in IE > --- > > Key: FLINK-5901 > URL: https://issues.apache.org/jira/browse/FLINK-5901 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE 11 >Reporter: Tao Wang > Attachments: using chrom(same job).png, using IE.png > > > The DAG of running jobs can not show properly in IE11(I am using > 11.0.9600.18059, but assuming same with IE9). The description of task is > not shown within the rectangle. > Chrome is well. I pasted the screeshot under IE and Chrome below. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5570) Support register external catalog to table environment
[ https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jingzhang updated FLINK-5570: - Description: This issue aims to support registering one or more {{ExternalCatalog}} (referred to in https://issues.apache.org/jira/browse/FLINK-5568) with {{TableEnvironment}}. After registration, SQL and Table API queries can access tables in the external catalogs without registering those tables one by one with {{TableEnvironment}} beforehand. We plan to add two APIs to {{TableEnvironment}}: 1. register an external catalog {code} def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit {code} 2. scan a table from a registered catalog and return the resulting {{Table}}; this API is very useful in Table API queries. {code} def scan(catalogName: String, tableIdentifier: TableIdentifier): Table {code} > Support register external catalog to table environment > -- > > Key: FLINK-5570 > URL: https://issues.apache.org/jira/browse/FLINK-5570 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: jingzhang > > This issue aims to support registering one or more {{ExternalCatalog}} (referred to > in https://issues.apache.org/jira/browse/FLINK-5568) with > {{TableEnvironment}}. After registration, SQL and Table API queries can > access tables in the external catalogs without registering those tables one > by one with {{TableEnvironment}} beforehand. > We plan to add two APIs to {{TableEnvironment}}: > 1. register an external catalog > {code} > def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): > Unit > {code} > 2. scan a table from a registered catalog and return the resulting {{Table}}; > this API is very useful in Table API queries. > {code} > def scan(catalogName: String, tableIdentifier: TableIdentifier): Table > {code}
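A hedged in-memory mock may illustrate the intended register-once, scan-anywhere flow of the two proposed methods. Here {{ExternalCatalog}} is reduced to a name-to-schema map and the table identifier to a plain "db.table" string; none of these shapes are the actual Flink types:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the ExternalCatalog of FLINK-5568: just a table lookup.
class ExternalCatalog {
    private final Map<String, String> tables;
    ExternalCatalog(Map<String, String> tables) { this.tables = tables; }
    String getTable(String identifier) { return tables.get(identifier); }
}

class MockTableEnvironment {
    private final Map<String, ExternalCatalog> catalogs = new HashMap<>();

    // Proposed API #1: register a whole catalog under a name, once.
    void registerExternalCatalog(String name, ExternalCatalog catalog) {
        catalogs.put(name, catalog);
    }

    // Proposed API #2: resolve catalog + table identifier in one call,
    // with no per-table registration beforehand.
    String scan(String catalogName, String tableIdentifier) {
        ExternalCatalog catalog = catalogs.get(catalogName);
        String table = (catalog == null) ? null : catalog.getTable(tableIdentifier);
        if (table == null) {
            throw new IllegalArgumentException("unknown table: " + tableIdentifier);
        }
        return table;
    }
}

public class CatalogDemo {
    public static void main(String[] args) {
        Map<String, String> tables = new HashMap<>();
        tables.put("db1.t1", "schema-of-t1");
        MockTableEnvironment env = new MockTableEnvironment();
        env.registerExternalCatalog("hive", new ExternalCatalog(tables));
        // Any query can now reach db1.t1 through the catalog name alone.
        System.out.println(env.scan("hive", "db1.t1"));
    }
}
```

The point of the design is visible in scan(): the environment never sees individual tables at registration time, only the catalog, so newly added tables become queryable without further API calls.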
[GitHub] flink pull request #3405: [FLINK-5899] [table] Fix the bug in EventTimeTumbl...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3405 [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate I have changed the supportPartial to false for all built-in Aggregates, and ran all the UTs. Luckily this is the only bug we have so far. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F5899-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3405 commit bb233179d06325b481fe0e2a903a55c547529f06 Author: shaoxuan-wang Date: 2017-02-24T03:57:44Z [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
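For context, `supportPartial` distinguishes aggregates whose intermediate accumulators can be merged pairwise from those that cannot. A toy sketch of the property being relied on (class and field names are hypothetical, not Flink's actual interfaces):

```java
// An aggregate supports partial merge when combining two accumulators gives the
// same result as feeding all rows to a single accumulator (e.g. SUM, because
// addition is associative and commutative). Aggregates without this property
// must be computed from the full, unmerged input.
class SumAggregate {
    long acc = 0;

    void accumulate(long value) { acc += value; }

    // Partial merge: valid for SUM; setting supportPartial=false disables this path.
    void merge(SumAggregate other) { acc += other.acc; }
}
```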
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881967#comment-15881967 ] Scott Kidder commented on FLINK-5898: - Hi [~tzulitai], I'll look into fixing this in the KPL. I noticed that the method that installs the KPL binary uses a shared lock, which would allow multiple processes to obtain overlapping locks and write to the same file simultaneously: https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java#L815 I'll try patching the KPL to obtain an exclusive lock. I'll also file a Github issue against the KPL to see what the KPL authors think. > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... 
(sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. > A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
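The shared-vs-exclusive distinction Scott points to maps directly onto `java.nio` file locks: a shared lock permits overlapping holders (so two processes could both "win" and write the binary concurrently), while an exclusive lock serializes them. A minimal sketch, with hypothetical method and file names:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class BinaryExtractionLock {
    // Lock the whole file: shared=true allows overlapping holders (the buggy
    // behavior described above), shared=false (the proposed KPL fix) grants a
    // single process exclusive access for the duration of the extraction.
    public static boolean acquireAndCheck(Path file, boolean shared) throws IOException {
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            FileLock lock = channel.lock(0L, Long.MAX_VALUE, shared);
            try {
                return lock.isShared();
            } finally {
                lock.release();
            }
        }
    }
}
```

Note that `FileChannel.lock` is advisory on most platforms: it only coordinates processes that also take the lock, which is exactly the KPL's situation since every extractor runs the same code path.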
[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882126#comment-15882126 ] Tao Wang commented on FLINK-5903: - I've located the cause and will fix it ASAP. > taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in > YARN mode > --- > > Key: FLINK-5903 > URL: https://issues.apache.org/jira/browse/FLINK-5903 > Project: Flink > Issue Type: Bug > Components: YARN > Reporter: Tao Wang > > Currently Flink does not respect taskmanager.numberOfTaskSlots and > yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI. > Specifically, taskmanager.numberOfTaskSlots is not used at all, and > yarn.containers.vcores is only used when requesting container (TM) resources > but is not made known to the TM, which means the TM will always assume it has > the default (1) slots if -s is not configured.
[jira] [Created] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
Tao Wang created FLINK-5903: --- Summary: taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode Key: FLINK-5903 URL: https://issues.apache.org/jira/browse/FLINK-5903 Project: Flink Issue Type: Bug Components: YARN Reporter: Tao Wang Currently Flink does not respect taskmanager.numberOfTaskSlots and yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI. Specifically, taskmanager.numberOfTaskSlots is not used at all, and yarn.containers.vcores is only used when requesting container (TM) resources but is not made known to the TM, which means the TM will always assume it has the default (1) slots if -s is not configured.
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 > Can we just use the ${project.build.directory} as java.io.tmpdir ? @wenlong88 Sorry for the late reply. It's a good question. If we use `${project.build.directory}` without the sub-directory `tmp`, the UTs will create various directories that may overlap with other directories such as `classes`/`surefire-reports`. Using a dedicated `tmp` directory avoids such conflicts.
[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881895#comment-15881895 ] ASF GitHub Bot commented on FLINK-5546: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 > Can we just use the ${project.build.directory} as java.io.tmpdir ? @wenlong88 Sorry for the late reply. It's a good question. If we use `${project.build.directory}` without the sub-directory `tmp`, the UTs will create various directories that may overlap with other directories such as `classes`/`surefire-reports`. Using a dedicated `tmp` directory avoids such conflicts. > java.io.tmpdir setted as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Sub-task > Components: Build System > Environment: CentOS 7.2 > Reporter: Syinchwun Leo > Assignee: shijinkui > Fix For: 1.2.1 > > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8
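The approach under discussion can be sketched as a Surefire configuration that points `java.io.tmpdir` at a per-build `tmp` sub-directory. This is only a sketch: the exact configuration Flink adopted may differ, and the `tmp` directory must exist before tests run (e.g. created by an earlier build step):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- Per-build temp dir: avoids cross-user collisions in the shared /tmp,
           and the extra "tmp" level keeps test scratch files out of
           target/classes, target/surefire-reports, etc. -->
      <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
    </systemPropertyVariables>
  </configuration>
</plugin>
```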
[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 5:12 AM: - Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks (which doesn't make sense), and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing KPL and is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than KPL changing the binary file to be different across processes, but that's not something we can really push. was (Author: tzulitai): Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing KPL and is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than KPL changing the binary file to be different across processes, but that's not something we can really push. 
> Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... (sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. 
> A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881264 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- I see what you are saying. The trade-off here is handing off the objects one more time, but I think it's okay. I'll update the PR accordingly.
[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers
[ https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882007#comment-15882007 ] ASF GitHub Bot commented on FLINK-5692: --- Github user jinmingjian commented on the issue: https://github.com/apache/flink/pull/3373 @StephanEwen Just my coding habit. Correction done. And much appreciated for your review. I am open to more contributions! :tada: > Add an Option to Deactivate Kryo Fallback for Serializers > - > > Key: FLINK-5692 > URL: https://issues.apache.org/jira/browse/FLINK-5692 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System > Affects Versions: 1.2.0 > Reporter: Stephan Ewen > Assignee: Jin Mingjian > Labels: easyfix, starter > > Some users want to avoid that Flink's serializers use Kryo, as it can easily > become a hotspot in serialization. > For those users, it would help if there is a flag to "deactivate generic > types". Those users could then see where types are used that default to Kryo > and change these types (make them PoJos, Value types, or write custom > serializers). > There are two ways to approach this: > 1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would > create a Kryo Serializer (when the respective flag is set in the > {{ExecutionConfig}}) > 2. Have a static flag on the {{TypeExtractor}} to throw an exception > whenever it would create a {{GenericTypeInfo}}. This approach has the > downside of introducing some static configuration to the TypeExtractor, but > may be more helpful because it throws exceptions in the programs at points > where the types are used (not where the serializers are created, which may be > much later).
[GitHub] flink issue #3373: [FLINK-5692] [config] Add an Option to Deactivate Kryo Fa...
Github user jinmingjian commented on the issue: https://github.com/apache/flink/pull/3373 @StephanEwen Just my coding habit. Correction done. And much appreciated for your review. I am open to more contributions! :tada:
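The first approach described in the issue (fail fast instead of silently falling back to Kryo) can be illustrated with a toy registry. All names here are hypothetical; this is not Flink's `ExecutionConfig`/`GenericTypeInfo` code, just the fail-fast pattern it proposes:

```java
import java.util.HashMap;
import java.util.Map;

// Toy mirror of the proposed flag: when generic types are disabled, asking for
// a serializer of an unregistered type throws instead of silently falling back
// to a Kryo-style generic serializer, surfacing the hotspot at the call site.
class SerializerRegistry {
    private final Map<Class<?>, String> specific = new HashMap<>();
    private boolean genericTypesDisabled = false;

    void register(Class<?> type, String serializerName) { specific.put(type, serializerName); }

    void disableGenericTypes() { genericTypesDisabled = true; }

    String serializerFor(Class<?> type) {
        String serializer = specific.get(type);
        if (serializer != null) {
            return serializer;
        }
        if (genericTypesDisabled) {
            throw new UnsupportedOperationException(
                "Generic serialization for " + type.getName() + " is disabled");
        }
        return "kryo-generic"; // the fallback path the flag is meant to surface
    }
}
```

The exception carries the offending type's name, which is what lets users find and replace the types that default to Kryo.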
[jira] [Updated] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5903: Attachment: set yarn.container.vcores to 5_RM.JPG set yarn.container.vcores to 5_JM.JPG set taskmanager.numberOfTaskSlots to 6.JPG > taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in > YARN mode > --- > > Key: FLINK-5903 > URL: https://issues.apache.org/jira/browse/FLINK-5903 > Project: Flink > Issue Type: Bug > Components: YARN > Reporter: Tao Wang > Attachments: set taskmanager.numberOfTaskSlots to 6.JPG, set > yarn.container.vcores to 5_JM.JPG, set yarn.container.vcores to 5_RM.JPG > > > Currently Flink does not respect taskmanager.numberOfTaskSlots and > yarn.containers.vcores in flink-conf.yaml, but only the -s parameter in the CLI. > Specifically, taskmanager.numberOfTaskSlots is not used at all, and > yarn.containers.vcores is only used when requesting container (TM) resources > but is not made known to the TM, which means the TM will always assume it has > the default (1) slots if -s is not configured.
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai commented on FLINK-5898: Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... 
(sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. > A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881992#comment-15881992 ] ASF GitHub Bot commented on FLINK-3679: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881632 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- @haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here, before you jump back again into the code. This part is quite critical for Flink Kafka's exactly-once guarantee, so another pair of eyes on this will be safer. I would also like to do a thorough pass on your code and see if there are other problems, so you can work on those all together. Is that ok for you? Sorry for some more waiting. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector > Reporter: Jamie Grier > Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. 
In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
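The flat-map semantics requested above can be sketched as a deserialization interface that emits zero or more records per input message via a collector, so bad data is skipped instead of failing the job. This is a sketch with hypothetical names, not Flink's actual `DeserializationSchema` API:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// A deserialization schema with flat-map semantics: each raw message may yield
// zero records (malformed data is dropped) or several records (a batched message).
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, Consumer<T> out);
}

// Example: each message is a comma-separated list of integers; malformed
// entries yield no output rather than throwing and failing the pipeline.
class CsvIntSchema implements FlatDeserializationSchema<Integer> {
    @Override
    public void deserialize(byte[] message, Consumer<Integer> out) {
        String text = new String(message, StandardCharsets.UTF_8);
        for (String field : text.split(",")) {
            try {
                out.accept(Integer.parseInt(field.trim()));
            } catch (NumberFormatException ignored) {
                // zero outputs for this element: bad data does not fail the job
            }
        }
    }
}
```

Compared to the one-in-one-out `deserialize(byte[]): T` shape, the collector variant covers both problem cases in the issue: the bad-record case (emit nothing) and the one-to-many case (emit several records per message).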
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881632 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- @haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here, before you jump back again into the code. This part is quite critical for Flink Kafka's exactly-once guarantee, so another pair of eyes on this will be safer. I would also like to do a thorough pass on your code and see if there are other problems, so you can work on those all together. Is that ok for you? Sorry for some more waiting.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882058#comment-15882058 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102886003 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") { + + override def matches(call: RelOptRuleCall) = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource => +calc.calcProgram.getCondition != null + case _ => false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] + +val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] + +val program = calc.calcProgram +val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) +val predicates = extractPredicateExpressions( + program, + call.builder().getRexBuilder, + tst.tableEnv.getFunctionCatalog) + +if (predicates.length != 0) { + val remainingPredicate = filterableSource.setPredicate(predicates) --- End diff -- if remainingPredicate is empty, we should remove calc node also. 
> Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows: > {code} > trait FilterableTableSource { > // returns the unsupported part of the predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction.
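The `setPredicate` contract sketched in the issue (push supported predicates into the source, return the unsupported remainder to the planner) can be illustrated with a toy source that supports only equality predicates. This is a Java sketch with hypothetical names; the real interface is Scala and operates on `Expression` trees, not strings:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy filterable source: it consumes predicates it can evaluate natively
// (here, by a toy rule: equality predicates only) and hands the rest back to
// the planner, which must keep them in a downstream Calc/filter node.
class FilterableSourceSketch {
    private final List<String> pushedDown = new ArrayList<>();

    private boolean canPushDown(String predicate) {
        return predicate.contains("=");  // toy rule standing in for real predicate analysis
    }

    // Mirrors setPredicate: accept supported predicates, return the remainder.
    // An empty remainder means the planner can drop the calc node entirely,
    // as noted in the review comment above.
    List<String> setPredicate(List<String> predicates) {
        List<String> remaining = new ArrayList<>();
        for (String predicate : predicates) {
            if (canPushDown(predicate)) {
                pushedDown.add(predicate);
            } else {
                remaining.add(predicate);
            }
        }
        return remaining;
    }

    List<String> pushedDown() { return pushedDown; }
}
```

Returning the remainder (rather than a boolean) is what lets the optimizer split a conjunction: the source evaluates what it can, and only the leftover predicates stay in the plan.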
[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102886003

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataStreamCalc],
    +    operand(classOf[StreamTableSourceScan], none)),
    +  "PushFilterIntoStreamTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicates = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicates.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicates)
    --- End diff --

    if remainingPredicate is empty, we should remove calc node also.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
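[Editor's note] The reviewer's suggestion — drop the Calc node entirely when the source absorbs every predicate — can be sketched independently of Calcite. All names below (`FilterableSource`, `setPredicates`, the string predicates) are illustrative stand-ins, not Flink's or Calcite's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// A hypothetical filterable source: it keeps the predicates it can evaluate
// itself and returns the ones the engine must still apply afterwards.
class FilterableSource {
    final List<String> accepted = new ArrayList<>();

    List<String> setPredicates(List<String> predicates) {
        List<String> remaining = new ArrayList<>();
        for (String p : predicates) {
            if (p.startsWith("col")) {   // pretend only column comparisons can be pushed down
                accepted.add(p);
            } else {
                remaining.add(p);
            }
        }
        return remaining;
    }
}

public class PushDownSketch {
    public static void main(String[] args) {
        FilterableSource source = new FilterableSource();
        List<String> remaining = source.setPredicates(List.of("col1 > 5", "col2 = 'x'"));
        // All predicates were absorbed by the source, so the downstream
        // filter (the Calc node in the rule above) can be removed entirely.
        if (remaining.isEmpty()) {
            System.out.println("drop calc node");
        }
    }
}
```

The point of the contract is that the planner inspects the returned remainder: non-empty means keep a (reduced) filter, empty means the scan alone suffices.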
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: using IE.png > DAG can not show properly in IE > --- > > Key: FLINK-5901 > URL: https://issues.apache.org/jira/browse/FLINK-5901 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE 11 >Reporter: Tao Wang > Attachments: using IE.png > > > The DAG of running jobs can not show properly in IE11(I am using > 11.0.9600.18059, but assuming same with IE9). The description of task is > not shown within the rectangle. > Chrome is well. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: using chrom(same job).png > DAG can not show properly in IE > --- > > Key: FLINK-5901 > URL: https://issues.apache.org/jira/browse/FLINK-5901 > Project: Flink > Issue Type: Bug > Components: Webfrontend > Environment: IE 11 >Reporter: Tao Wang > Attachments: using chrom(same job).png, using IE.png > > > The DAG of running jobs can not show properly in IE11(I am using > 11.0.9600.18059, but assuming same with IE9). The description of task is > not shown within the rectangle. > Chrome is well. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
[ https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-5899: - Summary: Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate (was: Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction) > Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate > - > > Key: FLINK-5899 > URL: https://issues.apache.org/jira/browse/FLINK-5899 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > The row length used to initialize > DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think > this is introduced by mistake when merging the code). > We currently lack the built-in non-partial-merge Aggregates. Therefore this > has not been captured by the unit test. > Reproduce step: > 1. set the "supportPartial" to false for SumAggregate > 2. Then both testAllEventTimeTumblingWindowOverTime and > testEventTimeTumblingGroupWindowOverTime will fail. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
[ https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881900#comment-15881900 ] ASF GitHub Bot commented on FLINK-5899: --- GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3405 [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate I have changed the supportPartial to false for all built-in Aggregates, and run all the UTs. Luckily this is the only bug we have so far. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F5899-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3405 commit bb233179d06325b481fe0e2a903a55c547529f06 Author: 
shaoxuan-wang
Date: 2017-02-24T03:57:44Z

    [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate

> Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
> ---------------------------------------------------------------------
>
> Key: FLINK-5899
> URL: https://issues.apache.org/jira/browse/FLINK-5899
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> The row length used to initialize DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think this is introduced by mistake when merging the code.)
> We currently lack the built-in non-partial-merge Aggregates. Therefore this has not been captured by the unit test.
> Reproduce step:
> 1. set the "supportPartial" to false for SumAggregate
> 2. Then both testAllEventTimeTumblingWindowOverTime and testEventTimeTumblingGroupWindowOverTime will fail.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 4:53 AM: - Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing KPL and is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than KPL changing the binary file to be different across processes, but that's not something we can really push. was (Author: tzulitai): Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. 
> When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... (sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. > A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: 
java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881976#comment-15881976 ] Tzu-Li (Gordon) Tai commented on FLINK-5898: That's great! Thanks a lot for the efforts and please keep us posted :-) > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... (sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. 
> A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102881092

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
    				keyPayload.get(keyBytes);
    			}

    -			final T value = deserializer.deserialize(keyBytes, valueBytes,
    -				currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -
    -			if (deserializer.isEndOfStream(value)) {
    -				// remove partition from subscribed partitions.
    -				partitionsIterator.remove();
    -				continue partitionsLoop;
    -			}
    -
    -			owner.emitRecord(value, currentPartition, offset);
    +			final Collector<T> collector = new Collector<T>() {
    --- End diff --

    @haohui hmm this seems a bit odd to me. I think it should be achievable.

    ```
    // the buffer; this can be shared
    final List<T> bufferedElements = new LinkedList<>();

    // BufferCollector is an implementation of Collector that adds collected elements to bufferedElements; this can be shared
    final BufferCollector collector = new BufferCollector(bufferedElements);

    ...

    for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        deserializer.deserialize(
            record.key(),
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            collector);

        emitRecords(bufferedElements, partitionState, record.offset(), record);
        bufferedElements.clear(); // after the elements for the record have been emitted, empty out the buffer
    }
    ```

    Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something. Let me know what you think :)

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881986#comment-15881986 ]

ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102881092

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
    				keyPayload.get(keyBytes);
    			}

    -			final T value = deserializer.deserialize(keyBytes, valueBytes,
    -				currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -
    -			if (deserializer.isEndOfStream(value)) {
    -				// remove partition from subscribed partitions.
    -				partitionsIterator.remove();
    -				continue partitionsLoop;
    -			}
    -
    -			owner.emitRecord(value, currentPartition, offset);
    +			final Collector<T> collector = new Collector<T>() {
    --- End diff --

    @haohui hmm this seems a bit odd to me. I think it should be achievable.

    ```
    // the buffer; this can be shared
    final List<T> bufferedElements = new LinkedList<>();

    // BufferCollector is an implementation of Collector that adds collected elements to bufferedElements; this can be shared
    final BufferCollector collector = new BufferCollector(bufferedElements);

    ...

    for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        deserializer.deserialize(
            record.key(),
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            collector);

        emitRecords(bufferedElements, partitionState, record.offset(), record);
        bufferedElements.clear(); // after the elements for the record have been emitted, empty out the buffer
    }
    ```

    Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something.
Let me know what you think :) > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier >Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
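[Editor's note] The buffered-collector idea above can be sketched with a minimal, self-contained collector. The one-method `Collector` interface, `BufferCollector`, and the comma-splitting "schema" below are illustrative stand-ins, not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Flink's Collector interface (illustrative only).
interface Collector<T> {
    void collect(T record);
}

// Collects deserialized elements into a shared buffer; one instance can be
// reused for every consumed record.
class BufferCollector<T> implements Collector<T> {
    private final List<T> buffer;
    BufferCollector(List<T> buffer) { this.buffer = buffer; }
    public void collect(T record) { buffer.add(record); }
}

public class BufferCollectorSketch {
    // A flat-map style schema may emit zero, one, or many elements per input.
    static void deserialize(String raw, Collector<String> out) {
        for (String part : raw.split(",")) {
            if (!part.isEmpty()) {
                out.collect(part);   // bad or empty input simply emits nothing
            }
        }
    }

    public static void main(String[] args) {
        List<String> bufferedElements = new ArrayList<>();
        BufferCollector<String> collector = new BufferCollector<>(bufferedElements);

        for (String record : new String[]{"a,b", "", "c"}) {
            deserialize(record, collector);
            System.out.println(record + " -> " + bufferedElements);
            bufferedElements.clear();   // emptied after emitting, as suggested above
        }
    }
}
```

This is exactly the property the issue asks for: a corrupt input maps to an empty buffer instead of a thrown exception that fails the job.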
[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882017#comment-15882017 ]

Xingcan Cui commented on FLINK-5891:
------------------------------------

Thanks for your explanation, [~greghogan]. I'm afraid my PR on https://issues.apache.org/jira/browse/FLINK-1526 has the same problem, as I also store values with non-primitive types (anyhow, the primitive types will not be affected, right?) from the received messages.

I saw the following code in Flink's CEP library. To avoid the reference problem, it makes a deep copy of each {{StreamRecord element}}.

{code:title=AbstractCEPBasePatternOperator.java|borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
    // copy the StreamRecord so that it cannot be changed
    priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
} else {
    priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}

So, what are your suggestions on fixing this? I'd like to work on it (and surely also the PR of FLINK-1526).

> ConnectedComponents is broken when object reuse enabled
> -------------------------------------------------------
>
> Key: FLINK-5891
> URL: https://issues.apache.org/jira/browse/FLINK-5891
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 1.3.0
> Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} is storing a value from its iterator. {{GSAConnectedComponents}} does not have this limitation.
> {code}
> public static final class CCUpdater<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
>     @Override
>     public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
>         VV current = vertex.getValue();
>         VV min = current;
>         for (VV msg : messages) {
>             if (msg.compareTo(min) < 0) {
>                 min = msg;
>             }
>         }
>         if (!min.equals(current)) {
>             setNewVertexValue(min);
>         }
>     }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
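[Editor's note] The copy-before-store pattern Xingcan quotes can be shown generically: when object reuse is on, retain a copy of the value, not the reference the runtime keeps mutating. `Holder` and the manual loop below are illustrative, not Flink's serializer API:

```java
// A mutable value type, standing in for a record object that the runtime
// reuses and overwrites between calls when object reuse is enabled.
class Holder {
    int v;
    Holder(int v) { this.v = v; }
}

public class CopyOnRetainSketch {
    public static void main(String[] args) {
        // With object reuse, the runtime hands out the SAME Holder instance
        // for every message and mutates it in place.
        Holder reused = new Holder(0);
        int[] messages = {5, 3, 7};

        Holder min = null;
        for (int m : messages) {
            reused.v = m;                    // runtime overwrites the reused object
            if (min == null || reused.v < min.v) {
                min = new Holder(reused.v);  // FIX: store a copy, not the reference
            }
        }
        // Storing `reused` directly instead of a copy would report 7 here,
        // the last deserialized value, rather than the true minimum.
        System.out.println(min.v);
    }
}
```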
[jira] [Created] (FLINK-5901) DAG can not show properly in IE
Tao Wang created FLINK-5901:
-------------------------------

Summary: DAG can not show properly in IE
Key: FLINK-5901
URL: https://issues.apache.org/jira/browse/FLINK-5901
Project: Flink
Issue Type: Bug
Components: Webfrontend
Environment: IE 11
Reporter: Tao Wang

The DAG of running jobs cannot be shown properly in IE 11 (I am using 11.0.9600.18059, but assume the same holds for IE 9). The description of the task is not shown within the rectangle. Chrome displays it correctly.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/3406

    [flink-5568] [Table API & SQL] Introduce interface for catalog, and provide an in-memory implementation. Integrate external catalog with calcite catalog

    This pr aims to introduce interface for catalog, and provide an in-memory implementation for test and develop, finally integrate external catalog with calcite catalog. The main change including:
    1. Introduce ExternalCatalog abstraction, including introduce ExternalCatalogDatabase as database in catalog and ExternalCatalogTable as table in catalog.
    2. Provide an in-memory implementation for test and develop.
    3. Introduce ExternalCatalogSchema which is an implementation of Calcite Schema interface. It registers database in ExternalCatalog as calcite Schemas, and tables in a database as Calcite table.
    4. Add ExternalCatalogCompatible annotation. The TableSource with this annotation represents it could be converted to or from externalCatalogTable. ExternalCatalogTableConverter is the converter between externalCatalogTable and tableSource.
    5. Introduce CatalogTableHelper utility. It has two responsibilities:
       * automatically find the TableSources which are with ExternalCatalogCompatible annotation.
       * convert an ExternalCatalogTable instance to a TableSourceTable instance.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink dev

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3406.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3406

commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang
Date: 2017-02-22T11:28:08Z

    Introduce interface for external catalog, and provide an in-memory implementation for test or develop. Integrate with calcite catalog.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
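[Editor's note] A minimal in-memory catalog along the lines the PR describes can be sketched as a nested map of databases to tables. The names below (`CatalogTable`, `InMemoryCatalogSketch`) are illustrative, not the PR's actual interfaces:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative stand-in for an ExternalCatalogTable-like descriptor.
class CatalogTable {
    final String name;
    CatalogTable(String name) { this.name = name; }
}

public class InMemoryCatalogSketch {
    // database name -> (table name -> table descriptor)
    private final Map<String, Map<String, CatalogTable>> databases = new HashMap<>();

    void createDatabase(String db) {
        databases.putIfAbsent(db, new HashMap<>());
    }

    void createTable(String db, CatalogTable table) {
        databases.get(db).put(table.name, table);
    }

    Optional<CatalogTable> getTable(String db, String table) {
        Map<String, CatalogTable> tables = databases.get(db);
        return tables == null ? Optional.empty() : Optional.ofNullable(tables.get(table));
    }

    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        catalog.createDatabase("db1");
        catalog.createTable("db1", new CatalogTable("t1"));
        System.out.println(catalog.getTable("db1", "t1").isPresent());
        System.out.println(catalog.getTable("db2", "t1").isPresent());
    }
}
```

A schema adapter (ExternalCatalogSchema in the PR) would then expose each database entry as a planner schema and each table entry as a planner table.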
[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...
GitHub user haohui reopened a pull request:

    https://github.com/apache/flink/pull/3370

    [FLINK-5710] Add ProcTime() function to indicate StreamSQL.

    This is the commit we used internally -- There is no unit tests associated with this PR. It simply serves as a reference point for #3302.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3370.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3370

commit e68a7ad22cad926dac2f211fa3bd56ef481c4036
Author: Haohui Mai
Date: 2017-02-23T21:51:45Z

    [FLINK-5710] Add ProcTime() function to indicate StreamSQL.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881507#comment-15881507 ] ASF GitHub Bot commented on FLINK-5710: --- GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3370 [FLINK-5710] Add ProcTime() function to indicate StreamSQL. This is the commit we used internally -- There is no unit tests associated with this PR. It simply serves as a reference point for #3302. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3370 commit e68a7ad22cad926dac2f211fa3bd56ef481c4036 Author: Haohui MaiDate: 2017-02-23T21:51:45Z [FLINK-5710] Add ProcTime() function to indicate StreamSQL. > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
Scott Kidder created FLINK-5898: --- Summary: Race-Condition with Amazon Kinesis KPL Key: FLINK-5898 URL: https://issues.apache.org/jira/browse/FLINK-5898 Project: Flink Issue Type: Bug Components: Kinesis Connector Affects Versions: 1.2.0 Reporter: Scott Kidder The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer Library (KPL) to send messages to Kinesis streams. The KPL relies on a native binary client to send messages to achieve better performance. When a Kinesis Producer is instantiated, the KPL will extract the native binary to a sub-directory of `/tmp` (or whatever the platform-specific temporary directory happens to be). The KPL tries to prevent multiple processes from extracting the binary at the same time by wrapping the operation in a mutex. Unfortunately, this does not prevent multiple Flink cores from trying to perform this operation at the same time. If two or more processes attempt to do this at the same time, then the native binary in /tmp will be corrupted. The authors of the KPL are aware of this possibility and suggest that users of the KPL not do that ... (sigh): https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 I encountered this in my production environment when bringing up a new Flink task-manager with multiple cores and restoring from an earlier savepoint, resulting in the instantiation of a KPL client on each core at roughly the same time. 
A stack-trace follows:

{noformat}
java.lang.RuntimeException: Could not copy native binaries to temp directory /tmp/amazon-kinesis-producer-native-binaries
	at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
	at com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.SecurityException: The contents of the binary /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 is not what it's expected to be.
	at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
	... 8 more
{noformat}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
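[Editor's note] The KPL's guard fails because its mutex is process-local, while the conflict is between processes. An OS-level file lock does coordinate across processes; a hedged sketch of guarding an extract-to-/tmp step (the lock-file name is illustrative, and this is not how the KPL itself is implemented):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ExtractLockSketch {
    public static void main(String[] args) throws IOException {
        // A lock file guarding the extraction target; FileChannel.lock() takes
        // an OS-level advisory lock that other PROCESSES, not just threads,
        // must wait on before proceeding.
        Path lockFile = Files.createTempFile("extract", ".lock");
        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileLock lock = channel.lock()) {
            // Only one process at a time reaches this point, so the native
            // binary in /tmp cannot be clobbered by a concurrent extraction.
            System.out.println("holding exclusive lock: " + lock.isValid());
        } finally {
            Files.deleteIfExists(lockFile);
        }
    }
}
```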
[jira] [Updated] (FLINK-5743) Mark WindowedStream.aggregate* methods as PublicEvolving
[ https://issues.apache.org/jira/browse/FLINK-5743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5743: Fix Version/s: 1.3.0 > Mark WindowedStream.aggregate* methods as PublicEvolving > > > Key: FLINK-5743 > URL: https://issues.apache.org/jira/browse/FLINK-5743 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0 > > > IMHO, they are too new to know whether they will persist in their current > form.
[jira] [Created] (FLINK-5894) HA docs are misleading re: state backends
David Anderson created FLINK-5894: - Summary: HA docs are misleading re: state backends Key: FLINK-5894 URL: https://issues.apache.org/jira/browse/FLINK-5894 Project: Flink Issue Type: Improvement Components: Documentation Reporter: David Anderson Towards the end of https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/jobmanager_high_availability.html#configuration it says "Currently, only the file system state backend is supported in HA mode." The state handles are written to the FileSystem and a reference to them is kept in ZooKeeper. So it's actually independent of the backend being used.
[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880459#comment-15880459 ] Greg Hogan commented on FLINK-5891: --- [~xccui] even when wrapping in another object both references will point to the same object, and Flink will eventually overwrite the value with a later deserialization. Elsewhere we have made use of {{CopyableValue}} but this restricts the permitted types. One can also leave object reuse disabled or use immutable types (effectively disabling object reuse). > ConnectedComponents is broken when object reuse enabled > --- > > Key: FLINK-5891 > URL: https://issues.apache.org/jira/browse/FLINK-5891 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.3.0 >Reporter: Greg Hogan > > {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} > is storing a value from its iterator. > {{GSAConnectedComponents}} does not have this limitation. > {code}
> public static final class CCUpdater<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
>   @Override
>   public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
>     VV current = vertex.getValue();
>     VV min = current;
>     for (VV msg : messages) {
>       if (msg.compareTo(min) < 0) {
>         min = msg;
>       }
>     }
>     if (!min.equals(current)) {
>       setNewVertexValue(min);
>     }
>   }
> }
> {code}
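The failure mode Greg describes, where a reference taken from a reusing iterator is later overwritten by the next deserialization, can be reproduced without Flink at all. The sketch below is a self-contained illustration (plain Java, not Flink's runtime): the iterator hands out the same mutable holder on every call, so keeping a reference to the "minimum" element silently ends up pointing at the last element instead, while copying the value out is safe.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Demonstrates why holding a reference from an object-reusing iterator is unsafe. */
public class ObjectReuseDemo {

    static final class Holder { long value; }

    /** Broken: keeps a reference into the reusing iterator, like CCUpdater does. */
    static long minByReference(Iterable<Holder> messages) {
        Holder min = null;
        for (Holder h : messages) {
            if (min == null || h.value < min.value) {
                min = h; // the reference escapes and is overwritten on the next next()
            }
        }
        return min.value;
    }

    /** Safe: copies the value out instead of keeping the reference. */
    static long minByCopy(Iterable<Holder> messages) {
        long min = Long.MAX_VALUE;
        for (Holder h : messages) {
            min = Math.min(min, h.value);
        }
        return min;
    }

    /** Iterable that reuses one Holder instance across all elements. */
    static Iterable<Holder> reusing(List<Long> values) {
        return () -> new Iterator<Holder>() {
            private final Holder reused = new Holder();
            private final Iterator<Long> it = values.iterator();
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Holder next() { reused.value = it.next(); return reused; }
        };
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(3L, 1L, 2L);
        // The reference variant reports the LAST deserialized value, not the minimum.
        System.out.println(minByReference(reusing(values))); // 2, not 1
        System.out.println(minByCopy(reusing(values)));      // 1
    }
}
```

In Flink terms, the copy corresponds to deep-copying the record (e.g. via its serializer or `CopyableValue.copy()`) before storing it across iterator calls.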
[GitHub] flink issue #2982: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 Thanks for looking at that! I'll open a new discussion thread on the Mailing lists to discuss Side Outputs and split/select and how we're going to proceed with that. Regarding your other questions: I think we might add such an `Evaluator` interface in the future but for now I would like to keep it simple and see if that works for people. And yes, a user would have to use `allowedLateness` and `sideOutputLateData` at the same time if they want to use late data, or they can go with the default allowed lateness of zero and also get the late data as a side output.
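The lateness semantics described in the comment above can be illustrated with a small, self-contained sketch. This is plain Java, not Flink's windowing operator (which uses `allowedLateness(...)` together with `sideOutputLateData(OutputTag)` and decides lateness per window); the decision rule here is a deliberate simplification against a single watermark.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified late-data routing: with allowed lateness zero, anything behind
 * the current watermark goes to the side output; with a larger allowed
 * lateness, only elements older than (watermark - allowedLateness) do.
 */
public class LateDataRouting {

    /** True if an element with this timestamp should be routed to the late-data side output. */
    static boolean isLate(long elementTimestamp, long currentWatermark, long allowedLateness) {
        return elementTimestamp + allowedLateness < currentWatermark;
    }

    /** Collects the timestamps that would land in the side output. */
    static List<Long> routeLate(List<Long> timestamps, long watermark, long allowedLateness) {
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isLate(ts, watermark, allowedLateness)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // With the default allowed lateness of zero, 5 and 9 are late at watermark 10.
        System.out.println(routeLate(List.of(5L, 9L, 12L), 10L, 0L)); // [5, 9]
    }
}
```

Raising the allowed lateness shrinks the side output: with lateness 4 and watermark 10, only timestamp 5 is routed as late.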
[GitHub] flink pull request #3393: [FLINK-3903][docs] adding alternative installation...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/3393#discussion_r102723574 --- Diff: docs/quickstart/setup_quickstart.md --- @@ -72,6 +72,15 @@ $ cd build-target # this is where Flink is installed to ~~~ {% endif %} +### Alternatively --- End diff -- Yes. And perhaps use the `` formatting as with `dataset_transformation.md` and elsewhere. With tabs it is easy to see the different choices.
[jira] [Commented] (FLINK-3903) Homebrew Installation
[ https://issues.apache.org/jira/browse/FLINK-3903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880521#comment-15880521 ] ASF GitHub Bot commented on FLINK-3903: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/3393#discussion_r102723574 --- Diff: docs/quickstart/setup_quickstart.md --- @@ -72,6 +72,15 @@ $ cd build-target # this is where Flink is installed to ~~~ {% endif %} +### Alternatively --- End diff -- Yes. And perhaps use the `` formatting as with `dataset_transformation.md` and elsewhere. With tabs it is easy to see the different choices. > Homebrew Installation > - > > Key: FLINK-3903 > URL: https://issues.apache.org/jira/browse/FLINK-3903 > Project: Flink > Issue Type: Task > Components: Build System, Documentation >Reporter: Eron Wright >Priority: Minor > Labels: starter > > Recently I submitted a formula for apache-flink to the > [homebrew|http://brew.sh/] project. Homebrew simplifies installation on Mac: > {code} > $ brew install apache-flink > ... > $ flink --version > Version: 1.0.2, Commit ID: d39af15 > {code} > Updates to the formula are adhoc at the moment. I opened this issue to > formalize updating homebrew into the release process. I drafted a procedure > doc here: > [https://gist.github.com/EronWright/b62bd3b192a15be4c200a2542f7c9376] > > Please also consider updating the website documentation to suggest homebrew > as an alternate installation method for Mac users.
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726419 --- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee --- @@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists'])
 .value 'flinkConfig', {
   jobServer: ''
-# jobServer: 'http://localhost:8081/'
+  # jobServer: 'http://localhost:8081/'
--- End diff -- I updated it because I had the same problem with sublime :) ... It was easier for my workflow to indent it like this.
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880559#comment-15880559 ] ASF GitHub Bot commented on FLINK-3427: --- Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726454 --- Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee --- @@ -81,6 +78,51 @@ angular.module('flinkApp') JobsService.stopJob($stateParams.jobid).then (data) -> {} + loadJob = ()-> --- End diff -- Fixed. > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: Streaming, Webfrontend >Reporter: Robert Metzger > > Currently, it's quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks from rising) > - Report the number of events that arrived after the low watermark (users can see > how accurate the watermarks are)
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880558#comment-15880558 ] ASF GitHub Bot commented on FLINK-3427: --- Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726419 --- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee --- @@ -30,7 +30,7 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) .value 'flinkConfig', { jobServer: '' -# jobServer: 'http://localhost:8081/' + # jobServer: 'http://localhost:8081/' --- End diff -- I updated it because I had the same problem with sublime :) ... It was easier for my workflow to indent it like this.
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726454 --- Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee --- @@ -81,6 +78,51 @@ angular.module('flinkApp') JobsService.stopJob($stateParams.jobid).then (data) -> {} + loadJob = ()-> --- End diff -- Fixed.
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880549#comment-15880549 ] ASF GitHub Bot commented on FLINK-3427: --- Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726198 --- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee --- @@ -87,3 +87,27 @@ angular.module('flinkApp')
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+    if value <= -9223372036854776000
+      return 'No Watermark'
+    else
+      return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+    lowWatermark = "None"
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      values = (watermark.value for watermark in watermarks[nodeid])
+      lowWatermark = Math.min.apply(null, values)
+      if lowWatermark <= -9223372036854776000
+        lowWatermark = "No Watermark"
+    return lowWatermark
+
+.filter "watermarksByNode", ->
+  (watermarks, nodeid) ->
+    arr = []
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      arr = watermarks[nodeid]
--- End diff -- no, because initially there are no watermarks, so we must check if they exist or not. Nonetheless I have refactored this function and moved it to jobs.ctrl.coffee
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102726198 --- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee --- @@ -87,3 +87,27 @@ angular.module('flinkApp')
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+    if value <= -9223372036854776000
+      return 'No Watermark'
+    else
+      return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+    lowWatermark = "None"
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      values = (watermark.value for watermark in watermarks[nodeid])
+      lowWatermark = Math.min.apply(null, values)
+      if lowWatermark <= -9223372036854776000
+        lowWatermark = "No Watermark"
+    return lowWatermark
+
+.filter "watermarksByNode", ->
+  (watermarks, nodeid) ->
+    arr = []
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      arr = watermarks[nodeid]
--- End diff -- no, because initially there are no watermarks, so we must check if they exist or not. Nonetheless I have refactored this function and moved it to jobs.ctrl.coffee
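The magic constant -9223372036854776000 in the CoffeeScript filter above is Java's Long.MIN_VALUE sentinel ("no watermark yet") as it appears after passing through JavaScript's double-precision numbers. A Java sketch of the same low-watermark logic makes the sentinel handling explicit (this mirrors the filter's behavior; it is an illustration, not code from the PR):

```java
/**
 * Java equivalent of the "lowWatermark" UI filter: the minimum of the
 * per-subtask watermarks is the operator's low watermark, and the
 * Long.MIN_VALUE sentinel means no watermark has been emitted yet.
 */
public class LowWatermark {

    static String lowWatermark(long[] perSubtaskWatermarks) {
        if (perSubtaskWatermarks == null || perSubtaskWatermarks.length == 0) {
            return "None"; // nothing reported for this node yet
        }
        long low = Long.MAX_VALUE;
        for (long w : perSubtaskWatermarks) {
            low = Math.min(low, w);
        }
        return low == Long.MIN_VALUE ? "No Watermark" : Long.toString(low);
    }

    public static void main(String[] args) {
        System.out.println(lowWatermark(new long[]{5L, 3L, 9L}));            // 3
        System.out.println(lowWatermark(new long[]{Long.MIN_VALUE, 7L}));    // No Watermark
    }
}
```

One subtask without a watermark drags the whole operator to "No Watermark", which matches the intent of the issue: the minimum shows which operator is holding watermarks back.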
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880416#comment-15880416 ] Fabian Hueske commented on FLINK-5859: -- For such cases, we could either 1. implement {{FilterableTableSource}} and manually figure out filters and partitions or 2. {{PartitionableTableSource}} could have another method {{setFilterPredicate()}} which has the same semantics as {{FilterableTableSource.setPredicate()}} but which is called from {{PartitionableTableSource.setPredicate()}} with the remaining predicates which could not be used to prune partitions. > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be reduced > significantly, especially for a large partitioned table.
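The core of the partition-pruning idea under discussion can be shown in a few lines. This is a plain-Java sketch, not the proposed {{PartitionableTableSource}} interface: partition metadata (here, one partition-column value per path) is evaluated against the pushed-down predicate, and only the surviving partitions would be scanned; predicates that cannot be evaluated on partition columns would remain for row-level filtering.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Minimal partition-pruning sketch: drop partitions whose metadata fails the predicate. */
public class PartitionPruning {

    static List<String> prunePaths(Map<String, String> pathToPartitionValue,
                                   Predicate<String> partitionPredicate) {
        return pathToPartitionValue.entrySet().stream()
                .filter(e -> partitionPredicate.test(e.getValue())) // test metadata, not rows
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical partitioned layout; paths and values are illustrative only.
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("hdfs:///t/dt=2017-02-22", "2017-02-22");
        partitions.put("hdfs:///t/dt=2017-02-23", "2017-02-23");
        // A predicate like "dt = '2017-02-23'" prunes the first partition entirely.
        System.out.println(prunePaths(partitions, "2017-02-23"::equals));
    }
}
```

The benefit claimed in the issue follows directly: pruned partitions are never listed or read, so both planning and execution touch less data.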
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880467#comment-15880467 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 Thanks for looking at that! I'll open a new discussion thread on the Mailing lists to discuss Side Outputs and split/select and how we're going to proceed with that. Regarding your other questions: I think we might add such an `Evaluator` interface in the future but for now I would like to keep it simple and see if that works for people. And yes, a user would have to use `allowedLateness` and `sideOutputLateData` at the same time if they want to use late data, or they can go with the default allowed lateness of zero and also get the late data as a side output. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102725706 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade --- @@ -0,0 +1,27 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length") + thead +tr + th id --- End diff -- I'm not sure to what we're referring to here. Can you clarify?
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880542#comment-15880542 ] ASF GitHub Bot commented on FLINK-3427: --- Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102725620 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade --- @@ -0,0 +1,27 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length") --- End diff -- Fixed with the latest push
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880543#comment-15880543 ] ASF GitHub Bot commented on FLINK-3427: --- Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102725706 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade --- @@ -0,0 +1,27 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length") + thead +tr + th id --- End diff -- I'm not sure to what we're referring to here. Can you clarify?
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102725620 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade --- @@ -0,0 +1,27 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="(watermarks | watermarksByNode:nodeid).length") --- End diff -- Fixed with the latest push
[GitHub] flink pull request #3366: [FLINK-3427] [webui] watermarks tab and low waterm...
Github user nellboy commented on a diff in the pull request: https://github.com/apache/flink/pull/3366#discussion_r102725785 --- Diff: flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee --- @@ -87,3 +87,27 @@ angular.module('flinkApp')
 .filter "percentage", ->
   (number) -> (number * 100).toFixed(0) + '%'
+
+.filter "parseWatermark", ->
+  (value) ->
+    if value <= -9223372036854776000
+      return 'No Watermark'
+    else
+      return value
+
+.filter "lowWatermark", ->
+  (watermarks, nodeid) ->
+    lowWatermark = "None"
+    if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length
+      values = (watermark.value for watermark in watermarks[nodeid])
+      lowWatermark = Math.min.apply(null, values)
+      if lowWatermark <= -9223372036854776000
+        lowWatermark = "No Watermark"
--- End diff -- Fixed with latest push