[GitHub] flink pull request #3627: Release 0.4 alpha.0
GitHub user fisherbj opened a pull request: https://github.com/apache/flink/pull/3627 Release 0.4 alpha.0 Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/fisherbj/flink release-0.4-alpha.0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3627.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3627 commit 3a4f98e9f4fb4b69ffd66e8880c246da6bcf33d4 Author: physikerwelt (Moritz Schubotz)Date: 2013-10-29T00:15:12Z Change Version to 0.4-alpha.0 create the test branch pre-release to test the release process bash script inspired from http://stackoverflow.com/questions/8217844/bash-recursive-program-to-replace-text-in-a-tree-of-files commit 0685642a5f06b3d69669e4b33aaac4f590071b2d Author: Robert Metzger Date: 2013-10-30T10:33:14Z Hotfixing version detection commit 
811923f4f7bfcb4bd0c60b08c7af51c01fec7cfd Author: Robert Metzger Date: 2013-10-30T13:19:41Z Changed version to SNAPSHOT --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6201: Assignee: shijinkui > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui >Assignee: shijinkui >Priority: Trivial > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6201: - Priority: Trivial (was: Major) > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui >Priority: Trivial > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ```
[jira] [Updated] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6201: - Summary: move python example files from resources to the examples (was: move python example files to the examples dir) > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ```
[jira] [Created] (FLINK-6201) move python example files to the examples dir
shijinkui created FLINK-6201: Summary: move python example files to the examples dir Key: FLINK-6201 URL: https://issues.apache.org/jira/browse/FLINK-6201 Project: Flink Issue Type: Sub-task Components: Examples Reporter: shijinkui Python example in the resource dir is not suitable. Move them to the examples/python dir. ``` ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api resources/python 0755 ```
[jira] [Comment Edited] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944427#comment-15944427 ] lincoln.lee edited comment on FLINK-5498 at 3/28/17 2:41 AM: - Yes, it's a good idea using two Row fields in a result Row for outer joins with non-equi join conditions. The GroupReduceFunction only need to know the outer join type and assemble the final output Row. was (Author: lincoln.86xy): Yes, it's a good idea using two {code}Row{code} fields in a result {code}Row{code} for outer joins with non-equi join conditions. The {code}GroupReduceFunction{code} only need to know the outer join type and assemble the final output {code}Row{code}. > Add support for left/right outer joins with non-equality predicates (and 1+ > equality predicates) > > > Key: FLINK-5498 > URL: https://issues.apache.org/jira/browse/FLINK-5498 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > I found the expected result of a unit test case incorrect compare to that in > a RDMBS, > see > flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala > {code:title=JoinITCase.scala} > def testRightJoinWithNotOnlyEquiJoin(): Unit = { > ... 
> val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, > 'c) > val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, > 'f, 'g, 'h) > val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g) > > val expected = "Hello world,BCD\n" > val results = joinT.toDataSet[Row].collect() > TestBaseUtils.compareResultAsText(results.asJava, expected) > } > {code} > Then I took some time to learn about the ‘outer join’ in relational > databases, the right result of above case should be(tested in SQL Server and > MySQL, the results are same): > {code} > > select c, g from tuple3 right outer join tuple5 on a=f and bcg > > NULL Hallo > NULL Hallo Welt > NULL Hallo Welt wie > NULL Hallo Welt wie gehts? > NULL ABC > Hello world BCD > NULL CDE > NULL DEF > NULL EFG > NULL FGH > NULL GHI > NULL HIJ > NULL IJK > NULL JKL > NULL KLM > {code} > the join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent > to {{rightOuterJoin('a === 'd).where('b < 'h)}}. > The problem is rooted in the code-generated {{JoinFunction}} (see > {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not > match, we must emit the outer row padded with nulls instead of returning from > the function without emitting anything. > The code-generated {{JoinFunction}} does also include equality predicates. > These should be removed before generating the code, e.g., in > {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of > {{JoinInfo.getRemaining()}}. > More details: https://goo.gl/ngekca -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944427#comment-15944427 ] lincoln.lee commented on FLINK-5498: Yes, it's a good idea using two {code}Row{code} fields in a result {code}Row{code} for outer joins with non-equi join conditions. The {code}GroupReduceFunction{code} only need to know the outer join type and assemble the final output {code}Row{code}. > Add support for left/right outer joins with non-equality predicates (and 1+ > equality predicates) > > > Key: FLINK-5498 > URL: https://issues.apache.org/jira/browse/FLINK-5498 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > I found the expected result of a unit test case incorrect compare to that in > a RDMBS, > see > flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala > {code:title=JoinITCase.scala} > def testRightJoinWithNotOnlyEquiJoin(): Unit = { > ... > val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, > 'c) > val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, > 'f, 'g, 'h) > val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g) > > val expected = "Hello world,BCD\n" > val results = joinT.toDataSet[Row].collect() > TestBaseUtils.compareResultAsText(results.asJava, expected) > } > {code} > Then I took some time to learn about the ‘outer join’ in relational > databases, the right result of above case should be(tested in SQL Server and > MySQL, the results are same): > {code} > > select c, g from tuple3 right outer join tuple5 on a=f and bcg > > NULL Hallo > NULL Hallo Welt > NULL Hallo Welt wie > NULL Hallo Welt wie gehts? 
> NULL ABC > Hello world BCD > NULL CDE > NULL DEF > NULL EFG > NULL FGH > NULL GHI > NULL HIJ > NULL IJK > NULL JKL > NULL KLM > {code} > the join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent > to {{rightOuterJoin('a === 'd).where('b < 'h)}}. > The problem is rooted in the code-generated {{JoinFunction}} (see > {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not > match, we must emit the outer row padded with nulls instead of returning from > the function without emitting anything. > The code-generated {{JoinFunction}} does also include equality predicates. > These should be removed before generating the code, e.g., in > {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of > {{JoinInfo.getRemaining()}}. > More details: https://goo.gl/ngekca -- This message was sent by Atlassian JIRA (v6.3.15#6346)
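The core of the issue above is that a non-equi predicate inside the outer join's ON clause is not the same as an equi-join followed by a WHERE filter: unmatched preserved-side rows must still be emitted, null-padded. The sketch below (plain Python over toy tuples mirroring the test's data shapes, not the Flink Table API) makes the difference concrete:

```python
# Toy tables: left rows are (a, b, c), right rows are (d, h, g).
left = [(1, 10, "Hello world")]
right = [(1, 5, "ABC"), (1, 20, "BCD")]

def right_outer(pred):
    """Right outer join emitting (c, g): every right row appears at least
    once, with c padded to None when no left row satisfies pred."""
    out = []
    for d, h, g in right:
        matched = [c for (a, b, c) in left if pred(a, b, d, h)]
        if matched:
            out.extend((c, g) for c in matched)
        else:
            out.append((None, g))
    return out

# 1) Non-equi predicate inside the join condition: the unmatched right row
#    survives, null-padded -- the behavior the issue says Flink must produce.
on_clause = right_outer(lambda a, b, d, h: a == d and b < h)

# 2) Equi-join first, then the same predicate as a post-join filter: the
#    filter runs on joined rows, so the null-padded row is lost.
rows = []
for d, h, g in right:
    matched = [(b, c) for (a, b, c) in left if a == d]
    if matched:
        rows.extend((b, c, h, g) for (b, c) in matched)
    else:
        rows.append((None, None, h, g))
post_filter = [(c, g) for (b, c, h, g) in rows if b is not None and b < h]

print(on_clause)    # [(None, 'ABC'), ('Hello world', 'BCD')]
print(post_filter)  # [('Hello world', 'BCD')]
```

This is exactly why `rightOuterJoin('a === 'd && 'b < 'h)` cannot be rewritten as `rightOuterJoin('a === 'd).where('b < 'h)`, and why the code-generated `JoinFunction` must emit a null-padded row instead of returning without emitting.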
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944402#comment-15944402 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320480 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction { * @return [[Expression]] in form of a [[TableFunctionCall]] */ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = { -val resultType = if (getResultType == null) { - typeInfo +buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*) + } + + /** +* Build a TableFunctionCall, a name and a sequence of params will determine a unique +* [[TableFunctionCall]] +* +* @param name function name +* @param implicitResultType If no result type returned, it will use this type. 
+* @param params The input expressions +* @return A unique [[TableFunctionCall]] +*/ + private[table] def buildTableFunctionCall(name: String, +implicitResultType: TypeInformation[_], +params: Expression*): TableFunctionCall = { +val arguments = expressionsToArguments(params: _*) +val resultType = getResultType(arguments, implicitResultType) +TableFunctionCall(name, this, params, resultType) + } + + /** +* Internal user of [[getResultType()]] +* +* @param arguments arguments of a function call (only literal arguments +* are passed, nulls for non-literal ones) +* @param implicitResultType The implicit result type +* @return [[TypeInformation]] of result type or null if Flink should determine the type +*/ + private[table] def getResultType(arguments: java.util.List[AnyRef], --- End diff -- The format of the parameters should be consistent with others > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > In many of our use cases. We have to decide the schema of a UDTF at the run > time. For example. udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems such as calcite and hive support this feature. However, the > current implementation of flink didn't implement the feature correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320480 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction { * @return [[Expression]] in form of a [[TableFunctionCall]] */ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = { -val resultType = if (getResultType == null) { - typeInfo +buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*) + } + + /** +* Build a TableFunctionCall, a name and a sequence of params will determine a unique +* [[TableFunctionCall]] +* +* @param name function name +* @param implicitResultType If no result type returned, it will use this type. +* @param params The input expressions +* @return A unique [[TableFunctionCall]] +*/ + private[table] def buildTableFunctionCall(name: String, +implicitResultType: TypeInformation[_], +params: Expression*): TableFunctionCall = { +val arguments = expressionsToArguments(params: _*) +val resultType = getResultType(arguments, implicitResultType) +TableFunctionCall(name, this, params, resultType) + } + + /** +* Internal user of [[getResultType()]] +* +* @param arguments arguments of a function call (only literal arguments +* are passed, nulls for non-literal ones) +* @param implicitResultType The implicit result type +* @return [[TypeInformation]] of result type or null if Flink should determine the type +*/ + private[table] def getResultType(arguments: java.util.List[AnyRef], --- End diff -- The format of the parameters should be consistent with others --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944401#comment-15944401 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} /** * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]]. * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. + * + * @param tableFunction The Table Function instance + * @param implicitResultType The implicit result type. + * @param evalMethod The eval() method of the [[tableFunction]] */ -class FlinkTableFunctionImpl[T]( -val typeInfo: TypeInformation[T], -val fieldIndexes: Array[Int], -val fieldNames: Array[String], -val evalMethod: Method) +class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_], --- End diff -- Can we just separate this implementation into 2 classes, one for which we already know the type, and one for whose type has not been decided yet. Why i suggesting this is that i noticed `tableFunction.getResultType` is called multiple times when evaluating the plan, even we had already determine the row type long time ago, i.e. when `apply` method from `TableFunction` has been called. 
> Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > In many of our use cases. We have to decide the schema of a UDTF at the run > time. For example. udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems such as calcite and hive support this feature. However, the > current implementation of flink didn't implement the feature correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} /** * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]]. * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. + * + * @param tableFunction The Table Function instance + * @param implicitResultType The implicit result type. + * @param evalMethod The eval() method of the [[tableFunction]] */ -class FlinkTableFunctionImpl[T]( -val typeInfo: TypeInformation[T], -val fieldIndexes: Array[Int], -val fieldNames: Array[String], -val evalMethod: Method) +class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_], --- End diff -- Can we just separate this implementation into 2 classes, one for which we already know the type, and one for whose type has not been decided yet. Why i suggesting this is that i noticed `tableFunction.getResultType` is called multiple times when evaluating the plan, even we had already determine the row type long time ago, i.e. when `apply` method from `TableFunction` has been called. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944394#comment-15944394 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108319900 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} /** * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]]. * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. + * + * @param tableFunction The Table Function instance + * @param implicitResultType The implicit result type. 
+ * @param evalMethod The eval() method of the [[tableFunction]] */ -class FlinkTableFunctionImpl[T]( -val typeInfo: TypeInformation[T], -val fieldIndexes: Array[Int], -val fieldNames: Array[String], -val evalMethod: Method) +class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_], +val implicitResultType: TypeInformation[_], +val evalMethod: Method) extends ReflectiveFunctionBase(evalMethod) with TableFunction { - if (fieldIndexes.length != fieldNames.length) { -throw new TableException( - "Number of field indexes and field names must be equal.") - } + override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { -throw new TableException( - "Table field names must be unique.") + override def getRowType(typeFactory: RelDataTypeFactory, + arguments: util.List[AnyRef]): RelDataType = { + +// Get the result type from table function. If it is not null, the implicitResultType may +// already be generated by Table API's apply() method. 
+val resultType = if (tableFunction.getResultType(arguments) != null) { + tableFunction.getResultType(arguments) +} else { + implicitResultType +} +val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) +buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes) } - val fieldTypes: Array[TypeInformation[_]] = -typeInfo match { - case cType: CompositeType[T] => -if (fieldNames.length != cType.getArity) { - throw new TableException( -s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") -} - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => -if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( -"Non-composite input type may have only a single field and its index must be 0.") -} -Array(aType) + private [table] def buildRelDataType(typeFactory: RelDataTypeFactory, --- End diff -- I think this method can go into `UserDefinedFunctionUtils` > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > In many of our use cases. We have to decide the schema of a UDTF at the run > time. For example. udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems such as calcite and hive support this feature. However, the > current implementation of flink didn't implement the feature correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
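The resolution order the diff above implements (and the review debates) is: ask the table function for a result type derived from the call arguments, and only if it returns null fall back to the implicit type extracted earlier by `apply()`. A minimal Python sketch of that fallback, with hypothetical names standing in for the Scala code:

```python
# Illustrative sketch of the resolution order in getRowType(): prefer the
# type the function derives from its (literal) call arguments; fall back to
# the implicit result type otherwise. Names here are hypothetical, not Flink's.
def resolve_result_type(get_result_type, arguments, implicit_result_type):
    explicit = get_result_type(arguments)
    return explicit if explicit is not None else implicit_result_type

# A dynamic-schema function derives its type from a literal argument...
dynamic = lambda args: tuple(args[0].split(", ")) if args else None
print(resolve_result_type(dynamic, ["c1, c2, c3"], ("f0",)))  # ('c1', 'c2', 'c3')

# ...while one that returns None uses the implicit type.
static = lambda args: None
print(resolve_result_type(static, ["c1, c2, c3"], ("f0",)))   # ('f0',)
```

Note the reviewer's concern: because this resolution runs inside `getRowType()`, `getResultType` is re-evaluated on every planner call even when the type was fixed long before, which motivates the suggestion to split the implementation into a known-type and a deferred-type class.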
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108319900 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} /** * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]]. * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. + * + * @param tableFunction The Table Function instance + * @param implicitResultType The implicit result type. 
+ * @param evalMethod The eval() method of the [[tableFunction]] */ -class FlinkTableFunctionImpl[T]( -val typeInfo: TypeInformation[T], -val fieldIndexes: Array[Int], -val fieldNames: Array[String], -val evalMethod: Method) +class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_], +val implicitResultType: TypeInformation[_], +val evalMethod: Method) extends ReflectiveFunctionBase(evalMethod) with TableFunction { - if (fieldIndexes.length != fieldNames.length) { -throw new TableException( - "Number of field indexes and field names must be equal.") - } + override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { -throw new TableException( - "Table field names must be unique.") + override def getRowType(typeFactory: RelDataTypeFactory, + arguments: util.List[AnyRef]): RelDataType = { + +// Get the result type from table function. If it is not null, the implicitResultType may +// already be generated by Table API's apply() method. 
+val resultType = if (tableFunction.getResultType(arguments) != null) { + tableFunction.getResultType(arguments) +} else { + implicitResultType +} +val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) +buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes) } - val fieldTypes: Array[TypeInformation[_]] = -typeInfo match { - case cType: CompositeType[T] => -if (fieldNames.length != cType.getArity) { - throw new TableException( -s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") -} - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => -if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( -"Non-composite input type may have only a single field and its index must be 0.") -} -Array(aType) + private [table] def buildRelDataType(typeFactory: RelDataTypeFactory, --- End diff -- I think this method can go into `UserDefinedFunctionUtils` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6192) reuse zookeeper client created by CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944389#comment-15944389 ] ASF GitHub Bot commented on FLINK-6192: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3617 Glad we have same idea :) @tillrohrmann I'll mark the JIRA duplicated now and close this PR as soon as you open the new one. > reuse zookeeper client created by CuratorFramework > - > > Key: FLINK-6192 > URL: https://issues.apache.org/jira/browse/FLINK-6192 > Project: Flink > Issue Type: Sub-task > Components: JobManager, YARN >Reporter: Tao Wang >Assignee: Tao Wang > > Now in yarn mode, there're three places using zookeeper client(web monitor, > jobmanager and resourcemanager) in ApplicationMaster/JobManager, while > there're two in TaskManager. They create new one zookeeper client when they > need them. > I believe there're more other places do the same thing, but in one JVM, one > CuratorFramework is enough for connections to one zookeeper client, so we > need a singleton to reuse them.
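The reuse pattern FLINK-6192 proposes — one shared client per connection string per JVM, created lazily and handed out to every component that needs it — can be sketched as follows. This is a plain-Python stand-in for a CuratorFramework handle; the class and method names are illustrative, not Curator's API:

```python
import threading

class SharedClientRegistry:
    """Hands out at most one client per connection string per process."""
    _clients = {}
    _lock = threading.Lock()

    @classmethod
    def get(cls, connect_string, factory):
        # Double-checked locking: only the first caller for a given
        # connect string constructs a client; later callers reuse it.
        client = cls._clients.get(connect_string)
        if client is None:
            with cls._lock:
                client = cls._clients.get(connect_string)
                if client is None:
                    client = factory(connect_string)
                    cls._clients[connect_string] = client
        return client

class FakeZkClient:
    """Stand-in for a real ZooKeeper client handle."""
    def __init__(self, connect_string):
        self.connect_string = connect_string

# Web monitor, jobmanager, and resourcemanager would all get the same instance:
a = SharedClientRegistry.get("zk:2181", FakeZkClient)
b = SharedClientRegistry.get("zk:2181", FakeZkClient)
print(a is b)  # True
```

A real implementation would also have to manage the shared client's lifecycle (closing it only when the last user releases it), which is part of why the discussion moved to a separate PR.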
[GitHub] flink issue #3617: [FLINK-6192]reuse zookeeper client created by CuratorFram...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3617 Glad we have same idea :) @tillrohrmann I'll mark the JIRA duplicated now and close this PR as soon as you open the new one.
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944373#comment-15944373 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318532 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -131,8 +202,10 @@ abstract class TableFunction[T] extends UserDefinedFunction { * method. Flink's type extraction facilities can handle basic types or --- End diff -- I think the comment should also be updated, to make user be aware of the behavior that he can determine the result type based on the parameters. > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > In many of our use cases. We have to decide the schema of a UDTF at the run > time. For example. udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems such as calcite and hive support this feature. However, the > current implementation of flink didn't implement the feature correctly.
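The "dynamic schema" idea in FLINK-6196 — `udtf('c1, c2, c3')` producing three output columns decided at call time — can be illustrated with a toy sketch. This is a hypothetical stand-in, not the Flink `TableFunction` API: the schema is parsed from a literal argument, which is why only literal arguments are passed to `getResultType`:

```python
# Toy dynamic-schema table function: output columns come from a literal
# argument at call time, as in udtf('c1, c2, c3'). Illustrative names only.
def dynamic_schema(column_spec):
    """Derive the output column names from the literal column spec."""
    return [name.strip() for name in column_spec.split(",")]

def udtf_eval(column_spec, row):
    """Emit one record per input row, shaped by the runtime schema."""
    names = dynamic_schema(column_spec)
    return dict(zip(names, row))

print(dynamic_schema("c1, c2, c3"))        # ['c1', 'c2', 'c3']
print(udtf_eval("c1, c2, c3", (1, 2, 3)))  # {'c1': 1, 'c2': 2, 'c3': 3}
```

The planner needs the schema before any row is processed, which is why the fix threads the (literal) call arguments into the result-type lookup rather than fixing the type at class-definition time.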
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318532 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -131,8 +202,10 @@ abstract class TableFunction[T] extends UserDefinedFunction { * method. Flink's type extraction facilities can handle basic types or --- End diff -- I think the comment should also be updated, to make user be aware of the behavior that he can determine the result type based on the parameters.
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944370#comment-15944370 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction { * @return [[Expression]] in form of a [[TableFunctionCall]] */ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = { -val resultType = if (getResultType == null) { - typeInfo +buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*) + } + + /** +* Builds a [[TableFunctionCall]]; a name and a sequence of params determine a unique +* [[TableFunctionCall]]. +* +* @param name function name +* @param implicitResultType used if the function does not declare a result type +* @param params the input expressions +* @return a unique [[TableFunctionCall]] +*/ + private[table] def buildTableFunctionCall(name: String, --- End diff -- Can we move this to a util class? Since this is a user-facing API class, we don't want to expose this kind of thing.
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction { * @return [[Expression]] in form of a [[TableFunctionCall]] */ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = { -val resultType = if (getResultType == null) { - typeInfo +buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*) + } + + /** +* Builds a [[TableFunctionCall]]; a name and a sequence of params determine a unique +* [[TableFunctionCall]]. +* +* @param name function name +* @param implicitResultType used if the function does not declare a result type +* @param params the input expressions +* @return a unique [[TableFunctionCall]] +*/ + private[table] def buildTableFunctionCall(name: String, --- End diff -- Can we move this to a util class? Since this is a user-facing API class, we don't want to expose this kind of thing.
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944368#comment-15944368 ] ASF GitHub Bot commented on FLINK-6196: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318278 --- Diff: flink-core/src/test/java/org/apache/flink/types/RowTest.java --- @@ -21,6 +21,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; --- End diff -- this import is unused
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318278 --- Diff: flink-core/src/test/java/org/apache/flink/types/RowTest.java --- @@ -21,6 +21,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; --- End diff -- this import is unused
[jira] [Updated] (FLINK-6200) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-6200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6200: --- Issue Type: Sub-task (was: New Feature) Parent: FLINK-4557 > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-6200 > URL: https://issues.apache.org/jira/browse/FLINK-6200 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > > The goal of this issue is to add support for OVER RANGE aggregations on event time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single threaded execution). > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode. > - bounded PRECEDING is not supported (see FLINK-5655) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER RANGE aggregates > - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
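The semantics of the query above (an unbounded preceding event-time OVER window) can be sketched in plain Java: one running aggregate per partition key, updated as time-ordered rows arrive. This is only an illustration of the SQL semantics under the assumption that rows are already ordered by event time; it is not Flink's DataStream operator:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of SUM(b)/MIN(b) OVER (PARTITION BY c ORDER BY rowTime()
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), computed incrementally.
public class UnboundedOverSketch {

    // Each input row is {key c, value b}; rows are assumed ordered by event time.
    // Each output row is {c, b, sumB, minB} for that row's window.
    public static List<long[]> sumMinOver(List<long[]> rows) {
        Map<Long, long[]> acc = new HashMap<>(); // key -> {running sum, running min}
        List<long[]> out = new ArrayList<>();
        for (long[] row : rows) {
            long key = row[0], b = row[1];
            long[] a = acc.computeIfAbsent(key, k -> new long[]{0, Long.MAX_VALUE});
            a[0] += b;                  // unbounded preceding: sum never evicts
            a[1] = Math.min(a[1], b);   // likewise for min
            out.add(new long[]{key, b, a[0], a[1]});
        }
        return out;
    }
}
```

Because the lower bound is unbounded, no rows ever leave the window, so constant per-key state (sum and min) suffices; a bounded PRECEDING clause (FLINK-5655) would additionally require retracting expired rows.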
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r108314395 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- Hi @fhueske, I have created a new JIRA, [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200), for the `RANGE` case, but I do not have permission to modify that JIRA's name. Can you help me with it? :) Hi @hongyuhong, I'm glad to hear that you want to continue with the `RANGE` case. Feel free to take [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200).
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944331#comment-15944331 ] ASF GitHub Bot commented on FLINK-5658: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r108314395 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,12 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) --- End diff -- Hi @fhueske, I have created a new JIRA, [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200), for the `RANGE` case, but I do not have permission to modify that JIRA's name. Can you help me with it? :) Hi @hongyuhong, I'm glad to hear that you want to continue with the `RANGE` case. Feel free to take [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200). > Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL > > > Key: FLINK-5658 > URL: https://issues.apache.org/jira/browse/FLINK-5658 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: hongyuhong > Fix For: 1.3.0 > > > The goal of this issue is to add support for OVER RANGE aggregations on event time streams to the SQL interface.
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944327#comment-15944327 ] Guowei Ma commented on FLINK-5892: -- Hi [~Zentol], I am discussing the proposal with [~srichter]. I will do it after the discussion finishes. > Recover job state at the granularity of operator > > > Key: FLINK-5892 > URL: https://issues.apache.org/jira/browse/FLINK-5892 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Guowei Ma > Assignee: Guowei Ma > > JobGraph has no `Operator` info, so `ExecutionGraph` can only recover at the granularity of a task. > This leads to the result that an operator of the job may not recover its state from a savepoint even if the savepoint contains the state of that operator. > > https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.
[jira] [Created] (FLINK-6200) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
sunjincheng created FLINK-6200: -- Summary: Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL Key: FLINK-6200 URL: https://issues.apache.org/jira/browse/FLINK-6200 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: sunjincheng The goal of this issue is to add support for OVER RANGE aggregations on event time streams to the SQL interface.
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944049#comment-15944049 ] Fabian Hueske commented on FLINK-5829: -- I just had a chat with [~wheat9] and we came to the conclusion that using a custom schema is not so easy. We could register a sub-schema, but then all table references in SQL queries would need to be prefixed with the schema name. In fact, I'm not sure how important it is to be able to remove tables from the root schema. It seems that this would be mostly relevant for long-running applications that run many queries. Such applications could also register their own external catalog and manage the available tables themselves. What do you think, @twalthr? > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Haohui Mai > > Once Calcite 1.12 is released we should update to remove some copied classes.
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943960#comment-15943960 ] Fabian Hueske commented on FLINK-5829: -- I think we could implement our own internal default schema (similar to the {{ExternalCatalogSchema}}) and register it in Calcite. This would allow us to arbitrarily modify the registered tables.
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943938#comment-15943938 ] Haohui Mai commented on FLINK-5829: --- Unfortunately, it seems infeasible to inherit and forward the calls because Calcite declares most of the methods as package-local; it appears that Calcite intentionally keeps the class package-local as well. Another option is to defer the functionality of unregistering tables until Calcite provides the API. We can open a JIRA in Calcite and get it fixed in Calcite 1.13. However, given the release schedule, it seems to me that the functionality would then be deferred to Flink 1.4. [~twalthr], does that sound okay to you?
[jira] [Commented] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()
[ https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943926#comment-15943926 ] ASF GitHub Bot commented on FLINK-6103: --- Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/3598 Does the current exception handling address your comments @StephanEwen? > LocalFileSystem rename() uses File.renameTo() > - > > Key: FLINK-6103 > URL: https://issues.apache.org/jira/browse/FLINK-6103 > Project: Flink > Issue Type: Bug > Components: Local Runtime > Reporter: Flavio Pompermaier > Assignee: Flavio Pompermaier > Labels: filesystem > > I've tried to move a directory to another on the LocalFileSystem and it doesn't work (in my case fs is an instance of java.io.UnixFileSystem). > As for FLINK-1840 (there was a PR to fix the issue - https://github.com/apache/flink/pull/578), the problem is that {{File.renameTo()}} is not reliable. > Indeed, the Javadoc says: > bq. Renames the file denoted by this abstract pathname. Many aspects of the behavior of this method are inherently platform-dependent: The rename operation might not be able to move a file from one filesystem to another, it might not be atomic, and it might not succeed if a file with the destination abstract pathname already exists. The return value should always be checked to make sure that the rename operation was successful. Note that the java.nio.file.Files class defines the move method to move or rename a file in a platform independent manner
[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...
Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/3598 Does the current exception handling address your comments @StephanEwen?
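The Javadoc quoted in the issue points at the fix direction: `java.io.File.renameTo()` can fail silently and platform-dependently, while `java.nio.file.Files.move` reports failures with an exception. A minimal sketch of that replacement (the class and method names here are illustrative, not the PR's actual code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: replace File.renameTo() with Files.move, which throws on failure
// instead of returning false, and behaves consistently across platforms.
public class RenameSketch {

    public static void rename(Path source, Path target) throws IOException {
        // REPLACE_EXISTING mirrors a typical overwrite-on-rename contract.
        Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-demo");
        Path src = Files.createDirectory(dir.resolve("src"));
        Files.writeString(src.resolve("data.txt"), "payload");
        Path dst = dir.resolve("dst");
        rename(src, dst); // moves the whole directory (same filesystem: a rename)
        System.out.println(Files.readString(dst.resolve("data.txt"))); // payload
    }
}
```

Unlike `renameTo()`, a failed move here surfaces as an `IOException` with a cause, so the caller cannot accidentally ignore it; moving across filesystems falls back to a copy for regular files.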
[jira] [Updated] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6196: -- Component/s: Table API & SQL
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943893#comment-15943893 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 @fhueske @stefanobortoli I rebased and created the tests. Please have a look > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER RANGE aggregates > - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
[GitHub] flink issue #3607: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 @fhueske @stefanobortoli I rebased and created the tests. Please have a look
[jira] [Created] (FLINK-6199) Single outstanding Async I/O operation per key
Jamie Grier created FLINK-6199: -- Summary: Single outstanding Async I/O operation per key Key: FLINK-6199 URL: https://issues.apache.org/jira/browse/FLINK-6199 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Jamie Grier I would like to propose we extend the Async I/O semantics a bit such that a user can guarantee a single outstanding async request per key. This would allow a user to order async requests per key while still achieving the throughput benefits of using Async I/O in the first place. This is essential for operations where stream order is important but we still need to use Async operations to interact with an external system in a performant way.
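The proposed semantics, at most one outstanding async request per key, can be sketched by chaining each key's requests onto the previous request's future, so requests for the same key run in order while different keys still run concurrently. This is only an illustration of the idea with hypothetical names, not a Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: per-key serialization of async requests. A request
// for key K starts only after the previous request for K has completed.
public class PerKeyAsyncGate {

    private final Map<String, CompletableFuture<?>> lastPerKey = new ConcurrentHashMap<>();

    // synchronized so the read-chain-store of lastPerKey is atomic per gate.
    public synchronized <T> CompletableFuture<T> submit(
            String key, Supplier<T> request, Executor executor) {
        CompletableFuture<?> prev =
                lastPerKey.getOrDefault(key, CompletableFuture.completedFuture(null));
        CompletableFuture<T> next = prev.thenApplyAsync(ignored -> request.get(), executor);
        lastPerKey.put(key, next);
        return next;
    }

    public static void main(String[] args) {
        PerKeyAsyncGate gate = new PerKeyAsyncGate();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> completionOrder = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 5; i++) {
            final int n = i;
            gate.submit("user-42", () -> { completionOrder.add(n); return n; }, pool);
        }
        gate.submit("user-42", () -> -1, pool).join(); // wait for the whole chain
        System.out.println(completionOrder); // [0, 1, 2, 3, 4]
        pool.shutdown();
    }
}
```

Even with four worker threads, the per-key chain guarantees submission order for "user-42", which is the ordering property the issue asks for while keeping Async I/O throughput across keys.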
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943728#comment-15943728 ] ASF GitHub Bot commented on FLINK-5785: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3625 Regarding the license: every (non-binary) file in the Flink repository must have the Apache license at the very top of the file. Simply take a look at an existing Scala class and you'll see what I mean. Second: it is not required to open a new PR when making changes; you can add commits to the branch of the PR (note that force-pushes should only be done if necessary). Third, the file count in this PR is dramatically higher than in the last one (4 vs 84); is this intended or a mistake? > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > Assignee: Stavros Kontopoulos > > We need to add an Imputer as described in [1]. > "The Imputer class provides basic strategies for imputing missing values, either using the mean, the median or the most frequent value of the row or column in which the missing values are located. This class also allows for different missing values encodings." > References > 1. http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing > 2. http://scikit-learn.org/stable/auto_examples/missing_values.html#sphx-glr-auto-examples-missing-values-py
[GitHub] flink issue #3625: [FLINK-5785] Add an Imputer for preparing data
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3625 Regarding the license: every (non-binary) file in the Flink repository must have the Apache license at the very top of the file. Simply take a look at an existing Scala class and you'll see what I mean. Second: it is not required to open a new PR when making changes; you can add commits to the branch of the PR (note that force-pushes should only be done if necessary). Third, the file count in this PR is dramatically higher than in the last one (4 vs 84); is this intended or a mistake?
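The imputation strategy described in the issue (fill missing values column-wise with the mean of the observed values) can be sketched in a few lines. This is a plain-Java illustration of mean imputation with NaN as the missing-value encoding, not the API proposed in the PR:

```java
import java.util.Arrays;

// Sketch of column-wise mean imputation (scikit-learn style): missing
// entries, encoded as NaN, are replaced by the mean of the observed
// values in the same column.
public class MeanImputerSketch {

    public static double[][] imputeByColumnMean(double[][] data) {
        int rows = data.length, cols = data[0].length;
        double[][] out = new double[rows][];
        for (int r = 0; r < rows; r++) out[r] = Arrays.copyOf(data[r], cols);
        for (int c = 0; c < cols; c++) {
            double sum = 0;
            int observed = 0;
            for (double[] row : data) {
                if (!Double.isNaN(row[c])) { sum += row[c]; observed++; }
            }
            double mean = observed == 0 ? 0.0 : sum / observed;
            for (double[] row : out) {
                if (Double.isNaN(row[c])) row[c] = mean; // fill the gap
            }
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] data = {{1.0, Double.NaN}, {3.0, 4.0}};
        System.out.println(Arrays.deepToString(imputeByColumnMean(data)));
        // [[1.0, 4.0], [3.0, 4.0]]
    }
}
```

Median and most-frequent-value strategies follow the same shape, replacing the mean computation per column; a distributed version over a DataSet of Vectors would compute the per-column statistics in a first pass and fill in a second.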
[jira] [Updated] (FLINK-5913) Example drivers
[ https://issues.apache.org/jira/browse/FLINK-5913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5913: -- Description: Replace existing and create new algorithm {{Driver}} implementations for each of the library methods. (was: Replace existing and create new algorithm {{Driver}} implementations for each of the library methods. Replace the existing {{Usage.java}} (which only displays class names) with a new {{Runner.java}} allowing users to select and configure an input, algorithm, and output.) > Example drivers > --- > > Key: FLINK-5913 > URL: https://issues.apache.org/jira/browse/FLINK-5913 > Project: Flink > Issue Type: Sub-task > Components: Gelly > Affects Versions: 1.3.0 > Reporter: Greg Hogan > Assignee: Greg Hogan > Fix For: 1.3.0 > > > Replace existing and create new algorithm {{Driver}} implementations for each of the library methods.
[jira] [Commented] (FLINK-5912) Inputs for CSV and graph generators
[ https://issues.apache.org/jira/browse/FLINK-5912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943646#comment-15943646 ] ASF GitHub Bot commented on FLINK-5912: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3626 [FLINK-5912] [gelly] Inputs for CSV and graph generators Create Input classes for reading graphs from CSV as well as for each of the graph generators. Inputs are tested in driver integration tests of FLINK-4949. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 5912_inputs_for_csv_and_graph_generators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3626.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3626 commit e1a5ac72341f1304d737b09b2825ae8da609fc36 Author: Greg Hogan Date: 2017-03-27T14:08:59Z [FLINK-5912] [gelly] Inputs for CSV and graph generators Create Input classes for reading graphs from CSV as well as for each of the graph generators. > Inputs for CSV and graph generators > --- > > Key: FLINK-5912 > URL: https://issues.apache.org/jira/browse/FLINK-5912 > Project: Flink > Issue Type: Sub-task > Components: Gelly > Affects Versions: 1.3.0 > Reporter: Greg Hogan > Assignee: Greg Hogan > Fix For: 1.3.0 > > > Create {{Input}} classes for reading graphs from CSV as well as for each of the graph generators.
[GitHub] flink pull request #3626: [FLINK-5912] [gelly] Inputs for CSV and graph gene...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3626 [FLINK-5912] [gelly] Inputs for CSV and graph generators Create Input classes for reading graphs from CSV as well as for each of the graph generators. Inputs are tested in driver integration tests of FLINK-4949. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 5912_inputs_for_csv_and_graph_generators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3626.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3626 commit e1a5ac72341f1304d737b09b2825ae8da609fc36 Author: Greg Hogan Date: 2017-03-27T14:08:59Z [FLINK-5912] [gelly] Inputs for CSV and graph generators Create Input classes for reading graphs from CSV as well as for each of the graph generators.
[jira] [Commented] (FLINK-6192) reuse zookeeer client created by CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943638#comment-15943638 ] ASF GitHub Bot commented on FLINK-6192: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3617 Thanks for your contribution @WangTaoTheTonic. Your observation of the problem is correct. Actually I'm also working on refactoring the usage of the ZooKeeper based leader election and retrieval services. The respective issue is [FLINK-6078](https://issues.apache.org/jira/browse/FLINK-6078). The problem is not only creating a single curator framework but also preventing the leader retrieval and election services from closing it when shutting down the respective service. Moreover, we should use the newly introduced `HighAvailabilityServices` as a factory for the services. That way the service instantiation can be controlled from a single place. Given that, I think that we can close this PR. I will open a PR for FLINK-6078 in the next days. Feel free to review it. > reuse zookeeper client created by CuratorFramework > - > > Key: FLINK-6192 > URL: https://issues.apache.org/jira/browse/FLINK-6192 > Project: Flink > Issue Type: Sub-task > Components: JobManager, YARN > Reporter: Tao Wang > Assignee: Tao Wang > > Now in YARN mode there are three places using a ZooKeeper client (web monitor, JobManager and ResourceManager) in the ApplicationMaster/JobManager, while there are two in the TaskManager. Each creates a new ZooKeeper client when it needs one. > I believe there are more places that do the same thing, but within one JVM a single CuratorFramework is enough for connections to one ZooKeeper ensemble, so we need a singleton to reuse it.
[GitHub] flink issue #3617: [FLINK-6192]reuse zookeeper client created by CuratorFram...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3617 Thanks for your contribution @WangTaoTheTonic. Your observation of the problem is correct. Actually I'm also working on refactoring the usage of the ZooKeeper based leader election and retrieval services. The respective issue is [FLINK-6078](https://issues.apache.org/jira/browse/FLINK-6078). The problem is not only creating a single curator framework but also preventing the leader retrieval and election services from closing it when shutting down the respective service. Moreover, we should use the newly introduced `HighAvailabilityServices` as a factory for the services. That way the service instantiation can be controlled from a single place. Given that, I think that we can close this PR. I will open a PR for FLINK-6078 in the next days. Feel free to review it.
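The comment above argues for one process-wide client plus protection against an individual service closing it on shutdown. A minimal sketch of that idea, in Python with entirely hypothetical names (not the Curator or Flink API): a reference-counted singleton holder, so only the last user actually closes the shared client.

```python
import threading

class SharedClient:
    """Hypothetical process-wide client holder: one instance per process,
    reference-counted so a single service shutting down does not close the
    client that other services still use."""
    _lock = threading.Lock()
    _instance = None
    _refs = 0

    def __init__(self):
        self.closed = False

    @classmethod
    def acquire(cls):
        with cls._lock:
            if cls._instance is None or cls._instance.closed:
                # In Flink this is where the single CuratorFramework would be built.
                cls._instance = cls()
            cls._refs += 1
            return cls._instance

    @classmethod
    def release(cls):
        with cls._lock:
            cls._refs -= 1
            if cls._refs == 0:
                # Only the last release really closes the shared client.
                cls._instance.closed = True

a = SharedClient.acquire()
b = SharedClient.acquire()
assert a is b          # both services see the same client
SharedClient.release() # one service shuts down...
assert not a.closed    # ...but the shared client stays open
SharedClient.release()
```

A factory such as the `HighAvailabilityServices` mentioned above would own the `acquire`/`release` calls, keeping instantiation in a single place.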
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943528#comment-15943528 ] ASF GitHub Bot commented on FLINK-5785: --- GitHub user p4nna opened a pull request: https://github.com/apache/flink/pull/3625 [FLINK-5785] Add an Imputer for preparing data Provides an Imputer for sparse DataSets of Vectors. Fills missing values with the mean, median, or most frequent value of each vector or dimension, respectively. You can merge this pull request into a Git repository by running: $ git pull https://github.com/p4nna/flink ml-Imputer-edits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3625.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3625 commit f2875ac5890564213d5f055d710976d1fede3962 Author: p4nna Date: 2017-03-27T09:47:39Z Add files via upload commit 8e6909b52dad34d6c4cd6c84618616ac50cd83d1 Author: p4nna Date: 2017-03-27T09:49:59Z Test for Imputer class Two test classes that test the functions implemented in the new Imputer class: one for the row-wise imputing over all vectors and one for the vector-wise imputing commit 0c420a84c136b330135ce180db04d899b5a6f54c Author: p4nna Date: 2017-03-27T09:56:51Z removed unused imports and methods commit 9136607e84a0297bb4fb24a53bad9950b86bf116 Author: p4nna Date: 2017-03-27T15:58:37Z Imputer was added: fills missing values in sparse DataSets of Vectors > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > Assignee: Stavros Kontopoulos > > We need to add an Imputer as described in [1].
> "The Imputer class provides basic strategies for imputing missing values, > either using the mean, the median or the most frequent value of the row or > column in which the missing values are located. This class also allows for > different missing values encodings." > References > 1. http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing > 2. > http://scikit-learn.org/stable/auto_examples/missing_values.html#sphx-glr-auto-examples-missing-values-py -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3625: [FLINK-5785] Add an Imputer for preparing data
GitHub user p4nna opened a pull request: https://github.com/apache/flink/pull/3625 [FLINK-5785] Add an Imputer for preparing data Provides an Imputer for sparse DataSets of Vectors. Fills missing values with the mean, median, or most frequent value of each vector or dimension, respectively. You can merge this pull request into a Git repository by running: $ git pull https://github.com/p4nna/flink ml-Imputer-edits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3625.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3625 commit f2875ac5890564213d5f055d710976d1fede3962 Author: p4nna Date: 2017-03-27T09:47:39Z Add files via upload commit 8e6909b52dad34d6c4cd6c84618616ac50cd83d1 Author: p4nna Date: 2017-03-27T09:49:59Z Test for Imputer class Two test classes that test the functions implemented in the new Imputer class: one for the row-wise imputing over all vectors and one for the vector-wise imputing commit 0c420a84c136b330135ce180db04d899b5a6f54c Author: p4nna Date: 2017-03-27T09:56:51Z removed unused imports and methods commit 9136607e84a0297bb4fb24a53bad9950b86bf116 Author: p4nna Date: 2017-03-27T15:58:37Z Imputer was added: fills missing values in sparse DataSets of Vectors
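The strategies the PR describes (mean, median, most frequent, applied per row or per column, as in scikit-learn's Imputer) can be sketched compactly. This is a self-contained Python illustration of column-wise imputation only, not the FlinkML API; the function name and signature are invented for the example.

```python
from collections import Counter
from statistics import mean, median

def impute(rows, strategy="mean", missing=None):
    """Replace each missing entry with the mean, median, or most frequent
    value of the column it appears in (illustrative sketch only)."""
    cols = list(zip(*rows))
    fills = []
    for col in cols:
        present = [v for v in col if v is not missing]
        if strategy == "mean":
            fills.append(mean(present))
        elif strategy == "median":
            fills.append(median(present))
        elif strategy == "most_frequent":
            fills.append(Counter(present).most_common(1)[0][0])
        else:
            raise ValueError("unknown strategy: " + strategy)
    # Substitute the per-column fill value wherever an entry is missing.
    return [[fill if v is missing else v for v, fill in zip(row, fills)]
            for row in rows]

data = [[1.0, None], [3.0, 4.0], [None, 4.0]]
result = impute(data, "mean")  # missing entries replaced by column means
```

Row-wise imputation is the same computation applied to `zip(*rows)` transposed back, which is presumably why the PR tests the two directions separately.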
[GitHub] flink pull request #3620: [FLINK-5785] Add an Imputer for preparing data
Github user p4nna closed the pull request at: https://github.com/apache/flink/pull/3620
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943507#comment-15943507 ] ASF GitHub Bot commented on FLINK-5785: --- Github user p4nna closed the pull request at: https://github.com/apache/flink/pull/3620 > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > Assignee: Stavros Kontopoulos
[GitHub] flink issue #3620: [FLINK-5785] Add an Imputer for preparing data
Github user p4nna commented on the issue: https://github.com/apache/flink/pull/3620 What does that mean and how could I fix it?
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943439#comment-15943439 ] ASF GitHub Bot commented on FLINK-5785: --- Github user p4nna commented on the issue: https://github.com/apache/flink/pull/3620 What does that mean and how could I fix it? > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > Assignee: Stavros Kontopoulos
[jira] [Created] (FLINK-6198) Update the documentation of the CEP library to include all the new features.
Kostas Kloudas created FLINK-6198: - Summary: Update the documentation of the CEP library to include all the new features. Key: FLINK-6198 URL: https://issues.apache.org/jira/browse/FLINK-6198 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.3.0 Reporter: Kostas Kloudas Fix For: 1.3.0
[jira] [Commented] (FLINK-6197) Add support for iterative conditions.
[ https://issues.apache.org/jira/browse/FLINK-6197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943411#comment-15943411 ] ASF GitHub Bot commented on FLINK-6197: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3624 [FLINK-6197] [cep] Add support for iterative conditions. So far, the where clause only supported simple FilterFunction conditions. With this, we add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink cep-iterative-functions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3624.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3624 commit dfb848b83864071ef6738261e425bf4d0e43575d Author: kl0u Date: 2017-03-22T14:52:07Z [FLINK-6197] [cep] Add support for iterative conditions. So far, the where clause only supported simple FilterFunction conditions. With this, we add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events. > Add support for iterative conditions. > - > > Key: FLINK-6197 > URL: https://issues.apache.org/jira/browse/FLINK-6197 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.3.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > So far, the {{where}} clause only supported simple {{FilterFunction}} > conditions. > With this, we want to add support for conditions where an event is > accepted not only based on its own properties, e.g. name, as it was before, > but also based on some statistic computed over previously accepted events in > the pattern, e.g. if the price is higher than the average of the prices of > the previously accepted events. > This in combination with the recently added quantifiers will allow for a lot > more expressive patterns.
[GitHub] flink pull request #3624: [FLINK-6197] [cep] Add support for iterative condi...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3624 [FLINK-6197] [cep] Add support for iterative conditions. So far, the where clause only supported simple FilterFunction conditions. With this, we add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink cep-iterative-functions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3624.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3624 commit dfb848b83864071ef6738261e425bf4d0e43575d Author: kl0u Date: 2017-03-22T14:52:07Z [FLINK-6197] [cep] Add support for iterative conditions. So far, the where clause only supported simple FilterFunction conditions. With this, we add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events.
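The PR's example condition (accept an event if its price exceeds the average price of the previously accepted events) makes the "iterative" idea concrete. A toy Python sketch, with invented names rather than the flink-cep API: the condition receives not just the event but also the events the pattern has already accepted.

```python
def iterative_condition(event, accepted):
    """Accept an event based on a statistic over previously accepted
    events, not just the event's own fields (illustrative sketch)."""
    if not accepted:
        return True  # first event: nothing to compare against
    avg = sum(e["price"] for e in accepted) / len(accepted)
    return event["price"] > avg

def run_pattern(events, condition):
    """Feed events through the condition, accumulating accepted ones so the
    condition can inspect them on the next call."""
    accepted = []
    for e in events:
        if condition(e, accepted):
            accepted.append(e)
    return accepted

picks = run_pattern(
    [{"price": 10}, {"price": 5}, {"price": 12}, {"price": 11}],
    iterative_condition,
)
# 10 is accepted unconditionally; 5 fails (< avg 10); 12 passes (> 10);
# 11 fails (not > avg(10, 12) = 11).
```

A plain FilterFunction-style condition is the special case that ignores the `accepted` argument, which is why this strictly generalizes the old where clause.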
[jira] [Created] (FLINK-6197) Add support for iterative conditions.
Kostas Kloudas created FLINK-6197: - Summary: Add support for iterative conditions. Key: FLINK-6197 URL: https://issues.apache.org/jira/browse/FLINK-6197 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.3.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.3.0 So far, the {{where}} clause only supported simple {{FilterFunction}} conditions. With this, we want to add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events. This in combination with the recently added quantifiers will allow for a lot more expressive patterns.
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943399#comment-15943399 ] ASF GitHub Bot commented on FLINK-5785: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3620 It appears that all files are lacking the apache license: `Too many files with unapproved license: 4 See RAT report in: /home/travis/build/apache/flink/target/rat.txt` > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > Assignee: Stavros Kontopoulos
[GitHub] flink issue #3620: [FLINK-5785] Add an Imputer for preparing data
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3620 It appears that all files are lacking the apache license: `Too many files with unapproved license: 4 See RAT report in: /home/travis/build/apache/flink/target/rat.txt`
[jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
[ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943378#comment-15943378 ] ASF GitHub Bot commented on FLINK-6196: --- GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3623 [FLINK-6196] [table] Support dynamic schema in Table Function Type: Improvement Priority: Major Components: table, udtf Problem Definition: Support dynamic schema in Table Function Design: 1. Modified the getResult() interfaces of a UDTF. Support Java-style arguments as a list; only literals will be passed to the UDTF. 2. Define TableFunction and TableFunctionCall clearly. A TableFunction is an object whose ResultType and parameters are not yet determined. A TableFunctionCall is an object whose ResultType and parameters are determined. 3. Implement the Table API, Expression, and SQL style call stack of getResultType. Impact Analysis: UDTF, an interface, has changed. Test: All tests done. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3623.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3623 commit 9be655dec415a720051b85ae78e7f052a6e22f81 Author: Zhuoluo Yang Date: 2017-03-14T08:44:02Z [FLINK-6039] [core] Row of TableFunction should support flexible number of fields commit a87721b18a6972cd0b80384a0812df27b5253f9c Author: Zhuoluo Yang Date: 2017-03-16T06:34:39Z [FLINK-6039] [core] Revert some modifications commit 140e6c76b51a80b35c1071590b19aefbf65990eb Author: Zhuoluo Yang Date: 2017-03-17T16:56:13Z [FLINK-6039] [core] Support dynamic schema of TableFunction commit 6ac05e217acb6c0c73a1ea978891ff88d78bc0a8 Author: Zhuoluo Yang Date: 2017-03-22T03:49:49Z [FLINK-6039] [core] Fix the dynamic schema of Table API commit 1c88362c714c7f4e6f0a56397e9cf2801abfcf0a Author: Zhuoluo Yang Date: 2017-03-22T08:29:31Z [FLINK-6039] [core] Fix build exception commit eba7ba8cbd7c576510e649f1b4ecad2a55300f82 Author: Zhuoluo Yang Date: 2017-03-22T09:53:28Z [FLINK-6039] [core] fix comments and test failure commit 768d03d09267ce46b789cf58add9f49455d01585 Author: Zhuoluo Yang Date: 2017-03-23T06:00:13Z Add a test case for the test commit 6fe805f3b1f83c66c02dca8c65892b7b8d48f3e4 Author: Zhuoluo Yang Date: 2017-03-27T14:25:36Z [FLINK-6196] [table] Support dynamic schema in Table Function > Support dynamic schema in Table Function > > > Key: FLINK-6196 > URL: https://issues.apache.org/jira/browse/FLINK-6196 > Project: Flink > Issue Type: Improvement > Reporter: Zhuoluo Yang > Assignee: Zhuoluo Yang > > In many of our use cases, we have to decide the schema of a UDTF at run > time. For example, udtf('c1, c2, c3') will generate three columns for a > lateral view. > Most systems, such as Calcite and Hive, support this feature. However, the > current implementation of Flink does not implement the feature correctly.
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3623 [FLINK-6196] [table] Support dynamic schema in Table Function Type: Improvement Priority: Major Components: table, udtf Problem Definition: Support dynamic schema in Table Function Design: 1. Modified the getResult() interfaces of a UDTF. Support Java-style arguments as a list; only literals will be passed to the UDTF. 2. Define TableFunction and TableFunctionCall clearly. A TableFunction is an object whose ResultType and parameters are not yet determined. A TableFunctionCall is an object whose ResultType and parameters are determined. 3. Implement the Table API, Expression, and SQL style call stack of getResultType. Impact Analysis: UDTF, an interface, has changed. Test: All tests done. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3623.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3623 commit 9be655dec415a720051b85ae78e7f052a6e22f81 Author: Zhuoluo Yang Date: 2017-03-14T08:44:02Z [FLINK-6039] [core] Row of TableFunction should support flexible number of fields commit a87721b18a6972cd0b80384a0812df27b5253f9c Author: Zhuoluo Yang Date: 2017-03-16T06:34:39Z [FLINK-6039] [core] Revert some modifications commit 140e6c76b51a80b35c1071590b19aefbf65990eb Author: Zhuoluo Yang Date: 2017-03-17T16:56:13Z [FLINK-6039] [core] Support dynamic schema of TableFunction commit 6ac05e217acb6c0c73a1ea978891ff88d78bc0a8 Author: Zhuoluo Yang Date: 2017-03-22T03:49:49Z [FLINK-6039] [core] Fix the dynamic schema of Table API commit 1c88362c714c7f4e6f0a56397e9cf2801abfcf0a Author: Zhuoluo Yang Date: 2017-03-22T08:29:31Z [FLINK-6039] [core] Fix build exception commit eba7ba8cbd7c576510e649f1b4ecad2a55300f82 Author: Zhuoluo Yang Date: 2017-03-22T09:53:28Z [FLINK-6039] [core] fix comments and test failure commit 768d03d09267ce46b789cf58add9f49455d01585 Author: Zhuoluo Yang Date: 2017-03-23T06:00:13Z Add a test case for the test commit 6fe805f3b1f83c66c02dca8c65892b7b8d48f3e4 Author: Zhuoluo Yang Date: 2017-03-27T14:25:36Z [FLINK-6196] [table] Support dynamic schema in Table Function
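The "dynamic schema" the PR targets can be shown with a toy: a table function whose output columns are derived from a literal argument such as 'c1, c2, c3', as in the lateral-view example from FLINK-6196. This Python sketch uses invented names and is not the Table API.

```python
def udtf(schema_literal, values):
    """Emit rows under a schema decided at call time from a literal
    argument (illustrative sketch of a dynamic-schema table function)."""
    # The column names come from the literal, e.g. 'c1, c2, c3' -> 3 columns.
    columns = [c.strip() for c in schema_literal.split(",")]
    for row in values:
        if len(row) != len(columns):
            raise ValueError("row arity does not match declared schema")
        yield dict(zip(columns, row))

rows = list(udtf("c1, c2, c3", [(1, "a", True)]))
```

This mirrors why the PR restricts the schema argument to literals: the planner must be able to resolve the result type before any data flows, exactly as the `getResultType` call stack in the design implies.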
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943360#comment-15943360 ] Wei-Che Wei commented on FLINK-5982: Hi [~till.rohrmann], I prefer not to pass only {{Environment}} to {{BatchTask}}. Because this task merges {{AbstractInvokable}} and {{StatefulTask}}, passing only {{Environment}} would add complexity when distinguishing {{BatchTask}} from {{StreamTask}} while calling {{loadAndInstantiateInvokable()}} in {{Task}}. Making all subclasses share the same constructor simplifies the handling of {{AbstractInvokable}} in {{Task}}. For that reason, I prefer the suggestion from [~StephanEwen]: it temporarily ensures that {{BatchTask}} does not receive a non-null state until {{BatchTask}} actually handles recovery state. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing > Affects Versions: 1.2.0 > Reporter: Wei-Che Wei > Assignee: Wei-Che Wei > > Currently, running an invokable in {{Task}} requires calling > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it must also call {{setInitialState(state)}}. That makes it > difficult to do eager initialization of the invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclasses. > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has become stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714.
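The refactoring discussed above, replacing setEnvironment/setInitialState setters with a single two-argument constructor, can be sketched in miniature. This is a Python illustration of the pattern with simplified stand-in classes, not Flink's actual class hierarchy.

```python
class AbstractInvokable:
    """Sketch of the proposed shape: every invokable takes (environment,
    initial_state) in its constructor, so eager initialization can happen
    at construction time, i.e. while the task is still DEPLOYING."""
    def __init__(self, environment, initial_state):
        self.environment = environment
        self.initial_state = initial_state

    def invoke(self):
        raise NotImplementedError

class BatchTask(AbstractInvokable):
    def __init__(self, environment, initial_state):
        # Per the discussion: BatchTask is not stateful yet, so it rejects
        # any restored state until it actually handles recovery.
        if initial_state is not None:
            raise RuntimeError("BatchTask cannot restore state yet")
        super().__init__(environment, initial_state)

    def invoke(self):
        return "ran"

# The Task side no longer needs to know which subclass it instantiates:
# every subclass has the same constructor signature.
task = BatchTask({"job": "wordcount"}, None)
assert task.invoke() == "ran"
```

The uniform constructor is what lets a loader like `loadAndInstantiateInvokable()` treat batch and streaming tasks identically.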
[jira] [Commented] (FLINK-6136) Separate EmbeddedNonHaServices and NonHaServices
[ https://issues.apache.org/jira/browse/FLINK-6136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943352#comment-15943352 ] ASF GitHub Bot commented on FLINK-6136: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3622 [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices This PR introduces a standalone high availability services implementation which can be used in a distributed setting with no HA guarantees. Additionally, it introduces a common base class which is also used by the EmbeddedHaServices. This base class instantiates the standalone variants of the checkpoint recovery factory, submitted job graphs store, running jobs registry and blob store. The StandaloneHaServices are instantiated with a fixed address for the Job- and ResourceManager. This address and the HighAvailability.DEFAULT_LEADER_ID is returned by the corresponding LeaderRetrievalServices when being started. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink separateNonHaServices Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3622 commit 3f85d6b3ef41df4e795bb3ef5beecca49606caba Author: Till Rohrmann Date: 2017-03-21T14:10:15Z [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices This PR introduces a standalone high availability services implementation which can be used in a distributed setting with no HA guarantees. Additionally, it introduces a common base class which is also used by the EmbeddedHaServices. This base class instantiates the standalone variants of the checkpoint recovery factory, submitted job graphs store, running jobs registry and blob store. The StandaloneHaServices are instantiated with a fixed address for the Job- and ResourceManager. This address and the HighAvailability.DEFAULT_LEADER_ID is returned by the corresponding LeaderRetrievalServices when being started. > Separate EmbeddedNonHaServices and NonHaServices > > > Key: FLINK-6136 > URL: https://issues.apache.org/jira/browse/FLINK-6136 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.3.0 > > > Currently, the {{NonHaServices}} and the {{EmbeddedNonHaServices}} share the > same code base for the {{JobManager}} leader election and retrieval. This > works if the respective components run in the same process. However, in case > of a distributed execution, we have to instantiate > {{StandaloneLeaderRetrieval/ElectionServices}} to make the leader retrieval > possible. > I propose to instantiate the {{StandaloneLeaderRetrieval/ElectionServices}} > in the {{NonHaServices}} implementation and to use them in case of a > distributed non-ha execution setting.
[GitHub] flink pull request #3622: [FLINK-6136] Separate EmbeddedHaServices and Stand...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3622 [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices This PR introduces a standalone high availability services implementation which can be used in a distributed setting with no HA guarantees. Additionally, it introduces a common base class which is also used by the EmbeddedHaServices. This base class instantiates the standalone variants of the checkpoint recovery factory, submitted job graphs store, running jobs registry and blob store. The StandaloneHaServices are instantiated with a fixed address for the Job- and ResourceManager. This address and the HighAvailability.DEFAULT_LEADER_ID is returned by the corresponding LeaderRetrievalServices when being started. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink separateNonHaServices Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3622 commit 3f85d6b3ef41df4e795bb3ef5beecca49606caba Author: Till Rohrmann Date: 2017-03-21T14:10:15Z [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices This PR introduces a standalone high availability services implementation which can be used in a distributed setting with no HA guarantees. Additionally, it introduces a common base class which is also used by the EmbeddedHaServices. This base class instantiates the standalone variants of the checkpoint recovery factory, submitted job graphs store, running jobs registry and blob store. The StandaloneHaServices are instantiated with a fixed address for the Job- and ResourceManager. This address and the HighAvailability.DEFAULT_LEADER_ID is returned by the corresponding LeaderRetrievalServices when being started.
[jira] [Created] (FLINK-6196) Support dynamic schema in Table Function
Zhuoluo Yang created FLINK-6196: --- Summary: Support dynamic schema in Table Function Key: FLINK-6196 URL: https://issues.apache.org/jira/browse/FLINK-6196 Project: Flink Issue Type: Improvement Reporter: Zhuoluo Yang Assignee: Zhuoluo Yang In many of our use cases, we have to decide the schema of a UDTF at run time. For example, udtf('c1, c2, c3') will generate three columns for a lateral view. Most systems, such as Calcite and Hive, support this feature. However, the current implementation of Flink does not implement the feature correctly.
[jira] [Resolved] (FLINK-6169) yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error
[ https://issues.apache.org/jira/browse/FLINK-6169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-6169. --- Resolution: Not A Problem > yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error > -- > > Key: FLINK-6169 > URL: https://issues.apache.org/jira/browse/FLINK-6169 > Project: Flink > Issue Type: Bug > Components: YARN > Reporter: Ted Yu > Priority: Minor > > Here is one example: > {code} > if(jobManagerMemoryMb > maxRes.getMemory() ) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient should be stopped when deployment fails.
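The pattern the ticket asks about, releasing a client on every failure path of a deployment method, is the standard try/except(finally) shape. A minimal Python sketch with stand-in names (this is not the YARN or Flink API; `Client` and `deploy` are invented for illustration):

```python
class Client:
    """Stand-in for a deployment client such as a YARN client."""
    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True

def deploy(client, requested_mb, max_mb):
    """Fail the deployment if the resource check does not pass, but make
    sure the client is stopped before the error propagates, so no client
    is leaked on the error path."""
    try:
        if requested_mb > max_mb:
            raise RuntimeError("cluster lacks requested JobManager memory")
        return "deployed"
    except Exception:
        client.stop()  # always release the client on failure
        raise

c = Client()
try:
    deploy(c, 4096, 2048)  # requests more than the cluster's maximum
except RuntimeError:
    pass
assert c.stopped
```

The resolution "Not A Problem" suggests the existing code already releases the client on this path; the sketch only illustrates the general concern.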
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 sweet! thanks @StefanRRichter
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943328#comment-15943328 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on the issue: https://github.com/apache/flink/pull/1668 sweet! thanks @StefanRRichter > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943325#comment-15943325 ] ASF GitHub Bot commented on FLINK-3257: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/1668 For raw operator state, override `AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside your operator. Your implementation should first call super; it can then obtain the raw stream via `context.getRawOperatorStateOutput()`. This stream works like a normal output stream, except that you can also call `stream.startNewPartition()`. This signals that a new partition is started and previous partitions are finalized/immutable. Partitions are the atomic units of state redistribution; think of them as the individual elements in a `ListCheckpointed` state. For restoring, override `AbstractStreamOperator::initializeState(StateInitializationContext context)`. After calling super, `context.getRawOperatorStateInputs()` provides an iterable with one input stream per partition that your operator should restore. > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. > One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the usual execution. > -- > Upon restart the IterationSource should emit all records from the injected > snapshot first and then continue its usual execution. > Several optimisations and slight variations can be potentially achieved but > this can be the initial implementation take. > [1] http://arxiv.org/abs/1506.08603
[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/1668 For raw operator state, override `AbstractStreamOperator::snapshotState(StateSnapshotContext context)` inside your operator. Your implementation should first call super; it can then obtain the raw stream via `context.getRawOperatorStateOutput()`. This stream works like a normal output stream, except that you can also call `stream.startNewPartition()`. This signals that a new partition is started and previous partitions are finalized/immutable. Partitions are the atomic units of state redistribution; think of them as the individual elements in a `ListCheckpointed` state. For restoring, override `AbstractStreamOperator::initializeState(StateInitializationContext context)`. After calling super, `context.getRawOperatorStateInputs()` provides an iterable with one input stream per partition that your operator should restore.
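The partition semantics described above can be illustrated with a small, Flink-free sketch. `PartitionedBuffer` below is a hypothetical stand-in for the raw operator state stream (it is not Flink's API); it only shows the idea that `startNewPartition()` finalizes the previous partition and that partitions are the atomic units of redistribution, like elements of a `ListCheckpointed` state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the raw operator state stream; illustration only.
public class PartitionedBuffer {
    private final List<List<byte[]>> partitions = new ArrayList<>();

    // Mirrors stream.startNewPartition(): the previous partition becomes immutable.
    public void startNewPartition() {
        partitions.add(new ArrayList<>());
    }

    // Mirrors writing raw bytes into the current partition of the stream.
    public void write(byte[] record) {
        if (partitions.isEmpty()) {
            startNewPartition();
        }
        partitions.get(partitions.size() - 1).add(record);
    }

    // Mirrors getRawOperatorStateInputs(): one input per partition to restore.
    public List<List<byte[]>> partitions() {
        return partitions;
    }

    public static void main(String[] args) {
        PartitionedBuffer buf = new PartitionedBuffer();
        buf.startNewPartition();
        buf.write(new byte[]{1});
        buf.write(new byte[]{2});
        buf.startNewPartition();
        buf.write(new byte[]{3});
        // Two partitions: on rescale, each one moves as a whole
        // to exactly one operator instance.
        System.out.println(buf.partitions().size()); // prints 2
    }
}
```

On restore, iterating `partitions()` plays the role of consuming the per-partition input streams in order.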
[jira] [Commented] (FLINK-6169) yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error
[ https://issues.apache.org/jira/browse/FLINK-6169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943318#comment-15943318 ] ASF GitHub Bot commented on FLINK-6169: --- Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/3608 > yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error > -- > > Key: FLINK-6169 > URL: https://issues.apache.org/jira/browse/FLINK-6169 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Priority: Minor > > Here is one example: > {code} > if(jobManagerMemoryMb > maxRes.getMemory() ) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient should be stopped when deployment fails.
[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/3608
[jira] [Commented] (FLINK-6043) Display time when exceptions/root cause of failure happened
[ https://issues.apache.org/jira/browse/FLINK-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943312#comment-15943312 ] ASF GitHub Bot commented on FLINK-6043: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3583 Of course it wouldn't be the end of the world to display an inconsistent timestamp; it just doesn't make sense for them to be different. The root exception display is only meant to highlight the first exception that caused the job to fail, so why should the timestamps be different? Basically, I want to know why it is so horrible to pass on the timestamp. > Display time when exceptions/root cause of failure happened > --- > > Key: FLINK-6043 > URL: https://issues.apache.org/jira/browse/FLINK-6043 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Minor > > In order to better understand the behaviour of Flink jobs, it would be nice > to add timestamp information to exceptions causing the job to restart or to > fail. This information could then be displayed in the web UI, making it easier > for the user to understand what happened when.
[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3583 Of course it wouldn't be the end of the world to display an inconsistent timestamp; it just doesn't make sense for them to be different. The root exception display is only meant to highlight the first exception that caused the job to fail, so why should the timestamps be different? Basically, I want to know why it is so horrible to pass on the timestamp.
[jira] [Updated] (FLINK-6195) Move gelly-examples jar from opt to examples
[ https://issues.apache.org/jira/browse/FLINK-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-6195: -- Component/s: (was: Gelly) Build System > Move gelly-examples jar from opt to examples > > > Key: FLINK-6195 > URL: https://issues.apache.org/jira/browse/FLINK-6195 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0 > > > The {{opt}} directory should be reserved for Flink JARs which users may > optionally move to {{lib}} to be loaded by the runtime. > {{flink-gelly-examples}} is a user program so is being moved to the > {{examples}} folder.
[jira] [Commented] (FLINK-6042) Display last n exceptions/causes for job restarts in Web UI
[ https://issues.apache.org/jira/browse/FLINK-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943306#comment-15943306 ] ASF GitHub Bot commented on FLINK-6042: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3601 > Display last n exceptions/causes for job restarts in Web UI > --- > > Key: FLINK-6042 > URL: https://issues.apache.org/jira/browse/FLINK-6042 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Webfrontend >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > > Users requested that it would be nice to see the last {{n}} exceptions > causing a job restart in the Web UI. This will help to more easily debug and > operate a job. > We could store the root causes for failures similar to how prior executions > are stored in the {{ExecutionVertex}} using the {{EvictingBoundedList}} and > then serve this information via the Web UI.
[GitHub] flink pull request #3601: [WIP] [FLINK-6042] [web] Display exception history
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3601
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943286#comment-15943286 ] Stephan Ewen commented on FLINK-5982: - I think it is actually easier to shoot yourself in the foot with "special meaning" objects like {{TaskStateHandles.EMPTY}}. For example: the object also comes via RPC, so a simple mistake in the singleton-resolving code means that the {{if (state == TaskStateHandles.EMPTY)}} check actually breaks the code. I personally find good null handling quite effective. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running an invokable in {{Task}} requires calling > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That makes it > difficult to do eager initialization of an invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > to running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclasses > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has become stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714.
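Stephan's point about "special meaning" objects traveling over RPC can be demonstrated with plain Java serialization. In the sketch below, `Handles` is a hypothetical stand-in for `TaskStateHandles` (not the real Flink class): without a `readResolve()` hook, a deserialized "singleton" is a different object, so an identity check like `state == Handles.EMPTY` silently fails after a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustration only: Handles is a hypothetical stand-in for TaskStateHandles.
public class SingletonPitfall {
    static class Handles implements Serializable {
        static final Handles EMPTY = new Handles();
        // Without something like the following, deserialization creates a
        // fresh instance and identity checks against EMPTY break:
        // private Object readResolve() { return EMPTY; }
    }

    // Simulates the RPC hop: serialize, then deserialize the object.
    static Handles roundTrip(Handles h) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(h);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (Handles) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        Handles received = roundTrip(Handles.EMPTY);
        // The identity check that guarded "empty state" no longer holds.
        System.out.println(received == Handles.EMPTY); // prints false
    }
}
```

This is exactly the "simple mistake in the singleton resolving code" the comment warns about, and one reason explicit null handling can be the safer contract.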
[jira] [Created] (FLINK-6195) Move gelly-examples jar from opt to examples
Greg Hogan created FLINK-6195: - Summary: Move gelly-examples jar from opt to examples Key: FLINK-6195 URL: https://issues.apache.org/jira/browse/FLINK-6195 Project: Flink Issue Type: Sub-task Components: Gelly Affects Versions: 1.3.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial Fix For: 1.3.0 The {{opt}} directory should be reserved for Flink JARs which users may optionally move to {{lib}} to be loaded by the runtime. {{flink-gelly-examples}} is a user program so is being moved to the {{examples}} folder.
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943273#comment-15943273 ] Till Rohrmann commented on FLINK-5982: -- True, we can also throw an {{IllegalStateException}}. I was just wondering why we should give the user/developer the possibility to shoot themselves in the foot by allowing a state handles object to be passed to a {{BatchTask}}. Imo, it's safer to simply pass the {{Environment}} to the {{BatchTask}} and then call the super class's constructor with the given {{Environment}} and {{TaskStateHandles.EMPTY}}. That way we could enforce that the task state handles field is never {{null}} by checking it in the constructor. > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running an invokable in {{Task}} requires calling > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That makes it > difficult to do eager initialization of an invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > to running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}.
> # Update all subclasses > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has become stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714.
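The constructor-based design discussed in this thread can be sketched in a few lines. This is an illustration of the proposal, not Flink's real code: `Environment` and `TaskStateHandles` below are stub stand-ins, and the null check in the base constructor implements Till's suggestion that the state handles field is never {{null}} while a stateless `BatchTask` always passes `EMPTY`.

```java
// Illustration only: stub types standing in for Flink's Environment and
// TaskStateHandles; the structure mirrors the refactoring proposal above.
public class InvokableSketch {
    static class Environment { }

    static class TaskStateHandles {
        static final TaskStateHandles EMPTY = new TaskStateHandles();
    }

    abstract static class AbstractInvokable {
        final Environment env;
        final TaskStateHandles state;

        // Two-argument constructor replacing setEnvironment()/setInitialState();
        // enforcing non-null here is the invariant discussed in the comment.
        AbstractInvokable(Environment env, TaskStateHandles state) {
            if (state == null) {
                throw new IllegalStateException("task state handles must not be null");
            }
            this.env = env;
            this.state = state;
        }

        abstract void invoke();
    }

    // A batch task is (currently) stateless, so it only takes the Environment
    // and always forwards EMPTY to the super constructor.
    static class BatchTask extends AbstractInvokable {
        BatchTask(Environment env) {
            super(env, TaskStateHandles.EMPTY);
        }

        @Override
        void invoke() { /* run the batch driver; eager init done in constructor */ }
    }

    public static void main(String[] args) {
        new BatchTask(new Environment()).invoke();
        System.out.println("constructed with non-null state");
    }
}
```

With this shape, running an invokable is just "construct, then `invoke()`", which is the simplification the issue aims for.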
[jira] [Commented] (FLINK-6043) Display time when exceptions/root cause of failure happened
[ https://issues.apache.org/jira/browse/FLINK-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943272#comment-15943272 ] ASF GitHub Bot commented on FLINK-6043: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3583 It's inconsistent, that's all I'm saying. Passing an ErrorInfo instead of a Throwable is also useful (probably even required) for FLINK-6042, since it allows us to attach metadata to the exception (task name, task location, attempt id). The attempt id is especially important so we can extract the correct exception history from the executions. > Display time when exceptions/root cause of failure happened > --- > > Key: FLINK-6043 > URL: https://issues.apache.org/jira/browse/FLINK-6043 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Minor > > In order to better understand the behaviour of Flink jobs, it would be nice > to add timestamp information to exceptions causing the job to restart or to > fail. This information could then be displayed in the web UI, making it easier > for the user to understand what happened when.
[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3583 It's inconsistent, that's all I'm saying. Passing an ErrorInfo instead of a Throwable is also useful (probably even required) for FLINK-6042, since it allows us to attach metadata to the exception (task name, task location, attempt id). The attempt id is especially important so we can extract the correct exception history from the executions.
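The idea of passing the exception together with its metadata can be sketched as a small value class. The `ErrorInfo` below is illustrative only (the field set is an assumption based on the metadata named in the comment, not Flink's actual class): carrying the timestamp with the Throwable is what keeps the root-cause view and the exception history consistent.

```java
// Illustrative sketch: an ErrorInfo-style value object bundling the root
// Throwable with the metadata mentioned in the comment above. Field names
// are assumptions, not Flink's real API.
public class ErrorInfoSketch {
    static final class ErrorInfo {
        final Throwable exception;
        final long timestamp;   // when the failure happened, set once at the source
        final String taskName;  // e.g. which subtask failed
        final String attemptId; // used to look up the matching execution attempt

        ErrorInfo(Throwable exception, long timestamp, String taskName, String attemptId) {
            this.exception = exception;
            this.timestamp = timestamp;
            this.taskName = taskName;
            this.attemptId = attemptId;
        }
    }

    public static void main(String[] args) {
        ErrorInfo info = new ErrorInfo(
            new RuntimeException("task failed"), 1490000000000L, "Map (1/4)", "attempt-1");
        // Every consumer (root-cause display, exception history) reads the same
        // timestamp instead of re-deriving its own.
        System.out.println(info.taskName + " failed at " + info.timestamp);
    }
}
```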
[jira] [Assigned] (FLINK-3320) Add not operator to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz reassigned FLINK-3320: --- Assignee: Dawid Wysakowicz > Add not operator to CEP's pattern API > - > > Key: FLINK-3320 > URL: https://issues.apache.org/jira/browse/FLINK-3320 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Dawid Wysakowicz >Priority: Minor > > Currently, CEP's pattern API is lacking a {{not}} operator which would allow > you to define which events must not occur in a matched pattern. It would be > beneficial to add this operator in order to increase the set of supported > patterns. > {code} > Pattern.begin("start").where(...).followedBy("middle").notFollowedBy("not").where(...).notNext("not2").where(...) > {code}
[jira] [Commented] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition
[ https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943257#comment-15943257 ] Fabian Hueske commented on FLINK-6082: -- I think the semantics of the query in the example are not what one would expect. If you tried to execute the query {{SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp}} on a regular database, you would get an error because {{a}} is not part of a {{GROUP BY}} clause. IMO, a time-based predicate is not related to aggregations and should not be a shortcut for OVER windows. Instead, the predicate {{proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp}} will ensure that the result of the query only contains results of the last hour. If you run the following query: {{SELECT data FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' MINUTE AND current_timestamp}} on this table || time || data || | 12:00:00 | data1 | | 12:00:20 | data2 | | 12:00:30 | data3 | | 12:01:10 | data4 | | 12:01:40 | data5 | The result at time 12:00:50 should be {{data1, data2, data3}}, at time 12:01:20 {{data2, data3, data4}}, and at time 12:02:00 {{data4, data5}}. So the result is a tail of the stream of 1-minute length. All data that moves out of the tail needs to be removed (retracted) from the query's result. Of course you can apply an aggregation on the table, but with the same semantics as a batch query. For example, you could compute the number of records with a {{COUNT(*)}} aggregate as in the following query: {{SELECT COUNT(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' MINUTE AND current_timestamp}} The result of this query would be a single row which is constantly updated. The query simply counts how many rows have been received in the last minute.
> Support window definition for SQL Queries based on WHERE clause with time > condition > --- > > Key: FLINK-6082 > URL: https://issues.apache.org/jira/browse/FLINK-6082 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu > > Time target: Proc Time > Calcite documentation refers to query examples where the (time) > boundaries are defined as conditions within the WHERE clause. As the Flink > community targets compatibility with Calcite, it makes sense to support > the definition of windows via this method, as well as corresponding > aggregations on top of them. > SQL targeted query examples: > > ```SELECT productId, count(\*) FROM stream1 WHERE proctime BETWEEN current\_timestamp - INTERVAL '1' HOUR AND current\_timestamp``` > General comment: > 1) Window boundaries are defined as conditions in the WHERE clause. > 2) For indicating the usage of different stream times, rowtime and > proctime can be used. > 3) The boundaries are defined based on special constructs provided by > Calcite: current\_timestamp and time operations. > Description: > > The logic of this operator is strictly related to supporting aggregates > over sliding windows defined with OVER > ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653), > [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654), > [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655), > [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658), > [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In this > issue the design considered queries where the window is defined with the > syntax of the OVER clause and aggregates are applied over this period. This > is similar in behavior, with the only exception that the window > boundaries are defined with respect to the WHERE conditions. Besides > this, the logic and the types of aggregates to be supported should be the > same (sum, count, avg, min, max).
Supporting these types of queries is > related to the pie chart problem tackled by Calcite. > Similar to the OVER windows, the construct should build rolling > windows (i.e., windows that are triggered and move with every incoming > event). > Functionality example > - > We exemplify below the functionality of the IN/Exists when working with > streams. > `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp > - INTERVAL '1' HOUR AND current_timestamp;` > ||IngestionTime(Event)|| Stream1|| Output|| > |10:00:01 |Id1,10 |Id1,1| > |10:02:00 |Id2,2 |Id2,2| > |11:25:00 |Id3,2 |Id3,1| > |12:03:00 |Id4,15 |Id4,2| > |12:05:00 |Id5,11 |Id5,3| > |12:56:00 |Id6,20 |Id6,3| > |...| > Implementation option > - > Considering that the query follows the same functionality as for the > aggregates over window, the
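Fabian's "tail of the stream" semantics can be checked with a tiny sketch that evaluates the WHERE predicate at a given time. This is a Flink-free illustration, not Flink's implementation: timestamps are encoded as seconds since 12:00:00, and `tail` simply keeps the rows with `now - window <= time <= now`, reproducing the results he lists for his sample table.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the "1-minute tail" semantics described in the comment.
public class TailWindowSketch {
    // Keeps the rows whose timestamp falls inside [now - windowSec, now].
    static List<String> tail(long[] times, String[] data, long now, long windowSec) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            if (times[i] >= now - windowSec && times[i] <= now) {
                out.add(data[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // data1..data5 at 12:00:00, 12:00:20, 12:00:30, 12:01:10, 12:01:40,
        // encoded as seconds since 12:00:00.
        long[] t = {0, 20, 30, 70, 100};
        String[] d = {"data1", "data2", "data3", "data4", "data5"};
        System.out.println(tail(t, d, 50, 60));   // 12:00:50 -> [data1, data2, data3]
        System.out.println(tail(t, d, 80, 60));   // 12:01:20 -> [data2, data3, data4]
        System.out.println(tail(t, d, 120, 60));  // 12:02:00 -> [data4, data5]
    }
}
```

Rows dropping out of the lower bound (e.g. data1 between 12:00:50 and 12:01:20) are exactly the ones that must be retracted from the query's result.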
[jira] [Commented] (FLINK-6169) yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error
[ https://issues.apache.org/jira/browse/FLINK-6169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943248#comment-15943248 ] ASF GitHub Bot commented on FLINK-6169: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3608#discussion_r108166257 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -507,12 +507,14 @@ protected YarnClusterClient deployInternal() throws Exception { final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; if(jobManagerMemoryMb > maxRes.getMemory() ) { failSessionDuringDeployment(yarnClient, yarnApplication); + yarnClient.stop(); --- End diff -- This is already called in `failSessionDuringDeployment`. > yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error > -- > > Key: FLINK-6169 > URL: https://issues.apache.org/jira/browse/FLINK-6169 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Priority: Minor > > Here is one example: > {code} > if(jobManagerMemoryMb > maxRes.getMemory() ) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient should be stopped when deployment fails.
[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3608#discussion_r108166257 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -507,12 +507,14 @@ protected YarnClusterClient deployInternal() throws Exception { final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; if(jobManagerMemoryMb > maxRes.getMemory() ) { failSessionDuringDeployment(yarnClient, yarnApplication); + yarnClient.stop(); --- End diff -- This is already called in `failSessionDuringDeployment`.
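The review point can be made concrete with a small stub. In this sketch (illustration only: `YarnClientStub` and the helper are stand-ins, not Flink's or YARN's real classes), the failure helper already stops the client, which is why adding a second `stop()` call after it is redundant.

```java
// Illustration of the review comment: if failSessionDuringDeployment already
// stops the client, callers must not stop it again. Stub types only.
public class DeployFailureSketch {
    static class YarnClientStub {
        boolean stopped = false;
        void stop() { stopped = true; }
    }

    // Plays the role of failSessionDuringDeployment: it kills the pending
    // application *and* stops the client as part of cleanup.
    static void failSessionDuringDeployment(YarnClientStub client) {
        // ... kill the pending YARN application here ...
        client.stop();
    }

    public static void main(String[] args) {
        YarnClientStub client = new YarnClientStub();
        failSessionDuringDeployment(client);
        // Already stopped; an extra client.stop() here would be redundant.
        System.out.println(client.stopped); // prints true
    }
}
```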
[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943245#comment-15943245 ] ASF GitHub Bot commented on FLINK-3257: --- Github user senorcarbone commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r108166093 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -17,100 +17,164 @@ package org.apache.flink.streaming.runtime.tasks; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.types.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.LinkedList; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * TODO write javadoc + * + * - open a list state per snapshot process + * - book-keep snapshot logs + * - Clean up state when a savepoint is complete - ONLY in-transit records who do NOT belong in other snapshots + * + * @param + */ @Internal -public class StreamIterationHead extends OneInputStreamTask{ +public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); private volatile boolean running = true; - // - + private volatile RecordWriterOutput[] outputs; + + private UpstreamLogger upstreamLogger; + + private Object lock; + + @Override + public void init() throws Exception { + this.lock = getCheckpointLock(); + getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration())); + operatorChain = new OperatorChain<>(this); + this.upstreamLogger = (UpstreamLogger) operatorChain.getHeadOperator(); --- End diff -- it's another instance that is why I was fetching it back there. The OperatorChain basically deserialises and sets up another instance through the configuration. > Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs > --- > > Key: FLINK-3257 > URL: https://issues.apache.org/jira/browse/FLINK-3257 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Paris Carbone >Assignee: Paris Carbone > > The current snapshotting algorithm cannot support cycles in the execution > graph. An alternative scheme can potentially include records in-transit > through the back-edges of a cyclic execution graph (ABS [1]) to achieve the > same guarantees. 
> One straightforward implementation of ABS for cyclic graphs can work as > follows along the lines: > 1) Upon triggering a barrier in an IterationHead from the TaskManager start > block output and start upstream backup of all records forwarded from the > respective IterationSink. > 2) The IterationSink should eventually forward the current snapshotting epoch > barrier to the IterationSource. > 3) Upon receiving a barrier from the IterationSink, the IterationSource > should finalize the snapshot, unblock its output and emit all records > in-transit in FIFO order and continue the
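The three steps above can be sketched as a minimal, self-contained simulation (plain Java, no Flink APIs; class and method names are illustrative, not the actual implementation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of ABS on a back-edge: on a barrier, block output and
 *  log in-transit records; when the barrier returns from the IterationSink,
 *  snapshot the log, then replay it in FIFO order and unblock. */
class IterationHeadSketch {
    private boolean outputBlocked = false;
    private final ArrayDeque<String> upstreamLog = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();
    List<String> lastSnapshot = new ArrayList<>();

    /** Step 1: barrier triggered by the TaskManager -> block output, start logging. */
    void onCheckpointBarrier() { outputBlocked = true; }

    /** Records arriving from the IterationSink via the back-edge. */
    void onBackEdgeRecord(String record) {
        if (outputBlocked) upstreamLog.addLast(record); // logged while blocked
        else emitted.add(record);                       // normal forwarding
    }

    /** Step 3: barrier came back from the sink -> finalize the snapshot,
     *  then unblock and emit the logged records in FIFO order. */
    void onBarrierFromSink() {
        lastSnapshot = new ArrayList<>(upstreamLog);    // the in-transit records
        outputBlocked = false;
        while (!upstreamLog.isEmpty()) emitted.add(upstreamLog.pollFirst());
    }
}

public class AbsDemo {
    public static void main(String[] args) {
        IterationHeadSketch head = new IterationHeadSketch();
        head.onBackEdgeRecord("a");   // forwarded normally
        head.onCheckpointBarrier();   // step 1
        head.onBackEdgeRecord("b");   // in-transit, logged
        head.onBackEdgeRecord("c");
        head.onBarrierFromSink();     // steps 2+3: snapshot, replay FIFO
        System.out.println(head.emitted);      // [a, b, c]
        System.out.println(head.lastSnapshot); // [b, c]
    }
}
```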
[GitHub] flink pull request #1668: [FLINK-3257] Add Exactly-Once Processing Guarantee...
Github user senorcarbone commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r108166093 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- (same diff as quoted in the JIRA mirror above) --- End diff -- it's another instance, that is why I was fetching it back there. The OperatorChain basically deserialises and sets up another instance through the configuration. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3583: [FLINK-6043] [web] Display exception timestamp
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3583 Wouldn't it be ok if the timestamps differed slightly? I mean the EG root cause timestamp is the time when the EG was failed, whereas the `Execution`-related timestamp says when the failure at the `Execution` was detected, for example.
[jira] [Commented] (FLINK-6043) Display time when exceptions/root cause of failure happened
[ https://issues.apache.org/jira/browse/FLINK-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943242#comment-15943242 ] ASF GitHub Bot commented on FLINK-6043: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3583 Wouldn't it be ok if the timestamps differed slightly? I mean the EG root cause timestamp is the time when the EG was failed, whereas the `Execution`-related timestamp says when the failure at the `Execution` was detected, for example. > Display time when exceptions/root cause of failure happened > --- > > Key: FLINK-6043 > URL: https://issues.apache.org/jira/browse/FLINK-6043 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Minor > > In order to better understand the behaviour of Flink jobs, it would be nice > to add timestamp information to exceptions causing the job to restart or to > fail. This information could then be displayed in the web UI, making it easier > for the user to understand what happened when. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-1750) Add canonical correlation analysis (CCA) to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943234#comment-15943234 ] Till Rohrmann commented on FLINK-1750: -- Alright. Thanks for letting me know :-) > Add canonical correlation analysis (CCA) to machine learning library > > > Key: FLINK-1750 > URL: https://issues.apache.org/jira/browse/FLINK-1750 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML > > Canonical correlation analysis (CCA) [1] can be used to find correlated > features between two random variables. Moreover, CCA can be used for > dimensionality reduction. > Maybe the work of Jia Chen and Ioannis D. Schizas [2] can be adapted to > realize a distributed CCA with Flink. > Resources: > [1] [http://en.wikipedia.org/wiki/Canonical_correlation] > [2] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp==6810359]
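For reference, the standard (non-distributed) CCA objective the cited approaches build on: given zero-mean random vectors X and Y with covariances Sigma_XX, Sigma_YY and cross-covariance Sigma_XY, find projection directions a and b that maximize the correlation of the projected variables (this is the textbook formulation, not any specific Flink design):

```latex
\rho = \max_{a,\,b}\ \operatorname{corr}\!\left(a^{\top}X,\; b^{\top}Y\right)
     = \max_{a,\,b}\ \frac{a^{\top}\Sigma_{XY}\,b}{\sqrt{a^{\top}\Sigma_{XX}\,a}\;\sqrt{b^{\top}\Sigma_{YY}\,b}}
```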
[jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector
[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943233#comment-15943233 ] ASF GitHub Bot commented on FLINK-4821: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3001 Hi @tzulitai Ok, I see. Thanks for your reminder. =) > Implement rescalable non-partitioned state for Kinesis Connector > > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Wei-Che Wei > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this.
[GitHub] flink issue #3001: [FLINK-4821] [kinesis] Implement rescalable non-partition...
Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3001 Hi @tzulitai Ok, I see. Thanks for your reminder. =)
[GitHub] flink pull request #3620: [FLINK-5785] Add an Imputer for preparing data
GitHub user p4nna opened a pull request: https://github.com/apache/flink/pull/3620 [FLINK-5785] Add an Imputer for preparing data Provides an imputer which fills missing values in a sparse DataSet of vectors. Missing entries can be filled with the mean, the median, or the most frequent value of each row or, optionally, column. This way incomplete data does not have to be thrown away but can still be used to train a machine learning algorithm. You can merge this pull request into a Git repository by running: $ git pull https://github.com/p4nna/flink ml-Imputer-edits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3620.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3620 commit f2875ac5890564213d5f055d710976d1fede3962 Author: p4nna Date: 2017-03-27T09:47:39Z Add files via upload commit 8e6909b52dad34d6c4cd6c84618616ac50cd83d1 Author: p4nna Date: 2017-03-27T09:49:59Z Test for Imputer class Two test classes which test the functions implemented in the new imputer class. One for the row-wise imputing over all vectors and one for the vector-wise imputing commit 0c420a84c136b330135ce180db04d899b5a6f54c Author: p4nna Date: 2017-03-27T09:56:51Z removed unused imports and methods
[jira] [Commented] (FLINK-6165) Implement internal continuity for looping states.
[ https://issues.apache.org/jira/browse/FLINK-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943228#comment-15943228 ] ASF GitHub Bot commented on FLINK-6165: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/3621 [FLINK-6165] [cep] Implement internal continuity for looping states. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink consecutive Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3621.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3621 commit 2ec2db3742596459e8c2cd60529f0319ad2e7776 Author: Dawid WysakowiczDate: 2017-03-27T13:05:11Z [FLINK-6165] Implement internal continuity for looping states. > Implement internal continuity for looping states. 
> - > > Key: FLINK-6165 > URL: https://issues.apache.org/jira/browse/FLINK-6165 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > We should be able to specify an internal continuity for a looping state. The > API could look like: {{zeroOrMore().consecutive()}}. So that we have a > continuity up to the first element of a loop and between elements in the loop.
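The internal continuity described above can be illustrated with a toy, library-free sketch (plain Java, not the FlinkCEP API; the `consecutive` flag models strict continuity between loop elements, the relaxed case skips non-matching events):

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of looping-state continuity: collect 'b' events after an 'a'.
 *  With internal (consecutive) continuity the loop only accepts directly
 *  adjacent 'b's; relaxed continuity skips non-matching events in between. */
public class LoopContinuityDemo {
    static List<Character> matchLoop(String events, boolean consecutive) {
        List<Character> loop = new ArrayList<>();
        int i = events.indexOf('a');          // start of the looping state
        if (i < 0) return loop;
        for (int j = i + 1; j < events.length(); j++) {
            char c = events.charAt(j);
            if (c == 'b') loop.add(c);
            else if (consecutive) break;      // internal continuity: non-match ends the loop
            // relaxed continuity: ignore the non-matching event and keep looping
        }
        return loop;
    }

    public static void main(String[] args) {
        System.out.println(matchLoop("abxbb", true));   // [b]
        System.out.println(matchLoop("abxbb", false));  // [b, b, b]
    }
}
```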
[GitHub] flink pull request #3621: [FLINK-6165] [cep] Implement internal continuity f...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/3621 [FLINK-6165] [cep] Implement internal continuity for looping states. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink consecutive Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3621.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3621 commit 2ec2db3742596459e8c2cd60529f0319ad2e7776 Author: Dawid WysakowiczDate: 2017-03-27T13:05:11Z [FLINK-6165] Implement internal continuity for looping states. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943222#comment-15943222 ] Anna Beer commented on FLINK-5785: -- [~Zentol] Thank you for the detailed description, hope I've done it right this time: https://github.com/apache/flink/pull/3620 > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Stavros Kontopoulos >Assignee: Stavros Kontopoulos > > We need to add an Imputer as described in [1]. > "The Imputer class provides basic strategies for imputing missing values, > either using the mean, the median or the most frequent value of the row or > column in which the missing values are located. This class also allows for > different missing values encodings." > References > 1. http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing > 2. > http://scikit-learn.org/stable/auto_examples/missing_values.html#sphx-glr-auto-examples-missing-values-py
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943219#comment-15943219 ] ASF GitHub Bot commented on FLINK-5785: --- GitHub user p4nna opened a pull request: https://github.com/apache/flink/pull/3620 [FLINK-5785] Add an Imputer for preparing data Provides an imputer method which adds missing values to a sparse DataSet of vectors. Those can be filled with the mean, the median or the most frequent value of each row or optionally column. Like that incomplete data don't have to be thrown away, but rather can be used to train a machine learning algorithm You can merge this pull request into a Git repository by running: $ git pull https://github.com/p4nna/flink ml-Imputer-edits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3620.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3620 commit f2875ac5890564213d5f055d710976d1fede3962 Author: p4nnaDate: 2017-03-27T09:47:39Z Add files via upload commit 8e6909b52dad34d6c4cd6c84618616ac50cd83d1 Author: p4nna Date: 2017-03-27T09:49:59Z Test for Imputer class Two testclasses which test the functions implemented in the new imputer class. One for the rowwise imputing over all vectors and one for the vectorwise imputing commit 0c420a84c136b330135ce180db04d899b5a6f54c Author: p4nna Date: 2017-03-27T09:56:51Z removed unused imports and methods > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Stavros Kontopoulos >Assignee: Stavros Kontopoulos > > We need to add an Imputer as described in [1]. 
> "The Imputer class provides basic strategies for imputing missing values, > either using the mean, the median or the most frequent value of the row or > column in which the missing values are located. This class also allows for > different missing values encodings." > References > 1. http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing > 2. > http://scikit-learn.org/stable/auto_examples/missing_values.html#sphx-glr-auto-examples-missing-values-py
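The mean strategy described above can be sketched in a few lines of stand-alone code (plain Java, not the proposed FlinkML API; `Double.NaN` marks a missing value, and the median/most-frequent strategies would follow the same shape):

```java
import java.util.Arrays;

public class ImputerSketch {
    /** Replaces NaN entries with the mean of the observed entries. */
    static double[] imputeMean(double[] column) {
        double sum = 0;
        int n = 0;
        for (double v : column) {
            if (!Double.isNaN(v)) { sum += v; n++; }
        }
        double mean = sum / n;                       // mean of observed values only
        double[] out = column.clone();
        for (int i = 0; i < out.length; i++) {
            if (Double.isNaN(out[i])) out[i] = mean; // fill the gaps
        }
        return out;
    }

    public static void main(String[] args) {
        double[] col = {1.0, Double.NaN, 3.0, Double.NaN, 5.0};
        System.out.println(Arrays.toString(imputeMean(col))); // [1.0, 3.0, 3.0, 3.0, 5.0]
    }
}
```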
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943212#comment-15943212 ] Stephan Ewen commented on FLINK-4319: - Flink on Kubernetes works today already. We need FLIP-6 only for elasticity in service resource scaling. > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
[GitHub] flink issue #3001: [FLINK-4821] [kinesis] Implement rescalable non-partition...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3001 Hi @tony810430! Sorry for the long pause on this PR. After some back and forth offline discussions with others on how exactly we want to proceed with this, we decided to stick with using union state to cope with the shard discovery on restore problem (at least for 1.3.0). Therefore, we can finally continue work here :-D First of all, to use union state we should use `CheckpointedFunction` instead of `ListCheckpointed`. There is a PR for exposing union state to the public API (#3508), but in case that isn't merged yet within the next few days, I suggest that you don't need to be blocked when you continue your work on this PR. For now, you can cast the operator state store instance retrieved through the `FunctionInitializationContext` to `DefaultOperatorStateStore` to use broadcast state. One thing to also note, which is missing in your previous work on this, is that we need a migration path from the old state access (i.e., via `CheckpointedAsynchronously`) to the new state (i.e., `CheckpointedFunction`). The `FlinkKafkaConsumerBase` class in the Kafka connector provides a very good example of how to do this. Simply put, in the end, the `FlinkKinesisConsumer` should implement both `CheckpointedRestoring` and `CheckpointedFunction`, and bridge the old state read from the legacy `restoreState(...)` method to the new `initializeState(...)` method. The bridge would simply be a field variable in the consumer class. The `FlinkKafkaConsumerBase` also serves as a good example of how to use the `CheckpointedFunction` if you have questions there. Let me know if you have any questions with this, and feel free to ping me any time!
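The "field-variable bridge" migration described in the comment above can be sketched with simplified stand-in interfaces (the real Flink `CheckpointedRestoring`/`CheckpointedFunction` methods take context objects and throw checked exceptions; this only shows the bridging pattern, not the actual connector code):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the two Flink interfaces mentioned above.
interface CheckpointedRestoring<T> { void restoreState(T state); }
interface CheckpointedFunction { void initializeState(boolean restoredFromLegacy); }

/** Consumer that bridges legacy state into the new state store via a field. */
class ConsumerSketch implements CheckpointedRestoring<List<String>>, CheckpointedFunction {
    private List<String> legacyState;                    // the "bridge" field
    final List<String> newStateStore = new ArrayList<>();

    @Override public void restoreState(List<String> state) {
        legacyState = state;                             // legacy restore just parks the state
    }

    @Override public void initializeState(boolean restoredFromLegacy) {
        if (restoredFromLegacy && legacyState != null) { // migrate old state into the new store
            newStateStore.addAll(legacyState);
            legacyState = null;
        }
    }
}

public class MigrationDemo {
    public static void main(String[] args) {
        ConsumerSketch c = new ConsumerSketch();
        c.restoreState(List.of("shard-0", "shard-1"));   // a legacy snapshot was found
        c.initializeState(true);
        System.out.println(c.newStateStore);             // [shard-0, shard-1]
    }
}
```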
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943198#comment-15943198 ] shijinkui commented on FLINK-4319: -- Flink on Kubernetes: do we have a schedule for this? > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
[jira] [Commented] (FLINK-6141) Add buffering service for stream operators
[ https://issues.apache.org/jira/browse/FLINK-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943182#comment-15943182 ] Stephan Ewen commented on FLINK-6141: - I don't think I have capacity to help out here before the 1.3 release. I can try and comment on design documents, though. On the general topic of side inputs, why not go with the "simple" cases first - the ones that do not need buffering? > Add buffering service for stream operators > -- > > Key: FLINK-6141 > URL: https://issues.apache.org/jira/browse/FLINK-6141 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Aljoscha Krettek > > As mentioned in > https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API > we need a way of buffering incoming elements until a side input that is > required for processing them is ready. > There has to be an implementation for non-keyed operators and for keyed > operators because in keyed operators we need to ensure that we store the > buffered elements in the correct key group when checkpointing. > For the interface, I propose this: > {code} > @PublicEvolving > public interface ElementBuffer<T, N> { > /** >* Adds the given element to the buffer for the given namespace. >*/ > void add(N namespace, T element); > /** >* Returns an {@code Iterable} over all buffered elements for the given > namespace. >*/ > Iterable<T> values(N namespace); > /** >* Clears all buffered elements for the given namespace. >*/ > void clear(N namespace); > } > {code} > {{AbstractStreamOperator}} would provide a method {{getElementBuffer()}} that > would return the appropriate implementation for a non-keyed or keyed operator.
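A minimal in-memory realization of the proposed interface could look as follows (hypothetical sketch only; a real keyed implementation would back the map with checkpointed state assigned to key groups, as the issue describes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The interface as proposed in the issue (type parameters: T = element, N = namespace).
interface ElementBuffer<T, N> {
    void add(N namespace, T element);
    Iterable<T> values(N namespace);
    void clear(N namespace);
}

/** Simple heap-backed implementation: one list of buffered elements per namespace. */
class HeapElementBuffer<T, N> implements ElementBuffer<T, N> {
    private final Map<N, List<T>> buffers = new HashMap<>();

    @Override public void add(N namespace, T element) {
        buffers.computeIfAbsent(namespace, k -> new ArrayList<>()).add(element);
    }
    @Override public Iterable<T> values(N namespace) {
        return buffers.getOrDefault(namespace, Collections.emptyList());
    }
    @Override public void clear(N namespace) {
        buffers.remove(namespace);
    }
}

public class BufferDemo {
    public static void main(String[] args) {
        ElementBuffer<String, Long> buf = new HeapElementBuffer<>();
        buf.add(1L, "x");
        buf.add(1L, "y");
        System.out.println(buf.values(1L)); // [x, y]
        buf.clear(1L);
        System.out.println(buf.values(1L)); // []
    }
}
```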
[jira] [Commented] (FLINK-5785) Add an Imputer for preparing data
[ https://issues.apache.org/jira/browse/FLINK-5785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943184#comment-15943184 ] Chesnay Schepler commented on FLINK-5785: - [~beera] You've opened the PR against your own fork of flink, and not the original apache one. Please close the PR and follow the steps below: Go to https://github.com/p4nna/flink , there is a drop-down list where you can select the branch you want to merge. Select the ml-Imputer-edits branch, this should lead you to https://github.com/p4nna/flink/tree/ml-Imputer-edits. Directly next to the drop-down list you should see a "New pull request" button. Push that thing. In the next page, which should be titled "Comparing changes", make sure that "base fork" = "apache/flink", "base" = "master", "head fork" = "p4nna/flink" and "compare" = "ml-Imputer-edits". The page should then look like this: https://github.com/apache/flink/compare/master...p4nna:ml-Imputer-edits?expand=1 From here on out you should know the way, please ping me if anything doesn't work as described. > Add an Imputer for preparing data > - > > Key: FLINK-5785 > URL: https://issues.apache.org/jira/browse/FLINK-5785 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Stavros Kontopoulos >Assignee: Stavros Kontopoulos > > We need to add an Imputer as described in [1]. > "The Imputer class provides basic strategies for imputing missing values, > either using the mean, the median or the most frequent value of the row or > column in which the missing values are located. This class also allows for > different missing values encodings." > References > 1. http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing > 2. > http://scikit-learn.org/stable/auto_examples/missing_values.html#sphx-glr-auto-examples-missing-values-py
[jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector
[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943180#comment-15943180 ] ASF GitHub Bot commented on FLINK-4821: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3001 Hi @tony810430! Sorry for the long pause on this PR. After some back and forth offline discussions with others on how exactly we want to proceed with this, we decided to stick with using union state to cope with the shard discovery on restore problem (at least for 1.3.0). Therefore, we can finally continue work here :-D First of all, to use union state we should use `CheckpointedFunction` instead of `ListCheckpointed`. There is a PR for exposing union state to the public API (#3508), but in case that isn't merged yet within the next few days, I suggest that you don't need to be blocked when you continue your work on this PR. For now, you can cast the operator state store instance retrieved through the `FunctionInitializationContext` to `DefaultOperatorStateStore` to use broadcast state. One thing to also note, which is missing in your previous work on this, is that we need a migration path from the old state access (i.e., via `CheckpointedAsynchronously`) to the new state (i.e., `CheckpointedFunction`). The `FlinkKafkaConsumerBase` class in the Kafka connector provides a very good example of how to do this. Simply put, in the end, the `FlinkKinesisConsumer` should implement both `CheckpointedRestoring` and `CheckpointedFunction`, and bridge the old state read from the legacy `restoreState(...)` method to the new `initializeState(...)` method. The bridge would simply be a field variable in the consumer class. The `FlinkKafkaConsumerBase` also serves as a good example of how to use the `CheckpointedFunction` if you have questions there. Let me know if you have any questions with this, and feel free to ping me any time!
> Implement rescalable non-partitioned state for Kinesis Connector > > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Wei-Che Wei > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this.
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943179#comment-15943179 ] ASF GitHub Bot commented on FLINK-5653: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3574 I agree @huawei-flink, an efficient order-preserving state primitive (generic or specific for time) would be very helpful. > Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
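The semantics of the query in the issue above (per-partition sliding aggregate over the current row and the two preceding rows, in processing-time arrival order) can be illustrated with a tiny stand-alone sketch (plain Java, not the Table API implementation; `sumB` models the `SUM(b) OVER (... ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)` column):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Per-key sliding sum over "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW". */
public class RowsWindowDemo {
    private final Map<String, ArrayDeque<Long>> lastRows = new HashMap<>();

    long sumB(String partitionKey, long b) {
        ArrayDeque<Long> window =
            lastRows.computeIfAbsent(partitionKey, k -> new ArrayDeque<>());
        window.addLast(b);
        if (window.size() > 3) window.pollFirst(); // keep current row + 2 preceding
        return window.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        RowsWindowDemo agg = new RowsWindowDemo();
        System.out.println(agg.sumB("c1", 1));  // 1
        System.out.println(agg.sumB("c1", 2));  // 3
        System.out.println(agg.sumB("c1", 3));  // 6
        System.out.println(agg.sumB("c1", 4));  // 9  (2 + 3 + 4)
        System.out.println(agg.sumB("c2", 10)); // 10 (separate partition)
    }
}
```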